1#!/usr/bin/env python3
2
3# This file is part of the MicroPython project, http://micropython.org/
4# The MIT License (MIT)
5# Copyright (c) 2019 Damien P. George
6
7import os
8import subprocess
9import sys
10import argparse
11
12sys.path.append("../tools")
13import pyboard
14
15# Paths for host executables
16CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3")
17MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython-coverage")
18
19NATMOD_EXAMPLE_DIR = "../examples/natmod/"
20
21# Supported tests and their corresponding mpy module
22TEST_MAPPINGS = {
23    "btree": "btree/btree_$(ARCH).mpy",
24    "framebuf": "framebuf/framebuf_$(ARCH).mpy",
25    "uheapq": "uheapq/uheapq_$(ARCH).mpy",
26    "urandom": "urandom/urandom_$(ARCH).mpy",
27    "ure": "ure/ure_$(ARCH).mpy",
28    "uzlib": "uzlib/uzlib_$(ARCH).mpy",
29}
30
31# Code to allow a target MicroPython to import an .mpy from RAM
32injected_import_hook_code = """\
33import usys, uos, uio
34class __File(uio.IOBase):
35  def __init__(self):
36    self.off = 0
37  def ioctl(self, request, arg):
38    return 0
39  def readinto(self, buf):
40    buf[:] = memoryview(__buf)[self.off:self.off + len(buf)]
41    self.off += len(buf)
42    return len(buf)
43class __FS:
44  def mount(self, readonly, mkfs):
45    pass
46  def chdir(self, path):
47    pass
48  def stat(self, path):
49    if path == '__injected.mpy':
50      return tuple(0 for _ in range(10))
51    else:
52      raise OSError(-2) # ENOENT
53  def open(self, path, mode):
54    return __File()
55uos.mount(__FS(), '/__remote')
56uos.chdir('/__remote')
57usys.modules['{}'] = __import__('__injected')
58"""
59
60
61class TargetSubprocess:
62    def __init__(self, cmd):
63        self.cmd = cmd
64
65    def close(self):
66        pass
67
68    def run_script(self, script):
69        try:
70            p = subprocess.run(
71                self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=script
72            )
73            return p.stdout, None
74        except subprocess.CalledProcessError as er:
75            return b"", er
76
77
78class TargetPyboard:
79    def __init__(self, pyb):
80        self.pyb = pyb
81        self.pyb.enter_raw_repl()
82
83    def close(self):
84        self.pyb.exit_raw_repl()
85        self.pyb.close()
86
87    def run_script(self, script):
88        try:
89            self.pyb.enter_raw_repl()
90            output = self.pyb.exec_(script)
91            output = output.replace(b"\r\n", b"\n")
92            return output, None
93        except pyboard.PyboardError as er:
94            return b"", er
95
96
97def run_tests(target_truth, target, args, stats):
98    for test_file in args.files:
99        # Find supported test
100        for k, v in TEST_MAPPINGS.items():
101            if test_file.find(k) != -1:
102                test_module = k
103                test_mpy = v.replace("$(ARCH)", args.arch)
104                break
105        else:
106            print("----  {} - no matching mpy".format(test_file))
107            continue
108
109        # Read test script
110        with open(test_file, "rb") as f:
111            test_file_data = f.read()
112
113        # Create full test with embedded .mpy
114        try:
115            with open(NATMOD_EXAMPLE_DIR + test_mpy, "rb") as f:
116                test_script = b"__buf=" + bytes(repr(f.read()), "ascii") + b"\n"
117        except OSError:
118            print("----  {} - mpy file not compiled".format(test_file))
119            continue
120        test_script += bytes(injected_import_hook_code.format(test_module), "ascii")
121        test_script += test_file_data
122
123        # Run test under MicroPython
124        result_out, error = target.run_script(test_script)
125
126        # Work out result of test
127        extra = ""
128        if error is None and result_out == b"SKIP\n":
129            result = "SKIP"
130        elif error is not None:
131            result = "FAIL"
132            extra = " - " + str(error)
133        else:
134            # Check result against truth
135            try:
136                with open(test_file + ".exp", "rb") as f:
137                    result_exp = f.read()
138                error = None
139            except OSError:
140                result_exp, error = target_truth.run_script(test_file_data)
141            if error is not None:
142                result = "TRUTH FAIL"
143            elif result_out != result_exp:
144                result = "FAIL"
145                print(result_out)
146            else:
147                result = "pass"
148
149        # Accumulate statistics
150        stats["total"] += 1
151        if result == "pass":
152            stats["pass"] += 1
153        elif result == "SKIP":
154            stats["skip"] += 1
155        else:
156            stats["fail"] += 1
157
158        # Print result
159        print("{:4}  {}{}".format(result, test_file, extra))
160
161
162def main():
163    cmd_parser = argparse.ArgumentParser(
164        description="Run dynamic-native-module tests under MicroPython"
165    )
166    cmd_parser.add_argument(
167        "-p", "--pyboard", action="store_true", help="run tests via pyboard.py"
168    )
169    cmd_parser.add_argument(
170        "-d", "--device", default="/dev/ttyACM0", help="the device for pyboard.py"
171    )
172    cmd_parser.add_argument(
173        "-a", "--arch", default="x64", help="native architecture of the target"
174    )
175    cmd_parser.add_argument("files", nargs="*", help="input test files")
176    args = cmd_parser.parse_args()
177
178    target_truth = TargetSubprocess([CPYTHON3])
179
180    if args.pyboard:
181        target = TargetPyboard(pyboard.Pyboard(args.device))
182    else:
183        target = TargetSubprocess([MICROPYTHON])
184
185    stats = {"total": 0, "pass": 0, "fail": 0, "skip": 0}
186    run_tests(target_truth, target, args, stats)
187
188    target.close()
189    target_truth.close()
190
191    print("{} tests performed".format(stats["total"]))
192    print("{} tests passed".format(stats["pass"]))
193    if stats["fail"]:
194        print("{} tests failed".format(stats["fail"]))
195    if stats["skip"]:
196        print("{} tests skipped".format(stats["skip"]))
197
198    if stats["fail"]:
199        sys.exit(1)
200
201
202if __name__ == "__main__":
203    main()
204