##
# Copyright (c) 2023 Apple Inc. All rights reserved.
#
# @APPLE_OSREFERENCE_LICENSE_HEADER_START@
#
# This file contains Original Code and/or Modifications of Original Code
# as defined in and that are subject to the Apple Public Source License
# Version 2.0 (the 'License'). You may not use this file except in
# compliance with the License. The rights granted to you under the License
# may not be used to create, or enable the creation or redistribution of,
# unlawful or unlicensed copies of an Apple operating system, or to
# circumvent, violate, or enable the circumvention or violation of, any
# terms of an Apple operating system software license agreement.
#
# Please obtain a copy of the License at
# http://www.opensource.apple.com/apsl/ and read it before using this file.
#
# The Original Code and all software distributed under the License are
# distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
# EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
# INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
# Please see the License for the specific language governing rights and
# limitations under the License.
#
# @APPLE_OSREFERENCE_LICENSE_HEADER_END@
##
# pylint: disable=invalid-name
# pylint: disable=protected-access
""" Test process.py """
import contextlib
import io
import unittest
from lldbtest.testcase import LLDBTestCase
from lldbmock.utils import lookup_type
from lldbmock.valuemock import ValueMock
import process as tst_process
import utils as tst_utils
from process import ShowTask, ShowProc, INVALID_PROC_SUMMARY, INVALID_TASK_SUMMARY
class ProcessTest(LLDBTestCase):
""" Tests for process.py module """
ROUNDED_UP_PROC_SIZE = 2048
def test_GetProcPid(self):
""" Test a pid gets returned. """
proc = ValueMock.createFromType('proc')
proc.p_pid = 12345
self.assertEqual(tst_process.GetProcPID(proc), 12345)
self.assertEqual(tst_process.GetProcPID(None), -1)
def test_GetNameShort(self):
""" Test fallback to short name. """
proc = ValueMock.createFromType('proc')
proc.p_name = ""
proc.p_comm = "short-proc"
self.assertEqual(tst_process.GetProcName(proc), "short-proc")
def test_GetNameLong(self):
""" Test that long name is preferred. """
proc = ValueMock.createFromType('proc')
proc.p_name = "long-proc"
proc.p_comm = "short-proc"
self.assertEqual(tst_process.GetProcName(proc), "long-proc")
def test_GetNameInvalid(self):
""" Test that invalid proc returns default name. """
self.assertEqual(
tst_process.GetProcName(None),
tst_process.NO_PROC_NAME
)
def test_ASTValuesInSync(self):
""" Test that thread states cover all values defined in kernel. """
# Compare all values with AST chars dictionary.
macro = tst_process._AST_CHARS.keys()
# Add rest of values from the enum in kernel.
enum = lookup_type('ast_t')
self.assertTrue(enum.IsValid())
kernel = [
k.GetValueAsUnsigned()
for k in enum.get_enum_members_array()
]
# Assert that both sides handle identical set of flags.
self.assertSetEqual(set(macro), set(kernel),
"thread state chars mismatch")
def test_GetAstSummary(self):
""" Test AST string genration. """
ast = tst_utils.GetEnumValue('ast_t', 'AST_DTRACE')
ast |= tst_utils.GetEnumValue('ast_t', 'AST_TELEMETRY_KERNEL')
# Check valid AST
self.assertEqual(tst_process.GetASTSummary(ast), 'TD')
# Check that we never touch unsupported bits in an invalid value
ast = 0xffffffff
aststr = ''.join(char for _, char in tst_process._AST_CHARS.items())
self.assertEqual(tst_process.GetASTSummary(ast), aststr)
# Mock global variable value (accessed by the macro being used)
@unittest.mock.patch('xnu.kern.globals.proc_struct_size', ROUNDED_UP_PROC_SIZE)
def test_ProcWithoutTask(self):
""" Test that ShowTask handles process-less task gracefully, and vice-versa for ShowProc """
self.reset_mocks()
PROC_ADDR = 0xffffffff90909090
TASK_ADDR = PROC_ADDR + self.ROUNDED_UP_PROC_SIZE
PROC_RO_ADDR = 0xffffff0040404040
# Create fake proc_t instance at 0xffffffff90909090
proc = self.create_mock('proc', PROC_ADDR).fromDict({
'p_pid': 12345,
'p_lflag': 0, # P_LHASTASK not set
'p_comm': b'test-proc\0'
})
# Create task which is expected to be placed + sizeof(proc)
task = self.create_mock('task', TASK_ADDR).fromDict({
'effective_policy': {
'tep_sup_active': 0,
'tep_darwinbg': 0,
'tep_lowpri_cpu': 1
},
't_flags': 0 # TF_HASPROC not set
})
# Created shared proc_ro reference from both task/proc
self.create_mock('proc_ro', PROC_RO_ADDR)
proc.p_proc_ro = PROC_RO_ADDR
task.bsd_info_ro = PROC_RO_ADDR
# Capture stdout and check expected output
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout):
ShowTask([f'{TASK_ADDR:#x}'])
ShowProc([f'{PROC_ADDR:#x}'])
self.assertIn(INVALID_PROC_SUMMARY, stdout.getvalue())
self.assertIn(INVALID_TASK_SUMMARY, stdout.getvalue())