1# Test uasyncio TCP server and client using start_server() and open_connection() 2 3try: 4 import uasyncio as asyncio 5except ImportError: 6 try: 7 import asyncio 8 except ImportError: 9 print("SKIP") 10 raise SystemExit 11 12PORT = 8000 13 14 15async def handle_connection(reader, writer): 16 # Test that peername exists (but don't check its value, it changes) 17 writer.get_extra_info("peername") 18 19 data = await reader.read(100) 20 print("echo:", data) 21 writer.write(data) 22 await writer.drain() 23 24 print("close") 25 writer.close() 26 await writer.wait_closed() 27 28 print("done") 29 ev.set() 30 31 32async def tcp_server(): 33 global ev 34 ev = asyncio.Event() 35 server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT) 36 print("server running") 37 multitest.next() 38 async with server: 39 await asyncio.wait_for(ev.wait(), 10) 40 41 42async def tcp_client(message): 43 reader, writer = await asyncio.open_connection(IP, PORT) 44 print("write:", message) 45 writer.write(message) 46 await writer.drain() 47 data = await reader.read(100) 48 print("read:", data) 49 50 51def instance0(): 52 multitest.globals(IP=multitest.get_network_ip()) 53 asyncio.run(tcp_server()) 54 55 56def instance1(): 57 multitest.next() 58 asyncio.run(tcp_client(b"client data")) 59