# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2025 Intel Corporation
# Copyright 2022 NXP
# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import csv
import glob
import hashlib
import logging
import os
import random
from enum import Enum

from twisterlib.constants import (
    SUPPORTED_SIMS,
    SUPPORTED_SIMS_IN_PYTEST,
    SUPPORTED_SIMS_WITH_EXEC,
)
from twisterlib.environment import TwisterEnv
from twisterlib.error import BuildError, StatusAttributeError
from twisterlib.handlers import (
    BinaryHandler,
    DeviceHandler,
    Handler,
    QEMUHandler,
    QEMUWinHandler,
    SimulationHandler,
)
from twisterlib.platform import Platform
from twisterlib.size_calc import SizeCalculator
from twisterlib.statuses import TwisterStatus
from twisterlib.testsuite import TestCase, TestSuite

logger = logging.getLogger('twister')


class TestInstance:
    """Class representing the execution of a particular TestSuite on a platform

    @param testsuite The TestSuite object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param toolchain Toolchain name; it is used as a path component of both
        the instance name and the build directory
    @param outdir Base directory for all test results. The actual build
        directory used is
        <outdir>/<platform normalized name>/<toolchain>/<test suite name>
        when the suite uses detailed test ids, otherwise the suite's relative
        source directory is inserted before <test suite name>.
    """

    # Keep pytest from collecting this class because of its "Test" prefix.
    __test__ = False

    def __init__(self, testsuite, platform, toolchain, outdir):

        self.testsuite: TestSuite = testsuite
        self.platform: Platform = platform

        self._status = TwisterStatus.NONE
        self.reason = "Unknown"
        self.metrics = dict()
        self.handler = None
        self.recording = None
        self.coverage = None
        self.coverage_status = None
        self.outdir = outdir
        self.execution_time = 0
        self.build_time = 0
        self.retries = 0
        self.toolchain = toolchain
        self.name = os.path.join(platform.name, toolchain, testsuite.name)
        self.dut = None
        self.suite_repeat = None
        self.test_repeat = None
        self.test_shuffle = None

        if testsuite.detailed_test_id:
            self.build_dir = os.path.join(
                outdir, platform.normalized_name, self.toolchain, testsuite.name
            )
        else:
            # if suite is not in zephyr,
            # keep only the part after ".." in reconstructed dir structure
            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1]
            self.build_dir = os.path.join(
                outdir,
                platform.normalized_name,
                self.toolchain,
                source_dir_rel,
                testsuite.name
            )
        self.run_id = None
        self.domains = None
        # Instance need to use sysbuild if a given suite or a platform requires it
        self.sysbuild = testsuite.sysbuild or platform.sysbuild

        self.run = False
        self.testcases: list[TestCase] = []
        self.init_cases()
        self.filters = []
        self.filter_type = None

    def setup_run_id(self):
        """Load the cached run id for this instance, or generate a new one."""
        self.run_id = self._get_run_id()

    def record(self, recording, fname_csv="recording.csv"):
        """Accumulate recorded rows and rewrite the CSV file in the build dir.

        Nothing happens when ``recording`` is empty or None.

        @param recording List of rows to add; each row is used as a mapping
            of column name to value.
        @param fname_csv Name of the CSV file created inside the build
            directory.
        """
        if not recording:
            return

        if self.recording is None:
            self.recording = recording.copy()
        else:
            self.recording.extend(recording)

        filename = os.path.join(self.build_dir, fname_csv)
        # The header is the union of the keys of every row recorded so far.
        fieldnames = set()
        for row in self.recording:
            fieldnames.update(row)

        # newline='' hands line-ending control to the csv writer. Without it
        # text mode would translate the '\n' of an os.linesep of '\r\n'
        # again and produce '\r\r\n' row endings on Windows.
        with open(filename, 'w', newline='') as csvfile:
            cw = csv.DictWriter(csvfile,
                                fieldnames=sorted(fieldnames),
                                lineterminator=os.linesep,
                                quoting=csv.QUOTE_NONNUMERIC)
            cw.writeheader()
            cw.writerows(self.recording)

    @property
    def status(self) -> TwisterStatus:
        """Current status of the instance."""
        return self._status

    @status.setter
    def status(self, value : TwisterStatus) -> None:
        """Set the status from an Enum member or from a status name.

        @raises StatusAttributeError: the value does not name a TwisterStatus
            member.
        """
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def add_filter(self, reason, filter_type):
        """Record a filter hit and mark the instance as filtered.

        The last filter added determines ``reason`` and ``filter_type``.
        """
        self.filters.append({'type': filter_type, 'reason': reason })
        self.status = TwisterStatus.FILTER
        self.reason = reason
        self.filter_type = filter_type

    # Fix an issue with copying objects from testsuite, need better solution.
    def init_cases(self):
        """Create this instance's own test cases from the suite's test cases."""
        for c in self.testsuite.testcases:
            self.add_testcase(c.name, freeform=c.freeform)

    def _get_run_id(self):
        """ generate run id from instance unique identifier and a random
        number
        If exist, get cached run id from previous run."""
        run_id_file = os.path.join(self.build_dir, "run_id.txt")
        if os.path.exists(run_id_file):
            with open(run_id_file) as fp:
                return fp.read()

        # md5 is only used to derive an identifier here, not for security.
        hash_object = hashlib.md5(self.name.encode(), usedforsecurity=False)
        random_str = f"{random.getrandbits(64)}".encode()
        hash_object.update(random_str)
        run_id = hash_object.hexdigest()
        os.makedirs(self.build_dir, exist_ok=True)
        with open(run_id_file, 'w+') as fp:
            fp.write(run_id)
        return run_id

    def add_missing_case_status(self, status, reason=None):
        """Resolve the status of test cases that never reached a final state.

        Cases still marked as started are set to failed. Cases without any
        status get ``status`` together with ``reason``, or the instance's
        reason when no reason is given.
        """
        for case in self.testcases:
            if case.status == TwisterStatus.STARTED:
                case.status = TwisterStatus.FAIL
            elif case.status == TwisterStatus.NONE:
                case.status = status
                case.reason = reason if reason else self.reason

    def __getstate__(self):
        """Return a shallow copy of the instance dictionary for pickling."""
        return self.__dict__.copy()

    def __setstate__(self, d):
        """Restore the instance attributes from a pickled dictionary."""
        self.__dict__.update(d)

    def __lt__(self, other):
        """Order instances by their name."""
        return self.name < other.name

    def compose_case_name(self, tc_name) -> str:
        """Delegate composing a test case name to the test suite."""
        return self.testsuite.compose_case_name(tc_name)

    def set_case_status_by_name(self, name, status, reason=None):
        """Set status (and optionally reason) of a case, creating it if needed.

        @return The test case that was updated.
        """
        tc = self.get_case_or_create(name)
        tc.status = status
        if reason:
            tc.reason = reason
        return tc

    def add_testcase(self, name, freeform=False):
        """Append a new test case with the given name and return it."""
        tc = TestCase(name=name)
        tc.freeform = freeform
        self.testcases.append(tc)
        return tc

    def get_case_by_name(self, name):
        """Return the first test case with the given name, or None."""
        for c in self.testcases:
            if c.name == name:
                return c
        return None

    def get_case_or_create(self, name):
        """Return the test case with the given name, creating it if missing."""
        tc = self.get_case_by_name(name)
        if tc is not None:
            return tc

        logger.debug("Could not find a matching testcase for %s", name)
        tc = TestCase(name=name)
        self.testcases.append(tc)
        return tc

    @staticmethod
    def testsuite_runnable(testsuite, fixtures):
        """Tell whether the suite's harness and fixture allow running it.

        @param testsuite Suite whose harness and harness_config are checked.
        @param fixtures Iterable of fixture strings; only the part before the
            first ':' of each entry is compared.
        @return True if the harness is one of the runnable harnesses and the
            fixture required by the suite, if any, is among ``fixtures``.
        """
        # console harness allows us to run the test and capture data.
        runnable_harnesses = (
            'console',
            'display_capture',
            'ztest',
            'pytest',
            'power',
            'test',
            'gtest',
            'robot',
            'ctest',
            'shell',
        )
        if testsuite.harness not in runnable_harnesses:
            return False

        # if we have a fixture that is also being supplied on the
        # command-line, then we need to run the test, not just build it.
        fixture = testsuite.harness_config.get('fixture')
        if fixture:
            return any(fixture == f.split(sep=':')[0] for f in fixtures)

        return True

    def setup_handler(self, env: TwisterEnv):
        """Create the handler used to execute this instance.

        The handler is chosen from the options in ``env``, the platform's
        simulator and the suite type. An already existing handler is kept.
        """
        # only setup once.
        if self.handler:
            return

        options = env.options
        common_args = (options, env.generator_cmd, not options.disable_suite_name_check)
        simulator = self.platform.simulator_by_name(options.sim_name)
        if options.device_testing:
            handler = DeviceHandler(self, "device", *common_args)
            handler.call_make_run = False
            handler.ready = True
        elif simulator:
            if simulator.name == "qemu":
                # Windows needs its own QEMU handler implementation.
                qemu_handler = QEMUHandler if os.name != "nt" else QEMUWinHandler
                handler = qemu_handler(self, "qemu", *common_args)
                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
                handler.ready = True
            else:
                handler = SimulationHandler(self, simulator.name, *common_args)
                handler.ready = simulator.is_runnable()

        elif self.testsuite.type == "unit":
            handler = BinaryHandler(self, "unit", *common_args)
            handler.binary = os.path.join(self.build_dir, "testbinary")
            if options.enable_coverage:
                handler.args.append("COVERAGE=1")
            handler.call_make_run = False
            handler.ready = True
        else:
            handler = Handler(self, "", *common_args)
            if self.testsuite.harness == "ctest":
                handler.ready = True

        self.handler = handler

    # Global testsuite parameters
    def check_runnable(self,
                       options: TwisterEnv,
                       hardware_map=None):
        """Decide whether this instance can be executed, not only built.

        @param options Object providing enable_slow, filter, fixture,
            device_testing and sim_name.
            NOTE(review): annotated as TwisterEnv but read like the parsed
            command line options - confirm against callers.
        @param hardware_map Optional hardware map; its DUTs may provide the
            fixture required by the suite.
        @return True if both the suite and the target are ready to run.
        """
        # Named filter_opt so the builtin filter() is not shadowed.
        filter_opt = options.filter
        fixtures = options.fixture

        simulator = self.platform.simulator_by_name(options.sim_name)
        if os.name == 'nt' and simulator:
            # running on simulators is currently supported only for QEMU on Windows
            if simulator.name not in ('na', 'qemu'):
                return False

            # check presence of QEMU on Windows
            if simulator.name == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
                return False

        # we asked for build-only on the command line
        if self.testsuite.build_only:
            return False

        # Do not run slow tests:
        if self.testsuite.slow and not options.enable_slow:
            return False

        target_ready = bool(
            self.testsuite.type == "unit"
            or self.platform.type == "native"
            or self.testsuite.harness == "ctest"
            or (
                simulator
                and simulator.name in SUPPORTED_SIMS
                and simulator.name not in self.testsuite.simulation_exclude
            )
            or options.device_testing
        )

        # check if test is runnable in pytest
        if self.testsuite.harness in ['pytest', 'shell', 'power', 'display_capture']:
            target_ready = bool(
                filter_opt == 'runnable'
                or (simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST)
            )

        if (
            filter_opt != 'runnable'
            and simulator
            and simulator.name in SUPPORTED_SIMS_WITH_EXEC
            and not simulator.is_runnable()
        ):
            target_ready = False

        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)

        if hardware_map:
            # A connected DUT of this platform may supply the needed fixture.
            for h in hardware_map.duts:
                if (h.platform in self.platform.aliases and
                        self.testsuite_runnable(self.testsuite, h.fixtures)):
                    testsuite_runnable = True
                    break

        return testsuite_runnable and target_ready

    def create_overlay(
        self,
        platform,
        enable_asan=False,
        enable_ubsan=False,
        enable_coverage=False,
        coverage_platform=None
    ):
        """Build the extra Kconfig overlay for this instance and write it out.

        The content is written to twister/testsuite_extra.conf inside the
        build directory, but only when it is not empty. As a side effect the
        ztest repeat/shuffle attributes of the instance are refreshed from
        the suite's harness_config.

        @param platform Platform checked for the coverage and sanitizer
            options; conditional extra_configs are matched against
            self.platform instead.
        @param enable_asan Add CONFIG_ASAN=y on native platforms.
        @param enable_ubsan Add CONFIG_UBSAN=y on native platforms.
        @param enable_coverage Add the coverage options for every entry of
            coverage_platform found in the platform aliases.
        @param coverage_platform Platform names coverage is enabled for.
        @return The overlay content (may be an empty string).
        """
        if coverage_platform is None:
            coverage_platform = []
        # Create this in a "twister/" subdirectory otherwise this
        # will pass this overlay to kconfig.py *twice* and kconfig.cmake
        # will silently give that second time precedence over any
        # --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "twister")

        content = ""

        if self.testsuite.extra_configs:
            new_config_list = []
            # some configs might be conditional on arch or platform, see if we
            # have a namespace defined and apply only if the namespace matches.
            # we currently support both arch: and platform:
            for config in self.testsuite.extra_configs:
                cond_config = config.split(":")
                if len(cond_config) == 3 and cond_config[0] in ("arch", "platform"):
                    namespace, expected, value = cond_config
                    if namespace == "arch":
                        actual = self.platform.arch
                    else:
                        actual = self.platform.name
                    if actual == expected:
                        new_config_list.append(value)
                else:
                    new_config_list.append(config)

            content = "\n".join(new_config_list)

        if self.testsuite.harness_config:
            harness_config = self.testsuite.harness_config
            self.suite_repeat = harness_config.get('ztest_suite_repeat', None)
            self.test_repeat = harness_config.get('ztest_test_repeat', None)
            self.test_shuffle = harness_config.get('ztest_test_shuffle', False)

        # Use suite_repeat and test_repeat values
        if self.suite_repeat or self.test_repeat or self.test_shuffle:
            content += "\nCONFIG_ZTEST_REPEAT=y"
            if self.suite_repeat:
                content += f"\nCONFIG_ZTEST_SUITE_REPEAT_COUNT={self.suite_repeat}"
            if self.test_repeat:
                content += f"\nCONFIG_ZTEST_TEST_REPEAT_COUNT={self.test_repeat}"
            if self.test_shuffle:
                content += "\nCONFIG_ZTEST_SHUFFLE=y"

        if enable_coverage:
            for cp in coverage_platform:
                if cp in platform.aliases:
                    content += "\nCONFIG_COVERAGE=y"
                    content += "\nCONFIG_COVERAGE_DUMP=y"

        if platform.type == "native":
            if enable_asan:
                content += "\nCONFIG_ASAN=y"
            if enable_ubsan:
                content += "\nCONFIG_UBSAN=y"

        if content:
            os.makedirs(subdir, exist_ok=True)
            file = os.path.join(subdir, "testsuite_extra.conf")
            with open(file, "w", encoding='utf-8') as f:
                f.write(content)

        return content

    def calculate_sizes(
        self,
        from_buildlog: bool = False,
        generate_warning: bool = True
    ) -> SizeCalculator:
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @param from_buildlog Also pass the path of build.log to the
            calculator; an empty path is passed otherwise.
        @param generate_warning Forwarded to SizeCalculator.
        @return A SizeCalculator object
        """
        elf_filepath = self.get_elf_file()
        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
        return SizeCalculator(elf_filename=elf_filepath,
                              extra_sections=self.testsuite.extra_sections,
                              buildlog_filepath=buildlog_filepath,
                              generate_warning=generate_warning)

    def get_elf_file(self) -> str:
        """Get path to the output binary of this instance.

        With sysbuild the build directory of the default domain is searched,
        otherwise the instance build directory. A warning is logged when
        several candidates remain and the first one is returned.

        @raises BuildError: No output binary was found.
        @return: Path to the binary (str).
        """
        if self.sysbuild:
            build_dir = self.domains.get_default_domain().build_dir
        else:
            build_dir = self.build_dir

        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
        blocklist = [
            'remapped', # used for xtensa platforms
            'zefi', # EFI for Zephyr
            'qemu', # elf files generated after running in qemu
            '_pre']
        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
        if not fns:
            raise BuildError("Missing output binary")
        if len(fns) > 1:
            logger.warning("multiple ELF files detected: %s", ', '.join(fns))
        return fns[0]

    def get_buildlog_file(self) -> str:
        """Get path to build.log file.

        @raises BuildError: Incorrect amount (!=1) of build logs.
        @return: Path to build.log (str).
        """
        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
        if len(buildlog_paths) != 1:
            raise BuildError("Missing/multiple build.log file.")
        return buildlog_paths[0]

    def __repr__(self):
        return f"<TestSuite {self.testsuite.name} on {self.platform.name}>"