# SPDX-License-Identifier: GPL-2.0+
#
# Copyright (c) 2016 Google, Inc
#

import doctest
import glob
import multiprocessing
import os
import re
import sys
import unittest

from u_boot_pylib import command
from u_boot_pylib import terminal

use_concurrent = True
try:
    from concurrencytest import ConcurrentTestSuite
    from concurrencytest import fork_for_tests
except ImportError:
    use_concurrent = False


def run_test_coverage(prog, filter_fname, exclude_list, build_dir,
                      required=None, extra_args=None, single_thread='-P1',
                      args=None, allow_failures=None):
    """Run tests and check that we get 100% coverage

    Args:
        prog: Program to run (will be passed a '-t' argument to run tests)
        filter_fname: Normally all *.py files in the program's directory will
            be included. If this is not None, then it is used to filter the
            list so that only filenames that don't contain filter_fname are
            included.
        exclude_list: List of file patterns to exclude from the coverage
            calculation
        build_dir: Build directory, used to locate libfdt.py
        required: List of modules which must be in the coverage report
        extra_args (str): Extra arguments to pass to the tool before the -t/test
            arg
        single_thread (str): Argument string to make the tests run
            single-threaded. This is necessary to get proper coverage results.
            The default is '-P1'
        args (list of str): List of tests to run, or None to run all
        allow_failures (list of str): List of filenames (as shown in the
            coverage report) which are allowed to have uncovered lines without
            causing an error

    Raises:
        ValueError if the code coverage is not 100%
    """
    # This uses the build output from sandbox_spl to get _libfdt.so
    path = os.path.dirname(prog)
    if filter_fname:
        glob_list = glob.glob(os.path.join(path, '*.py'))
        glob_list = [fname for fname in glob_list if filter_fname in fname]
    else:
        glob_list = []
    glob_list += exclude_list
    glob_list += ['*libfdt.py', '*/site-packages/*', '*/dist-packages/*']
    glob_list += ['*concurrencytest*']
    test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t'
    prefix = ''
    if build_dir:
        prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir

    # Use 'coverage' when running inside a Python virtual environment,
    # 'python3-coverage' otherwise
    covtool = ('python3-coverage' if sys.prefix == sys.base_prefix else
               'coverage')

    cmd = ('%s%s run '
           '--omit "%s" %s %s %s %s %s' % (prefix, covtool, ','.join(glob_list),
                                           prog, extra_args or '', test_cmd,
                                           single_thread or '-P1',
                                           ' '.join(args) if args else ''))
    os.system(cmd)
    stdout = command.output(covtool, 'report')
    lines = stdout.splitlines()
    ok = True
    if required:
        # Convert '/path/to/name.py' to just the module name 'name'
        test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0]
                        for line in lines if '/etype/' in line])
        missing_list = set(required)
        missing_list.discard('__init__')
        missing_list.difference_update(test_set)
        if missing_list:
            print('Missing tests for %s' % (', '.join(missing_list)))
            print(stdout)
            ok = False

    coverage = lines[-1].split(' ')[-1]
    print(coverage)
    if coverage != '100%':
        print(stdout)
        print("To get a report in 'htmlcov/index.html', type: python3-coverage html")
        print('Coverage error: %s, but should be 100%%' % coverage)
        ok = False
    if not ok:
        if allow_failures:
            matches = [re.match(r'^(tools/.*py) *\d+ *(\d+) *\d+%$', line)
                       for line in stdout.splitlines()]
            bad = []
            for mat in matches:
                if mat and mat.group(2) != '0':
                    fname = mat.group(1)
                    if fname not in allow_failures:
                        bad.append(fname)
            if not bad:
                return
        raise ValueError('Test coverage failure')

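# Illustrative sketch of how a tool might call run_test_coverage(); the
# program path, exclude patterns and build directory below are hypothetical,
# not taken from a real caller:
#
#     run_test_coverage('tools/binman/binman', None,
#                       ['tools/patman/*', 'tools/dtoc/*'], 'build-sandbox_spl')
#
# Outside a virtual environment this would run roughly:
#
#     PYTHONPATH=$PYTHONPATH:build-sandbox_spl/sandbox_spl/tools \
#         python3-coverage run --omit "<exclude patterns>" \
#         tools/binman/binman test -P1
#
# and then check 'python3-coverage report' for 100% coverage.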

class FullTextTestResult(unittest.TextTestResult):
    """A test result class that can print extended text results to a stream

    This is meant to be used by a TestRunner as a result class. Like
    TextTestResult, this prints out the names of tests as they are run,
    errors as they occur, and a summary of the results at the end of the
    test run. Beyond those, this prints information about skipped tests,
    expected failures and unexpected successes.

    Args:
        stream: A file-like object to write results to
        descriptions (bool): True to print descriptions with test names
        verbosity (int): Detail of printed output per test as they run
            Test stdout and stderr always get printed when buffering
            them is disabled by the test runner. In addition to that,
            0: Print nothing
            1: Print a dot per test
            2: Print test names
    """
    def __init__(self, stream, descriptions, verbosity):
        self.verbosity = verbosity
        super().__init__(stream, descriptions, verbosity)

    def printErrors(self):
        "Called by TestRunner after test run to summarize the tests"
        # The parent class doesn't keep unexpected successes in the same
        # format as the rest. Adapt it to what printErrorList expects.
        unexpected_successes = [
            (test, 'Test was expected to fail, but succeeded.\n')
            for test in self.unexpectedSuccesses
        ]

        super().printErrors()  # FAIL and ERROR
        self.printErrorList('SKIP', self.skipped)
        self.printErrorList('XFAIL', self.expectedFailures)
        self.printErrorList('XPASS', unexpected_successes)

    def addSkip(self, test, reason):
        """Called when a test is skipped."""
        # Add empty line to keep spacing consistent with other results
        if not reason.endswith('\n'):
            reason += '\n'
        super().addSkip(test, reason)

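# A minimal sketch of plugging this result class into a plain unittest
# runner, mirroring what run_test_suites() below does ('MyTests' is a
# hypothetical TestCase):
#
#     runner = unittest.TextTestRunner(
#         stream=sys.stdout, verbosity=2, resultclass=FullTextTestResult)
#     runner.run(unittest.TestLoader().loadTestsFromTestCase(MyTests))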

def run_test_suites(toolname, debug, verbosity, no_capture, test_preserve_dirs,
                    processes, test_name, toolpath, class_and_module_list):
    """Run a series of test suites and collect the results

    Args:
        toolname: Name of the tool whose tests are being run (used in the
            banner printed before the tests start)
        debug: True to enable debugging, which shows a full stack trace on error
        verbosity: Verbosity level to use (0-4)
        no_capture: True to disable capturing of test output while the tests
            run, so that it appears immediately
        test_preserve_dirs: True to preserve the input directory used by tests
            so that it can be examined afterwards (only useful for debugging
            tests). If a single test is selected (by test_name) it also
            preserves the output directory for this test. Both directories are
            displayed on the command line.
        processes: Number of processes to use to run tests (None=same as #CPUs)
        test_name: Name of test to run, or None for all
        toolpath: List of paths to use for tools
        class_and_module_list: List of test classes (type class) and module
            names (type str) to run

    Returns:
        The test result object produced by the runner (a FullTextTestResult)
    """
    sys.argv = [sys.argv[0]]
    if debug:
        sys.argv.append('-D')
    if verbosity:
        sys.argv.append('-v%d' % verbosity)
    if no_capture:
        sys.argv.append('-N')
        terminal.USE_CAPTURE = False
    if toolpath:
        for path in toolpath:
            sys.argv += ['--toolpath', path]

    suite = unittest.TestSuite()
    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner(
        stream=sys.stdout,
        verbosity=(1 if verbosity is None else verbosity),
        resultclass=FullTextTestResult,
    )

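    # Run the suite concurrently across processes when the optional
    # concurrencytest module is available, more than one process is allowed
    # and no single test was requested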
    if use_concurrent and processes != 1 and not test_name:
        suite = ConcurrentTestSuite(suite,
                fork_for_tests(processes or multiprocessing.cpu_count()))

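    # Entries given as module names (strings) contribute their doctests;
    # entries given as test classes are loaded as unittest cases below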
    for module in class_and_module_list:
        if isinstance(module, str) and (not test_name or test_name == module):
            suite.addTests(doctest.DocTestSuite(module))

    for module in class_and_module_list:
        if isinstance(module, str):
            continue
        # Tell the test module about our arguments, if it is interested
        if hasattr(module, 'setup_test_args'):
            setup_test_args = getattr(module, 'setup_test_args')
            setup_test_args(preserve_indir=test_preserve_dirs,
                preserve_outdirs=test_preserve_dirs and test_name is not None,
                toolpath=toolpath, verbosity=verbosity, no_capture=no_capture)
        if test_name:
            # Since Python v3.5, if an ImportError or AttributeError occurs
            # while traversing a name, a synthetic test that raises that
            # error when run will be returned. Check that the requested test
            # exists, otherwise these errors are included in the results.
            if test_name in loader.getTestCaseNames(module):
                suite.addTests(loader.loadTestsFromName(test_name, module))
        else:
            suite.addTests(loader.loadTestsFromTestCase(module))

    print(f" Running {toolname} tests ".center(70, "="))
    result = runner.run(suite)
    print()

    return result

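# Illustrative sketch of a caller; the tool, module and class names below are
# hypothetical:
#
#     from mytool import test_checks
#
#     result = run_test_suites(
#         'mytool', debug=True, verbosity=2, no_capture=False,
#         test_preserve_dirs=False, processes=None, test_name=None,
#         toolpath=[], class_and_module_list=[test_checks.TestChecks,
#                                             'mytool.test_output'])
#     sys.exit(0 if result.wasSuccessful() else 1)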