1#!/usr/bin/env python3 2# vim: set syntax=python ts=4 : 3# 4# Copyright (c) 2018-2024 Intel Corporation 5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. 6# 7# SPDX-License-Identifier: Apache-2.0 8import collections 9import copy 10import itertools 11import json 12import logging 13import os 14import random 15import re 16import subprocess 17import sys 18from argparse import Namespace 19from collections import OrderedDict 20from itertools import islice 21from pathlib import Path 22 23import snippets 24 25try: 26 from anytree import Node, RenderTree, find 27except ImportError: 28 print("Install the anytree module to use the --test-tree option") 29 30import scl 31from twisterlib.config_parser import TwisterConfigParser 32from twisterlib.error import TwisterRuntimeError 33from twisterlib.platform import Platform, generate_platforms 34from twisterlib.quarantine import Quarantine 35from twisterlib.statuses import TwisterStatus 36from twisterlib.testinstance import TestInstance 37from twisterlib.testsuite import TestSuite, scan_testsuite_path 38from zephyr_module import parse_modules 39 40logger = logging.getLogger('twister') 41 42ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") 43if not ZEPHYR_BASE: 44 sys.exit("$ZEPHYR_BASE environment variable undefined") 45 46# This is needed to load edt.pickle files. 47sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts", 48 "python-devicetree", "src")) 49from devicetree import edtlib # pylint: disable=unused-import 50 51sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/")) 52 53 54class Filters: 55 # platform keys 56 PLATFORM_KEY = 'platform key filter' 57 # filters provided on command line by the user/tester 58 CMD_LINE = 'command line filter' 59 # filters in the testsuite yaml definition 60 TESTSUITE = 'testsuite filter' 61 # filters in the testplan yaml definition 62 TESTPLAN = 'testplan filter' 63 # filters related to platform definition 64 PLATFORM = 'Platform related filter' 65 # in case a test suite is skipped intentionally . 66 SKIP = 'Skip filter' 67 # in case of incompatibility between selected and allowed toolchains. 68 TOOLCHAIN = 'Toolchain filter' 69 # in case where an optional module is not available 70 MODULE = 'Module filter' 71 # in case of missing env. variable required for a platform 72 ENVIRONMENT = 'Environment filter' 73 74 75class TestLevel: 76 name = None 77 levels = [] 78 scenarios = [] 79 80 81class TestConfiguration: 82 __test__ = False 83 tc_schema_path = os.path.join( 84 ZEPHYR_BASE, 85 "scripts", 86 "schemas", 87 "twister", 88 "test-config-schema.yaml" 89 ) 90 91 def __init__(self, config_file): 92 self.test_config = None 93 self.override_default_platforms = False 94 self.increased_platform_scope = True 95 self.default_platforms = [] 96 self.parse(config_file) 97 98 def parse(self, config_file): 99 if os.path.exists(config_file): 100 tc_schema = scl.yaml_load(self.tc_schema_path) 101 self.test_config = scl.yaml_load_verify(config_file, tc_schema) 102 else: 103 raise TwisterRuntimeError(f"File {config_file} not found.") 104 105 platform_config = self.test_config.get('platforms', {}) 106 107 self.override_default_platforms = platform_config.get('override_default_platforms', False) 108 self.increased_platform_scope = platform_config.get('increased_platform_scope', True) 109 self.default_platforms = platform_config.get('default_platforms', []) 110 111 self.options = self.test_config.get('options', {}) 112 113 114 @staticmethod 115 def get_level(levels, name): 116 level = next((lvl for lvl in levels if lvl.name == name), None) 117 return level 118 119 def get_levels(self, scenarios): 120 levels = [] 121 configured_levels = self.test_config.get('levels', []) 122 123 # Do first pass on levels to get initial data. 124 for level in configured_levels: 125 adds = [] 126 for s in level.get('adds', []): 127 r = re.compile(s) 128 adds.extend(list(filter(r.fullmatch, scenarios))) 129 130 test_level = TestLevel() 131 test_level.name = level['name'] 132 test_level.scenarios = adds 133 test_level.levels = level.get('inherits', []) 134 levels.append(test_level) 135 136 # Go over levels again to resolve inheritance. 137 for level in configured_levels: 138 inherit = level.get('inherits', []) 139 _level = self.get_level(levels, level['name']) 140 if inherit: 141 for inherted_level in inherit: 142 _inherited = self.get_level(levels, inherted_level) 143 assert _inherited, "Unknown inherited level {inherted_level}" 144 _inherited_scenarios = _inherited.scenarios 145 level_scenarios = _level.scenarios if _level else [] 146 level_scenarios.extend(_inherited_scenarios) 147 148 return levels 149 150class TestPlan: 151 __test__ = False # for pytest to skip this class when collects tests 152 config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 153 dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 154 155 suite_schema = scl.yaml_load( 156 os.path.join(ZEPHYR_BASE, 157 "scripts", "schemas", "twister", "testsuite-schema.yaml")) 158 quarantine_schema = scl.yaml_load( 159 os.path.join(ZEPHYR_BASE, 160 "scripts", "schemas", "twister", "quarantine-schema.yaml")) 161 162 SAMPLE_FILENAME = 'sample.yaml' 163 TESTSUITE_FILENAME = 'testcase.yaml' 164 165 def __init__(self, env: Namespace): 166 167 self.options = env.options 168 self.env = env 169 170 # Keep track of which test cases we've filtered out and why 171 self.testsuites = {} 172 self.quarantine = None 173 self.platforms = [] 174 self.platform_names = [] 175 self.selected_platforms = [] 176 self.default_platforms = [] 177 self.load_errors = 0 178 self.instances = dict() 179 self.instance_fail_count = 0 180 self.warnings = 0 181 182 self.scenarios = [] 183 184 self.hwm = env.hwm 185 # used during creating shorter build paths 186 self.link_dir_counter = 0 187 self.modules = [] 188 189 self.run_individual_testsuite = [] 190 self.levels = [] 191 self.test_config = None 192 193 self.name = "unnamed" 194 195 def find_subtests(self): 196 sub_tests = self.options.sub_test 197 if sub_tests: 198 for subtest in sub_tests: 199 _subtests = self.get_testcase(subtest) 200 for _subtest in _subtests: 201 self.run_individual_testsuite.append(_subtest.name) 202 203 if self.run_individual_testsuite: 204 logger.info("Running the following tests:") 205 for test in self.run_individual_testsuite: 206 print(f" - {test}") 207 else: 208 raise TwisterRuntimeError("Tests not found") 209 210 def discover(self): 211 self.handle_modules() 212 self.test_config = TestConfiguration(self.env.test_config) 213 214 self.add_configurations() 215 num = self.add_testsuites(testsuite_filter=self.options.test, 216 testsuite_pattern=self.options.test_pattern) 217 218 if num == 0: 219 raise TwisterRuntimeError("No testsuites found at the specified location...") 220 if self.load_errors: 221 raise TwisterRuntimeError( 222 f"Found {self.load_errors} errors loading {num} test configurations." 223 ) 224 225 self.find_subtests() 226 # get list of scenarios we have parsed into one list 227 for _, ts in self.testsuites.items(): 228 self.scenarios.append(ts.id) 229 230 self.report_duplicates() 231 self.levels = self.test_config.get_levels(self.scenarios) 232 233 # handle quarantine 234 ql = self.options.quarantine_list 235 qv = self.options.quarantine_verify 236 if qv and not ql: 237 logger.error("No quarantine list given to be verified") 238 raise TwisterRuntimeError("No quarantine list given to be verified") 239 if ql: 240 for quarantine_file in ql: 241 try: 242 # validate quarantine yaml file against the provided schema 243 scl.yaml_load_verify(quarantine_file, self.quarantine_schema) 244 except scl.EmptyYamlFileException: 245 logger.debug(f'Quarantine file {quarantine_file} is empty') 246 self.quarantine = Quarantine(ql) 247 248 def get_level(self, name): 249 level = next((lvl for lvl in self.levels if lvl.name == name), None) 250 return level 251 252 def load(self): 253 254 if self.options.report_suffix: 255 last_run = os.path.join( 256 self.options.outdir, 257 f"twister_{self.options.report_suffix}.json" 258 ) 259 else: 260 last_run = os.path.join(self.options.outdir, "twister.json") 261 262 if self.options.only_failed or self.options.report_summary is not None: 263 self.load_from_file(last_run) 264 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 265 elif self.options.load_tests: 266 self.load_from_file(self.options.load_tests) 267 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 268 elif self.options.test_only: 269 # Get list of connected hardware and filter tests to only be run on connected hardware. 270 # If the platform does not exist in the hardware map or was not specified by --platform, 271 # just skip it. 272 273 connected_list = [] 274 excluded_list = [] 275 for _cp in self.options.platform: 276 if _cp in self.platform_names: 277 connected_list.append(self.get_platform(_cp).name) 278 279 if self.options.exclude_platform: 280 for _p in self.options.exclude_platform: 281 if _p in self.platform_names: 282 excluded_list.append(self.get_platform(_p).name) 283 for excluded in excluded_list: 284 if excluded in connected_list: 285 connected_list.remove(excluded) 286 287 self.load_from_file(last_run, filter_platform=connected_list) 288 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 289 else: 290 self.apply_filters() 291 292 if self.options.subset: 293 s = self.options.subset 294 try: 295 subset, sets = (int(x) for x in s.split("/")) 296 except ValueError as err: 297 raise TwisterRuntimeError("Bad subset value.") from err 298 299 if subset > sets: 300 raise TwisterRuntimeError("subset should not exceed the total number of sets") 301 302 if int(subset) > 0 and int(sets) >= int(subset): 303 logger.info(f"Running only a subset: {subset}/{sets}") 304 else: 305 raise TwisterRuntimeError( 306 f"You have provided a wrong subset value: {self.options.subset}." 307 ) 308 309 self.generate_subset(subset, int(sets)) 310 311 def generate_subset(self, subset, sets): 312 # Test instances are sorted depending on the context. For CI runs 313 # the execution order is: "plat1-testA, plat1-testB, ..., 314 # plat1-testZ, plat2-testA, ...". For hardware tests 315 # (device_testing), were multiple physical platforms can run the tests 316 # in parallel, it is more efficient to run in the order: 317 # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..." 318 if self.options.device_testing: 319 self.instances = OrderedDict(sorted(self.instances.items(), 320 key=lambda x: x[0][x[0].find("/") + 1:])) 321 else: 322 self.instances = OrderedDict(sorted(self.instances.items())) 323 324 if self.options.shuffle_tests: 325 seed_value = int.from_bytes(os.urandom(8), byteorder="big") 326 if self.options.shuffle_tests_seed is not None: 327 seed_value = self.options.shuffle_tests_seed 328 329 logger.info(f"Shuffle tests with seed: {seed_value}") 330 random.seed(seed_value) 331 temp_list = list(self.instances.items()) 332 random.shuffle(temp_list) 333 self.instances = OrderedDict(temp_list) 334 335 # Do calculation based on what is actually going to be run and evaluated 336 # at runtime, ignore the cases we already know going to be skipped. 337 # This fixes an issue where some sets would get majority of skips and 338 # basically run nothing beside filtering. 339 to_run = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.NONE} 340 total = len(to_run) 341 per_set = int(total / sets) 342 num_extra_sets = total - (per_set * sets) 343 344 # Try and be more fair for rounding error with integer division 345 # so the last subset doesn't get overloaded, we add 1 extra to 346 # subsets 1..num_extra_sets. 347 if subset <= num_extra_sets: 348 start = (subset - 1) * (per_set + 1) 349 end = start + per_set + 1 350 else: 351 base = num_extra_sets * (per_set + 1) 352 start = ((subset - num_extra_sets - 1) * per_set) + base 353 end = start + per_set 354 355 sliced_instances = islice(to_run.items(), start, end) 356 skipped = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.SKIP} 357 errors = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.ERROR} 358 self.instances = OrderedDict(sliced_instances) 359 if subset == 1: 360 # add all pre-filtered tests that are skipped or got error status 361 # to the first set to allow for better distribution among all sets. 362 self.instances.update(skipped) 363 self.instances.update(errors) 364 365 366 def handle_modules(self): 367 # get all enabled west projects 368 modules_meta = parse_modules(ZEPHYR_BASE) 369 self.modules = [module.meta.get('name') for module in modules_meta] 370 371 372 def report(self): 373 if self.options.test_tree: 374 if not self.options.detailed_test_id: 375 logger.info("Test tree is always shown with detailed test-id.") 376 self.report_test_tree() 377 return 0 378 elif self.options.list_tests: 379 if not self.options.detailed_test_id: 380 logger.info("Test list is always shown with detailed test-id.") 381 self.report_test_list() 382 return 0 383 elif self.options.list_tags: 384 self.report_tag_list() 385 return 0 386 387 return 1 388 389 def report_duplicates(self): 390 dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1] 391 if dupes: 392 msg = "Duplicated test scenarios found:\n" 393 for dupe in dupes: 394 msg += (f"- {dupe} found in:\n") 395 for dc in self.get_testsuite(dupe): 396 msg += (f" - {dc.yamlfile}\n") 397 raise TwisterRuntimeError(msg) 398 else: 399 logger.debug("No duplicates found.") 400 401 def report_tag_list(self): 402 tags = set() 403 for _, tc in self.testsuites.items(): 404 tags = tags.union(tc.tags) 405 406 for t in tags: 407 print(f"- {t}") 408 409 def report_test_tree(self): 410 tests_list = self.get_tests_list() 411 412 testsuite = Node("Testsuite") 413 samples = Node("Samples", parent=testsuite) 414 tests = Node("Tests", parent=testsuite) 415 416 for test in sorted(tests_list): 417 if test.startswith("sample."): 418 sec = test.split(".") 419 area = find( 420 samples, 421 lambda node, sname=sec[1]: node.name == sname and node.parent == samples 422 ) 423 if not area: 424 area = Node(sec[1], parent=samples) 425 426 Node(test, parent=area) 427 else: 428 sec = test.split(".") 429 area = find( 430 tests, 431 lambda node, sname=sec[0]: node.name == sname and node.parent == tests 432 ) 433 if not area: 434 area = Node(sec[0], parent=tests) 435 436 if area and len(sec) > 2: 437 subarea = find( 438 area, lambda node, sname=sec[1], sparent=area: node.name == sname 439 and node.parent == sparent 440 ) 441 if not subarea: 442 subarea = Node(sec[1], parent=area) 443 Node(test, parent=subarea) 444 445 for pre, _, node in RenderTree(testsuite): 446 print(f"{pre}{node.name}") 447 448 def report_test_list(self): 449 tests_list = self.get_tests_list() 450 451 cnt = 0 452 for test in sorted(tests_list): 453 cnt = cnt + 1 454 print(f" - {test}") 455 print(f"{cnt} total.") 456 457 458 # Debug Functions 459 @staticmethod 460 def info(what): 461 sys.stdout.write(what + "\n") 462 sys.stdout.flush() 463 464 def add_configurations(self): 465 # Create a list of board roots as defined by the build system in general 466 # Note, internally in twister a board root includes the `boards` folder 467 # but in Zephyr build system, the board root is without the `boards` in folder path. 468 board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots] 469 soc_roots = self.env.soc_roots 470 arch_roots = self.env.arch_roots 471 472 for platform in generate_platforms(board_roots, soc_roots, arch_roots): 473 if not platform.twister: 474 continue 475 self.platforms.append(platform) 476 477 if not self.test_config.override_default_platforms: 478 if platform.default: 479 self.default_platforms.append(platform.name) 480 continue 481 for pp in self.test_config.default_platforms: 482 if pp in platform.aliases: 483 logger.debug(f"adding {platform.name} to default platforms (override mode)") 484 self.default_platforms.append(platform.name) 485 486 self.platform_names = [a for p in self.platforms for a in p.aliases] 487 488 def get_all_tests(self): 489 testcases = [] 490 for _, ts in self.testsuites.items(): 491 for case in ts.testcases: 492 testcases.append(case.name) 493 494 return testcases 495 496 def get_tests_list(self): 497 testcases = [] 498 if tag_filter := self.options.tag: 499 for _, ts in self.testsuites.items(): 500 if ts.tags.intersection(tag_filter): 501 for case in ts.testcases: 502 testcases.append(case.detailed_name) 503 else: 504 for _, ts in self.testsuites.items(): 505 for case in ts.testcases: 506 testcases.append(case.detailed_name) 507 508 if exclude_tag := self.options.exclude_tag: 509 for _, ts in self.testsuites.items(): 510 if ts.tags.intersection(exclude_tag): 511 for case in ts.testcases: 512 if case.detailed_name in testcases: 513 testcases.remove(case.detailed_name) 514 return testcases 515 516 def _is_testsuite_selected(self, suite: TestSuite, testsuite_filter, testsuite_patterns_r): 517 """Check if the testsuite is selected by the user.""" 518 if not testsuite_filter and not testsuite_patterns_r: 519 # no matching requested, include all testsuites 520 return True 521 if testsuite_filter: 522 scenario = os.path.basename(suite.name) 523 if ( 524 suite.name 525 and (suite.name in testsuite_filter or scenario in testsuite_filter) 526 ): 527 return True 528 if testsuite_patterns_r: 529 for r in testsuite_patterns_r: 530 if r.search(suite.id): 531 return True 532 return False 533 534 def add_testsuites(self, testsuite_filter=None, testsuite_pattern=None): 535 if testsuite_filter is None: 536 testsuite_filter = [] 537 538 testsuite_patterns_r = [] 539 if testsuite_pattern is None: 540 testsuite_pattern = [] 541 else: 542 for pattern in testsuite_pattern: 543 testsuite_patterns_r.append(re.compile(pattern)) 544 545 for root in self.env.test_roots: 546 root = os.path.abspath(root) 547 548 logger.debug(f"Reading testsuite configuration files under {root}...") 549 550 for dirpath, _, filenames in os.walk(root, topdown=True): 551 if self.SAMPLE_FILENAME in filenames: 552 filename = self.SAMPLE_FILENAME 553 elif self.TESTSUITE_FILENAME in filenames: 554 filename = self.TESTSUITE_FILENAME 555 else: 556 continue 557 558 logger.debug("Found possible testsuite in " + dirpath) 559 560 suite_yaml_path = os.path.join(dirpath, filename) 561 suite_path = os.path.dirname(suite_yaml_path) 562 563 for alt_config_root in self.env.alt_config_root: 564 alt_config = os.path.join(os.path.abspath(alt_config_root), 565 os.path.relpath(suite_path, root), 566 filename) 567 if os.path.exists(alt_config): 568 logger.info( 569 f"Using alternative configuration from {os.path.normpath(alt_config)}" 570 ) 571 suite_yaml_path = alt_config 572 break 573 574 try: 575 parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema) 576 parsed_data.load() 577 subcases = None 578 ztest_suite_names = None 579 580 for name in parsed_data.scenarios: 581 suite_dict = parsed_data.get_scenario(name) 582 suite = TestSuite( 583 root, 584 suite_path, 585 name, 586 data=suite_dict, 587 detailed_test_id=self.options.detailed_test_id 588 ) 589 590 # convert to fully qualified names 591 suite.integration_platforms = self.verify_platforms_existence( 592 suite.integration_platforms, 593 f"integration_platforms in {suite.name}") 594 suite.platform_exclude = self.verify_platforms_existence( 595 suite.platform_exclude, 596 f"platform_exclude in {suite.name}") 597 suite.platform_allow = self.verify_platforms_existence( 598 suite.platform_allow, 599 f"platform_allow in {suite.name}") 600 601 if suite.harness in ['ztest', 'test']: 602 if subcases is None: 603 # scan it only once per testsuite 604 subcases, ztest_suite_names = scan_testsuite_path(suite_path) 605 suite.add_subcases(suite_dict, subcases, ztest_suite_names) 606 else: 607 suite.add_subcases(suite_dict) 608 609 if not self._is_testsuite_selected(suite, testsuite_filter, 610 testsuite_patterns_r): 611 # skip testsuite if they were not selected directly by the user 612 continue 613 if suite.name in self.testsuites: 614 msg = ( 615 f"test suite '{suite.name}' in '{suite.yamlfile}' is already added" 616 ) 617 if suite.yamlfile == self.testsuites[suite.name].yamlfile: 618 logger.debug(f"Skip - {msg}") 619 else: 620 msg = ( 621 f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'" 622 ) 623 raise TwisterRuntimeError(msg) 624 else: 625 self.testsuites[suite.name] = suite 626 627 except Exception as e: 628 logger.error(f"{suite_path}: can't load (skipping): {e!r}") 629 self.load_errors += 1 630 return len(self.testsuites) 631 632 def __str__(self): 633 return self.name 634 635 def get_platform(self, name): 636 selected_platform = None 637 for platform in self.platforms: 638 if name in platform.aliases: 639 selected_platform = platform 640 break 641 return selected_platform 642 643 def handle_quarantined_tests(self, instance: TestInstance, plat: Platform): 644 if self.quarantine: 645 sim_name = plat.simulation 646 if sim_name != "na" and (simulator := plat.simulator_by_name(self.options.sim_name)): 647 sim_name = simulator.name 648 matched_quarantine = self.quarantine.get_matched_quarantine( 649 instance.testsuite.id, 650 plat.name, 651 plat.arch, 652 sim_name 653 ) 654 if matched_quarantine and not self.options.quarantine_verify: 655 instance.status = TwisterStatus.SKIP 656 instance.reason = "Quarantine: " + matched_quarantine 657 return 658 if not matched_quarantine and self.options.quarantine_verify: 659 instance.add_filter("Not under quarantine", Filters.CMD_LINE) 660 661 def load_from_file(self, file, filter_platform=None): 662 if filter_platform is None: 663 filter_platform = [] 664 try: 665 with open(file) as json_test_plan: 666 jtp = json.load(json_test_plan) 667 instance_list = [] 668 for ts in jtp.get("testsuites", []): 669 logger.debug(f"loading {ts['name']}...") 670 testsuite = ts["name"] 671 toolchain = ts["toolchain"] 672 673 platform = self.get_platform(ts["platform"]) 674 if filter_platform and platform.name not in filter_platform: 675 continue 676 instance = TestInstance( 677 self.testsuites[testsuite], platform, toolchain, self.env.outdir 678 ) 679 if ts.get("run_id"): 680 instance.run_id = ts.get("run_id") 681 682 instance.run = instance.check_runnable( 683 self.options, 684 self.hwm 685 ) 686 687 if self.options.test_only and not instance.run: 688 continue 689 690 instance.metrics['handler_time'] = ts.get('execution_time', 0) 691 instance.metrics['used_ram'] = ts.get("used_ram", 0) 692 instance.metrics['used_rom'] = ts.get("used_rom",0) 693 instance.metrics['available_ram'] = ts.get('available_ram', 0) 694 instance.metrics['available_rom'] = ts.get('available_rom', 0) 695 696 status = TwisterStatus(ts.get('status')) 697 reason = ts.get("reason", "Unknown") 698 if status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 699 if self.options.report_summary is not None: 700 instance.status = status 701 instance.reason = reason 702 self.instance_fail_count += 1 703 else: 704 instance.status = TwisterStatus.NONE 705 instance.reason = None 706 instance.retries += 1 707 # test marked as built only can run when --test-only is used. 708 # Reset status to capture new results. 709 elif status == TwisterStatus.NOTRUN and instance.run and self.options.test_only: 710 instance.status = TwisterStatus.NONE 711 instance.reason = None 712 else: 713 instance.status = status 714 instance.reason = reason 715 716 self.handle_quarantined_tests(instance, platform) 717 718 for tc in ts.get('testcases', []): 719 identifier = tc['identifier'] 720 tc_status = TwisterStatus(tc.get('status')) 721 tc_reason = None 722 # we set reason only if status is valid, it might have been 723 # reset above... 724 if instance.status != TwisterStatus.NONE: 725 tc_reason = tc.get('reason') 726 if tc_status != TwisterStatus.NONE: 727 case = instance.set_case_status_by_name( 728 identifier, 729 tc_status, 730 tc_reason 731 ) 732 case.duration = tc.get('execution_time', 0) 733 if tc.get('log'): 734 case.output = tc.get('log') 735 736 instance.create_overlay(platform, 737 self.options.enable_asan, 738 self.options.enable_ubsan, 739 self.options.enable_coverage, 740 self.options.coverage_platform 741 ) 742 instance_list.append(instance) 743 self.add_instances(instance_list) 744 except FileNotFoundError as e: 745 logger.error(f"{e}") 746 return 1 747 748 def check_platform(self, platform, platform_list): 749 return any(p in platform.aliases for p in platform_list) 750 751 def apply_filters(self, **kwargs): 752 753 platform_filter = self.options.platform 754 platform_pattern = self.options.platform_pattern 755 vendor_filter = self.options.vendor 756 exclude_platform = self.options.exclude_platform 757 testsuite_filter = self.run_individual_testsuite 758 arch_filter = self.options.arch 759 tag_filter = self.options.tag 760 exclude_tag = self.options.exclude_tag 761 all_filter = self.options.all 762 runnable = (self.options.device_testing or self.options.filter == 'runnable') 763 force_toolchain = self.options.force_toolchain 764 force_platform = self.options.force_platform 765 slow_only = self.options.enable_slow_only 766 ignore_platform_key = self.options.ignore_platform_key 767 emu_filter = self.options.emulation_only 768 769 logger.debug(" platform filter: " + str(platform_filter)) 770 logger.debug("platform_pattern: " + str(platform_pattern)) 771 logger.debug(" vendor filter: " + str(vendor_filter)) 772 logger.debug(" arch_filter: " + str(arch_filter)) 773 logger.debug(" tag_filter: " + str(tag_filter)) 774 logger.debug(" exclude_tag: " + str(exclude_tag)) 775 776 default_platforms = False 777 vendor_platforms = False 778 emulation_platforms = False 779 780 if all_filter: 781 logger.info("Selecting all possible platforms per testsuite scenario") 782 # When --all used, any --platform arguments ignored 783 platform_filter = [] 784 elif not platform_filter and not emu_filter and not vendor_filter and not platform_pattern: 785 logger.info("Selecting default platforms per testsuite scenario") 786 default_platforms = True 787 elif emu_filter: 788 logger.info("Selecting emulation platforms per testsuite scenario") 789 emulation_platforms = True 790 elif vendor_filter: 791 vendor_platforms = True 792 793 _platforms = [] 794 if platform_filter: 795 logger.debug(f"Checking platform filter: {platform_filter}") 796 # find in aliases and rename 797 platform_filter = self.verify_platforms_existence(platform_filter, "platform_filter") 798 platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) 799 elif platform_pattern: 800 platforms = list( 801 filter(lambda p: any(re.match(pat, alias) for pat in platform_pattern \ 802 for alias in p.aliases), self.platforms) 803 ) 804 elif emu_filter: 805 platforms = list( 806 filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms) 807 ) 808 elif vendor_filter: 809 platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms)) 810 logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}") 811 elif arch_filter: 812 platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms)) 813 elif default_platforms: 814 _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms)) 815 platforms = [] 816 # default platforms that can't be run are dropped from the list of 817 # the default platforms list. Default platforms should always be 818 # runnable. 819 for p in _platforms: 820 sim = p.simulator_by_name(self.options.sim_name) 821 if (not sim) or sim.is_runnable(): 822 platforms.append(p) 823 else: 824 platforms = self.platforms 825 826 # test configuration options 827 test_config_options = self.test_config.options 828 integration_mode_list = test_config_options.get('integration_mode', []) 829 830 logger.info("Building initial testsuite list...") 831 832 keyed_tests = {} 833 for _, ts in self.testsuites.items(): 834 if ts.integration_platforms: 835 _integration_platforms = list( 836 filter(lambda item: item.name in ts.integration_platforms, self.platforms) 837 ) 838 else: 839 _integration_platforms = [] 840 841 if (ts.build_on_all and not platform_filter and not platform_pattern and 842 self.test_config.increased_platform_scope): 843 # if build_on_all is set, we build on all platforms 844 platform_scope = self.platforms 845 elif ts.integration_platforms and self.options.integration: 846 # if integration is set, we build on integration platforms 847 platform_scope = _integration_platforms 848 elif ts.integration_platforms and not platform_filter and not platform_pattern: 849 # if integration platforms are set, we build on those and integration mode is set 850 # for this test suite, we build on integration platforms 851 if any(ts.id.startswith(i) for i in integration_mode_list): 852 platform_scope = _integration_platforms 853 else: 854 platform_scope = platforms + _integration_platforms 855 else: 856 platform_scope = platforms 857 858 integration = self.options.integration and ts.integration_platforms 859 860 # If there isn't any overlap between the platform_allow list and the platform_scope 861 # we set the scope to the platform_allow list 862 if ( 863 ts.platform_allow 864 and not platform_filter 865 and not integration 866 and self.test_config.increased_platform_scope 867 ): 868 a = set(platform_scope) 869 b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms)) 870 c = a.intersection(b) 871 if not c: 872 platform_scope = list( 873 filter(lambda item: item.name in ts.platform_allow, self.platforms) 874 ) 875 # list of instances per testsuite, aka configurations. 876 instance_list = [] 877 for itoolchain, plat in itertools.product( 878 ts.integration_toolchains or [None], platform_scope 879 ): 880 if itoolchain: 881 toolchain = itoolchain 882 elif plat.arch in ['posix', 'unit']: 883 # workaround until toolchain variant in zephyr is overhauled and improved. 884 if self.env.toolchain in ['llvm']: 885 toolchain = 'llvm' 886 else: 887 toolchain = 'host' 888 else: 889 toolchain = "zephyr" if not self.env.toolchain else self.env.toolchain 890 891 instance = TestInstance(ts, plat, toolchain, self.env.outdir) 892 instance.run = instance.check_runnable( 893 self.options, 894 self.hwm 895 ) 896 897 if not force_platform and self.check_platform(plat,exclude_platform): 898 instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE) 899 900 if (plat.arch == "unit") != (ts.type == "unit"): 901 # Discard silently 902 continue 903 904 if ts.modules and self.modules and not set(ts.modules).issubset(set(self.modules)): 905 instance.add_filter( 906 f"one or more required modules not available: {','.join(ts.modules)}", 907 Filters.MODULE 908 ) 909 910 if self.options.level: 911 tl = self.get_level(self.options.level) 912 if tl is None: 913 instance.add_filter( 914 f"Unknown test level '{self.options.level}'", 915 Filters.TESTPLAN 916 ) 917 else: 918 planned_scenarios = tl.scenarios 919 if ( 920 ts.id not in planned_scenarios 921 and not set(ts.levels).intersection(set(tl.levels)) 922 ): 923 instance.add_filter("Not part of requested test plan", Filters.TESTPLAN) 924 925 if runnable and not instance.run: 926 instance.add_filter("Not runnable on device", Filters.CMD_LINE) 927 928 if ( 929 self.options.integration 930 and ts.integration_platforms 931 and plat.name not in ts.integration_platforms 932 ): 933 instance.add_filter("Not part of integration platforms", Filters.TESTSUITE) 934 935 if ts.skip: 936 instance.add_filter("Skip filter", Filters.SKIP) 937 938 if tag_filter and not ts.tags.intersection(tag_filter): 939 instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE) 940 941 if slow_only and not ts.slow: 942 instance.add_filter("Not a slow test", Filters.CMD_LINE) 943 944 if exclude_tag and ts.tags.intersection(exclude_tag): 945 instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE) 946 947 if testsuite_filter: 948 normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter] 949 if ts.id not in normalized_f: 950 instance.add_filter("Testsuite name filter", Filters.CMD_LINE) 951 952 if arch_filter and plat.arch not in arch_filter: 953 instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE) 954 955 if not force_platform: 956 957 if ts.arch_allow and plat.arch not in ts.arch_allow: 958 instance.add_filter("Not in testsuite arch allow list", Filters.TESTSUITE) 959 960 if ts.arch_exclude and plat.arch in ts.arch_exclude: 961 instance.add_filter("In testsuite arch exclude", Filters.TESTSUITE) 962 963 if ts.vendor_allow and plat.vendor not in ts.vendor_allow: 964 instance.add_filter( 965 "Not in testsuite vendor allow list", 966 Filters.TESTSUITE 967 ) 968 969 if ts.vendor_exclude and plat.vendor in ts.vendor_exclude: 970 instance.add_filter("In testsuite vendor exclude", Filters.TESTSUITE) 971 972 if ts.platform_exclude and plat.name in ts.platform_exclude: 973 instance.add_filter("In testsuite platform exclude", Filters.TESTSUITE) 974 975 if ts.toolchain_exclude and toolchain in ts.toolchain_exclude: 976 instance.add_filter("In testsuite toolchain exclude", Filters.TOOLCHAIN) 977 978 if platform_filter and plat.name not in platform_filter: 979 instance.add_filter("Command line platform filter", Filters.CMD_LINE) 980 981 if ts.platform_allow \ 982 and plat.name not in ts.platform_allow \ 983 and not (platform_filter and force_platform): 984 instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE) 985 986 if ts.platform_type and plat.type not in ts.platform_type: 987 instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE) 988 989 if ts.toolchain_allow and toolchain not in ts.toolchain_allow: 990 instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN) 991 992 if not plat.env_satisfied: 993 instance.add_filter( 994 "Environment ({}) not satisfied".format(", ".join(plat.env)), 995 Filters.ENVIRONMENT 996 ) 997 if plat.type == 'native' and sys.platform != 'linux': 998 instance.add_filter("Native platform requires Linux", Filters.ENVIRONMENT) 999 1000 if not force_toolchain \ 1001 and toolchain and (toolchain not in plat.supported_toolchains): 1002 instance.add_filter( 1003 f"Not supported by the toolchain: {toolchain}", 1004 Filters.PLATFORM 1005 ) 1006 1007 if plat.ram < ts.min_ram: 1008 instance.add_filter("Not enough RAM", Filters.PLATFORM) 1009 1010 if ts.harness: 1011 sim = plat.simulator_by_name(self.options.sim_name) 1012 if ts.harness == 'robot' and not (sim and sim.name == 'renode'): 1013 instance.add_filter( 1014 "No robot support for the selected platform", 1015 Filters.SKIP 1016 ) 1017 1018 if ts.depends_on: 1019 dep_intersection = ts.depends_on.intersection(set(plat.supported)) 1020 if dep_intersection != set(ts.depends_on): 1021 instance.add_filter( 1022 f"No hardware support for {set(ts.depends_on)-dep_intersection}", 1023 Filters.PLATFORM 1024 ) 1025 1026 if plat.flash < ts.min_flash: 1027 instance.add_filter("Not enough FLASH", Filters.PLATFORM) 1028 1029 if set(plat.ignore_tags) & ts.tags: 1030 instance.add_filter( 1031 "Excluded tags per platform (exclude_tags)", 1032 Filters.PLATFORM 1033 ) 1034 1035 if plat.only_tags and not set(plat.only_tags) & ts.tags: 1036 instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM) 1037 1038 if ts.required_snippets: 1039 missing_snippet = False 1040 snippet_args = {"snippets": ts.required_snippets} 1041 found_snippets = snippets.find_snippets_in_roots( 1042 snippet_args, 1043 [*self.env.snippet_roots, Path(ts.source_dir)] 1044 ) 1045 1046 # Search and check that all required snippet files are found 1047 for this_snippet in snippet_args['snippets']: 1048 if this_snippet not in found_snippets: 1049 logger.error( 1050 f"Can't find snippet '{this_snippet}' for test '{ts.name}'" 1051 ) 1052 instance.status = TwisterStatus.ERROR 1053 instance.reason = f"Snippet {this_snippet} not found" 1054 missing_snippet = True 1055 break 1056 1057 if not missing_snippet: 1058 # Look for required snippets and check that they are applicable for these 1059 # platforms/boards 1060 for this_snippet in snippet_args['snippets']: 1061 matched_snippet_board = False 1062 1063 # If the "appends" key is present with at least one entry then this 1064 # snippet applies to all boards and further platform-specific checks 1065 # are not required 1066 if found_snippets[this_snippet].appends: 1067 continue 1068 1069 for this_board in found_snippets[this_snippet].board2appends: 1070 if this_board.startswith('/'): 1071 match = re.search(this_board[1:-1], plat.name) 1072 if match is not None: 1073 matched_snippet_board = True 1074 break 1075 elif this_board == plat.name: 1076 matched_snippet_board = True 1077 break 1078 1079 if matched_snippet_board is False: 1080 instance.add_filter("Snippet not supported", Filters.PLATFORM) 1081 break 1082 1083 # handle quarantined tests 1084 self.handle_quarantined_tests(instance, plat) 1085 1086 # platform_key is a list of unique platform attributes that form a unique key 1087 # a test will match against to determine if it should be scheduled to run. 1088 # A key containing a field name that the platform does not have 1089 # will filter the platform. 1090 # 1091 # A simple example is keying on arch and simulation 1092 # to run a test once per unique (arch, simulation) platform. 1093 if ( 1094 not ignore_platform_key 1095 and hasattr(ts, 'platform_key') 1096 and len(ts.platform_key) > 0 1097 ): 1098 key_fields = sorted(set(ts.platform_key)) 1099 keys = [getattr(plat, key_field, None) for key_field in key_fields] 1100 for key in keys: 1101 if key is None or key == 'na': 1102 instance.add_filter( 1103 "Excluded platform missing key fields" 1104 f" demanded by test {key_fields}", 1105 Filters.PLATFORM 1106 ) 1107 break 1108 else: 1109 test_keys = copy.deepcopy(keys) 1110 test_keys.append(ts.name) 1111 test_keys = tuple(test_keys) 1112 keyed_test = keyed_tests.get(test_keys) 1113 if keyed_test is not None: 1114 plat_key = { 1115 key_field: getattr( 1116 keyed_test['plat'], 1117 key_field 1118 ) for key_field in key_fields 1119 } 1120 instance.add_filter( 1121 f"Already covered for key {key}" 1122 f" by platform {keyed_test['plat'].name} having key {plat_key}", 1123 Filters.PLATFORM_KEY 1124 ) 1125 else: 1126 # do not add a platform to keyed tests if previously 1127 # filtered 1128 1129 if not instance.filters: 1130 keyed_tests[test_keys] = {'plat': plat, 'ts': ts} 1131 1132 # if nothing stopped us until now, it means this configuration 1133 # needs to be added. 1134 instance_list.append(instance) 1135 1136 # no configurations, so jump to next testsuite 1137 if not instance_list: 1138 continue 1139 1140 # if twister was launched with no platform options at all, we 1141 # take all default platforms 1142 if default_platforms and not ts.build_on_all and not integration: 1143 if ts.platform_allow: 1144 _default_p = set(self.default_platforms) 1145 _platform_allow = set(ts.platform_allow) 1146 _intersection = _default_p.intersection(_platform_allow) 1147 if _intersection: 1148 aa = list( 1149 filter( 1150 lambda _scenario: _scenario.platform.name in _intersection, 1151 instance_list 1152 ) 1153 ) 1154 self.add_instances(aa) 1155 else: 1156 self.add_instances(instance_list) 1157 else: 1158 # add integration platforms to the list of default 1159 # platforms, even if we are not in integration mode 1160 _platforms = self.default_platforms + ts.integration_platforms 1161 instances = list( 1162 filter(lambda ts: ts.platform.name in _platforms, instance_list) 1163 ) 1164 self.add_instances(instances) 1165 elif integration: 1166 instances = list( 1167 filter( 1168 lambda item: item.platform.name in ts.integration_platforms, 1169 instance_list 1170 ) 1171 ) 1172 self.add_instances(instances) 1173 1174 elif emulation_platforms: 1175 self.add_instances(instance_list) 1176 for instance in list( 1177 filter( 1178 lambda inst: not inst.platform.simulator_by_name(self.options.sim_name), 1179 instance_list 1180 ) 1181 ): 1182 instance.add_filter("Not an emulated platform", Filters.CMD_LINE) 1183 elif vendor_platforms: 1184 self.add_instances(instance_list) 1185 for instance in list( 1186 filter( 1187 lambda inst: inst.platform.vendor not in vendor_filter, 1188 instance_list 1189 ) 1190 ): 1191 instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE) 1192 else: 1193 self.add_instances(instance_list) 1194 1195 for _, case in self.instances.items(): 1196 # Do not create files for filtered instances 1197 if case.status == TwisterStatus.FILTER: 1198 continue 1199 # set run_id for each unfiltered instance 1200 case.setup_run_id() 1201 case.create_overlay(case.platform, 1202 self.options.enable_asan, 1203 self.options.enable_ubsan, 1204 self.options.enable_coverage, 1205 self.options.coverage_platform) 1206 1207 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 1208 1209 filtered_and_skipped_instances = list( 1210 filter( 1211 lambda item: item.status in [TwisterStatus.FILTER, TwisterStatus.SKIP], 1212 self.instances.values() 1213 ) 1214 ) 1215 for inst in filtered_and_skipped_instances: 1216 change_skip_to_error_if_integration(self.options, inst) 1217 inst.add_missing_case_status(inst.status) 1218 1219 def add_instances(self, instance_list): 1220 for instance in instance_list: 1221 self.instances[instance.name] = instance 1222 1223 1224 def get_testsuite(self, identifier): 1225 results = [] 1226 for _, ts in self.testsuites.items(): 1227 if ts.id == identifier: 1228 results.append(ts) 1229 return results 1230 1231 def get_testcase(self, identifier): 1232 results = [] 1233 for _, ts in self.testsuites.items(): 1234 for case in ts.testcases: 1235 if case.name == identifier: 1236 results.append(ts) 1237 return results 1238 1239 def verify_platforms_existence(self, platform_names_to_verify, log_info=""): 1240 """ 1241 Verify if platform name (passed by --platform option, or in yaml file 1242 as platform_allow or integration_platforms options) is correct. If not - 1243 log and raise error. 1244 """ 1245 _platforms = [] 1246 for platform in platform_names_to_verify: 1247 if platform in self.platform_names: 1248 p = self.get_platform(platform) 1249 if p: 1250 _platforms.append(p.name) 1251 else: 1252 logger.error(f"{log_info} - unrecognized platform - {platform}") 1253 sys.exit(2) 1254 return _platforms 1255 1256 def create_build_dir_links(self): 1257 """ 1258 Iterate through all no-skipped instances in suite and create links 1259 for each one build directories. Those links will be passed in the next 1260 steps to the CMake command. 1261 """ 1262 1263 links_dir_name = "twister_links" # folder for all links 1264 links_dir_path = os.path.join(self.env.outdir, links_dir_name) 1265 if not os.path.exists(links_dir_path): 1266 os.mkdir(links_dir_path) 1267 1268 for instance in self.instances.values(): 1269 if instance.status != TwisterStatus.SKIP: 1270 self._create_build_dir_link(links_dir_path, instance) 1271 1272 def _create_build_dir_link(self, links_dir_path, instance): 1273 """ 1274 Create build directory with original "long" path. Next take shorter 1275 path and link them with original path - create link. At the end 1276 replace build_dir to created link. This link will be passed to CMake 1277 command. This action helps to limit path length which can be 1278 significant during building by CMake on Windows OS. 1279 """ 1280 1281 os.makedirs(instance.build_dir, exist_ok=True) 1282 1283 link_name = f"test_{self.link_dir_counter}" 1284 link_path = os.path.join(links_dir_path, link_name) 1285 1286 if os.name == "nt": # if OS is Windows 1287 command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)] 1288 subprocess.call(command, shell=True) 1289 else: # for Linux and MAC OS 1290 os.symlink(instance.build_dir, link_path) 1291 1292 # Here original build directory is replaced with symbolic link. It will 1293 # be passed to CMake command 1294 instance.build_dir = link_path 1295 1296 self.link_dir_counter += 1 1297 1298 1299def change_skip_to_error_if_integration(options, instance): 1300 ''' All skips on integration_platforms are treated as errors.''' 1301 if instance.platform.name in instance.testsuite.integration_platforms: 1302 # Do not treat this as error if filter type is among ignore_filters 1303 filters = {t['type'] for t in instance.filters} 1304 ignore_filters ={Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY, 1305 Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN, 1306 Filters.ENVIRONMENT} 1307 if filters.intersection(ignore_filters): 1308 return 1309 if "quarantine" in instance.reason.lower(): 1310 return 1311 instance.status = TwisterStatus.ERROR 1312 instance.reason += " but is one of the integration platforms" 1313 logger.debug( 1314 f"Changing status of {instance.name} to ERROR because it is an integration platform" 1315 ) 1316