This is xnu-12377.1.9. See this file in:
#!/usr/bin/env python3
'''
How to Use:

Load in LLDB:
(lldb) command script import ./tests/unit/tools/fibers_lldb.py

Run the commands:
(lldb) fibers_all         # Lists all existing fibers
(lldb) fibers_ready       # Lists fibers in the run queue
(lldb) fibers_current     # Gets information about the current fiber
(lldb) fibers_regs [id]   # Get the registers saved in the fiber end (default current fiber)
'''

import lldb
import sys

def fiber_state_to_string(state):
    """Converts a fiber state integer to a human-readable string."""
    states = []
    if state & 0x1:
        states.append("RUN")
    if state & 0x2:
        states.append("STOP")
    if state & 0x4:
        states.append("WAIT")
    if state & 0x8:
        states.append("JOIN")
    if state & 0x10:
        states.append("DEAD")
    return "|".join(states) if states else "UNKNOWN"

def strip_pointer(target, addr):
    """Strips the PAC signature from a pointer."""
    val = target.CreateValueFromAddress("__tmp_strip_pac", lldb.SBAddress(addr, target), target.FindFirstType("unsigned long long"))
    val.SetPreferDynamicValue(lldb.eNoDynamicValues)
    val = val.AddressOf()
    return val.GetValueAsAddress()

def strip_fp_lr_sp(process, target, fp, lr, sp):
    """Strip manged registers in the jmp buf from the munge token and PAC."""
    # get the munge token (see __longjmp impl)
    frame = process.selected_thread.GetFrameAtIndex(0)

    # ref. os/tsd.h
    # define __TSD_PTR_MUNGE 7
    munge_token = frame.EvaluateExpression('({void** r; __asm__("mrs %0, TPIDRRO_EL0" : "=r"(r)); r[7];})')
    if munge_token.GetError().Fail():
        return None
    munge_token = munge_token.GetValueAsAddress()

    fp  = strip_pointer(target, fp ^ munge_token)
    lr  = strip_pointer(target, lr ^ munge_token)
    sp  = strip_pointer(target, sp ^ munge_token)
    return (fp, lr, sp)

def get_fiber_info(debugger, fiber_value):
    """Retrieves information about a fiber from its SBValue address."""
    if not fiber_value or not fiber_value.IsValid():
        return None

    fiber_address = fiber_value.GetValueAsAddress()

    fiber_id_value = fiber_value.GetChildMemberWithName('id')
    fiber_id_state = fiber_value.GetChildMemberWithName('state')
    stack_bottom_value = fiber_value.GetChildMemberWithName('stack_bottom')
    env_value = fiber_value.GetChildMemberWithName('env')
    if not fiber_id_value.IsValid() or not fiber_id_state.IsValid() or not stack_bottom_value.IsValid() or not env_value.IsValid():
        print(f"Error reading fiber memory")
        return None

    fiber_id = fiber_id_value.GetValueAsUnsigned()
    fiber_state = fiber_id_state.GetValueAsUnsigned()
    stack_bottom = stack_bottom_value.GetValueAsAddress()
    env_address = env_value.AddressOf().GetValueAsAddress()

    return {
        "id": fiber_id,
        "address": fiber_address,
        "state": fiber_state,
        "state_str": fiber_state_to_string(fiber_state),
        "stack_bottom": stack_bottom,
        "env_address": env_address
    }

def print_stack_trace_from_jmp_buf(debugger, fiber_info, result, arch):
    """Prints a stack trace by manually walking the stack."""
    target = debugger.GetSelectedTarget()
    process = target.GetProcess()
    env_address = fiber_info["env_address"]
    error = lldb.SBError()
    addr_size = target.GetAddressByteSize()

    if arch == "x86_64":
        result.AppendMessage(f"  Error: Register printing is not supported on x86_64.")
        return

    elif arch == "arm64":
        FP_OFFSET = 80
        LR_OFFSET  = 88
        SP_OFFSET = 96

        fp  = process.ReadPointerFromMemory(env_address + FP_OFFSET, error)
        lr  = process.ReadPointerFromMemory(env_address + LR_OFFSET, error)
        sp  = process.ReadPointerFromMemory(env_address + SP_OFFSET, error)

        if error.Fail():
            result.AppendMessage(f"  Error: Could not read registers from jmp_buf: {error}")
            return
    
        strip_res = strip_fp_lr_sp(process, target, fp, lr, sp)
        if strip_res is None:
            result.AppendMessage(f"  Error: Could not strip FP LR or SP")
            return
        fp, lr, sp = strip_res

        result.AppendMessage(f"  Stack trace for fiber {fiber_info['id']} (manual backtrace):")

        for i in range(10):  # Limit to 10 frames for simplicity
            symbol_context = target.ResolveSymbolContextForAddress(lldb.SBAddress(lr, target), lldb.eSymbolContextEverything)
            symbol = symbol_context.GetSymbol()
            if symbol:
                symbol_name = symbol.GetName()
            else:
                symbol_name = "unknown"
            result.AppendMessage(f"    #{i}: 0x{lr:x} {symbol_name}")

            next_fp = process.ReadPointerFromMemory(fp, error)
            if error.Fail():
                result.AppendMessage(f"  Error: Could not read next FP from memory: {error}")
                break

            next_lr = process.ReadPointerFromMemory(fp + 8, error) # read next LR from the stack using current SP
            if error.Fail():
                result.AppendMessage(f"  Error: Could not read next LR from memory: {error}")
                break

            if next_lr == 0:
                result.AppendMessage("    End of stack or error reading memory.")
                break

            next_lr = strip_pointer(target, next_lr)
            lr = next_lr
            fp = next_fp

    else:
        result.AppendMessage(f"  Error: Unsupported architecture: {arch}")
        return


def list_fibers_all(debugger, command, result, internal_dict, arch):
    """Lists all existing fibers."""
    list_fibers_from_queue(debugger, command, result, internal_dict, "fibers_existing_queue", "All Existing Fibers", arch)

def list_fibers_ready(debugger, command, result, internal_dict, arch):
    """Lists fibers in the run queue (now called 'ready')."""
    list_fibers_from_queue(debugger, command, result, internal_dict, "fibers_run_queue", "Ready Fibers (Run Queue)", arch)

def list_fibers_from_queue(debugger, command, result, internal_dict, queue_name, title, arch):
    """Lists fibers from a specified queue."""

    target = debugger.GetSelectedTarget()

    queue_var = target.FindFirstGlobalVariable(queue_name)
    if not queue_var.IsValid():
        result.SetError(f"Could not find '{queue_name}' global variable.")
        return

    result.AppendMessage(f"{title}:")
    result.AppendMessage("-------")

    queue_top_value = queue_var.GetChildMemberWithName('top')
    if not queue_top_value.IsValid():
        result.SetError(f"Could not find '{queue_name}.top' field.")
        return

    fiber_value = queue_top_value
    while fiber_value and fiber_value.IsValid():
        fiber = get_fiber_info(debugger, fiber_value)
        if fiber:
            result.AppendMessage(f"  ID: {fiber['id']}, Address: 0x{fiber['address']:x}, State: {fiber['state_str']}, Stack Bottom: 0x{fiber['stack_bottom']:x}")
            try:
                print_stack_trace_from_jmp_buf(debugger, fiber, result, arch)  # Optional: Add stack traces
            except Exception as err:
                result.AppendMessage(f"Error: failed to get a stacktrace: {err}")
                break
        else:
            result.AppendMessage(f"Warning: Could not read fiber at address 0x{fiber_value.GetValueAsUnsigned():x}")
            break

        if queue_name == "fibers_existing_queue":
            next_fiber_value = fiber_value.GetChildMemberWithName('next_existing')
        else:
            next_fiber_value = fiber_value.GetChildMemberWithName('next')

        if not next_fiber_value.IsValid():
            break

        fiber_value = next_fiber_value

def get_current_fiber_info(debugger, command, result, internal_dict, arch):
    """Gets and prints information about the current fiber."""
    target = debugger.GetSelectedTarget()

    fibers_current_var = target.FindFirstGlobalVariable("fibers_current")
    if not fibers_current_var.IsValid():
        result.SetError("Could not find 'fibers_current' global variable.")
        return

    current_fiber = get_fiber_info(debugger, fibers_current_var)

    if not current_fiber:
        result.AppendMessage("No current fiber.")
        return

    result.AppendMessage("Current Fiber Information:")
    result.AppendMessage("--------------------------")
    result.AppendMessage(f"  ID: {current_fiber['id']}")
    result.AppendMessage(f"  Address: 0x{current_fiber['address']:x}")
    result.AppendMessage(f"  State: {current_fiber['state_str']}")
    result.AppendMessage(f"  Stack Bottom: 0x{current_fiber['stack_bottom']:x}")
    try:
        print_stack_trace_from_jmp_buf(debugger, current_fiber, result, arch)  # Optional: Add stack traces
    except Exception as err:
        print(f"Error: failed to get a stacktrace: {err}")

def print_fiber_registers(debugger, command, result, internal_dict, arch, fiber_id=None):
    """Prints the registers of a specified fiber."""
    target = debugger.GetSelectedTarget()
    process = target.GetProcess()

    if fiber_id is None:
        fibers_current_var = target.FindFirstGlobalVariable("fibers_current")
        if not fibers_current_var.IsValid():
            result.SetError("Could not find 'fibers_current' global variable.")
            return

        current_fiber = get_fiber_info(debugger, fibers_current_var)
        if not current_fiber:
            result.AppendMessage("No current fiber.")
            return

    else:
        # find the specified fiber in the existing queue
        fiber_address = None
        existing_queue_var = target.FindFirstGlobalVariable("fibers_existing_queue")
        if not existing_queue_var.IsValid():
            result.SetError("Could not find 'fibers_existing_queue' global variable.")
            return

        queue_top_value = existing_queue_var.GetChildMemberWithName('top')
        if not queue_top_value.IsValid():
            result.SetError(f"Could not find '{existing_queue_var.GetName()}.top' field.")
            return

        fiber_value = queue_top_value
        while fiber_value and fiber_value.IsValid():
            temp_fiber = get_fiber_info(debugger, fiber_value)
            if temp_fiber and temp_fiber['id'] == int(fiber_id):
                current_fiber = temp_fiber
                break

            next_fiber_value = fiber_value.GetChildMemberWithName('next_existing')
            if not next_fiber_value.IsValid():
                break

            fiber_value = next_fiber_value

        if not current_fiber:
            result.AppendMessage(f"Fiber with ID {fiber_id} not found.")
            return

    env_address = current_fiber["env_address"]
    error = lldb.SBError()
    addr_size = target.GetAddressByteSize()

    if arch == "x86_64":
        result.AppendMessage(f"  Error: Register printing is not supported on x86_64.")
        return

    elif arch == "arm64":
        # reference: libplatform src/setjmp/arm64/setjmp.s __longjmp
        X19_OFFSET = 0
        X20_OFFSET = 8
        X21_OFFSET = 16
        X22_OFFSET = 24
        X23_OFFSET = 32
        X24_OFFSET = 40
        X25_OFFSET = 48
        X26_OFFSET = 56
        X27_OFFSET = 64
        X28_OFFSET = 72

        FP_OFFSET = 80
        LR_OFFSET  = 88
        SP_OFFSET = 96
        
        x19 = process.ReadPointerFromMemory(env_address + X19_OFFSET, error)
        x20 = process.ReadPointerFromMemory(env_address + X20_OFFSET, error)
        x21 = process.ReadPointerFromMemory(env_address + X21_OFFSET, error)
        x22 = process.ReadPointerFromMemory(env_address + X22_OFFSET, error)
        x23 = process.ReadPointerFromMemory(env_address + X23_OFFSET, error)
        x24 = process.ReadPointerFromMemory(env_address + X24_OFFSET, error)
        x25 = process.ReadPointerFromMemory(env_address + X25_OFFSET, error)
        x26 = process.ReadPointerFromMemory(env_address + X26_OFFSET, error)
        x27 = process.ReadPointerFromMemory(env_address + X27_OFFSET, error)
        x28 = process.ReadPointerFromMemory(env_address + X28_OFFSET, error)

        fp  = process.ReadPointerFromMemory(env_address + FP_OFFSET, error)
        lr  = process.ReadPointerFromMemory(env_address + LR_OFFSET, error)
        sp  = process.ReadPointerFromMemory(env_address + SP_OFFSET, error)

        if error.Fail():
            result.AppendMessage(f"  Error: Could not read registers from jmp_buf: {error}")
            return
    
        strip_res = strip_fp_lr_sp(process, target, fp, lr, sp)
        if strip_res is None:
            result.AppendMessage(f"  Error: Could not strip FP LR or SP")
            return
        fp, lr, sp = strip_res

        result.AppendMessage(f"Fiber {current_fiber['id']} Registers (arm64):")
        result.AppendMessage("-----------------------------")
        result.AppendMessage(f"  X19: 0x{x19:x}")
        result.AppendMessage(f"  X20: 0x{x20:x}")
        result.AppendMessage(f"  X21: 0x{x21:x}")
        result.AppendMessage(f"  X22: 0x{x22:x}")
        result.AppendMessage(f"  X23: 0x{x23:x}")
        result.AppendMessage(f"  X24: 0x{x24:x}")
        result.AppendMessage(f"  X25: 0x{x25:x}")
        result.AppendMessage(f"  X26: 0x{x26:x}")
        result.AppendMessage(f"  X27: 0x{x27:x}")
        result.AppendMessage(f"  X28: 0x{x28:x}")
        result.AppendMessage(f"  LR:  0x{lr:x}")
        result.AppendMessage(f"  FP:  0x{fp:x}")
        result.AppendMessage(f"  SP:  0x{sp:x}")
    else:
        result.AppendMessage(f"  Error: Unsupported architecture: {arch}")
        return

arch = None

def list_fibers_all_cmd(debugger, command, result, internal_dict):
    list_fibers_all(debugger, command, result, internal_dict, arch)

def list_fibers_ready_cmd(debugger, command, result, internal_dict):
    list_fibers_ready(debugger, command, result, internal_dict, arch)

def get_current_fiber_info_cmd(debugger, command, result, internal_dict):
    get_current_fiber_info(debugger, command, result, internal_dict, arch)

def print_fiber_registers_cmd(debugger, command, result, internal_dict):
    """Prints the registers of a specified fiber."""
    args = command.split()
    fiber_id = None
    if len(args) > 0:
        try:
            fiber_id = int(args[0])
        except ValueError:
            result.SetError("Invalid fiber ID. Please provide an integer.")
            return

    print_fiber_registers(debugger, command, result, internal_dict, arch, fiber_id)

def __lldb_init_module(debugger, internal_dict):
    global arch
    """LLDB calls this function to load the script."""

    target = debugger.GetSelectedTarget()
    platform = target.GetPlatform()
    if platform:
        platform_name = platform.GetTriple()
        if "x86_64" in platform_name:
            arch = "x86_64"
        elif "arm64" in platform_name or "aarch64" in platform_name:
            arch = "arm64"
        else:
            print(f"Warning: Unsupported architecture: {platform_name}. Stack traces may not work.")
            arch = "unknown"
    else:
        print("Warning: Could not get platform information. Stack traces may not work.")
        arch = "unknown"

    debugger.HandleCommand('command script add -f fibers_lldb.list_fibers_all_cmd fibers_all')
    debugger.HandleCommand('command script add -f fibers_lldb.list_fibers_ready_cmd fibers_ready')
    debugger.HandleCommand('command script add -f fibers_lldb.get_current_fiber_info_cmd fibers_current')
    debugger.HandleCommand('command script add -f fibers_lldb.print_fiber_registers_cmd fibers_regs')
    print("The 'fibers_all', 'fibers_ready', 'fibers_current', and 'fibers_regs' commands have been added.")
    print(f"Detected architecture: {arch}")