1# Test L2CAP COC send/recv. 2 3# Sends a sequence of varying-sized payloads from central->peripheral, and 4# verifies that the other device sees the same data, then does the same thing 5# peripheral->central. 6 7from micropython import const 8import time, machine, bluetooth, random 9 10TIMEOUT_MS = 1000 11 12_IRQ_CENTRAL_CONNECT = const(1) 13_IRQ_CENTRAL_DISCONNECT = const(2) 14_IRQ_PERIPHERAL_CONNECT = const(7) 15_IRQ_PERIPHERAL_DISCONNECT = const(8) 16_IRQ_L2CAP_ACCEPT = const(22) 17_IRQ_L2CAP_CONNECT = const(23) 18_IRQ_L2CAP_DISCONNECT = const(24) 19_IRQ_L2CAP_RECV = const(25) 20_IRQ_L2CAP_SEND_READY = const(26) 21 22_L2CAP_MTU = const(450) 23_L2CAP_PSM = const(22) 24 25_PAYLOAD_LEN = const(_L2CAP_MTU - 50) 26_PAYLOAD_LEN_STEP = -23 27_NUM_PAYLOADS = const(16) 28 29_RANDOM_SEED = 22 30 31 32waiting_events = {} 33 34 35def irq(event, data): 36 if event == _IRQ_CENTRAL_CONNECT: 37 conn_handle, addr_type, addr = data 38 print("_IRQ_CENTRAL_CONNECT") 39 waiting_events[event] = conn_handle 40 elif event == _IRQ_CENTRAL_DISCONNECT: 41 print("_IRQ_CENTRAL_DISCONNECT") 42 elif event == _IRQ_PERIPHERAL_CONNECT: 43 conn_handle, addr_type, addr = data 44 print("_IRQ_PERIPHERAL_CONNECT") 45 waiting_events[event] = conn_handle 46 elif event == _IRQ_PERIPHERAL_DISCONNECT: 47 print("_IRQ_PERIPHERAL_DISCONNECT") 48 elif event == _IRQ_L2CAP_ACCEPT: 49 conn_handle, cid, psm, our_mtu, peer_mtu = data 50 print("_IRQ_L2CAP_ACCEPT", psm, our_mtu, peer_mtu) 51 waiting_events[event] = (conn_handle, cid, psm) 52 elif event == _IRQ_L2CAP_CONNECT: 53 conn_handle, cid, psm, our_mtu, peer_mtu = data 54 print("_IRQ_L2CAP_CONNECT", psm, our_mtu, peer_mtu) 55 waiting_events[event] = (conn_handle, cid, psm, our_mtu, peer_mtu) 56 elif event == _IRQ_L2CAP_DISCONNECT: 57 conn_handle, cid, psm, status = data 58 print("_IRQ_L2CAP_DISCONNECT", psm, status) 59 elif event == _IRQ_L2CAP_RECV: 60 conn_handle, cid = data 61 elif event == _IRQ_L2CAP_SEND_READY: 62 conn_handle, cid, status = data 63 64 if event not in waiting_events: 65 waiting_events[event] = None 66 67 68def wait_for_event(event, timeout_ms): 69 t0 = time.ticks_ms() 70 while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms: 71 if event in waiting_events: 72 return waiting_events.pop(event) 73 machine.idle() 74 raise ValueError("Timeout waiting for {}".format(event)) 75 76 77def send_data(ble, conn_handle, cid): 78 buf = bytearray(_PAYLOAD_LEN) 79 mv = memoryview(buf) 80 print("l2cap_send", _NUM_PAYLOADS, _PAYLOAD_LEN) 81 for i in range(_NUM_PAYLOADS): 82 n = _PAYLOAD_LEN + i * _PAYLOAD_LEN_STEP 83 for j in range(n): 84 buf[j] = random.randint(0, 255) 85 if not ble.l2cap_send(conn_handle, cid, mv[:n]): 86 wait_for_event(_IRQ_L2CAP_SEND_READY, TIMEOUT_MS) 87 88 89def recv_data(ble, conn_handle, cid): 90 buf = bytearray(_PAYLOAD_LEN) 91 recv_bytes = 0 92 recv_correct = 0 93 expected_bytes = ( 94 _PAYLOAD_LEN * _NUM_PAYLOADS + _PAYLOAD_LEN_STEP * _NUM_PAYLOADS * (_NUM_PAYLOADS - 1) // 2 95 ) 96 print("l2cap_recvinto", expected_bytes) 97 while recv_bytes < expected_bytes: 98 wait_for_event(_IRQ_L2CAP_RECV, TIMEOUT_MS) 99 while True: 100 n = ble.l2cap_recvinto(conn_handle, cid, buf) 101 if n == 0: 102 break 103 recv_bytes += n 104 for i in range(n): 105 if buf[i] == random.randint(0, 255): 106 recv_correct += 1 107 return recv_bytes, recv_correct 108 109 110# Acting in peripheral role. 111def instance0(): 112 multitest.globals(BDADDR=ble.config("mac")) 113 print("gap_advertise") 114 ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY") 115 multitest.next() 116 try: 117 # Wait for central to connect to us. 118 conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS) 119 120 print("l2cap_listen") 121 ble.l2cap_listen(_L2CAP_PSM, _L2CAP_MTU) 122 123 conn_handle, cid, psm = wait_for_event(_IRQ_L2CAP_ACCEPT, TIMEOUT_MS) 124 conn_handle, cid, psm, our_mtu, peer_mtu = wait_for_event(_IRQ_L2CAP_CONNECT, TIMEOUT_MS) 125 126 random.seed(_RANDOM_SEED) 127 128 recv_bytes, recv_correct = recv_data(ble, conn_handle, cid) 129 send_data(ble, conn_handle, cid) 130 131 wait_for_event(_IRQ_L2CAP_DISCONNECT, TIMEOUT_MS) 132 133 print("received", recv_bytes, recv_correct) 134 135 # Wait for the central to disconnect. 136 wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS) 137 finally: 138 ble.active(0) 139 140 141# Acting in central role. 142def instance1(): 143 multitest.next() 144 try: 145 # Connect to peripheral and then disconnect. 146 print("gap_connect") 147 ble.gap_connect(*BDADDR) 148 conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS) 149 150 print("l2cap_connect") 151 ble.l2cap_connect(conn_handle, _L2CAP_PSM, _L2CAP_MTU) 152 conn_handle, cid, psm, our_mtu, peer_mtu = wait_for_event(_IRQ_L2CAP_CONNECT, TIMEOUT_MS) 153 154 random.seed(_RANDOM_SEED) 155 156 send_data(ble, conn_handle, cid) 157 recv_bytes, recv_correct = recv_data(ble, conn_handle, cid) 158 159 # Disconnect channel. 160 print("l2cap_disconnect") 161 ble.l2cap_disconnect(conn_handle, cid) 162 wait_for_event(_IRQ_L2CAP_DISCONNECT, TIMEOUT_MS) 163 164 print("received", recv_bytes, recv_correct) 165 166 # Disconnect from peripheral. 167 print("gap_disconnect:", ble.gap_disconnect(conn_handle)) 168 wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS) 169 finally: 170 ble.active(0) 171 172 173ble = bluetooth.BLE() 174ble.active(1) 175ble.irq(irq) 176