1# MicroPython uasyncio module
2# MIT license; Copyright (c) 2019 Damien P. George
3
4from time import ticks_ms as ticks, ticks_diff, ticks_add
5import sys, select
6
7# Import TaskQueue and Task, preferring built-in C code over Python code
8try:
9    from _uasyncio import TaskQueue, Task
10except:
11    from .task import TaskQueue, Task
12
13
14################################################################################
15# Exceptions
16
17
18class CancelledError(BaseException):
19    pass
20
21
22class TimeoutError(Exception):
23    pass
24
25
26# Used when calling Loop.call_exception_handler
27_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
28
29
30################################################################################
31# Sleep functions
32
33# "Yield" once, then raise StopIteration
34class SingletonGenerator:
35    def __init__(self):
36        self.state = None
37        self.exc = StopIteration()
38
39    def __iter__(self):
40        return self
41
42    def __next__(self):
43        if self.state is not None:
44            _task_queue.push_sorted(cur_task, self.state)
45            self.state = None
46            return None
47        else:
48            self.exc.__traceback__ = None
49            raise self.exc
50
51
52# Pause task execution for the given time (integer in milliseconds, uPy extension)
53# Use a SingletonGenerator to do it without allocating on the heap
54def sleep_ms(t, sgen=SingletonGenerator()):
55    assert sgen.state is None
56    sgen.state = ticks_add(ticks(), max(0, t))
57    return sgen
58
59
60# Pause task execution for the given time (in seconds)
61def sleep(t):
62    return sleep_ms(int(t * 1000))
63
64
65################################################################################
66# Queue and poller for stream IO
67
68
69class IOQueue:
70    def __init__(self):
71        self.poller = select.poll()
72        self.map = {}  # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
73
74    def _enqueue(self, s, idx):
75        if id(s) not in self.map:
76            entry = [None, None, s]
77            entry[idx] = cur_task
78            self.map[id(s)] = entry
79            self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
80        else:
81            sm = self.map[id(s)]
82            assert sm[idx] is None
83            assert sm[1 - idx] is not None
84            sm[idx] = cur_task
85            self.poller.modify(s, select.POLLIN | select.POLLOUT)
86        # Link task to this IOQueue so it can be removed if needed
87        cur_task.data = self
88
89    def _dequeue(self, s):
90        del self.map[id(s)]
91        self.poller.unregister(s)
92
93    def queue_read(self, s):
94        self._enqueue(s, 0)
95
96    def queue_write(self, s):
97        self._enqueue(s, 1)
98
99    def remove(self, task):
100        while True:
101            del_s = None
102            for k in self.map:  # Iterate without allocating on the heap
103                q0, q1, s = self.map[k]
104                if q0 is task or q1 is task:
105                    del_s = s
106                    break
107            if del_s is not None:
108                self._dequeue(s)
109            else:
110                break
111
112    def wait_io_event(self, dt):
113        for s, ev in self.poller.ipoll(dt):
114            sm = self.map[id(s)]
115            # print('poll', s, sm, ev)
116            if ev & ~select.POLLOUT and sm[0] is not None:
117                # POLLIN or error
118                _task_queue.push_head(sm[0])
119                sm[0] = None
120            if ev & ~select.POLLIN and sm[1] is not None:
121                # POLLOUT or error
122                _task_queue.push_head(sm[1])
123                sm[1] = None
124            if sm[0] is None and sm[1] is None:
125                self._dequeue(s)
126            elif sm[0] is None:
127                self.poller.modify(s, select.POLLOUT)
128            else:
129                self.poller.modify(s, select.POLLIN)
130
131
132################################################################################
133# Main run loop
134
135# Ensure the awaitable is a task
136def _promote_to_task(aw):
137    return aw if isinstance(aw, Task) else create_task(aw)
138
139
140# Create and schedule a new task from a coroutine
141def create_task(coro):
142    if not hasattr(coro, "send"):
143        raise TypeError("coroutine expected")
144    t = Task(coro, globals())
145    _task_queue.push_head(t)
146    return t
147
148
149# Keep scheduling tasks until there are none left to schedule
150def run_until_complete(main_task=None):
151    global cur_task
152    excs_all = (CancelledError, Exception)  # To prevent heap allocation in loop
153    excs_stop = (CancelledError, StopIteration)  # To prevent heap allocation in loop
154    while True:
155        # Wait until the head of _task_queue is ready to run
156        dt = 1
157        while dt > 0:
158            dt = -1
159            t = _task_queue.peek()
160            if t:
161                # A task waiting on _task_queue; "ph_key" is time to schedule task at
162                dt = max(0, ticks_diff(t.ph_key, ticks()))
163            elif not _io_queue.map:
164                # No tasks can be woken so finished running
165                return
166            # print('(poll {})'.format(dt), len(_io_queue.map))
167            _io_queue.wait_io_event(dt)
168
169        # Get next task to run and continue it
170        t = _task_queue.pop_head()
171        cur_task = t
172        try:
173            # Continue running the coroutine, it's responsible for rescheduling itself
174            exc = t.data
175            if not exc:
176                t.coro.send(None)
177            else:
178                # If the task is finished and on the run queue and gets here, then it
179                # had an exception and was not await'ed on.  Throwing into it now will
180                # raise StopIteration and the code below will catch this and run the
181                # call_exception_handler function.
182                t.data = None
183                t.coro.throw(exc)
184        except excs_all as er:
185            # Check the task is not on any event queue
186            assert t.data is None
187            # This task is done, check if it's the main task and then loop should stop
188            if t is main_task:
189                if isinstance(er, StopIteration):
190                    return er.value
191                raise er
192            if t.state:
193                # Task was running but is now finished.
194                waiting = False
195                if t.state is True:
196                    # "None" indicates that the task is complete and not await'ed on (yet).
197                    t.state = None
198                else:
199                    # Schedule any other tasks waiting on the completion of this task.
200                    while t.state.peek():
201                        _task_queue.push_head(t.state.pop_head())
202                        waiting = True
203                    # "False" indicates that the task is complete and has been await'ed on.
204                    t.state = False
205                if not waiting and not isinstance(er, excs_stop):
206                    # An exception ended this detached task, so queue it for later
207                    # execution to handle the uncaught exception if no other task retrieves
208                    # the exception in the meantime (this is handled by Task.throw).
209                    _task_queue.push_head(t)
210                # Save return value of coro to pass up to caller.
211                t.data = er
212            elif t.state is None:
213                # Task is already finished and nothing await'ed on the task,
214                # so call the exception handler.
215                _exc_context["exception"] = exc
216                _exc_context["future"] = t
217                Loop.call_exception_handler(_exc_context)
218
219
220# Create a new task from a coroutine and run it until it finishes
221def run(coro):
222    return run_until_complete(create_task(coro))
223
224
225################################################################################
226# Event loop wrapper
227
228
229async def _stopper():
230    pass
231
232
233_stop_task = None
234
235
236class Loop:
237    _exc_handler = None
238
239    def create_task(coro):
240        return create_task(coro)
241
242    def run_forever():
243        global _stop_task
244        _stop_task = Task(_stopper(), globals())
245        run_until_complete(_stop_task)
246        # TODO should keep running until .stop() is called, even if there're no tasks left
247
248    def run_until_complete(aw):
249        return run_until_complete(_promote_to_task(aw))
250
251    def stop():
252        global _stop_task
253        if _stop_task is not None:
254            _task_queue.push_head(_stop_task)
255            # If stop() is called again, do nothing
256            _stop_task = None
257
258    def close():
259        pass
260
261    def set_exception_handler(handler):
262        Loop._exc_handler = handler
263
264    def get_exception_handler():
265        return Loop._exc_handler
266
267    def default_exception_handler(loop, context):
268        print(context["message"])
269        print("future:", context["future"], "coro=", context["future"].coro)
270        sys.print_exception(context["exception"])
271
272    def call_exception_handler(context):
273        (Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
274
275
276# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
277def get_event_loop(runq_len=0, waitq_len=0):
278    return Loop
279
280
281def current_task():
282    return cur_task
283
284
285def new_event_loop():
286    global _task_queue, _io_queue
287    # TaskQueue of Task instances
288    _task_queue = TaskQueue()
289    # Task queue and poller for stream IO
290    _io_queue = IOQueue()
291    return Loop
292
293
294# Initialise default event loop
295new_event_loop()
296