1# Copyright (c) 2023 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import logging
6from pathlib import Path
7from typing import Generator, Type
8
9import pytest
10import time
11
12from twister_harness.device.device_adapter import DeviceAdapter
13from twister_harness.device.factory import DeviceFactory
14from twister_harness.twister_harness_config import DeviceConfig, TwisterHarnessConfig
15from twister_harness.helpers.shell import Shell
16from twister_harness.helpers.mcumgr import MCUmgr, MCUmgrBle
17from twister_harness.helpers.utils import find_in_config
18
19logger = logging.getLogger(__name__)
20
21
22@pytest.fixture(scope='session')
23def twister_harness_config(request: pytest.FixtureRequest) -> TwisterHarnessConfig:
24    """Return twister_harness_config object."""
25    twister_harness_config: TwisterHarnessConfig = request.config.twister_harness_config  # type: ignore
26    return twister_harness_config
27
28
29@pytest.fixture(scope='session')
30def device_object(twister_harness_config: TwisterHarnessConfig) -> Generator[DeviceAdapter, None, None]:
31    """Return device object - without run application."""
32    device_config: DeviceConfig = twister_harness_config.devices[0]
33    device_type = device_config.type
34    device_class: Type[DeviceAdapter] = DeviceFactory.get_device(device_type)
35    device_object = device_class(device_config)
36    try:
37        yield device_object
38    finally:  # to make sure we close all running processes execution
39        device_object.close()
40
41
42def determine_scope(fixture_name, config):
43    if dut_scope := config.getoption("--dut-scope", None):
44        return dut_scope
45    return 'function'
46
47
48@pytest.fixture(scope=determine_scope)
49def unlaunched_dut(
50    request: pytest.FixtureRequest, device_object: DeviceAdapter
51) -> Generator[DeviceAdapter, None, None]:
52    """Return device object - with logs connected, but not run"""
53    device_object.initialize_log_files(request.node.name)
54    try:
55        yield device_object
56    finally:  # to make sure we close all running processes execution
57        device_object.close()
58
59
60@pytest.fixture(scope=determine_scope)
61def dut(request: pytest.FixtureRequest, device_object: DeviceAdapter) -> Generator[DeviceAdapter, None, None]:
62    """Return launched device - with run application."""
63    device_object.initialize_log_files(request.node.name)
64    try:
65        device_object.launch()
66        yield device_object
67    finally:  # to make sure we close all running processes execution
68        device_object.close()
69
70
71@pytest.fixture(scope=determine_scope)
72def shell(dut: DeviceAdapter) -> Shell:
73    """Return ready to use shell interface"""
74    shell = Shell(dut, timeout=20.0)
75    if prompt := find_in_config(Path(dut.device_config.app_build_dir) / 'zephyr' / '.config',
76                                'CONFIG_SHELL_PROMPT_UART'):
77        shell.prompt = prompt
78    logger.info('Wait for prompt')
79    if not shell.wait_for_prompt():
80        pytest.fail('Prompt not found')
81    if dut.device_config.type == 'hardware':
82        # after booting up the device, there might appear additional logs
83        # after first prompt, so we need to wait and clear the buffer
84        time.sleep(0.5)
85        dut.clear_buffer()
86    return shell
87
88
89@pytest.fixture()
90def mcumgr(device_object: DeviceAdapter) -> Generator[MCUmgr, None, None]:
91    """Fixture to create an MCUmgr instance for serial connection."""
92    if not MCUmgr.is_available():
93        pytest.skip('mcumgr not available')
94    yield MCUmgr.create_for_serial(device_object.device_config.serial)
95
96
97@pytest.fixture()
98def mcumgr_ble(device_object: DeviceAdapter) -> Generator[MCUmgrBle, None, None]:
99    """Fixture to create an MCUmgr instance for BLE connection."""
100    if not MCUmgrBle.is_available():
101        pytest.skip('mcumgr for ble not available')
102
103    for fixture in device_object.device_config.fixtures:
104        if fixture.startswith('usb_hci:'):
105            hci_name = fixture.split(':', 1)[1]
106            break
107    else:
108        pytest.skip('usb_hci fixture not found')
109
110    try:
111        hci_index = int(hci_name.split('hci')[-1])
112    except ValueError:
113        pytest.skip(f'Invalid HCI name: {hci_name}. Expected format is "hciX".')
114
115    peer_name = find_in_config(
116        Path(device_object.device_config.app_build_dir) / 'zephyr' / '.config', 'CONFIG_BT_DEVICE_NAME'
117    ) or 'Zephyr'
118
119    yield MCUmgrBle.create_for_ble(hci_index, peer_name)
120