# SPDX-License-Identifier: GPL-2.0
# Copyright (c) 2015 Stephen Warren
# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.

# Implementation of pytest run-time hook functions. These are invoked by
# pytest at certain points during operation, e.g. at startup, for each
# executed test, at shutdown, etc. These hooks perform functions such as:
# - Parsing custom command-line options.
# - Pulling in user-specified board configuration.
# - Creating the U-Boot console test fixture.
# - Creating the HTML log file.
# - Monitoring each test's results.
# - Implementing custom pytest markers.

import atexit
import configparser
import errno
import filelock
import io
import os
import os.path
from pathlib import Path
import pytest
import re
from _pytest.runner import runtestprotocol
import sys

# Globals: The HTML log file, and the connection to the U-Boot console.
log = None
console = None

TEST_PY_DIR = os.path.dirname(os.path.abspath(__file__))

def mkdir_p(path):
    """Create a directory path.

    This includes creating any intermediate/parent directories. Any errors
    caused by directories that already exist are ignored.

    Args:
        path: The directory path to create.

    Returns:
        Nothing.
    """

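    # Roughly equivalent to os.makedirs(path, exist_ok=True) on Python 3.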
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise

def pytest_addoption(parser):
    """pytest hook: Add custom command-line options to the cmdline parser.

    Args:
        parser: The pytest command-line parser.

    Returns:
        Nothing.
    """

    parser.addoption('--build-dir', default=None,
        help='U-Boot build directory (O=)')
    parser.addoption('--result-dir', default=None,
        help='U-Boot test result/tmp directory')
    parser.addoption('--persistent-data-dir', default=None,
        help='U-Boot test persistent generated data directory')
    parser.addoption('--board-type', '--bd', '-B', default='sandbox',
        help='U-Boot board type')
    parser.addoption('--board-identity', '--id', default='na',
        help='U-Boot board identity/instance')
    parser.addoption('--build', default=False, action='store_true',
        help='Compile U-Boot before running tests')
    parser.addoption('--buildman', default=False, action='store_true',
        help='Use buildman to build U-Boot (assuming --build is given)')
    parser.addoption('--gdbserver', default=None,
        help='Run sandbox under gdbserver. The argument is the channel ' +
        'over which gdbserver should communicate, e.g. localhost:1234')
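
# Example invocation (illustrative only; paths and option values may differ):
#   test/py/test.py --bd sandbox --build
# plus any standard pytest arguments, e.g. -k <expression> or -ra.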

def run_build(config, source_dir, build_dir, board_type, log):
    """run_build: Build U-Boot

    Args:
        config: The pytest configuration.
        source_dir (str): Directory containing source code
        build_dir (str): Directory to build in
        board_type (str): board_type parameter (e.g. 'sandbox')
        log (Logfile): Log file to use
    """
    if config.getoption('buildman'):
        if build_dir != source_dir:
            dest_args = ['-o', build_dir, '-w']
        else:
            dest_args = ['-i']
        cmds = (['buildman', '--board', board_type] + dest_args,)
        name = 'buildman'
    else:
        if build_dir != source_dir:
            o_opt = 'O=%s' % build_dir
        else:
            o_opt = ''
        cmds = (
            ['make', o_opt, '-s', board_type + '_defconfig'],
            ['make', o_opt, '-s', '-j{}'.format(os.cpu_count())],
        )
        name = 'make'

    with log.section(name):
        runner = log.get_runner(name, sys.stdout)
        for cmd in cmds:
            runner.run(cmd, cwd=source_dir)
        runner.close()
        log.status_pass('OK')
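
# For example (illustrative values), for board_type 'sandbox' with a separate
# build directory, run_build() ends up invoking roughly:
#   buildman --board sandbox -o <build_dir> -w        (with --buildman)
# or, without --buildman:
#   make O=<build_dir> -s sandbox_defconfig
#   make O=<build_dir> -s -j<os.cpu_count()>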

def pytest_xdist_setupnodes(config, specs):
    """Clear out any 'done' file from a previous build"""
    global build_done_file
    build_dir = config.getoption('build_dir')
    board_type = config.getoption('board_type')
    source_dir = os.path.dirname(os.path.dirname(TEST_PY_DIR))
    if not build_dir:
        build_dir = source_dir + '/build-' + board_type
    build_done_file = Path(build_dir) / 'build.done'
    if build_done_file.exists():
        os.remove(build_done_file)

def pytest_configure(config):
    """pytest hook: Perform custom initialization at startup time.

    Args:
        config: The pytest configuration.

    Returns:
        Nothing.
    """
    def parse_config(conf_file):
        """Parse a config file, loading it into the ubconfig container

        Args:
            conf_file: Filename to load (within build_dir)

        Raises:
            Exception if the file does not exist
        """
        dot_config = build_dir + '/' + conf_file
        if not os.path.exists(dot_config):
            raise Exception(conf_file + ' does not exist; ' +
                            'try passing --build option?')

        with open(dot_config, 'rt') as f:
            ini_str = '[root]\n' + f.read()
            ini_sio = io.StringIO(ini_str)
            parser = configparser.RawConfigParser()
            parser.read_file(ini_sio)
            ubconfig.buildconfig.update(parser.items('root'))

    global log
    global console
    global ubconfig


    source_dir = os.path.dirname(os.path.dirname(TEST_PY_DIR))

    board_type = config.getoption('board_type')
    board_type_filename = board_type.replace('-', '_')

    board_identity = config.getoption('board_identity')
    board_identity_filename = board_identity.replace('-', '_')

    build_dir = config.getoption('build_dir')
    if not build_dir:
        build_dir = source_dir + '/build-' + board_type
    mkdir_p(build_dir)

    result_dir = config.getoption('result_dir')
    if not result_dir:
        result_dir = build_dir
    mkdir_p(result_dir)

    persistent_data_dir = config.getoption('persistent_data_dir')
    if not persistent_data_dir:
        persistent_data_dir = build_dir + '/persistent-data'
    mkdir_p(persistent_data_dir)

    gdbserver = config.getoption('gdbserver')
    if gdbserver and not board_type.startswith('sandbox'):
        raise Exception('--gdbserver only supported with sandbox targets')

    import multiplexed_log
    log = multiplexed_log.Logfile(result_dir + '/test-log.html')

    if config.getoption('build'):
        worker_id = os.environ.get("PYTEST_XDIST_WORKER")
        with filelock.FileLock(os.path.join(build_dir, 'build.lock')):
            build_done_file = Path(build_dir) / 'build.done'
            if (not worker_id or worker_id == 'master' or
                not build_done_file.exists()):
                run_build(config, source_dir, build_dir, board_type, log)
                build_done_file.touch()

    class ArbitraryAttributeContainer(object):
        pass

    ubconfig = ArbitraryAttributeContainer()
    ubconfig.brd = dict()
    ubconfig.env = dict()

    modules = [
        (ubconfig.brd, 'u_boot_board_' + board_type_filename),
        (ubconfig.env, 'u_boot_boardenv_' + board_type_filename),
        (ubconfig.env, 'u_boot_boardenv_' + board_type_filename + '_' +
            board_identity_filename),
    ]
    for (dict_to_fill, module_name) in modules:
        try:
            module = __import__(module_name)
        except ImportError:
            continue
        dict_to_fill.update(module.__dict__)

    ubconfig.buildconfig = dict()

    # buildman -k puts autoconf.mk in the rootdir, so handle this as well
    # as the standard U-Boot build which leaves it in include/autoconf.mk
    parse_config('.config')
    if os.path.exists(build_dir + '/' + 'autoconf.mk'):
        parse_config('autoconf.mk')
    else:
        parse_config('include/autoconf.mk')

    ubconfig.test_py_dir = TEST_PY_DIR
    ubconfig.source_dir = source_dir
    ubconfig.build_dir = build_dir
    ubconfig.result_dir = result_dir
    ubconfig.persistent_data_dir = persistent_data_dir
    ubconfig.board_type = board_type
    ubconfig.board_identity = board_identity
    ubconfig.gdbserver = gdbserver
    ubconfig.dtb = build_dir + '/arch/sandbox/dts/test.dtb'

    env_vars = (
        'board_type',
        'board_identity',
        'source_dir',
        'test_py_dir',
        'build_dir',
        'result_dir',
        'persistent_data_dir',
    )
    for v in env_vars:
        os.environ['U_BOOT_' + v.upper()] = getattr(ubconfig, v)

    if board_type.startswith('sandbox'):
        import u_boot_console_sandbox
        console = u_boot_console_sandbox.ConsoleSandbox(log, ubconfig)
    else:
        import u_boot_console_exec_attach
        console = u_boot_console_exec_attach.ConsoleExecAttach(log, ubconfig)

re_ut_test_list = re.compile(r'[^a-zA-Z0-9_]_u_boot_list_2_ut_(.*)_test_2_(.*)\s*$')
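# As an illustration (the exact symbol layout is build-dependent), a line in
# the .sym file such as:
#   0000000000123456 r _u_boot_list_2_ut_dm_test_2_dm_test_example
# would be parsed as suite 'dm' and test name 'dm_test_example', producing the
# parametrized value 'dm dm_test_example' below.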
def generate_ut_subtest(metafunc, fixture_name, sym_path):
    """Provide parametrization for a ut_subtest fixture.

    Determines the set of unit tests built into a U-Boot binary by parsing the
    list of symbols generated by the build process. Provides this information
    to test functions by parameterizing their ut_subtest fixture parameter.

    Args:
        metafunc: The pytest test function.
        fixture_name: The fixture name to test.
        sym_path: Relative path to the symbol file with preceding '/'
            (e.g. '/u-boot.sym')

    Returns:
        Nothing.
    """
    fn = console.config.build_dir + sym_path
    try:
        with open(fn, 'rt') as f:
            lines = f.readlines()
    except OSError:
        lines = []
    lines.sort()

    vals = []
    for l in lines:
        m = re_ut_test_list.search(l)
        if not m:
            continue
        suite, name = m.groups()

        # Tests marked with _norun should only be run manually using 'ut -f'
        if name.endswith('_norun'):
            continue

        vals.append(f'{suite} {name}')

    ids = ['ut_' + s.replace(' ', '_') for s in vals]
    metafunc.parametrize(fixture_name, vals, ids=ids)

def generate_config(metafunc, fixture_name):
    """Provide parametrization for {env,brd}__ fixtures.

    If a test function takes parameter(s) (fixture names) of the form brd__xxx
    or env__xxx, the brd and env configuration dictionaries are consulted to
    find the list of values to use for those parameters, and the test is
    parametrized so that it runs once for each combination of values.

    Args:
        metafunc: The pytest test function.
        fixture_name: The fixture name to test.

    Returns:
        Nothing.
    """

    subconfigs = {
        'brd': console.config.brd,
        'env': console.config.env,
    }
    parts = fixture_name.split('__')
    if len(parts) < 2:
        return
    if parts[0] not in subconfigs:
        return
    subconfig = subconfigs[parts[0]]
    vals = []
    val = subconfig.get(fixture_name, [])
    # If that exact name is a key in the data source:
    if val:
        # ... use the dict value as a single parameter value.
        vals = (val, )
    else:
        # ... otherwise, see if there's a key that contains a list of
        # values to use instead.
        vals = subconfig.get(fixture_name + 's', [])
    def fixture_id(index, val):
        try:
            return val['fixture_id']
        except (KeyError, TypeError):
            return fixture_name + str(index)
    ids = [fixture_id(index, val) for (index, val) in enumerate(vals)]
    metafunc.parametrize(fixture_name, vals, ids=ids)
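
# A sketch of how this is typically used (the names below are hypothetical):
# a board environment module such as u_boot_boardenv_<board_type>.py can
# define either
#   env__example_device = {'fixture_id': 'dev0', 'path': '/dev/example0'}
# to run a test taking an 'env__example_device' parameter once, or
#   env__example_devices = [{...}, {...}]
# to run it once per list entry.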

def pytest_generate_tests(metafunc):
    """pytest hook: parameterize test functions based on custom rules.

    Check each test function parameter (fixture name) to see if it is one of
    our custom names, and if so, provide the correct parametrization for that
    parameter.

    Args:
        metafunc: The pytest test function.

    Returns:
        Nothing.
    """
    for fn in metafunc.fixturenames:
        if fn == 'ut_subtest':
            generate_ut_subtest(metafunc, fn, '/u-boot.sym')
            continue
        m_subtest = re.match('ut_(.)pl_subtest', fn)
        if m_subtest:
            spl_name = m_subtest.group(1)
            generate_ut_subtest(
                metafunc, fn, f'/{spl_name}pl/u-boot-{spl_name}pl.sym')
            continue
        generate_config(metafunc, fn)

@pytest.fixture(scope='session')
def u_boot_log(request):
    """Generate the value of a test's u_boot_log fixture.

    Args:
        request: The pytest request.

    Returns:
        The fixture value.
    """

    return console.log

@pytest.fixture(scope='session')
def u_boot_config(request):
    """Generate the value of a test's u_boot_config fixture.

    Args:
        request: The pytest request.

    Returns:
        The fixture value.
    """

    return console.config

@pytest.fixture(scope='function')
def u_boot_console(request):
    """Generate the value of a test's u_boot_console fixture.

    Args:
        request: The pytest request.

    Returns:
        The fixture value.
    """

    console.ensure_spawned()
    return console

anchors = {}
tests_not_run = []
tests_failed = []
tests_xpassed = []
tests_xfailed = []
tests_skipped = []
tests_warning = []
tests_passed = []

def pytest_itemcollected(item):
    """pytest hook: Called once for each test found during collection.

    This enables our custom result analysis code to see the list of all tests
    that should eventually be run.

    Args:
        item: The item that was collected.

    Returns:
        Nothing.
    """

    tests_not_run.append(item.name)

def cleanup():
    """Clean up all global state.

    Executed (via atexit) once the entire test process is complete. This
    includes logging the status of all tests, and the identity of any failed
    or skipped tests.

    Args:
        None.

    Returns:
        Nothing.
    """

    if console:
        console.close()
    if log:
        with log.section('Status Report', 'status_report'):
            log.status_pass('%d passed' % len(tests_passed))
            if tests_warning:
                log.status_warning('%d passed with warning' % len(tests_warning))
                for test in tests_warning:
                    anchor = anchors.get(test, None)
                    log.status_warning('... ' + test, anchor)
            if tests_skipped:
                log.status_skipped('%d skipped' % len(tests_skipped))
                for test in tests_skipped:
                    anchor = anchors.get(test, None)
                    log.status_skipped('... ' + test, anchor)
            if tests_xpassed:
                log.status_xpass('%d xpass' % len(tests_xpassed))
                for test in tests_xpassed:
                    anchor = anchors.get(test, None)
                    log.status_xpass('... ' + test, anchor)
            if tests_xfailed:
                log.status_xfail('%d xfail' % len(tests_xfailed))
                for test in tests_xfailed:
                    anchor = anchors.get(test, None)
                    log.status_xfail('... ' + test, anchor)
            if tests_failed:
                log.status_fail('%d failed' % len(tests_failed))
                for test in tests_failed:
                    anchor = anchors.get(test, None)
                    log.status_fail('... ' + test, anchor)
            if tests_not_run:
                log.status_fail('%d not run' % len(tests_not_run))
                for test in tests_not_run:
                    anchor = anchors.get(test, None)
                    log.status_fail('... ' + test, anchor)
        log.close()
atexit.register(cleanup)

def setup_boardspec(item):
    """Process any 'boardspec' marker for a test.

    Such a marker lists the set of board types that a test does/doesn't
    support. If tests are being executed on an unsupported board, the test is
    marked to be skipped.

    Args:
        item: The pytest test item.

    Returns:
        Nothing.
    """

    required_boards = []
    for boards in item.iter_markers('boardspec'):
        board = boards.args[0]
        if board.startswith('!'):
            if ubconfig.board_type == board[1:]:
                pytest.skip('board "%s" not supported' % ubconfig.board_type)
                return
        else:
            required_boards.append(board)
    if required_boards and ubconfig.board_type not in required_boards:
        pytest.skip('board "%s" not supported' % ubconfig.board_type)
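
# Illustrative marker usage handled by setup_boardspec() (board names are just
# examples):
#   @pytest.mark.boardspec('sandbox')     # run only on the sandbox board
#   @pytest.mark.boardspec('!sandbox')    # skip when running on sandbox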

def setup_buildconfigspec(item):
    """Process any 'buildconfigspec' marker for a test.

    Such a marker lists some U-Boot configuration feature that the test
    requires. If tests are being executed on a U-Boot build that doesn't
    have the required feature, the test is marked to be skipped.

    Args:
        item: The pytest test item.

    Returns:
        Nothing.
    """

    for options in item.iter_markers('buildconfigspec'):
        option = options.args[0]
        if not ubconfig.buildconfig.get('config_' + option.lower(), None):
            pytest.skip('.config feature "%s" not enabled' % option.lower())
    for options in item.iter_markers('notbuildconfigspec'):
        option = options.args[0]
        if ubconfig.buildconfig.get('config_' + option.lower(), None):
            pytest.skip('.config feature "%s" enabled' % option.lower())
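
# Illustrative marker usage handled by setup_buildconfigspec() (the option
# names are just examples); the argument is matched against 'config_<name>'
# entries parsed from .config/autoconf.mk:
#   @pytest.mark.buildconfigspec('cmd_memory')       # requires CONFIG_CMD_MEMORY
#   @pytest.mark.notbuildconfigspec('sandbox_spl')   # skip if CONFIG_SANDBOX_SPL set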

def tool_is_in_path(tool):
    """Return True if the given tool is an executable present in $PATH."""
    for path in os.environ["PATH"].split(os.pathsep):
        fn = os.path.join(path, tool)
        if os.path.isfile(fn) and os.access(fn, os.X_OK):
            return True
    return False

def setup_requiredtool(item):
    """Process any 'requiredtool' marker for a test.

    Such a marker lists some external tool (binary, executable, application)
    that the test requires. If tests are being executed on a system that
    doesn't have the required tool, the test is marked to be skipped.

    Args:
        item: The pytest test item.

    Returns:
        Nothing.
    """

    for tools in item.iter_markers('requiredtool'):
        tool = tools.args[0]
        if not tool_is_in_path(tool):
            pytest.skip('tool "%s" not in $PATH' % tool)

def setup_singlethread(item):
    """Process any 'singlethread' marker for a test.

    Skip this test if running in parallel.

    Args:
        item: The pytest test item.

    Returns:
        Nothing.
    """
    for single in item.iter_markers('singlethread'):
        worker_id = os.environ.get("PYTEST_XDIST_WORKER")
        if worker_id and worker_id != 'master':
            pytest.skip('must run single-threaded')

def start_test_section(item):
    """Open a log section for the test and remember its anchor."""
    anchors[item.name] = log.start_section(item.name)

def pytest_runtest_setup(item):
    """pytest hook: Configure (set up) a test item.

    Called once for each test to perform any custom configuration. This hook
    is used to skip the test if certain conditions apply.

    Args:
        item: The pytest test item.

    Returns:
        Nothing.
    """

    start_test_section(item)
    setup_boardspec(item)
    setup_buildconfigspec(item)
    setup_requiredtool(item)
    setup_singlethread(item)

def pytest_runtest_protocol(item, nextitem):
    """pytest hook: Called to execute a test.

    This hook wraps the standard pytest runtestprotocol() function in order
    to acquire visibility into, and record, each test function's result.

    Args:
        item: The pytest test item to execute.
        nextitem: The pytest test item that will be executed after this one.

    Returns:
        True, to indicate to pytest that this hook handled the test protocol.
    """

    log.get_and_reset_warning()
    ihook = item.ihook
    ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
    reports = runtestprotocol(item, nextitem=nextitem)
    ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
    was_warning = log.get_and_reset_warning()

    # In pytest 3, runtestprotocol() may not call pytest_runtest_setup() if
    # the test is skipped. That call is required to create the test's section
    # in the log file. The call to log.end_section() requires that the log
    # contain a section for this test. Create a section for the test if it
    # doesn't already exist.
    if item.name not in anchors:
        start_test_section(item)

    failure_cleanup = False
    if not was_warning:
        test_list = tests_passed
        msg = 'OK'
        msg_log = log.status_pass
    else:
        test_list = tests_warning
        msg = 'OK (with warning)'
        msg_log = log.status_warning
    for report in reports:
        if report.outcome == 'failed':
            if hasattr(report, 'wasxfail'):
                test_list = tests_xpassed
                msg = 'XPASSED'
                msg_log = log.status_xpass
            else:
                failure_cleanup = True
                test_list = tests_failed
                msg = 'FAILED:\n' + str(report.longrepr)
                msg_log = log.status_fail
            break
        if report.outcome == 'skipped':
            if hasattr(report, 'wasxfail'):
                failure_cleanup = True
                test_list = tests_xfailed
                msg = 'XFAILED:\n' + str(report.longrepr)
                msg_log = log.status_xfail
                break
            test_list = tests_skipped
            msg = 'SKIPPED:\n' + str(report.longrepr)
            msg_log = log.status_skipped

    if failure_cleanup:
        console.drain_console()

    test_list.append(item.name)
    tests_not_run.remove(item.name)

    try:
        msg_log(msg)
    except:
        # If something went wrong with logging, it's better to let the test
        # process continue, which may report other exceptions that triggered
        # the logging issue (e.g. console.log wasn't created). Hence, just
        # squash the exception. If the test setup failed due to e.g. a syntax
        # error somewhere else, this won't be seen. However, once that issue
        # is fixed, if this exception still exists, it will then be logged as
        # part of the test's stdout.
        import traceback
        print('Exception occurred while logging runtest status:')
        traceback.print_exc()
        # FIXME: Can we force a test failure here?

    log.end_section(item.name)

    if failure_cleanup:
        console.cleanup_spawn()

    return True
