# Copyright 2025 NXP
#
# SPDX-License-Identifier: Apache-2.0

import contextlib
import logging
import re
import time

import pytest
from twister_harness import DeviceAdapter, Shell

logger = logging.getLogger(__name__)


def pytest_addoption(parser) -> None:
    """Add local parser options to pytest."""
    parser.addoption('--hci-transport', default=None, help='Configuration HCI transport for bumble')


@pytest.fixture(name='initialize', scope='session')
def fixture_initialize(request, shell: Shell, dut: DeviceAdapter):
    """Session initialization.

    Resolves the HCI transport used by bumble, initializes Bluetooth on the
    IUT through its shell, reads the IUT identity address and enables BR/EDR
    page scan and inquiry scan.

    The HCI transport is taken from the ``--hci-transport`` command line
    option; when that option is not given, the first device fixture that
    starts with ``usb_hci:`` supplies it (the text after the first ``:``).

    Returns:
        A ``(hci, bd_addr)`` tuple. ``bd_addr`` is the text matched in the
        ``bt id-show`` output: the address followed by a parenthesized field.

    Raises:
        AssertionError: if no HCI transport can be determined or no address
            can be found in the ``bt id-show`` output.
    """
    # Get HCI transport for bumble
    hci = request.config.getoption('--hci-transport')

    if hci is None:
        for fixture in dut.device_config.fixtures:
            if fixture.startswith('usb_hci:'):
                hci = fixture.split(sep=':', maxsplit=1)[1]
                break

    assert hci is not None, (
        "No HCI transport: pass --hci-transport or provide a 'usb_hci:' device fixture"
    )

    shell.exec_command("bt init")
    dut.readlines_until("Settings Loaded")

    regex = r'(?P<bd_addr>([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) *\((.*?)\))'
    bd_addr = None
    lines = shell.exec_command("bt id-show")
    # No early exit: when several lines match, the last match is the one kept.
    for line in lines:
        m = re.search(regex, line)
        if m:
            bd_addr = m.group('bd_addr')

    if bd_addr is None:
        logger.error('Fail to get IUT BD address')
        raise AssertionError('Fail to get IUT BD address')

    shell.exec_command("br pscan on")
    shell.exec_command("br iscan on")
    logger.info('initialized')
    return hci, bd_addr


@pytest.fixture
def smp_initiator_dut(initialize):
    """Yield the session-wide ``initialize`` result, logging test start and end."""
    logger.info('Start running testcase')
    yield initialize
    logger.info('Done')


def app_handle_device_output(self) -> None:
    """
    This method is dedicated to run it in separate thread to read output
    from device and put them into internal queue and save to log file.

    Writing to the log file is best effort: the file is opened with
    ``errors='replace'`` so characters the file encoding cannot represent are
    substituted instead of raising, and any remaining write failure is
    suppressed so that the reader loop keeps running. The output has already
    been queued and logged at debug level by the time the write is attempted.
    """
    with open(self.handler_log_path, 'a+', errors='replace') as log_file:
        while self.is_device_running():
            if self.is_device_connected():
                output = self._read_device_output().decode(errors='replace').rstrip("\r\n")
                if output:
                    self._device_read_queue.put(output)
                    logger.debug('%s\n', output)
                    # A failed log write must not stop the reader loop.
                    with contextlib.suppress(Exception):
                        log_file.write(f'{output}\n')
                    log_file.flush()
            else:
                # ignore output from device
                self._flush_device_output()
                time.sleep(0.1)


# After a reboot, there may be GBK characters in the console output, so replace
# DeviceAdapter._handle_device_output with a variant that tolerates them.
DeviceAdapter._handle_device_output = app_handle_device_output