1# Test characteristic read/write/notify from both GATTS and GATTC. 2 3from micropython import const 4import time, machine, bluetooth 5 6TIMEOUT_MS = 5000 7 8_IRQ_CENTRAL_CONNECT = const(1) 9_IRQ_CENTRAL_DISCONNECT = const(2) 10_IRQ_GATTS_WRITE = const(3) 11_IRQ_PERIPHERAL_CONNECT = const(7) 12_IRQ_PERIPHERAL_DISCONNECT = const(8) 13_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11) 14_IRQ_GATTC_CHARACTERISTIC_DONE = const(12) 15_IRQ_GATTC_READ_RESULT = const(15) 16_IRQ_GATTC_READ_DONE = const(16) 17_IRQ_GATTC_WRITE_DONE = const(17) 18_IRQ_GATTC_NOTIFY = const(18) 19_IRQ_GATTC_INDICATE = const(19) 20_IRQ_GATTS_INDICATE_DONE = const(20) 21 22SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A") 23CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444") 24CHAR = ( 25 CHAR_UUID, 26 bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE, 27) 28SERVICE = ( 29 SERVICE_UUID, 30 (CHAR,), 31) 32SERVICES = (SERVICE,) 33 34waiting_events = {} 35 36 37def irq(event, data): 38 if event == _IRQ_CENTRAL_CONNECT: 39 print("_IRQ_CENTRAL_CONNECT") 40 waiting_events[event] = data[0] 41 elif event == _IRQ_CENTRAL_DISCONNECT: 42 print("_IRQ_CENTRAL_DISCONNECT") 43 elif event == _IRQ_GATTS_WRITE: 44 print("_IRQ_GATTS_WRITE", ble.gatts_read(data[-1])) 45 elif event == _IRQ_PERIPHERAL_CONNECT: 46 print("_IRQ_PERIPHERAL_CONNECT") 47 waiting_events[event] = data[0] 48 elif event == _IRQ_PERIPHERAL_DISCONNECT: 49 print("_IRQ_PERIPHERAL_DISCONNECT") 50 elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: 51 # conn_handle, def_handle, value_handle, properties, uuid = data 52 if data[-1] == CHAR_UUID: 53 print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1]) 54 waiting_events[event] = data[2] 55 else: 56 return 57 elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: 58 print("_IRQ_GATTC_CHARACTERISTIC_DONE") 59 elif event == _IRQ_GATTC_READ_RESULT: 60 print("_IRQ_GATTC_READ_RESULT", bytes(data[-1])) 61 elif event == _IRQ_GATTC_READ_DONE: 62 print("_IRQ_GATTC_READ_DONE", data[-1]) 63 elif event == _IRQ_GATTC_WRITE_DONE: 64 print("_IRQ_GATTC_WRITE_DONE", data[-1]) 65 elif event == _IRQ_GATTC_NOTIFY: 66 print("_IRQ_GATTC_NOTIFY", bytes(data[-1])) 67 elif event == _IRQ_GATTC_INDICATE: 68 print("_IRQ_GATTC_INDICATE", bytes(data[-1])) 69 elif event == _IRQ_GATTS_INDICATE_DONE: 70 print("_IRQ_GATTS_INDICATE_DONE", data[-1]) 71 72 if event not in waiting_events: 73 waiting_events[event] = None 74 75 76def wait_for_event(event, timeout_ms): 77 t0 = time.ticks_ms() 78 while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms: 79 if event in waiting_events: 80 return waiting_events.pop(event) 81 machine.idle() 82 raise ValueError("Timeout waiting for {}".format(event)) 83 84 85# Acting in peripheral role. 86def instance0(): 87 multitest.globals(BDADDR=ble.config("mac")) 88 ((char_handle,),) = ble.gatts_register_services(SERVICES) 89 print("gap_advertise") 90 ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY") 91 multitest.next() 92 try: 93 # Write initial characteristic value. 94 ble.gatts_write(char_handle, "periph0") 95 96 # Wait for central to connect to us. 97 conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS) 98 99 # A 100 101 # Wait for a write to the characteristic from the central, 102 # then reply with a notification. 103 wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS) 104 print("gatts_write") 105 ble.gatts_write(char_handle, "periph1") 106 print("gatts_notify") 107 ble.gatts_notify(conn_handle, char_handle) 108 109 # B 110 111 # Wait for a write to the characteristic from the central, 112 # then reply with value-included notification. 113 wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS) 114 print("gatts_notify") 115 ble.gatts_notify(conn_handle, char_handle, "periph2") 116 117 # C 118 119 # Wait for a write to the characteristic from the central, 120 # then reply with an indication. 121 wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS) 122 print("gatts_write") 123 ble.gatts_write(char_handle, "periph3") 124 print("gatts_indicate") 125 ble.gatts_indicate(conn_handle, char_handle) 126 wait_for_event(_IRQ_GATTS_INDICATE_DONE, TIMEOUT_MS) 127 128 # Wait for the central to disconnect. 129 wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS) 130 finally: 131 ble.active(0) 132 133 134# Acting in central role. 135def instance1(): 136 multitest.next() 137 try: 138 # Connect to peripheral and then disconnect. 139 print("gap_connect") 140 ble.gap_connect(*BDADDR) 141 conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS) 142 143 # Discover characteristics. 144 ble.gattc_discover_characteristics(conn_handle, 1, 65535) 145 value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS) 146 wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS) 147 148 # Issue read of characteristic, should get initial value. 149 print("gattc_read") 150 ble.gattc_read(conn_handle, value_handle) 151 wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS) 152 153 # Write to the characteristic, which will trigger a notification. 154 print("gattc_write") 155 ble.gattc_write(conn_handle, value_handle, "central0", 1) 156 wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS) 157 # A 158 wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS) 159 print("gattc_read") # Read the new value set immediately before notification. 160 ble.gattc_read(conn_handle, value_handle) 161 wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS) 162 163 # Write to the characteristic, which will trigger a value-included notification. 164 print("gattc_write") 165 ble.gattc_write(conn_handle, value_handle, "central1", 1) 166 wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS) 167 # B 168 wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS) 169 print("gattc_read") # Read value should be unchanged. 170 ble.gattc_read(conn_handle, value_handle) 171 wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS) 172 173 # Write to the characteristic, which will trigger an indication. 174 print("gattc_write") 175 ble.gattc_write(conn_handle, value_handle, "central2", 1) 176 wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS) 177 # C 178 wait_for_event(_IRQ_GATTC_INDICATE, TIMEOUT_MS) 179 print("gattc_read") # Read the new value set immediately before indication. 180 ble.gattc_read(conn_handle, value_handle) 181 wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS) 182 183 # Disconnect from peripheral. 184 print("gap_disconnect:", ble.gap_disconnect(conn_handle)) 185 wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS) 186 finally: 187 ble.active(0) 188 189 190ble = bluetooth.BLE() 191ble.active(1) 192ble.irq(irq) 193