This is xnu-11215.1.10. See this file in:
"""
A basic caching module for xnu debug macros to use.


When to use caching?
~~~~~~~~~~~~~~~~~~~~

Very often you do not need to: LLDB already provides extensive data caching.

The most common things that need caching are:
- types (the gettype() function provides this)
- globals (kern.globals / kern.GetGlobalVariable() provides this)


If your macro is slow to get some data, before slapping a caching decorator,
please profile your code using `xnudebug profile`. Very often slowness happens
due to the usage of CreateValueFromExpression() which spins a full compiler
to parse relatively trivial expressions and is easily 10-100x as slow as
alternatives like CreateValueFromAddress().

Only use caching once you have eliminated those obvious performance hogs
and an A/B shows meaningful speed improvements over the base line.


I really need caching how does it work?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This module provides function decorators to easily cache the result of
functions based on their parameters, while keeping the caches separated
per lldb target.

@cache_statically can be used to cache data once per target,
because it won't change if the process is resumed and stopped again.
For example: types, constant values, global addresses, ...

@cache_dynamically can be used to cache data that is expensive to compute
but which content depends on the state of the process. It will be
automatically invalidated for you if the process is resumed and
then stops or hits a breakpoint.

@dyn_cached_property can be used on instances to turn a member into
a per-target cached dynamic property that will be cleaned up when
the object dies or if the process is resumed and then stops
or hits a breakpoint.

Functions using these decorators, must have a `target=None` named argument
that no caller of those functions need to pass explicitly, the decorator
will take care of passing the proper current target value (equivalent to
LazyTarget.GetTarget() except more efficiently).


Cache Invalidation
~~~~~~~~~~~~~~~~~~

This module make the crucial assumption that no XNU lldb macro
will step/resume/... processes as the invalidation only happens
when a new command is being run.

If some macro needs to step/resume it has to provide its own
cache invalidation, by e.g. pushing its own ImplicitContext()
around such process state manipulations.
"""

# Private Routines and objects
from collections import namedtuple

import functools
import gc
import inspect
import sys
import weakref

import lldb

from .configuration import config
from . import lldbwrap


class _Registry(list):
    """ Private class that holds a list of _Cache instances """

    __slots__ = ('name')

    def __init__(self, name):
        super().__init__()
        self.name = name

    @property
    def size(self):
        return sys.getsizeof(self)

    def invalidate(self, pid):
        for cache in self:
            cache.invalidate(pid)

    def clear(self):
        for cache in self:
            cache.clear()

    def __repr__(self):
        return "_Registry({}, {})".format(
            self.name, super().__repr__())

    def __str__(self):
        return "_Registry({}, {} caches, size {})".format(
            self.name, len(self), self.size)


class _Cache(dict):
    """ Private class that implements a given global function _Cache

        Those are created with the @cache_statically/@cache_dynamically
        decorators
    """

    __slots__ = ('name')

    def __init__(self, name, registry):
        super().__init__()
        self.name = name
        registry.append(self)

    @property
    def size(self):
        return sys.getsizeof(self)

    def invalidate(self, pid):
        if pid in self: del self[pid]

    def __repr__(self):
        return "_Cache({}, {})".format(
            self.name, super().__repr__())

    def __str__(self):
        return "_Cache({}, {} entries, size {})".format(
            self.name, len(self), self.size)


_static_registry     = _Registry('static')
_dynamic_registry    = _Registry('dynamic')
_dynamic_keys        = {}

_implicit_target     = None
_implicit_process    = None
_implicit_exe_id     = None
_implicit_dynkey     = None


class _DynamicKey(object):
    """ Wraps a process StopID as a key that can be used as caches keys """

    def __init__(self, exe_id, stop_id):
        self.exe_id  = exe_id
        self.stop_id = stop_id

    def __hash__(self):
        return id(self)


class LazyTarget(object):
    """ A common object that lazy-evaluates and caches the lldb.SBTarget
        and lldb.SBProcess for the current interactive debugging session.
    """

    @staticmethod
    def _CacheUpdateAnchor(exe_id, process):
        global _dynamic_keys, _dynamic_registry

        stop_id = process.GetStopID()
        dyn_key = _dynamic_keys.get(exe_id)

        if dyn_key is None:
            _dynamic_keys[exe_id] = dyn_key = _DynamicKey(exe_id, stop_id)
        elif dyn_key.stop_id != stop_id:
            _dynamic_registry.invalidate(exe_id)
            _dynamic_keys[exe_id] = dyn_key = _DynamicKey(exe_id, stop_id)
            gc.collect()

        return dyn_key

    @staticmethod
    def _CacheGC():
        global _dynamic_keys, _dynamic_registry, _static_registry

        exe_ids = _dynamic_keys.keys() - set(
            tg.GetProcess().GetUniqueID()
            for tg in lldb.debugger
        )

        for exe_id in exe_ids:
            _static_registry.invalidate(exe_id)
            _dynamic_registry.invalidate(exe_id)
            del _dynamic_keys[exe_id]

        if len(exe_ids):
            gc.collect()

    @staticmethod
    def _CacheGetDynamicKey():
        """ Get a _DynamicKey for the most likely current process
        """
        global _implicit_dynkey

        dyn_key = _implicit_dynkey
        if dyn_key is None:
            process = lldbwrap.GetProcess()
            exe_id  = process.GetUniqueID()
            dyn_key = LazyTarget._CacheUpdateAnchor(exe_id, process)

        return dyn_key

    @staticmethod
    def _CacheClear():
        """ remove all cached data.
        """
        global _dynamic_registry, _static_registry, _dynamic_keys

        _dynamic_registry.clear()
        _static_registry.clear()
        _dynamic_keys.clear()

    @staticmethod
    def _CacheSize():
        """ Returns number of bytes held in cache.
            returns:
                int - size of cache including static and dynamic
        """
        global _dynamic_registry, _static_registry

        return _dynamic_registry.size + _static_registry.size


    @staticmethod
    def GetTarget():
        """ Get the SBTarget that is the most likely current target
        """
        global _implicit_target

        return _implicit_target or lldbwrap.GetTarget()

    @staticmethod
    def GetProcess():
        """ Get an SBProcess for the most likely current process
        """
        global _implicit_process

        return _implicit_process or lldbwrap.GetProcess()


class ImplicitContext(object):
    """ This class sets up the implicit target/process
        being used by the XNu lldb macros system.

        In order for lldb macros to function properly, such a context
        must be used around code being run, otherwise macros will try
        to infer it from the current lldb selected target which is
        incorrect in certain contexts.

        typical usage is:

            with ImplicitContext(thing):
                # code

        where @c thing is any of an SBExecutionContext, an SBValue,
        an SBBreakpoint, an SBProcess, or an SBTarget.
    """

    __slots__ = ('target', 'process', 'exe_id', 'old_ctx')

    def __init__(self, arg):
        if isinstance(arg, lldb.SBExecutionContext):
            exe_ctx = lldbwrap.SBExecutionContext(arg)
            target  = exe_ctx.GetTarget()
            process = exe_ctx.GetProcess()
        elif isinstance(arg, lldb.SBValue):
            target  = lldbwrap.SBTarget(arg.GetTarget())
            process = target.GetProcess()
        elif isinstance(arg, lldb.SBBreakpoint):
            bpoint  = lldbwrap.SBBreakpoint(arg)
            target  = bpoint.GetTarget()
            process = target.GetProcess()
        elif isinstance(arg, lldb.SBProcess):
            process = lldbwrap.SBProcess(arg)
            target  = process.GetTarget()
        elif isinstance(arg, lldb.SBTarget):
            target  = lldbwrap.SBTarget(arg)
            process = target.GetProcess()
        else:
            raise TypeError("argument type unsupported {}".format(
                arg.__class__.__name__))

        self.target  = target
        self.process = process
        self.exe_id  = process.GetUniqueID()
        self.old_ctx = None

    def __enter__(self):
        global _implicit_target, _implicit_process, _implicit_exe_id
        global _implicit_dynkey, _dynamic_keys

        self.old_ctx = (_implicit_target, _implicit_process, _implicit_exe_id)

        _implicit_target  = self.target
        _implicit_process = process = self.process
        _implicit_exe_id  = exe_id = self.exe_id
        _implicit_dynkey  = LazyTarget._CacheUpdateAnchor(exe_id, process)

        if len(_dynamic_keys) > 1:
            LazyTarget._CacheGC()

    def __exit__(self, *args):
        global _implicit_target, _implicit_process, _implicit_exe_id
        global _implicit_dynkey, _dynamic_keys

        target, process, exe_id = self.old_ctx
        self.old_ctx = None

        _implicit_target  = target
        _implicit_process = process
        _implicit_exe_id  = exe_id

        if process:
            _implicit_dynkey = LazyTarget._CacheUpdateAnchor(exe_id, process)
        else:
            _implicit_dynkey = None


class _HashedSeq(list):
    """ This class guarantees that hash() will be called no more than once
        per element.  This is important because the lru_cache() will hash
        the key multiple times on a cache miss.

        Inspired by python3's lru_cache decorator implementation
    """

    __slots__ = 'hashvalue'

    def __init__(self, tup, hash=hash):
        self[:] = tup
        self.hashvalue = hash(tup)

    def __hash__(self):
        return self.hashvalue

    @classmethod
    def make_key(cls, args, kwds, kwd_mark = (object(),),
        fasttypes = {int, str}, tuple=tuple, type=type, len=len):

        """ Inspired from python3's cache implementation """

        key = args
        if kwds:
            key += kwd_mark
            key += tuple(kwd.items())
        elif len(key) == 0:
            return None
        elif len(key) == 1 and type(key[0]) in fasttypes:
            return key[0]
        return cls(key)


def _cache_with_registry(fn, registry, maxsize=128, sentinel=object()):
    """ Internal function """

    nokey = False

    if hasattr(inspect, 'signature'): # PY3
        sig = inspect.signature(fn)
        tg  = sig.parameters.get('target')
        if not tg or tg.default is not None:
            raise ValueError("function doesn't have a 'target=None' argument")

        nokey = len(sig.parameters) == 1
        cache = _Cache(fn.__qualname__, registry)
    else:
        spec = inspect.getargspec(fn)
        try:
            index = spec.args.index('target')
            offs  = len(spec.args) - len(spec.defaults)
            if index < offs or spec.defaults[index - offs] is not None:
                raise ValueError
        except:
            raise ValueError("function doesn't have a 'target=None' argument")

        nokey = len(spec.args) == 1 and spec.varargs is None and spec.keywords is None
        cache = _Cache(fn.__name__, registry)

    c_setdef = cache.setdefault
    c_get    = cache.get
    make_key = _HashedSeq.make_key
    getdynk  = LazyTarget._CacheGetDynamicKey
    gettg    = LazyTarget.GetTarget

    if nokey:
        def caching_wrapper(*args, **kwds):
            global _implicit_exe_id, _implicit_target

            key = _implicit_exe_id or getdynk().exe_id
            result = c_get(key, sentinel)
            if result is not sentinel:
                return result

            kwds['target'] = _implicit_target or gettg()
            return c_setdef(key, fn(*args, **kwds))

        def cached(*args, **kwds):
            global _implicit_exe_id

            return c_get(_implicit_exe_id or getdynk().exe_id, sentinel) != sentinel
    else:
        def caching_wrapper(*args, **kwds):
            global _implicit_exe_id, _implicit_target

            tg_d   = c_setdef(_implicit_exe_id or getdynk().exe_id, {})
            c_key  = make_key(args, kwds)
            result = tg_d.get(c_key, sentinel)
            if result is not sentinel:
                return result

            #
            # Blunt policy to avoid exploding memory,
            # that is simpler than an actual LRU.
            #
            # TODO: be smarter?
            #
            if len(tg_d) >= maxsize: tg_d.clear()

            kwds['target'] = _implicit_target or gettg()
            return tg_d.setdefault(c_key, fn(*args, **kwds))

        def cached(*args, **kwds):
            global _implicit_exe_id

            tg_d = c_get(_implicit_exe_id or getdynk().exe_id)
            return tg_d and tg_d.get(make_key(args, kwds), sentinel) != sentinel

    caching_wrapper.cached = cached
    return functools.update_wrapper(caching_wrapper, fn)


def cache_statically(fn):
    """ Decorator to cache the results statically

        This basically makes the decorated function cache its result based
        on its arguments with an automatic static per target cache

        The function must have a named parameter called 'target' defaulting
        to None, with no clients ever passing it explicitly.  It will be
        passed the proper SBTarget when called.

        @cache_statically(user_function)
            Cache the results of this function automatically per target,
            using the arguments of the function as the cache key.
    """

    return _cache_with_registry(fn, _static_registry)


def cache_dynamically(fn):
    """ Decorator to cache the results dynamically

        This basically makes the decorated function cache its result based
        on its arguments with an automatic dynamic cache that is reset
        every time the process state changes

        The function must have a named parameter called 'target' defaulting
        to None, with no clients ever passing it explicitly.  It will be
        passed the proper SBTarget when called.

        @cache_dynamically(user_function)
            Cache the results of this function automatically per target,
            using the arguments of the function as the cache key.
    """

    return _cache_with_registry(fn, _dynamic_registry)


def dyn_cached_property(fn, sentinel=object()):
    """ Decorator to make a class or method property cached per instance

        The method must have the prototype:

            def foo(self, target=None)

        and will generate the property "foo".
    """

    if hasattr(inspect, 'signature'): # PY3
        if list(inspect.signature(fn).parameters) != ['self', 'target']:
            raise ValueError("function signature must be (self, target=None)")
    else:
        spec = inspect.getargspec(fn)
        if spec.args != ['self', 'target'] or \
                spec.varargs is not None or spec.keywords is not None:
            raise ValueError("function signature must be (self, target=None)")

    getdynk  = LazyTarget._CacheGetDynamicKey
    gettg    = LazyTarget.GetTarget
    c_attr   = "_dyn_key__" + fn.__name__

    def dyn_cached_property_wrapper(self, target=None):
        global _implicit_dynkey, _implicit_target

        cache = getattr(self, c_attr, None)
        if cache is None:
            cache = weakref.WeakKeyDictionary()
            setattr(self, c_attr, cache)

        c_key  = _implicit_dynkey or getdynk()
        result = cache.get(c_key, sentinel)
        if result is not sentinel:
            return result

        return cache.setdefault(c_key, fn(self, _implicit_target or gettg()))

    return property(functools.update_wrapper(dyn_cached_property_wrapper, fn))


ClearAllCache = LazyTarget._CacheClear

GetSizeOfCache = LazyTarget._CacheSize

__all__ = [
    LazyTarget.__name__,
    ImplicitContext.__name__,

    cache_statically.__name__,
    cache_dynamically.__name__,
    dyn_cached_property.__name__,

    ClearAllCache.__name__,
    GetSizeOfCache.__name__,
]