1# Copyright 2025 NXP
2#
3# SPDX-License-Identifier: Apache-2.0
4import asyncio
5import logging
6import sys
7
8from bumble import hci
9from bumble.core import DeviceClass
10from bumble.device import Device
11from bumble.hci import Address, HCI_Write_Page_Timeout_Command
12from bumble.snoop import BtSnooper
13from bumble.transport import open_transport_or_link
14from twister_harness import DeviceAdapter, Shell
15
16logger = logging.getLogger(__name__)
17
18
async def device_power_on(device, max_attempts=None, retry_delay=0.1) -> None:
    """
    Power on the tester device, retrying when power_on() raises.

    Every failed attempt is logged instead of being silently discarded, and the
    coroutine sleeps between attempts so a power_on() that fails immediately
    cannot starve the event loop.

    :param device: object exposing an awaitable power_on().
    :param max_attempts: maximum number of attempts; 'None' (default) retries
                         until power_on() succeeds.
    :param retry_delay: seconds to sleep between two attempts.
    :raises Exception: the last power_on() error, once 'max_attempts' attempts
                       have failed.
    """
    # Same logger object as the module-level one (looked up by module name).
    log = logging.getLogger(__name__)
    attempt = 0
    while True:
        attempt += 1
        try:
            await device.power_on()
            return
        except Exception as e:
            if max_attempts is not None and attempt >= max_attempts:
                log.error(f'power_on failed after {attempt} attempt(s): {e!r}')
                raise
            log.warning(f'power_on attempt {attempt} failed: {e!r}, retrying')
        # Give the event loop (and the controller) a chance before retrying.
        await asyncio.sleep(retry_delay)
26
27
28# wait for shell response
async def _wait_for_shell_response(dut, response, max_wait_sec=20):
    """
    _wait_for_shell_response() is used to wait for shell response.
    The DUT output is polled about once per second. The function returns as soon
    as a line containing 'response' has been read, or after 'max_wait_sec' polls
    when it never shows up.
    :param dut: object whose readlines() returns the pending DUT shell lines.
    :param response: shell response (substring of a line) that you want to monitor.
    :param max_wait_sec: maximum waiting time, i.e. maximum number of one-second polls
    :return: found: whether the 'response' is found; lines: DUT shell response
             read while waiting (the whole batch containing the match is included).
    :raises Exception: any error raised while polling is logged and re-raised.
    """
    # Same logger object as the module-level one (looked up by module name).
    log = logging.getLogger(__name__)
    found = False
    lines = []
    try:
        for _ in range(max_wait_sec):
            read_lines = dut.readlines()
            lines.extend(read_lines)
            if any(response in line for line in read_lines):
                found = True
                # Stop polling right away; there is nothing left to wait for.
                break
            await asyncio.sleep(1)
        log.info(f'{str(lines)}')
    except Exception as e:
        log.error(f'{e}!', exc_info=True)
        raise
    return found, lines
54
55
56# interact between script and DUT
async def send_cmd_to_iut(
    shell, dut, cmd, response=None, expect_to_find_resp=True, max_wait_sec=20
):
    """
    send_cmd_to_iut() is used to send shell cmd to DUT and monitor the response.
    It can choose whether to monitor the shell response of DUT.
    Use 'expect_to_find_resp' to set whether to expect the response to contain certain 'response'.
    'max_wait_sec' indicates the maximum waiting time.
    For 'expect_to_find_resp=False', this is useful
    because we need to wait long enough to get enough response
    to more accurately judge that the response does not contain specific characters.

    :param shell: object whose exec_command() sends the command to the DUT
    :param dut: object whose output is polled for 'response'
    :param cmd: shell cmd sent to DUT
    :param response: shell response that you want to monitor.
                     'None' means not to monitor any response; in that case
                     'expect_to_find_resp' is ignored and nothing is checked.
    :param expect_to_find_resp: set whether to expect the response to contain certain 'response'
    :param max_wait_sec: maximum monitoring time
    :return: DUT shell response as a list of lines (empty when nothing is monitored)
    :raises AssertionError: when the presence of 'response' does not match
                            'expect_to_find_resp'
    """
    shell.exec_command(cmd)
    if response is None:
        # Fire-and-forget: no output was read, so there is nothing to verify.
        return []

    found, lines = await _wait_for_shell_response(dut, response, max_wait_sec)
    expectation = 'to be found' if expect_to_find_resp else 'not to be found'
    assert found == bool(expect_to_find_resp), (
        f"'{cmd}': expected '{response}' {expectation} "
        f"within {max_wait_sec}s, DUT output: {lines}"
    )
    return lines
86
87
# set limited discoverable mode of dongle
async def set_limited_discoverable(device, discoverable=True):
    """
    Switch the dongle between the limited and the general inquiry settings.

    The current Class of Device is read, bit 13 is set (discoverable=True) or
    cleared (discoverable=False), and the value is written back. A single
    current IAC LAP is then written: hci.HCI_LIMITED_DEDICATED_INQUIRY_LAP when
    enabling, hci.HCI_GENERAL_INQUIRY_LAP otherwise. Finally the choice is
    stored in device.discoverable.

    :param device: object whose send_command() issues the HCI commands
    :param discoverable: True to select the limited settings, False for general
    """
    limited_discoverable_bit = 0x2000  # bit 13 of the Class of Device

    # Read Class of Device
    read_cod = hci.HCI_Command(op_code=0x0C23, parameters=b'')
    reply = await device.send_command(read_cod)
    cod = reply.return_parameters.class_of_device

    if not discoverable:
        cod_bytes = (cod & ~limited_discoverable_bit).to_bytes(3, byteorder='little')
        # General Inquiry Access Code(GIAC) = 0x9E8B33
        lap_bytes = hci.HCI_GENERAL_INQUIRY_LAP.to_bytes(3, byteorder='little')
    else:
        cod_bytes = (cod | limited_discoverable_bit).to_bytes(3, byteorder='little')
        # Limited Inquiry Access Code(LIAC) = 0x9E8B00
        lap_bytes = hci.HCI_LIMITED_DEDICATED_INQUIRY_LAP.to_bytes(3, byteorder='little')

    # Write Class of Device
    write_cod = hci.HCI_Command(op_code=0x0C24, parameters=cod_bytes)
    await device.send_command(write_cod)

    # Write Current IAC LAP: num_current_iac=1 followed by the iac_lap
    write_iac = hci.HCI_Command(op_code=0x0C3A, parameters=b'\x01' + lap_bytes)
    await device.send_command(write_iac)

    device.discoverable = discoverable
125
126
127# dongle listener for receiving scan results
class DiscoveryListener(Device.Listener):
    """
    Dongle listener that records the addresses reported by inquiry results.

    Addresses are kept as upper-case strings with the '/P' suffix removed, and
    lookups normalize the target the same way, so both sides always compare in
    the same format.
    """

    def __init__(self):
        super().__init__()
        # Normalized string form of every address seen so far.
        self.discovered_addresses = set()

    @staticmethod
    def _normalize(address):
        """Return 'address' as an upper-case string without the '/P' suffix."""
        return str(address).upper().replace('/P', '')

    def on_inquiry_result(self, address, class_of_device, data, rssi):
        """Record 'address'; 'data' and 'rssi' are not used."""
        # Decoded only for diagnostics; the value is whatever
        # DeviceClass.split_class_of_device() returns for this class of device.
        cod_fields = DeviceClass.split_class_of_device(class_of_device)
        logger.debug(f'Class of device fields: {cod_fields}')
        found_address = self._normalize(address)
        logger.info(f'Found addr: {found_address}')
        self.discovered_addresses.add(found_address)

    def has_found_target_addr(self, target_addr):
        """Return True when 'target_addr' has been reported by an inquiry result."""
        return self._normalize(target_addr) in self.discovered_addresses
140
141
async def tc_gap_c_1(hci_port, shell, dut, address) -> None:
    """
    GAP-C-1: General Inquiry followed by Connection and Active Disconnection.

    The tester dongle is made discoverable and connectable. The DUT must not
    find it with a limited inquiry (control group), must find it with a general
    inquiry, then connects to it and disconnects on its own initiative.
    The HCI traffic of the dongle is written to a snoop log named after this
    function.

    :param hci_port: transport spec passed to open_transport_or_link()
    :param shell: object whose exec_command() sends shell commands to the DUT
    :param dut: object whose output is monitored for the expected responses
    :param address: not used by this case; kept so all cases share one signature
    :raises AssertionError: when an expected shell response is (not) observed
    """
    case_name = 'GAP-C-1: General Inquiry followed by Connection and Active Disconnection'
    logger.info(f'<<< Start {case_name} ...')

    async with await open_transport_or_link(hci_port) as hci_transport:
        # init Dongle bluetooth
        device = Device.with_hci(
            'Bumble',
            Address('F0:F1:F2:F3:F4:F5'),
            hci_transport.source,
            hci_transport.sink,
        )
        device.classic_enabled = True
        device.le_enabled = False
        device.listener = DiscoveryListener()

        with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            await device_power_on(device)
            # String form of the dongle address without the '/P' suffix
            dongle_address = str(device.public_address).replace(r'/P', '')
            # Start of Initial Condition
            await device.set_discoverable(True)  # Set peripheral as discoverable
            await device.set_connectable(True)  # Set peripheral as connectable
            await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            # Drop any connection left over from a previous case
            shell.exec_command("bt disconnect")
            # End of Initial Condition

            # Test Start
            logger.info("Step 1: DUT initiates general inquiry")
            # Use limited inquiry as the control group
            await send_cmd_to_iut(shell, dut, "br discovery on 8 limited", dongle_address, False)
            await send_cmd_to_iut(shell, dut, "br discovery on", dongle_address)

            logger.info("Step 2: Tester responds to the inquiry")
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 3: DUT sends connect request to tester")
            await send_cmd_to_iut(shell, dut, f"br connect {dongle_address}", "Connected")

            # Steps are numbered 1-6 without gaps, like the other cases in this file.
            logger.info(
                "Step 4: Tester accepts the connection request and connected event is received"
            )
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 5: DUT initiates disconnection")
            await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected")

            logger.info("Step 6: Connection is terminated")
            logger.info("This is a passive step and it is verified in previous step.")
191
192
async def tc_gap_c_2(hci_port, shell, dut, address) -> None:
    """
    GAP-C-2: General Inquiry followed by Connection and Passive Disconnection.

    The DUT finds the discoverable tester with a general inquiry and connects
    to it; the tester then disconnects and the DUT must report "Disconnected".

    :param hci_port: transport spec passed to open_transport_or_link()
    :param shell: object whose exec_command() sends shell commands to the DUT
    :param dut: object whose output is monitored for the expected responses
    :param address: DUT address string; only the part before the first space is used
    :raises AssertionError: when an expected response or the connection is missing
    """
    logger.info(
        '<<< Start GAP-C-2: General Inquiry followed by Connection and Passive Disconnection ...'
    )
    peer_address = address.partition(" ")[0]
    snoop_path = f"bumble_hci_{sys._getframe().f_code.co_name}.log"

    async with await open_transport_or_link(hci_port) as link:
        # Tester dongle: Classic only, LE switched off
        tester = Device.with_hci('Bumble', Address('F0:F1:F2:F3:F4:F5'), link.source, link.sink)
        tester.classic_enabled = True
        tester.le_enabled = False
        tester.listener = DiscoveryListener()

        with open(snoop_path, "wb") as snoop_out:
            tester.host.snooper = BtSnooper(snoop_out)
            await device_power_on(tester)
            tester_address = str(tester.public_address).replace('/P', '')

            # --- Initial condition: tester discoverable + connectable, DUT idle ---
            await tester.set_discoverable(True)
            await tester.set_connectable(True)
            await tester.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            shell.exec_command("bt disconnect")

            # --- Test body ---
            logger.info("Step 1: DUT initiates general inquiry")
            await send_cmd_to_iut(shell, dut, "br discovery on", response=tester_address)

            logger.info("Step 2: Tester responds to the inquiry")
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 3: DUT sends connect request to tester")
            await send_cmd_to_iut(
                shell, dut, f"br connect {tester_address}", response="Connected"
            )

            logger.info(
                "Step 4: Tester accepts the connection request and connected event is received"
            )
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 5: Tester initiates disconnection")
            link_to_dut = tester.find_connection_by_bd_addr(Address(peer_address))
            assert link_to_dut is not None, "No connection found with the DUT"
            await link_to_dut.disconnect()

            logger.info("Step 6: Connection is terminated")
            disconnected = (await _wait_for_shell_response(dut, "Disconnected"))[0]
            assert disconnected, "Disconnection event not received"
245
246
async def tc_gap_c_3(hci_port, shell, dut, address) -> None:
    """
    GAP-C-3: General Inquiry followed by Rejected Connection Request.

    The tester is discoverable and connectable but configured with
    classic_accept_any = False. The DUT finds it with a general inquiry,
    requests a connection, and must report "Failed to connect".

    :param hci_port: transport spec passed to open_transport_or_link()
    :param shell: object whose exec_command() sends shell commands to the DUT
    :param dut: object whose output is monitored for the expected responses
    :param address: not used by this case; kept so all cases share one signature
    :raises AssertionError: when an expected shell response is not observed
    """
    case_name = 'GAP-C-3: General Inquiry followed by Rejected Connection Request'
    logger.info(f'<<< Start {case_name} ...')

    async with await open_transport_or_link(hci_port) as hci_transport:
        # init Dongle bluetooth
        device = Device.with_hci(
            'Bumble',
            Address('F0:F1:F2:F3:F4:F5'),
            hci_transport.source,
            hci_transport.sink,
        )
        device.classic_enabled = True
        device.le_enabled = False
        device.listener = DiscoveryListener()
        # This setting (not set_connectable) is what makes the tester turn the
        # DUT's connection request down.
        device.classic_accept_any = False

        with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            await device_power_on(device)
            dongle_address = str(device.public_address).replace(r'/P', '')
            # Start of Initial Condition
            await device.set_discoverable(True)  # Set peripheral as discoverable
            # Stay connectable so the DUT's request actually reaches the tester
            await device.set_connectable(True)
            await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            shell.exec_command("bt disconnect")
            # End of Initial Condition

            # Test Start
            logger.info("Step 1: DUT initiates general inquiry")
            await send_cmd_to_iut(shell, dut, "br discovery on", dongle_address)

            logger.info("Step 2: Tester responds to the inquiry")
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 3: DUT sends connect request to tester")
            shell.exec_command(f"br connect {dongle_address}")

            logger.info("Step 4: Tester rejects the connection request")
            logger.info("This is a passive step since tester is set to reject connections.")

            logger.info("Step 5: Wait some time for the connection attempt to fail")
            # Wait some time for the connection attempt to fail
            await asyncio.sleep(5)
            # Verify connection failure - the DUT must report "Failed to connect"
            found, _ = await _wait_for_shell_response(dut, "Failed to connect", 5)
            assert found, "'Failed to connect' was not reported for the rejected connection request"
294
295
async def tc_gap_c_4(hci_port, shell, dut, address) -> None:
    """
    GAP-C-4: Limited Inquiry followed by Connection and Active Disconnection.

    The tester is put in limited discoverable mode. The DUT must not find it
    with a general inquiry (control group), must find it with a limited
    inquiry, then connects to it and disconnects on its own initiative.

    :param hci_port: transport spec passed to open_transport_or_link()
    :param shell: object whose exec_command() sends shell commands to the DUT
    :param dut: object whose output is monitored for the expected responses
    :param address: not used by this case; kept so all cases share one signature
    :raises AssertionError: when an expected shell response is (not) observed
    """
    logger.info(
        '<<< Start GAP-C-4: Limited Inquiry followed by Connection and Active Disconnection ...'
    )
    snoop_path = f"bumble_hci_{sys._getframe().f_code.co_name}.log"

    async with await open_transport_or_link(hci_port) as link:
        # Tester dongle: Classic only, LE switched off
        tester = Device.with_hci('Bumble', Address('F0:F1:F2:F3:F4:F5'), link.source, link.sink)
        tester.classic_enabled = True
        tester.le_enabled = False
        tester.listener = DiscoveryListener()

        with open(snoop_path, "wb") as snoop_out:
            tester.host.snooper = BtSnooper(snoop_out)
            await device_power_on(tester)
            tester_address = str(tester.public_address).replace('/P', '')

            # --- Initial condition: tester limited discoverable + connectable ---
            await set_limited_discoverable(tester, True)
            await tester.set_connectable(True)
            await tester.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            shell.exec_command("bt disconnect")

            # --- Test body ---
            logger.info("Step 1: DUT initiates limited inquiry")
            # Reset discovery before starting a new one
            await send_cmd_to_iut(shell, dut, "br discovery off")
            # Control group: a general inquiry must NOT report the tester
            await send_cmd_to_iut(
                shell,
                dut,
                "br discovery on",
                response=tester_address,
                expect_to_find_resp=False,
                max_wait_sec=30,
            )
            await send_cmd_to_iut(
                shell, dut, "br discovery on 8 limited", response=tester_address
            )

            logger.info("Step 2: Tester responds to the inquiry")
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 3: DUT sends connect request to tester")
            await send_cmd_to_iut(
                shell, dut, f"br connect {tester_address}", response="Connected"
            )

            logger.info(
                "Step 4: Tester accepts the connection request and connected event is received"
            )
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 5: DUT initiates disconnection")
            await send_cmd_to_iut(shell, dut, "bt disconnect", response="Disconnected")

            logger.info("Step 6: Connection is terminated")
            logger.info("This is a passive step and it is verified in previous step.")
348
349
async def tc_gap_c_5(hci_port, shell, dut, address) -> None:
    """
    GAP-C-5: Limited Inquiry followed by Connection and Passive Disconnection.

    The DUT finds the limited-discoverable tester with a limited inquiry and
    connects to it; the tester then disconnects and the DUT must report
    "Disconnected".

    :param hci_port: transport spec passed to open_transport_or_link()
    :param shell: object whose exec_command() sends shell commands to the DUT
    :param dut: object whose output is monitored for the expected responses
    :param address: DUT address string; only the part before the first space is used
    :raises AssertionError: when an expected response or the connection is missing
    """
    logger.info(
        '<<< Start GAP-C-5: Limited Inquiry followed by Connection and Passive Disconnection ...'
    )
    peer_address = address.partition(" ")[0]
    snoop_path = f"bumble_hci_{sys._getframe().f_code.co_name}.log"

    async with await open_transport_or_link(hci_port) as link:
        # Tester dongle: Classic only, LE switched off
        tester = Device.with_hci('Bumble', Address('F0:F1:F2:F3:F4:F5'), link.source, link.sink)
        tester.classic_enabled = True
        tester.le_enabled = False
        tester.listener = DiscoveryListener()

        with open(snoop_path, "wb") as snoop_out:
            tester.host.snooper = BtSnooper(snoop_out)
            await device_power_on(tester)
            tester_address = str(tester.public_address).replace('/P', '')

            # --- Initial condition: tester limited discoverable + connectable ---
            await set_limited_discoverable(tester, True)
            await tester.set_connectable(True)
            await tester.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            shell.exec_command("bt disconnect")

            # --- Test body ---
            logger.info("Step 1: DUT initiates limited inquiry")
            # Reset discovery before starting a new one
            await send_cmd_to_iut(shell, dut, "br discovery off")
            await send_cmd_to_iut(
                shell, dut, "br discovery on 8 limited", response=tester_address
            )

            logger.info("Step 2: Tester responds to the inquiry")
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 3: DUT sends connect request to tester")
            await send_cmd_to_iut(
                shell, dut, f"br connect {tester_address}", response="Connected"
            )

            logger.info(
                "Step 4: Tester accepts the connection request and connected event is received"
            )
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 5: Tester initiates disconnection")
            link_to_dut = tester.find_connection_by_bd_addr(Address(peer_address))
            assert link_to_dut is not None, "No connection found with the DUT"
            await link_to_dut.disconnect()

            logger.info("Step 6: Connection is terminated")
            disconnected = (await _wait_for_shell_response(dut, "Disconnected"))[0]
            assert disconnected, "Disconnection event not received"
402
403
async def tc_gap_c_6(hci_port, shell, dut, address) -> None:
    """
    GAP-C-6: Limited Inquiry followed by Rejected Connection Request.

    The tester is limited discoverable and connectable but configured with
    classic_accept_any = False. The DUT finds it with a limited inquiry,
    requests a connection, and must report "Failed to connect".

    :param hci_port: transport spec passed to open_transport_or_link()
    :param shell: object whose exec_command() sends shell commands to the DUT
    :param dut: object whose output is monitored for the expected responses
    :param address: not used by this case; kept so all cases share one signature
    :raises AssertionError: when an expected shell response is not observed
    """
    case_name = 'GAP-C-6: Limited Inquiry followed by Rejected Connection Request'
    logger.info(f'<<< Start {case_name} ...')

    async with await open_transport_or_link(hci_port) as hci_transport:
        # init Dongle bluetooth
        device = Device.with_hci(
            'Bumble',
            Address('F0:F1:F2:F3:F4:F5'),
            hci_transport.source,
            hci_transport.sink,
        )
        device.classic_enabled = True
        device.le_enabled = False
        device.listener = DiscoveryListener()
        # This setting (not set_connectable) is what makes the tester turn the
        # DUT's connection request down.
        device.classic_accept_any = False

        with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            await device_power_on(device)
            dongle_address = str(device.public_address).replace(r'/P', '')
            # Start of Initial Condition
            await set_limited_discoverable(device, True)  # Set peripheral as limited discoverable
            # Stay connectable so the DUT's request actually reaches the tester
            await device.set_connectable(True)
            await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            shell.exec_command("bt disconnect")
            # End of Initial Condition

            # Test Start
            logger.info("Step 1: DUT initiates limited inquiry")
            await send_cmd_to_iut(shell, dut, "br discovery off")  # Reset discovery first
            await send_cmd_to_iut(shell, dut, "br discovery on 8 limited", dongle_address)

            logger.info("Step 2: Tester responds to the inquiry")
            logger.info("This is a passive step and it always succeed.")

            logger.info("Step 3: DUT sends connect request to tester")
            shell.exec_command(f"br connect {dongle_address}")

            logger.info("Step 4: Tester rejects the connection request")
            logger.info("This is a passive step since tester is set to reject connections.")

            logger.info("Step 5: Wait some time for the connection attempt to fail")
            await asyncio.sleep(5)
            # Verify connection failure - the DUT must report "Failed to connect"
            found, _ = await _wait_for_shell_response(dut, "Failed to connect", 5)
            assert found, "'Failed to connect' was not reported for the rejected connection request"
451
452
class TestGAPCentral:
    """Pytest entry points that run the GAP central test coroutines one by one."""

    @staticmethod
    def _run_case(test_name, case, shell, dut, device_under_test):
        """Log the test start, unpack the fixture and run 'case' to completion."""
        logger.info(f'Running {test_name} {device_under_test}')
        # The fixture yields a pair: tester HCI transport spec and IUT address.
        hci_port, iut_address = device_under_test
        asyncio.run(case(hci_port, shell, dut, iut_address))

    def test_gap_c_1(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-1: General Inquiry followed by Connection and Active Disconnection."""
        self._run_case('test_gap_c_1', tc_gap_c_1, shell, dut, device_under_test)

    def test_gap_c_2(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-2: General Inquiry with Connection and Passive Disconnection."""
        self._run_case('test_gap_c_2', tc_gap_c_2, shell, dut, device_under_test)

    def test_gap_c_3(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-3: General Inquiry with Connection Rejection."""
        self._run_case('test_gap_c_3', tc_gap_c_3, shell, dut, device_under_test)

    def test_gap_c_4(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-4: Limited Inquiry with Successful Connection and Active Disconnection."""
        self._run_case('test_gap_c_4', tc_gap_c_4, shell, dut, device_under_test)

    def test_gap_c_5(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-5: Limited Inquiry with Connection and Passive Disconnection."""
        self._run_case('test_gap_c_5', tc_gap_c_5, shell, dut, device_under_test)

    def test_gap_c_6(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-6: Limited Inquiry with Connection Rejection."""
        self._run_case('test_gap_c_6', tc_gap_c_6, shell, dut, device_under_test)
489