# Copyright 2025 NXP
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
import contextlib
import logging
import sys

from bumble import hci
from bumble.core import DeviceClass
from bumble.device import Device
from bumble.hci import Address, HCI_Write_Page_Timeout_Command
from bumble.snoop import BtSnooper
from bumble.transport import open_transport_or_link
from twister_harness import DeviceAdapter, Shell

logger = logging.getLogger(__name__)

# Address given to the Bumble tester dongle in every test case.
_TESTER_ADDRESS = 'F0:F1:F2:F3:F4:F5'

# HCI opcodes used by set_limited_discoverable().
_HCI_READ_CLASS_OF_DEVICE = 0x0C23
_HCI_WRITE_CLASS_OF_DEVICE = 0x0C24
_HCI_WRITE_CURRENT_IAC_LAP = 0x0C3A

# Class of Device bit 13: Limited Discoverable Mode.
_COD_LIMITED_DISCOVERABLE_BIT = 0x2000


def _plain_address(address):
    """Return ``str(address)`` with the '/P' marker removed."""
    return str(address).replace('/P', '')


async def device_power_on(device, max_attempts=None) -> None:
    """
    Power on ``device``, retrying until ``device.power_on()`` succeeds.

    :param device: object exposing an awaitable ``power_on()``.
    :param max_attempts: optional upper bound on the number of attempts.
                         'None' (the default) retries without limit.
    :raises Exception: the last ``power_on()`` error, only when 'max_attempts' is exhausted.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            await device.power_on()
            return
        except Exception as err:
            # Report the first failure prominently; keep repeats at debug level so an
            # unlimited retry loop does not flood the log.
            report = logger.warning if attempt == 1 else logger.debug
            report(f'power_on attempt {attempt} failed: {err!r}')
            if max_attempts is not None and attempt >= max_attempts:
                raise


# wait for shell response
async def _wait_for_shell_response(dut, response, max_wait_sec=20):
    """
    _wait_for_shell_response() polls the DUT shell output for 'response'.
    It reads the DUT output once per second, 'max_wait_sec' times. The whole window is
    always consumed, even after 'response' has been seen, so the caller resumes only
    after 'max_wait_sec' seconds.
    NOTE(review): the test cases may rely on this pacing (e.g. letting an inquiry finish
    before connecting) - confirm before adding an early return.

    :param dut: object whose readlines() returns the new DUT shell lines
    :param response: shell response that you want to monitor.
    :param max_wait_sec: number of one-second polling rounds
    :return: found: whether the 'response' is found; lines: DUT shell response
    """
    found = False
    lines = []
    try:
        for _ in range(max_wait_sec):
            read_lines = dut.readlines()
            if not found:
                found = any(response in line for line in read_lines)
            lines.extend(read_lines)
            await asyncio.sleep(1)
        logger.info(f'{str(lines)}')
    except Exception as e:
        logger.exception(f'{e}!')
        raise
    return found, lines


# interact between script and DUT
async def send_cmd_to_iut(
    shell, dut, cmd, response=None, expect_to_find_resp=True, max_wait_sec=20
):
    """
    send_cmd_to_iut() is used to send shell cmd to DUT and monitor the response.
    It can choose whether to monitor the shell response of DUT.
    Use 'expect_to_find_resp' to set whether to expect the response to contain certain 'response'.
    'max_wait_sec' indicates the maximum waiting time.
    For 'expect_to_find_resp=False', this is useful
    because we need to wait long enough to get enough response
    to more accurately judge that the response does not contain specific characters.

    :param shell: object whose exec_command() sends 'cmd' to the DUT
    :param dut: object whose readlines() returns the new DUT shell lines
    :param cmd: shell cmd sent to DUT
    :param response: shell response that you want to monitor.
                     'None' means not to monitor any response.
    :param expect_to_find_resp: set whether to expect the response to contain certain 'response'
    :param max_wait_sec: maximum monitoring time
    :return: DUT shell response (an empty list when nothing is monitored)
    :raises AssertionError: when the presence of 'response' differs from 'expect_to_find_resp'
    """
    shell.exec_command(cmd)
    if response is None:
        # Nothing to monitor: treated as found, with no captured output.
        found, lines = True, []
    else:
        found, lines = await _wait_for_shell_response(dut, response, max_wait_sec)
    assert found is expect_to_find_resp, (
        f"'{cmd}': response {response!r} was {'found' if found else 'not found'} "
        f"in the DUT output, expected it to be "
        f"{'present' if expect_to_find_resp else 'absent'}"
    )
    return lines


# set limited discoverable mode of dongle
async def set_limited_discoverable(device, discoverable=True):
    """
    Switch the dongle between limited and general discoverable settings.

    Reads the current Class of Device, sets (or clears) the Limited Discoverable Mode
    bit, writes it back, and writes a single current IAC LAP: the limited one when
    'discoverable' is true, the general one otherwise.

    :param device: Bumble device used to send the HCI commands
    :param discoverable: True to enable limited discoverable mode, False to revert
    """
    response = await device.send_command(
        hci.HCI_Command(
            op_code=_HCI_READ_CLASS_OF_DEVICE,
            parameters=b'',
        )
    )
    current_cod = response.return_parameters.class_of_device

    if discoverable:
        cod_value = current_cod | _COD_LIMITED_DISCOVERABLE_BIT
        # Limited Inquiry Access Code(LIAC) = 0x9E8B00
        lap_value = hci.HCI_LIMITED_DEDICATED_INQUIRY_LAP
    else:
        cod_value = current_cod & ~_COD_LIMITED_DISCOVERABLE_BIT
        # General Inquiry Access Code(GIAC) = 0x9E8B33
        lap_value = hci.HCI_GENERAL_INQUIRY_LAP

    # Both payloads are encoded before anything is sent, so an encoding error
    # cannot leave the controller half-configured.
    new_cod = cod_value.to_bytes(3, byteorder='little')
    iac = lap_value.to_bytes(3, byteorder='little')

    await device.send_command(
        hci.HCI_Command(
            op_code=_HCI_WRITE_CLASS_OF_DEVICE,
            parameters=new_cod,
        )
    )

    await device.send_command(
        hci.HCI_Command(
            op_code=_HCI_WRITE_CURRENT_IAC_LAP,
            parameters=bytes([0x01]) + iac,  # num_current_iac=1, iac_lap
        )
    )

    device.discoverable = discoverable


# dongle listener for receiving scan results
class DiscoveryListener(Device.Listener):
    """Collects the addresses reported through on_inquiry_result()."""

    def __init__(self):
        # Addresses seen so far, stored as strings without the '/P' marker.
        self.discovered_addresses = set()

    def on_inquiry_result(self, address, class_of_device, data, rssi):
        """Record the address of an inquiry result; 'data' and 'rssi' are not used."""
        # NOTE(review): the decoded class of device is discarded; the call is kept
        # as-is - confirm whether it can be removed.
        DeviceClass.split_class_of_device(class_of_device)
        found_address = _plain_address(address)
        logger.info(f'Found addr: {found_address}')
        self.discovered_addresses.add(found_address)

    def has_found_target_addr(self, target_addr):
        """Return whether the upper-cased 'target_addr' has been recorded."""
        return str(target_addr).upper() in self.discovered_addresses


@contextlib.asynccontextmanager
async def _tester_session(hci_port, shell, snoop_tag, *, limited=False, accept_any=True):
    """
    Bring up the Bumble tester dongle and establish the common initial condition.

    Opens the HCI transport, creates a classic-only device, records the HCI traffic
    to 'bumble_hci_<snoop_tag>.log', powers the device on, makes it discoverable and
    connectable, raises the page timeout and asks the DUT to drop any connection.
    The transport and the snoop file stay open for the whole 'async with' body.

    :param hci_port: transport name passed to open_transport_or_link()
    :param shell: DUT shell used to send "bt disconnect"
    :param snoop_tag: suffix of the snoop log file name
    :param limited: use limited discoverable mode instead of general discoverable mode
    :param accept_any: when False, 'classic_accept_any' is cleared on the device
    :return: yields (device, dongle_address)
    """
    async with await open_transport_or_link(hci_port) as hci_transport:
        # init Dongle bluetooth
        device = Device.with_hci(
            'Bumble',
            Address(_TESTER_ADDRESS),
            hci_transport.source,
            hci_transport.sink,
        )
        device.classic_enabled = True
        device.le_enabled = False
        device.listener = DiscoveryListener()
        if not accept_any:
            device.classic_accept_any = False

        with open(f"bumble_hci_{snoop_tag}.log", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            await device_power_on(device)
            dongle_address = _plain_address(device.public_address)
            # Start of Initial Condition
            if limited:
                await set_limited_discoverable(device, True)
            else:
                await device.set_discoverable(True)
            await device.set_connectable(True)
            await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
            shell.exec_command("bt disconnect")
            # End of Initial Condition
            yield device, dongle_address


async def _connect_then_tester_disconnects(device, shell, dut, dongle_address, dut_address):
    """Steps 2-6 of GAP-C-2/GAP-C-5: DUT connects, then the tester disconnects."""
    logger.info("Step 2: Tester responds to the inquiry")
    logger.info("This is a passive step and it always succeed.")

    logger.info("Step 3: DUT sends connect request to tester")
    await send_cmd_to_iut(shell, dut, f"br connect {dongle_address}", "Connected")

    logger.info("Step 4: Tester accepts the connection request and connected event is received")
    logger.info("This is a passive step and it always succeed.")

    logger.info("Step 5: Tester initiates disconnection")
    connection = device.find_connection_by_bd_addr(Address(dut_address))
    assert connection is not None, "No connection found with the DUT"
    await connection.disconnect()

    logger.info("Step 6: Connection is terminated")
    found, _ = await _wait_for_shell_response(dut, "Disconnected")
    assert found, "Disconnection event not received"


async def _expect_rejected_connection(shell, dut, dongle_address):
    """Steps 2-5 of GAP-C-3/GAP-C-6: DUT connects and must report a failure."""
    logger.info("Step 2: Tester responds to the inquiry")
    logger.info("This is a passive step and it always succeed.")

    logger.info("Step 3: DUT sends connect request to tester")
    shell.exec_command(f"br connect {dongle_address}")

    logger.info("Step 4: Tester rejects the connection request")
    logger.info("This is a passive step since tester is set to reject connections.")

    logger.info("Step 5: Wait some time for the connection attempt to fail")
    await asyncio.sleep(5)
    # Verify connection failure - the DUT must report "Failed to connect"
    found, _ = await _wait_for_shell_response(dut, "Failed to connect", 5)
    assert found, "'Failed to connect' was not reported for the rejected connection request"


async def tc_gap_c_1(hci_port, shell, dut, address) -> None:
    """GAP-C-1 test flow. 'address' is unused; kept for a uniform signature."""
    case_name = 'GAP-C-1: General Inquiry followed by Connection and Active Disconnection'
    logger.info(f'<<< Start {case_name} ...')

    session = _tester_session(hci_port, shell, sys._getframe().f_code.co_name)
    async with session as (_, dongle_address):
        # Test Start
        logger.info("Step 1: DUT initiates general inquiry")
        # Use limited inquiry as the control group
        await send_cmd_to_iut(shell, dut, "br discovery on 8 limited", dongle_address, False)
        await send_cmd_to_iut(shell, dut, "br discovery on", dongle_address)

        logger.info("Step 2: Tester responds to the inquiry")
        logger.info("This is a passive step and it always succeed.")

        logger.info("Step 3: DUT sends connect request to tester")
        await send_cmd_to_iut(shell, dut, f"br connect {dongle_address}", "Connected")

        logger.info(
            "Step 5: Tester accepts the connection request and connected event is received"
        )
        logger.info("This is a passive step and it always succeed.")

        logger.info("Step 6: DUT initiates disconnection")
        await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected")

        logger.info("Step 7: Connection is terminated")
        logger.info("This is a passive step and it is verified in previous step.")


async def tc_gap_c_2(hci_port, shell, dut, address) -> None:
    """GAP-C-2 test flow. The DUT address is the first space-separated token of 'address'."""
    case_name = 'GAP-C-2: General Inquiry followed by Connection and Passive Disconnection'
    logger.info(f'<<< Start {case_name} ...')
    dut_address = address.split(" ")[0]

    session = _tester_session(hci_port, shell, sys._getframe().f_code.co_name)
    async with session as (device, dongle_address):
        # Test Start
        logger.info("Step 1: DUT initiates general inquiry")
        await send_cmd_to_iut(shell, dut, "br discovery on", dongle_address)

        await _connect_then_tester_disconnects(device, shell, dut, dongle_address, dut_address)


async def tc_gap_c_3(hci_port, shell, dut, address) -> None:
    """GAP-C-3 test flow. 'address' is unused; kept for a uniform signature."""
    case_name = 'GAP-C-3: General Inquiry followed by Rejected Connection Request'
    logger.info(f'<<< Start {case_name} ...')

    # The tester stays connectable; 'classic_accept_any' is cleared for this case.
    session = _tester_session(
        hci_port, shell, sys._getframe().f_code.co_name, accept_any=False
    )
    async with session as (_, dongle_address):
        # Test Start
        logger.info("Step 1: DUT initiates general inquiry")
        await send_cmd_to_iut(shell, dut, "br discovery on", dongle_address)

        await _expect_rejected_connection(shell, dut, dongle_address)


async def tc_gap_c_4(hci_port, shell, dut, address) -> None:
    """GAP-C-4 test flow. 'address' is unused; kept for a uniform signature."""
    case_name = 'GAP-C-4: Limited Inquiry followed by Connection and Active Disconnection'
    logger.info(f'<<< Start {case_name} ...')

    session = _tester_session(hci_port, shell, sys._getframe().f_code.co_name, limited=True)
    async with session as (_, dongle_address):
        # Test Start
        logger.info("Step 1: DUT initiates limited inquiry")
        await send_cmd_to_iut(shell, dut, "br discovery off")  # Reset discovery first
        # Use general inquiry as the control group
        await send_cmd_to_iut(
            shell, dut, "br discovery on", dongle_address, False, max_wait_sec=30
        )
        await send_cmd_to_iut(shell, dut, "br discovery on 8 limited", dongle_address)

        logger.info("Step 2: Tester responds to the inquiry")
        logger.info("This is a passive step and it always succeed.")

        logger.info("Step 3: DUT sends connect request to tester")
        await send_cmd_to_iut(shell, dut, f"br connect {dongle_address}", "Connected")

        logger.info(
            "Step 4: Tester accepts the connection request and connected event is received"
        )
        logger.info("This is a passive step and it always succeed.")

        logger.info("Step 5: DUT initiates disconnection")
        await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected")

        logger.info("Step 6: Connection is terminated")
        logger.info("This is a passive step and it is verified in previous step.")


async def tc_gap_c_5(hci_port, shell, dut, address) -> None:
    """GAP-C-5 test flow. The DUT address is the first space-separated token of 'address'."""
    case_name = 'GAP-C-5: Limited Inquiry followed by Connection and Passive Disconnection'
    logger.info(f'<<< Start {case_name} ...')
    dut_address = address.split(" ")[0]

    session = _tester_session(hci_port, shell, sys._getframe().f_code.co_name, limited=True)
    async with session as (device, dongle_address):
        # Test Start
        logger.info("Step 1: DUT initiates limited inquiry")
        await send_cmd_to_iut(shell, dut, "br discovery off")  # Reset discovery first
        await send_cmd_to_iut(shell, dut, "br discovery on 8 limited", dongle_address)

        await _connect_then_tester_disconnects(device, shell, dut, dongle_address, dut_address)


async def tc_gap_c_6(hci_port, shell, dut, address) -> None:
    """GAP-C-6 test flow. 'address' is unused; kept for a uniform signature."""
    case_name = 'GAP-C-6: Limited Inquiry followed by Rejected Connection Request'
    logger.info(f'<<< Start {case_name} ...')

    # The tester stays connectable; 'classic_accept_any' is cleared for this case.
    session = _tester_session(
        hci_port,
        shell,
        sys._getframe().f_code.co_name,
        limited=True,
        accept_any=False,
    )
    async with session as (_, dongle_address):
        # Test Start
        logger.info("Step 1: DUT initiates limited inquiry")
        await send_cmd_to_iut(shell, dut, "br discovery off")  # Reset discovery first
        await send_cmd_to_iut(shell, dut, "br discovery on 8 limited", dongle_address)

        await _expect_rejected_connection(shell, dut, dongle_address)


def _run_case(case, shell, dut, device_under_test):
    """Unpack the (hci port, IUT address) fixture value and run 'case' to completion."""
    hci_port, iut_address = device_under_test
    asyncio.run(case(hci_port, shell, dut, iut_address))


class TestGAPCentral:
    """pytest entry points; each test runs one GAP central test flow."""

    def test_gap_c_1(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-1: General Inquiry followed by Connection and Active Disconnection."""
        logger.info(f'Running test_gap_c_1 {device_under_test}')
        _run_case(tc_gap_c_1, shell, dut, device_under_test)

    def test_gap_c_2(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-2: General Inquiry with Connection and Passive Disconnection."""
        logger.info(f'Running test_gap_c_2 {device_under_test}')
        _run_case(tc_gap_c_2, shell, dut, device_under_test)

    def test_gap_c_3(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-3: General Inquiry with Connection Rejection."""
        logger.info(f'Running test_gap_c_3 {device_under_test}')
        _run_case(tc_gap_c_3, shell, dut, device_under_test)

    def test_gap_c_4(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-4: Limited Inquiry with Successful Connection and Active Disconnection."""
        logger.info(f'Running test_gap_c_4 {device_under_test}')
        _run_case(tc_gap_c_4, shell, dut, device_under_test)

    def test_gap_c_5(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-5: Limited Inquiry with Connection and Passive Disconnection."""
        logger.info(f'Running test_gap_c_5 {device_under_test}')
        _run_case(tc_gap_c_5, shell, dut, device_under_test)

    def test_gap_c_6(self, shell: Shell, dut: DeviceAdapter, device_under_test):
        """Test GAP-C-6: Limited Inquiry with Connection Rejection."""
        logger.info(f'Running test_gap_c_6 {device_under_test}')
        _run_case(tc_gap_c_6, shell, dut, device_under_test)