# This file is from xnu-11215.1.10.
"""
Wrappers around globals and caches to service the kmem package
"""
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from core import (
    caching,
    gettype,
    lldbwrap,
)
from ctypes import c_int64

class MemoryRange(namedtuple('MemoryRange', ['start', 'end'])):
    """
    Half-open address interval [start, end) stored as a named tuple
    """

    @property
    def size(self):
        """ Distance between the two bounds (end - start) """
        return self.end - self.start

    def contains(self, addr):
        """
        Tell whether an address falls inside the range

        @param addr (int)
            The address to test

        @returns (bool)
            True when start <= addr < end (the end bound is excluded)
        """
        return self.start <= addr and addr < self.end

    def __repr__(self):
        # Rendered with interval notation to make the excluded end explicit
        return f"{self.__class__.__name__}[{self.start:#x}, {self.end:#x})"


class VMPointerUnpacker(object):
    """
    Decoder for pointers that were packed with VM_PACK_POINTER()

    The packing parameters (vmpp_base_relative, vmpp_bits, vmpp_shift
    and vmpp_base) are read once, at construction time, from the global
    variable describing the packing scheme.
    """

    def __init__(self, target, param_var):
        """
        @param target
            The target in which the parameters variable is looked up

        @param param_var (str)
            Name of the global variable holding the packing parameters
        """
        read_scalar = target.chkFindFirstGlobalVariable(param_var).xGetScalarByName

        self.base_relative = read_scalar('vmpp_base_relative')
        self.bits          = read_scalar('vmpp_bits')
        self.shift         = read_scalar('vmpp_shift')
        self.base          = read_scalar('vmpp_base')

    def unpack(self, packed):
        """
        Unpacks an address according to the VM_PACK_POINTER() scheme

        @param packed (int)
            The packed value to unpack

        @returns (int or None)
            The unpacked address truncated to 64 bits,
            or None when the packed value is zero / empty
        """

        if not packed:
            return None

        shift = self.shift

        if self.base_relative:
            # Base-relative scheme: scale the packed value, then rebase it
            unpacked = (packed << shift) + self.base
        else:
            # Sign-extending scheme: move the packed bits to the top of
            # a signed 64-bit word, then arithmetic-shift them back down
            # so that only `shift` zero bits remain at the bottom.
            headroom = 64 - self.bits
            signed   = c_int64(packed << headroom).value
            unpacked = signed >> (headroom - shift)

        return unpacked & 0xffffffffffffffff

    def unpack_value(self, sbv):
        """
        Conveniency wrapper for self.unpack(sbv.chkGetValueAsUnsigned())
        """
        raw = sbv.chkGetValueAsUnsigned()
        return self.unpack(raw)


class KMem(object, metaclass=ABCMeta):
    """
    Singleton class that holds various important information
    that is needed to make sense of the kernel memory layout,
    heap data structures, globals, ...

    This class is abstract: instances are obtained with KMem.get_shared(),
    which selects the per-architecture subclass from the target triple.
    """

    _HEAP_NAMES = [ "", "shared.", "data.", "" ]

    @staticmethod
    def _parse_range(zone_info_v, name):
        """
        Create a MemoryRange out of a member holding a min/max address pair

        @param zone_info_v
            The value owning the member to parse

        @param name (str)
            Name of the member; it must expose the `min_address`
            and `max_address` fields

        @returns (MemoryRange)
            The range [min_address, max_address)
        """
        range_v = zone_info_v.chkGetChildMemberWithName(name)
        left    = range_v.xGetIntegerByName('min_address')
        right   = range_v.xGetIntegerByName('max_address')
        return MemoryRange(left, right)

    @staticmethod
    def _load_range_pairs(target, name):
        """
        Read a global variable as a sequence of (start, end) address pairs

        The number of words read is the byte size of the variable divided
        by the address size of the target; words are consumed two by two.

        @param target
            The target in which the variable is looked up

        @param name (str)
            Name of the global variable to read

        @returns (list of MemoryRange)
            One range per consecutive pair of words
        """
        ranges_v  = target.chkFindFirstGlobalVariable(name)
        count     = ranges_v.GetByteSize() // target.GetAddressByteSize()
        addresses = target.xIterAsUInt64(ranges_v.GetLoadAddress(), count)
        return [
            MemoryRange(next(addresses), next(addresses))
            for _ in range(0, count, 2)
        ]

    def __init__(self, target):
        """
        Snapshot the globals, ranges and types used by the kmem package

        @param target
            The target to read kernel state from
        """
        self.target = target

        #
        # Cache some globals everyone needs
        #
        self.page_shift = target.chkFindFirstGlobalVariable('page_shift').xGetValueAsInteger()
        self.page_size  = 1 << self.page_shift
        self.page_mask  = self.page_size - 1

        #
        # `phases` holds the names (with the leading len('STARTUP_SUB_')
        # characters dropped) of every enumerator of startup_phase's type
        # whose value is lower or equal to the current phase.
        #
        phase_v = target.chkFindFirstGlobalVariable('startup_phase')
        self.phase      = phase_v.xGetValueAsInteger()
        self.phases     = set(
            e.GetName()[len('STARTUP_SUB_'):]
            for e in phase_v.GetType().get_enum_members_array()
            if  e.GetValueAsUnsigned() <= self.phase
        )

        #
        # Setup the number of CPUs we have
        #
        # Until the 'ZALLOC' (resp. 'PERCPU') phase has been reached,
        # only the master CPU is enumerated by zcpus (resp. pcpus).
        #
        self.ncpus      = target.chkFindFirstGlobalVariable('zpercpu_early_count').xGetValueAsInteger()
        self.master_cpu = target.chkFindFirstGlobalVariable('master_cpu').xGetValueAsInteger()
        self.zcpus      = range(self.ncpus) if 'ZALLOC' in self.phases else (self.master_cpu, )
        self.pcpus      = range(self.ncpus) if 'PERCPU' in self.phases else (self.master_cpu, )

        #
        # Load all the ranges we will need
        #
        zone_info = target.chkFindFirstGlobalVariable('zone_info')
        self.meta_range = self._parse_range(zone_info, 'zi_meta_range')
        self.bits_range = self._parse_range(zone_info, 'zi_bits_range')
        self.zone_range = self._parse_range(zone_info, 'zi_map_range')
        try:
            self.pgz_range = self._parse_range(zone_info, 'zi_pgz_range')
            self.pgz_bt    = target.chkFindFirstGlobalVariable('pgz_backtraces').xDereference()
        except Exception:
            #
            # Best effort: when either lookup fails, behave as if there
            # was no PGZ range at all (presumably the kernel was built
            # without PGZ -- TODO confirm).
            #
            # `Exception` rather than a bare except, so that
            # KeyboardInterrupt and SystemExit still propagate.
            #
            self.pgz_range = MemoryRange(0, 0)
            self.pgz_bt    = None

        self.kmem_ranges  = self._load_range_pairs(target, 'kmem_ranges')
        self.iokit_ranges = self._load_range_pairs(target, 'gIOKitPageableFixedRanges')

        #
        # And other important globals
        #
        self.stext      = target.chkFindFirstGlobalVariable('vm_kernel_stext').xGetValueAsInteger()
        self.num_zones  = target.chkFindFirstGlobalVariable('num_zones').xGetValueAsInteger()
        self.mag_size   = target.chkFindFirstGlobalVariable('_zc_mag_size').xGetValueAsInteger()
        self.zone_array = target.chkFindFirstGlobalVariable('zone_array')
        self.zsec_array = target.chkFindFirstGlobalVariable('zone_security_array')

        self.kernel_map = target.chkFindFirstGlobalVariable('kernel_map').Dereference()
        self.vm_kobject = target.chkFindFirstGlobalVariable('kernel_object_store')

        #
        # Cache some crucial types used for memory walks
        #
        self.zpm_type    = gettype('struct zone_page_metadata')
        self.vm_map_type = gettype('struct _vm_map')
        self.vmo_type    = self.vm_kobject.GetType()

        #
        # Recognize whether the target is any form of KASAN kernel.
        #
        # NOTE(review): this relies on any() over the (unchecked) lookup
        # result being False when the global is absent -- confirm against
        # the lldbwrap value wrappers.
        #
        if any(target.FindFirstGlobalVariable('kasan_enabled')):
            self.kasan         = True
            self.kasan_tbi     = any(target.FindFirstGlobalVariable('kasan_tbi_enabled'))
            self.kasan_classic = not self.kasan_tbi
        else:
            self.kasan         = False
            self.kasan_tbi     = False
            self.kasan_classic = False

        #
        # VM_PACK_POINTER Unpackers
        #
        self.kn_kq_packing = VMPointerUnpacker(target, 'kn_kq_packing_params')
        self.vm_page_packing = VMPointerUnpacker(target, 'vm_page_packing_params')
        self.rwlde_caller_packing = VMPointerUnpacker(target, 'rwlde_caller_packing_params')
        self.c_slot_packing = VMPointerUnpacker(target, 'c_slot_packing_params')

    @staticmethod
    @caching.cache_statically
    def get_shared(target=None):
        """
        Returns a shared instance of the class

        @param target
            The target to build the instance for.
            NOTE(review): presumably caching.cache_statically supplies the
            current target when none is passed -- confirm in core.caching.

        @returns (KMem)
            An instance of the subclass matching the target architecture

        @raises RuntimeError
            When the architecture of the target is not supported
        """

        #
        # The architecture is whatever precedes the first '-' of the triple.
        # partition() returns the whole triple when it has no '-', whereas
        # slicing up to find() (-1 in that case) would chop its last
        # character off and could turn "arm64e" into "arm64".
        #
        arch = target.triple.partition('-')[0]

        # arm64e must be tested first since it also starts with "arm64"
        if arch.startswith('arm64e'):
            return _KMemARM64e(target)
        elif arch.startswith('arm64'):
            return _KMemARM64(target)
        elif arch.startswith('x86_64'):
            return _KMemX86(target)
        else:
            raise RuntimeError("Unsupported architecture: {}".format(arch))

    def iter_addresses(self, iterable):
        """
        Conveniency wrapper to transform a list of integer to addresses

        @param iterable (iterable of int)
            The integers to convert

        @returns (generator of int)
            self.make_address() lazily applied to each integer
        """
        return (self.make_address(a) for a in iterable)

    #
    # Abstract per-arch methods
    #

    @property
    @abstractmethod
    def has_ptrauth(self):
        """ whether this target has ptrauth """

        pass

    @abstractmethod
    def PERCPU_BASE(self, cpu):
        """
        Returns the per-cpu base for a given CPU number

        @param cpu (int)
            A CPU number

        @returns (int)
            The percpu base for this CPU
        """

        pass

    @abstractmethod
    def make_address(self, addr):
        """
        Make an address out of an integer

        @param addr (int)
            An address to convert

        @returns (int)
        """

        pass


class _KMemARM64(KMem):
    """
    Specialization of KMem for arm64
    """

    def __init__(self, target):
        """
        @param target
            The target to read kernel state from
        """
        super().__init__(target)

        find_global = target.chkFindFirstGlobalVariable

        self.arm64_CpuDataEntries = find_global('CpuDataEntries')
        self.arm64_BootCpuData    = find_global('percpu_slot_cpu_data')
        self.arm64_t1sz           = find_global('gT1Sz').xGetValueAsInteger()

        # Bit (63 - gT1Sz): the bit that make_address() sign-extends from
        self.arm64_sign_mask      = 1 << (63 - self.arm64_t1sz)

    @property
    def has_ptrauth(self):
        """ whether this target has ptrauth (always False here) """
        return False

    def PERCPU_BASE(self, cpu):
        """
        Returns the per-cpu base for a given CPU number

        @param cpu (int)
            A CPU number

        @returns (int)
            The `cpu_data_vaddr` of this CPU's CpuDataEntries element,
            minus the load address of `percpu_slot_cpu_data`
        """
        entry     = self.arm64_CpuDataEntries.chkGetChildAtIndex(cpu)
        boot_base = self.arm64_BootCpuData.GetLoadAddress()
        cpu_vaddr = entry.xGetIntegerByName('cpu_data_vaddr')

        return cpu_vaddr - boot_base

    def make_address(self, addr):
        """
        Make an address out of an integer

        Only the bits up to and including the sign bit are kept,
        then the value is sign-extended from that bit to 64 bits.

        @param addr (int)
            An address to convert

        @returns (int)
            The converted address, as an unsigned 64-bit integer
        """
        sign_bit = self.arm64_sign_mask
        low_bits = addr & ((sign_bit << 1) - 1)
        extended = (low_bits ^ sign_bit) - sign_bit

        return extended & 0xffffffffffffffff


class _KMemARM64e(_KMemARM64):
    """
    Specialization of KMem for arm64e

    Identical to the arm64 specialization except that has_ptrauth
    is reported as True.
    """

    @property
    def has_ptrauth(self):
        """ whether this target has ptrauth (always True here) """
        return True


class _KMemX86(KMem):
    """
    Specialization of KMem for Intel
    """

    def __init__(self, target):
        """
        @param target
            The target to read kernel state from
        """
        super().__init__(target)

        cpu_data_v = target.chkFindFirstGlobalVariable('cpu_data_ptr')
        self.intel_cpu_data = cpu_data_v

    @property
    def has_ptrauth(self):
        """ whether this target has ptrauth (always False here) """
        return False

    def PERCPU_BASE(self, cpu):
        """
        Returns the per-cpu base for a given CPU number

        @param cpu (int)
            A CPU number

        @returns (int)
            The `cpu_pcpu_base` field of this CPU's `cpu_data_ptr` element
        """
        entry = self.intel_cpu_data.chkGetChildAtIndex(cpu)
        base  = entry.xGetIntegerByName('cpu_pcpu_base')
        return base

    def make_address(self, addr):
        """
        Make an address out of an integer

        @param addr (int)
            An address to convert

        @returns (int)
            The integer, returned unchanged
        """
        return addr


class PERCPUValue(object):
    """
    Provides an enumerator for a percpu value

    Indexing with a CPU number yields the value for that CPU, iterating
    yields the values for every CPU listed in KMem.pcpus, and items()
    yields (cpu, value) pairs.
    """

    def __init__(self, name, target = None):
        """
        @param name (str)
            The percpu slot name (without its 'percpu_slot_' prefix)

        @param target (SBTarget or None)
            The target to use; when None, whatever KMem.get_shared()
            returns without an explicit target is used
        """

        # Honor an explicitly provided target instead of ignoring it
        if target is None:
            self.kmem = KMem.get_shared()
        else:
            self.kmem = KMem.get_shared(target=target)
        self.sbv  = self.kmem.target.chkFindFirstGlobalVariable('percpu_slot_' + name)

    def __getitem__(self, cpu):
        """
        @param cpu (int)
            A CPU number

        @returns
            The value of the slot for this CPU

        @raises IndexError
            When the CPU is not part of KMem.pcpus
        """
        if cpu not in self.kmem.pcpus:
            raise IndexError("no percpu value for cpu {}".format(cpu))

        sbv  = self.sbv
        addr = sbv.GetLoadAddress() + self.kmem.PERCPU_BASE(cpu)
        return sbv.chkCreateValueFromAddress(sbv.GetName(), addr, sbv.GetType())

    def __iter__(self):
        """
        Iterator of the values for each CPU of the given PERCPUValue
        """
        return (value for _, value in self.items())

    def items(self):
        """
        Iterator of (cpu, SBValue) tuples for the given PERCPUValue
        """

        kmem = self.kmem
        sbv  = self.sbv
        name = sbv.GetName()
        ty   = sbv.GetType()
        addr = sbv.GetLoadAddress()

        # Each per-cpu copy lives at the slot address plus that CPU's base
        return (
            (cpu, sbv.chkCreateValueFromAddress(name, addr + kmem.PERCPU_BASE(cpu), ty))
            for cpu in kmem.pcpus
        )

# Public names of this module, derived from the classes themselves
__all__ = [cls.__name__ for cls in (KMem, MemoryRange, PERCPUValue)]