# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import contextlib
import glob
import logging
import mmap
import os
import re
from enum import Enum
from pathlib import Path

from twisterlib.environment import canonical_zephyr_base
from twisterlib.error import StatusAttributeError, TwisterException, TwisterRuntimeError
from twisterlib.mixins import DisablePyTestCollectionMixin
from twisterlib.statuses import TwisterStatus

logger = logging.getLogger('twister')


class ScanPathResult:
    """Result of the scan_testsuite_path function call.

    Attributes:
        matches                          A list of test cases
        warnings                         A string containing one or more
                                         warnings to display
        has_registered_test_suites       Whether or not the path contained any
                                         calls to the ztest_register_test_suite
                                         macro.
        has_run_registered_test_suites   Whether or not the path contained at
                                         least one call to
                                         ztest_run_registered_test_suites.
        has_test_main                    Whether or not the path contains a
                                         definition of test_main(void)
        ztest_suite_names                Names of found ztest suites
    """
    def __init__(self,
                 matches: list[str] = None,
                 warnings: str = None,
                 has_registered_test_suites: bool = False,
                 has_run_registered_test_suites: bool = False,
                 has_test_main: bool = False,
                 ztest_suite_names: list[str] = None):
        # Never share one default list between instances.
        if ztest_suite_names is None:
            ztest_suite_names = []
        self.matches = matches
        self.warnings = warnings
        self.has_registered_test_suites = has_registered_test_suites
        self.has_run_registered_test_suites = has_run_registered_test_suites
        self.has_test_main = has_test_main
        self.ztest_suite_names = ztest_suite_names

    @staticmethod
    def _sorted_or_none(items):
        """Return a sorted copy of items, or None when items is None."""
        return None if items is None else sorted(items)

    def __eq__(self, other):
        """Compare two results; the order of matches and suite names is ignored."""
        if not isinstance(other, ScanPathResult):
            return False
        # 'matches' is None by default (and for sources without any ztest
        # suite), so it must not be handed to sorted() unguarded.
        return (self._sorted_or_none(self.matches) ==
                self._sorted_or_none(other.matches) and
                self.warnings == other.warnings and
                (self.has_registered_test_suites ==
                 other.has_registered_test_suites) and
                (self.has_run_registered_test_suites ==
                 other.has_run_registered_test_suites) and
                self.has_test_main == other.has_test_main and
                (self._sorted_or_none(self.ztest_suite_names) ==
                 self._sorted_or_none(other.ztest_suite_names)))


def scan_file(inf_name):
    """Scan one source file for ztest suite and test case declarations.

    The file is memory-mapped and searched with byte regular expressions.

    @param inf_name path of the source file to scan
    @return a ScanPathResult describing what was found in the file
    @raise ValueError propagated from _get_search_area_boundary() when the
           boundaries of an old-style suite declaration cannot be determined
    """
    regular_suite_regex = re.compile(
        # do not match until end-of-line, otherwise we won't allow
        # stc_regex below to catch the ones that are declared in the same
        # line--as we only search starting the end of this match
        br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    registered_suite_regex = re.compile(
        br"^\s*ztest_register_test_suite"
        br"\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    new_suite_regex = re.compile(
        br"^\s*ZTEST_SUITE\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    testcase_regex = re.compile(
        br"^\s*(?:ZTEST|ZTEST_F|ZTEST_USER|ZTEST_USER_F)\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,"
        br"\s*(?P<testcase_name>[a-zA-Z0-9_]+)\s*",
        re.MULTILINE)
    # Checks if the file contains a definition of "void test_main(void)"
    # Since ztest provides a plain test_main implementation it is OK to:
    # 1. register test suites and not call the run function if and only if
    #    the test doesn't have a custom test_main.
    # 2. register test suites and a custom test_main definition if and only if
    #    the test also calls ztest_run_registered_test_suites.
    test_main_regex = re.compile(
        br"^\s*void\s+test_main\(void\)",
        re.MULTILINE)
    registered_suite_run_regex = re.compile(
        br"^\s*ztest_run_registered_test_suites\("
        br"(\*+|&)?(?P<state_identifier>[a-zA-Z0-9_]+)\)",
        re.MULTILINE)

    warnings = None
    has_registered_test_suites = False
    has_run_registered_test_suites = False
    has_test_main = False

    with open(inf_name) as inf:
        # mmap.mmap() takes different keyword arguments on Windows and Unix.
        if os.name == 'nt':
            mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ}
        else:
            mmap_args = {
                'fileno': inf.fileno(),
                'length': 0,
                'flags': mmap.MAP_PRIVATE,
                'prot': mmap.PROT_READ,
                'offset': 0
            }

        with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
            regular_suite_regex_matches = \
                list(regular_suite_regex.finditer(main_c))
            registered_suite_regex_matches = \
                list(registered_suite_regex.finditer(main_c))
            new_suite_testcase_regex_matches = \
                list(testcase_regex.finditer(main_c))
            new_suite_regex_matches = \
                list(new_suite_regex.finditer(main_c))

            if registered_suite_regex_matches:
                has_registered_test_suites = True
            if registered_suite_run_regex.search(main_c):
                has_run_registered_test_suites = True
            if test_main_regex.search(main_c):
                has_test_main = True

            # The declaration styles are tried in a fixed order:
            # ztest_test_suite(), then ztest_register_test_suite(), then the
            # new ZTEST_SUITE()/ZTEST() macros. Only the first one found is
            # used to extract test cases.
            if regular_suite_regex_matches:
                ztest_suite_names = \
                    _extract_ztest_suite_names(regular_suite_regex_matches)
                testcase_names, warnings = _find_regular_ztest_testcases(
                    main_c,
                    regular_suite_regex_matches,
                    has_registered_test_suites
                )
            elif registered_suite_regex_matches:
                ztest_suite_names = \
                    _extract_ztest_suite_names(registered_suite_regex_matches)
                testcase_names, warnings = _find_regular_ztest_testcases(
                    main_c,
                    registered_suite_regex_matches,
                    has_registered_test_suites
                )
            elif new_suite_regex_matches or new_suite_testcase_regex_matches:
                ztest_suite_names = \
                    _extract_ztest_suite_names(new_suite_regex_matches)
                testcase_names, warnings = \
                    _find_new_ztest_testcases(main_c)
            else:
                # can't find ztest_test_suite, maybe a client, because
                # it includes ztest.h
                ztest_suite_names = []
                testcase_names, warnings = None, None

            return ScanPathResult(
                matches=testcase_names,
                warnings=warnings,
                has_registered_test_suites=has_registered_test_suites,
                has_run_registered_test_suites=has_run_registered_test_suites,
                has_test_main=has_test_main,
                ztest_suite_names=ztest_suite_names)


def _extract_ztest_suite_names(suite_regex_matches):
    """
    Return the "suite_name" group of every match, decoded from UTF-8 bytes
    to str, in match order.
    """
    return [
        m.group("suite_name").decode("UTF-8") for m in suite_regex_matches
    ]


def _find_regular_ztest_testcases(search_area, suite_regex_matches, is_registered_test_suite):
    """
    Find regular ztest testcases like "ztest_unit_test" or similar. Return
    testcases' names and eventually found warnings.

    Only the part of search_area delimited by _get_search_area_boundary() is
    inspected. If that part contains "#ifdef" or "#endif" and no other warning
    was produced, a warning about them is returned.
    """
    testcase_regex = re.compile(
        br"""^\s*  # empty space at the beginning is ok
        # catch the case where it is declared in the same sentence, e.g:
        #
        # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
        # ztest_register_test_suite(n, p, ztest_user_unit_test(TESTNAME),
        (?:ztest_
            (?:test_suite\(|register_test_suite\([a-zA-Z0-9_]+\s*,\s*)
            [a-zA-Z0-9_]+\s*,\s*
        )?
        # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
        ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?
        # Consume the argument that becomes the extra testcase
        \(\s*(?P<testcase_name>[a-zA-Z0-9_]+)
        # _setup_teardown() variant has two extra arguments that we ignore
        (?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?
        \s*\)""",
        # We don't check how it finishes; we don't care
        re.MULTILINE | re.VERBOSE)
    achtung_regex = re.compile(
        br"(#ifdef|#endif)",
        re.MULTILINE)

    search_start, search_end = \
        _get_search_area_boundary(search_area, suite_regex_matches, is_registered_test_suite)
    limited_search_area = search_area[search_start:search_end]
    testcase_names, warnings = \
        _find_ztest_testcases(limited_search_area, testcase_regex)

    achtung_matches = achtung_regex.findall(limited_search_area)
    if achtung_matches and warnings is None:
        # De-duplicate and sort in reverse so "#ifdef" is listed before "#endif".
        achtung = ", ".join(sorted({match.decode() for match in achtung_matches}, reverse=True))
        warnings = f"found invalid {achtung} in ztest_test_suite()"

    return testcase_names, warnings


def _get_search_area_boundary(search_area, suite_regex_matches, is_registered_test_suite):
    """
    Get search area boundary based on "ztest_test_suite(...)",
    "ztest_register_test_suite(...)" or "ztest_run_test_suite(...)"
    functions occurrence.

    The area starts right after the first suite declaration match. It ends at
    the first "ztest_run_test_suite(...)" call when there is one; otherwise,
    for registered suites only, at the first ");" after the start.

    @return tuple (search_start, search_end) of offsets into search_area
    @raise ValueError when the end of the area cannot be determined
    """
    suite_run_regex = re.compile(
        br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
        re.MULTILINE)

    search_start = suite_regex_matches[0].end()

    suite_run_match = suite_run_regex.search(search_area)
    if suite_run_match:
        return search_start, suite_run_match.start()

    if not is_registered_test_suite:
        raise ValueError("can't find ztest_run_test_suite")

    suite_end_match = re.compile(br"\);", re.MULTILINE) \
                        .search(search_area, search_start)
    if suite_end_match is None:
        # Report this as a ValueError like the other parse failure so that
        # callers handling ValueError per file see it, instead of an
        # AttributeError raised on None.
        raise ValueError("can't find the end of ztest_register_test_suite")

    return search_start, suite_end_match.end()


def _find_new_ztest_testcases(search_area):
    """
    Find new-style ztest testcases declared with "ZTEST", "ZTEST_F",
    "ZTEST_USER" or "ZTEST_USER_F". Return testcases' names and eventually
    found warnings.
    """
    testcase_regex = re.compile(
        br"^\s*(?:ZTEST|ZTEST_F|ZTEST_USER|ZTEST_USER_F)\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,"
        br"\s*(?P<testcase_name>[a-zA-Z0-9_]+)\s*",
        re.MULTILINE)

    return _find_ztest_testcases(search_area, testcase_regex)


def _find_ztest_testcases(search_area, testcase_regex):
    """
    Parse search area and try to find testcases defined in testcase_regex
    argument. Return testcase names and eventually found warnings.

    testcase_regex must define a "testcase_name" group and may define a
    "suite_name" group. Each returned name is "<suite>.<testcase>" (or just
    "<testcase>" when no suite name was captured), with the first occurrence
    of "test_" removed from the test case name.
    """
    testcase_names = [
        (
            # Not every testcase_regex has a "suite_name" group.
            (m.groupdict().get("suite_name") or b'').decode("UTF-8"),
            m.group("testcase_name").decode("UTF-8")
        ) for m in testcase_regex.finditer(search_area)
    ]

    warnings = None
    if any(not tc_name.startswith("test_") for _, tc_name in testcase_names):
        warnings = "Found a test that does not start with test_"

    testcase_names = \
        [(ts_name + '.' if ts_name else '') + f"{tc_name.replace('test_', '', 1)}" \
         for (ts_name, tc_name) in testcase_names]

    return testcase_names, warnings


def find_c_files_in(path: str, extensions: list = None) -> list:
    """
    Find C or C++ sources in the directory specified by "path"

    @param path directory to search; anything that is not a directory
           yields an empty list
    @param extensions file extensions (without the dot) to look for;
           defaults to c, cpp, cxx and cc
    @return list of file names, each joined with "path"
    """
    if extensions is None:
        extensions = ['c', 'cpp', 'cxx', 'cc']
    if not os.path.isdir(path):
        return []

    # back up previous CWD
    oldpwd = os.getcwd()
    os.chdir(path)

    try:
        filenames = []
        for ext in extensions:
            # glob.glob('**/*.c') does not pick up the base directory
            filenames += [os.path.join(path, x) for x in glob.glob(f'*.{ext}')]
            # glob matches in subdirectories too
            # NOTE(review): glob is called without recursive=True, so '**'
            # presumably only reaches one directory level down -- confirm
            # that this is the intended depth.
            filenames += [os.path.join(path, x) for x in glob.glob(f'**/*.{ext}')]
    finally:
        # restore previous CWD, even if globbing failed
        os.chdir(oldpwd)

    return filenames


def scan_testsuite_path(testsuite_path):
    """Collect test cases and ztest suite names from a test suite's sources.

    The src directory (see _find_src_dir_path()) is scanned first, then the
    remaining sources found under testsuite_path.

    @param testsuite_path directory of the test suite
    @return tuple (subcases, ztest_suite_names)
    @raise TwisterRuntimeError when a file in the src directory produces a
           scan warning, or when test suites are registered and a custom
           test_main exists but ztest_run_registered_test_suites() is never
           called
    """
    subcases = []
    has_registered_test_suites = False
    has_run_registered_test_suites = False
    has_test_main = False
    ztest_suite_names = []

    src_dir_path = _find_src_dir_path(testsuite_path)
    for filename in find_c_files_in(src_dir_path):
        # Empty files are skipped before scan_file() tries to mmap them.
        if os.stat(filename).st_size == 0:
            continue
        try:
            result: ScanPathResult = scan_file(filename)
            if result.warnings:
                logger.error(f"{filename}: {result.warnings}")
                raise TwisterRuntimeError(f"{filename}: {result.warnings}")
            if result.matches:
                subcases += result.matches
            if result.has_registered_test_suites:
                has_registered_test_suites = True
            if result.has_run_registered_test_suites:
                has_run_registered_test_suites = True
            if result.has_test_main:
                has_test_main = True
            if result.ztest_suite_names:
                ztest_suite_names += result.ztest_suite_names

        except ValueError as e:
            logger.error(f"{filename}: error parsing source file: {e}")

    src_dir_pathlib_path = Path(src_dir_path)
    for filename in find_c_files_in(testsuite_path):
        # If we have already scanned those files in the src_dir step, skip them.
        # When no src directory was found src_dir_path is "" and nothing has
        # been scanned yet; Path("") equals Path("."), which is a parent of
        # every relative path, so it must not be used for the check.
        filename_path = Path(filename)
        if src_dir_path and src_dir_pathlib_path in filename_path.parents:
            continue

        try:
            result: ScanPathResult = scan_file(filename)
            # Outside of the src directory warnings are only logged.
            if result.warnings:
                logger.error(f"{filename}: {result.warnings}")
            if result.matches:
                subcases += result.matches
            if result.ztest_suite_names:
                ztest_suite_names += result.ztest_suite_names
        except ValueError as e:
            logger.error(f"{filename}: can't find: {e}")

    if (has_registered_test_suites and has_test_main and
            not has_run_registered_test_suites):
        warning = \
            "Found call to 'ztest_register_test_suite()' but no "\
            "call to 'ztest_run_registered_test_suites()'"
        logger.error(warning)
        raise TwisterRuntimeError(warning)

    return subcases, ztest_suite_names


def _find_src_dir_path(test_dir_path):
    """
    Try to find src directory with test source code. Sometimes due to the
    optimization reasons it is placed in upper directory.

    @return path of the "src" directory inside test_dir_path or, failing
            that, next to it; an empty string when neither exists
    """
    src_dir_name = "src"
    src_dir_path = os.path.join(test_dir_path, src_dir_name)
    if os.path.isdir(src_dir_path):
        return src_dir_path
    src_dir_path = os.path.join(test_dir_path, "..", src_dir_name)
    if os.path.isdir(src_dir_path):
        return src_dir_path
    return ""


class TestCase(DisablePyTestCollectionMixin):
    """A single test case, optionally attached to a TestSuite."""

    def __init__(self, name=None, testsuite=None):
        self.duration = 0
        self.name = name
        self._status = TwisterStatus.NONE
        self.reason = None
        self.testsuite = testsuite
        self.output = ""
        self.freeform = False

    @property
    def detailed_name(self) -> str:
        """Name of the test case as built by TestSuite.get_case_name_()."""
        return TestSuite.get_case_name_(self.testsuite, self.name, detailed=True)

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value : TwisterStatus) -> None:
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def __lt__(self, other):
        # Test cases are ordered by name.
        return self.name < other.name

    def __repr__(self):
        return f"<TestCase {self.name} with {self.status}>"

    def __str__(self):
        return self.name


class TestSuite(DisablePyTestCollectionMixin):
    """Class representing a test application
    """

    def __init__(self, suite_root, suite_path, name, data=None, detailed_test_id=True):
        """TestSuite constructor.

        This gets called by TestPlan as it finds and reads test yaml files.
        Multiple TestSuite instances may be generated from a single testcase.yaml,
        each one corresponds to an entry within that file.

        We need to have a unique name for every single test case. Since
        a testcase.yaml can define multiple tests, the canonical name for
        the test case is <workdir>/<name>.

        @param suite_root os.path.abspath() of one of the --testsuite-root
        @param suite_path path to testsuite
        @param name Name of this test case, corresponding to the entry name
            in the test case configuration file. For many test cases that just
            define one test, can be anything and is usually "test". This is
            really only used to distinguish between different cases when
            the testcase.yaml defines multiple tests
        @param data optional dictionary of attributes, applied with load()
        @param detailed_test_id when true the suite name is made unique with
            get_unique(); otherwise "name" is used unchanged
        @raise TwisterException when "name" is rejected by check_suite_name()
        """

        workdir = os.path.relpath(suite_path, suite_root)

        # check_suite_name() raises on a bad name. It must not be wrapped in
        # an assert statement: those are removed when Python runs with -O,
        # which would skip the validation altogether.
        self.check_suite_name(name, suite_root, workdir)
        self.detailed_test_id = detailed_test_id
        self.name = self.get_unique(suite_root, workdir, name) if self.detailed_test_id else name
        self.id = name

        self.source_dir = suite_path
        self.source_dir_rel = os.path.relpath(
            os.path.realpath(suite_path), start=canonical_zephyr_base
        )
        self.yamlfile = suite_path
        self.testcases = []
        self.integration_platforms = []

        self.ztest_suite_names = []

        self._status = TwisterStatus.NONE

        if data:
            self.load(data)

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value : TwisterStatus) -> None:
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def load(self, data):
        """Copy every entry of data, except "testcases", onto this object.

        @raise Exception when the harness is 'console' and no harness_config
               is set
        """
        for k, v in data.items():
            if k != "testcases":
                setattr(self, k, v)

        # NOTE(review): "harness" and "harness_config" are only set through
        # data above; this presumably relies on the caller always providing
        # them -- confirm.
        if self.harness == 'console' and not self.harness_config:
            raise Exception(
                'Harness config error: console harness defined without a configuration.'
            )

    @staticmethod
    def get_case_name_(test_suite, tc_name, detailed=True) -> str:
        """
        Return tc_name, prefixed with the suite id when "detailed" is set and
        the suite does not use detailed test ids.
        """
        return f"{test_suite.id}.{tc_name}" \
            if test_suite and detailed and not test_suite.detailed_test_id else f"{tc_name}"

    @staticmethod
    def compose_case_name_(test_suite, tc_name) -> str:
        """
        Return tc_name, prefixed with the suite id when the suite uses
        detailed test ids.
        """
        return f"{test_suite.id}.{tc_name}" \
            if test_suite and test_suite.detailed_test_id else f"{tc_name}"

    def compose_case_name(self, tc_name) -> str:
        """Compose the name of test case tc_name for this suite."""
        return self.compose_case_name_(self, tc_name)

    def add_subcases(self, data, parsed_subcases=None, suite_names=None):
        """Add the test cases of this suite.

        Test cases listed under "testcases" in data take precedence over
        parsed_subcases. When neither is given, a single free-form test case
        named after the suite id is added.
        """
        testcases = data.get("testcases", [])
        if testcases:
            for tc in testcases:
                self.add_testcase(name=self.compose_case_name(tc))
        else:
            if not parsed_subcases:
                self.add_testcase(self.id, freeform=True)
            else:
                # only add each testcase once
                for tc in set(parsed_subcases):
                    self.add_testcase(name=self.compose_case_name(tc))
        if suite_names:
            self.ztest_suite_names = suite_names

    def add_testcase(self, name, freeform=False):
        """Create a TestCase with the given name and append it to the suite."""
        tc = TestCase(name=name, testsuite=self)
        tc.freeform = freeform
        self.testcases.append(tc)

    @staticmethod
    def get_unique(testsuite_root, workdir, name):
        """
        Build the unique, '/'-separated name of a test from its testsuite
        root, its working directory and its name.
        """

        canonical_testsuite_root = os.path.realpath(testsuite_root)
        if Path(canonical_zephyr_base) in Path(canonical_testsuite_root).parents:
            # This is in ZEPHYR_BASE, so include path in name for uniqueness
            # FIXME: We should not depend on path of test for unique names.
            relative_ts_root = os.path.relpath(canonical_testsuite_root,
                                               start=canonical_zephyr_base)
        else:
            relative_ts_root = ""

        # workdir can be "."
        unique = os.path.normpath(
            os.path.join(relative_ts_root, workdir, name)
        ).replace(os.sep, '/')
        return unique

    @staticmethod
    def check_suite_name(name, testsuite_root, workdir):
        """
        Check that name consists of at least two dot-separated parts.

        @return True when the name is acceptable
        @raise TwisterException when the name contains no dot
        """
        check = name.split(".")
        if len(check) < 2:
            raise TwisterException(f"""bad test name '{name}' in {testsuite_root}/{workdir}. \
Tests should reference the category and subsystem with a dot as a separator.
                    """
                    )
        return True