1#!/usr/bin/env python3
2
3# This file is part of the MicroPython project, http://micropython.org/
4# The MIT License (MIT)
5# Copyright (c) 2020 Damien P. George
6
7import sys, os, time, re, select
8import argparse
9import itertools
10import subprocess
11import tempfile
12
13sys.path.append("../tools")
14import pyboard
15
16if os.name == "nt":
17    CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe")
18    MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/windows/micropython.exe")
19else:
20    CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3")
21    MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython")
22
23# For diff'ing test output
24DIFF = os.getenv("MICROPY_DIFF", "diff -u")
25
26PYTHON_TRUTH = CPYTHON3
27
28INSTANCE_READ_TIMEOUT_S = 10
29
30APPEND_CODE_TEMPLATE = """
31import sys
32class multitest:
33    @staticmethod
34    def flush():
35        if hasattr(sys.stdout, "flush"):
36            sys.stdout.flush()
37    @staticmethod
38    def skip():
39        print("SKIP")
40        multitest.flush()
41        raise SystemExit
42    @staticmethod
43    def next():
44        print("NEXT")
45        multitest.flush()
46    @staticmethod
47    def globals(**gs):
48        for g in gs:
49            print("SET {{}} = {{!r}}".format(g, gs[g]))
50        multitest.flush()
51    @staticmethod
52    def get_network_ip():
53        try:
54            import network
55            ip = network.WLAN().ifconfig()[0]
56        except:
57            ip = "127.0.0.1"
58        return ip
59
60{}
61
62instance{}()
63multitest.flush()
64"""
65
66# The btstack implementation on Unix generates some spurious output that we
67# can't control.
68IGNORE_OUTPUT_MATCHES = (
69    "libusb: error ",  # It tries to open devices that it doesn't have access to (libusb prints unconditionally).
70    "hci_transport_h2_libusb.c",  # Same issue. We enable LOG_ERROR in btstack.
71    "USB Path: ",  # Hardcoded in btstack's libusb transport.
72    "hci_number_completed_packet",  # Warning from btstack.
73)
74
75
76class PyInstance:
77    def __init__(self):
78        pass
79
80    def close(self):
81        pass
82
83    def prepare_script_from_file(self, filename, prepend, append):
84        with open(filename, "rb") as f:
85            script = f.read()
86        if prepend:
87            script = bytes(prepend, "ascii") + b"\n" + script
88        if append:
89            script += b"\n" + bytes(append, "ascii")
90        return script
91
92    def run_file(self, filename, prepend="", append=""):
93        return self.run_script(self.prepare_script_from_file(filename, prepend, append))
94
95    def start_file(self, filename, prepend="", append=""):
96        return self.start_script(self.prepare_script_from_file(filename, prepend, append))
97
98
99class PyInstanceSubProcess(PyInstance):
100    def __init__(self, argv, env=None):
101        self.argv = argv
102        self.env = {n: v for n, v in (i.split("=") for i in env)} if env else None
103        self.popen = None
104        self.finished = True
105
106    def __str__(self):
107        return self.argv[0].rsplit("/")[-1]
108
109    def run_script(self, script):
110        output = b""
111        err = None
112        try:
113            p = subprocess.run(
114                self.argv,
115                stdout=subprocess.PIPE,
116                stderr=subprocess.STDOUT,
117                input=script,
118                env=self.env,
119            )
120            output = p.stdout
121        except subprocess.CalledProcessError as er:
122            err = er
123        return str(output.strip(), "ascii"), err
124
125    def start_script(self, script):
126        self.popen = subprocess.Popen(
127            self.argv,
128            stdin=subprocess.PIPE,
129            stdout=subprocess.PIPE,
130            stderr=subprocess.STDOUT,
131            env=self.env,
132        )
133        self.popen.stdin.write(script)
134        self.popen.stdin.close()
135        self.finished = False
136
137    def stop(self):
138        if self.popen and self.popen.poll() is None:
139            self.popen.terminate()
140
141    def readline(self):
142        sel = select.select([self.popen.stdout.raw], [], [], 0.1)
143        if not sel[0]:
144            self.finished = self.popen.poll() is not None
145            return None, None
146        out = self.popen.stdout.raw.readline()
147        if out == b"":
148            self.finished = self.popen.poll() is not None
149            return None, None
150        else:
151            return str(out.rstrip(), "ascii"), None
152
153    def is_finished(self):
154        return self.finished
155
156    def wait_finished(self):
157        self.popen.wait()
158        out = self.popen.stdout.read()
159        return str(out, "ascii"), ""
160
161
162class PyInstancePyboard(PyInstance):
163    def __init__(self, device):
164        self.device = device
165        self.pyb = pyboard.Pyboard(device)
166        self.pyb.enter_raw_repl()
167        self.finished = True
168
169    def __str__(self):
170        return self.device.rsplit("/")[-1]
171
172    def close(self):
173        self.pyb.exit_raw_repl()
174        self.pyb.close()
175
176    def run_script(self, script):
177        output = b""
178        err = None
179        try:
180            self.pyb.enter_raw_repl()
181            output = self.pyb.exec_(script)
182        except pyboard.PyboardError as er:
183            err = er
184        return str(output.strip(), "ascii"), err
185
186    def start_script(self, script):
187        self.pyb.enter_raw_repl()
188        self.pyb.exec_raw_no_follow(script)
189        self.finished = False
190
191    def stop(self):
192        self.pyb.serial.write(b"\r\x03")
193
194    def readline(self):
195        if self.finished:
196            return None, None
197        if self.pyb.serial.inWaiting() == 0:
198            return None, None
199        out = self.pyb.read_until(1, (b"\r\n", b"\x04"))
200        if out.endswith(b"\x04"):
201            self.finished = True
202            out = out[:-1]
203            err = str(self.pyb.read_until(1, b"\x04"), "ascii")
204            err = err[:-1]
205            if not out and not err:
206                return None, None
207        else:
208            err = None
209        return str(out.rstrip(), "ascii"), err
210
211    def is_finished(self):
212        return self.finished
213
214    def wait_finished(self):
215        out, err = self.pyb.follow(10, None)
216        return str(out, "ascii"), str(err, "ascii")
217
218
219def prepare_test_file_list(test_files):
220    test_files2 = []
221    for test_file in sorted(test_files):
222        num_instances = 0
223        with open(test_file) as f:
224            for line in f:
225                m = re.match(r"def instance([0-9]+)\(\):", line)
226                if m:
227                    num_instances = max(num_instances, int(m.group(1)) + 1)
228        test_files2.append((test_file, num_instances))
229    return test_files2
230
231
232def trace_instance_output(instance_idx, line):
233    if cmd_args.trace_output:
234        t_ms = round((time.time() - trace_t0) * 1000)
235        print("{:6} i{} :".format(t_ms, instance_idx), line)
236
237
238def run_test_on_instances(test_file, num_instances, instances):
239    global trace_t0
240    trace_t0 = time.time()
241
242    error = False
243    skip = False
244    injected_globals = ""
245    output = [[] for _ in range(num_instances)]
246
247    if cmd_args.trace_output:
248        print("TRACE {}:".format("|".join(str(i) for i in instances)))
249
250    # Start all instances running, in order, waiting until they signal they are ready
251    for idx in range(num_instances):
252        append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx)
253        instance = instances[idx]
254        instance.start_file(test_file, append=append_code)
255        last_read_time = time.time()
256        while True:
257            if instance.is_finished():
258                break
259            out, err = instance.readline()
260            if out is None and err is None:
261                if time.time() > last_read_time + INSTANCE_READ_TIMEOUT_S:
262                    output[idx].append("TIMEOUT")
263                    error = True
264                    break
265                time.sleep(0.1)
266                continue
267            last_read_time = time.time()
268            if out is not None and not any(m in out for m in IGNORE_OUTPUT_MATCHES):
269                trace_instance_output(idx, out)
270                if out.startswith("SET "):
271                    injected_globals += out[4:] + "\n"
272                elif out == "SKIP":
273                    skip = True
274                    break
275                elif out == "NEXT":
276                    break
277                else:
278                    output[idx].append(out)
279            if err is not None:
280                trace_instance_output(idx, err)
281                output[idx].append(err)
282                error = True
283
284        if error or skip:
285            break
286
287    if not error and not skip:
288        # Capture output and wait for all instances to finish running
289        last_read_time = [time.time() for _ in range(num_instances)]
290        while True:
291            num_running = 0
292            num_output = 0
293            for idx in range(num_instances):
294                instance = instances[idx]
295                if instance.is_finished():
296                    continue
297                num_running += 1
298                out, err = instance.readline()
299                if out is None and err is None:
300                    if time.time() > last_read_time[idx] + INSTANCE_READ_TIMEOUT_S:
301                        output[idx].append("TIMEOUT")
302                        error = True
303                    continue
304                num_output += 1
305                last_read_time[idx] = time.time()
306                if out is not None and not any(m in out for m in IGNORE_OUTPUT_MATCHES):
307                    trace_instance_output(idx, out)
308                    output[idx].append(out)
309                if err is not None:
310                    trace_instance_output(idx, err)
311                    output[idx].append(err)
312                    error = True
313
314            if not num_output:
315                time.sleep(0.1)
316            if not num_running or error:
317                break
318
319    # Stop all instances
320    for idx in range(num_instances):
321        instances[idx].stop()
322
323    output_str = ""
324    for idx, lines in enumerate(output):
325        output_str += "--- instance{} ---\n".format(idx)
326        output_str += "\n".join(lines) + "\n"
327
328    return error, skip, output_str
329
330
331def print_diff(a, b):
332    a_fd, a_path = tempfile.mkstemp(text=True)
333    b_fd, b_path = tempfile.mkstemp(text=True)
334    os.write(a_fd, a.encode())
335    os.write(b_fd, b.encode())
336    os.close(a_fd)
337    os.close(b_fd)
338    subprocess.run(DIFF.split(" ") + [a_path, b_path])
339    os.unlink(a_path)
340    os.unlink(b_path)
341
342
343def run_tests(test_files, instances_truth, instances_test):
344    skipped_tests = []
345    passed_tests = []
346    failed_tests = []
347
348    for test_file, num_instances in test_files:
349        instances_str = "|".join(str(instances_test[i]) for i in range(num_instances))
350        print("{} on {}: ".format(test_file, instances_str), end="")
351        if cmd_args.show_output or cmd_args.trace_output:
352            print()
353        sys.stdout.flush()
354
355        # Run test on test instances
356        error, skip, output_test = run_test_on_instances(test_file, num_instances, instances_test)
357
358        if not skip:
359            # Check if truth exists in a file, and read it in
360            test_file_expected = test_file + ".exp"
361            if os.path.isfile(test_file_expected):
362                with open(test_file_expected) as f:
363                    output_truth = f.read()
364            else:
365                # Run test on truth instances to get expected output
366                _, _, output_truth = run_test_on_instances(
367                    test_file, num_instances, instances_truth
368                )
369
370        if cmd_args.show_output:
371            print("### TEST ###")
372            print(output_test, end="")
373            if not skip:
374                print("### TRUTH ###")
375                print(output_truth, end="")
376
377        # Print result of test
378        if skip:
379            print("skip")
380            skipped_tests.append(test_file)
381        elif output_test == output_truth:
382            print("pass")
383            passed_tests.append(test_file)
384        else:
385            print("FAIL")
386            failed_tests.append(test_file)
387            if not cmd_args.show_output:
388                print("### TEST ###")
389                print(output_test, end="")
390                print("### TRUTH ###")
391                print(output_truth, end="")
392                print("### DIFF ###")
393                print_diff(output_truth, output_test)
394
395        if cmd_args.show_output:
396            print()
397
398    print("{} tests performed".format(len(skipped_tests) + len(passed_tests) + len(failed_tests)))
399    print("{} tests passed".format(len(passed_tests)))
400
401    if skipped_tests:
402        print("{} tests skipped: {}".format(len(skipped_tests), " ".join(skipped_tests)))
403    if failed_tests:
404        print("{} tests failed: {}".format(len(failed_tests), " ".join(failed_tests)))
405
406    return not failed_tests
407
408
409def main():
410    global cmd_args
411
412    cmd_parser = argparse.ArgumentParser(description="Run network tests for MicroPython")
413    cmd_parser.add_argument(
414        "-s", "--show-output", action="store_true", help="show test output after running"
415    )
416    cmd_parser.add_argument(
417        "-t", "--trace-output", action="store_true", help="trace test output while running"
418    )
419    cmd_parser.add_argument(
420        "-i", "--instance", action="append", default=[], help="instance(s) to run the tests on"
421    )
422    cmd_parser.add_argument(
423        "-p",
424        "--permutations",
425        type=int,
426        default=1,
427        help="repeat the test with this many permutations of the instance order",
428    )
429    cmd_parser.add_argument("files", nargs="+", help="input test files")
430    cmd_args = cmd_parser.parse_args()
431
432    # clear search path to make sure tests use only builtin modules and those in extmod
433    os.environ["MICROPYPATH"] = os.pathsep + "../extmod"
434
435    test_files = prepare_test_file_list(cmd_args.files)
436    max_instances = max(t[1] for t in test_files)
437
438    instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)]
439
440    instances_test = []
441    for i in cmd_args.instance:
442        # Each instance arg is <cmd>,ENV=VAR,ENV=VAR...
443        i = i.split(",")
444        cmd = i[0]
445        env = i[1:]
446        if cmd.startswith("exec:"):
447            instances_test.append(PyInstanceSubProcess([cmd[len("exec:") :]], env))
448        elif cmd == "micropython":
449            instances_test.append(PyInstanceSubProcess([MICROPYTHON], env))
450        elif cmd == "cpython":
451            instances_test.append(PyInstanceSubProcess([CPYTHON3], env))
452        elif cmd.startswith("pyb:"):
453            instances_test.append(PyInstancePyboard(cmd[len("pyb:") :]))
454        else:
455            print("unknown instance string: {}".format(cmd), file=sys.stderr)
456            sys.exit(1)
457
458    for _ in range(max_instances - len(instances_test)):
459        instances_test.append(PyInstanceSubProcess([MICROPYTHON]))
460
461    all_pass = True
462    try:
463        for i, instances_test_permutation in enumerate(itertools.permutations(instances_test)):
464            if i >= cmd_args.permutations:
465                break
466
467            all_pass &= run_tests(test_files, instances_truth, instances_test_permutation)
468
469    finally:
470        for i in instances_truth:
471            i.close()
472        for i in instances_test:
473            i.close()
474
475    if not all_pass:
476        sys.exit(1)
477
478
479if __name__ == "__main__":
480    main()
481