1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2022 Intel Corporation
5# Copyright 2022 NXP
6# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
7#
8# SPDX-License-Identifier: Apache-2.0
9
10import argparse
11import contextlib
12import logging
13import math
14import os
15import re
16import select
17import shlex
18import signal
19import subprocess
20import sys
21import threading
22import time
23from contextlib import contextmanager
24from pathlib import Path
25from queue import Empty, Queue
26
27import psutil
28from twisterlib.environment import ZEPHYR_BASE, strip_ansi_sequences
29from twisterlib.error import TwisterException
30from twisterlib.platform import Platform
31from twisterlib.statuses import TwisterStatus
32
33sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
34from domains import Domains
35
36try:
37    import serial
38except ImportError:
39    print("Install pyserial python module with pip to use --device-testing option.")
40
41try:
42    import pty
43except ImportError as capture_error:
44    if os.name == "nt":  # "nt" means that program is running on Windows OS
45        pass  # "--device-serial-pty" option is not supported on Windows OS
46    else:
47        raise capture_error
48
49logger = logging.getLogger('twister')
50
51
52def terminate_process(proc):
53    """
54    encapsulate terminate functionality so we do it consistently where ever
55    we might want to terminate the proc.  We need try_kill_process_by_pid
56    because of both how newer ninja (1.6.0 or greater) and .NET / renode
57    work.  Newer ninja's don't seem to pass SIGTERM down to the children
58    so we need to use try_kill_process_by_pid.
59    """
60
61    with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
62        for child in psutil.Process(proc.pid).children(recursive=True):
63            with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
64                os.kill(child.pid, signal.SIGTERM)
65    proc.terminate()
66    # sleep for a while before attempting to kill
67    time.sleep(0.5)
68    proc.kill()
69
70
71class Handler:
72    def __init__(self, instance, type_str: str, options: argparse.Namespace,
73                 generator_cmd: str | None = None, suite_name_check: bool = True):
74        """Constructor
75
76        """
77        self.options = options
78
79        self.run = False
80        self.type_str = type_str
81
82        self.pid_fn = None
83        self.call_make_run = True
84
85        self.name = instance.name
86        self.instance = instance
87        self.sourcedir = instance.testsuite.source_dir
88        self.build_dir = instance.build_dir
89        self.log = os.path.join(self.build_dir, "handler.log")
90        self.returncode = 0
91        self.generator_cmd = generator_cmd
92        self.suite_name_check = suite_name_check
93        self.ready = False
94
95        self.args = []
96        self.terminated = False
97
98    def get_test_timeout(self):
99        return math.ceil(self.instance.testsuite.timeout *
100                         self.instance.platform.timeout_multiplier *
101                         self.options.timeout_multiplier)
102
103    def terminate(self, proc):
104        terminate_process(proc)
105        self.terminated = True
106
107    def _verify_ztest_suite_name(self, harness_status, detected_suite_names, handler_time):
108        """
109        If test suite names was found in test's C source code, then verify if
110        detected suite names from output correspond to expected suite names
111        (and not in reverse).
112        """
113        expected_suite_names = self.instance.testsuite.ztest_suite_names
114        logger.debug(f"Expected suite names:{expected_suite_names}")
115        logger.debug(f"Detected suite names:{detected_suite_names}")
116        if not expected_suite_names or \
117                harness_status != TwisterStatus.PASS:
118            return
119        if not detected_suite_names:
120            self._missing_suite_name(expected_suite_names, handler_time)
121            return
122        # compare the expect and detect from end one by one without order
123        _d_suite = detected_suite_names[-len(expected_suite_names):]
124        if (
125            set(_d_suite) != set(expected_suite_names)
126            and not set(_d_suite).issubset(set(expected_suite_names))
127        ):
128            self._missing_suite_name(expected_suite_names, handler_time)
129
130    def _missing_suite_name(self, expected_suite_names, handler_time):
131        """
132        Change result of performed test if problem with missing or unpropper
133        suite name was occurred.
134        """
135        self.instance.status = TwisterStatus.FAIL
136        self.instance.execution_time = handler_time
137        for tc in self.instance.testcases:
138            tc.status = TwisterStatus.FAIL
139        self.instance.reason = "Testsuite mismatch"
140        logger.debug(
141            "Test suite names were not printed or some of them in output"
142            f" do not correspond with expected: {str(expected_suite_names)}",
143        )
144
145    def _final_handle_actions(self, harness, handler_time):
146
147        # only for Ztest tests:
148        harness_class_name = type(harness).__name__
149        if self.suite_name_check and harness_class_name == "Test":
150            self._verify_ztest_suite_name(
151                harness.status,
152                harness.detected_suite_names,
153                handler_time
154            )
155            if self.instance.status == TwisterStatus.FAIL:
156                return
157            if not harness.matched_run_id and harness.run_id_exists:
158                self.instance.status = TwisterStatus.FAIL
159                self.instance.execution_time = handler_time
160                self.instance.reason = "RunID mismatch"
161                for tc in self.instance.testcases:
162                    tc.status = TwisterStatus.FAIL
163
164        self.instance.record(harness.recording)
165
166    def get_default_domain_build_dir(self):
167        if self.instance.sysbuild:
168            # Load domain yaml to get default domain build directory
169            # Note: for targets using QEMU, we assume that the target will
170            # have added any additional images to the run target manually
171            domain_path = os.path.join(self.build_dir, "domains.yaml")
172            domains = Domains.from_file(domain_path)
173            logger.debug(f"Loaded sysbuild domain data from {domain_path}")
174            build_dir = domains.get_default_domain().build_dir
175        else:
176            build_dir = self.build_dir
177        return build_dir
178
179
180class BinaryHandler(Handler):
181    def __init__(
182        self,
183        instance,
184        type_str: str,
185        options: argparse.Namespace,
186        generator_cmd: str | None = None,
187        suite_name_check: bool = True
188    ):
189        """Constructor
190
191        @param instance Test Instance
192        """
193        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
194
195        self.seed = None
196        self.extra_test_args = None
197        self.line = b""
198        self.binary: str | None = None
199
200    def try_kill_process_by_pid(self):
201        if self.pid_fn:
202            with open(self.pid_fn) as pid_file:
203                pid = int(pid_file.read())
204            os.unlink(self.pid_fn)
205            self.pid_fn = None  # clear so we don't try to kill the binary twice
206            with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
207                os.kill(pid, signal.SIGKILL)
208
209    def _output_reader(self, proc):
210        self.line = proc.stdout.readline()
211
212    def _output_handler(self, proc, harness):
213        suffix = '\\r\\n'
214
215        with open(self.log, "w") as log_out_fp:
216            timeout_extended = False
217            timeout_time = time.time() + self.get_test_timeout()
218            while True:
219                this_timeout = timeout_time - time.time()
220                if this_timeout < 0:
221                    break
222                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
223                reader_t.start()
224                reader_t.join(this_timeout)
225                if not reader_t.is_alive() and self.line != b"":
226                    line_decoded = self.line.decode('utf-8', "replace")
227                    stripped_line = line_decoded.rstrip()
228                    if stripped_line.endswith(suffix):
229                        stripped_line = stripped_line[:-len(suffix)].rstrip()
230                    logger.debug(f"OUTPUT: {stripped_line}")
231                    log_out_fp.write(strip_ansi_sequences(line_decoded))
232                    log_out_fp.flush()
233                    harness.handle(stripped_line)
234                    if (
235                        harness.status != TwisterStatus.NONE
236                        and not timeout_extended
237                        or harness.capture_coverage
238                    ):
239                        timeout_extended = True
240                        if harness.capture_coverage:
241                            timeout_time = time.time() + 30
242                        else:
243                            timeout_time = time.time() + 2
244                else:
245                    reader_t.join(0)
246                    break
247            try:
248                # POSIX arch based ztests end on their own,
249                # so let's give it up to 100ms to do so
250                proc.wait(0.1)
251            except subprocess.TimeoutExpired:
252                self.terminate(proc)
253
254    def _create_command(self, robot_test):
255
256        if robot_test:
257            keywords = os.path.join(self.options.coverage_basedir, 'tests/robot/common.robot')
258            elf = os.path.join(self.build_dir, "zephyr/zephyr.elf")
259            command = [self.generator_cmd]
260            resc = ""
261            uart = ""
262            # os.path.join cannot be used on a Mock object, so we are
263            # explicitly checking the type
264            if isinstance(self.instance.platform, Platform):
265                for board_dir in self.options.board_root:
266                    path = os.path.join(Path(board_dir).parent, self.instance.platform.resc)
267                    if os.path.exists(path):
268                        resc = path
269                        break
270                uart = self.instance.platform.uart
271                command = ["renode-test",
272                            "--variable", "KEYWORDS:" + keywords,
273                            "--variable", "ELF:@" + elf,
274                            "--variable", "RESC:@" + resc,
275                            "--variable", "UART:" + uart]
276        elif self.call_make_run:
277            if self.options.sim_name:
278                target = f"run_{self.options.sim_name}"
279            else:
280                target = "run"
281
282            command = [self.generator_cmd, "-C", self.get_default_domain_build_dir(), target]
283        elif self.instance.testsuite.type == "unit":
284            assert self.binary, "Missing binary in unit testsuite."
285            command = [self.binary]
286        else:
287            binary = os.path.join(self.get_default_domain_build_dir(), "zephyr", "zephyr.exe")
288            command = [binary]
289
290        if self.options.enable_valgrind:
291            command = ["valgrind", "--error-exitcode=2",
292                       "--leak-check=full",
293                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
294                       "--log-file=" + self.build_dir + "/valgrind.log",
295                       "--track-origins=yes",
296                       ] + command
297
298        # Only valid for native_sim
299        if self.seed is not None:
300            command.append(f"--seed={self.seed}")
301        if self.extra_test_args is not None:
302            command.extend(self.extra_test_args)
303
304        return command
305
306    def _create_env(self):
307        env = os.environ.copy()
308        if self.options.enable_asan:
309            env["ASAN_OPTIONS"] = "log_path=stdout:" + \
310                                  env.get("ASAN_OPTIONS", "")
311            if not self.options.enable_lsan:
312                env["ASAN_OPTIONS"] += "detect_leaks=0"
313
314        if self.options.enable_ubsan:
315            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
316                                  env.get("UBSAN_OPTIONS", "")
317
318        return env
319
320    def _update_instance_info(self, harness, handler_time):
321        self.instance.execution_time = handler_time
322        if not self.terminated and self.returncode != 0:
323            self.instance.status = TwisterStatus.FAIL
324            if self.options.enable_valgrind and self.returncode == 2:
325                self.instance.reason = "Valgrind error"
326            else:
327                # When a process is killed, the default handler returns 128 + SIGTERM
328                # so in that case the return code itself is not meaningful
329                self.instance.reason = f"Failed (rc={self.returncode})"
330            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
331        elif harness.status != TwisterStatus.NONE:
332            self.instance.status = harness.status
333            if harness.status == TwisterStatus.FAIL:
334                self.instance.reason = f"Failed harness:'{harness.reason}'"
335            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
336        else:
337            self.instance.status = TwisterStatus.FAIL
338            self.instance.reason = "Timeout"
339            self.instance.add_missing_case_status(TwisterStatus.BLOCK, "Timeout")
340
341    def handle(self, harness):
342        robot_test = getattr(harness, "is_robot_test", False)
343
344        command = self._create_command(robot_test)
345
346        logger.debug("Spawning process: " +
347                     " ".join(shlex.quote(word) for word in command) + os.linesep +
348                     "in directory: " + self.build_dir)
349
350        start_time = time.time()
351
352        env = self._create_env()
353
354        if robot_test:
355            harness.run_robot_test(command, self)
356            return
357
358        stderr_log = f"{self.instance.build_dir}/handler_stderr.log"
359        with open(stderr_log, "w+") as stderr_log_fp, subprocess.Popen(
360            command, stdout=subprocess.PIPE, stderr=stderr_log_fp, cwd=self.build_dir, env=env
361        ) as proc:
362            logger.debug(f"Spawning BinaryHandler Thread for {self.name}")
363            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
364            t.start()
365            t.join()
366            if t.is_alive():
367                self.terminate(proc)
368                t.join()
369            proc.wait()
370            self.returncode = proc.returncode
371            if proc.returncode != 0:
372                self.instance.status = TwisterStatus.ERROR
373                self.instance.reason = f"BinaryHandler returned {proc.returncode}"
374            self.try_kill_process_by_pid()
375
376        handler_time = time.time() - start_time
377
378        # FIXME: This is needed when killing the simulator, the console is
379        # garbled and needs to be reset. Did not find a better way to do that.
380        if sys.stdout.isatty():
381            subprocess.call(["stty", "sane"], stdin=sys.stdout)
382
383        self._update_instance_info(harness, handler_time)
384
385        self._final_handle_actions(harness, handler_time)
386
387
388class SimulationHandler(BinaryHandler):
389    def __init__(
390        self,
391        instance,
392        type_str: str,
393        options: argparse.Namespace,
394        generator_cmd: str | None = None,
395        suite_name_check: bool = True,
396    ):
397        """Constructor
398
399        @param instance Test Instance
400        """
401        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
402
403        if type_str == 'renode':
404            self.pid_fn = os.path.join(instance.build_dir, "renode.pid")
405        elif type_str == 'native':
406            self.call_make_run = False
407            self.ready = True
408
409
410class DeviceHandler(Handler):
411    def get_test_timeout(self):
412        timeout = super().get_test_timeout()
413        if self.options.enable_coverage:
414            # wait more for gcov data to be dumped on console
415            timeout += 120
416        return timeout
417
418    def monitor_serial(self, ser, halt_event, harness):
419        if self.options.enable_coverage:
420            # Set capture_coverage to True to indicate that right after
421            # test results we should get coverage data, otherwise we exit
422            # from the test.
423            harness.capture_coverage = True
424
425        # Wait for serial connection
426        while not ser.isOpen():
427            time.sleep(0.1)
428
429        # Clear serial leftover.
430        ser.reset_input_buffer()
431
432        with open(self.log, "wb") as log_out_fp:
433            while ser.isOpen():
434                if halt_event.is_set():
435                    logger.debug('halted')
436                    ser.close()
437                    break
438
439                try:
440                    if not ser.in_waiting:
441                        # no incoming bytes are waiting to be read from
442                        # the serial input buffer, let other threads run
443                        time.sleep(0.001)
444                        continue
445                # maybe the serial port is still in reset
446                # check status may cause error
447                # wait for more time
448                except OSError:
449                    time.sleep(0.001)
450                    continue
451                except TypeError:
452                    # This exception happens if the serial port was closed and
453                    # its file descriptor cleared in between of ser.isOpen()
454                    # and ser.in_waiting checks.
455                    logger.debug("Serial port is already closed, stop reading.")
456                    break
457
458                serial_line = None
459                # SerialException may happen during the serial device power off/on process.
460                with contextlib.suppress(TypeError, serial.SerialException):
461                    serial_line = ser.readline()
462
463                # Just because ser_fileno has data doesn't mean an entire line
464                # is available yet.
465                if serial_line:
466                    # can be more lines in serial_line so split them before handling
467                    for sl in serial_line.decode('utf-8', 'ignore').splitlines(keepends=True):
468                        log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8'))
469                        log_out_fp.flush()
470                        if sl := sl.strip():
471                            logger.debug(f"DEVICE: {sl}")
472                            harness.handle(sl)
473
474                if harness.status != TwisterStatus.NONE and not harness.capture_coverage:
475                    ser.close()
476                    break
477
478    @staticmethod
479    @contextmanager
480    def acquire_dut_locks(duts):
481        try:
482            for d in duts:
483                d.lock.acquire()
484            yield
485        finally:
486            for d in duts:
487                d.lock.release()
488
489    def device_is_available(self, instance):
490        device = instance.platform.name
491        fixture = instance.testsuite.harness_config.get("fixture")
492        duts_found = []
493
494        for d in self.duts:
495            if fixture and fixture not in map(lambda f: f.split(sep=':')[0], d.fixtures):
496                continue
497            if d.platform != device or (d.serial is None and d.serial_pty is None):
498                continue
499            duts_found.append(d)
500
501        if not duts_found:
502            raise TwisterException(f"No device to serve as {device} platform.")
503
504        # Select an available DUT with less failures
505        for d in sorted(duts_found, key=lambda _dut: _dut.failures):
506            # get all DUTs with the same id
507            duts_shared_hw = [_d for _d in self.duts if _d.id == d.id]
508            with self.acquire_dut_locks(duts_shared_hw):
509                avail = False
510                if d.available:
511                    for _d in duts_shared_hw:
512                        _d.available = 0
513                    d.counter_increment()
514                    avail = True
515                    logger.debug(f"Retain DUT:{d.platform}, Id:{d.id}, "
516                                f"counter:{d.counter}, failures:{d.failures}")
517            if avail:
518                return d
519
520        return None
521
522    def make_dut_available(self, dut):
523        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
524            dut.failures_increment()
525        logger.debug(f"Release DUT:{dut.platform}, Id:{dut.id}, "
526                     f"counter:{dut.counter}, failures:{dut.failures}")
527        # get all DUTs with the same id
528        duts_shared_hw = [_d for _d in self.duts if _d.id == dut.id]
529        with self.acquire_dut_locks(duts_shared_hw):
530            for _d in duts_shared_hw:
531                _d.available = 1
532
533    @staticmethod
534    def run_custom_script(script, timeout):
535        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
536            try:
537                stdout, stderr = proc.communicate(timeout=timeout)
538                logger.debug(stdout.decode())
539                if proc.returncode != 0:
540                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
541
542            except subprocess.TimeoutExpired:
543                proc.kill()
544                proc.communicate()
545                logger.error(f"{script} timed out")
546
547    def _create_command(self, runner, hardware):
548        command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
549        command_extra_args = []
550
551        # There are three ways this option is used.
552        # 1) bare: default flags
553        #    This results in options.west_flash == []
554        # 2) with a value: --west-flash="--board-id=42"
555        #    This results in options.west_flash == "--board-id=42"
556        # 3) Multiple values: --west-flash="--board-id=42,--erase"
557        #    This results in options.west_flash == "--board-id=42 --erase"
558        if self.options.west_flash and self.options.west_flash != []:
559            command_extra_args.extend(self.options.west_flash.split(','))
560
561        if runner:
562            command.append("--runner")
563            command.append(runner)
564
565            board_id = hardware.probe_id or hardware.id
566            product = hardware.product
567            if board_id is not None:
568                if runner in ("pyocd", "nrfjprog", "nrfutil", "nrfutil_next"):
569                    command_extra_args.append("--dev-id")
570                    command_extra_args.append(board_id)
571                elif runner == "esp32":
572                    command_extra_args.append("--esp-device")
573                    command_extra_args.append(board_id)
574                elif (
575                    runner == "openocd"
576                    and product == "STM32 STLink"
577                    or runner == "openocd"
578                    and product == "STLINK-V3"
579                ):
580                    command_extra_args.append("--cmd-pre-init")
581                    command_extra_args.append(f"hla_serial {board_id}")
582                elif runner == "openocd" and product == "EDBG CMSIS-DAP":
583                    command_extra_args.append("--cmd-pre-init")
584                    command_extra_args.append(f"cmsis_dap_serial {board_id}")
585                elif runner == "openocd" and product == "LPC-LINK2 CMSIS-DAP":
586                    command_extra_args.append("--cmd-pre-init")
587                    command_extra_args.append(f"adapter serial {board_id}")
588                elif runner == "jlink":
589                    command.append("--dev-id")
590                    command.append(board_id)
591                elif runner == "linkserver":
592                    # for linkserver
593                    # --probe=#<number> select by probe index
594                    # --probe=<serial number> select by probe serial number
595                    command.append(f"--probe={board_id}")
596                elif runner == "stm32cubeprogrammer" and product != "BOOT-SERIAL":
597                    command.append(f"--tool-opt=sn={board_id}")
598
599                # Receive parameters from runner_params field.
600                if hardware.runner_params:
601                    for param in hardware.runner_params:
602                        command.append(param)
603
604        if command_extra_args:
605            command.append('--')
606            command.extend(command_extra_args)
607
608        return command
609
610    def _update_instance_info(self, harness, handler_time, flash_error):
611        self.instance.execution_time = handler_time
612        if harness.status != TwisterStatus.NONE:
613            self.instance.status = harness.status
614            if harness.status == TwisterStatus.FAIL:
615                self.instance.reason = f"Failed harness:'{harness.reason}'"
616            self.instance.add_missing_case_status(TwisterStatus.BLOCK, harness.status)
617        elif not flash_error:
618            self.instance.status = TwisterStatus.FAIL
619            self.instance.reason = "Timeout"
620
621        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
622            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)
623
624    def _terminate_pty(self, ser_pty, ser_pty_process):
625        logger.debug(f"Terminating serial-pty:'{ser_pty}'")
626        terminate_process(ser_pty_process)
627        try:
628            (stdout, stderr) = ser_pty_process.communicate(timeout=self.get_test_timeout())
629            logger.debug(f"Terminated serial-pty:'{ser_pty}', stdout:'{stdout}', stderr:'{stderr}'")
630        except subprocess.TimeoutExpired:
631            logger.debug(f"Terminated serial-pty:'{ser_pty}'")
632    #
633
634    def _create_serial_connection(self, dut, serial_device, hardware_baud,
635                                  flash_timeout, serial_pty, ser_pty_process):
636        try:
637            ser = serial.Serial(
638                serial_device,
639                baudrate=hardware_baud,
640                parity=serial.PARITY_NONE,
641                stopbits=serial.STOPBITS_ONE,
642                bytesize=serial.EIGHTBITS,
643                # the worst case of no serial input
644                timeout=max(flash_timeout, self.get_test_timeout())
645            )
646        except serial.SerialException as e:
647            self._handle_serial_exception(e, dut, serial_pty, ser_pty_process)
648            raise
649
650        return ser
651
652
653    def _handle_serial_exception(self, exception, dut, serial_pty, ser_pty_process):
654        self.instance.status = TwisterStatus.FAIL
655        self.instance.reason = "Serial Device Error"
656        logger.error(f"Serial device error: {exception!s}")
657
658        self.instance.add_missing_case_status(TwisterStatus.BLOCK, "Serial Device Error")
659        if serial_pty and ser_pty_process:
660            self._terminate_pty(serial_pty, ser_pty_process)
661
662        self.make_dut_available(dut)
663
664
665    def get_hardware(self):
666        hardware = None
667        try:
668            hardware = self.device_is_available(self.instance)
669            in_waiting = 0
670            while not hardware:
671                time.sleep(1)
672                in_waiting += 1
673                if in_waiting%60 == 0:
674                    logger.debug(f"Waiting for a DUT to run {self.instance.name}")
675                hardware = self.device_is_available(self.instance)
676        except TwisterException as error:
677            self.instance.status = TwisterStatus.FAIL
678            self.instance.reason = str(error)
679            logger.error(self.instance.reason)
680        return hardware
681
682    def _get_serial_device(self, serial_pty, hardware_serial):
683        ser_pty_process = None
684        if serial_pty:
685            master, slave = pty.openpty()
686            try:
687                ser_pty_process = subprocess.Popen(
688                    re.split('[, ]', serial_pty),
689                    stdout=master,
690                    stdin=master,
691                    stderr=master
692                )
693            except subprocess.CalledProcessError as error:
694                logger.error(
695                    f"Failed to run subprocess {serial_pty}, error {error.output}"
696                )
697                return
698
699            serial_device = os.ttyname(slave)
700        else:
701            serial_device = hardware_serial
702
703        return serial_device, ser_pty_process
704
705    def handle(self, harness):
706        runner = None
707        hardware = self.get_hardware()
708        if hardware:
709            self.instance.dut = hardware.id
710        else:
711            return
712
713        runner = hardware.runner or self.options.west_runner
714        serial_pty = hardware.serial_pty
715
716        serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
717
718        logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
719
720        command = self._create_command(runner, hardware)
721
722        pre_script = hardware.pre_script
723        post_flash_script = hardware.post_flash_script
724        post_script = hardware.post_script
725        script_param = hardware.script_param
726
727        if pre_script:
728            timeout = 30
729            if script_param:
730                timeout = script_param.get("pre_script_timeout", timeout)
731            self.run_custom_script(pre_script, timeout)
732
733        flash_timeout = hardware.flash_timeout
734        if hardware.flash_with_test:
735            flash_timeout += self.get_test_timeout()
736
737        serial_port = None
738        if hardware.flash_before is False:
739            serial_port = serial_device
740
741        try:
742            ser = self._create_serial_connection(
743                hardware,
744                serial_port,
745                hardware.baud,
746                flash_timeout,
747                serial_pty,
748                ser_pty_process
749            )
750        except serial.SerialException:
751            return
752
753        halt_monitor_evt = threading.Event()
754
755        t = threading.Thread(target=self.monitor_serial, daemon=True,
756                             args=(ser, halt_monitor_evt, harness))
757        start_time = time.time()
758        t.start()
759
760        d_log = f"{self.instance.build_dir}/device.log"
761        logger.debug(f'Flash command: {command}', )
762        flash_error = False
763        try:
764            stdout = stderr = None
765            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
766                try:
767                    (stdout, stderr) = proc.communicate(timeout=flash_timeout)
768                    # ignore unencodable unicode chars
769                    logger.debug(stdout.decode(errors="ignore"))
770
771                    if proc.returncode != 0:
772                        self.instance.status = TwisterStatus.ERROR
773                        self.instance.reason = "Device issue (Flash error?)"
774                        flash_error = True
775                        with open(d_log, "w") as dlog_fp:
776                            dlog_fp.write(stderr.decode())
777                        halt_monitor_evt.set()
778                except subprocess.TimeoutExpired:
779                    logger.warning("Flash operation timed out.")
780                    self.terminate(proc)
781                    (stdout, stderr) = proc.communicate()
782                    self.instance.status = TwisterStatus.ERROR
783                    self.instance.reason = "Device issue (Timeout)"
784                    flash_error = True
785
786            with open(d_log, "w") as dlog_fp:
787                dlog_fp.write(stderr.decode())
788
789        except subprocess.CalledProcessError:
790            halt_monitor_evt.set()
791            self.instance.status = TwisterStatus.ERROR
792            self.instance.reason = "Device issue (Flash error)"
793            flash_error = True
794
795        if post_flash_script:
796            timeout = 30
797            if script_param:
798                timeout = script_param.get("post_flash_timeout", timeout)
799            self.run_custom_script(post_flash_script, timeout)
800
801        # Connect to device after flashing it
802        if hardware.flash_before:
803            try:
804                logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud")
805                ser.port = serial_device
806                ser.open()
807            except serial.SerialException as e:
808                self._handle_serial_exception(e, hardware, serial_pty, ser_pty_process)
809                return
810
811        if not flash_error:
812            # Always wait at most the test timeout here after flashing.
813            t.join(self.get_test_timeout())
814        else:
815            # When the flash error is due exceptions,
816            # twister tell the monitor serial thread
817            # to close the serial. But it is necessary
818            # for this thread being run first and close
819            # have the change to close the serial.
820            t.join(0.1)
821
822        if t.is_alive():
823            logger.debug(
824                f"Timed out while monitoring serial output on {self.instance.platform.name}"
825            )
826
827        if ser.isOpen():
828            ser.close()
829
830        if serial_pty:
831            self._terminate_pty(serial_pty, ser_pty_process)
832
833        handler_time = time.time() - start_time
834
835        self._update_instance_info(harness, handler_time, flash_error)
836
837        self._final_handle_actions(harness, handler_time)
838
839        if post_script:
840            timeout = 30
841            if script_param:
842                timeout = script_param.get("post_script_timeout", timeout)
843            self.run_custom_script(post_script, timeout)
844
845        self.make_dut_available(hardware)
846
847
848class QEMUHandler(Handler):
849    """Spawns a thread to monitor QEMU output from pipes
850
851    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
852    We need to do this as once qemu starts, it runs forever until killed.
853    Test cases emit special messages to the console as they run, we check
854    for these to collect whether the test passed or failed.
855    """
856
857    def __init__(
858        self,
859        instance,
860        type_str: str,
861        options: argparse.Namespace,
862        generator_cmd: str | None = None,
863        suite_name_check: bool = True,
864    ):
865        """Constructor
866
867        @param instance Test instance
868        """
869
870        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
871        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
872
873        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
874
875        self.stdout_fn = os.path.join(instance.build_dir, "qemu.stdout")
876
877        self.stderr_fn = os.path.join(instance.build_dir, "qemu.stderr")
878
879        if instance.testsuite.ignore_qemu_crash:
880            self.ignore_qemu_crash = True
881            self.ignore_unexpected_eof = True
882        else:
883            self.ignore_qemu_crash = False
884            self.ignore_unexpected_eof = False
885
886    @staticmethod
887    def _get_cpu_time(pid):
888        """get process CPU time.
889
890        The guest virtual time in QEMU icount mode isn't host time and
891        it's maintained by counting guest instructions, so we use QEMU
892        process execution time to mostly simulate the time of guest OS.
893        """
894        proc = psutil.Process(pid)
895        cpu_time = proc.cpu_times()
896        return cpu_time.user + cpu_time.system
897
898    @staticmethod
899    def _thread_get_fifo_names(fifo_fn):
900        fifo_in = fifo_fn + ".in"
901        fifo_out = fifo_fn + ".out"
902
903        return fifo_in, fifo_out
904
905    @staticmethod
906    def _thread_update_instance_info(handler, handler_time, status, reason):
907        handler.instance.execution_time = handler_time
908        handler.instance.status = status
909        if reason:
910            handler.instance.reason = reason
911        else:
912            handler.instance.reason = "Unknown"
913
914    @staticmethod
915    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn,
916                harness, ignore_unexpected_eof=False):
917        fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
918
919        # These in/out nodes are named from QEMU's perspective, not ours
920        if os.path.exists(fifo_in):
921            os.unlink(fifo_in)
922        os.mkfifo(fifo_in)
923        if os.path.exists(fifo_out):
924            os.unlink(fifo_out)
925        os.mkfifo(fifo_out)
926
927        with (
928            # We don't do anything with out_fp but we need to open it for
929            # writing so that QEMU doesn't block, due to the way pipes work
930            open(fifo_in, "wb") as _,
931            # Disable internal buffering, we don't
932            # want read() or poll() to ever block if there is data in there
933            open(fifo_out, "rb", buffering=0) as in_fp,
934            open(logfile, "w") as log_out_fp
935        ):
936            start_time = time.time()
937            timeout_time = start_time + timeout
938            p = select.poll()
939            p.register(in_fp, select.POLLIN)
940            _status = TwisterStatus.NONE
941            _reason = None
942
943            line = ""
944            timeout_extended = False
945
946            pid = 0
947            if os.path.exists(pid_fn):
948                with open(pid_fn) as pid_file:
949                    pid = int(pid_file.read())
950
951            while True:
952                this_timeout = int((timeout_time - time.time()) * 1000)
953                if timeout_extended:
954                    # Quit early after timeout extension if no more data is being received
955                    this_timeout = min(this_timeout, 1000)
956                if this_timeout < 0 or not p.poll(this_timeout):
957                    try:
958                        if pid and this_timeout > 0:
959                            # there's possibility we polled nothing because
960                            # of not enough CPU time scheduled by host for
961                            # QEMU process during p.poll(this_timeout)
962                            cpu_time = QEMUHandler._get_cpu_time(pid)
963                            if cpu_time < timeout and _status == TwisterStatus.NONE:
964                                timeout_time = time.time() + (timeout - cpu_time)
965                                continue
966                    except psutil.NoSuchProcess:
967                        pass
968                    except ProcessLookupError:
969                        _status = TwisterStatus.FAIL
970                        _reason = "Execution error"
971                        break
972
973                    if _status == TwisterStatus.NONE:
974                        _status = TwisterStatus.FAIL
975                        _reason = "timeout"
976                    break
977
978                if pid == 0 and os.path.exists(pid_fn):
979                    with open(pid_fn) as pid_file:
980                        pid = int(pid_file.read())
981
982                try:
983                    c = in_fp.read(1).decode("utf-8")
984                except UnicodeDecodeError:
985                    # Test is writing something weird, fail
986                    _status = TwisterStatus.FAIL
987                    _reason = "unexpected byte"
988                    break
989
990                if c == "":
991                    # EOF, this shouldn't happen unless QEMU crashes
992                    if not ignore_unexpected_eof:
993                        _status = TwisterStatus.FAIL
994                        _reason = "unexpected eof"
995                    break
996                line = line + c
997                if c != "\n":
998                    continue
999
1000                # line contains a full line of data output from QEMU
1001                log_out_fp.write(strip_ansi_sequences(line))
1002                log_out_fp.flush()
1003                line = line.rstrip()
1004                logger.debug(f"QEMU ({pid}): {line}")
1005
1006                harness.handle(line)
1007                if harness.status != TwisterStatus.NONE:
1008                    # if we have registered a fail make sure the status is not
1009                    # overridden by a false success message coming from the
1010                    # testsuite
1011                    if _status != TwisterStatus.FAIL:
1012                        _status = harness.status
1013                        _reason = harness.reason
1014
1015                    # if we get some status, that means test is doing well, we reset
1016                    # the timeout and wait for 2 more seconds to catch anything
1017                    # printed late. We wait much longer if code
1018                    # coverage is enabled since dumping this information can
1019                    # take some time.
1020                    if not timeout_extended or harness.capture_coverage:
1021                        timeout_extended = True
1022                        if harness.capture_coverage:
1023                            timeout_time = time.time() + 30
1024                        else:
1025                            timeout_time = time.time() + 2
1026                line = ""
1027
1028            handler_time = time.time() - start_time
1029            logger.debug(
1030                f"QEMU ({pid}) complete with {_status} ({_reason}) after {handler_time} seconds"
1031            )
1032
1033            QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason)
1034
1035        if pid:
1036            # Oh well, as long as it's dead! User probably sent Ctrl-C
1037            with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
1038                if pid:
1039                    os.kill(pid, signal.SIGTERM)
1040
1041        os.unlink(fifo_in)
1042        os.unlink(fifo_out)
1043
1044    def _set_qemu_filenames(self, sysbuild_build_dir):
1045        # We pass this to QEMU which looks for fifos with .in and .out suffixes.
1046        # QEMU fifo will use main build dir
1047        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
1048        # PID file will be created in the main sysbuild app's build dir
1049        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
1050
1051        if os.path.exists(self.pid_fn):
1052            os.unlink(self.pid_fn)
1053
1054        self.log_fn = self.log
1055
1056    def _create_command(self, sysbuild_build_dir):
1057        command = [self.generator_cmd]
1058        command += ["-C", sysbuild_build_dir, "run"]
1059
1060        return command
1061
1062    def _update_instance_info(self, harness, is_timeout):
1063        if (self.returncode != 0 and not self.ignore_qemu_crash) or \
1064            harness.status == TwisterStatus.NONE:
1065            self.instance.status = TwisterStatus.FAIL
1066            if is_timeout:
1067                self.instance.reason = "Timeout"
1068            else:
1069                if not self.instance.reason:
1070                    self.instance.reason = f"Exited with {self.returncode}"
1071            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
1072
1073    def handle(self, harness):
1074        self.run = True
1075
1076        domain_build_dir = self.get_default_domain_build_dir()
1077
1078        command = self._create_command(domain_build_dir)
1079
1080        self._set_qemu_filenames(domain_build_dir)
1081
1082        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
1083                                       args=(self, self.get_test_timeout(), self.build_dir,
1084                                             self.log_fn, self.fifo_fn,
1085                                             self.pid_fn, harness,
1086                                             self.ignore_unexpected_eof))
1087
1088        self.thread.daemon = True
1089        logger.debug(f"Spawning QEMUHandler Thread for {self.name}")
1090        self.thread.start()
1091        thread_max_time = time.time() + self.get_test_timeout()
1092        if sys.stdout.isatty():
1093            subprocess.call(["stty", "sane"], stdin=sys.stdout)
1094
1095        logger.debug(f"Running {self.name} ({self.type_str})")
1096
1097        is_timeout = False
1098        qemu_pid = None
1099
1100        with open(self.stdout_fn, "w") as stdout_fp, \
1101            open(self.stderr_fn, "w") as stderr_fp, subprocess.Popen(
1102            command,
1103            stdout=stdout_fp,
1104            stderr=stderr_fp,
1105            cwd=self.build_dir
1106        ) as proc:
1107            logger.debug(f"Spawning QEMUHandler Thread for {self.name}")
1108
1109            try:
1110                proc.wait(self.get_test_timeout())
1111            except subprocess.TimeoutExpired:
1112                # sometimes QEMU can't handle SIGTERM signal correctly
1113                # in that case kill -9 QEMU process directly and leave
1114                # twister to judge testing result by console output
1115
1116                is_timeout = True
1117                self.terminate(proc)
1118                if harness.status == TwisterStatus.PASS:
1119                    self.returncode = 0
1120                else:
1121                    self.returncode = proc.returncode
1122            else:
1123                if os.path.exists(self.pid_fn):
1124                    with open(self.pid_fn) as pid_file:
1125                        qemu_pid = int(pid_file.read())
1126                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
1127                self.returncode = proc.returncode
1128            # Need to wait for harness to finish processing
1129            # output from QEMU. Otherwise it might miss some
1130            # messages.
1131            self.thread.join(max(thread_max_time - time.time(), 0))
1132            if self.thread.is_alive():
1133                logger.debug("Timed out while monitoring QEMU output")
1134
1135            if os.path.exists(self.pid_fn):
1136                with open(self.pid_fn) as pid_file:
1137                    qemu_pid = int(pid_file.read())
1138                os.unlink(self.pid_fn)
1139
1140        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
1141
1142        self._update_instance_info(harness, is_timeout)
1143
1144        self._final_handle_actions(harness, 0)
1145
1146    def get_fifo(self):
1147        return self.fifo_fn
1148
1149
1150class QEMUWinHandler(Handler):
1151    """Spawns a thread to monitor QEMU output from pipes on Windows OS
1152
1153     QEMU creates single duplex pipe at //.pipe/path, where path is fifo_fn.
1154     We need to do this as once qemu starts, it runs forever until killed.
1155     Test cases emit special messages to the console as they run, we check
1156     for these to collect whether the test passed or failed.
1157     """
1158
1159    def __init__(
1160        self,
1161        instance,
1162        type_str: str,
1163        options: argparse.Namespace,
1164        generator_cmd: str | None = None,
1165        suite_name_check: bool = True,
1166    ):
1167        """Constructor
1168
1169        @param instance Test instance
1170        """
1171
1172        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
1173        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
1174        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
1175        self.pipe_handle = None
1176        self.pid = 0
1177        self.thread = None
1178        self.stop_thread = False
1179
1180        if instance.testsuite.ignore_qemu_crash:
1181            self.ignore_qemu_crash = True
1182            self.ignore_unexpected_eof = True
1183        else:
1184            self.ignore_qemu_crash = False
1185            self.ignore_unexpected_eof = False
1186
1187    @staticmethod
1188    def _get_cpu_time(pid):
1189        """get process CPU time.
1190
1191        The guest virtual time in QEMU icount mode isn't host time and
1192        it's maintained by counting guest instructions, so we use QEMU
1193        process execution time to mostly simulate the time of guest OS.
1194        """
1195        proc = psutil.Process(pid)
1196        cpu_time = proc.cpu_times()
1197        return cpu_time.user + cpu_time.system
1198
1199    @staticmethod
1200    def _open_log_file(logfile):
1201        return open(logfile, "w")
1202
1203    @staticmethod
1204    def _close_log_file(log_file):
1205        log_file.close()
1206
1207    @staticmethod
1208    def _stop_qemu_process(pid):
1209        if pid:
1210            try:
1211                if pid:
1212                    os.kill(pid, signal.SIGTERM)
1213            except (ProcessLookupError, psutil.NoSuchProcess):
1214                # Oh well, as long as it's dead! User probably sent Ctrl-C
1215                pass
1216            except OSError:
1217                pass
1218
1219    @staticmethod
1220    def _monitor_update_instance_info(handler, handler_time, status, reason):
1221        handler.instance.execution_time = handler_time
1222        handler.instance.status = status
1223        if reason:
1224            handler.instance.reason = reason
1225        else:
1226            handler.instance.reason = "Unknown"
1227
1228    def _set_qemu_filenames(self, sysbuild_build_dir):
1229        # PID file will be created in the main sysbuild app's build dir
1230        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
1231
1232        if os.path.exists(self.pid_fn):
1233            os.unlink(self.pid_fn)
1234
1235        self.log_fn = self.log
1236
1237    def _create_command(self, sysbuild_build_dir):
1238        command = [self.generator_cmd]
1239        command += ["-C", sysbuild_build_dir, "run"]
1240
1241        return command
1242
1243    def _update_instance_info(self, harness, is_timeout):
1244        if (self.returncode != 0 and not self.ignore_qemu_crash) or \
1245            harness.status == TwisterStatus.NONE:
1246            self.instance.status = TwisterStatus.FAIL
1247            if is_timeout:
1248                self.instance.reason = "Timeout"
1249            else:
1250                if not self.instance.reason:
1251                    self.instance.reason = f"Exited with {self.returncode}"
1252            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
1253
1254    def _enqueue_char(self, queue):
1255        while not self.stop_thread:
1256            if not self.pipe_handle:
1257                try:
1258                    self.pipe_handle = os.open(r"\\.\pipe\\" + self.fifo_fn, os.O_RDONLY)
1259                except FileNotFoundError as e:
1260                    if e.args[0] == 2:
1261                        # Pipe is not opened yet, try again after a delay.
1262                        time.sleep(1)
1263                continue
1264
1265            c = ""
1266            try:
1267                c = os.read(self.pipe_handle, 1)
1268            finally:
1269                queue.put(c)
1270
1271    def _monitor_output(
1272        self,
1273        queue,
1274        timeout,
1275        logfile,
1276        pid_fn,
1277        harness,
1278        ignore_unexpected_eof=False
1279    ):
1280        start_time = time.time()
1281        timeout_time = start_time + timeout
1282        _status = TwisterStatus.NONE
1283        _reason = None
1284        line = ""
1285        timeout_extended = False
1286        self.pid = 0
1287
1288        log_out_fp = self._open_log_file(logfile)
1289
1290        while True:
1291            this_timeout = int((timeout_time - time.time()) * 1000)
1292            if this_timeout < 0:
1293                try:
1294                    if self.pid and this_timeout > 0:
1295                        # there's possibility we polled nothing because
1296                        # of not enough CPU time scheduled by host for
1297                        # QEMU process during p.poll(this_timeout)
1298                        cpu_time = self._get_cpu_time(self.pid)
1299                        if cpu_time < timeout and _status == TwisterStatus.NONE:
1300                            timeout_time = time.time() + (timeout - cpu_time)
1301                            continue
1302                except psutil.NoSuchProcess:
1303                    pass
1304                except ProcessLookupError:
1305                    _status = TwisterStatus.FAIL
1306                    _reason = "Execution error"
1307                    break
1308
1309                if _status == TwisterStatus.NONE:
1310                    _status = TwisterStatus.FAIL
1311                    _reason = "timeout"
1312                break
1313
1314            if self.pid == 0 and os.path.exists(pid_fn):
1315                # pid file probably not contains pid yet, continue
1316                with (
1317                    contextlib.suppress(ValueError),
1318                    open(pid_fn) as pid_file
1319                ):
1320                    self.pid = int(pid_file.read())
1321
1322            try:
1323                c = queue.get_nowait()
1324            except Empty:
1325                continue
1326            try:
1327                c = c.decode("utf-8")
1328            except UnicodeDecodeError:
1329                # Test is writing something weird, fail
1330                _status = TwisterStatus.FAIL
1331                _reason = "unexpected byte"
1332                break
1333
1334            if c == "":
1335                # EOF, this shouldn't happen unless QEMU crashes
1336                if not ignore_unexpected_eof:
1337                    _status = TwisterStatus.FAIL
1338                    _reason = "unexpected eof"
1339                break
1340            line = line + c
1341            if c != "\n":
1342                continue
1343
1344            # line contains a full line of data output from QEMU
1345            log_out_fp.write(line)
1346            log_out_fp.flush()
1347            line = line.rstrip()
1348            logger.debug(f"QEMU ({self.pid}): {line}")
1349
1350            harness.handle(line)
1351            if harness.status != TwisterStatus.NONE:
1352                # if we have registered a fail make sure the status is not
1353                # overridden by a false success message coming from the
1354                # testsuite
1355                if _status != TwisterStatus.FAIL:
1356                    _status = harness.status
1357                    _reason = harness.reason
1358
1359                # if we get some status, that means test is doing well, we reset
1360                # the timeout and wait for 2 more seconds to catch anything
1361                # printed late. We wait much longer if code
1362                # coverage is enabled since dumping this information can
1363                # take some time.
1364                if not timeout_extended or harness.capture_coverage:
1365                    timeout_extended = True
1366                    if harness.capture_coverage:
1367                        timeout_time = time.time() + 30
1368                    else:
1369                        timeout_time = time.time() + 2
1370            line = ""
1371
1372        self.stop_thread = True
1373
1374        handler_time = time.time() - start_time
1375        logger.debug(
1376            f"QEMU ({self.pid}) complete with {_status} ({_reason}) after {handler_time} seconds"
1377        )
1378        self._monitor_update_instance_info(self, handler_time, _status, _reason)
1379        self._close_log_file(log_out_fp)
1380        self._stop_qemu_process(self.pid)
1381
1382    def handle(self, harness):
1383        self.run = True
1384
1385        domain_build_dir = self.get_default_domain_build_dir()
1386        command = self._create_command(domain_build_dir)
1387        self._set_qemu_filenames(domain_build_dir)
1388
1389        logger.debug(f"Running {self.name} ({self.type_str})")
1390        is_timeout = False
1391        self.stop_thread = False
1392        queue = Queue()
1393
1394        with subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT,
1395                              cwd=self.build_dir) as proc:
1396            logger.debug(f"Spawning QEMUHandler Thread for {self.name}")
1397
1398            self.thread = threading.Thread(target=self._enqueue_char, args=(queue,))
1399            self.thread.daemon = True
1400            self.thread.start()
1401
1402            thread_max_time = time.time() + self.get_test_timeout()
1403
1404            self._monitor_output(queue, self.get_test_timeout(), self.log_fn, self.pid_fn, harness,
1405                                 self.ignore_unexpected_eof)
1406
1407            if (thread_max_time - time.time()) < 0:
1408                logger.debug("Timed out while monitoring QEMU output")
1409                proc.terminate()
1410                # sleep for a while before attempting to kill
1411                time.sleep(0.5)
1412                proc.kill()
1413
1414            if harness.status == TwisterStatus.PASS:
1415                self.returncode = 0
1416            else:
1417                self.returncode = proc.returncode
1418
1419            if os.path.exists(self.pid_fn):
1420                os.unlink(self.pid_fn)
1421
1422        logger.debug(f"return code from QEMU ({self.pid}): {self.returncode}")
1423
1424        os.close(self.pipe_handle)
1425        self.pipe_handle = None
1426
1427        self._update_instance_info(harness, is_timeout)
1428
1429        self._final_handle_actions(harness, 0)
1430
1431    def get_fifo(self):
1432        return self.fifo_fn
1433