This is xnu-12377.1.9. See this file in:
#!/usr/bin/env python3
'''
How to Use:
Load in LLDB:
(lldb) command script import ./tests/unit/tools/fibers_lldb.py
Run the commands:
(lldb) fibers_all # Lists all existing fibers
(lldb) fibers_ready # Lists fibers in the run queue
(lldb) fibers_current # Gets information about the current fiber
(lldb) fibers_regs [id] # Get the registers saved in the fiber end (default current fiber)
'''
import lldb
import sys
def fiber_state_to_string(state):
"""Converts a fiber state integer to a human-readable string."""
states = []
if state & 0x1:
states.append("RUN")
if state & 0x2:
states.append("STOP")
if state & 0x4:
states.append("WAIT")
if state & 0x8:
states.append("JOIN")
if state & 0x10:
states.append("DEAD")
return "|".join(states) if states else "UNKNOWN"
def strip_pointer(target, addr):
"""Strips the PAC signature from a pointer."""
val = target.CreateValueFromAddress("__tmp_strip_pac", lldb.SBAddress(addr, target), target.FindFirstType("unsigned long long"))
val.SetPreferDynamicValue(lldb.eNoDynamicValues)
val = val.AddressOf()
return val.GetValueAsAddress()
def strip_fp_lr_sp(process, target, fp, lr, sp):
"""Strip manged registers in the jmp buf from the munge token and PAC."""
# get the munge token (see __longjmp impl)
frame = process.selected_thread.GetFrameAtIndex(0)
# ref. os/tsd.h
# define __TSD_PTR_MUNGE 7
munge_token = frame.EvaluateExpression('({void** r; __asm__("mrs %0, TPIDRRO_EL0" : "=r"(r)); r[7];})')
if munge_token.GetError().Fail():
return None
munge_token = munge_token.GetValueAsAddress()
fp = strip_pointer(target, fp ^ munge_token)
lr = strip_pointer(target, lr ^ munge_token)
sp = strip_pointer(target, sp ^ munge_token)
return (fp, lr, sp)
def get_fiber_info(debugger, fiber_value):
"""Retrieves information about a fiber from its SBValue address."""
if not fiber_value or not fiber_value.IsValid():
return None
fiber_address = fiber_value.GetValueAsAddress()
fiber_id_value = fiber_value.GetChildMemberWithName('id')
fiber_id_state = fiber_value.GetChildMemberWithName('state')
stack_bottom_value = fiber_value.GetChildMemberWithName('stack_bottom')
env_value = fiber_value.GetChildMemberWithName('env')
if not fiber_id_value.IsValid() or not fiber_id_state.IsValid() or not stack_bottom_value.IsValid() or not env_value.IsValid():
print(f"Error reading fiber memory")
return None
fiber_id = fiber_id_value.GetValueAsUnsigned()
fiber_state = fiber_id_state.GetValueAsUnsigned()
stack_bottom = stack_bottom_value.GetValueAsAddress()
env_address = env_value.AddressOf().GetValueAsAddress()
return {
"id": fiber_id,
"address": fiber_address,
"state": fiber_state,
"state_str": fiber_state_to_string(fiber_state),
"stack_bottom": stack_bottom,
"env_address": env_address
}
def print_stack_trace_from_jmp_buf(debugger, fiber_info, result, arch):
"""Prints a stack trace by manually walking the stack."""
target = debugger.GetSelectedTarget()
process = target.GetProcess()
env_address = fiber_info["env_address"]
error = lldb.SBError()
addr_size = target.GetAddressByteSize()
if arch == "x86_64":
result.AppendMessage(f" Error: Register printing is not supported on x86_64.")
return
elif arch == "arm64":
FP_OFFSET = 80
LR_OFFSET = 88
SP_OFFSET = 96
fp = process.ReadPointerFromMemory(env_address + FP_OFFSET, error)
lr = process.ReadPointerFromMemory(env_address + LR_OFFSET, error)
sp = process.ReadPointerFromMemory(env_address + SP_OFFSET, error)
if error.Fail():
result.AppendMessage(f" Error: Could not read registers from jmp_buf: {error}")
return
strip_res = strip_fp_lr_sp(process, target, fp, lr, sp)
if strip_res is None:
result.AppendMessage(f" Error: Could not strip FP LR or SP")
return
fp, lr, sp = strip_res
result.AppendMessage(f" Stack trace for fiber {fiber_info['id']} (manual backtrace):")
for i in range(10): # Limit to 10 frames for simplicity
symbol_context = target.ResolveSymbolContextForAddress(lldb.SBAddress(lr, target), lldb.eSymbolContextEverything)
symbol = symbol_context.GetSymbol()
if symbol:
symbol_name = symbol.GetName()
else:
symbol_name = "unknown"
result.AppendMessage(f" #{i}: 0x{lr:x} {symbol_name}")
next_fp = process.ReadPointerFromMemory(fp, error)
if error.Fail():
result.AppendMessage(f" Error: Could not read next FP from memory: {error}")
break
next_lr = process.ReadPointerFromMemory(fp + 8, error) # read next LR from the stack using current SP
if error.Fail():
result.AppendMessage(f" Error: Could not read next LR from memory: {error}")
break
if next_lr == 0:
result.AppendMessage(" End of stack or error reading memory.")
break
next_lr = strip_pointer(target, next_lr)
lr = next_lr
fp = next_fp
else:
result.AppendMessage(f" Error: Unsupported architecture: {arch}")
return
def list_fibers_all(debugger, command, result, internal_dict, arch):
"""Lists all existing fibers."""
list_fibers_from_queue(debugger, command, result, internal_dict, "fibers_existing_queue", "All Existing Fibers", arch)
def list_fibers_ready(debugger, command, result, internal_dict, arch):
"""Lists fibers in the run queue (now called 'ready')."""
list_fibers_from_queue(debugger, command, result, internal_dict, "fibers_run_queue", "Ready Fibers (Run Queue)", arch)
def list_fibers_from_queue(debugger, command, result, internal_dict, queue_name, title, arch):
"""Lists fibers from a specified queue."""
target = debugger.GetSelectedTarget()
queue_var = target.FindFirstGlobalVariable(queue_name)
if not queue_var.IsValid():
result.SetError(f"Could not find '{queue_name}' global variable.")
return
result.AppendMessage(f"{title}:")
result.AppendMessage("-------")
queue_top_value = queue_var.GetChildMemberWithName('top')
if not queue_top_value.IsValid():
result.SetError(f"Could not find '{queue_name}.top' field.")
return
fiber_value = queue_top_value
while fiber_value and fiber_value.IsValid():
fiber = get_fiber_info(debugger, fiber_value)
if fiber:
result.AppendMessage(f" ID: {fiber['id']}, Address: 0x{fiber['address']:x}, State: {fiber['state_str']}, Stack Bottom: 0x{fiber['stack_bottom']:x}")
try:
print_stack_trace_from_jmp_buf(debugger, fiber, result, arch) # Optional: Add stack traces
except Exception as err:
result.AppendMessage(f"Error: failed to get a stacktrace: {err}")
break
else:
result.AppendMessage(f"Warning: Could not read fiber at address 0x{fiber_value.GetValueAsUnsigned():x}")
break
if queue_name == "fibers_existing_queue":
next_fiber_value = fiber_value.GetChildMemberWithName('next_existing')
else:
next_fiber_value = fiber_value.GetChildMemberWithName('next')
if not next_fiber_value.IsValid():
break
fiber_value = next_fiber_value
def get_current_fiber_info(debugger, command, result, internal_dict, arch):
"""Gets and prints information about the current fiber."""
target = debugger.GetSelectedTarget()
fibers_current_var = target.FindFirstGlobalVariable("fibers_current")
if not fibers_current_var.IsValid():
result.SetError("Could not find 'fibers_current' global variable.")
return
current_fiber = get_fiber_info(debugger, fibers_current_var)
if not current_fiber:
result.AppendMessage("No current fiber.")
return
result.AppendMessage("Current Fiber Information:")
result.AppendMessage("--------------------------")
result.AppendMessage(f" ID: {current_fiber['id']}")
result.AppendMessage(f" Address: 0x{current_fiber['address']:x}")
result.AppendMessage(f" State: {current_fiber['state_str']}")
result.AppendMessage(f" Stack Bottom: 0x{current_fiber['stack_bottom']:x}")
try:
print_stack_trace_from_jmp_buf(debugger, current_fiber, result, arch) # Optional: Add stack traces
except Exception as err:
print(f"Error: failed to get a stacktrace: {err}")
def print_fiber_registers(debugger, command, result, internal_dict, arch, fiber_id=None):
"""Prints the registers of a specified fiber."""
target = debugger.GetSelectedTarget()
process = target.GetProcess()
if fiber_id is None:
fibers_current_var = target.FindFirstGlobalVariable("fibers_current")
if not fibers_current_var.IsValid():
result.SetError("Could not find 'fibers_current' global variable.")
return
current_fiber = get_fiber_info(debugger, fibers_current_var)
if not current_fiber:
result.AppendMessage("No current fiber.")
return
else:
# find the specified fiber in the existing queue
fiber_address = None
existing_queue_var = target.FindFirstGlobalVariable("fibers_existing_queue")
if not existing_queue_var.IsValid():
result.SetError("Could not find 'fibers_existing_queue' global variable.")
return
queue_top_value = existing_queue_var.GetChildMemberWithName('top')
if not queue_top_value.IsValid():
result.SetError(f"Could not find '{existing_queue_var.GetName()}.top' field.")
return
fiber_value = queue_top_value
while fiber_value and fiber_value.IsValid():
temp_fiber = get_fiber_info(debugger, fiber_value)
if temp_fiber and temp_fiber['id'] == int(fiber_id):
current_fiber = temp_fiber
break
next_fiber_value = fiber_value.GetChildMemberWithName('next_existing')
if not next_fiber_value.IsValid():
break
fiber_value = next_fiber_value
if not current_fiber:
result.AppendMessage(f"Fiber with ID {fiber_id} not found.")
return
env_address = current_fiber["env_address"]
error = lldb.SBError()
addr_size = target.GetAddressByteSize()
if arch == "x86_64":
result.AppendMessage(f" Error: Register printing is not supported on x86_64.")
return
elif arch == "arm64":
# reference: libplatform src/setjmp/arm64/setjmp.s __longjmp
X19_OFFSET = 0
X20_OFFSET = 8
X21_OFFSET = 16
X22_OFFSET = 24
X23_OFFSET = 32
X24_OFFSET = 40
X25_OFFSET = 48
X26_OFFSET = 56
X27_OFFSET = 64
X28_OFFSET = 72
FP_OFFSET = 80
LR_OFFSET = 88
SP_OFFSET = 96
x19 = process.ReadPointerFromMemory(env_address + X19_OFFSET, error)
x20 = process.ReadPointerFromMemory(env_address + X20_OFFSET, error)
x21 = process.ReadPointerFromMemory(env_address + X21_OFFSET, error)
x22 = process.ReadPointerFromMemory(env_address + X22_OFFSET, error)
x23 = process.ReadPointerFromMemory(env_address + X23_OFFSET, error)
x24 = process.ReadPointerFromMemory(env_address + X24_OFFSET, error)
x25 = process.ReadPointerFromMemory(env_address + X25_OFFSET, error)
x26 = process.ReadPointerFromMemory(env_address + X26_OFFSET, error)
x27 = process.ReadPointerFromMemory(env_address + X27_OFFSET, error)
x28 = process.ReadPointerFromMemory(env_address + X28_OFFSET, error)
fp = process.ReadPointerFromMemory(env_address + FP_OFFSET, error)
lr = process.ReadPointerFromMemory(env_address + LR_OFFSET, error)
sp = process.ReadPointerFromMemory(env_address + SP_OFFSET, error)
if error.Fail():
result.AppendMessage(f" Error: Could not read registers from jmp_buf: {error}")
return
strip_res = strip_fp_lr_sp(process, target, fp, lr, sp)
if strip_res is None:
result.AppendMessage(f" Error: Could not strip FP LR or SP")
return
fp, lr, sp = strip_res
result.AppendMessage(f"Fiber {current_fiber['id']} Registers (arm64):")
result.AppendMessage("-----------------------------")
result.AppendMessage(f" X19: 0x{x19:x}")
result.AppendMessage(f" X20: 0x{x20:x}")
result.AppendMessage(f" X21: 0x{x21:x}")
result.AppendMessage(f" X22: 0x{x22:x}")
result.AppendMessage(f" X23: 0x{x23:x}")
result.AppendMessage(f" X24: 0x{x24:x}")
result.AppendMessage(f" X25: 0x{x25:x}")
result.AppendMessage(f" X26: 0x{x26:x}")
result.AppendMessage(f" X27: 0x{x27:x}")
result.AppendMessage(f" X28: 0x{x28:x}")
result.AppendMessage(f" LR: 0x{lr:x}")
result.AppendMessage(f" FP: 0x{fp:x}")
result.AppendMessage(f" SP: 0x{sp:x}")
else:
result.AppendMessage(f" Error: Unsupported architecture: {arch}")
return
arch = None
def list_fibers_all_cmd(debugger, command, result, internal_dict):
list_fibers_all(debugger, command, result, internal_dict, arch)
def list_fibers_ready_cmd(debugger, command, result, internal_dict):
list_fibers_ready(debugger, command, result, internal_dict, arch)
def get_current_fiber_info_cmd(debugger, command, result, internal_dict):
get_current_fiber_info(debugger, command, result, internal_dict, arch)
def print_fiber_registers_cmd(debugger, command, result, internal_dict):
"""Prints the registers of a specified fiber."""
args = command.split()
fiber_id = None
if len(args) > 0:
try:
fiber_id = int(args[0])
except ValueError:
result.SetError("Invalid fiber ID. Please provide an integer.")
return
print_fiber_registers(debugger, command, result, internal_dict, arch, fiber_id)
def __lldb_init_module(debugger, internal_dict):
global arch
"""LLDB calls this function to load the script."""
target = debugger.GetSelectedTarget()
platform = target.GetPlatform()
if platform:
platform_name = platform.GetTriple()
if "x86_64" in platform_name:
arch = "x86_64"
elif "arm64" in platform_name or "aarch64" in platform_name:
arch = "arm64"
else:
print(f"Warning: Unsupported architecture: {platform_name}. Stack traces may not work.")
arch = "unknown"
else:
print("Warning: Could not get platform information. Stack traces may not work.")
arch = "unknown"
debugger.HandleCommand('command script add -f fibers_lldb.list_fibers_all_cmd fibers_all')
debugger.HandleCommand('command script add -f fibers_lldb.list_fibers_ready_cmd fibers_ready')
debugger.HandleCommand('command script add -f fibers_lldb.get_current_fiber_info_cmd fibers_current')
debugger.HandleCommand('command script add -f fibers_lldb.print_fiber_registers_cmd fibers_regs')
print("The 'fibers_all', 'fibers_ready', 'fibers_current', and 'fibers_regs' commands have been added.")
print(f"Detected architecture: {arch}")