# SPDX-License-Identifier: Apache-2.0
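"""Harness implementations used by Twister to turn raw test output into results.

Each Harness subclass either parses console lines directly (Console, Test/Ztest,
Gtest) or drives an external tool (Pytest and its subclasses, Robot, Ctest) and
maps the outcome onto TwisterStatus values for the test instance and its cases.
"""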
from __future__ import annotations

import json
import logging
import os
import platform
import re
import shlex
import shutil
import subprocess
import sys
import threading
import time
import xml.etree.ElementTree as ET
from collections import OrderedDict
from enum import Enum

import junitparser.junitparser as junit
import yaml
from pytest import ExitCode
from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST
from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE
from twisterlib.error import ConfigurationError, StatusAttributeError
from twisterlib.handlers import Handler, terminate_process
from twisterlib.reports import ReportStatus
from twisterlib.statuses import TwisterStatus
from twisterlib.testinstance import TestInstance

logger = logging.getLogger('twister')

_WINDOWS = platform.system() == 'Windows'


class Harness:
    GCOV_START = "GCOV_COVERAGE_DUMP_START"
    GCOV_END = "GCOV_COVERAGE_DUMP_END"
    FAULT = "ZEPHYR FATAL ERROR"
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    run_id_pattern = r"RunID: (?P<run_id>[0-9A-Fa-f]+)"

    def __init__(self):
        self._status = TwisterStatus.NONE
        self.reason = None
        self.type = None
        self.regex = []
        self.matches = OrderedDict()
        self.ordered = True
        self.id = None
        self.fail_on_fault = True
        self.fault = False
        self.capture_coverage = False
        self.next_pattern = 0
        self.record = None
        self.record_patterns = []
        self.record_merge = False
        self.record_as_json = None
        self.recording = []
        self.ztest = False
        self.detected_suite_names = []
        self.run_id = None
        self.started_suites = {}
        self.started_cases = {}
        self.matched_run_id = False
        self.run_id_exists = False
        self.instance: TestInstance | None = None
        self.testcase_output = ""
        self._match = False

    @property
    def trace(self) -> bool:
        return self.instance.handler.options.verbose > 2

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value: TwisterStatus) -> None:
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err
    def configure(self, instance):
        self.instance = instance
        config = instance.testsuite.harness_config
        self.id = instance.testsuite.id
        self.run_id = instance.run_id
        self.expect_reboot = getattr(instance.testsuite, 'expect_reboot', False)
        if instance.testsuite.ignore_faults:
            self.fail_on_fault = False

        if config:
            self.type = config.get('type', None)
            self.regex = config.get('regex', [])
            self.ordered = config.get('ordered', True)
            self.record = config.get('record', {})
            if self.record:
                self.record_patterns = [re.compile(p) for p in self.record.get("regex", [])]
                self.record_merge = self.record.get("merge", False)
                self.record_as_json = self.record.get("as_json")
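
    # Example harness_config fragment exercising the keys read above
    # (an illustrative sketch, not taken from a real testcase.yaml):
    #
    #   harness_config:
    #     type: multi_line
    #     ordered: true
    #     regex:
    #       - "pattern to match first"
    #       - "pattern to match second"
    #     record:
    #       regex:
    #         - "RECORD:(?P<metric>\\S+)=(?P<value>\\S+)"
    #       merge: false
    #       as_json: ["value"]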

    def build(self):
        pass

    def get_testcase_name(self):
        """
        Get the current TestCase name.
        """
        return self.id

    def translate_record(self, record: dict) -> dict:
        if self.record_as_json:
            for k in self.record_as_json:
                if k not in record:
                    continue
                try:
                    record[k] = json.loads(record[k]) if record[k] else {}
                except json.JSONDecodeError as parse_error:
                    logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
                                   f" {parse_error} for '{k}':'{record[k]}'")
                    # Don't set the Harness state to failed for recordings.
                    record[k] = {'ERROR': {'msg': str(parse_error), 'doc': record[k]}}
        return record

    def parse_record(self, line) -> int:
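        """Match the line against the configured record patterns; each named
        group of a matching pattern becomes a key of the resulting record.
        Returns the number of patterns that matched this line.
        """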
        match_cnt = 0
        for record_pattern in self.record_patterns:
            match = record_pattern.search(line)
            if match:
                match_cnt += 1
                rec = self.translate_record(
                    {k: v.strip() for k, v in match.groupdict(default="").items()}
                )
                if self.record_merge and len(self.recording) > 0:
                    for k, v in rec.items():
                        if k in self.recording[0]:
                            if isinstance(self.recording[0][k], list):
                                self.recording[0][k].append(v)
                            else:
                                self.recording[0][k] = [self.recording[0][k], v]
                        else:
                            self.recording[0][k] = v
                else:
                    self.recording.append(rec)
        return match_cnt

    def process_test(self, line):
        self.parse_record(line)

        runid_match = re.search(self.run_id_pattern, line)
        if runid_match:
            run_id = runid_match.group("run_id")
            self.run_id_exists = True
            if run_id == str(self.run_id):
                self.matched_run_id = True

        if self.RUN_PASSED in line:
            if self.fault:
                self.status = TwisterStatus.FAIL
                self.reason = "Fault detected while running test"
            else:
                self.status = TwisterStatus.PASS

        if self.RUN_FAILED in line:
            self.status = TwisterStatus.FAIL
            self.reason = "Testsuite failed"

        if self.fail_on_fault and line == self.FAULT:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

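# Typical harness lifecycle (a sketch of how Twister drives these classes):
# HarnessImporter.get_harness(name) instantiates the harness selected in
# testcase.yaml, configure(instance) is called once, console output is then
# fed line by line into the subclass handle() method, and the verdict is
# finally read back from .status and the instance's testcases.
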
class Robot(Harness):

    is_robot_test = True

    def configure(self, instance):
        super().configure(instance)
        self.instance = instance

        config = instance.testsuite.harness_config
        if config:
            self.path = config.get('robot_testsuite', None)
            self.option = config.get('robot_option', None)

    def handle(self, line):
        ''' Test cases that use this harness care about the results reported
            by Robot Framework, which is invoked in run_robot_test(). All this
            handle() does is report PASS so that the run does not fail on a
            timeout; nothing is written into handler.log.
        '''
        self.instance.status = TwisterStatus.PASS
        tc = self.instance.get_case_or_create(self.id)
        tc.status = TwisterStatus.PASS

    def run_robot_test(self, command, handler):
        start_time = time.time()
        env = os.environ.copy()

        if self.option:
            if isinstance(self.option, list):
                for option in self.option:
                    for v in str(option).split():
                        command.append(f'{v}')
            else:
                for v in str(self.option).split():
                    command.append(f'{v}')

        if self.path is None:
            raise PytestHarnessException('The parameter robot_testsuite is mandatory')

        if isinstance(self.path, list):
            for suite in self.path:
                command.append(os.path.join(handler.sourcedir, suite))
        else:
            command.append(os.path.join(handler.sourcedir, self.path))

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc:
            out, _ = renode_test_proc.communicate()

            self.instance.execution_time = time.time() - start_time

            if renode_test_proc.returncode == 0:
                self.instance.status = TwisterStatus.PASS
                # All tests in one Robot file are treated as a single test case,
                # so its status is set to match the instance status. Note that
                # there should be only one testcase in the testcases list.
                self.instance.testcases[0].status = TwisterStatus.PASS
            else:
                logger.error(
                    f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}"
                )
                self.instance.status = TwisterStatus.FAIL
                self.instance.testcases[0].status = TwisterStatus.FAIL

            if out:
                with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log:
                    log_msg = out.decode(sys.getdefaultencoding())
                    log.write(log_msg)

class Console(Harness):

    def get_testcase_name(self):
        '''
        Get the current TestCase name.

        A Console harness id contains only the TestSuite id, without a
        TestCase name suffix. When a Ztest with a single test case is
        configured to use this harness type for simplified output parsing
        (instead of the Ztest harness), that first TestCase name is taken
        if available.
        '''
        if self.instance and len(self.instance.testcases) == 1:
            return self.instance.testcases[0].name
        return super().get_testcase_name()

    def configure(self, instance):
        super().configure(instance)
        if not self.regex:
            self.status = TwisterStatus.FAIL
            tc = self.instance.set_case_status_by_name(
                self.get_testcase_name(),
                TwisterStatus.FAIL,
                f"HARNESS:{self.__class__.__name__}:no regex patterns configured."
            )
            raise ConfigurationError(self.instance.name, tc.reason)
        if self.type == "one_line":
            self.pattern = re.compile(self.regex[0])
            self.patterns_expected = 1
        elif self.type == "multi_line":
            self.patterns = []
            for r in self.regex:
                self.patterns.append(re.compile(r))
            self.patterns_expected = len(self.patterns)
        else:
            self.status = TwisterStatus.FAIL
            tc = self.instance.set_case_status_by_name(
                self.get_testcase_name(),
                TwisterStatus.FAIL,
                f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}"
            )
            raise ConfigurationError(self.instance.name, tc.reason)

    def handle(self, line):
        if self.type == "one_line":
            if self.pattern.search(line):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
                             f"'{self.pattern.pattern}'")
                self.next_pattern += 1
                self.status = TwisterStatus.PASS
        elif self.type == "multi_line" and self.ordered:
            if (self.next_pattern < len(self.patterns) and
                self.patterns[self.next_pattern].search(line)):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                             f"{self.next_pattern + 1}/{self.patterns_expected}):"
                             f"'{self.patterns[self.next_pattern].pattern}'")
                self.next_pattern += 1
                if self.next_pattern >= len(self.patterns):
                    self.status = TwisterStatus.PASS
        elif self.type == "multi_line" and not self.ordered:
            for i, pattern in enumerate(self.patterns):
                r = self.regex[i]
                if pattern.search(line) and r not in self.matches:
                    self.matches[r] = line
                    logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                                 f"{len(self.matches)}/{self.patterns_expected}):"
                                 f"'{pattern.pattern}'")
            if len(self.matches) == len(self.regex):
                self.status = TwisterStatus.PASS
        else:
            logger.error("Unknown harness_config type")

        if self.fail_on_fault and self.FAULT in line:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

        self.process_test(line)
        # Reset the resulting test state to FAIL when not all of the patterns
        # were found in the output, only ztest's 'PROJECT EXECUTION SUCCESSFUL'.
        # This can happen because the pattern sequence diverged from the test
        # code, because the test platform has console issues, or even because
        # some other test image was executed.
        # TODO: Introduce an explicit match policy type to reject unexpected
        # console output, allow missing patterns, and deny duplicates.
        if self.status == TwisterStatus.PASS and \
           self.ordered and \
           self.next_pattern < self.patterns_expected:
            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                         f" {self.next_pattern} of {self.patterns_expected}"
                         f" expected ordered patterns.")
            self.status = TwisterStatus.FAIL
            self.reason = "patterns did not match (ordered)"
        if self.status == TwisterStatus.PASS and \
           not self.ordered and \
           len(self.matches) < self.patterns_expected:
            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                         f" {len(self.matches)} of {self.patterns_expected}"
                         f" expected unordered patterns.")
            self.status = TwisterStatus.FAIL
            self.reason = "patterns did not match (unordered)"

        tc = self.instance.get_case_or_create(self.get_testcase_name())
        if self.status == TwisterStatus.PASS:
            tc.status = TwisterStatus.PASS
        else:
            tc.status = TwisterStatus.FAIL


class PytestHarnessException(Exception):
    """General exception for pytest."""


class Pytest(Harness):

    def configure(self, instance: TestInstance):
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self.reserved_dut = None
        self._output = []

    def pytest_run(self, timeout):
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except PytestHarnessException as pytest_exception:
            logger.error(str(pytest_exception))
            self.status = TwisterStatus.FAIL
            self.instance.reason = str(pytest_exception)
        finally:
            self.instance.record(self.recording)
            self._update_test_status()
            if self.reserved_dut:
                self.instance.handler.make_dut_available(self.reserved_dut)

    def generate_command(self):
        config = self.instance.testsuite.harness_config
        handler: Handler = self.instance.handler
        pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest']
        pytest_args_yaml = config.get('pytest_args', []) if config else []
        pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None
        command = [
            'pytest',
            '--twister-harness',
            '-s', '-v',
            f'--build-dir={self.running_dir}',
            f'--junit-xml={self.report_file}',
            f'--platform={self.instance.platform.name}'
        ]

        command.extend([os.path.normpath(os.path.join(
            self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root])

        if pytest_dut_scope:
            command.append(f'--dut-scope={pytest_dut_scope}')

        # Always pass output from the pytest test and the test image up to Twister log.
        command.extend([
            '--log-cli-level=DEBUG',
            '--log-cli-format=%(levelname)s: %(message)s'
        ])

        # Use the test timeout as the base timeout for pytest
        base_timeout = handler.get_test_timeout()
        command.append(f'--base-timeout={base_timeout}')

        if handler.type_str == 'device':
            command.extend(
                self._generate_parameters_for_hardware(handler)
            )
        elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
            command.append(f'--device-type={handler.type_str}')
        elif handler.type_str == 'build':
            command.append('--device-type=custom')
        else:
            raise PytestHarnessException(
                f'Support for handler {handler.type_str} not implemented yet'
            )

        if handler.type_str != 'device':
            for fixture in handler.options.fixture:
                command.append(f'--twister-fixture={fixture}')

        if handler.options.extra_test_args and handler.type_str == 'native':
            command.append(f'--extra-test-args={shlex.join(handler.options.extra_test_args)}')

        command.extend(pytest_args_yaml)

        if handler.options.pytest_args:
            command.extend(handler.options.pytest_args)

        return command
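
    # Illustrative shape of the assembled command (paths shortened; the exact
    # flags depend on the handler type and YAML/CLI options):
    #
    #   pytest --twister-harness -s -v --build-dir=<build_dir>
    #       --junit-xml=<build_dir>/report.xml --platform=<platform>
    #       <source_dir>/pytest --log-cli-level=DEBUG
    #       --log-cli-format='%(levelname)s: %(message)s'
    #       --base-timeout=<timeout> --device-type=<type> ...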

    def _generate_parameters_for_hardware(self, handler: Handler):
        command = ['--device-type=hardware']
        hardware = handler.get_hardware()
        if not hardware:
            raise PytestHarnessException('Hardware is not available')
        # update the instance with the device id to have it in the summary report
        self.instance.dut = hardware.id

        self.reserved_dut = hardware
        if hardware.serial_pty:
            command.append(f'--device-serial-pty={hardware.serial_pty}')
        else:
            command.extend([
                f'--device-serial={hardware.serial}',
                f'--device-serial-baud={hardware.baud}'
            ])

        if hardware.flash_timeout:
            command.append(f'--flash-timeout={hardware.flash_timeout}')

        options = handler.options
        if runner := hardware.runner or options.west_runner:
            command.append(f'--runner={runner}')

        if hardware.runner_params:
            for param in hardware.runner_params:
                command.append(f'--runner-params={param}')

        if options.west_flash:
            command.append(f'--west-flash-extra-args={options.west_flash}')

        if board_id := hardware.probe_id or hardware.id:
            command.append(f'--device-id={board_id}')

        if hardware.product:
            command.append(f'--device-product={hardware.product}')

        if hardware.pre_script:
            command.append(f'--pre-script={hardware.pre_script}')

        if hardware.post_flash_script:
            command.append(f'--post-flash-script={hardware.post_flash_script}')

        if hardware.post_script:
            command.append(f'--post-script={hardware.post_script}')

        if hardware.flash_before:
            command.append(f'--flash-before={hardware.flash_before}')

        for fixture in hardware.fixtures:
            command.append(f'--twister-fixture={fixture}')

        return command

    def run_command(self, cmd, timeout):
        cmd, env = self._update_command_with_env_dependencies(cmd)
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. It can be extended in the testspec '
                                   f'file. Currently set to {timeout} seconds.')
                    self.instance.reason = 'Pytest timeout'
                    self.status = TwisterStatus.FAIL
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.status = TwisterStatus.FAIL
                proc.kill()

        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
            self.status = TwisterStatus.ERROR
            self.instance.reason = f'Pytest error - return code {proc.returncode}'
        with open(self.pytest_log_file_path, 'w') as log_file:
            log_file.write(shlex.join(cmd) + '\n\n')
            log_file.write('\n'.join(self._output))

    @staticmethod
    def _update_command_with_env_dependencies(cmd):
        '''
        If the pytest plugin wasn't installed by pip, point pytest at it by
        updating PYTHONPATH and appending a -p argument to the pytest command.
        '''
        env = os.environ.copy()
        if not PYTEST_PLUGIN_INSTALLED:
            cmd.extend(['-p', 'twister_harness.plugin'])
            pytest_plugin_path = os.path.join(
                ZEPHYR_BASE,
                'scripts',
                'pylib',
                'pytest-twister-harness',
                'src'
            )
            env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
            if _WINDOWS:
                cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
            else:
                cmd_append_python_path = (
                    f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
                )
        else:
            cmd_append_python_path = ''
        cmd_to_print = cmd_append_python_path + shlex.join(cmd)
        logger.debug(f'Running pytest command: {cmd_to_print}')

        return cmd, env

    def _output_reader(self, proc):
        self._output = []
        while proc.stdout.readable() and proc.poll() is None:
            line = proc.stdout.readline().decode().strip()
            if not line:
                continue
            self._output.append(line)
            logger.debug(f'PYTEST: {line}')
            self.parse_record(line)
        proc.communicate()

    def _update_test_status(self):
        if self.status == TwisterStatus.NONE:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.status = TwisterStatus.FAIL
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
                               TwisterStatus.FAIL
        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
            self.instance.reason = self.instance.reason or 'Pytest failed'
            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)

    def _parse_report_file(self, report):
        tree = ET.parse(report)
        root = tree.getroot()

        if (elem_ts := root.find('testsuite')) is not None:
            if elem_ts.get('failures') != '0':
                self.status = TwisterStatus.FAIL
                self.instance.reason = (
                    f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
                )
            elif elem_ts.get('errors') != '0':
                self.status = TwisterStatus.ERROR
                self.instance.reason = 'Error during pytest execution'
            elif elem_ts.get('skipped') == elem_ts.get('tests'):
                self.status = TwisterStatus.SKIP
            else:
                self.status = TwisterStatus.PASS
            self.instance.execution_time = float(elem_ts.get('time'))

            for elem_tc in elem_ts.findall('testcase'):
                tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
                tc.duration = float(elem_tc.get('time'))
                elem = elem_tc.find('*')
                if elem is None:
                    tc.status = TwisterStatus.PASS
                else:
                    if elem.tag == ReportStatus.SKIP:
                        tc.status = TwisterStatus.SKIP
                    elif elem.tag == ReportStatus.FAIL:
                        tc.status = TwisterStatus.FAIL
                    else:
                        tc.status = TwisterStatus.ERROR
                    tc.reason = elem.get('message')
                    tc.output = elem.text
        else:
            self.status = TwisterStatus.SKIP
            self.instance.reason = 'No tests collected'

class Display_capture(Pytest):
    def generate_command(self):
        config = self.instance.testsuite.harness_config
        pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'display-twister-harness')]
        config['pytest_root'] = pytest_root

        command = super().generate_command()
        if test_config_file := self._get_display_config_file(config):
            command.append(f'--config={test_config_file}')
        else:
            logger.warning('No config file provided')
        return command

    def _get_display_config_file(self, harness_config):
        if test_config_file := harness_config.get('display_capture_config'):
            test_config_path = os.path.join(self.source_dir, test_config_file)
            logger.info(f'test_config_path = {test_config_path}')
            if os.path.exists(test_config_path):
                return test_config_path
        return None


class Shell(Pytest):
    def generate_command(self):
        config = self.instance.testsuite.harness_config
        pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'shell-twister-harness')]
        config['pytest_root'] = pytest_root

        command = super().generate_command()
        if test_shell_file := self._get_shell_commands_file(config):
            command.append(f'--testdata={test_shell_file}')
        else:
            logger.warning('No shell commands provided')
        return command

    def _get_shell_commands_file(self, harness_config):
        if shell_commands := harness_config.get('shell_commands'):
            test_shell_file = os.path.join(self.running_dir, 'test_shell.yml')
            with open(test_shell_file, 'w') as f:
                yaml.dump(shell_commands, f)
            return test_shell_file

        test_shell_file = harness_config.get('shell_commands_file', 'test_shell.yml')
        test_shell_file = os.path.join(
            self.source_dir, os.path.expanduser(os.path.expandvars(test_shell_file))
        )
        if os.path.exists(test_shell_file):
            return test_shell_file
        return None
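
    # Example harness_config for this harness (an illustrative sketch; the
    # exact schema of each entry is defined by the shell-twister-harness
    # plugin, not by this file):
    #
    #   harness: shell
    #   harness_config:
    #     shell_commands:
    #       - command: "kernel version"
    #         expected: "Zephyr version .*"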

class Power(Pytest):
    def generate_command(self):
        config = self.instance.testsuite.harness_config
        pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'power-twister-harness')]
        config['pytest_root'] = pytest_root

        command = super().generate_command()

        if self.instance.testsuite.harness == 'power':
            measurements = config.get('power_measurements')
            command.append(f'--testdata={measurements}')
        return command

class Gtest(Harness):
    ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*"
    _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})"
    TEST_START_PATTERN = f".*\\[ RUN      \\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_PASS_PATTERN = f".*\\[       OK \\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_SKIP_PATTERN = f".*\\[ DISABLED \\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_FAIL_PATTERN = f".*\\[  FAILED  \\] {_SUITE_TEST_NAME_PATTERN}"
    FINISHED_PATTERN = (
        ".*(?:\\[==========\\] Done running all tests\\.|"
        + "\\[----------\\] Global test environment tear-down)"
    )
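
    # Example GoogleTest console lines matched by the patterns above
    # (illustrative):
    #   [ RUN      ] suite_name.test_name
    #   [       OK ] suite_name.test_name (1 ms)
    #   [  FAILED  ] suite_name.test_name (1 ms)
    #   [==========] Done running all tests.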

    def __init__(self):
        super().__init__()
        self.tc = None
        self.has_failures = False

    def handle(self, line):
        # Strip the ANSI characters, they mess up the patterns
        non_ansi_line = self.ANSI_ESCAPE.sub('', line)

        if self.status != TwisterStatus.NONE:
            return

        # Check if we started running a new test
        test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
        if test_start_match:
            # Add the suite name
            suite_name = test_start_match.group("suite_name")
            if suite_name not in self.detected_suite_names:
                self.detected_suite_names.append(suite_name)

            # Generate the internal name of the test
            name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))

            # Assert that we don't already have a running test
            assert (
                self.tc is None
            ), f"gTest error, {self.tc} didn't finish"

            # Check that the instance doesn't exist yet (prevents re-running)
            tc = self.instance.get_case_by_name(name)
            assert tc is None, f"gTest error, {tc} running twice"

            # Create the test instance and set the context
            tc = self.instance.get_case_or_create(name)
            self.tc = tc
            self.tc.status = TwisterStatus.STARTED
            self.testcase_output += line + "\n"
            self._match = True

        # Check if the test run finished
        finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
        if finished_match:
            tc = self.instance.get_case_or_create(self.id)
            if self.has_failures or self.tc is not None:
                self.status = TwisterStatus.FAIL
                tc.status = TwisterStatus.FAIL
            else:
                self.status = TwisterStatus.PASS
                tc.status = TwisterStatus.PASS
            return

        # Check if the individual test finished
        state, name = self._check_result(non_ansi_line)
        if state == TwisterStatus.NONE or name is None:
            # Nothing finished, keep processing lines
            return

        # Get the matching test and make sure it's the same as the current context
        tc = self.instance.get_case_by_name(name)
        assert (
            tc is not None and tc == self.tc
        ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}"

        # Test finished, clear the context
        self.tc = None

        # Update the status of the test
        tc.status = state
        if tc.status == TwisterStatus.FAIL:
            self.has_failures = True
            tc.output = self.testcase_output
        self.testcase_output = ""
        self._match = False

    def _check_result(self, line):
        test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
        if test_pass_match:
            return TwisterStatus.PASS, \
                   "{}.{}.{}".format(
                        self.id, test_pass_match.group("suite_name"),
                        test_pass_match.group("test_name")
                    )
        test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
        if test_skip_match:
            return TwisterStatus.SKIP, \
                   "{}.{}.{}".format(
                       self.id, test_skip_match.group("suite_name"),
                       test_skip_match.group("test_name")
                    )
        test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
        if test_fail_match:
            return TwisterStatus.FAIL, \
                   "{}.{}.{}".format(
                       self.id, test_fail_match.group("suite_name"),
                       test_fail_match.group("test_name")
                    )
        return None, None


class Test(Harness):
    __test__ = False  # for pytest to skip this class when collecting tests

    # Ztest log patterns are not anchored to the start of the line: some
    # platforms prepend a logging prefix to each console line, while on other
    # platforms (without prefixes) the leading space is stripped.
    test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\w*)")
    test_suite_end_pattern = re.compile(
        r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)"
    )
    test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)")
    test_case_end_pattern = re.compile(
        r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds"
    )
    test_suite_summary_pattern = re.compile(
        r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:"
        r" .* duration = (\d*[.,]?\d*) seconds"
    )
    test_case_summary_pattern = re.compile(
        r".*- (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds"
    )
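
    # Illustrative Ztest console lines these patterns target (shapes derived
    # from the regexes above, not captured from a real run):
    #   Running TESTSUITE kernel_common
    #   START - test_foo
    #   PASS - test_foo in 0.010 seconds
    #   TESTSUITE kernel_common succeeded
    #   SUITE PASS - 100.00% [kernel_common]: pass = 1, fail = 0, skip = 0, total = 1 duration = 0.010 seconds
    #    - PASS - [kernel_common.test_foo] duration = 0.010 seconds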

    def get_testcase(self, tc_name, phase, ts_name=None):
        """ Search for a Ztest case among those detected in the test image
            binary, expecting the same test names as already known from the ELF.
            Track suites and cases unexpectedly found in the log.
        """
        ts_names = self.started_suites.keys()
        if ts_name:
            if self.trace and ts_name not in self.instance.testsuite.ztest_suite_names:
                # This can happen if a ZTEST_SUITE name is macro-generated
                # in the test source files, e.g. based on DT information.
                logger.debug(f"{phase}: unexpected Ztest suite '{ts_name}' is "
                             f"not present among: {self.instance.testsuite.ztest_suite_names}")
            if ts_name not in self.detected_suite_names:
                if self.trace:
                    logger.debug(f"{phase}: detected new Ztest suite '{ts_name}'")
                self.detected_suite_names.append(ts_name)
            ts_names = [ts_name] if ts_name in ts_names else []

        # First, try to match the test case ID to the first running Ztest suite
        # with this test name.
        for ts_name_ in ts_names:
            if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1):
                continue
            tc_fq_id = self.instance.compose_case_name(f"{ts_name_}.{tc_name}")
            if tc := self.instance.get_case_by_name(tc_fq_id):
                if self.trace:
                    logger.debug(f"{phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}'")
                return tc
        logger.debug(
            f"{phase}: Ztest case '{tc_name}' is not known"
            f" in {self.started_suites} running suite(s)."
        )
        tc_id = self.instance.compose_case_name(tc_name)
        return self.instance.get_case_or_create(tc_id)

    def start_suite(self, suite_name, phase='TS_START'):
        if suite_name not in self.detected_suite_names:
            self.detected_suite_names.append(suite_name)
        if self.trace and suite_name not in self.instance.testsuite.ztest_suite_names:
            # This can happen if a ZTEST_SUITE name is macro-generated
            # in the test source files, e.g. based on DT information.
            logger.debug(f"{phase}: unexpected Ztest suite '{suite_name}' is "
                         f"not present among: {self.instance.testsuite.ztest_suite_names}")
        if suite_name in self.started_suites:
            if self.started_suites[suite_name]['count'] > 0 and not self.expect_reboot:
                # Either the suite restarted itself, or this is an unexpected state transition.
                logger.warning(f"{phase}: already STARTED '{suite_name}':"
                               f"{self.started_suites[suite_name]}")
            elif self.trace:
                logger.debug(f"{phase}: START suite '{suite_name}'")
            self.started_suites[suite_name]['count'] += 1
            self.started_suites[suite_name]['repeat'] += 1
        else:
            self.started_suites[suite_name] = {'count': 1, 'repeat': 0}

    def end_suite(self, suite_name, phase='TS_END', suite_status=None):
        if suite_name in self.started_suites:
            if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0:
                return
            if self.started_suites[suite_name]['count'] < 1:
                logger.error(
                    f"{phase}: already ENDED suite '{suite_name}':{self.started_suites[suite_name]}"
                )
            elif self.trace:
                logger.debug(f"{phase}: END suite '{suite_name}':{self.started_suites[suite_name]}")
            self.started_suites[suite_name]['count'] -= 1
        elif suite_status == 'SKIP':
            self.start_suite(suite_name, phase)  # register skipped suites at their summary end
            self.started_suites[suite_name]['count'] -= 1
        else:
            logger.warning(f"{phase}: END suite '{suite_name}' without START detected")

    def start_case(self, tc_name, phase='TC_START'):
        if tc_name in self.started_cases:
            if self.started_cases[tc_name]['count'] > 0 and not self.expect_reboot:
                logger.warning(f"{phase}: already STARTED case "
                               f"'{tc_name}':{self.started_cases[tc_name]}")
            self.started_cases[tc_name]['count'] += 1
        else:
            self.started_cases[tc_name] = {'count': 1}

    def end_case(self, tc_name, phase='TC_END'):
        if tc_name in self.started_cases:
            if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0:
                return
            if self.started_cases[tc_name]['count'] < 1:
                logger.error(
                    f"{phase}: already ENDED case '{tc_name}':{self.started_cases[tc_name]}"
                )
            elif self.trace:
                logger.debug(f"{phase}: END case '{tc_name}':{self.started_cases[tc_name]}")
            self.started_cases[tc_name]['count'] -= 1
        elif phase != 'TS_SUM':
            logger.warning(f"{phase}: END case '{tc_name}' without START detected")

    def handle(self, line):
        testcase_match = None
        if self._match:
            self.testcase_output += line + "\n"
        if test_suite_start_match := re.search(self.test_suite_start_pattern, line):
            self.start_suite(test_suite_start_match.group("suite_name"))
        elif test_suite_end_match := re.search(self.test_suite_end_pattern, line):
            suite_name = test_suite_end_match.group("suite_name")
            self.end_suite(suite_name)
            self.ztest = True
        elif testcase_match := re.search(self.test_case_start_pattern, line):
            tc_name = testcase_match.group(2)
            tc = self.get_testcase(tc_name, 'TC_START')
            self.start_case(tc.name)
            # Mark the test as started; if something happens from here on (for
            # example a timeout), it is most likely caused by this test, so it
            # should be marked as failed and not blocked (not run).
            tc.status = TwisterStatus.STARTED
            if not self._match:
                self.testcase_output += line + "\n"
                self._match = True
        # Some testcases are skipped based on predicates and do not show up
        # during test execution; however, they are listed in the summary. Parse
        # the summary for their status and use that status instead.
        elif result_match := self.test_case_end_pattern.match(line):
            matched_status = result_match.group(1)
            tc_name = result_match.group(3)
            tc = self.get_testcase(tc_name, 'TC_END')
            self.end_case(tc.name)
            tc.status = TwisterStatus[matched_status]
            if tc.status == TwisterStatus.SKIP:
                tc.reason = "ztest skip"
            tc.duration = float(result_match.group(4))
            if tc.status == TwisterStatus.FAIL:
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True
        elif test_suite_summary_match := self.test_suite_summary_pattern.match(line):
            suite_name = test_suite_summary_match.group("suite_name")
            suite_status = test_suite_summary_match.group("suite_status")
            self._match = False
            self.ztest = True
            self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status)
        elif test_case_summary_match := self.test_case_summary_pattern.match(line):
            matched_status = test_case_summary_match.group(1)
            suite_name = test_case_summary_match.group(2)
            tc_name = test_case_summary_match.group(4)
            tc = self.get_testcase(tc_name, 'TS_SUM', suite_name)
            self.end_case(tc.name, 'TS_SUM')
            if tc.status not in [TwisterStatus.NONE, TwisterStatus[matched_status]]:
                # The TestCase missed its END log entry, so its status comes from the suite summary.
                logger.warning(
                    f"TS_SUM: {tc.name} force status: {tc.status}->{TwisterStatus[matched_status]}"
                )
            tc.status = TwisterStatus[matched_status]
            if tc.status == TwisterStatus.SKIP:
                tc.reason = "ztest skip"
            tc.duration = float(test_case_summary_match.group(5))
            if tc.status == TwisterStatus.FAIL:
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True

        self.process_test(line)

        if not self.ztest and self.status != TwisterStatus.NONE:
            logger.debug(f"{self.id} is not a Ztest, status:{self.status}")
            tc = self.instance.get_case_or_create(self.id)
            if self.status == TwisterStatus.PASS:
                tc.status = TwisterStatus.PASS
            else:
                tc.status = TwisterStatus.FAIL
                tc.reason = "Test failure"


class Ztest(Test):
    pass


class Bsim(Harness):

    def build(self):
        """
        Copying the application executable to BabbleSim's bin directory enables
        running multidevice bsim tests after twister has built them.
        """

        if self.instance is None:
            return

        original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe')
        if not os.path.exists(original_exe_path):
            logger.warning('Cannot copy bsim exe - cannot find original executable.')
            return

        bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
        if not bsim_out_path:
            logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
            return

        new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '')
        if new_exe_name:
            new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
        else:
            new_exe_name = f'bs_{self.instance.name}'

        new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')
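        # For example (illustrative), an instance named 'native_sim/tests.bsim.x'
        # becomes 'bs_native_sim_tests_bsim_x' after the separator replacements.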

        new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
        logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
        shutil.copy(original_exe_path, new_exe_path)


class Ctest(Harness):
    def configure(self, instance: TestInstance):
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.ctest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self._output = []

    def ctest_run(self, timeout):
        assert self.instance is not None
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except Exception as err:
            logger.error(str(err))
            self.status = TwisterStatus.FAIL
            self.instance.reason = str(err)
        finally:
            self.instance.record(self.recording)
            self._update_test_status()

    def generate_command(self):
        config = self.instance.testsuite.harness_config
        handler: Handler = self.instance.handler
        ctest_args_yaml = config.get('ctest_args', []) if config else []
        command = [
            'ctest',
            '--build-nocmake',
            '--test-dir',
            self.running_dir,
            '--output-junit',
            self.report_file,
            '--output-log',
            self.ctest_log_file_path,
            '--output-on-failure',
        ]
        base_timeout = handler.get_test_timeout()
        command.extend(['--timeout', str(base_timeout)])
        command.extend(ctest_args_yaml)

        if handler.options.ctest_args:
            command.extend(handler.options.ctest_args)

        return command
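
    # Illustrative shape of the assembled command (paths shortened):
    #   ctest --build-nocmake --test-dir <build_dir>
    #       --output-junit <build_dir>/report.xml
    #       --output-log <build_dir>/twister_harness.log
    #       --output-on-failure --timeout <timeout>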

    def run_command(self, cmd, timeout):
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. It can be extended in the testspec '
                                   f'file. Currently set to {timeout} seconds.')
                    self.instance.reason = 'Ctest timeout'
                    self.status = TwisterStatus.FAIL
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.status = TwisterStatus.FAIL
                proc.kill()

        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
            self.status = TwisterStatus.ERROR
            self.instance.reason = f'Ctest error - return code {proc.returncode}'
            with open(self.ctest_log_file_path, 'w') as log_file:
                log_file.write(shlex.join(cmd) + '\n\n')
                log_file.write('\n'.join(self._output))

    def _output_reader(self, proc):
        self._output = []
        while proc.stdout.readable() and proc.poll() is None:
            line = proc.stdout.readline().decode().strip()
            if not line:
                continue
            self._output.append(line)
            logger.debug(f'CTEST: {line}')
            self.parse_record(line)
        proc.communicate()

    def _update_test_status(self):
        if self.status == TwisterStatus.NONE:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.status = TwisterStatus.FAIL
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
                               TwisterStatus.FAIL
        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
            self.instance.reason = self.instance.reason or 'Ctest failed'
            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)

    def _parse_report_file(self, report):
        suite = junit.JUnitXml.fromfile(report)
        if suite is None:
            self.status = TwisterStatus.SKIP
            self.instance.reason = 'No tests collected'
            return

        if not isinstance(suite, junit.TestSuite):
            suite = junit.TestSuite.fromelem(suite)

        if suite.failures and suite.failures > 0:
            self.status = TwisterStatus.FAIL
            self.instance.reason = f"{suite.failures}/{suite.tests} ctest scenario(s) failed"
        elif suite.errors and suite.errors > 0:
            self.status = TwisterStatus.ERROR
            self.instance.reason = 'Error during ctest execution'
        elif suite.skipped == suite.tests:
            self.status = TwisterStatus.SKIP
        else:
            self.status = TwisterStatus.PASS
        self.instance.execution_time = suite.time

        for case in suite:
            tc = self.instance.add_testcase(f"{self.id}.{case.name}")
            tc.duration = case.time
            if any(isinstance(r, junit.Failure) for r in case.result):
                tc.status = TwisterStatus.FAIL
                tc.output = case.system_out
            elif any(isinstance(r, junit.Error) for r in case.result):
                tc.status = TwisterStatus.ERROR
                tc.output = case.system_out
            elif any(isinstance(r, junit.Skipped) for r in case.result):
                tc.status = TwisterStatus.SKIP
                tc.reason = next((r.message for r in case.result
                                  if isinstance(r, junit.Skipped)), 'Ctest skip')
            else:
                tc.status = TwisterStatus.PASS


class HarnessImporter:

    @staticmethod
    def get_harness(harness_name):
        thismodule = sys.modules[__name__]
        try:
            if harness_name:
                harness_class = getattr(thismodule, harness_name)
            else:
                harness_class = thismodule.Test
            return harness_class()
        except AttributeError as e:
            logger.debug(f"harness {harness_name} not implemented: {e}")
            return None