1# MicroPython uasyncio module 2# MIT license; Copyright (c) 2019-2020 Damien P. George 3 4# This file contains the core TaskQueue based on a pairing heap, and the core Task class. 5# They can optionally be replaced by C implementations. 6 7from . import core 8 9 10# pairing-heap meld of 2 heaps; O(1) 11def ph_meld(h1, h2): 12 if h1 is None: 13 return h2 14 if h2 is None: 15 return h1 16 lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0 17 if lt: 18 if h1.ph_child is None: 19 h1.ph_child = h2 20 else: 21 h1.ph_child_last.ph_next = h2 22 h1.ph_child_last = h2 23 h2.ph_next = None 24 h2.ph_rightmost_parent = h1 25 return h1 26 else: 27 h1.ph_next = h2.ph_child 28 h2.ph_child = h1 29 if h1.ph_next is None: 30 h2.ph_child_last = h1 31 h1.ph_rightmost_parent = h2 32 return h2 33 34 35# pairing-heap pairing operation; amortised O(log N) 36def ph_pairing(child): 37 heap = None 38 while child is not None: 39 n1 = child 40 child = child.ph_next 41 n1.ph_next = None 42 if child is not None: 43 n2 = child 44 child = child.ph_next 45 n2.ph_next = None 46 n1 = ph_meld(n1, n2) 47 heap = ph_meld(heap, n1) 48 return heap 49 50 51# pairing-heap delete of a node; stable, amortised O(log N) 52def ph_delete(heap, node): 53 if node is heap: 54 child = heap.ph_child 55 node.ph_child = None 56 return ph_pairing(child) 57 # Find parent of node 58 parent = node 59 while parent.ph_next is not None: 60 parent = parent.ph_next 61 parent = parent.ph_rightmost_parent 62 # Replace node with pairing of its children 63 if node is parent.ph_child and node.ph_child is None: 64 parent.ph_child = node.ph_next 65 node.ph_next = None 66 return heap 67 elif node is parent.ph_child: 68 child = node.ph_child 69 next = node.ph_next 70 node.ph_child = None 71 node.ph_next = None 72 node = ph_pairing(child) 73 parent.ph_child = node 74 else: 75 n = parent.ph_child 76 while node is not n.ph_next: 77 n = n.ph_next 78 child = node.ph_child 79 next = node.ph_next 80 node.ph_child = None 81 node.ph_next = None 82 node = ph_pairing(child) 83 if node is None: 84 node = n 85 else: 86 n.ph_next = node 87 node.ph_next = next 88 if next is None: 89 node.ph_rightmost_parent = parent 90 parent.ph_child_last = node 91 return heap 92 93 94# TaskQueue class based on the above pairing-heap functions. 95class TaskQueue: 96 def __init__(self): 97 self.heap = None 98 99 def peek(self): 100 return self.heap 101 102 def push_sorted(self, v, key): 103 v.data = None 104 v.ph_key = key 105 v.ph_child = None 106 v.ph_next = None 107 self.heap = ph_meld(v, self.heap) 108 109 def push_head(self, v): 110 self.push_sorted(v, core.ticks()) 111 112 def pop_head(self): 113 v = self.heap 114 self.heap = ph_pairing(self.heap.ph_child) 115 return v 116 117 def remove(self, v): 118 self.heap = ph_delete(self.heap, v) 119 120 121# Task class representing a coroutine, can be waited on and cancelled. 122class Task: 123 def __init__(self, coro, globals=None): 124 self.coro = coro # Coroutine of this Task 125 self.data = None # General data for queue it is waiting on 126 self.state = True # None, False, True or a TaskQueue instance 127 self.ph_key = 0 # Pairing heap 128 self.ph_child = None # Paring heap 129 self.ph_child_last = None # Paring heap 130 self.ph_next = None # Paring heap 131 self.ph_rightmost_parent = None # Paring heap 132 133 def __iter__(self): 134 if not self.state: 135 # Task finished, signal that is has been await'ed on. 136 self.state = False 137 elif self.state is True: 138 # Allocated head of linked list of Tasks waiting on completion of this task. 139 self.state = TaskQueue() 140 return self 141 142 def __next__(self): 143 if not self.state: 144 # Task finished, raise return value to caller so it can continue. 145 raise self.data 146 else: 147 # Put calling task on waiting queue. 148 self.state.push_head(core.cur_task) 149 # Set calling task's data to this task that it waits on, to double-link it. 150 core.cur_task.data = self 151 152 def done(self): 153 return not self.state 154 155 def cancel(self): 156 # Check if task is already finished. 157 if not self.state: 158 return False 159 # Can't cancel self (not supported yet). 160 if self is core.cur_task: 161 raise RuntimeError("can't cancel self") 162 # If Task waits on another task then forward the cancel to the one it's waiting on. 163 while isinstance(self.data, Task): 164 self = self.data 165 # Reschedule Task as a cancelled task. 166 if hasattr(self.data, "remove"): 167 # Not on the main running queue, remove the task from the queue it's on. 168 self.data.remove(self) 169 core._task_queue.push_head(self) 170 elif core.ticks_diff(self.ph_key, core.ticks()) > 0: 171 # On the main running queue but scheduled in the future, so bring it forward to now. 172 core._task_queue.remove(self) 173 core._task_queue.push_head(self) 174 self.data = core.CancelledError 175 return True 176