# Test asyncio.wait_for, with forwarding cancellation

try:
    import uasyncio as asyncio
except ImportError:
    try:
        import asyncio
    except ImportError:
        print("SKIP")
        raise SystemExit


# Inner coroutine handed to wait_for(): announces itself, then sleeps for
# `t` seconds.
#
# If cancelled while sleeping it prints a notice and then either returns
# False (when `return_if_fail` is true) or re-raises the CancelledError.
# Any other exception is printed and re-raised.
async def awaiting(t, return_if_fail):
    try:
        print("awaiting started")
        await asyncio.sleep(t)
    except asyncio.CancelledError as cancelled:
        # CPython wait_for raises CancelledError inside task but TimeoutError in wait_for
        print("awaiting canceled")
        if not return_if_fail:
            raise cancelled
        return False  # return has no effect if Cancelled
    except Exception as exc:
        print("caught exception", exc)
        raise exc


# Run one scenario: a task blocked in wait_for() is cancelled from a second
# task shortly after it starts.
#
#   catch        -- when true, wait() swallows the CancelledError it receives
#                   instead of re-raising it.
#   catch_inside -- passed to awaiting() as `return_if_fail`, so the inner
#                   coroutine returns False on cancellation instead of
#                   re-raising.
async def test_cancellation_forwarded(catch, catch_inside):
    print("----------")

    # Outer coroutine: wraps awaiting() in a wait_for() with a 1 second
    # timeout and reports which exception (if any) comes back out of it.
    async def wait():
        try:
            await asyncio.wait_for(awaiting(2, catch_inside), 1)
        except asyncio.TimeoutError as timed_out:
            print("Got timeout error")
            raise timed_out
        except asyncio.CancelledError as cancelled:
            print("Got canceled")
            if catch:
                return
            raise cancelled

    # Helper coroutine: sleeps for 0.01 seconds and then cancels `target`.
    async def cancel(target):
        print("cancel started")
        await asyncio.sleep(0.01)
        print("cancel wait()")
        target.cancel()

    waiter = asyncio.create_task(wait())
    # The cancelling task is only scheduled here; it is never awaited directly.
    canceller = asyncio.create_task(cancel(waiter))
    try:
        await waiter
    except asyncio.CancelledError:
        print("waiting got cancelled")


# Exercise the combinations of (catch, catch_inside), in this exact order.
for outer_catches, inner_returns in (
    (False, False),
    (False, True),
    (True, True),
    (True, False),
):
    asyncio.run(test_cancellation_forwarded(outer_catches, inner_returns))