1# Test MTU exchange (initiated by both central and peripheral) and the effect on
2# notify and write size.
3
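# Seven connections are made, always from the central to the peripheral. In the
# first four the central initiates the MTU exchange; in the last three the
# peripheral initiates it.
#
# (C) = central, (P) = peripheral. "Requested" is the MTU configured on the side
# that initiates the exchange; "Preferred" is the MTU configured on the other side.
#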
6# Test | Requested | Preferred | Result | Notes
7#   0  |  300 (C)  |  256 (P)  |  256   |
8#   1  |  300 (C)  |  200 (P)  |  200   |
9#   2  |  300 (C)  |  400 (P)  |  300   |
10#   3  |  300 (C)  |  50  (P)  |  50    | Shorter than 64 so the notification is truncated.
11#   4  |  290 (P)  |  256 (C)  |  256   |
12#   5  |  290 (P)  |  190 (C)  |  190   |
13#   6  |  290 (P)  |  350 (C)  |  290   |
14#
15# For each connection a notification is sent by the server (peripheral) and a characteristic
16# is written by the client (central) to ensure that the expected size is transmitted.
17#
18# Note: This currently fails on btstack for two reasons:
19# - btstack doesn't truncate writes to the MTU (it fails instead)
20# - btstack (in central mode) doesn't handle the peripheral initiating the MTU exchange
21
22from micropython import const
23import time, machine, bluetooth
24
# Default time (in milliseconds) to wait for each expected BLE event.
TIMEOUT_MS = 5000

# Event codes that the irq handler below compares against. Each one is kept as a
# direct ``const()`` assignment.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_MTU_EXCHANGED = const(21)

# The test service: a single characteristic that can be read, written and notified.
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY)
SERVICE = (SERVICE_UUID, (CHAR,))
SERVICES = (SERVICE,)

# Event code -> value recorded by irq() (or None when the event carries nothing
# the test needs). Entries are removed again by wait_for_event().
waiting_events = {}
51
52
def irq(event, data):
    """Log a BLE event and record it in ``waiting_events``.

    Recorded values, by event:
      * connect events (central or peripheral): ``data[0]``, which the test
        instances use as the connection handle;
      * characteristic result matching ``CHAR_UUID``: ``data[2]``, which the
        central uses as the value handle for its write;
      * MTU exchanged: ``data[-1]``, the MTU that is also printed.

    Every other event is recorded as ``None`` unless an entry for it already
    exists. A characteristic result for any other UUID is ignored entirely:
    nothing is printed and nothing is recorded.
    """
    if event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
        uuid = data[-1]
        if not (uuid == CHAR_UUID):
            # Not the characteristic under test.
            return
        print("_IRQ_GATTC_CHARACTERISTIC_RESULT", uuid)
        waiting_events[event] = data[2]
    elif event == _IRQ_MTU_EXCHANGED:
        negotiated = data[-1]
        print("_IRQ_MTU_EXCHANGED", negotiated)
        waiting_events[event] = negotiated
    elif event == _IRQ_GATTC_NOTIFY:
        payload = data[-1]
        # Print only the length and first character of the payload.
        print("_IRQ_GATTC_NOTIFY", len(payload), chr(payload[0]))
    elif event == _IRQ_CENTRAL_CONNECT:
        print("_IRQ_CENTRAL_CONNECT")
        waiting_events[event] = data[0]
    elif event == _IRQ_PERIPHERAL_CONNECT:
        print("_IRQ_PERIPHERAL_CONNECT")
        waiting_events[event] = data[0]
    else:
        # Events that are only logged; they carry no value the test needs.
        if event == _IRQ_CENTRAL_DISCONNECT:
            label = "_IRQ_CENTRAL_DISCONNECT"
        elif event == _IRQ_GATTS_WRITE:
            label = "_IRQ_GATTS_WRITE"
        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            label = "_IRQ_PERIPHERAL_DISCONNECT"
        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            label = "_IRQ_GATTC_CHARACTERISTIC_DONE"
        elif event == _IRQ_GATTC_WRITE_DONE:
            label = "_IRQ_GATTC_WRITE_DONE"
        else:
            label = None
        if label is not None:
            print(label)

    # Mark the event as seen without overwriting a value stored above.
    waiting_events.setdefault(event, None)
84
85
def wait_for_event(event, timeout_ms):
    """Block until ``event`` has been recorded by the irq handler, then consume it.

    Args:
        event: event code to look for in ``waiting_events``.
        timeout_ms: how long to keep polling, in milliseconds.

    Returns:
        The value stored for ``event`` (may be ``None``); the entry is removed.

    Raises:
        ValueError: if the event has not been seen once ``timeout_ms`` has elapsed.
    """
    started = time.ticks_ms()
    while True:
        # The deadline is checked before the event, so an elapsed timeout raises
        # even if the event has been recorded.
        if time.ticks_diff(time.ticks_ms(), started) >= timeout_ms:
            raise ValueError("Timeout waiting for {}".format(event))
        if event in waiting_events:
            return waiting_events.pop(event)
        machine.idle()
93
94
95# Acting in peripheral role.
def instance0():
    """Peripheral side of the test.

    Registers the service, advertises, and then for each of the seven
    connections: configures the local MTU, waits for the central to connect,
    initiates the MTU exchange itself for connections 4-6, sends a 64-character
    notification, waits for the central's write, prints the length and first
    character of what was written, and waits for the disconnect before
    advertising again. BLE is deactivated on exit, including on failure.
    """
    adv_interval_us = 20_000
    adv_payload = b"\x02\x01\x06\x04\xffMPY"
    # Local MTU per connection index. 256 (index 0) is the NimBLE default;
    # indices 4-6 are the connections where this side initiates the exchange.
    local_mtus = (256, 200, 400, 50, 290, 290, 290)

    multitest.globals(BDADDR=ble.config("mac"))
    ((char_handle,),) = ble.gatts_register_services(SERVICES)
    ble.gatts_set_buffer(char_handle, 500, False)
    print("gap_advertise")
    ble.gap_advertise(adv_interval_us, adv_payload)
    multitest.next()
    try:
        for i, local_mtu in enumerate(local_mtus):
            ble.config(mtu=local_mtu)

            # Wait for central to connect to us.
            central = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)

            peripheral_initiates = i >= 4
            if peripheral_initiates:
                print("gattc_exchange_mtu")
                ble.gattc_exchange_mtu(central)

            # The negotiated value is printed by the irq handler; it is not needed here.
            wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS)

            print("gatts_notify")
            ble.gatts_notify(central, char_handle, str(i) * 64)

            # Extra timeout while client does service discovery.
            wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS * 2)

            print("gatts_read")
            written = ble.gatts_read(char_handle)
            print("characteristic len:", len(written), chr(written[0]))

            # Wait for the central to disconnect.
            wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)

            # Advertise again so the central can make the next connection.
            print("gap_advertise")
            ble.gap_advertise(adv_interval_us, adv_payload)

    finally:
        ble.active(0)
144
145
146# Acting in central role.
def instance1():
    """Central side of the test.

    For each of the seven connections: configures the local MTU, connects to
    the peripheral, initiates the MTU exchange itself for connections 0-3,
    waits for the peripheral's notification, discovers the test characteristic,
    writes 20 characters more than the negotiated MTU to it, and disconnects.
    BLE is deactivated on exit, including on failure.
    """
    # Local MTU per connection index; indices 0-3 are the connections where
    # this side initiates the exchange.
    local_mtus = (300, 300, 300, 300, 256, 190, 350)

    multitest.next()
    try:
        for i, local_mtu in enumerate(local_mtus):
            ble.config(mtu=local_mtu)

            # Connect to peripheral and then disconnect.
            # Extra scan timeout allows for the peripheral to receive the previous disconnect
            # event and start advertising again.
            print("gap_connect")
            ble.gap_connect(BDADDR[0], BDADDR[1], 5000)
            link = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)

            central_initiates = i < 4
            if central_initiates:
                print("gattc_exchange_mtu")
                ble.gattc_exchange_mtu(link)

            negotiated_mtu = wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS)

            wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)

            print("gattc_discover_characteristics")
            ble.gattc_discover_characteristics(link, 1, 65535)
            value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
            wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)

            # Write 20 more than the MTU to test truncation. Each connection
            # uses its own fill character ("a" for connection 0, "b" for 1, ...).
            print("gattc_write")
            fill = chr(ord("a") + i)
            ble.gattc_write(link, value_handle, fill * (negotiated_mtu + 20), 1)
            wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)

            # Disconnect from peripheral.
            print("gap_disconnect:", ble.gap_disconnect(link))
            wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
    finally:
        ble.active(0)
191
# Module-level setup: create the BLE object used by instance0()/instance1(),
# activate it, and route its events to irq().
ble = bluetooth.BLE()
ble.active(True)
ble.irq(irq)
195