1 /*
2  * This file is part of the MicroPython project, http://micropython.org/
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2020 Damien P. George
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 #include "py/runtime.h"
28 #include "py/smallint.h"
29 #include "py/pairheap.h"
30 #include "py/mphal.h"
31 
32 #if MICROPY_PY_UASYNCIO
33 
// Used when task cannot be guaranteed to be non-NULL.
// Evaluates to the address of the task's embedded pairheap node, or NULL when
// the task pointer itself is NULL.  Note that `task` is evaluated twice.
#define TASK_PAIRHEAP(task) ((task) ? &(task)->pairheap : NULL)

// Sentinel values stored in mp_obj_task_t.state.  Besides these three values
// the state field may also hold a TaskQueue object (see task_getiter).
#define TASK_STATE_RUNNING_NOT_WAITED_ON (mp_const_true)
#define TASK_STATE_DONE_NOT_WAITED_ON (mp_const_none)
#define TASK_STATE_DONE_WAS_WAITED_ON (mp_const_false)

// True when the task's state is one of the two "done" sentinels above.
// `task` must be a non-NULL pointer; it is evaluated twice.
#define TASK_IS_DONE(task) ( \
    (task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
    || (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)
44 
// A Task object.  The pairheap node is the first member, so a pointer to the
// node can be cast directly to a pointer to the task (task_lt relies on this).
typedef struct _mp_obj_task_t {
    mp_pairheap_t pairheap; // heap node; its base.type is set to the object's type in task_make_new
    mp_obj_t coro;          // first constructor argument (readable as the `coro` attribute)
    mp_obj_t data;          // None initially; the awaited task while waiting; raised by task_iternext once done
    mp_obj_t state;         // one of the TASK_STATE_* sentinels, or a TaskQueue allocated by task_getiter
    mp_obj_t ph_key;        // small-int tick value compared by task_lt to order the heap
} mp_obj_task_t;
52 
// A TaskQueue object: a pairing heap of tasks ordered by task_lt.
typedef struct _mp_obj_task_queue_t {
    mp_obj_base_t base;  // object header holding the type pointer
    mp_obj_task_t *heap; // root of the pairing heap; compared against NULL to detect an empty queue
} mp_obj_task_queue_t;
57 
// Forward declarations: the type objects and the TaskQueue constructor are
// referenced by the Task code before their definitions appear.
STATIC const mp_obj_type_t task_queue_type;
STATIC const mp_obj_type_t task_type;

STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args);
62 
63 /******************************************************************************/
64 // Ticks for task ordering in pairing heap
65 
ticks(void)66 STATIC mp_obj_t ticks(void) {
67     return MP_OBJ_NEW_SMALL_INT(mp_hal_ticks_ms() & (MICROPY_PY_UTIME_TICKS_PERIOD - 1));
68 }
69 
ticks_diff(mp_obj_t t1_in,mp_obj_t t0_in)70 STATIC mp_int_t ticks_diff(mp_obj_t t1_in, mp_obj_t t0_in) {
71     mp_uint_t t0 = MP_OBJ_SMALL_INT_VALUE(t0_in);
72     mp_uint_t t1 = MP_OBJ_SMALL_INT_VALUE(t1_in);
73     mp_int_t diff = ((t1 - t0 + MICROPY_PY_UTIME_TICKS_PERIOD / 2) & (MICROPY_PY_UTIME_TICKS_PERIOD - 1))
74         - MICROPY_PY_UTIME_TICKS_PERIOD / 2;
75     return diff;
76 }
77 
task_lt(mp_pairheap_t * n1,mp_pairheap_t * n2)78 STATIC int task_lt(mp_pairheap_t *n1, mp_pairheap_t *n2) {
79     mp_obj_task_t *t1 = (mp_obj_task_t *)n1;
80     mp_obj_task_t *t2 = (mp_obj_task_t *)n2;
81     return MP_OBJ_SMALL_INT_VALUE(ticks_diff(t1->ph_key, t2->ph_key)) < 0;
82 }
83 
84 /******************************************************************************/
85 // TaskQueue class
86 
task_queue_make_new(const mp_obj_type_t * type,size_t n_args,size_t n_kw,const mp_obj_t * args)87 STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
88     (void)args;
89     mp_arg_check_num(n_args, n_kw, 0, 0, false);
90     mp_obj_task_queue_t *self = m_new_obj(mp_obj_task_queue_t);
91     self->base.type = type;
92     self->heap = (mp_obj_task_t *)mp_pairheap_new(task_lt);
93     return MP_OBJ_FROM_PTR(self);
94 }
95 
task_queue_peek(mp_obj_t self_in)96 STATIC mp_obj_t task_queue_peek(mp_obj_t self_in) {
97     mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
98     if (self->heap == NULL) {
99         return mp_const_none;
100     } else {
101         return MP_OBJ_FROM_PTR(self->heap);
102     }
103 }
104 STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_peek_obj, task_queue_peek);
105 
task_queue_push_sorted(size_t n_args,const mp_obj_t * args)106 STATIC mp_obj_t task_queue_push_sorted(size_t n_args, const mp_obj_t *args) {
107     mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(args[0]);
108     mp_obj_task_t *task = MP_OBJ_TO_PTR(args[1]);
109     task->data = mp_const_none;
110     if (n_args == 2) {
111         task->ph_key = ticks();
112     } else {
113         assert(mp_obj_is_small_int(args[2]));
114         task->ph_key = args[2];
115     }
116     self->heap = (mp_obj_task_t *)mp_pairheap_push(task_lt, TASK_PAIRHEAP(self->heap), TASK_PAIRHEAP(task));
117     return mp_const_none;
118 }
119 STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_sorted_obj, 2, 3, task_queue_push_sorted);
120 
task_queue_pop_head(mp_obj_t self_in)121 STATIC mp_obj_t task_queue_pop_head(mp_obj_t self_in) {
122     mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
123     mp_obj_task_t *head = (mp_obj_task_t *)mp_pairheap_peek(task_lt, &self->heap->pairheap);
124     if (head == NULL) {
125         mp_raise_msg(&mp_type_IndexError, MP_ERROR_TEXT("empty heap"));
126     }
127     self->heap = (mp_obj_task_t *)mp_pairheap_pop(task_lt, &self->heap->pairheap);
128     return MP_OBJ_FROM_PTR(head);
129 }
130 STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_head_obj, task_queue_pop_head);
131 
task_queue_remove(mp_obj_t self_in,mp_obj_t task_in)132 STATIC mp_obj_t task_queue_remove(mp_obj_t self_in, mp_obj_t task_in) {
133     mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
134     mp_obj_task_t *task = MP_OBJ_TO_PTR(task_in);
135     self->heap = (mp_obj_task_t *)mp_pairheap_delete(task_lt, &self->heap->pairheap, &task->pairheap);
136     return mp_const_none;
137 }
138 STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_queue_remove_obj, task_queue_remove);
139 
// Method table for TaskQueue instances.
// Note that push_head is bound to the same function object as push_sorted.
STATIC const mp_rom_map_elem_t task_queue_locals_dict_table[] = {
    { MP_ROM_QSTR(MP_QSTR_peek), MP_ROM_PTR(&task_queue_peek_obj) },
    { MP_ROM_QSTR(MP_QSTR_push_sorted), MP_ROM_PTR(&task_queue_push_sorted_obj) },
    { MP_ROM_QSTR(MP_QSTR_push_head), MP_ROM_PTR(&task_queue_push_sorted_obj) },
    { MP_ROM_QSTR(MP_QSTR_pop_head), MP_ROM_PTR(&task_queue_pop_head_obj) },
    { MP_ROM_QSTR(MP_QSTR_remove), MP_ROM_PTR(&task_queue_remove_obj) },
};
STATIC MP_DEFINE_CONST_DICT(task_queue_locals_dict, task_queue_locals_dict_table);
148 
// Type object for TaskQueue: constructed by task_queue_make_new, with its
// methods looked up in task_queue_locals_dict.
STATIC const mp_obj_type_t task_queue_type = {
    { &mp_type_type },
    .name = MP_QSTR_TaskQueue,
    .make_new = task_queue_make_new,
    .locals_dict = (mp_obj_dict_t *)&task_queue_locals_dict,
};
155 
156 /******************************************************************************/
157 // Task class
158 
// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
// It is assigned in task_make_new when a second constructor argument is given,
// and is read with mp_obj_dict_get() in task_cancel and task_iternext.
// NOTE(review): it stays MP_OBJ_NULL until a Task is created with two
// arguments, and the readers do not check for that.
// NOTE(review): nothing in this file registers this pointer as a GC root --
// confirm the referenced dict is kept alive elsewhere.
STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
161 
task_make_new(const mp_obj_type_t * type,size_t n_args,size_t n_kw,const mp_obj_t * args)162 STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
163     mp_arg_check_num(n_args, n_kw, 1, 2, false);
164     mp_obj_task_t *self = m_new_obj(mp_obj_task_t);
165     self->pairheap.base.type = type;
166     mp_pairheap_init_node(task_lt, &self->pairheap);
167     self->coro = args[0];
168     self->data = mp_const_none;
169     self->state = TASK_STATE_RUNNING_NOT_WAITED_ON;
170     self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
171     if (n_args == 2) {
172         uasyncio_context = args[1];
173     }
174     return MP_OBJ_FROM_PTR(self);
175 }
176 
task_done(mp_obj_t self_in)177 STATIC mp_obj_t task_done(mp_obj_t self_in) {
178     mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
179     return mp_obj_new_bool(TASK_IS_DONE(self));
180 }
181 STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);
182 
task_cancel(mp_obj_t self_in)183 STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
184     mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
185     // Check if task is already finished.
186     if (TASK_IS_DONE(self)) {
187         return mp_const_false;
188     }
189     // Can't cancel self (not supported yet).
190     mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
191     if (self_in == cur_task) {
192         mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT("can't cancel self"));
193     }
194     // If Task waits on another task then forward the cancel to the one it's waiting on.
195     while (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&task_type))) {
196         self = MP_OBJ_TO_PTR(self->data);
197     }
198 
199     mp_obj_t _task_queue = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__task_queue));
200 
201     // Reschedule Task as a cancelled task.
202     mp_obj_t dest[3];
203     mp_load_method_maybe(self->data, MP_QSTR_remove, dest);
204     if (dest[0] != MP_OBJ_NULL) {
205         // Not on the main running queue, remove the task from the queue it's on.
206         dest[2] = MP_OBJ_FROM_PTR(self);
207         mp_call_method_n_kw(1, 0, dest);
208         // _task_queue.push_head(self)
209         dest[0] = _task_queue;
210         dest[1] = MP_OBJ_FROM_PTR(self);
211         task_queue_push_sorted(2, dest);
212     } else if (ticks_diff(self->ph_key, ticks()) > 0) {
213         // On the main running queue but scheduled in the future, so bring it forward to now.
214         // _task_queue.remove(self)
215         task_queue_remove(_task_queue, MP_OBJ_FROM_PTR(self));
216         // _task_queue.push_head(self)
217         dest[0] = _task_queue;
218         dest[1] = MP_OBJ_FROM_PTR(self);
219         task_queue_push_sorted(2, dest);
220     }
221 
222     self->data = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError));
223 
224     return mp_const_true;
225 }
226 STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
227 
task_attr(mp_obj_t self_in,qstr attr,mp_obj_t * dest)228 STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
229     mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
230     if (dest[0] == MP_OBJ_NULL) {
231         // Load
232         if (attr == MP_QSTR_coro) {
233             dest[0] = self->coro;
234         } else if (attr == MP_QSTR_data) {
235             dest[0] = self->data;
236         } else if (attr == MP_QSTR_state) {
237             dest[0] = self->state;
238         } else if (attr == MP_QSTR_done) {
239             dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
240             dest[1] = self_in;
241         } else if (attr == MP_QSTR_cancel) {
242             dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
243             dest[1] = self_in;
244         } else if (attr == MP_QSTR_ph_key) {
245             dest[0] = self->ph_key;
246         }
247     } else if (dest[1] != MP_OBJ_NULL) {
248         // Store
249         if (attr == MP_QSTR_data) {
250             self->data = dest[1];
251             dest[0] = MP_OBJ_NULL;
252         } else if (attr == MP_QSTR_state) {
253             self->state = dest[1];
254             dest[0] = MP_OBJ_NULL;
255         }
256     }
257 }
258 
task_getiter(mp_obj_t self_in,mp_obj_iter_buf_t * iter_buf)259 STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
260     (void)iter_buf;
261     mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
262     if (TASK_IS_DONE(self)) {
263         // Signal that the completed-task has been await'ed on.
264         self->state = TASK_STATE_DONE_WAS_WAITED_ON;
265     } else if (self->state == TASK_STATE_RUNNING_NOT_WAITED_ON) {
266         // Allocate the waiting queue.
267         self->state = task_queue_make_new(&task_queue_type, 0, 0, NULL);
268     }
269     return self_in;
270 }
271 
task_iternext(mp_obj_t self_in)272 STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
273     mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
274     if (TASK_IS_DONE(self)) {
275         // Task finished, raise return value to caller so it can continue.
276         nlr_raise(self->data);
277     } else {
278         // Put calling task on waiting queue.
279         mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
280         mp_obj_t args[2] = { self->state, cur_task };
281         task_queue_push_sorted(2, args);
282         // Set calling task's data to this task that it waits on, to double-link it.
283         ((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
284     }
285     return mp_const_none;
286 }
287 
// Type object for Task.  Attribute access (including the done and cancel
// methods) goes through task_attr rather than a locals dict, and the
// getiter/iternext slots are handled by task_getiter and task_iternext.
STATIC const mp_obj_type_t task_type = {
    { &mp_type_type },
    .name = MP_QSTR_Task,
    .make_new = task_make_new,
    .attr = task_attr,
    .getiter = task_getiter,
    .iternext = task_iternext,
};
296 
297 /******************************************************************************/
298 // C-level uasyncio module
299 
// Module globals: the module's __name__ is _uasyncio and it exposes the
// TaskQueue and Task types.
STATIC const mp_rom_map_elem_t mp_module_uasyncio_globals_table[] = {
    { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR__uasyncio) },
    { MP_ROM_QSTR(MP_QSTR_TaskQueue), MP_ROM_PTR(&task_queue_type) },
    { MP_ROM_QSTR(MP_QSTR_Task), MP_ROM_PTR(&task_type) },
};
STATIC MP_DEFINE_CONST_DICT(mp_module_uasyncio_globals, mp_module_uasyncio_globals_table);
306 
// The module object itself; this is the only non-static symbol defined in
// this file.
const mp_obj_module_t mp_module_uasyncio = {
    .base = { &mp_type_module },
    .globals = (mp_obj_dict_t *)&mp_module_uasyncio_globals,
};
311 
312 #endif // MICROPY_PY_UASYNCIO
313