1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2025 Intel Corporation
4# Copyright 2022 NXP
5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
6#
7# SPDX-License-Identifier: Apache-2.0
8from __future__ import annotations
9
10import csv
11import glob
12import hashlib
13import logging
14import os
15import random
16from enum import Enum
17
18from twisterlib.constants import (
19    SUPPORTED_SIMS,
20    SUPPORTED_SIMS_IN_PYTEST,
21    SUPPORTED_SIMS_WITH_EXEC,
22)
23from twisterlib.environment import TwisterEnv
24from twisterlib.error import BuildError, StatusAttributeError
25from twisterlib.handlers import (
26    BinaryHandler,
27    DeviceHandler,
28    Handler,
29    QEMUHandler,
30    QEMUWinHandler,
31    SimulationHandler,
32)
33from twisterlib.platform import Platform
34from twisterlib.size_calc import SizeCalculator
35from twisterlib.statuses import TwisterStatus
36from twisterlib.testsuite import TestCase, TestSuite
37
38logger = logging.getLogger('twister')
39
40
41class TestInstance:
42    """Class representing the execution of a particular TestSuite on a platform
43
44    @param test The TestSuite object we want to build/execute
45    @param platform Platform object that we want to build and run against
46    @param base_outdir Base directory for all test results. The actual
47        out directory used is <outdir>/<platform>/<test case name>
48    """
49
50    __test__ = False
51
52    def __init__(self, testsuite, platform, toolchain, outdir):
53
54        self.testsuite: TestSuite = testsuite
55        self.platform: Platform = platform
56
57        self._status = TwisterStatus.NONE
58        self.reason = "Unknown"
59        self.metrics = dict()
60        self.handler = None
61        self.recording = None
62        self.coverage = None
63        self.coverage_status = None
64        self.outdir = outdir
65        self.execution_time = 0
66        self.build_time = 0
67        self.retries = 0
68        self.toolchain = toolchain
69        self.name = os.path.join(platform.name, toolchain, testsuite.name)
70        self.dut = None
71        self.suite_repeat = None
72        self.test_repeat = None
73        self.test_shuffle = None
74
75        if testsuite.detailed_test_id:
76            self.build_dir = os.path.join(
77                outdir, platform.normalized_name, self.toolchain, testsuite.name
78            )
79        else:
80            # if suite is not in zephyr,
81            # keep only the part after ".." in reconstructed dir structure
82            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1]
83            self.build_dir = os.path.join(
84                outdir,
85                platform.normalized_name,
86                self.toolchain,
87                source_dir_rel,
88                testsuite.name
89            )
90        self.run_id = None
91        self.domains = None
92        # Instance need to use sysbuild if a given suite or a platform requires it
93        self.sysbuild = testsuite.sysbuild or platform.sysbuild
94
95        self.run = False
96        self.testcases: list[TestCase] = []
97        self.init_cases()
98        self.filters = []
99        self.filter_type = None
100
101    def setup_run_id(self):
102        self.run_id = self._get_run_id()
103
104    def record(self, recording, fname_csv="recording.csv"):
105        if recording:
106            if self.recording is None:
107                self.recording = recording.copy()
108            else:
109                self.recording.extend(recording)
110
111            filename = os.path.join(self.build_dir, fname_csv)
112            fieldnames = set()
113            for r in self.recording:
114                fieldnames.update(r)
115            with open(filename, 'w') as csvfile:
116                cw = csv.DictWriter(csvfile,
117                                    fieldnames = sorted(list(fieldnames)),
118                                    lineterminator = os.linesep,
119                                    quoting = csv.QUOTE_NONNUMERIC)
120                cw.writeheader()
121                cw.writerows(self.recording)
122
123    @property
124    def status(self) -> TwisterStatus:
125        return self._status
126
127    @status.setter
128    def status(self, value : TwisterStatus) -> None:
129        # Check for illegal assignments by value
130        try:
131            key = value.name if isinstance(value, Enum) else value
132            self._status = TwisterStatus[key]
133        except KeyError as err:
134            raise StatusAttributeError(self.__class__, value) from err
135
136    def add_filter(self, reason, filter_type):
137        self.filters.append({'type': filter_type, 'reason': reason })
138        self.status = TwisterStatus.FILTER
139        self.reason = reason
140        self.filter_type = filter_type
141
142    # Fix an issue with copying objects from testsuite, need better solution.
143    def init_cases(self):
144        for c in self.testsuite.testcases:
145            self.add_testcase(c.name, freeform=c.freeform)
146
147    def _get_run_id(self):
148        """ generate run id from instance unique identifier and a random
149        number
150        If exist, get cached run id from previous run."""
151        run_id = ""
152        run_id_file = os.path.join(self.build_dir, "run_id.txt")
153        if os.path.exists(run_id_file):
154            with open(run_id_file) as fp:
155                run_id = fp.read()
156        else:
157            hash_object = hashlib.md5(self.name.encode(), usedforsecurity=False)
158            random_str = f"{random.getrandbits(64)}".encode()
159            hash_object.update(random_str)
160            run_id = hash_object.hexdigest()
161            os.makedirs(self.build_dir, exist_ok=True)
162            with open(run_id_file, 'w+') as fp:
163                fp.write(run_id)
164        return run_id
165
166    def add_missing_case_status(self, status, reason=None):
167        for case in self.testcases:
168            if case.status == TwisterStatus.STARTED:
169                case.status = TwisterStatus.FAIL
170            elif case.status == TwisterStatus.NONE:
171                case.status = status
172                if reason:
173                    case.reason = reason
174                else:
175                    case.reason = self.reason
176
177    def __getstate__(self):
178        d = self.__dict__.copy()
179        return d
180
181    def __setstate__(self, d):
182        self.__dict__.update(d)
183
184    def __lt__(self, other):
185        return self.name < other.name
186
187    def compose_case_name(self, tc_name) -> str:
188        return self.testsuite.compose_case_name(tc_name)
189
190    def set_case_status_by_name(self, name, status, reason=None):
191        tc = self.get_case_or_create(name)
192        tc.status = status
193        if reason:
194            tc.reason = reason
195        return tc
196
197    def add_testcase(self, name, freeform=False):
198        tc = TestCase(name=name)
199        tc.freeform = freeform
200        self.testcases.append(tc)
201        return tc
202
203    def get_case_by_name(self, name):
204        for c in self.testcases:
205            if c.name == name:
206                return c
207        return None
208
209    def get_case_or_create(self, name):
210        for c in self.testcases:
211            if c.name == name:
212                return c
213
214        logger.debug(f"Could not find a matching testcase for {name}")
215        tc = TestCase(name=name)
216        self.testcases.append(tc)
217        return tc
218
219    @staticmethod
220    def testsuite_runnable(testsuite, fixtures):
221        can_run = False
222        # console harness allows us to run the test and capture data.
223        if testsuite.harness in [
224            'console',
225            'display_capture',
226            'ztest',
227            'pytest',
228            'power',
229            'test',
230            'gtest',
231            'robot',
232            'ctest',
233            'shell'
234            ]:
235            can_run = True
236            # if we have a fixture that is also being supplied on the
237            # command-line, then we need to run the test, not just build it.
238            fixture = testsuite.harness_config.get('fixture')
239            if fixture:
240                can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures)
241
242        return can_run
243
244    def setup_handler(self, env: TwisterEnv):
245        # only setup once.
246        if self.handler:
247            return
248
249        options = env.options
250        common_args = (options, env.generator_cmd, not options.disable_suite_name_check)
251        simulator = self.platform.simulator_by_name(options.sim_name)
252        if options.device_testing:
253            handler = DeviceHandler(self, "device", *common_args)
254            handler.call_make_run = False
255            handler.ready = True
256        elif simulator:
257            if simulator.name == "qemu":
258                if os.name != "nt":
259                    handler = QEMUHandler(self, "qemu", *common_args)
260                else:
261                    handler = QEMUWinHandler(self, "qemu", *common_args)
262                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
263                handler.ready = True
264            else:
265                handler = SimulationHandler(self, simulator.name, *common_args)
266                handler.ready = simulator.is_runnable()
267
268        elif self.testsuite.type == "unit":
269            handler = BinaryHandler(self, "unit", *common_args)
270            handler.binary = os.path.join(self.build_dir, "testbinary")
271            if options.enable_coverage:
272                handler.args.append("COVERAGE=1")
273            handler.call_make_run = False
274            handler.ready = True
275        else:
276            handler = Handler(self, "", *common_args)
277            if self.testsuite.harness == "ctest":
278                handler.ready = True
279
280        self.handler = handler
281
282    # Global testsuite parameters
283    def check_runnable(self,
284                       options: TwisterEnv,
285                       hardware_map=None):
286
287        enable_slow = options.enable_slow
288        filter = options.filter
289        fixtures = options.fixture
290        device_testing = options.device_testing
291        simulation = options.sim_name
292
293        simulator = self.platform.simulator_by_name(simulation)
294        if os.name == 'nt' and simulator:
295            # running on simulators is currently supported only for QEMU on Windows
296            if simulator.name not in ('na', 'qemu'):
297                return False
298
299            # check presence of QEMU on Windows
300            if simulator.name == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
301                return False
302
303        # we asked for build-only on the command line
304        if self.testsuite.build_only:
305            return False
306
307        # Do not run slow tests:
308        skip_slow = self.testsuite.slow and not enable_slow
309        if skip_slow:
310            return False
311
312        target_ready = bool(self.testsuite.type == "unit" or \
313                            self.platform.type == "native" or \
314                            self.testsuite.harness == "ctest" or \
315                            (simulator and simulator.name in SUPPORTED_SIMS and \
316                             simulator.name not in self.testsuite.simulation_exclude) or \
317                            device_testing)
318
319        # check if test is runnable in pytest
320        if self.testsuite.harness in ['pytest', 'shell', 'power', 'display_capture']:
321            target_ready = bool(
322                filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST
323            )
324
325        if filter != 'runnable' and \
326                simulator and \
327                simulator.name in SUPPORTED_SIMS_WITH_EXEC and \
328                not simulator.is_runnable():
329            target_ready = False
330
331        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)
332
333        if hardware_map:
334            for h in hardware_map.duts:
335                if (h.platform in self.platform.aliases and
336                        self.testsuite_runnable(self.testsuite, h.fixtures)):
337                    testsuite_runnable = True
338                    break
339
340        return testsuite_runnable and target_ready
341
342    def create_overlay(
343        self,
344        platform,
345        enable_asan=False,
346        enable_ubsan=False,
347        enable_coverage=False,
348        coverage_platform=None
349    ):
350        if coverage_platform is None:
351            coverage_platform = []
352        # Create this in a "twister/" subdirectory otherwise this
353        # will pass this overlay to kconfig.py *twice* and kconfig.cmake
354        # will silently give that second time precedence over any
355        # --extra-args=CONFIG_*
356        subdir = os.path.join(self.build_dir, "twister")
357
358        content = ""
359
360        if self.testsuite.extra_configs:
361            new_config_list = []
362            # some configs might be conditional on arch or platform, see if we
363            # have a namespace defined and apply only if the namespace matches.
364            # we currently support both arch: and platform:
365            for config in self.testsuite.extra_configs:
366                cond_config = config.split(":")
367                if cond_config[0] == "arch" and len(cond_config) == 3:
368                    if self.platform.arch == cond_config[1]:
369                        new_config_list.append(cond_config[2])
370                elif cond_config[0] == "platform" and len(cond_config) == 3:
371                    if self.platform.name == cond_config[1]:
372                        new_config_list.append(cond_config[2])
373                else:
374                    new_config_list.append(config)
375
376            content = "\n".join(new_config_list)
377
378
379        if self.testsuite.harness_config:
380            self.suite_repeat = self.testsuite.harness_config.get('ztest_suite_repeat', None)
381            self.test_repeat = self.testsuite.harness_config.get('ztest_test_repeat', None)
382            self.test_shuffle = self.testsuite.harness_config.get('ztest_test_shuffle', False)
383
384
385        # Use suite_repeat and test_repeat values
386        if self.suite_repeat or self.test_repeat or self.test_shuffle:
387            content +="\nCONFIG_ZTEST_REPEAT=y"
388            if self.suite_repeat:
389                content += f"\nCONFIG_ZTEST_SUITE_REPEAT_COUNT={self.suite_repeat}"
390            if self.test_repeat:
391                content += f"\nCONFIG_ZTEST_TEST_REPEAT_COUNT={self.test_repeat}"
392            if self.test_shuffle:
393                content +="\nCONFIG_ZTEST_SHUFFLE=y"
394
395        if enable_coverage:
396            for cp in coverage_platform:
397                if cp in platform.aliases:
398                    content = content + "\nCONFIG_COVERAGE=y"
399                    content = content + "\nCONFIG_COVERAGE_DUMP=y"
400
401        if platform.type == "native":
402            if enable_asan:
403                content = content + "\nCONFIG_ASAN=y"
404            if enable_ubsan:
405                content = content + "\nCONFIG_UBSAN=y"
406
407        if content:
408            os.makedirs(subdir, exist_ok=True)
409            file = os.path.join(subdir, "testsuite_extra.conf")
410            with open(file, "w", encoding='utf-8') as f:
411                f.write(content)
412
413        return content
414
415    def calculate_sizes(
416        self,
417        from_buildlog: bool = False,
418        generate_warning: bool = True
419    ) -> SizeCalculator:
420        """Get the RAM/ROM sizes of a test case.
421
422        This can only be run after the instance has been executed by
423        MakeGenerator, otherwise there won't be any binaries to measure.
424
425        @return A SizeCalculator object
426        """
427        elf_filepath = self.get_elf_file()
428        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
429        return SizeCalculator(elf_filename=elf_filepath,
430                            extra_sections=self.testsuite.extra_sections,
431                            buildlog_filepath=buildlog_filepath,
432                            generate_warning=generate_warning)
433
434    def get_elf_file(self) -> str:
435
436        if self.sysbuild:
437            build_dir = self.domains.get_default_domain().build_dir
438        else:
439            build_dir = self.build_dir
440
441        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
442        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
443        blocklist = [
444                'remapped', # used for xtensa plaforms
445                'zefi', # EFI for Zephyr
446                'qemu', # elf files generated after running in qemu
447                '_pre']
448        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
449        if not fns:
450            raise BuildError("Missing output binary")
451        elif len(fns) > 1:
452            logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
453        return fns[0]
454
455    def get_buildlog_file(self) -> str:
456        """Get path to build.log file.
457
458        @raises BuildError: Incorrect amount (!=1) of build logs.
459        @return: Path to build.log (str).
460        """
461        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
462        if len(buildlog_paths) != 1:
463            raise BuildError("Missing/multiple build.log file.")
464        return buildlog_paths[0]
465
466    def __repr__(self):
467        return f"<TestSuite {self.testsuite.name} on {self.platform.name}>"
468