1# MicroPython uasyncio module 2# MIT license; Copyright (c) 2019 Damien P. George 3 4from time import ticks_ms as ticks, ticks_diff, ticks_add 5import sys, select 6 7# Import TaskQueue and Task, preferring built-in C code over Python code 8try: 9 from _uasyncio import TaskQueue, Task 10except: 11 from .task import TaskQueue, Task 12 13 14################################################################################ 15# Exceptions 16 17 18class CancelledError(BaseException): 19 pass 20 21 22class TimeoutError(Exception): 23 pass 24 25 26# Used when calling Loop.call_exception_handler 27_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None} 28 29 30################################################################################ 31# Sleep functions 32 33# "Yield" once, then raise StopIteration 34class SingletonGenerator: 35 def __init__(self): 36 self.state = None 37 self.exc = StopIteration() 38 39 def __iter__(self): 40 return self 41 42 def __next__(self): 43 if self.state is not None: 44 _task_queue.push_sorted(cur_task, self.state) 45 self.state = None 46 return None 47 else: 48 self.exc.__traceback__ = None 49 raise self.exc 50 51 52# Pause task execution for the given time (integer in milliseconds, uPy extension) 53# Use a SingletonGenerator to do it without allocating on the heap 54def sleep_ms(t, sgen=SingletonGenerator()): 55 assert sgen.state is None 56 sgen.state = ticks_add(ticks(), max(0, t)) 57 return sgen 58 59 60# Pause task execution for the given time (in seconds) 61def sleep(t): 62 return sleep_ms(int(t * 1000)) 63 64 65################################################################################ 66# Queue and poller for stream IO 67 68 69class IOQueue: 70 def __init__(self): 71 self.poller = select.poll() 72 self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] 73 74 def _enqueue(self, s, idx): 75 if id(s) not in self.map: 76 entry = [None, None, s] 77 entry[idx] = cur_task 78 self.map[id(s)] = entry 79 self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) 80 else: 81 sm = self.map[id(s)] 82 assert sm[idx] is None 83 assert sm[1 - idx] is not None 84 sm[idx] = cur_task 85 self.poller.modify(s, select.POLLIN | select.POLLOUT) 86 # Link task to this IOQueue so it can be removed if needed 87 cur_task.data = self 88 89 def _dequeue(self, s): 90 del self.map[id(s)] 91 self.poller.unregister(s) 92 93 def queue_read(self, s): 94 self._enqueue(s, 0) 95 96 def queue_write(self, s): 97 self._enqueue(s, 1) 98 99 def remove(self, task): 100 while True: 101 del_s = None 102 for k in self.map: # Iterate without allocating on the heap 103 q0, q1, s = self.map[k] 104 if q0 is task or q1 is task: 105 del_s = s 106 break 107 if del_s is not None: 108 self._dequeue(s) 109 else: 110 break 111 112 def wait_io_event(self, dt): 113 for s, ev in self.poller.ipoll(dt): 114 sm = self.map[id(s)] 115 # print('poll', s, sm, ev) 116 if ev & ~select.POLLOUT and sm[0] is not None: 117 # POLLIN or error 118 _task_queue.push_head(sm[0]) 119 sm[0] = None 120 if ev & ~select.POLLIN and sm[1] is not None: 121 # POLLOUT or error 122 _task_queue.push_head(sm[1]) 123 sm[1] = None 124 if sm[0] is None and sm[1] is None: 125 self._dequeue(s) 126 elif sm[0] is None: 127 self.poller.modify(s, select.POLLOUT) 128 else: 129 self.poller.modify(s, select.POLLIN) 130 131 132################################################################################ 133# Main run loop 134 135# Ensure the awaitable is a task 136def _promote_to_task(aw): 137 return aw if isinstance(aw, Task) else create_task(aw) 138 139 140# Create and schedule a new task from a coroutine 141def create_task(coro): 142 if not hasattr(coro, "send"): 143 raise TypeError("coroutine expected") 144 t = Task(coro, globals()) 145 _task_queue.push_head(t) 146 return t 147 148 149# Keep scheduling tasks until there are none left to schedule 150def run_until_complete(main_task=None): 151 global cur_task 152 excs_all = (CancelledError, Exception) # To prevent heap allocation in loop 153 excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop 154 while True: 155 # Wait until the head of _task_queue is ready to run 156 dt = 1 157 while dt > 0: 158 dt = -1 159 t = _task_queue.peek() 160 if t: 161 # A task waiting on _task_queue; "ph_key" is time to schedule task at 162 dt = max(0, ticks_diff(t.ph_key, ticks())) 163 elif not _io_queue.map: 164 # No tasks can be woken so finished running 165 return 166 # print('(poll {})'.format(dt), len(_io_queue.map)) 167 _io_queue.wait_io_event(dt) 168 169 # Get next task to run and continue it 170 t = _task_queue.pop_head() 171 cur_task = t 172 try: 173 # Continue running the coroutine, it's responsible for rescheduling itself 174 exc = t.data 175 if not exc: 176 t.coro.send(None) 177 else: 178 # If the task is finished and on the run queue and gets here, then it 179 # had an exception and was not await'ed on. Throwing into it now will 180 # raise StopIteration and the code below will catch this and run the 181 # call_exception_handler function. 182 t.data = None 183 t.coro.throw(exc) 184 except excs_all as er: 185 # Check the task is not on any event queue 186 assert t.data is None 187 # This task is done, check if it's the main task and then loop should stop 188 if t is main_task: 189 if isinstance(er, StopIteration): 190 return er.value 191 raise er 192 if t.state: 193 # Task was running but is now finished. 194 waiting = False 195 if t.state is True: 196 # "None" indicates that the task is complete and not await'ed on (yet). 197 t.state = None 198 else: 199 # Schedule any other tasks waiting on the completion of this task. 200 while t.state.peek(): 201 _task_queue.push_head(t.state.pop_head()) 202 waiting = True 203 # "False" indicates that the task is complete and has been await'ed on. 204 t.state = False 205 if not waiting and not isinstance(er, excs_stop): 206 # An exception ended this detached task, so queue it for later 207 # execution to handle the uncaught exception if no other task retrieves 208 # the exception in the meantime (this is handled by Task.throw). 209 _task_queue.push_head(t) 210 # Save return value of coro to pass up to caller. 211 t.data = er 212 elif t.state is None: 213 # Task is already finished and nothing await'ed on the task, 214 # so call the exception handler. 215 _exc_context["exception"] = exc 216 _exc_context["future"] = t 217 Loop.call_exception_handler(_exc_context) 218 219 220# Create a new task from a coroutine and run it until it finishes 221def run(coro): 222 return run_until_complete(create_task(coro)) 223 224 225################################################################################ 226# Event loop wrapper 227 228 229async def _stopper(): 230 pass 231 232 233_stop_task = None 234 235 236class Loop: 237 _exc_handler = None 238 239 def create_task(coro): 240 return create_task(coro) 241 242 def run_forever(): 243 global _stop_task 244 _stop_task = Task(_stopper(), globals()) 245 run_until_complete(_stop_task) 246 # TODO should keep running until .stop() is called, even if there're no tasks left 247 248 def run_until_complete(aw): 249 return run_until_complete(_promote_to_task(aw)) 250 251 def stop(): 252 global _stop_task 253 if _stop_task is not None: 254 _task_queue.push_head(_stop_task) 255 # If stop() is called again, do nothing 256 _stop_task = None 257 258 def close(): 259 pass 260 261 def set_exception_handler(handler): 262 Loop._exc_handler = handler 263 264 def get_exception_handler(): 265 return Loop._exc_handler 266 267 def default_exception_handler(loop, context): 268 print(context["message"]) 269 print("future:", context["future"], "coro=", context["future"].coro) 270 sys.print_exception(context["exception"]) 271 272 def call_exception_handler(context): 273 (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) 274 275 276# The runq_len and waitq_len arguments are for legacy uasyncio compatibility 277def get_event_loop(runq_len=0, waitq_len=0): 278 return Loop 279 280 281def current_task(): 282 return cur_task 283 284 285def new_event_loop(): 286 global _task_queue, _io_queue 287 # TaskQueue of Task instances 288 _task_queue = TaskQueue() 289 # Task queue and poller for stream IO 290 _io_queue = IOQueue() 291 return Loop 292 293 294# Initialise default event loop 295new_event_loop() 296