This is xnu-11215.1.10. See this file in:
from collections import namedtuple
from functools import cached_property
import os
import io
from typing import Any, Generator
import core
from uuid import UUID

from core.cvalue import (
    unsigned,
    signed,
    addressof
)
from core.caching import (
    cache_dynamically,
    LazyTarget,
)
from core.io import SBProcessRawIO
from macho import MachOSegment, MemMachO, VisualMachoMap

from xnu import (
    IterateLinkedList,
    lldb_alias,
    lldb_command,
    lldb_run_command,
    lldb_type_summary,
    kern,
    Cast,
    header,
    GetLongestMatchOption,
    debuglog,
    dsymForUUID,
    addDSYM,
    loadDSYM,
    ArgumentError,
    ArgumentStringToInt,
    GetObjectAtIndexFromArray,
    ResolveFSPath,
    uuid_regex,
    GetLLDBThreadForKernelThread
)

import kmemory
import macho
import lldb
import concurrent.futures


#
# Summary of information available about a kext.
#
#   uuid     - UUID of the object
#   vmaddr   - VA of the text segment
#   name     - Name of the kext
#   address  - Kext address
#   segments - Mach-O segments (if available)
#   summary  - OSKextLoadedSummary
#   kmod     - kmod_info_t
class KextSummary:
    """ Everything known about one loaded kext.

        The constructor only stores what it is given.  The kmod_info_t is
        looked up on first access to `kmod` and remembered afterwards.
    """

    def __init__(self, uuid: str, vmaddr, name: str, address: int, segments: list[MachOSegment], summary: core.value):
        # Identity of the kext.
        self.name = name
        self.uuid = uuid

        # Where it lives: kext address and VA of its text segment.
        self.address = address
        self.vmaddr = vmaddr

        # Parsed Mach-O segments and the originating summary value.
        self.segments = segments
        self.summary = summary

    @cached_property
    def kmod(self):
        """ kmod found for this kext's address, or None.

            None is returned both when GetKmodWithAddr() finds nothing and
            when the lookup raises ValueError.
        """
        try:
            return GetKmodWithAddr(unsigned(self.address))
        except ValueError:
            return None

# Segment helpers


def text_segment(segments):
    """ Return the TEXT segment if present in the list, or the first one.

        segments: List of MachOSegment.

        '__TEXT_EXEC' is preferred over '__TEXT'.  When a name occurs more
        than once, the last occurrence is the one returned.  If neither
        name is present the first segment of the list is returned.
    """

    exec_seg = None
    plain_seg = None

    # Remember the last segment seen under each interesting name.
    for seg in segments:
        if seg.name == '__TEXT_EXEC':
            exec_seg = seg
        elif seg.name == '__TEXT':
            plain_seg = seg

    # Pick text segment based on our preferred order.
    if exec_seg is not None:
        return exec_seg
    if plain_seg is not None:
        return plain_seg

    return segments[0]


def seg_contains(segments, addr):
    """ Returns generator of all segments that contain given address.

        A segment matches when vmaddr <= addr < vmaddr + vmsize.
    """

    def covers(seg):
        # Half-open range check: the end address itself is excluded.
        return seg.vmaddr <= addr and addr < (seg.vmaddr + seg.vmsize)

    return (seg for seg in segments if covers(seg))


def sec_contains(sections, addr):
    """ Returns generator of all sections that contain given address.

        A section matches when addr <= address < addr + size, using the
        section's own `addr` and `size` attributes.
    """

    def covers(sec):
        # Half-open range check: the end address itself is excluded.
        return sec.addr <= addr and addr < (sec.addr + sec.size)

    return (sec for sec in sections if covers(sec))

def sbsec_contains(target, sbsections, addr):
    """ Returns generator of all SBSections that contain given address.

        The range of each section is taken from GetLoadAddress(target) and
        GetByteSize(); the end of the range is exclusive.
    """

    def covers(sbsec):
        if not (sbsec.GetLoadAddress(target) <= addr):
            return False
        return addr < sbsec.GetLoadAddress(target) + sbsec.GetByteSize()

    return (sbsec for sbsec in sbsections if covers(sbsec))


# Summary helpers

def LoadMachO(address, size):
    """ Parses Mach-O headers in given VA range.

        address: start of the range handed to SBProcessRawIO.
        size:    length of the range handed to SBProcessRawIO.

        return: MemMachO instance.
    """

    # Raw I/O over the process memory, wrapped in a buffered reader before
    # it is handed to the Mach-O parser.
    raw = SBProcessRawIO(LazyTarget.GetProcess(), address, size)
    return macho.MemMachO(io.BufferedRandom(raw))


def IterateKextSummaries(target) -> Generator[KextSummary, Any, None]:
    """ Generator walking over all kext summaries.

        Reads the gLoadedKextSummaries global from `target`, visits
        `numSummaries` entries starting at `.summaries[0]` and yields one
        KextSummary per entry.  The Mach-O headers of every kext are parsed
        as part of the walk.
    """

    loaded = target.chkFindFirstGlobalVariable('gLoadedKextSummaries').Dereference()
    first = loaded.GetValueForExpressionPath('.summaries[0]')
    count = loaded.xGetIntegerByName('numSummaries')

    for entry in first.xIterSiblings(0, count):
        kext = core.value(entry.AddressOf())

        # Load Mach-O segments/sections from the kext's address range.
        mobj = LoadMachO(unsigned(kext.address), unsigned(kext.size))

        # Gather the summary fields, then hand them over in one go.
        uuid = GetUUIDSummary(kext.uuid)
        text_vmaddr = text_segment(mobj.segments).vmaddr
        name = str(kext.name)
        address = unsigned(kext.address)

        yield KextSummary(
            uuid=uuid,
            vmaddr=text_vmaddr,
            name=name,
            address=address,
            segments=mobj.segments,
            summary=kext
        )


@cache_dynamically
def GetAllKextSummaries(target=None) -> list[KextSummary]:
    """ Return all kext summaries as a list. (cached)

        Callers in this file invoke it without arguments; `target` is
        presumably supplied by the cache_dynamically decorator.
    """

    summaries = []
    summaries.extend(IterateKextSummaries(target))
    return summaries


def FindKextSummary(kmod_addr):
    """ Returns the first summary whose kext address or text segment VA
        equals `kmod_addr`, or None when there is no such summary.
    """

    return next(
        (
            ksum for ksum in GetAllKextSummaries()
            if ksum.address == kmod_addr or ksum.vmaddr == kmod_addr
        ),
        None
    )


# Keep this around until DiskImages2 migrates over to the new methods above.
def GetKextLoadInformation(addr=0, show_progress=False):
    """ Original wrapper kept for backwards compatibility.

        addr:          when non-zero, a one-element list holding the result
                       of FindKextSummary(addr) is returned (the element is
                       None if nothing matches).
        show_progress: accepted but not used.

        With a zero `addr` all kext summaries are returned.
    """
    if not addr:
        return GetAllKextSummaries()

    return [FindKextSummary(addr)]


@lldb_command('showkextmacho')
def ShowKextMachO(cmd_args=[]):
    """ Show visual Mach-O layout.

        Syntax: (lldb) showkextmacho <name of a kext>
    """
    if len(cmd_args) != 1:
        raise ArgumentError("kext name is missing")

    for kext in GetAllKextSummaries():

        # Skip not matching kexts (substring match on the kext name).
        if cmd_args[0] not in kext.name:
            continue

        # KextSummary.kmod is None when no kmod has this kext's address.
        # The loaded-kext summary carries address and size as well (they
        # are what IterateKextSummaries parses the Mach-O from), so fall
        # back to it instead of failing with AttributeError.
        kmod = kext.kmod
        source = kmod if kmod is not None else kext.summary

        # Load Mach-O segments/sections.
        mobj = LoadMachO(unsigned(source.address), unsigned(source.size))

        p = VisualMachoMap(kext.name)
        p.printMachoMap(mobj)
        print(" \n")


# Placeholder shown when a UUID cannot be read.
_UNKNOWN_UUID = "........-....-....-....-............"


@lldb_type_summary(['uuid_t'])
@header("")
def GetUUIDSummary(uuid):
    """ returns a UUID string in form CA50DA4C-CA10-3246-B8DC-93542489AA26

        uuid - value holding the UUID; 16 bytes are read from process
               memory at the address of this value.

        returns: upper-case UUID string, or _UNKNOWN_UUID when the memory
                 read reports an error.
    """

    read_error = lldb.SBError()
    addr = unsigned(addressof(uuid))
    raw = LazyTarget.GetProcess().ReadMemory(addr, 16, read_error)

    if read_error.Success():
        return str(UUID(bytes=raw)).upper()

    return _UNKNOWN_UUID


@lldb_type_summary(['kmod_info_t *'])
@header((
    "{0: <20s} {1: <20s} {2: <20s} {3: >3s} {4: >5s} {5: <20s} {6: <20s} "
    "{7: >20s} {8: <30s}"
).format('kmod_info', 'address', 'size', 'id', 'refs', 'TEXT exec', 'size',
         'version', 'name'))
def GetKextSummary(kmod):
    """ returns a string representation of kext information

        kmod - kmod value to describe; a falsy value yields the string
               "kmod info missing".

        The text segment columns come from the matching kext summary when
        there is one, otherwise from a stand-in segment built from the
        kmod's own address and size.
    """
    if not kmod:
        return "kmod info missing"

    # Try to obtain text segment from kext summary
    summary = FindKextSummary(unsigned(kmod.address))
    if not summary:
        # Fake text segment for pseudo kexts.
        seg = MachOSegment('__TEXT', kmod.address, kmod.size, 0, kmod.size, [])
    else:
        seg = text_segment(summary.segments)

    return (
        "{mod: <#020x} {mod.address: <#020x} {mod.size: <#020x} "
        "{mod.id: >3d} {mod.reference_count: >5d} {seg.vmaddr: <#020x} "
        "{seg.vmsize: <#020x} {mod.version: >20s} {mod.name: <30s}"
    ).format(mod=kmod, seg=seg)


def GetKmodWithAddr(addr):
    """ Go through kmod list and find one with begin_addr as addr.

        The list is walked from kern.globals.kmod through the 'next' links
        and the first entry whose address equals `addr` wins.

        returns: None if not found else a cvalue of type kmod.
    """

    kmods = IterateLinkedList(kern.globals.kmod, 'next')
    return next((k for k in kmods if addr == unsigned(k.address)), None)


@lldb_command('showkmodaddr')
def ShowKmodAddr(cmd_args=[]):
    """ Given an address, print the offset and name for the kmod containing it
        Syntax: (lldb) showkmodaddr <addr>
    """
    if not cmd_args:
        raise ArgumentError("Insufficient arguments")

    addr = ArgumentStringToInt(cmd_args[0])
    summaries = GetAllKextSummaries()

    print(GetKextSummary.header)
    for ksum in summaries:
        # Only the first segment of a kext that covers the address counts.
        segment = next(seg_contains(ksum.segments, addr), None)
        if segment is None:
            continue

        summary = GetKextSummary(ksum.kmod)
        print(summary + " segment: {} offset = {:#0x}".format(
            segment.name, (addr - segment.vmaddr)))

    return True


def GetOSKextVersion(version_num):
    """ returns a string of format 1.2.3x from the version_num
        params: version_num - int
        return: str

        -1 maps to "invalid".  Otherwise the number is split into decimal
        fields (major, minor, revision, stage, stage level).  The revision
        is printed only when positive; the stage level is printed only for
        stages 1, 3, 5 and 6, with the suffixes d, a, b and fc.
    """
    if version_num == -1:
        return "invalid"

    # Peel the fields off from the most significant one downwards.
    vers_major, rest = divmod(version_num, 1000000000000)
    vers_minor, rest = divmod(rest, 100000000)
    vers_revision, rest = divmod(rest, 10000)
    vers_stage, vers_stage_level = divmod(rest, 1000)

    pieces = ["%d.%d" % (vers_major, vers_minor)]
    if vers_revision > 0:
        pieces.append(".%d" % vers_revision)

    # Stage number -> format used to print the stage level.
    stage_formats = {1: "d%d", 3: "a%d", 5: "b%d", 6: "fc%d"}
    if vers_stage in stage_formats:
        pieces.append(stage_formats[vers_stage] % vers_stage_level)

    return "".join(pieces)


def FindKmodNameForAddr(addr):
    """ Given an address, return the name of the kext containing that address.

        The name is taken from the kmod of the first kext summary that has
        a segment covering the address.  None is returned when no kext
        covers it.
    """

    for ksum in GetAllKextSummaries():
        if any(seg_contains(ksum.segments, unsigned(addr))):
            return ksum.kmod.name

    return None


@lldb_command('showallkmods')
def ShowAllKexts(cmd_args=None):
    """ Display a summary listing of all loaded kexts (alias: showallkmods) """

    print("{: <36s} ".format("UUID") + GetKextSummary.header)

    for kmod in IterateLinkedList(kern.globals.kmod, 'next'):
        ksum = FindKextSummary(unsigned(kmod.address))

        # kmods without a kext summary are listed with a placeholder UUID.
        if not ksum:
            row, uuid = GetKextSummary(kmod), _UNKNOWN_UUID
        else:
            row, uuid = GetKextSummary(ksum.kmod), ksum.uuid

        print(uuid + " " + row)


@lldb_command('showallknownkmods')
def ShowAllKnownKexts(cmd_args=None):
    """ Display a summary listing of all kexts known in the system.
        This is particularly useful to find if some kext was unloaded
        before this crash'ed state.
    """
    kext_ptr = kern.globals.sKextsByID
    kext_count = unsigned(kext_ptr.count)

    print("%d kexts in sKextsByID:" % kext_count)
    print("{0: <20s} {1: <20s} {2: >5s} {3: >20s} {4: <30s}".format('OSKEXT *', 'load_addr', 'id', 'version', 'name'))
    row_format = "{0: <#020x} {1: <20s} {2: >5s} {3: >20s} {4: <30s}"

    for index in range(kext_count):
        entry = GetObjectAtIndexFromArray(kext_ptr.dictionary, index)

        kext_name = str(entry.key.string)
        osk = Cast(entry.value, 'OSKext *')

        # Load address and tag are only printed for kexts flagged as loaded.
        if int(osk.flags.loaded):
            load_addr = "{0: <#020x}".format(osk.kmod_info)
            load_tag = "{0: >5d}".format(osk.loadTag)
        else:
            load_addr = "------"
            load_tag = "--"

        version = GetOSKextVersion(signed(osk.version))
        print(row_format.format(osk, load_addr, load_tag, version, kext_name))


def FetchDSYM(kinfo):
    """ Obtains and adds dSYM based on kext summary.

        kinfo: KextSummary; its uuid, vmaddr and segments are used.

        Nothing is fetched when the kext UUID equals the kernel UUID.
        Progress and failure are reported with print().
    """

    # No op for built-in modules.
    if str(kern.globals.kernel_uuid_string) == kinfo.uuid:
        print("(built-in)")
        return

    # Obtain and load binary from dSYM.
    print("Fetching dSYM for %s" % kinfo.uuid)
    info = dsymForUUID(kinfo.uuid)

    if not info or 'DBGSymbolRichExecutable' not in info:
        print("Failed to get symbol info for %s" % kinfo.uuid)
        return

    print("Adding dSYM (%s) for %s" % (kinfo.uuid, info['DBGSymbolRichExecutable']))
    addDSYM(kinfo.uuid, info)
    loadDSYM(kinfo.uuid, kinfo.vmaddr, kinfo.segments)


def AddKextSymsByFile(filename, slide):
    """ Add kext based on file name and slide.

        filename: path of the executable to add to the target.
        slide:    value for --slide, or None to look the load address up
                  by matching the module UUID against the kext summaries.

        Raises ArgumentError when no slide was given and none was found.
        Returns True otherwise.
    """
    sections = None

    filespec = lldb.SBFileSpec(filename, False)
    add_cmd = "target modules add \"{:s}\"".format(filename)
    print(add_cmd)
    print(lldb_run_command(add_cmd))

    loaded_module = LazyTarget.GetTarget().FindModule(filespec)
    if loaded_module.IsValid():
        uuid_str = loaded_module.GetUUIDString()
        debuglog("added module {:s} with uuid {:s}".format(filename, uuid_str))

        if slide is None:
            # Every summary is visited; the last UUID match wins.
            for k in GetAllKextSummaries():
                debuglog(k.uuid)
                if k.uuid.lower() != uuid_str.lower():
                    continue
                slide = k.vmaddr
                sections = k.segments
                debuglog("found the slide {:#0x} for uuid {:s}".format(k.vmaddr, k.uuid))

    if slide is None:
        raise ArgumentError("Unable to find load address for module described at {:s} ".format(filename))

    if sections:
        # Segments are known: pass an address per segment instead of a slide.
        pieces = ["target modules load --file \"{:s}\"".format(filename)]
        pieces.extend(" {:s} {:#0x} ".format(s.name, s.vmaddr) for s in sections)
        cmd_str = "".join(pieces)
    else:
        cmd_str = "target modules load --file \"{:s}\" --slide {:s}".format(filename, str(slide))
    debuglog(cmd_str)

    lldb.debugger.HandleCommand(cmd_str)

    kern.symbolicator = None
    return True


def AddKextSymsByName(kextname, all=False):
    """ Add kext based on longest name match

        kextname: name (or part of it) handed to GetLongestMatchOption.
        all:      when false, more than one matching name is reported as
                  ambiguous and nothing is loaded.

        Returns False when nothing matched or the match was ambiguous,
        True after the dSYMs of all matching kexts were requested.
    """

    known_names = [x.name for x in GetAllKextSummaries()]
    kexts = GetLongestMatchOption(kextname, known_names, True)
    if not kexts:
        print("No matching kext found.")
        return False

    if not all and len(kexts) != 1:
        print("Ambiguous match for name: {:s}".format(kextname))
        if kexts:
            print("Options are:\n\t" + "\n\t".join(kexts))
        return False

    # Load all matching dSYMs
    for ksum in GetAllKextSummaries():
        if ksum.name not in kexts:
            continue
        debuglog("matched the kext to name {:s} "
                 "and uuid {:s}".format(ksum.name, ksum.uuid))
        FetchDSYM(ksum)

    kern.symbolicator = None
    return True


def AddKextByAddress(addr: int):
    """ Given an address, load the kext which contains that address

        addr: numeric address; it is compared against segment ranges and
              used in arithmetic, so it must not be a string.

        Prints the kext summary header, then for every kext that has at
        least one segment covering `addr` prints one line per covering
        segment and fetches the dSYM of that kext once.
    """

    summaries = GetAllKextSummaries()

    # Load all kexts which contain given address.
    print(GetKextSummary.header)
    for kinfo in summaries:
        # Collect the matches up front so the segments are scanned once.
        segs = list(seg_contains(kinfo.segments, addr))
        if not segs:
            continue

        for s in segs:
            print(f"{GetKextSummary(kinfo.kmod)} segment: {s.name} offset = {(addr - s.vmaddr):0x}")

        # One fetch per kext, however many of its segments matched.
        FetchDSYM(kinfo)


def AddKextByThread(addr: str):
    """ Given a thread, load all kexts needed to symbolicate its backtrace

        addr: thread address as given on the command line; it is passed to
              kern.GetValueFromAddress() with type "thread_t".

        Only frames without a name are looked up.  Every kext with a
        segment covering such a frame's PC is printed and remembered by
        UUID; the dSYMs of the remembered kexts are then fetched on a
        thread pool.  The summary header is printed once, before the first
        matching kext, and not at all when nothing matches.
    """

    thread_value = kern.GetValueFromAddress(addr, "thread_t")
    thread_lldb_SBThread = GetLLDBThreadForKernelThread(thread_value)

    kexts_needed = {}
    printed_header = False
    for frame in thread_lldb_SBThread.frames:
        if frame.name:
            continue

        frame_addr = frame.GetPC()

        for kinfo in GetAllKextSummaries():
            # A list, not a generator: a generator is always truthy and
            # cannot be used to test whether anything matched.
            segs = list(seg_contains(kinfo.segments, frame_addr))
            if not segs:
                continue

            if not printed_header:
                print(GetKextSummary.header)
                printed_header = True

            for s in segs:
                print(f"{GetKextSummary(kinfo.kmod)} segment: {s.name} offset = {(frame_addr - s.vmaddr):0x}")

            # Keyed by UUID so each kext is fetched once.
            kexts_needed[kinfo.uuid] = kinfo

    print(f"Fetching {len(kexts_needed)} dSyms")

    # Leaving the with-block waits for all submitted fetches and shuts the
    # pool down, also when an exception interrupts the submission loop.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for kinfo in kexts_needed.values():
            pool.submit(FetchDSYM, kinfo)


def AddKextByUUID(uuid: str):
    """ Loads the dSym for a specific UUID, or all dSym

        uuid: a UUID string, or "all" to load the dSYM of every kext.
              Matching is case-insensitive.

        Raises ArgumentError when `uuid` is neither "all" nor matched by
        uuid_regex.  Kexts whose UUID equals the kernel UUID are skipped.
    """

    # Kext UUIDs are compared in lower case below, so normalize the request
    # here instead of relying on every caller to do it.
    wanted = uuid.lower()

    kernel_uuid = str(kern.globals.kernel_uuid_string).lower()
    load_all_kexts = (wanted == "all")
    if not load_all_kexts and len(uuid_regex.findall(wanted)) == 0:
        raise ArgumentError("Unknown argument {:s}".format(uuid))

    # Leaving the with-block waits for all submitted fetches and shuts the
    # pool down, also when an exception interrupts the submission loop.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for ksum in GetAllKextSummaries():
            cur_uuid = ksum.uuid.lower()
            if cur_uuid == kernel_uuid:
                continue
            if load_all_kexts or wanted == cur_uuid:
                pool.submit(FetchDSYM, ksum)

    kern.symbolicator = None


@lldb_command('addkext', 'AF:T:N:')
def AddKextSyms(cmd_args=[], cmd_options={}):
    """ Add kext symbols into lldb.
        This command finds symbols for a uuid and load the required executable
        Usage:
            addkext <uuid> : Load one kext based on uuid. eg. (lldb)addkext 4DD2344C0-4A81-3EAB-BDCF-FEAFED9EB73E
            addkext -F <abs/path/to/executable> : Load kext with executable
            addkext -F <abs/path/to/executable> <load_address> : Load kext with executable at specified load address
            addkext -N <name> : Load one kext that matches the name provided. eg. (lldb) addkext -N corecrypto
            addkext -N <name> -A: Load all kext that matches the name provided. eg. to load all kext with Apple in name do (lldb) addkext -N Apple -A
            addkext -T <thread>: Given a thread, load all kexts needed to symbolicate its backtrace
            addkext all    : Will load all the kext symbols - SLOW
    """

    # The options are checked in a fixed order: -F, then -N, then -T.  The
    # first one present decides what happens.

    # Load kext by file name.
    if "-F" in cmd_options:
        exec_path = cmd_options["-F"]
        exec_full_path = ResolveFSPath(exec_path)
        if not os.path.exists(exec_full_path):
            raise ArgumentError("Unable to resolve {:s}".format(exec_path))

        if not os.path.isfile(exec_full_path):
            raise ArgumentError(
                """Path is {:s} not a filepath.
                Please check that path points to executable.
                For ex. path/to/Symbols/IOUSBFamily.kext/Contents/PlugIns/AppleUSBHub.kext/Contents/MacOS/AppleUSBHub.
                Note: LLDB does not support adding kext based on directory paths like gdb used to.""".format(exec_path))

        # Without a positional argument the load address is looked up.
        if not cmd_args:
            return AddKextSymsByFile(exec_full_path, None)

        debuglog("loading slide value from user input {:s}".format(cmd_args[0]))
        return AddKextSymsByFile(exec_full_path, cmd_args[0])

    # Load kext by name.
    if "-N" in cmd_options:
        return AddKextSymsByName(cmd_options["-N"], "-A" in cmd_options)

    # Load all kexts needed to symbolicate a thread's backtrace
    if "-T" in cmd_options:
        return AddKextByThread(cmd_options["-T"])

    # Load kexts by UUID or "all"
    if not cmd_args:
        raise ArgumentError("No arguments specified.")

    return AddKextByUUID(cmd_args[0].lower())


@lldb_command('addkextaddr')
def AddKextAddr(cmd_args=[]):
    """ Given an address, load the kext which contains that address
        Syntax: (lldb) addkextaddr <addr>
    """
    if not cmd_args:
        raise ArgumentError("Insufficient arguments")

    # The textual argument is converted to an int before the lookup.
    AddKextByAddress(ArgumentStringToInt(cmd_args[0]))


class KextMemoryObject(kmemory.MemoryObject):
    """ Describes an object landing in some kext """

    MO_KIND = "kext mach-o"

    def __init__(self, kmem, address, kinfo):
        super().__init__(kmem, address)
        self.target = kmem.target
        self.kinfo = kinfo

    @property
    def object_range(self):
        """ Range of the section covering the address, or of the covering
            segment when no section of that segment covers it.
        """
        seg = next(seg_contains(self.kinfo.segments, self.address))
        sec = next(sec_contains(seg.sections, self.address), None)

        if not sec:
            start, size = seg.vmaddr, seg.vmsize
        else:
            start, size = sec.addr, sec.size

        return kmemory.MemoryRange(start, start + size)

    def find_mod_seg_sect(self):
        """ Return (module, segment, section) of the first module section
            that covers the address.

            The section element is None when no sub-section of the segment
            covers the address.  (None, None, None) is returned when no
            module of the target covers it.
        """
        target = self.target
        address = self.address

        for module in target.module_iter():
            for segment in sbsec_contains(target, module.section_iter(), address):
                section = next(sbsec_contains(target, segment, address), None)
                return (module, segment, section)

        return (None, None, None)

    def describe(self, verbose=False):
        """ Print kext, module, section/segment and symbol of the address.

            When no module of the target covers the address yet, the dSYM
            of the kext is fetched first and the lookup is repeated.
        """
        from lldb.utils.symbolication import Symbolicator

        addr = self.address
        kinfo = self.kinfo

        sbmod, sbseg, sbsec = self.find_mod_seg_sect()
        if sbmod is None:
            FetchDSYM(kinfo)
            print()
            sbmod, sbseg, sbsec = self.find_mod_seg_sect()

        syms = Symbolicator.InitWithSBTarget(self.target).symbolicate(addr)
        sym = None
        if syms:
            sym = next(iter(syms))

        if not sbseg:
            # not really an SBSection but we only need to pretty print 'name'
            # which both have, yay duck typing
            sbseg = next(seg_contains(kinfo.segments, addr), None)

        # Assemble the template line by line, then format it in one go.
        parts = [
            "Kext Symbol Info\n",
            " kext                 : {kinfo.name} ({kinfo.uuid})\n",
        ]
        if sbmod:
            parts.append(" module               : {sbmod.file.basename}\n")
        if sbsec:
            parts.append(" section              : {sbseg.name} {sbsec.name}\n")
        elif sbseg:
            parts.append(" segment              : {sbseg.name}\n")
        if sym:
            parts.append(" symbol               : {sym!s}\n")

        print("".join(parts).format(
            kinfo=kinfo, sbmod=sbmod, sbseg=sbseg, sbsec=sbsec, sym=sym))


class MainBinaryMemoryObject(kmemory.MemoryObject):
    """ Describes an object landing in the main kernel binary """

    MO_KIND = "kernel mach-o"

    def __init__(self, kmem, address, section):
        super().__init__(kmem, address)
        self.target = kmem.target
        self.section = section

    def _subsection(self):
        """ First sub-section of `section` covering the address, or None. """
        candidates = sbsec_contains(self.target, self.section, self.address)
        return next(candidates, None)

    @property
    def object_range(self):
        """ Load range of the covering sub-section, or of the section
            itself when no sub-section covers the address.
        """
        chosen = self._subsection() or self.section
        start = chosen.GetLoadAddress(self.target)
        return kmemory.MemoryRange(start, start + chosen.GetByteSize())

    @property
    def module(self):
        """ File name of the target's module at index 0. """
        main_module = self.target.GetModuleAtIndex(0)
        return main_module.GetFileSpec().GetFilename()

    @property
    def uuid(self):
        """ UUID string of the target's module at index 0. """
        main_module = self.target.GetModuleAtIndex(0)
        return main_module.GetUUIDString()

    def describe(self, verbose=False):
        """ Print module, uuid, section/segment and symbol of the address. """
        from lldb.utils.symbolication import Symbolicator

        subsec = self._subsection()
        syms = Symbolicator.InitWithSBTarget(self.target).symbolicate(self.address)
        sym = None
        if syms:
            sym = next(iter(syms))

        # Assemble the template line by line, then format it in one go.
        parts = [
            "Symbol Info\n",
            " module               : {mo.module}\n",
            " uuid                 : {mo.uuid}\n",
        ]
        if subsec:
            parts.append(" section              : {mo.section.name} {subsec.name}\n")
        else:
            parts.append(" segment              : {mo.section.name}\n")
        if sym:
            parts.append(" symbol               : {sym}\n")

        print("".join(parts).format(mo=self, subsec=subsec, sym=sym))


@kmemory.whatis_provider
class KextWhatisProvider(kmemory.WhatisProvider):
    """ Kext ranges whatis provider """

    COST = 100

    def claims(self, address):
        """ True when the address lies in a section of the target's module
            at index 0 or in a segment of any kext summary.
        """
        target = self.target
        mainmod = target.GetModuleAtIndex(0)

        #
        # TODO: surely the kexts can provide a better range check
        #

        if any(sbsec_contains(target, mainmod.section_iter(), address)):
            return True

        return any(
            any(seg_contains(kinfo.segments, address))
            for kinfo in GetAllKextSummaries()
        )

    def lookup(self, address):
        """ Build the memory object for the address.

            Sections of the target's module at index 0 are tried first;
            otherwise the first kext with a covering segment is used.
        """
        target = self.target
        mainmod = target.GetModuleAtIndex(0)

        section = next(sbsec_contains(target, mainmod.section_iter(), address), None)
        if section:
            return MainBinaryMemoryObject(self.kmem, address, section)

        # No default is given to next(): an address no kext covers raises
        # StopIteration here.
        owner = next(
            kinfo
            for kinfo in GetAllKextSummaries()
            if any(seg_contains(kinfo.segments, address))
        )
        return KextMemoryObject(self.kmem, address, owner)


# Aliases for backward compatibility.
#
# The second argument of each call is the name of a command defined in this
# file with @lldb_command ('showkmodaddr', 'showallkmods'); the first
# argument is presumably the additional name it becomes reachable under
# (lldb_alias is imported from xnu - confirm there).

lldb_alias('showkmod', 'showkmodaddr')
lldb_alias('showkext', 'showkmodaddr')
lldb_alias('showkextaddr', 'showkmodaddr')
lldb_alias('showallkexts', 'showallkmods')