This is xnu-11215.1.10. See this file in:
"""
Wrappers around globals and caches to service the kmem package
"""
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from core import (
caching,
gettype,
lldbwrap,
)
from ctypes import c_int64
class MemoryRange(namedtuple('MemoryRange', ['start', 'end'])):
@property
def size(self):
start, end = self
return end - start
def contains(self, addr):
start, end = self
return start <= addr < end
def __repr__(self):
return "{0.__class__.__name__}[{0.start:#x}, {0.end:#x})".format(self)
class VMPointerUnpacker(object):
"""
Pointer unpacker for pointers packed with VM_PACK_POINTER()
"""
def __init__(self, target, param_var):
params = target.chkFindFirstGlobalVariable(param_var)
self.base_relative = params.xGetScalarByName('vmpp_base_relative')
self.bits = params.xGetScalarByName('vmpp_bits')
self.shift = params.xGetScalarByName('vmpp_shift')
self.base = params.xGetScalarByName('vmpp_base')
def unpack(self, packed):
"""
Unpacks an address according to the VM_PACK_POINTER() scheme
@param packed (int)
The packed value to unpack
@returns (int)
The unpacked address
"""
if not packed:
return None
if self.base_relative:
addr = (packed << self.shift) + self.base
else:
bits = self.bits
shift = self.shift
addr = c_int64(packed << (64 - bits)).value
addr >>= 64 - bits - shift
return addr & 0xffffffffffffffff
def unpack_value(self, sbv):
"""
Conveniency wrapper for self.unpack(sbv.chkGetValueAsUnsigned())
"""
return self.unpack(sbv.chkGetValueAsUnsigned())
class KMem(object, metaclass=ABCMeta):
"""
Singleton class that holds various important information
that is needed to make sense of the kernel memory layout,
heap data structures, globals, ...
"""
_HEAP_NAMES = [ "", "shared.", "data.", "" ]
@staticmethod
def _parse_range(zone_info_v, name):
"""
Create a tuple representing a range (min_address, max_address, size)
"""
range_v = zone_info_v.chkGetChildMemberWithName(name)
left = range_v.xGetIntegerByName('min_address')
right = range_v.xGetIntegerByName('max_address')
return MemoryRange(left, right)
def __init__(self, target):
self.target = target
#
# Cache some globals everyone needs
#
self.page_shift = target.chkFindFirstGlobalVariable('page_shift').xGetValueAsInteger()
self.page_size = 1 << self.page_shift
self.page_mask = self.page_size - 1
phase_v = target.chkFindFirstGlobalVariable('startup_phase')
self.phase = phase_v.xGetValueAsInteger()
self.phases = set(
e.GetName()[len('STARTUP_SUB_'):]
for e in phase_v.GetType().get_enum_members_array()
if e.GetValueAsUnsigned() <= self.phase
)
#
# Setup the number of CPUs we have
#
self.ncpus = target.chkFindFirstGlobalVariable('zpercpu_early_count').xGetValueAsInteger()
self.master_cpu = target.chkFindFirstGlobalVariable('master_cpu').xGetValueAsInteger()
self.zcpus = range(self.ncpus) if 'ZALLOC' in self.phases else (self.master_cpu, )
self.pcpus = range(self.ncpus) if 'PERCPU' in self.phases else (self.master_cpu, )
#
# Load all the ranges we will need
#
zone_info = target.chkFindFirstGlobalVariable('zone_info')
self.meta_range = self._parse_range(zone_info, 'zi_meta_range')
self.bits_range = self._parse_range(zone_info, 'zi_bits_range')
self.zone_range = self._parse_range(zone_info, 'zi_map_range')
try:
self.pgz_range = self._parse_range(zone_info, 'zi_pgz_range')
self.pgz_bt = target.chkFindFirstGlobalVariable('pgz_backtraces').xDereference()
except:
self.pgz_range = MemoryRange(0, 0)
self.pgz_bt = None
kmem_ranges = target.chkFindFirstGlobalVariable('kmem_ranges')
count = kmem_ranges.GetByteSize() // target.GetAddressByteSize()
addresses = target.xIterAsUInt64(kmem_ranges.GetLoadAddress(), count)
self.kmem_ranges = [
MemoryRange(next(addresses), next(addresses))
for i in range(0, count, 2)
]
kmem_ranges = target.chkFindFirstGlobalVariable('gIOKitPageableFixedRanges')
count = kmem_ranges.GetByteSize() // target.GetAddressByteSize()
addresses = target.xIterAsUInt64(kmem_ranges.GetLoadAddress(), count)
self.iokit_ranges = [
MemoryRange(next(addresses), next(addresses))
for i in range(0, count, 2)
]
#
# And other important globals
#
self.stext = target.chkFindFirstGlobalVariable('vm_kernel_stext').xGetValueAsInteger()
self.num_zones = target.chkFindFirstGlobalVariable('num_zones').xGetValueAsInteger()
self.mag_size = target.chkFindFirstGlobalVariable('_zc_mag_size').xGetValueAsInteger()
self.zone_array = target.chkFindFirstGlobalVariable('zone_array')
self.zsec_array = target.chkFindFirstGlobalVariable('zone_security_array')
self.kernel_map = target.chkFindFirstGlobalVariable('kernel_map').Dereference()
self.vm_kobject = target.chkFindFirstGlobalVariable('kernel_object_store')
#
# Cache some crucial types used for memory walks
#
self.zpm_type = gettype('struct zone_page_metadata')
self.vm_map_type = gettype('struct _vm_map')
self.vmo_type = self.vm_kobject.GetType()
#
# Recognize whether the target is any form of KASAN kernel.
#
if any(target.FindFirstGlobalVariable('kasan_enabled')):
self.kasan = True
self.kasan_tbi = any(target.FindFirstGlobalVariable('kasan_tbi_enabled'))
self.kasan_classic = not self.kasan_tbi
else:
self.kasan = False
self.kasan_tbi = False
self.kasan_classic = False
#
# VM_PACK_POINTER Unpackers
#
self.kn_kq_packing = VMPointerUnpacker(target, 'kn_kq_packing_params')
self.vm_page_packing = VMPointerUnpacker(target, 'vm_page_packing_params')
self.rwlde_caller_packing = VMPointerUnpacker(target, 'rwlde_caller_packing_params')
self.c_slot_packing = VMPointerUnpacker(target, 'c_slot_packing_params')
@staticmethod
@caching.cache_statically
def get_shared(target=None):
"""
Returns a shared instance of the class
"""
arch = target.triple[:target.triple.find('-')]
if arch.startswith('arm64e'):
return _KMemARM64e(target)
elif arch.startswith('arm64'):
return _KMemARM64(target)
elif arch.startswith('x86_64'):
return _KMemX86(target)
else:
raise RuntimeError("Unsupported architecture: {}".format(arch))
def iter_addresses(self, iterable):
"""
Conveniency wrapper to transform a list of integer to addresses
"""
return (self.make_address(a) for a in iterable)
#
# Abstract per-arch methods
#
@property
@abstractmethod
def has_ptrauth(self):
""" whether this target has ptrauth """
pass
@abstractmethod
def PERCPU_BASE(self, cpu):
"""
Returns the per-cpu base for a given CPU number
@param cpu (int)
A CPU number
@returns (int)
The percpu base for this CPU
"""
pass
@abstractmethod
def make_address(self, addr):
"""
Make an address out of an integer
@param addr (int)
An address to convert
@returns (int)
"""
pass
class _KMemARM64(KMem):
"""
Specialization of KMem for arm64
"""
def __init__(self, target):
super().__init__(target)
self.arm64_CpuDataEntries = target.chkFindFirstGlobalVariable('CpuDataEntries')
self.arm64_BootCpuData = target.chkFindFirstGlobalVariable('percpu_slot_cpu_data')
self.arm64_t1sz = target.chkFindFirstGlobalVariable('gT1Sz').xGetValueAsInteger()
self.arm64_sign_mask = 1 << (63 - self.arm64_t1sz)
@property
def has_ptrauth(self):
return False
def PERCPU_BASE(self, cpu):
cpu_data = self.arm64_CpuDataEntries.chkGetChildAtIndex(cpu)
boot_vaddr = self.arm64_BootCpuData.GetLoadAddress()
return cpu_data.xGetIntegerByName('cpu_data_vaddr') - boot_vaddr
def make_address(self, addr):
sign_mask = self.arm64_sign_mask
addr = addr & (sign_mask + sign_mask - 1)
return ((addr ^ sign_mask) - sign_mask) & 0xffffffffffffffff
class _KMemARM64e(_KMemARM64):
"""
Specialization of KMem for arm64e
"""
@property
def has_ptrauth(self):
return True
class _KMemX86(KMem):
"""
Specialization of KMem for Intel
"""
def __init__(self, target):
super().__init__(target)
self.intel_cpu_data = target.chkFindFirstGlobalVariable('cpu_data_ptr')
@property
def has_ptrauth(self):
return False
def PERCPU_BASE(self, cpu):
cpu_data = self.intel_cpu_data.chkGetChildAtIndex(cpu)
return cpu_data.xGetIntegerByName('cpu_pcpu_base')
def make_address(self, addr):
return addr
class PERCPUValue(object):
"""
Provides an enumerator for a percpu value
"""
def __init__(self, name, target = None):
"""
@param name (str)
The percpu slot name
@param target (SBTarget or None)
"""
self.kmem = KMem.get_shared()
self.sbv = self.kmem.target.chkFindFirstGlobalVariable('percpu_slot_' + name)
def __getitem__(self, cpu):
if cpu in self.kmem.pcpus:
sbv = self.sbv
addr = sbv.GetLoadAddress() + self.kmem.PERCPU_BASE(cpu)
return sbv.chkCreateValueFromAddress(sbv.GetName(), addr, sbv.GetType())
raise IndexError
def __iter__(self):
return (item[1] for items in self.items())
def items(self):
"""
Iterator of (cpu, SBValue) tuples for the given PERCPUValue
"""
kmem = self.kmem
sbv = self.sbv
name = sbv.GetName()
ty = sbv.GetType()
addr = sbv.GetLoadAddress()
return (
(cpu, sbv.chkCreateValueFromAddress(name, addr + kmem.PERCPU_BASE(cpu), ty))
for cpu in kmem.pcpus
)
__all__ = [
KMem.__name__,
MemoryRange.__name__,
PERCPUValue.__name__,
]