1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2024 Intel Corporation
4# SPDX-License-Identifier: Apache-2.0
5
6import contextlib
7import glob
8import logging
9import mmap
10import os
11import re
12from enum import Enum
13from pathlib import Path
14
15from twisterlib.environment import canonical_zephyr_base
16from twisterlib.error import StatusAttributeError, TwisterException, TwisterRuntimeError
17from twisterlib.mixins import DisablePyTestCollectionMixin
18from twisterlib.statuses import TwisterStatus
19
20logger = logging.getLogger('twister')
21
class ScanPathResult:
    """Result of the scan_testsuite_path function call.

    Attributes:
        matches                          A list of test cases, or None when
                                         nothing was collected
        warnings                         A string containing one or more
                                         warnings to display
        has_registered_test_suites       Whether or not the path contained any
                                         calls to the ztest_register_test_suite
                                         macro.
        has_run_registered_test_suites   Whether or not the path contained at
                                         least one call to
                                         ztest_run_registered_test_suites.
        has_test_main                    Whether or not the path contains a
                                         definition of test_main(void)
        ztest_suite_names                Names of found ztest suites
    """
    def __init__(self,
                 matches: list[str] = None,
                 warnings: str = None,
                 has_registered_test_suites: bool = False,
                 has_run_registered_test_suites: bool = False,
                 has_test_main: bool = False,
                 ztest_suite_names: list[str] = None):
        # Never share one list object between instances.
        if ztest_suite_names is None:
            ztest_suite_names = []
        # "matches" is deliberately stored as given (possibly None).
        self.matches = matches
        self.warnings = warnings
        self.has_registered_test_suites = has_registered_test_suites
        self.has_run_registered_test_suites = has_run_registered_test_suites
        self.has_test_main = has_test_main
        self.ztest_suite_names = ztest_suite_names

    @staticmethod
    def _order_insensitive(names):
        """Return a sorted copy of names, or None when names is None.

        sorted(None) raises TypeError, so None has to be passed through
        untouched; it then only compares equal to another None.
        """
        return None if names is None else sorted(names)

    def __eq__(self, other):
        """Compare two results, ignoring the order of the name lists.

        Objects of any other type compare unequal.
        """
        if not isinstance(other, ScanPathResult):
            return False
        normalize = self._order_insensitive
        return (normalize(self.matches) == normalize(other.matches) and
                self.warnings == other.warnings and
                (self.has_registered_test_suites ==
                 other.has_registered_test_suites) and
                (self.has_run_registered_test_suites ==
                 other.has_run_registered_test_suites) and
                self.has_test_main == other.has_test_main and
                (normalize(self.ztest_suite_names) ==
                 normalize(other.ztest_suite_names)))
67
def scan_file(inf_name):
    """
    Scan a single source file for ztest suite and test case declarations.

    The file is memory-mapped read-only and searched with byte regexes.
    Legacy declarations (ztest_test_suite / ztest_register_test_suite) take
    precedence over the ZTEST_SUITE / ZTEST* macros. Return a ScanPathResult
    describing what was found.
    """
    regular_suite_regex = re.compile(
        # do not match until end-of-line, otherwise we won't allow
        # stc_regex below to catch the ones that are declared in the same
        # line--as we only search starting the end of this match
        br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    registered_suite_regex = re.compile(
        br"^\s*ztest_register_test_suite"
        br"\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    new_suite_regex = re.compile(
        br"^\s*ZTEST_SUITE\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    testcase_regex = re.compile(
        br"^\s*(?:ZTEST|ZTEST_F|ZTEST_USER|ZTEST_USER_F)\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,"
        br"\s*(?P<testcase_name>[a-zA-Z0-9_]+)\s*",
        re.MULTILINE)
    # Checks if the file contains a definition of "void test_main(void)"
    # Since ztest provides a plain test_main implementation it is OK to:
    # 1. register test suites and not call the run function if and only if
    #    the test doesn't have a custom test_main.
    # 2. register test suites and a custom test_main definition if and only if
    #    the test also calls ztest_run_registered_test_suites.
    test_main_regex = re.compile(
        br"^\s*void\s+test_main\(void\)",
        re.MULTILINE)
    registered_suite_run_regex = re.compile(
        br"^\s*ztest_run_registered_test_suites\("
        br"(\*+|&)?(?P<state_identifier>[a-zA-Z0-9_]+)\)",
        re.MULTILINE)

    with open(inf_name) as inf:
        # Windows and POSIX spell "read-only mapping" with different keywords.
        mmap_args = {'fileno': inf.fileno(), 'length': 0}
        if os.name == 'nt':
            mmap_args['access'] = mmap.ACCESS_READ
        else:
            mmap_args.update(flags=mmap.MAP_PRIVATE, prot=mmap.PROT_READ, offset=0)

        with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
            regular_suites = list(regular_suite_regex.finditer(main_c))
            registered_suites = list(registered_suite_regex.finditer(main_c))
            new_testcases = list(testcase_regex.finditer(main_c))
            new_suites = list(new_suite_regex.finditer(main_c))

            has_registered_test_suites = bool(registered_suites)
            has_run_registered_test_suites = \
                registered_suite_run_regex.search(main_c) is not None
            has_test_main = test_main_regex.search(main_c) is not None

            # ztest_test_suite() wins over ztest_register_test_suite() when
            # a file happens to contain both.
            legacy_suites = regular_suites or registered_suites
            if legacy_suites:
                ztest_suite_names = _extract_ztest_suite_names(legacy_suites)
                testcase_names, warnings = _find_regular_ztest_testcases(
                    main_c,
                    legacy_suites,
                    has_registered_test_suites
                )
            elif new_suites or new_testcases:
                ztest_suite_names = _extract_ztest_suite_names(new_suites)
                testcase_names, warnings = _find_new_ztest_testcases(main_c)
            else:
                # can't find ztest_test_suite, maybe a client, because
                # it includes ztest.h
                ztest_suite_names = []
                testcase_names = None
                warnings = None

            return ScanPathResult(
                matches=testcase_names,
                warnings=warnings,
                has_registered_test_suites=has_registered_test_suites,
                has_run_registered_test_suites=has_run_registered_test_suites,
                has_test_main=has_test_main,
                ztest_suite_names=ztest_suite_names)
168
def _extract_ztest_suite_names(suite_regex_matches):
    """
    Return the "suite_name" group of every given regex match, decoded from
    UTF-8 bytes into str, in match order.
    """
    return [
        suite_match.group("suite_name").decode("UTF-8")
        for suite_match in suite_regex_matches
    ]
175
def _find_regular_ztest_testcases(search_area, suite_regex_matches, is_registered_test_suite):
    """
    Collect legacy-API test cases ("ztest_unit_test" and its variants) from
    the part of search_area that belongs to the suite declaration.

    Return a (testcase names, warning) pair. When the test cases themselves
    produce no warning, an #ifdef/#endif found inside the searched span is
    reported as the warning instead.
    """
    testcase_regex = re.compile(
        br"""^\s*  # empty space at the beginning is ok
        # catch the case where it is declared in the same sentence, e.g:
        #
        # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
        # ztest_register_test_suite(n, p, ztest_user_unit_test(TESTNAME),
        (?:ztest_
            (?:test_suite\(|register_test_suite\([a-zA-Z0-9_]+\s*,\s*)
            [a-zA-Z0-9_]+\s*,\s*
        )?
        # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
        ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?
        # Consume the argument that becomes the extra testcase
        \(\s*(?P<testcase_name>[a-zA-Z0-9_]+)
        # _setup_teardown() variant has two extra arguments that we ignore
        (?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?
        \s*\)""",
        # How the statement ends is irrelevant, so it is not matched.
        re.MULTILINE | re.VERBOSE)
    achtung_regex = re.compile(
        br"(#ifdef|#endif)",
        re.MULTILINE)

    area_begin, area_end = _get_search_area_boundary(
        search_area, suite_regex_matches, is_registered_test_suite
    )
    suite_body = search_area[area_begin:area_end]
    testcase_names, warnings = _find_ztest_testcases(suite_body, testcase_regex)

    # A warning from the test case scan takes priority over the
    # preprocessor check.
    if warnings is not None:
        return testcase_names, warnings

    directives = achtung_regex.findall(suite_body)
    if directives:
        unique_directives = {directive.decode() for directive in directives}
        achtung = ", ".join(sorted(unique_directives, reverse=True))
        warnings = f"found invalid {achtung} in ztest_test_suite()"

    return testcase_names, warnings
216
217
def _get_search_area_boundary(search_area, suite_regex_matches, is_registered_test_suite):
    """
    Get search area boundary based on "ztest_test_suite(...)",
    "ztest_register_test_suite(...)" or "ztest_run_test_suite(...)"
    functions occurrence.

    The area starts right after the first suite declaration match. It ends
    at the first "ztest_run_test_suite(<name>)" call when there is one;
    otherwise, for registered suites only, right after the first ");" that
    follows the start.

    Return a (search_start, search_end) pair of offsets into search_area.
    Raise ValueError when no end of the area can be determined.
    """
    suite_run_regex = re.compile(
        br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
        re.MULTILINE)

    search_start = suite_regex_matches[0].end()

    suite_run_match = suite_run_regex.search(search_area)
    if suite_run_match:
        return search_start, suite_run_match.start()

    if not is_registered_test_suite:
        raise ValueError("can't find ztest_run_test_suite")

    # Registered suites have no run call of their own, so the declaration
    # statement itself delimits the area.
    closing_match = re.compile(br"\);", re.MULTILINE).search(search_area, search_start)
    if closing_match is None:
        # Callers only handle ValueError; dereferencing the missing match
        # would escape as an AttributeError instead.
        raise ValueError("can't find the end of ztest_register_test_suite()")

    return search_start, closing_match.end()
241
def _find_new_ztest_testcases(search_area):
    """
    Collect test cases declared with the ZTEST, ZTEST_F, ZTEST_USER and
    ZTEST_USER_F macros anywhere in search_area. Return whatever
    _find_ztest_testcases() yields for that pattern: the test case names
    and a possible warning.
    """
    new_api_testcase_regex = re.compile(
        br"^\s*(?:ZTEST|ZTEST_F|ZTEST_USER|ZTEST_USER_F)\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,"
        br"\s*(?P<testcase_name>[a-zA-Z0-9_]+)\s*",
        re.MULTILINE)

    scan_result = _find_ztest_testcases(search_area, new_api_testcase_regex)
    return scan_result
253
def _find_ztest_testcases(search_area, testcase_regex):
    """
    Parse search area and try to find testcases defined in testcase_regex
    argument. Return testcase names and eventually found warnings.

    testcase_regex must define a "testcase_name" group and may define a
    "suite_name" group. Each returned name is "<suite>.<testcase>" when a
    suite name was captured and "<testcase>" otherwise, with a leading
    "test_" removed from the test case part. The warning is set when at
    least one test case name does not start with "test_".
    """
    prefix = "test_"
    testcase_names = []
    warnings = None

    for match in testcase_regex.finditer(search_area):
        # The suite group is optional: absent from the pattern or unmatched.
        suite_name = (match.groupdict().get("suite_name") or b'').decode("UTF-8")
        testcase_name = match.group("testcase_name").decode("UTF-8")

        if testcase_name.startswith(prefix):
            # Strip the prefix only; a "test_" in the middle of the name is
            # part of the name and must survive.
            testcase_name = testcase_name[len(prefix):]
        else:
            warnings = "Found a test that does not start with test_"

        testcase_names.append(
            f"{suite_name}.{testcase_name}" if suite_name else testcase_name
        )

    return testcase_names, warnings
279
def find_c_files_in(path: str, extensions: list = None) -> list:
    """
    Find C or C++ sources in the directory specified by "path"

    Files are looked up directly in "path" and in its immediate
    subdirectories; deeper levels are not visited. "extensions" lists the
    file suffixes (without the dot) to look for and defaults to
    c, cpp, cxx and cc. Every returned name is prefixed with "path".
    An empty list is returned when "path" is not a directory.
    """
    if extensions is None:
        extensions = ['c', 'cpp', 'cxx', 'cc']
    if not os.path.isdir(path):
        return []

    # Glob against the directory itself instead of changing into it: the
    # working directory is process-wide state and would stay changed if
    # globbing raised. Escaping keeps glob metacharacters in "path" literal.
    base_dir = glob.escape(path)

    filenames = []
    for ext in extensions:
        # files located directly in the base directory
        filenames += glob.glob(os.path.join(base_dir, f'*.{ext}'))
        # without recursive=True "**" behaves like "*", so this adds the
        # files of the immediate subdirectories only
        filenames += glob.glob(os.path.join(base_dir, '**', f'*.{ext}'))

    return filenames
304
def scan_testsuite_path(testsuite_path):
    """
    Scan the sources of a test suite for ztest test cases and suite names.

    The "src" directory (see _find_src_dir_path) is scanned first and
    strictly: any warning reported for one of its files raises
    TwisterRuntimeError. The remaining sources under testsuite_path are
    scanned afterwards and their warnings are only logged. Files that
    cannot be parsed (ValueError) are logged and skipped in both passes.

    Return a (subcases, ztest_suite_names) pair of lists.
    Raise TwisterRuntimeError when suites are registered and a custom
    test_main exists, but ztest_run_registered_test_suites() is never
    called.
    """
    subcases = []
    has_registered_test_suites = False
    has_run_registered_test_suites = False
    has_test_main = False
    ztest_suite_names = []

    src_dir_path = _find_src_dir_path(testsuite_path)
    for filename in find_c_files_in(src_dir_path):
        # an empty file cannot be memory-mapped and has nothing to offer
        if os.stat(filename).st_size == 0:
            continue
        try:
            result: ScanPathResult = scan_file(filename)
            if result.warnings:
                logger.error(f"{filename}: {result.warnings}")
                raise TwisterRuntimeError(f"{filename}: {result.warnings}")
            if result.matches:
                subcases += result.matches
            if result.has_registered_test_suites:
                has_registered_test_suites = True
            if result.has_run_registered_test_suites:
                has_run_registered_test_suites = True
            if result.has_test_main:
                has_test_main = True
            if result.ztest_suite_names:
                ztest_suite_names += result.ztest_suite_names

        except ValueError as e:
            logger.error(f"{filename}: error parsing source file: {e}")

    # Without a src directory there is nothing to exclude below. Building
    # Path("") would give Path("."), which is an ancestor of every relative
    # path and would therefore exclude all of the files.
    src_dir_pathlib_path = Path(src_dir_path) if src_dir_path else None
    for filename in find_c_files_in(testsuite_path):
        # If we have already scanned those files in the src_dir step, skip them.
        if (src_dir_pathlib_path is not None
                and src_dir_pathlib_path in Path(filename).parents):
            continue
        # same guard as in the src_dir step: empty files cannot be mapped
        if os.stat(filename).st_size == 0:
            continue

        try:
            result: ScanPathResult = scan_file(filename)
            if result.warnings:
                logger.error(f"{filename}: {result.warnings}")
            if result.matches:
                subcases += result.matches
            if result.ztest_suite_names:
                ztest_suite_names += result.ztest_suite_names
        except ValueError as e:
            logger.error(f"{filename}: can't find: {e}")

    if (has_registered_test_suites and has_test_main and
            not has_run_registered_test_suites):
        warning = \
            "Found call to 'ztest_register_test_suite()' but no "\
            "call to 'ztest_run_registered_test_suites()'"
        logger.error(warning)
        raise TwisterRuntimeError(warning)

    return subcases, ztest_suite_names
362
def _find_src_dir_path(test_dir_path):
    """
    Locate the "src" directory holding the test sources: first inside
    test_dir_path itself, then next to it (one directory up). Return the
    first candidate that is an existing directory, or an empty string when
    neither is.
    """
    src_dir_name = "src"
    candidates = (
        os.path.join(test_dir_path, src_dir_name),
        os.path.join(test_dir_path, "..", src_dir_name),
    )
    return next(
        (candidate for candidate in candidates if os.path.isdir(candidate)),
        "",
    )
376
class TestCase(DisablePyTestCollectionMixin):
    """A single test case and its result state, optionally tied to a TestSuite.
    """

    def __init__(self, name=None, testsuite=None):
        # what this case is and where it belongs
        self.name = name
        self.testsuite = testsuite
        self.freeform = False
        # result bookkeeping, all starting out empty
        self._status = TwisterStatus.NONE
        self.reason = None
        self.duration = 0
        self.output = ""

    @property
    def detailed_name(self) -> str:
        # delegate the naming rules to the owning suite's helper
        return TestSuite.get_case_name_(self.testsuite, self.name, detailed=True)

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value : TwisterStatus) -> None:
        # Assignments are validated by member name: enum members contribute
        # their name, anything else is used as the lookup key directly.
        member_name = value.name if isinstance(value, Enum) else value
        try:
            self._status = TwisterStatus[member_name]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def __lt__(self, other):
        # test cases sort by their name
        return self.name < other.name

    def __repr__(self):
        return f"<TestCase {self.name} with {self.status}>"

    def __str__(self):
        return self.name
413
class TestSuite(DisablePyTestCollectionMixin):
    """Class representing a test application
    """

    def __init__(self, suite_root, suite_path, name, data=None, detailed_test_id=True):
        """TestSuite constructor.

        This gets called by TestPlan as it finds and reads test yaml files.
        Multiple TestSuite instances may be generated from a single testcase.yaml,
        each one corresponds to an entry within that file.

        We need to have a unique name for every single test case. Since
        a testcase.yaml can define multiple tests, the canonical name for
        the test case is <workdir>/<name>.

        @param suite_root os.path.abspath() of one of the --testsuite-root
        @param suite_path path to testsuite
        @param name Name of this test case, corresponding to the entry name
            in the test case configuration file. For many test cases that just
            define one test, can be anything and is usually "test". This is
            really only used to distinguish between different cases when
            the testcase.yaml defines multiple tests
        @param data optional mapping of attributes, applied through load()
        @param detailed_test_id when true, the suite name is the path-based
            unique name from get_unique(); otherwise it is "name" as given

        Raises TwisterException when "name" does not contain a dot.
        """

        workdir = os.path.relpath(suite_path, suite_root)

        # Called as a plain statement on purpose: inside an "assert" the
        # whole validation would vanish when Python runs with -O.
        self.check_suite_name(name, suite_root, workdir)
        self.detailed_test_id = detailed_test_id
        self.name = self.get_unique(suite_root, workdir, name) if self.detailed_test_id else name
        self.id = name

        self.source_dir = suite_path
        self.source_dir_rel = os.path.relpath(
            os.path.realpath(suite_path), start=canonical_zephyr_base
        )
        self.yamlfile = suite_path
        self.testcases = []
        self.integration_platforms = []

        self.ztest_suite_names = []

        self._status = TwisterStatus.NONE

        if data:
            self.load(data)

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value : TwisterStatus) -> None:
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def load(self, data):
        """Set every entry of data, except "testcases", as an attribute.

        Raises Exception when the resulting harness is 'console' while
        harness_config is empty. Both attributes are read right after the
        copy, so they are presumably always present in data - verify
        against the caller.
        """
        for k, v in data.items():
            if k != "testcases":
                setattr(self, k, v)

        if self.harness == 'console' and not self.harness_config:
            raise Exception(
                'Harness config error: console harness defined without a configuration.'
            )

    @staticmethod
    def get_case_name_(test_suite, tc_name, detailed=True) -> str:
        """Return tc_name, prefixed with the suite id only when a detailed
        name is requested for a suite that does not use detailed test ids.
        """
        return f"{test_suite.id}.{tc_name}" \
            if test_suite and detailed and not test_suite.detailed_test_id else f"{tc_name}"

    @staticmethod
    def compose_case_name_(test_suite, tc_name) -> str:
        """Return tc_name, prefixed with the suite id when the suite uses
        detailed test ids.
        """
        return f"{test_suite.id}.{tc_name}" \
            if test_suite and test_suite.detailed_test_id else f"{tc_name}"

    def compose_case_name(self, tc_name) -> str:
        """Instance shortcut for compose_case_name_() with this suite."""
        return self.compose_case_name_(self, tc_name)

    def add_subcases(self, data, parsed_subcases=None, suite_names=None):
        """Populate the test cases of this suite.

        Test cases listed under "testcases" in data take precedence.
        Otherwise the parsed subcases are used, each one added once; when
        there are none either, a single freeform test case named after the
        suite id is added. A non-empty suite_names replaces
        ztest_suite_names.
        """
        testcases = data.get("testcases", [])
        if testcases:
            for tc in testcases:
                self.add_testcase(name=self.compose_case_name(tc))
        else:
            if not parsed_subcases:
                self.add_testcase(self.id, freeform=True)
            else:
                # only add each testcase once; dict keys, unlike a set, keep
                # the first-seen order, so the result is the same on every run
                for tc in dict.fromkeys(parsed_subcases):
                    self.add_testcase(name=self.compose_case_name(tc))
        if suite_names:
            self.ztest_suite_names = suite_names

    def add_testcase(self, name, freeform=False):
        """Create a TestCase owned by this suite and append it to testcases."""
        tc = TestCase(name=name, testsuite=self)
        tc.freeform = freeform
        self.testcases.append(tc)

    @staticmethod
    def get_unique(testsuite_root, workdir, name):
        """Build the unique suite name "<root>/<workdir>/<name>".

        The root part is the testsuite root relative to the canonical
        Zephyr base when the root lies inside it, and empty otherwise.
        The result is normalized and always uses "/" as separator.
        """

        canonical_testsuite_root = os.path.realpath(testsuite_root)
        if Path(canonical_zephyr_base) in Path(canonical_testsuite_root).parents:
            # This is in ZEPHYR_BASE, so include path in name for uniqueness
            # FIXME: We should not depend on path of test for unique names.
            relative_ts_root = os.path.relpath(canonical_testsuite_root,
                                               start=canonical_zephyr_base)
        else:
            relative_ts_root = ""

        # workdir can be "."
        unique = os.path.normpath(
            os.path.join(relative_ts_root, workdir, name)
        ).replace(os.sep, '/')
        return unique

    @staticmethod
    def check_suite_name(name, testsuite_root, workdir):
        """Return True when name has at least two dot-separated parts.

        Raises TwisterException otherwise.
        """
        check = name.split(".")
        if len(check) < 2:
            raise TwisterException(f"""bad test name '{name}' in {testsuite_root}/{workdir}. \
Tests should reference the category and subsystem with a dot as a separator.
                    """
                    )
        return True
543