1# Copyright 2025 NXP
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import contextlib
6import logging
7import re
8import time
9
10import pytest
11from twister_harness import DeviceAdapter, Shell
12
13logger = logging.getLogger(__name__)
14
15
16def pytest_addoption(parser) -> None:
17    """Add local parser options to pytest."""
18    parser.addoption('--hci-transport', default=None, help='Configuration HCI transport for bumble')
19
20
21@pytest.fixture(name='initialize', scope='session')
22def fixture_initialize(request, shell: Shell, dut: DeviceAdapter):
23    """Session initializtion"""
24    # Get HCI transport for bumble
25    hci = request.config.getoption('--hci-transport')
26
27    if hci is None:
28        for fixture in dut.device_config.fixtures:
29            if fixture.startswith('usb_hci:'):
30                hci = fixture.split(sep=':', maxsplit=1)[1]
31                break
32
33    assert hci is not None
34
35    shell.exec_command("bt init")
36    dut.readlines_until("Settings Loaded")
37    regex = r'(?P<bd_addr>([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) *\((.*?)\))'
38    bd_addr = None
39    lines = shell.exec_command("bt id-show")
40    for line in lines:
41        m = re.search(regex, line)
42        if m:
43            bd_addr = m.group('bd_addr')
44
45    if bd_addr is None:
46        logger.error('Fail to get IUT BD address')
47        raise AssertionError
48
49    shell.exec_command("br pscan on")
50    shell.exec_command("br iscan on")
51    logger.info('initialized')
52    return hci, bd_addr
53
54
55@pytest.fixture
56def smp_initiator_dut(initialize):
57    logger.info('Start running testcase')
58    yield initialize
59    logger.info('Done')
60
61
62def app_handle_device_output(self) -> None:
63    """
64    This method is dedicated to run it in separate thread to read output
65    from device and put them into internal queue and save to log file.
66    """
67    with open(self.handler_log_path, 'a+') as log_file:
68        while self.is_device_running():
69            if self.is_device_connected():
70                output = self._read_device_output().decode(errors='replace').rstrip("\r\n")
71                if output:
72                    self._device_read_queue.put(output)
73                    logger.debug(f'{output}\n')
74                    try:
75                        log_file.write(f'{output}\n')
76                    except Exception:
77                        contextlib.suppress(Exception)
78                    log_file.flush()
79            else:
80                # ignore output from device
81                self._flush_device_output()
82                time.sleep(0.1)
83
84
85# After reboot, there may be gbk character in the console, so replace _handle_device_output to
86# handle the exception.
87DeviceAdapter._handle_device_output = app_handle_device_output
88