# Test for utimeq module which implements task queue with support for
# wraparound time (utime.ticks_ms() style).
try:
    from utime import ticks_add, ticks_diff
    from utimeq import utimeq
except ImportError:
    print("SKIP")
    raise SystemExit

# Set to a true value to get verbose tracing of the queue operations.
DEBUG = 0

# MAX is the tick value one step "before" 0, obtained by letting ticks_add()
# wrap around. MODULO_HALF is derived from it and is used below as the
# distance at which ticks_diff() switches sign.
MAX = ticks_add(0, -1)
MODULO_HALF = MAX // 2 + 1

if DEBUG:

    def dprint(*v):
        """Print the given values (debug tracing enabled)."""
        print(*v)

else:

    def dprint(*v):
        """Ignore the given values (debug tracing disabled)."""


def expect_raises(exc_type, func, *args):
    """Call ``func(*args)`` and fail unless it raises ``exc_type``.

    An explicit AssertionError is raised (rather than ``assert False``) so
    that the check cannot be silently compiled out, and only the call under
    test sits inside the ``try`` body.
    """
    try:
        func(*args)
    except exc_type:
        return
    raise AssertionError("%s not raised" % exc_type)


# Try not to crash on invalid data
h = utimeq(10)
# push() called with too few arguments
expect_raises(TypeError, h.push, 1)
# pop() on an empty queue
expect_raises(IndexError, h.pop, 1)

# unsupported unary op
expect_raises(TypeError, lambda: ~h)

# pushing on full queue
h = utimeq(1)
h.push(1, 0, 0)
expect_raises(IndexError, h.push, 2, 0, 0)

# popping into invalid type
expect_raises(TypeError, h.pop, [])

# The failed push/pop above must have left the single queued entry intact.
# length
assert len(h) == 1

# peektime
assert h.peektime() == 1

# peektime with empty queue
expect_raises(IndexError, utimeq(1).peektime)


def pop_all(h):
    """Pop every entry from queue ``h`` and return them as a list of tuples.

    Entries are returned in the order the queue yields them; the queue is
    empty afterwards.
    """
    popped = []
    while h:
        # pop() fills a caller-supplied 3-element list in place.
        item = [0, 0, 0]
        h.pop(item)
        popped.append(tuple(item))
    dprint(popped)
    return popped


def add(h, v):
    """Push time value ``v`` onto queue ``h`` with dummy (0, 0) payload."""
    h.push(v, 0, 0)
    dprint("-----")
    # h.dump()  # NOTE(review): debugging aid; presumably only present in debug builds
    dprint("-----")


# Values on both sides of the wraparound point must come out in tick order.
pushed_times = (0, MAX, MAX - 1, 101, 100, MAX - 2)
h = utimeq(10)
for t in pushed_times:
    add(h, t)
dprint(h)
popped = pop_all(h)
# Guard against the ordering check below passing vacuously.
assert len(popped) == len(pushed_times)
for earlier, later in zip(popped, popped[1:]):
    diff = ticks_diff(later[0], earlier[0])
    assert diff > 0


def edge_case(edge, offset):
    """Queue two times ``edge`` ticks apart, both shifted by ``offset``.

    Returns ticks_diff(second popped time, first popped time).
    """
    h = utimeq(10)
    add(h, ticks_add(0, offset))
    add(h, ticks_add(edge, offset))
    dprint(h)
    popped = pop_all(h)
    diff = ticks_diff(popped[1][0], popped[0][0])
    dprint(diff, diff > 0)
    return diff


dprint("===")
diff = edge_case(MODULO_HALF - 1, 0)
assert diff == MODULO_HALF - 1
assert edge_case(MODULO_HALF - 1, 100) == diff
assert edge_case(MODULO_HALF - 1, -100) == diff

# We expect diff to be always positive, per the definition of heappop() which should return
# the smallest value.
# This is the edge case where this invariant breaks, due to asymmetry of two's-complement
# range - there's one more negative integer than positive, so heappushing values like below
# will then make ticks_diff() return the minimum negative value. We could make heappop
# return them in a different order, but ticks_diff() result would be the same. Conclusion:
# never add to a heap values where (a - b) == MODULO_HALF (and which are >= MODULO_HALF
# ticks apart in real time of course).
dprint("===")
diff = edge_case(MODULO_HALF, 0)
assert diff == -MODULO_HALF
assert edge_case(MODULO_HALF, 100) == diff
assert edge_case(MODULO_HALF, -100) == diff

dprint("===")
diff = edge_case(MODULO_HALF + 1, 0)
assert diff == MODULO_HALF - 1
assert edge_case(MODULO_HALF + 1, 100) == diff
assert edge_case(MODULO_HALF + 1, -100) == diff

print("OK")