1#!/usr/bin/env python3 2 3# This file is part of the MicroPython project, http://micropython.org/ 4# The MIT License (MIT) 5# Copyright (c) 2020 Damien P. George 6 7import sys, os, time, re, select 8import argparse 9import itertools 10import subprocess 11import tempfile 12 13sys.path.append("../tools") 14import pyboard 15 16if os.name == "nt": 17 CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe") 18 MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/windows/micropython.exe") 19else: 20 CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3") 21 MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython") 22 23# For diff'ing test output 24DIFF = os.getenv("MICROPY_DIFF", "diff -u") 25 26PYTHON_TRUTH = CPYTHON3 27 28INSTANCE_READ_TIMEOUT_S = 10 29 30APPEND_CODE_TEMPLATE = """ 31import sys 32class multitest: 33 @staticmethod 34 def flush(): 35 if hasattr(sys.stdout, "flush"): 36 sys.stdout.flush() 37 @staticmethod 38 def skip(): 39 print("SKIP") 40 multitest.flush() 41 raise SystemExit 42 @staticmethod 43 def next(): 44 print("NEXT") 45 multitest.flush() 46 @staticmethod 47 def globals(**gs): 48 for g in gs: 49 print("SET {{}} = {{!r}}".format(g, gs[g])) 50 multitest.flush() 51 @staticmethod 52 def get_network_ip(): 53 try: 54 import network 55 ip = network.WLAN().ifconfig()[0] 56 except: 57 ip = "127.0.0.1" 58 return ip 59 60{} 61 62instance{}() 63multitest.flush() 64""" 65 66# The btstack implementation on Unix generates some spurious output that we 67# can't control. 68IGNORE_OUTPUT_MATCHES = ( 69 "libusb: error ", # It tries to open devices that it doesn't have access to (libusb prints unconditionally). 70 "hci_transport_h2_libusb.c", # Same issue. We enable LOG_ERROR in btstack. 71 "USB Path: ", # Hardcoded in btstack's libusb transport. 72 "hci_number_completed_packet", # Warning from btstack. 73) 74 75 76class PyInstance: 77 def __init__(self): 78 pass 79 80 def close(self): 81 pass 82 83 def prepare_script_from_file(self, filename, prepend, append): 84 with open(filename, "rb") as f: 85 script = f.read() 86 if prepend: 87 script = bytes(prepend, "ascii") + b"\n" + script 88 if append: 89 script += b"\n" + bytes(append, "ascii") 90 return script 91 92 def run_file(self, filename, prepend="", append=""): 93 return self.run_script(self.prepare_script_from_file(filename, prepend, append)) 94 95 def start_file(self, filename, prepend="", append=""): 96 return self.start_script(self.prepare_script_from_file(filename, prepend, append)) 97 98 99class PyInstanceSubProcess(PyInstance): 100 def __init__(self, argv, env=None): 101 self.argv = argv 102 self.env = {n: v for n, v in (i.split("=") for i in env)} if env else None 103 self.popen = None 104 self.finished = True 105 106 def __str__(self): 107 return self.argv[0].rsplit("/")[-1] 108 109 def run_script(self, script): 110 output = b"" 111 err = None 112 try: 113 p = subprocess.run( 114 self.argv, 115 stdout=subprocess.PIPE, 116 stderr=subprocess.STDOUT, 117 input=script, 118 env=self.env, 119 ) 120 output = p.stdout 121 except subprocess.CalledProcessError as er: 122 err = er 123 return str(output.strip(), "ascii"), err 124 125 def start_script(self, script): 126 self.popen = subprocess.Popen( 127 self.argv, 128 stdin=subprocess.PIPE, 129 stdout=subprocess.PIPE, 130 stderr=subprocess.STDOUT, 131 env=self.env, 132 ) 133 self.popen.stdin.write(script) 134 self.popen.stdin.close() 135 self.finished = False 136 137 def stop(self): 138 if self.popen and self.popen.poll() is None: 139 self.popen.terminate() 140 141 def readline(self): 142 sel = select.select([self.popen.stdout.raw], [], [], 0.1) 143 if not sel[0]: 144 self.finished = self.popen.poll() is not None 145 return None, None 146 out = self.popen.stdout.raw.readline() 147 if out == b"": 148 self.finished = self.popen.poll() is not None 149 return None, None 150 else: 151 return str(out.rstrip(), "ascii"), None 152 153 def is_finished(self): 154 return self.finished 155 156 def wait_finished(self): 157 self.popen.wait() 158 out = self.popen.stdout.read() 159 return str(out, "ascii"), "" 160 161 162class PyInstancePyboard(PyInstance): 163 def __init__(self, device): 164 self.device = device 165 self.pyb = pyboard.Pyboard(device) 166 self.pyb.enter_raw_repl() 167 self.finished = True 168 169 def __str__(self): 170 return self.device.rsplit("/")[-1] 171 172 def close(self): 173 self.pyb.exit_raw_repl() 174 self.pyb.close() 175 176 def run_script(self, script): 177 output = b"" 178 err = None 179 try: 180 self.pyb.enter_raw_repl() 181 output = self.pyb.exec_(script) 182 except pyboard.PyboardError as er: 183 err = er 184 return str(output.strip(), "ascii"), err 185 186 def start_script(self, script): 187 self.pyb.enter_raw_repl() 188 self.pyb.exec_raw_no_follow(script) 189 self.finished = False 190 191 def stop(self): 192 self.pyb.serial.write(b"\r\x03") 193 194 def readline(self): 195 if self.finished: 196 return None, None 197 if self.pyb.serial.inWaiting() == 0: 198 return None, None 199 out = self.pyb.read_until(1, (b"\r\n", b"\x04")) 200 if out.endswith(b"\x04"): 201 self.finished = True 202 out = out[:-1] 203 err = str(self.pyb.read_until(1, b"\x04"), "ascii") 204 err = err[:-1] 205 if not out and not err: 206 return None, None 207 else: 208 err = None 209 return str(out.rstrip(), "ascii"), err 210 211 def is_finished(self): 212 return self.finished 213 214 def wait_finished(self): 215 out, err = self.pyb.follow(10, None) 216 return str(out, "ascii"), str(err, "ascii") 217 218 219def prepare_test_file_list(test_files): 220 test_files2 = [] 221 for test_file in sorted(test_files): 222 num_instances = 0 223 with open(test_file) as f: 224 for line in f: 225 m = re.match(r"def instance([0-9]+)\(\):", line) 226 if m: 227 num_instances = max(num_instances, int(m.group(1)) + 1) 228 test_files2.append((test_file, num_instances)) 229 return test_files2 230 231 232def trace_instance_output(instance_idx, line): 233 if cmd_args.trace_output: 234 t_ms = round((time.time() - trace_t0) * 1000) 235 print("{:6} i{} :".format(t_ms, instance_idx), line) 236 237 238def run_test_on_instances(test_file, num_instances, instances): 239 global trace_t0 240 trace_t0 = time.time() 241 242 error = False 243 skip = False 244 injected_globals = "" 245 output = [[] for _ in range(num_instances)] 246 247 if cmd_args.trace_output: 248 print("TRACE {}:".format("|".join(str(i) for i in instances))) 249 250 # Start all instances running, in order, waiting until they signal they are ready 251 for idx in range(num_instances): 252 append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx) 253 instance = instances[idx] 254 instance.start_file(test_file, append=append_code) 255 last_read_time = time.time() 256 while True: 257 if instance.is_finished(): 258 break 259 out, err = instance.readline() 260 if out is None and err is None: 261 if time.time() > last_read_time + INSTANCE_READ_TIMEOUT_S: 262 output[idx].append("TIMEOUT") 263 error = True 264 break 265 time.sleep(0.1) 266 continue 267 last_read_time = time.time() 268 if out is not None and not any(m in out for m in IGNORE_OUTPUT_MATCHES): 269 trace_instance_output(idx, out) 270 if out.startswith("SET "): 271 injected_globals += out[4:] + "\n" 272 elif out == "SKIP": 273 skip = True 274 break 275 elif out == "NEXT": 276 break 277 else: 278 output[idx].append(out) 279 if err is not None: 280 trace_instance_output(idx, err) 281 output[idx].append(err) 282 error = True 283 284 if error or skip: 285 break 286 287 if not error and not skip: 288 # Capture output and wait for all instances to finish running 289 last_read_time = [time.time() for _ in range(num_instances)] 290 while True: 291 num_running = 0 292 num_output = 0 293 for idx in range(num_instances): 294 instance = instances[idx] 295 if instance.is_finished(): 296 continue 297 num_running += 1 298 out, err = instance.readline() 299 if out is None and err is None: 300 if time.time() > last_read_time[idx] + INSTANCE_READ_TIMEOUT_S: 301 output[idx].append("TIMEOUT") 302 error = True 303 continue 304 num_output += 1 305 last_read_time[idx] = time.time() 306 if out is not None and not any(m in out for m in IGNORE_OUTPUT_MATCHES): 307 trace_instance_output(idx, out) 308 output[idx].append(out) 309 if err is not None: 310 trace_instance_output(idx, err) 311 output[idx].append(err) 312 error = True 313 314 if not num_output: 315 time.sleep(0.1) 316 if not num_running or error: 317 break 318 319 # Stop all instances 320 for idx in range(num_instances): 321 instances[idx].stop() 322 323 output_str = "" 324 for idx, lines in enumerate(output): 325 output_str += "--- instance{} ---\n".format(idx) 326 output_str += "\n".join(lines) + "\n" 327 328 return error, skip, output_str 329 330 331def print_diff(a, b): 332 a_fd, a_path = tempfile.mkstemp(text=True) 333 b_fd, b_path = tempfile.mkstemp(text=True) 334 os.write(a_fd, a.encode()) 335 os.write(b_fd, b.encode()) 336 os.close(a_fd) 337 os.close(b_fd) 338 subprocess.run(DIFF.split(" ") + [a_path, b_path]) 339 os.unlink(a_path) 340 os.unlink(b_path) 341 342 343def run_tests(test_files, instances_truth, instances_test): 344 skipped_tests = [] 345 passed_tests = [] 346 failed_tests = [] 347 348 for test_file, num_instances in test_files: 349 instances_str = "|".join(str(instances_test[i]) for i in range(num_instances)) 350 print("{} on {}: ".format(test_file, instances_str), end="") 351 if cmd_args.show_output or cmd_args.trace_output: 352 print() 353 sys.stdout.flush() 354 355 # Run test on test instances 356 error, skip, output_test = run_test_on_instances(test_file, num_instances, instances_test) 357 358 if not skip: 359 # Check if truth exists in a file, and read it in 360 test_file_expected = test_file + ".exp" 361 if os.path.isfile(test_file_expected): 362 with open(test_file_expected) as f: 363 output_truth = f.read() 364 else: 365 # Run test on truth instances to get expected output 366 _, _, output_truth = run_test_on_instances( 367 test_file, num_instances, instances_truth 368 ) 369 370 if cmd_args.show_output: 371 print("### TEST ###") 372 print(output_test, end="") 373 if not skip: 374 print("### TRUTH ###") 375 print(output_truth, end="") 376 377 # Print result of test 378 if skip: 379 print("skip") 380 skipped_tests.append(test_file) 381 elif output_test == output_truth: 382 print("pass") 383 passed_tests.append(test_file) 384 else: 385 print("FAIL") 386 failed_tests.append(test_file) 387 if not cmd_args.show_output: 388 print("### TEST ###") 389 print(output_test, end="") 390 print("### TRUTH ###") 391 print(output_truth, end="") 392 print("### DIFF ###") 393 print_diff(output_truth, output_test) 394 395 if cmd_args.show_output: 396 print() 397 398 print("{} tests performed".format(len(skipped_tests) + len(passed_tests) + len(failed_tests))) 399 print("{} tests passed".format(len(passed_tests))) 400 401 if skipped_tests: 402 print("{} tests skipped: {}".format(len(skipped_tests), " ".join(skipped_tests))) 403 if failed_tests: 404 print("{} tests failed: {}".format(len(failed_tests), " ".join(failed_tests))) 405 406 return not failed_tests 407 408 409def main(): 410 global cmd_args 411 412 cmd_parser = argparse.ArgumentParser(description="Run network tests for MicroPython") 413 cmd_parser.add_argument( 414 "-s", "--show-output", action="store_true", help="show test output after running" 415 ) 416 cmd_parser.add_argument( 417 "-t", "--trace-output", action="store_true", help="trace test output while running" 418 ) 419 cmd_parser.add_argument( 420 "-i", "--instance", action="append", default=[], help="instance(s) to run the tests on" 421 ) 422 cmd_parser.add_argument( 423 "-p", 424 "--permutations", 425 type=int, 426 default=1, 427 help="repeat the test with this many permutations of the instance order", 428 ) 429 cmd_parser.add_argument("files", nargs="+", help="input test files") 430 cmd_args = cmd_parser.parse_args() 431 432 # clear search path to make sure tests use only builtin modules and those in extmod 433 os.environ["MICROPYPATH"] = os.pathsep + "../extmod" 434 435 test_files = prepare_test_file_list(cmd_args.files) 436 max_instances = max(t[1] for t in test_files) 437 438 instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)] 439 440 instances_test = [] 441 for i in cmd_args.instance: 442 # Each instance arg is <cmd>,ENV=VAR,ENV=VAR... 443 i = i.split(",") 444 cmd = i[0] 445 env = i[1:] 446 if cmd.startswith("exec:"): 447 instances_test.append(PyInstanceSubProcess([cmd[len("exec:") :]], env)) 448 elif cmd == "micropython": 449 instances_test.append(PyInstanceSubProcess([MICROPYTHON], env)) 450 elif cmd == "cpython": 451 instances_test.append(PyInstanceSubProcess([CPYTHON3], env)) 452 elif cmd.startswith("pyb:"): 453 instances_test.append(PyInstancePyboard(cmd[len("pyb:") :])) 454 else: 455 print("unknown instance string: {}".format(cmd), file=sys.stderr) 456 sys.exit(1) 457 458 for _ in range(max_instances - len(instances_test)): 459 instances_test.append(PyInstanceSubProcess([MICROPYTHON])) 460 461 all_pass = True 462 try: 463 for i, instances_test_permutation in enumerate(itertools.permutations(instances_test)): 464 if i >= cmd_args.permutations: 465 break 466 467 all_pass &= run_tests(test_files, instances_truth, instances_test_permutation) 468 469 finally: 470 for i in instances_truth: 471 i.close() 472 for i in instances_test: 473 i.close() 474 475 if not all_pass: 476 sys.exit(1) 477 478 479if __name__ == "__main__": 480 main() 481