1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2024 Intel Corporation
5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
6#
7# SPDX-License-Identifier: Apache-2.0
8import collections
9import copy
10import itertools
11import json
12import logging
13import os
14import random
15import re
16import subprocess
17import sys
18from argparse import Namespace
19from collections import OrderedDict
20from itertools import islice
21from pathlib import Path
22
23import snippets
24
25try:
26    from anytree import Node, RenderTree, find
27except ImportError:
28    print("Install the anytree module to use the --test-tree option")
29
30import scl
31from twisterlib.config_parser import TwisterConfigParser
32from twisterlib.error import TwisterRuntimeError
33from twisterlib.platform import Platform, generate_platforms
34from twisterlib.quarantine import Quarantine
35from twisterlib.statuses import TwisterStatus
36from twisterlib.testinstance import TestInstance
37from twisterlib.testsuite import TestSuite, scan_testsuite_path
38from zephyr_module import parse_modules
39
# Module-wide logger shared by the definitions in this file.
logger = logging.getLogger('twister')

# The Zephyr tree location is mandatory: it is used below to extend sys.path
# and, further down, to build the schema paths. Importing this module
# without it terminates the interpreter.
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.exit("$ZEPHYR_BASE environment variable undefined")

# This is needed to load edt.pickle files.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
                                "python-devicetree", "src"))
# Imported only for its side effect (see the comment above); the name itself
# is deliberately unused, hence the pylint suppression.
from devicetree import edtlib  # pylint: disable=unused-import

# Put $ZEPHYR_BASE/scripts at the front of the module search path.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
52
53
class Filters:
    """Labels naming the kind of filter that excluded a test instance."""

    # Filtering based on platform keys.
    PLATFORM_KEY = 'platform key filter'
    # Filters the user/tester passed on the command line.
    CMD_LINE = 'command line filter'
    # Filters declared in the testsuite yaml definition.
    TESTSUITE = 'testsuite filter'
    # Filters declared in the testplan yaml definition.
    TESTPLAN = 'testplan filter'
    # Filters that stem from the platform definition.
    PLATFORM = 'Platform related filter'
    # The test suite was skipped intentionally.
    SKIP = 'Skip filter'
    # The selected toolchain is incompatible with the allowed toolchains.
    TOOLCHAIN = 'Toolchain filter'
    # An optional module is not available.
    MODULE = 'Module filter'
    # An environment variable required for a platform is missing.
    ENVIRONMENT = 'Environment filter'
73
74
class TestLevel:
    """A named test level together with the scenarios it selects.

    Attributes:
        name: Name of the level, or None until it is assigned.
        levels: Names of the levels this level inherits from.
        scenarios: Scenario identifiers that belong to this level.
    """
    __test__ = False  # for pytest to skip this class when collects tests

    # Class-level defaults are kept so that attribute access on the class
    # itself keeps working. Instances never share these lists: __init__
    # gives every instance its own.
    name = None
    levels = []
    scenarios = []

    def __init__(self):
        # Per-instance containers: appending to one level's lists must not
        # leak into other levels through the shared class attributes.
        self.name = None
        self.levels = []
        self.scenarios = []
79
80
class TestConfiguration:
    """Test configuration loaded from a YAML file and checked against a schema.

    Attributes:
        test_config: The parsed configuration mapping.
        override_default_platforms: ``platforms.override_default_platforms``
            from the file (False when absent).
        increased_platform_scope: ``platforms.increased_platform_scope``
            from the file (True when absent).
        default_platforms: ``platforms.default_platforms`` from the file
            (empty list when absent).
        options: The ``options`` section of the file (empty dict when absent).
    """
    __test__ = False  # for pytest to skip this class when collects tests
    tc_schema_path = os.path.join(
        ZEPHYR_BASE,
        "scripts",
        "schemas",
        "twister",
        "test-config-schema.yaml"
    )

    def __init__(self, config_file):
        """Load *config_file*.

        Raises:
            TwisterRuntimeError: If *config_file* does not exist.
        """
        self.test_config = None
        self.override_default_platforms = False
        self.increased_platform_scope = True
        self.default_platforms = []
        self.options = {}
        self.parse(config_file)

    def parse(self, config_file):
        """Read *config_file*, verify it against the schema and cache its settings.

        Raises:
            TwisterRuntimeError: If *config_file* does not exist.
        """
        if not os.path.exists(config_file):
            raise TwisterRuntimeError(f"File {config_file} not found.")

        tc_schema = scl.yaml_load(self.tc_schema_path)
        self.test_config = scl.yaml_load_verify(config_file, tc_schema)

        platform_config = self.test_config.get('platforms', {})

        self.override_default_platforms = platform_config.get('override_default_platforms', False)
        self.increased_platform_scope = platform_config.get('increased_platform_scope', True)
        self.default_platforms = platform_config.get('default_platforms', [])

        self.options = self.test_config.get('options', {})

    @staticmethod
    def get_level(levels, name):
        """Return the first entry of *levels* whose name is *name*, else None."""
        return next((lvl for lvl in levels if lvl.name == name), None)

    def get_levels(self, scenarios):
        """Build the TestLevel list described by the ``levels`` section.

        Args:
            scenarios: Known scenario identifiers. Each ``adds`` entry of a
                level is a regular expression that must match an identifier
                in full for that scenario to be added to the level.

        Returns:
            One TestLevel per configured level, in configuration order, with
            the scenarios of directly inherited levels appended.

        Raises:
            AssertionError: If a level inherits from a level that is not
                configured.
        """
        levels = []
        configured_levels = self.test_config.get('levels', [])

        # First pass: collect the scenarios each level adds on its own.
        for level in configured_levels:
            adds = []
            for pattern in level.get('adds', []):
                matcher = re.compile(pattern)
                adds.extend(filter(matcher.fullmatch, scenarios))

            test_level = TestLevel()
            test_level.name = level['name']
            test_level.scenarios = adds
            test_level.levels = level.get('inherits', [])
            levels.append(test_level)

        # Second pass: append the scenarios of inherited levels. This is a
        # single pass in configuration order, so a level only sees what the
        # inherited level holds at the moment it is processed.
        for level in configured_levels:
            _level = self.get_level(levels, level['name'])
            for inherited_name in level.get('inherits') or []:
                _inherited = self.get_level(levels, inherited_name)
                # The message is an f-string so the offending name is shown.
                assert _inherited, f"Unknown inherited level {inherited_name}"
                if _level:
                    _level.scenarios.extend(_inherited.scenarios)

        return levels
149
class TestPlan:
    """Discovers test suites and platforms and holds the resulting test instances."""
    __test__ = False  # for pytest to skip this class when collects tests
    # Matches "CONFIG_<NAME>=<value>"; the value may be wrapped in double quotes.
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    # Matches "<NAME>=<value>"; the value may be wrapped in double quotes.
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    # Both schemas are loaded once, when the class body is executed.
    suite_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "testsuite-schema.yaml"))
    quarantine_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "quarantine-schema.yaml"))

    # File names that mark a directory as containing test suite definitions;
    # add_testsuites() checks for the sample file name first.
    SAMPLE_FILENAME = 'sample.yaml'
    TESTSUITE_FILENAME = 'testcase.yaml'
164
165    def __init__(self, env: Namespace):
166
167        self.options = env.options
168        self.env = env
169
170        # Keep track of which test cases we've filtered out and why
171        self.testsuites = {}
172        self.quarantine = None
173        self.platforms = []
174        self.platform_names = []
175        self.selected_platforms = []
176        self.default_platforms = []
177        self.load_errors = 0
178        self.instances = dict()
179        self.instance_fail_count = 0
180        self.warnings = 0
181
182        self.scenarios = []
183
184        self.hwm = env.hwm
185        # used during creating shorter build paths
186        self.link_dir_counter = 0
187        self.modules = []
188
189        self.run_individual_testsuite = []
190        self.levels = []
191        self.test_config =  None
192
193        self.name = "unnamed"
194
195    def find_subtests(self):
196        sub_tests = self.options.sub_test
197        if sub_tests:
198            for subtest in sub_tests:
199                _subtests = self.get_testcase(subtest)
200                for _subtest in _subtests:
201                    self.run_individual_testsuite.append(_subtest.name)
202
203            if self.run_individual_testsuite:
204                logger.info("Running the following tests:")
205                for test in self.run_individual_testsuite:
206                    print(f" - {test}")
207            else:
208                raise TwisterRuntimeError("Tests not found")
209
210    def discover(self):
211        self.handle_modules()
212        self.test_config = TestConfiguration(self.env.test_config)
213
214        self.add_configurations()
215        num = self.add_testsuites(testsuite_filter=self.options.test,
216                                testsuite_pattern=self.options.test_pattern)
217
218        if num == 0:
219            raise TwisterRuntimeError("No testsuites found at the specified location...")
220        if self.load_errors:
221            raise TwisterRuntimeError(
222                f"Found {self.load_errors} errors loading {num} test configurations."
223            )
224
225        self.find_subtests()
226        # get list of scenarios we have parsed into one list
227        for _, ts in self.testsuites.items():
228            self.scenarios.append(ts.id)
229
230        self.report_duplicates()
231        self.levels = self.test_config.get_levels(self.scenarios)
232
233        # handle quarantine
234        ql = self.options.quarantine_list
235        qv = self.options.quarantine_verify
236        if qv and not ql:
237            logger.error("No quarantine list given to be verified")
238            raise TwisterRuntimeError("No quarantine list given to be verified")
239        if ql:
240            for quarantine_file in ql:
241                try:
242                    # validate quarantine yaml file against the provided schema
243                    scl.yaml_load_verify(quarantine_file, self.quarantine_schema)
244                except scl.EmptyYamlFileException:
245                    logger.debug(f'Quarantine file {quarantine_file} is empty')
246            self.quarantine = Quarantine(ql)
247
248    def get_level(self, name):
249        level = next((lvl for lvl in self.levels if lvl.name == name), None)
250        return level
251
252    def load(self):
253
254        if self.options.report_suffix:
255            last_run = os.path.join(
256                self.options.outdir,
257                f"twister_{self.options.report_suffix}.json"
258            )
259        else:
260            last_run = os.path.join(self.options.outdir, "twister.json")
261
262        if self.options.only_failed or self.options.report_summary is not None:
263            self.load_from_file(last_run)
264            self.selected_platforms = set(p.platform.name for p in self.instances.values())
265        elif self.options.load_tests:
266            self.load_from_file(self.options.load_tests)
267            self.selected_platforms = set(p.platform.name for p in self.instances.values())
268        elif self.options.test_only:
269            # Get list of connected hardware and filter tests to only be run on connected hardware.
270            # If the platform does not exist in the hardware map or was not specified by --platform,
271            # just skip it.
272
273            connected_list = []
274            excluded_list = []
275            for _cp in self.options.platform:
276                if _cp in self.platform_names:
277                    connected_list.append(self.get_platform(_cp).name)
278
279            if self.options.exclude_platform:
280                for _p in self.options.exclude_platform:
281                    if _p in self.platform_names:
282                        excluded_list.append(self.get_platform(_p).name)
283                for excluded in excluded_list:
284                    if excluded in connected_list:
285                        connected_list.remove(excluded)
286
287            self.load_from_file(last_run, filter_platform=connected_list)
288            self.selected_platforms = set(p.platform.name for p in self.instances.values())
289        else:
290            self.apply_filters()
291
292        if self.options.subset:
293            s =  self.options.subset
294            try:
295                subset, sets = (int(x) for x in s.split("/"))
296            except ValueError as err:
297                raise TwisterRuntimeError("Bad subset value.") from err
298
299            if subset > sets:
300                raise TwisterRuntimeError("subset should not exceed the total number of sets")
301
302            if int(subset) > 0 and int(sets) >= int(subset):
303                logger.info(f"Running only a subset: {subset}/{sets}")
304            else:
305                raise TwisterRuntimeError(
306                    f"You have provided a wrong subset value: {self.options.subset}."
307                )
308
309            self.generate_subset(subset, int(sets))
310
311    def generate_subset(self, subset, sets):
312        # Test instances are sorted depending on the context. For CI runs
313        # the execution order is: "plat1-testA, plat1-testB, ...,
314        # plat1-testZ, plat2-testA, ...". For hardware tests
315        # (device_testing), were multiple physical platforms can run the tests
316        # in parallel, it is more efficient to run in the order:
317        # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
318        if self.options.device_testing:
319            self.instances = OrderedDict(sorted(self.instances.items(),
320                                key=lambda x: x[0][x[0].find("/") + 1:]))
321        else:
322            self.instances = OrderedDict(sorted(self.instances.items()))
323
324        if self.options.shuffle_tests:
325            seed_value = int.from_bytes(os.urandom(8), byteorder="big")
326            if self.options.shuffle_tests_seed is not None:
327                seed_value = self.options.shuffle_tests_seed
328
329            logger.info(f"Shuffle tests with seed: {seed_value}")
330            random.seed(seed_value)
331            temp_list = list(self.instances.items())
332            random.shuffle(temp_list)
333            self.instances = OrderedDict(temp_list)
334
335        # Do calculation based on what is actually going to be run and evaluated
336        # at runtime, ignore the cases we already know going to be skipped.
337        # This fixes an issue where some sets would get majority of skips and
338        # basically run nothing beside filtering.
339        to_run = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.NONE}
340        total = len(to_run)
341        per_set = int(total / sets)
342        num_extra_sets = total - (per_set * sets)
343
344        # Try and be more fair for rounding error with integer division
345        # so the last subset doesn't get overloaded, we add 1 extra to
346        # subsets 1..num_extra_sets.
347        if subset <= num_extra_sets:
348            start = (subset - 1) * (per_set + 1)
349            end = start + per_set + 1
350        else:
351            base = num_extra_sets * (per_set + 1)
352            start = ((subset - num_extra_sets - 1) * per_set) + base
353            end = start + per_set
354
355        sliced_instances = islice(to_run.items(), start, end)
356        skipped = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.SKIP}
357        errors = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.ERROR}
358        self.instances = OrderedDict(sliced_instances)
359        if subset == 1:
360            # add all pre-filtered tests that are skipped or got error status
361            # to the first set to allow for better distribution among all sets.
362            self.instances.update(skipped)
363            self.instances.update(errors)
364
365
366    def handle_modules(self):
367        # get all enabled west projects
368        modules_meta = parse_modules(ZEPHYR_BASE)
369        self.modules = [module.meta.get('name') for module in modules_meta]
370
371
372    def report(self):
373        if self.options.test_tree:
374            if not self.options.detailed_test_id:
375                logger.info("Test tree is always shown with detailed test-id.")
376            self.report_test_tree()
377            return 0
378        elif self.options.list_tests:
379            if not self.options.detailed_test_id:
380                logger.info("Test list is always shown with detailed test-id.")
381            self.report_test_list()
382            return 0
383        elif self.options.list_tags:
384            self.report_tag_list()
385            return 0
386
387        return 1
388
389    def report_duplicates(self):
390        dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1]
391        if dupes:
392            msg = "Duplicated test scenarios found:\n"
393            for dupe in dupes:
394                msg += (f"- {dupe} found in:\n")
395                for dc in self.get_testsuite(dupe):
396                    msg += (f"  - {dc.yamlfile}\n")
397            raise TwisterRuntimeError(msg)
398        else:
399            logger.debug("No duplicates found.")
400
401    def report_tag_list(self):
402        tags = set()
403        for _, tc in self.testsuites.items():
404            tags = tags.union(tc.tags)
405
406        for t in tags:
407            print(f"- {t}")
408
409    def report_test_tree(self):
410        tests_list = self.get_tests_list()
411
412        testsuite = Node("Testsuite")
413        samples = Node("Samples", parent=testsuite)
414        tests = Node("Tests", parent=testsuite)
415
416        for test in sorted(tests_list):
417            if test.startswith("sample."):
418                sec = test.split(".")
419                area = find(
420                    samples,
421                    lambda node, sname=sec[1]: node.name == sname and node.parent == samples
422                )
423                if not area:
424                    area = Node(sec[1], parent=samples)
425
426                Node(test, parent=area)
427            else:
428                sec = test.split(".")
429                area = find(
430                    tests,
431                    lambda node, sname=sec[0]: node.name == sname and node.parent == tests
432                )
433                if not area:
434                    area = Node(sec[0], parent=tests)
435
436                if area and len(sec) > 2:
437                    subarea = find(
438                        area, lambda node, sname=sec[1], sparent=area: node.name == sname
439                        and node.parent == sparent
440                    )
441                    if not subarea:
442                        subarea = Node(sec[1], parent=area)
443                    Node(test, parent=subarea)
444
445        for pre, _, node in RenderTree(testsuite):
446            print(f"{pre}{node.name}")
447
448    def report_test_list(self):
449        tests_list = self.get_tests_list()
450
451        cnt = 0
452        for test in sorted(tests_list):
453            cnt = cnt + 1
454            print(f" - {test}")
455        print(f"{cnt} total.")
456
457
458    # Debug Functions
459    @staticmethod
460    def info(what):
461        sys.stdout.write(what + "\n")
462        sys.stdout.flush()
463
464    def add_configurations(self):
465        # Create a list of board roots as defined by the build system in general
466        # Note, internally in twister a board root includes the `boards` folder
467        # but in Zephyr build system, the board root is without the `boards` in folder path.
468        board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots]
469        soc_roots = self.env.soc_roots
470        arch_roots = self.env.arch_roots
471
472        for platform in generate_platforms(board_roots, soc_roots, arch_roots):
473            if not platform.twister:
474                continue
475            self.platforms.append(platform)
476
477            if not self.test_config.override_default_platforms:
478                if platform.default:
479                    self.default_platforms.append(platform.name)
480                continue
481            for pp in self.test_config.default_platforms:
482                if pp in platform.aliases:
483                    logger.debug(f"adding {platform.name} to default platforms (override  mode)")
484                    self.default_platforms.append(platform.name)
485
486        self.platform_names = [a for p in self.platforms for a in p.aliases]
487
488    def get_all_tests(self):
489        testcases = []
490        for _, ts in self.testsuites.items():
491            for case in ts.testcases:
492                testcases.append(case.name)
493
494        return testcases
495
496    def get_tests_list(self):
497        testcases = []
498        if tag_filter := self.options.tag:
499            for _, ts in self.testsuites.items():
500                if ts.tags.intersection(tag_filter):
501                    for case in ts.testcases:
502                        testcases.append(case.detailed_name)
503        else:
504            for _, ts in self.testsuites.items():
505                for case in ts.testcases:
506                    testcases.append(case.detailed_name)
507
508        if exclude_tag := self.options.exclude_tag:
509            for _, ts in self.testsuites.items():
510                if ts.tags.intersection(exclude_tag):
511                    for case in ts.testcases:
512                        if case.detailed_name in testcases:
513                            testcases.remove(case.detailed_name)
514        return testcases
515
516    def _is_testsuite_selected(self, suite: TestSuite, testsuite_filter, testsuite_patterns_r):
517        """Check if the testsuite is selected by the user."""
518        if not testsuite_filter and not testsuite_patterns_r:
519            # no matching requested, include all testsuites
520            return True
521        if testsuite_filter:
522            scenario = os.path.basename(suite.name)
523            if (
524                suite.name
525                and (suite.name in testsuite_filter or scenario in testsuite_filter)
526            ):
527                return True
528        if testsuite_patterns_r:
529            for r in testsuite_patterns_r:
530                if r.search(suite.id):
531                    return True
532        return False
533
    def add_testsuites(self, testsuite_filter=None, testsuite_pattern=None):
        """Walk the test roots and register every selected test suite.

        Args:
            testsuite_filter: Suite names the user asked for; see
                _is_testsuite_selected() for how they are matched.
            testsuite_pattern: Regular expression strings; each one is
                compiled and searched for in the suite id.

        Returns:
            The number of entries in ``self.testsuites`` after the walk.

        Any exception raised while loading one definition file is logged and
        counted in ``self.load_errors``; the walk then continues with the
        next directory.
        """
        if testsuite_filter is None:
            testsuite_filter = []

        # Compile the user supplied patterns once, before walking the tree.
        testsuite_patterns_r = []
        if testsuite_pattern is None:
            testsuite_pattern = []
        else:
            for pattern in testsuite_pattern:
                testsuite_patterns_r.append(re.compile(pattern))

        for root in self.env.test_roots:
            root = os.path.abspath(root)

            logger.debug(f"Reading testsuite configuration files under {root}...")

            for dirpath, _, filenames in os.walk(root, topdown=True):
                # A sample definition takes precedence when both files exist.
                if self.SAMPLE_FILENAME in filenames:
                    filename = self.SAMPLE_FILENAME
                elif self.TESTSUITE_FILENAME in filenames:
                    filename = self.TESTSUITE_FILENAME
                else:
                    continue

                logger.debug("Found possible testsuite in " + dirpath)

                suite_yaml_path = os.path.join(dirpath, filename)
                suite_path = os.path.dirname(suite_yaml_path)

                # The first alternative config root that has a file at the
                # same path relative to the test root replaces the
                # definition file.
                for alt_config_root in self.env.alt_config_root:
                    alt_config = os.path.join(os.path.abspath(alt_config_root),
                                              os.path.relpath(suite_path, root),
                                              filename)
                    if os.path.exists(alt_config):
                        logger.info(
                            f"Using alternative configuration from {os.path.normpath(alt_config)}"
                        )
                        suite_yaml_path = alt_config
                        break

                try:
                    parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema)
                    parsed_data.load()
                    # Source scan results, shared by all scenarios of this file.
                    subcases = None
                    ztest_suite_names = None

                    for name in parsed_data.scenarios:
                        suite_dict = parsed_data.get_scenario(name)
                        suite = TestSuite(
                            root,
                            suite_path,
                            name,
                            data=suite_dict,
                            detailed_test_id=self.options.detailed_test_id
                        )

                        # convert to fully qualified names
                        suite.integration_platforms = self.verify_platforms_existence(
                                suite.integration_platforms,
                                f"integration_platforms in {suite.name}")
                        suite.platform_exclude = self.verify_platforms_existence(
                                suite.platform_exclude,
                                f"platform_exclude in {suite.name}")
                        suite.platform_allow =  self.verify_platforms_existence(
                                suite.platform_allow,
                                f"platform_allow in {suite.name}")

                        if suite.harness in ['ztest', 'test']:
                            if subcases is None:
                                # scan it only once per testsuite
                                subcases, ztest_suite_names = scan_testsuite_path(suite_path)
                            suite.add_subcases(suite_dict, subcases, ztest_suite_names)
                        else:
                            suite.add_subcases(suite_dict)

                        if not self._is_testsuite_selected(suite, testsuite_filter,
                                                       testsuite_patterns_r):
                            # skip testsuite if they were not selected directly by the user
                            continue
                        if suite.name in self.testsuites:
                            msg = (
                                f"test suite '{suite.name}' in '{suite.yamlfile}' is already added"
                            )
                            # Seeing the same yaml file again is harmless; the
                            # same suite name coming from a different file is
                            # an error.
                            if suite.yamlfile == self.testsuites[suite.name].yamlfile:
                                logger.debug(f"Skip - {msg}")
                            else:
                                msg = (
                                    f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'"
                                )
                                # Caught by the handler below, so a duplicate
                                # is reported as a load error rather than
                                # propagated to the caller.
                                raise TwisterRuntimeError(msg)
                        else:
                            self.testsuites[suite.name] = suite

                except Exception as e:
                    logger.error(f"{suite_path}: can't load (skipping): {e!r}")
                    self.load_errors += 1
        return len(self.testsuites)
631
    def __str__(self):
        """Return the name of this test plan."""
        return self.name
634
635    def get_platform(self, name):
636        selected_platform = None
637        for platform in self.platforms:
638            if name in platform.aliases:
639                selected_platform = platform
640                break
641        return selected_platform
642
643    def handle_quarantined_tests(self, instance: TestInstance, plat: Platform):
644        if self.quarantine:
645            sim_name = plat.simulation
646            if sim_name != "na" and (simulator := plat.simulator_by_name(self.options.sim_name)):
647                sim_name = simulator.name
648            matched_quarantine = self.quarantine.get_matched_quarantine(
649                instance.testsuite.id,
650                plat.name,
651                plat.arch,
652                sim_name
653            )
654            if matched_quarantine and not self.options.quarantine_verify:
655                instance.status = TwisterStatus.SKIP
656                instance.reason = "Quarantine: " + matched_quarantine
657                return
658            if not matched_quarantine and self.options.quarantine_verify:
659                instance.add_filter("Not under quarantine", Filters.CMD_LINE)
660
    def load_from_file(self, file, filter_platform=None):
        """Recreate test instances from a JSON report.

        Args:
            file: Path of the JSON file. Its "testsuites" list is read; each
                entry must provide "name", "toolchain" and "platform".
            filter_platform: When non-empty, only entries whose platform name
                is in this list are loaded.

        Returns:
            1 when *file* does not exist (the error is logged); otherwise
            nothing is returned explicitly.
        """
        if filter_platform is None:
            filter_platform = []
        try:
            with open(file) as json_test_plan:
                jtp = json.load(json_test_plan)
                instance_list = []
                for ts in jtp.get("testsuites", []):
                    logger.debug(f"loading {ts['name']}...")
                    testsuite = ts["name"]
                    toolchain = ts["toolchain"]

                    # NOTE(review): get_platform() returns None for an unknown
                    # platform name, which is not handled here - confirm that
                    # reports can only name known platforms.
                    platform = self.get_platform(ts["platform"])
                    if filter_platform and platform.name not in filter_platform:
                        continue
                    instance = TestInstance(
                        self.testsuites[testsuite], platform, toolchain, self.env.outdir
                    )
                    if ts.get("run_id"):
                        instance.run_id = ts.get("run_id")

                    instance.run = instance.check_runnable(
                        self.options,
                        self.hwm
                    )

                    # With --test-only, instances that cannot run are dropped.
                    if self.options.test_only and not instance.run:
                        continue

                    # Carry over the recorded metrics; missing values become 0.
                    instance.metrics['handler_time'] = ts.get('execution_time', 0)
                    instance.metrics['used_ram'] = ts.get("used_ram", 0)
                    instance.metrics['used_rom']  = ts.get("used_rom",0)
                    instance.metrics['available_ram'] = ts.get('available_ram', 0)
                    instance.metrics['available_rom'] = ts.get('available_rom', 0)

                    status = TwisterStatus(ts.get('status'))
                    reason = ts.get("reason", "Unknown")
                    if status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
                        if self.options.report_summary is not None:
                            # A summary keeps the recorded outcome and counts it.
                            instance.status = status
                            instance.reason = reason
                            self.instance_fail_count += 1
                        else:
                            # Otherwise the status is reset and the retry
                            # counter is bumped.
                            instance.status = TwisterStatus.NONE
                            instance.reason = None
                            instance.retries += 1
                    # test marked as built only can run when --test-only is used.
                    # Reset status to capture new results.
                    elif status == TwisterStatus.NOTRUN and instance.run and self.options.test_only:
                        instance.status = TwisterStatus.NONE
                        instance.reason = None
                    else:
                        instance.status = status
                        instance.reason = reason

                    self.handle_quarantined_tests(instance, platform)

                    for tc in ts.get('testcases', []):
                        identifier = tc['identifier']
                        tc_status = TwisterStatus(tc.get('status'))
                        tc_reason = None
                        # we set reason only if status is valid, it might have been
                        # reset above...
                        if instance.status != TwisterStatus.NONE:
                            tc_reason = tc.get('reason')
                        if tc_status != TwisterStatus.NONE:
                            case = instance.set_case_status_by_name(
                                identifier,
                                tc_status,
                                tc_reason
                            )
                            case.duration = tc.get('execution_time', 0)
                            if tc.get('log'):
                                case.output = tc.get('log')

                    instance.create_overlay(platform,
                                            self.options.enable_asan,
                                            self.options.enable_ubsan,
                                            self.options.enable_coverage,
                                            self.options.coverage_platform
                                            )
                    instance_list.append(instance)
                self.add_instances(instance_list)
        except FileNotFoundError as e:
            # A missing file is logged and signalled through the return
            # value; no exception reaches the caller.
            logger.error(f"{e}")
            return 1
747
748    def check_platform(self, platform, platform_list):
749        return any(p in platform.aliases for p in platform_list)
750
751    def apply_filters(self, **kwargs):
752
753        platform_filter = self.options.platform
754        platform_pattern = self.options.platform_pattern
755        vendor_filter = self.options.vendor
756        exclude_platform = self.options.exclude_platform
757        testsuite_filter = self.run_individual_testsuite
758        arch_filter = self.options.arch
759        tag_filter = self.options.tag
760        exclude_tag = self.options.exclude_tag
761        all_filter = self.options.all
762        runnable = (self.options.device_testing or self.options.filter == 'runnable')
763        force_toolchain = self.options.force_toolchain
764        force_platform = self.options.force_platform
765        slow_only = self.options.enable_slow_only
766        ignore_platform_key = self.options.ignore_platform_key
767        emu_filter = self.options.emulation_only
768
769        logger.debug(" platform filter: " + str(platform_filter))
770        logger.debug("platform_pattern: " + str(platform_pattern))
771        logger.debug("   vendor filter: " + str(vendor_filter))
772        logger.debug("     arch_filter: " + str(arch_filter))
773        logger.debug("      tag_filter: " + str(tag_filter))
774        logger.debug("     exclude_tag: " + str(exclude_tag))
775
776        default_platforms = False
777        vendor_platforms = False
778        emulation_platforms = False
779
780        if all_filter:
781            logger.info("Selecting all possible platforms per testsuite scenario")
782            # When --all used, any --platform arguments ignored
783            platform_filter = []
784        elif not platform_filter and not emu_filter and not vendor_filter and not platform_pattern:
785            logger.info("Selecting default platforms per testsuite scenario")
786            default_platforms = True
787        elif emu_filter:
788            logger.info("Selecting emulation platforms per testsuite scenario")
789            emulation_platforms = True
790        elif vendor_filter:
791            vendor_platforms = True
792
793        _platforms = []
794        if platform_filter:
795            logger.debug(f"Checking platform filter: {platform_filter}")
796            # find in aliases and rename
797            platform_filter = self.verify_platforms_existence(platform_filter, "platform_filter")
798            platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
799        elif platform_pattern:
800            platforms = list(
801                filter(lambda p: any(re.match(pat, alias) for pat in platform_pattern \
802                                    for alias in p.aliases), self.platforms)
803            )
804        elif emu_filter:
805            platforms = list(
806                filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms)
807            )
808        elif vendor_filter:
809            platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms))
810            logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}")
811        elif arch_filter:
812            platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
813        elif default_platforms:
814            _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms))
815            platforms = []
816            # default platforms that can't be run are dropped from the list of
817            # the default platforms list. Default platforms should always be
818            # runnable.
819            for p in _platforms:
820                sim = p.simulator_by_name(self.options.sim_name)
821                if (not sim) or sim.is_runnable():
822                    platforms.append(p)
823        else:
824            platforms = self.platforms
825
826        # test configuration options
827        test_config_options = self.test_config.options
828        integration_mode_list = test_config_options.get('integration_mode', [])
829
830        logger.info("Building initial testsuite list...")
831
832        keyed_tests = {}
833        for _, ts in self.testsuites.items():
834            if ts.integration_platforms:
835                _integration_platforms = list(
836                    filter(lambda item: item.name in ts.integration_platforms, self.platforms)
837                )
838            else:
839                _integration_platforms = []
840
841            if (ts.build_on_all and not platform_filter and not platform_pattern and
842                self.test_config.increased_platform_scope):
843                # if build_on_all is set, we build on all platforms
844                platform_scope = self.platforms
845            elif ts.integration_platforms and self.options.integration:
846                # if integration is set, we build on integration platforms
847                platform_scope = _integration_platforms
848            elif ts.integration_platforms and not platform_filter and not platform_pattern:
849                # if integration platforms are set, we build on those and integration mode is set
850                # for this test suite, we build on integration platforms
851                if any(ts.id.startswith(i) for i in integration_mode_list):
852                    platform_scope = _integration_platforms
853                else:
854                    platform_scope = platforms + _integration_platforms
855            else:
856                platform_scope = platforms
857
858            integration = self.options.integration and ts.integration_platforms
859
860            # If there isn't any overlap between the platform_allow list and the platform_scope
861            # we set the scope to the platform_allow list
862            if (
863                ts.platform_allow
864                and not platform_filter
865                and not integration
866                and self.test_config.increased_platform_scope
867            ):
868                a = set(platform_scope)
869                b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms))
870                c = a.intersection(b)
871                if not c:
872                    platform_scope = list(
873                        filter(lambda item: item.name in ts.platform_allow, self.platforms)
874                    )
875            # list of instances per testsuite, aka configurations.
876            instance_list = []
877            for itoolchain, plat in itertools.product(
878                ts.integration_toolchains or [None], platform_scope
879            ):
880                if itoolchain:
881                    toolchain = itoolchain
882                elif plat.arch in ['posix', 'unit']:
883                    # workaround until toolchain variant in zephyr is overhauled and improved.
884                    if self.env.toolchain in ['llvm']:
885                        toolchain = 'llvm'
886                    else:
887                        toolchain = 'host'
888                else:
889                    toolchain = "zephyr" if not self.env.toolchain else self.env.toolchain
890
891                instance = TestInstance(ts, plat, toolchain, self.env.outdir)
892                instance.run = instance.check_runnable(
893                    self.options,
894                    self.hwm
895                )
896
897                if not force_platform and self.check_platform(plat,exclude_platform):
898                    instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE)
899
900                if (plat.arch == "unit") != (ts.type == "unit"):
901                    # Discard silently
902                    continue
903
904                if ts.modules and self.modules and not set(ts.modules).issubset(set(self.modules)):
905                    instance.add_filter(
906                        f"one or more required modules not available: {','.join(ts.modules)}",
907                        Filters.MODULE
908                    )
909
910                if self.options.level:
911                    tl = self.get_level(self.options.level)
912                    if tl is None:
913                        instance.add_filter(
914                            f"Unknown test level '{self.options.level}'",
915                            Filters.TESTPLAN
916                        )
917                    else:
918                        planned_scenarios = tl.scenarios
919                        if (
920                            ts.id not in planned_scenarios
921                            and not set(ts.levels).intersection(set(tl.levels))
922                        ):
923                            instance.add_filter("Not part of requested test plan", Filters.TESTPLAN)
924
925                if runnable and not instance.run:
926                    instance.add_filter("Not runnable on device", Filters.CMD_LINE)
927
928                if (
929                    self.options.integration
930                    and ts.integration_platforms
931                    and plat.name not in ts.integration_platforms
932                ):
933                    instance.add_filter("Not part of integration platforms", Filters.TESTSUITE)
934
935                if ts.skip:
936                    instance.add_filter("Skip filter", Filters.SKIP)
937
938                if tag_filter and not ts.tags.intersection(tag_filter):
939                    instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE)
940
941                if slow_only and not ts.slow:
942                    instance.add_filter("Not a slow test", Filters.CMD_LINE)
943
944                if exclude_tag and ts.tags.intersection(exclude_tag):
945                    instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE)
946
947                if testsuite_filter:
948                    normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter]
949                    if ts.id not in normalized_f:
950                        instance.add_filter("Testsuite name filter", Filters.CMD_LINE)
951
952                if arch_filter and plat.arch not in arch_filter:
953                    instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE)
954
955                if not force_platform:
956
957                    if ts.arch_allow and plat.arch not in ts.arch_allow:
958                        instance.add_filter("Not in testsuite arch allow list", Filters.TESTSUITE)
959
960                    if ts.arch_exclude and plat.arch in ts.arch_exclude:
961                        instance.add_filter("In testsuite arch exclude", Filters.TESTSUITE)
962
963                    if ts.vendor_allow and plat.vendor not in ts.vendor_allow:
964                        instance.add_filter(
965                            "Not in testsuite vendor allow list",
966                            Filters.TESTSUITE
967                        )
968
969                    if ts.vendor_exclude and plat.vendor in ts.vendor_exclude:
970                        instance.add_filter("In testsuite vendor exclude", Filters.TESTSUITE)
971
972                    if ts.platform_exclude and plat.name in ts.platform_exclude:
973                        instance.add_filter("In testsuite platform exclude", Filters.TESTSUITE)
974
975                if ts.toolchain_exclude and toolchain in ts.toolchain_exclude:
976                    instance.add_filter("In testsuite toolchain exclude", Filters.TOOLCHAIN)
977
978                if platform_filter and plat.name not in platform_filter:
979                    instance.add_filter("Command line platform filter", Filters.CMD_LINE)
980
981                if ts.platform_allow \
982                        and plat.name not in ts.platform_allow \
983                        and not (platform_filter and force_platform):
984                    instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE)
985
986                if ts.platform_type and plat.type not in ts.platform_type:
987                    instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE)
988
989                if ts.toolchain_allow and toolchain not in ts.toolchain_allow:
990                    instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN)
991
992                if not plat.env_satisfied:
993                    instance.add_filter(
994                        "Environment ({}) not satisfied".format(", ".join(plat.env)),
995                        Filters.ENVIRONMENT
996                    )
997                if plat.type == 'native' and sys.platform != 'linux':
998                    instance.add_filter("Native platform requires Linux", Filters.ENVIRONMENT)
999
1000                if not force_toolchain \
1001                        and toolchain and (toolchain not in plat.supported_toolchains):
1002                    instance.add_filter(
1003                        f"Not supported by the toolchain: {toolchain}",
1004                        Filters.PLATFORM
1005                    )
1006
1007                if plat.ram < ts.min_ram:
1008                    instance.add_filter("Not enough RAM", Filters.PLATFORM)
1009
1010                if ts.harness:
1011                    sim = plat.simulator_by_name(self.options.sim_name)
1012                    if ts.harness == 'robot' and not (sim and sim.name == 'renode'):
1013                        instance.add_filter(
1014                            "No robot support for the selected platform",
1015                            Filters.SKIP
1016                        )
1017
1018                if ts.depends_on:
1019                    dep_intersection = ts.depends_on.intersection(set(plat.supported))
1020                    if dep_intersection != set(ts.depends_on):
1021                        instance.add_filter(
1022                            f"No hardware support for {set(ts.depends_on)-dep_intersection}",
1023                            Filters.PLATFORM
1024                        )
1025
1026                if plat.flash < ts.min_flash:
1027                    instance.add_filter("Not enough FLASH", Filters.PLATFORM)
1028
1029                if set(plat.ignore_tags) & ts.tags:
1030                    instance.add_filter(
1031                        "Excluded tags per platform (exclude_tags)",
1032                        Filters.PLATFORM
1033                    )
1034
1035                if plat.only_tags and not set(plat.only_tags) & ts.tags:
1036                    instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM)
1037
1038                if ts.required_snippets:
1039                    missing_snippet = False
1040                    snippet_args = {"snippets": ts.required_snippets}
1041                    found_snippets = snippets.find_snippets_in_roots(
1042                        snippet_args,
1043                        [*self.env.snippet_roots, Path(ts.source_dir)]
1044                    )
1045
1046                    # Search and check that all required snippet files are found
1047                    for this_snippet in snippet_args['snippets']:
1048                        if this_snippet not in found_snippets:
1049                            logger.error(
1050                                f"Can't find snippet '{this_snippet}' for test '{ts.name}'"
1051                            )
1052                            instance.status = TwisterStatus.ERROR
1053                            instance.reason = f"Snippet {this_snippet} not found"
1054                            missing_snippet = True
1055                            break
1056
1057                    if not missing_snippet:
1058                        # Look for required snippets and check that they are applicable for these
1059                        # platforms/boards
1060                        for this_snippet in snippet_args['snippets']:
1061                            matched_snippet_board = False
1062
1063                            # If the "appends" key is present with at least one entry then this
1064                            # snippet applies to all boards and further platform-specific checks
1065                            # are not required
1066                            if found_snippets[this_snippet].appends:
1067                                continue
1068
1069                            for this_board in found_snippets[this_snippet].board2appends:
1070                                if this_board.startswith('/'):
1071                                    match = re.search(this_board[1:-1], plat.name)
1072                                    if match is not None:
1073                                        matched_snippet_board = True
1074                                        break
1075                                elif this_board == plat.name:
1076                                    matched_snippet_board = True
1077                                    break
1078
1079                            if matched_snippet_board is False:
1080                                instance.add_filter("Snippet not supported", Filters.PLATFORM)
1081                                break
1082
1083                # handle quarantined tests
1084                self.handle_quarantined_tests(instance, plat)
1085
1086                # platform_key is a list of unique platform attributes that form a unique key
1087                # a test will match against to determine if it should be scheduled to run.
1088                # A key containing a field name that the platform does not have
1089                # will filter the platform.
1090                #
1091                # A simple example is keying on arch and simulation
1092                # to run a test once per unique (arch, simulation) platform.
1093                if (
1094                    not ignore_platform_key
1095                    and hasattr(ts, 'platform_key')
1096                    and len(ts.platform_key) > 0
1097                ):
1098                    key_fields = sorted(set(ts.platform_key))
1099                    keys = [getattr(plat, key_field, None) for key_field in key_fields]
1100                    for key in keys:
1101                        if key is None or key == 'na':
1102                            instance.add_filter(
1103                                "Excluded platform missing key fields"
1104                                f" demanded by test {key_fields}",
1105                                Filters.PLATFORM
1106                            )
1107                            break
1108                    else:
1109                        test_keys = copy.deepcopy(keys)
1110                        test_keys.append(ts.name)
1111                        test_keys = tuple(test_keys)
1112                        keyed_test = keyed_tests.get(test_keys)
1113                        if keyed_test is not None:
1114                            plat_key = {
1115                                key_field: getattr(
1116                                    keyed_test['plat'],
1117                                    key_field
1118                                ) for key_field in key_fields
1119                            }
1120                            instance.add_filter(
1121                                f"Already covered for key {key}"
1122                                f" by platform {keyed_test['plat'].name} having key {plat_key}",
1123                                Filters.PLATFORM_KEY
1124                            )
1125                        else:
1126                            # do not add a platform to keyed tests if previously
1127                            # filtered
1128
1129                            if not instance.filters:
1130                                keyed_tests[test_keys] = {'plat': plat, 'ts': ts}
1131
1132                # if nothing stopped us until now, it means this configuration
1133                # needs to be added.
1134                instance_list.append(instance)
1135
1136            # no configurations, so jump to next testsuite
1137            if not instance_list:
1138                continue
1139
1140            # if twister was launched with no platform options at all, we
1141            # take all default platforms
1142            if default_platforms and not ts.build_on_all and not integration:
1143                if ts.platform_allow:
1144                    _default_p = set(self.default_platforms)
1145                    _platform_allow = set(ts.platform_allow)
1146                    _intersection = _default_p.intersection(_platform_allow)
1147                    if _intersection:
1148                        aa = list(
1149                            filter(
1150                                lambda _scenario: _scenario.platform.name in _intersection,
1151                                instance_list
1152                            )
1153                        )
1154                        self.add_instances(aa)
1155                    else:
1156                        self.add_instances(instance_list)
1157                else:
1158                    # add integration platforms to the list of default
1159                    # platforms, even if we are not in integration mode
1160                    _platforms = self.default_platforms + ts.integration_platforms
1161                    instances = list(
1162                        filter(lambda ts: ts.platform.name in _platforms, instance_list)
1163                    )
1164                    self.add_instances(instances)
1165            elif integration:
1166                instances = list(
1167                    filter(
1168                        lambda item:  item.platform.name in ts.integration_platforms,
1169                        instance_list
1170                    )
1171                )
1172                self.add_instances(instances)
1173
1174            elif emulation_platforms:
1175                self.add_instances(instance_list)
1176                for instance in list(
1177                    filter(
1178                        lambda inst: not inst.platform.simulator_by_name(self.options.sim_name),
1179                        instance_list
1180                    )
1181                ):
1182                    instance.add_filter("Not an emulated platform", Filters.CMD_LINE)
1183            elif vendor_platforms:
1184                self.add_instances(instance_list)
1185                for instance in list(
1186                    filter(
1187                        lambda inst: inst.platform.vendor not in vendor_filter,
1188                        instance_list
1189                    )
1190                ):
1191                    instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE)
1192            else:
1193                self.add_instances(instance_list)
1194
1195        for _, case in self.instances.items():
1196            # Do not create files for filtered instances
1197            if case.status == TwisterStatus.FILTER:
1198                continue
1199            # set run_id for each unfiltered instance
1200            case.setup_run_id()
1201            case.create_overlay(case.platform,
1202                                self.options.enable_asan,
1203                                self.options.enable_ubsan,
1204                                self.options.enable_coverage,
1205                                self.options.coverage_platform)
1206
1207        self.selected_platforms = set(p.platform.name for p in self.instances.values())
1208
1209        filtered_and_skipped_instances = list(
1210            filter(
1211            lambda item: item.status in [TwisterStatus.FILTER, TwisterStatus.SKIP],
1212            self.instances.values()
1213            )
1214        )
1215        for inst in filtered_and_skipped_instances:
1216            change_skip_to_error_if_integration(self.options, inst)
1217            inst.add_missing_case_status(inst.status)
1218
1219    def add_instances(self, instance_list):
1220        for instance in instance_list:
1221            self.instances[instance.name] = instance
1222
1223
1224    def get_testsuite(self, identifier):
1225        results = []
1226        for _, ts in self.testsuites.items():
1227            if ts.id == identifier:
1228                results.append(ts)
1229        return results
1230
1231    def get_testcase(self, identifier):
1232        results = []
1233        for _, ts in self.testsuites.items():
1234            for case in ts.testcases:
1235                if case.name == identifier:
1236                    results.append(ts)
1237        return results
1238
1239    def verify_platforms_existence(self, platform_names_to_verify, log_info=""):
1240        """
1241        Verify if platform name (passed by --platform option, or in yaml file
1242        as platform_allow or integration_platforms options) is correct. If not -
1243        log and raise error.
1244        """
1245        _platforms = []
1246        for platform in platform_names_to_verify:
1247            if platform in self.platform_names:
1248                p = self.get_platform(platform)
1249                if p:
1250                    _platforms.append(p.name)
1251            else:
1252                logger.error(f"{log_info} - unrecognized platform - {platform}")
1253                sys.exit(2)
1254        return _platforms
1255
1256    def create_build_dir_links(self):
1257        """
1258        Iterate through all no-skipped instances in suite and create links
1259        for each one build directories. Those links will be passed in the next
1260        steps to the CMake command.
1261        """
1262
1263        links_dir_name = "twister_links"  # folder for all links
1264        links_dir_path = os.path.join(self.env.outdir, links_dir_name)
1265        if not os.path.exists(links_dir_path):
1266            os.mkdir(links_dir_path)
1267
1268        for instance in self.instances.values():
1269            if instance.status != TwisterStatus.SKIP:
1270                self._create_build_dir_link(links_dir_path, instance)
1271
1272    def _create_build_dir_link(self, links_dir_path, instance):
1273        """
1274        Create build directory with original "long" path. Next take shorter
1275        path and link them with original path - create link. At the end
1276        replace build_dir to created link. This link will be passed to CMake
1277        command. This action helps to limit path length which can be
1278        significant during building by CMake on Windows OS.
1279        """
1280
1281        os.makedirs(instance.build_dir, exist_ok=True)
1282
1283        link_name = f"test_{self.link_dir_counter}"
1284        link_path = os.path.join(links_dir_path, link_name)
1285
1286        if os.name == "nt":  # if OS is Windows
1287            command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)]
1288            subprocess.call(command, shell=True)
1289        else:  # for Linux and MAC OS
1290            os.symlink(instance.build_dir, link_path)
1291
1292        # Here original build directory is replaced with symbolic link. It will
1293        # be passed to CMake command
1294        instance.build_dir = link_path
1295
1296        self.link_dir_counter += 1
1297
1298
def change_skip_to_error_if_integration(options, instance):
    '''Turn a skipped/filtered instance on an integration platform into an error.

    Nothing changes when the instance platform is not one of the testsuite
    integration platforms, when any of its filters has a type from the ignored
    set below, or when its reason mentions quarantine. The options argument is
    not used.
    '''
    if instance.platform.name not in instance.testsuite.integration_platforms:
        return

    # Do not treat this as error if filter type is among the tolerated ones
    applied_types = {entry['type'] for entry in instance.filters}
    tolerated_types = {
        Filters.CMD_LINE,
        Filters.SKIP,
        Filters.PLATFORM_KEY,
        Filters.TOOLCHAIN,
        Filters.MODULE,
        Filters.TESTPLAN,
        Filters.ENVIRONMENT,
    }
    if applied_types & tolerated_types:
        return

    if "quarantine" in instance.reason.lower():
        return

    instance.status = TwisterStatus.ERROR
    instance.reason += " but is one of the integration platforms"
    logger.debug(
        f"Changing status of {instance.name} to ERROR because it is an integration platform"
    )
1316