1#! /usr/bin/env python3
2
3# Copyright (c) 2017 Linaro Limited.
4# Copyright (c) 2017 Open Source Foundries Limited.
5#
6# SPDX-License-Identifier: Apache-2.0
7
8"""Zephyr binary runner core interfaces
9
10This provides the core ZephyrBinaryRunner class meant for public use,
11as well as some other helpers for concrete runner classes.
12"""
13
14import abc
15import argparse
16import errno
17import logging
18import os
19import platform
20import re
21import selectors
22import shlex
23import shutil
24import signal
25import socket
26import subprocess
27import sys
28from dataclasses import dataclass, field
29from enum import Enum
30from functools import partial
31from inspect import isabstract
32from typing import NamedTuple, NoReturn
33
34try:
35    from elftools.elf.elffile import ELFFile
36    ELFTOOLS_MISSING = False
37except ImportError:
38    ELFTOOLS_MISSING = True
39
40
41# Turn on to enable just logging the commands that would be run (at
42# info rather than debug level), without actually running them. This
43# can break runners that are expecting output or if one command
44# depends on another, so it's just for debugging.
45_DRY_RUN = False
46
47_logger = logging.getLogger('runners')
48
49# FIXME: I assume this code belongs somewhere else, but i couldn't figure out
50# a good location for it, so i put it here for now
51# We could potentially search for RTT blocks in hex or bin files as well,
52# but since the magic string is "SEGGER RTT", i thought it might be better
53# to avoid, at the risk of false positives.
54def find_rtt_block(elf_file: str) -> int | None:
55    if ELFTOOLS_MISSING:
56        raise RuntimeError('the Python dependency elftools was missing; '
57                           'see the getting started guide for details on '
58                           'how to fix')
59
60    with open(elf_file, 'rb') as f:
61        elffile = ELFFile(f)
62        for sect in elffile.iter_sections('SHT_SYMTAB'):
63            symbols = sect.get_symbol_by_name('_SEGGER_RTT')
64            if symbols is None:
65                continue
66            for s in symbols:
67                return s.entry.get('st_value')
68    return None
69
70
71class _DebugDummyPopen:
72
73    def terminate(self):
74        pass
75
76    def wait(self):
77        pass
78
79
80MAX_PORT = 49151
81
82
83class NetworkPortHelper:
84    '''Helper class for dealing with local IP network ports.'''
85
86    def get_unused_ports(self, starting_from):
87        '''Find unused network ports, starting at given values.
88
89        starting_from is an iterable of ports the caller would like to use.
90
91        The return value is an iterable of ports, in the same order, using
92        the given values if they were unused, or the next sequentially
93        available unused port otherwise.
94
95        Ports may be bound between this call's check and actual usage, so
96        callers still need to handle errors involving returned ports.'''
97        start = list(starting_from)
98        used = self._used_now()
99        ret = []
100
101        for desired in start:
102            port = desired
103            while port in used:
104                port += 1
105                if port > MAX_PORT:
106                    msg = "ports above {} are in use"
107                    raise ValueError(msg.format(desired))
108            used.add(port)
109            ret.append(port)
110
111        return ret
112
113    def _used_now(self):
114        handlers = {
115            'Windows': self._used_now_windows,
116            'Linux': self._used_now_linux,
117            'Darwin': self._used_now_darwin,
118        }
119        handler = handlers[platform.system()]
120        return handler()
121
122    def _used_now_windows(self):
123        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
124        return self._parser_windows(cmd)
125
126    def _used_now_linux(self):
127        cmd = ['ss', '-a', '-n', '-t']
128        return self._parser_linux(cmd)
129
130    def _used_now_darwin(self):
131        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
132        return self._parser_darwin(cmd)
133
134    @staticmethod
135    def _parser_windows(cmd):
136        out = subprocess.check_output(cmd).split(b'\r\n')
137        used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out
138                      if x.startswith(b'  TCP')]
139        return {int(b) for b in used_bytes}
140
141    @staticmethod
142    def _parser_linux(cmd):
143        out = subprocess.check_output(cmd).splitlines()[1:]
144        used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out]
145        return {int(b) for b in used_bytes}
146
147    @staticmethod
148    def _parser_darwin(cmd):
149        out = subprocess.check_output(cmd).split(b'\n')
150        used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out
151                      if x.startswith(b'tcp')]
152        return {int(b) for b in used_bytes}
153
154
155class BuildConfiguration:
156    '''This helper class provides access to build-time configuration.
157
158    Configuration options can be read as if the object were a dict,
159    either object['CONFIG_FOO'] or object.get('CONFIG_FOO').
160
161    Kconfig configuration values are available (parsed from .config).'''
162
163    config_prefix = 'CONFIG'
164
165    def __init__(self, build_dir: str):
166        self.build_dir = build_dir
167        self.options: dict[str, str | int] = {}
168        self.path = os.path.join(self.build_dir, 'zephyr', '.config')
169        self._parse()
170
171    def __contains__(self, item):
172        return item in self.options
173
174    def __getitem__(self, item):
175        return self.options[item]
176
177    def get(self, option, *args):
178        return self.options.get(option, *args)
179
180    def getboolean(self, option):
181        '''If a boolean option is explicitly set to y or n,
182        returns its value. Otherwise, falls back to False.
183        '''
184        return self.options.get(option, False)
185
186    def _parse(self):
187        filename = self.path
188
189        opt_value = re.compile(f'^(?P<option>{self.config_prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$')
190        not_set = re.compile(f'^# (?P<option>{self.config_prefix}_[A-Za-z0-9_]+) is not set$')
191
192        with open(filename) as f:
193            for line in f:
194                match = opt_value.match(line)
195                if match:
196                    value = match.group('value').rstrip()
197                    if value.startswith('"') and value.endswith('"'):
198                        # A string literal should have the quotes stripped,
199                        # but otherwise be left as is.
200                        value = value[1:-1]
201                    elif value == 'y':
202                        # The character 'y' is a boolean option
203                        # that is set to True.
204                        value = True
205                    else:
206                        # Neither a string nor 'y', so try to parse it
207                        # as an integer.
208                        try:
209                            base = 16 if value.startswith('0x') else 10
210                            self.options[match.group('option')] = int(value, base=base)
211                            continue
212                        except ValueError:
213                            pass
214
215                    self.options[match.group('option')] = value
216                    continue
217
218                match = not_set.match(line)
219                if match:
220                    # '# CONFIG_FOO is not set' means a boolean option is false.
221                    self.options[match.group('option')] = False
222
223class SysbuildConfiguration(BuildConfiguration):
224    '''This helper class provides access to sysbuild-time configuration.
225
226    Configuration options can be read as if the object were a dict,
227    either object['SB_CONFIG_FOO'] or object.get('SB_CONFIG_FOO').
228
229    Kconfig configuration values are available (parsed from .config).'''
230
231    config_prefix = 'SB_CONFIG'
232
233    def _parse(self):
234        # If the build does not use sysbuild, skip parsing the file.
235        if not os.path.exists(self.path):
236            return
237        super()._parse()
238
239class MissingProgram(FileNotFoundError):
240    '''FileNotFoundError subclass for missing program dependencies.
241
242    No significant changes from the parent FileNotFoundError; this is
243    useful for explicitly signaling that the file in question is a
244    program that some class requires to proceed.
245
246    The filename attribute contains the missing program.'''
247
248    def __init__(self, program):
249        super().__init__(errno.ENOENT, os.strerror(errno.ENOENT), program)
250
251
252_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}
253
254@dataclass
255class RunnerCaps:
256    '''This class represents a runner class's capabilities.
257
258    Each capability is represented as an attribute with the same
259    name. Flag attributes are True or False.
260
261    Available capabilities:
262
263    - commands: set of supported commands; default is {'flash',
264      'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}.
265
266    - dev_id: whether the runner supports device identifiers, in the form of an
267      -i, --dev-id option. This is useful when the user has multiple debuggers
268      connected to a single computer, in order to select which one will be used
269      with the command provided.
270
271    - mult_dev_ids: whether the runner supports multiple device identifiers
272      for a single operation, allowing for bulk flashing of devices.
273
274    - flash_addr: whether the runner supports flashing to an
275      arbitrary address. Default is False. If true, the runner
276      must honor the --dt-flash option.
277
278    - erase: whether the runner supports an --erase option, which
279      does a mass-erase of the entire addressable flash on the target
280      before flashing. On multi-core SoCs, this may only erase portions of
281      flash specific the actual target core. (This option can be useful for
282      things like clearing out old settings values or other subsystem state
283      that may affect the behavior of the zephyr image. It is also sometimes
284      needed by SoCs which have flash-like areas that can't be sector
285      erased by the underlying tool before flashing; UICR on nRF SoCs
286      is one example.)
287
288    - reset: whether the runner supports a --reset option, which
289      resets the device after a flash operation is complete.
290
291    - extload: whether the runner supports a --extload option, which
292      must be given one time and is passed on to the underlying tool
293      that the runner wraps.
294
295    - tool_opt: whether the runner supports a --tool-opt (-O) option, which
296      can be given multiple times and is passed on to the underlying tool
297      that the runner wraps.
298
299    - file: whether the runner supports a --file option, which specifies
300      exactly the file that should be used to flash, overriding any default
301      discovered in the build directory.
302
303    - hide_load_files: whether the elf/hex/bin file arguments should be hidden.
304
305    - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address
306      option.
307    '''
308
309    commands: set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS))
310    dev_id: bool = False
311    mult_dev_ids: bool = False
312    flash_addr: bool = False
313    erase: bool = False
314    reset: bool = False
315    extload: bool = False
316    tool_opt: bool = False
317    file: bool = False
318    hide_load_files: bool = False
319    rtt: bool = False  # This capability exists separately from the rtt command
320                       # to allow other commands to use the rtt address
321
322    def __post_init__(self):
323        if self.mult_dev_ids and not self.dev_id:
324            raise RuntimeError('dev_id must be set along mult_dev_ids')
325        if not self.commands.issubset(_RUNNERCAPS_COMMANDS):
326            raise ValueError(f'{self.commands=} contains invalid command')
327
328
329def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn:
330    # Helper function that's called when an option was given on the
331    # command line that corresponds to a missing capability in the
332    # runner class cls.
333
334    raise ValueError(f"{cls.name()} doesn't support {option} option")
335
336
337class FileType(Enum):
338    OTHER = 0
339    HEX = 1
340    BIN = 2
341    ELF = 3
342    MOT = 4
343
344
345class RunnerConfig(NamedTuple):
346    '''Runner execution-time configuration.
347
348    This is a common object shared by all runners. Individual runners
349    can register specific configuration options using their
350    do_add_parser() hooks.
351    '''
352    build_dir: str                  # application build directory
353    board_dir: str                  # board definition directory
354    elf_file: str | None         # zephyr.elf path, or None
355    exe_file: str | None         # zephyr.exe path, or None
356    hex_file: str | None         # zephyr.hex path, or None
357    bin_file: str | None         # zephyr.bin path, or None
358    uf2_file: str | None         # zephyr.uf2 path, or None
359    mot_file: str | None         # zephyr.mot path
360    file: str | None             # binary file path (provided by the user), or None
361    file_type: FileType | None = FileType.OTHER  # binary file type
362    gdb: str | None = None       # path to a usable gdb
363    openocd: str | None = None   # path to a usable openocd
364    openocd_search: list[str] = []  # add these paths to the openocd search path
365    rtt_address: int | None = None # address of the rtt control block
366
367
368_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO']
369
370
371class _DTFlashAction(argparse.Action):
372
373    def __call__(self, parser, namespace, values, option_string=None):
374        if values.lower().startswith('y'):
375            namespace.dt_flash = True
376        else:
377            namespace.dt_flash = False
378
379
380class _ToggleAction(argparse.Action):
381
382    def __call__(self, parser, args, ignored, option):
383        setattr(args, self.dest, not option.startswith('--no-'))
384
385class DeprecatedAction(argparse.Action):
386
387    def __call__(self, parser, namespace, values, option_string=None):
388        _logger.warning(f'Argument {self.option_strings[0]} is deprecated' +
389                        (f' for your runner {self._cls.name()}'  if self._cls is not None else '') +
390                        f', use {self._replacement} instead.')
391        setattr(namespace, self.dest, values)
392
393def depr_action(*args, cls=None, replacement=None, **kwargs):
394    action = DeprecatedAction(*args, **kwargs)
395    action._cls = cls
396    action._replacement = replacement
397    return action
398
399class ZephyrBinaryRunner(abc.ABC):
400    '''Abstract superclass for binary runners (flashers, debuggers).
401
402    **Note**: this class's API has changed relatively rarely since it
403    as added, but it is not considered a stable Zephyr API, and may change
404    without notice.
405
406    With some exceptions, boards supported by Zephyr must provide
407    generic means to be flashed (have a Zephyr firmware binary
408    permanently installed on the device for running) and debugged
409    (have a breakpoint debugger and program loader on a host
410    workstation attached to a running target).
411
412    This is supported by four top-level commands managed by the
413    Zephyr build system:
414
415    - 'flash': flash a previously configured binary to the board,
416      start execution on the target, then return.
417
418    - 'debug': connect to the board via a debugging protocol, program
419      the flash, then drop the user into a debugger interface with
420      symbol tables loaded from the current binary, and block until it
421      exits.
422
423    - 'debugserver': connect via a board-specific debugging protocol,
424      then reset and halt the target. Ensure the user is now able to
425      connect to a debug server with symbol tables loaded from the
426      binary.
427
428    - 'attach': connect to the board via a debugging protocol, then drop
429      the user into a debugger interface with symbol tables loaded from
430      the current binary, and block until it exits. Unlike 'debug', this
431      command does not program the flash.
432
433    This class provides an API for these commands. Every subclass is
434    called a 'runner' for short. Each runner has a name (like
435    'pyocd'), and declares commands it can handle (like
436    'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s)
437    are compatible with them to the Zephyr build system, along with
438    information on how to configure the runner to work with the board.
439
440    The build system will then place enough information in the build
441    directory to create and use runners with this class's create()
442    method, which provides a command line argument parsing API. You
443    can also create runners by instantiating subclasses directly.
444
445    In order to define your own runner, you need to:
446
447    1. Define a ZephyrBinaryRunner subclass, and implement its
448       abstract methods. You may need to override capabilities().
449
450    2. Make sure the Python module defining your runner class is
451       imported, e.g. by editing this package's __init__.py (otherwise,
452       get_runners() won't work).
453
454    3. Give your runner's name to the Zephyr build system in your
455       board's board.cmake.
456
457    Additional advice:
458
459    - If you need to import any non-standard-library modules, make sure
460      to catch ImportError and defer complaints about it to a RuntimeError
461      if one is missing. This avoids affecting users that don't require your
462      runner, while still making it clear what went wrong to users that do
463      require it that don't have the necessary modules installed.
464
465    - If you need to ask the user something (e.g. using input()), do it
466      in your create() classmethod, not do_run(). That ensures your
467      __init__() really has everything it needs to call do_run(), and also
468      avoids calling input() when not instantiating within a command line
469      application.
470
471    - Use self.logger to log messages using the standard library's
472      logging API; your logger is named "runner.<your-runner-name()>"
473
474    For command-line invocation from the Zephyr build system, runners
475    define their own argparse-based interface through the common
476    add_parser() (and runner-specific do_add_parser() it delegates
477    to), and provide a way to create instances of themselves from
478    a RunnerConfig and parsed runner-specific arguments via create().
479
480    Runners use a variety of host tools and configuration values, the
481    user interface to which is abstracted by this class. Each runner
482    subclass should take any values it needs to execute one of these
483    commands in its constructor.  The actual command execution is
484    handled in the run() method.'''
485
486    def __init__(self, cfg: RunnerConfig):
487        '''Initialize core runner state.'''
488
489        self.cfg = cfg
490        '''RunnerConfig for this instance.'''
491
492        self.logger = logging.getLogger(f'runners.{self.name()}')
493        '''logging.Logger for this instance.'''
494
495    @staticmethod
496    def get_runners() -> list[type['ZephyrBinaryRunner']]:
497        '''Get a list of all currently defined runner classes.'''
498        def inheritors(klass):
499            subclasses = set()
500            work = [klass]
501            while work:
502                parent = work.pop()
503                for child in parent.__subclasses__():
504                    if child not in subclasses:
505                        if not isabstract(child):
506                            subclasses.add(child)
507                        work.append(child)
508            return subclasses
509
510        return inheritors(ZephyrBinaryRunner)
511
512    @classmethod
513    @abc.abstractmethod
514    def name(cls) -> str:
515        '''Return this runner's user-visible name.
516
517        When choosing a name, pick something short and lowercase,
518        based on the name of the tool (like openocd, jlink, etc.) or
519        the target architecture/board (like xtensa etc.).'''
520
521    @classmethod
522    def capabilities(cls) -> RunnerCaps:
523        '''Returns a RunnerCaps representing this runner's capabilities.
524
525        This implementation returns the default capabilities.
526
527        Subclasses should override appropriately if needed.'''
528        return RunnerCaps()
529
530    @classmethod
531    def add_parser(cls, parser):
532        '''Adds a sub-command parser for this runner.
533
534        The given object, parser, is a sub-command parser from the
535        argparse module. For more details, refer to the documentation
536        for argparse.ArgumentParser.add_subparsers().
537
538        The lone common optional argument is:
539
540        * --dt-flash (if the runner capabilities includes flash_addr)
541
542        Runner-specific options are added through the do_add_parser()
543        hook.'''
544        # Unfortunately, the parser argument's type is not documented
545        # in typeshed, so we can't type annotate much here.
546
547        # Common options that depend on runner capabilities. If a
548        # capability is not supported, the option string or strings
549        # are added anyway, to prevent an individual runner class from
550        # using them to mean something else.
551        caps = cls.capabilities()
552
553        if caps.dev_id:
554            action = 'append' if caps.mult_dev_ids else 'store'
555            parser.add_argument('-i', '--dev-id',
556                                action=action,
557                                dest='dev_id',
558                                help=cls.dev_id_help())
559        else:
560            parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS)
561
562        if caps.flash_addr:
563            parser.add_argument('--dt-flash', default=False, choices=_YN_CHOICES,
564                                action=_DTFlashAction,
565                                help='''If 'yes', try to use flash address
566                                information from devicetree when flash
567                                addresses are unknown (e.g. when flashing a .bin)''')
568        else:
569            parser.add_argument('--dt-flash', help=argparse.SUPPRESS)
570
571        if caps.file:
572            parser.add_argument('-f', '--file',
573                                dest='file',
574                                help="path to binary file")
575            parser.add_argument('-t', '--file-type',
576                                dest='file_type',
577                                help="type of binary file")
578        else:
579            parser.add_argument('-f', '--file', help=argparse.SUPPRESS)
580            parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS)
581
582        if caps.hide_load_files:
583            parser.add_argument('--elf-file', help=argparse.SUPPRESS)
584            parser.add_argument('--hex-file', help=argparse.SUPPRESS)
585            parser.add_argument('--bin-file', help=argparse.SUPPRESS)
586            parser.add_argument('--mot-file', help=argparse.SUPPRESS)
587        else:
588            parser.add_argument('--elf-file',
589                                metavar='FILE',
590                                action=(partial(depr_action, cls=cls,
591                                                replacement='-f/--file') if caps.file else None),
592                                help='path to zephyr.elf'
593                                if not caps.file else 'Deprecated, use -f/--file instead.')
594            parser.add_argument('--hex-file',
595                                metavar='FILE',
596                                action=(partial(depr_action, cls=cls,
597                                                replacement='-f/--file') if caps.file else None),
598                                help='path to zephyr.hex'
599                                if not caps.file else 'Deprecated, use -f/--file instead.')
600            parser.add_argument('--bin-file',
601                                metavar='FILE',
602                                action=(partial(depr_action, cls=cls,
603                                                replacement='-f/--file') if caps.file else None),
604                                help='path to zephyr.bin'
605                                if not caps.file else 'Deprecated, use -f/--file instead.')
606            parser.add_argument('--mot-file',
607                                metavar='FILE',
608                                action=(partial(depr_action, cls=cls,
609                                                replacement='-f/--file') if caps.file else None),
610                                help='path to zephyr.mot'
611                                if not caps.file else 'Deprecated, use -f/--file instead.')
612
613        parser.add_argument('--erase', '--no-erase', nargs=0,
614                            action=_ToggleAction,
615                            help=("mass erase flash before loading, or don't. "
616                                  "Default action depends on each specific runner."
617                                  if caps.erase else argparse.SUPPRESS))
618
619        parser.add_argument('--reset', '--no-reset', nargs=0,
620                            action=_ToggleAction,
621                            help=("reset device after flashing, or don't. "
622                                  "Default action depends on each specific runner."
623                                  if caps.reset else argparse.SUPPRESS))
624
625        parser.add_argument('--extload', dest='extload',
626                            help=(cls.extload_help() if caps.extload
627                                  else argparse.SUPPRESS))
628
629        parser.add_argument('-O', '--tool-opt', dest='tool_opt',
630                            default=[], action='append',
631                            help=(cls.tool_opt_help() if caps.tool_opt
632                                  else argparse.SUPPRESS))
633
634        if caps.rtt:
635            parser.add_argument('--rtt-address', dest='rtt_address',
636                                type=lambda x: int(x, 0),
637                                help="""address of RTT control block. If not supplied,
638                                it will be autodetected if possible""")
639        else:
640            parser.add_argument('--rtt-address', help=argparse.SUPPRESS)
641
642        # Runner-specific options.
643        cls.do_add_parser(parser)
644
645    @classmethod
646    @abc.abstractmethod
647    def do_add_parser(cls, parser):
648        '''Hook for adding runner-specific options.'''
649
650    @classmethod  # noqa: B027
651    def args_from_previous_runner(cls, previous_runner,
652                                  args: argparse.Namespace):
653        '''Update arguments from a previously created runner.
654
655        This is intended for propagating relevant user responses
656        between multiple runs of the same runner, for example a
657        JTAG serial number.'''
658
659    @classmethod
660    def create(cls, cfg: RunnerConfig,
661               args: argparse.Namespace) -> 'ZephyrBinaryRunner':
662        '''Create an instance from command-line arguments.
663
664        - ``cfg``: runner configuration (pass to superclass __init__)
665        - ``args``: arguments parsed from execution environment, as
666          specified by ``add_parser()``.'''
667        caps = cls.capabilities()
668        if args.dev_id and not caps.dev_id:
669            _missing_cap(cls, '--dev-id')
670        if args.dt_flash and not caps.flash_addr:
671            _missing_cap(cls, '--dt-flash')
672        if args.erase and not caps.erase:
673            _missing_cap(cls, '--erase')
674        if args.reset and not caps.reset:
675            _missing_cap(cls, '--reset')
676        if args.extload and not caps.extload:
677            _missing_cap(cls, '--extload')
678        if args.tool_opt and not caps.tool_opt:
679            _missing_cap(cls, '--tool-opt')
680        if args.file and not caps.file:
681            _missing_cap(cls, '--file')
682        if args.file_type and not args.file:
683            raise ValueError("--file-type requires --file")
684        if args.file_type and not caps.file:
685            _missing_cap(cls, '--file-type')
686        if args.rtt_address and not caps.rtt:
687            _missing_cap(cls, '--rtt-address')
688
689        ret = cls.do_create(cfg, args)
690        if args.erase:
691            ret.logger.info('mass erase requested')
692        if args.reset:
693            ret.logger.info('reset after flashing requested')
694        return ret
695
696    @classmethod
697    @abc.abstractmethod
698    def do_create(cls, cfg: RunnerConfig,
699                  args: argparse.Namespace) -> 'ZephyrBinaryRunner':
700        '''Hook for instance creation from command line arguments.'''
701
702    @staticmethod
703    def get_flash_address(args: argparse.Namespace,
704                          build_conf: BuildConfiguration,
705                          default: int = 0x0) -> int:
706        '''Helper method for extracting a flash address.
707
708        If args.dt_flash is true, returns the address obtained from
709        ZephyrBinaryRunner.flash_address_from_build_conf(build_conf).
710
711        Otherwise (when args.dt_flash is False), the default value is
712        returned.'''
713        if args.dt_flash:
714            return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf)
715        else:
716            return default
717
718    @staticmethod
719    def flash_address_from_build_conf(build_conf: BuildConfiguration):
720        '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf,
721        return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return
722        CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET.
723        '''
724        if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'):
725            return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] +
726                    build_conf['CONFIG_FLASH_LOAD_OFFSET'])
727        else:
728            return build_conf['CONFIG_FLASH_BASE_ADDRESS']
729
730    @staticmethod
731    def sram_address_from_build_conf(build_conf: BuildConfiguration):
732        '''return CONFIG_SRAM_BASE_ADDRESS.
733        '''
734        return build_conf['CONFIG_SRAM_BASE_ADDRESS']
735
736    def run(self, command: str, **kwargs):
737        '''Runs command ('flash', 'debug', 'debugserver', 'attach').
738
739        This is the main entry point to this runner.'''
740        caps = self.capabilities()
741        if command not in caps.commands:
742            raise ValueError(f'runner {self.name()} does not implement command {command}')
743        self.do_run(command, **kwargs)
744
745    @abc.abstractmethod
746    def do_run(self, command: str, **kwargs):
747        '''Concrete runner; run() delegates to this. Implement in subclasses.
748
749        In case of an unsupported command, raise a ValueError.'''
750
751    @property
752    def build_conf(self) -> BuildConfiguration:
753        '''Get a BuildConfiguration for the build directory.'''
754        if not hasattr(self, '_build_conf'):
755            self._build_conf = BuildConfiguration(self.cfg.build_dir)
756        return self._build_conf
757
758    @property
759    def sysbuild_conf(self) -> SysbuildConfiguration:
760        '''Get a SysbuildConfiguration for the sysbuild directory.'''
761        if not hasattr(self, '_sysbuild_conf'):
762            self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir))
763        return self._sysbuild_conf
764
765    @property
766    def thread_info_enabled(self) -> bool:
767        '''Returns True if self.build_conf has
768        CONFIG_DEBUG_THREAD_INFO enabled.
769        '''
770        return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO')
771
772    @classmethod
773    def dev_id_help(cls) -> str:
774        ''' Get the ArgParse help text for the --dev-id option.'''
775        help = '''Device identifier. Use it to select
776                  which debugger, device, node or instance to
777                  target when multiple ones are available or
778                  connected.'''
779        addendum = '''\nThis option can be present multiple times.''' if \
780                   cls.capabilities().mult_dev_ids else ''
781        return help + addendum
782
783    @classmethod
784    def extload_help(cls) -> str:
785        ''' Get the ArgParse help text for the --extload option.'''
786        return '''External loader to be used by stm32cubeprogrammer
787                  to program the targeted external memory.
788                  The runner requires the external loader (*.stldr) filename.
789                  This external loader (*.stldr) must be located within
790                  STM32CubeProgrammer/bin/ExternalLoader directory.'''
791
792    @classmethod
793    def tool_opt_help(cls) -> str:
794        ''' Get the ArgParse help text for the --tool-opt option.'''
795        return '''Option to pass on to the underlying tool used
796                  by this runner. This can be given multiple times;
797                  the resulting arguments will be given to the tool
798                  in the order they appear on the command line.'''
799
800    @staticmethod
801    def require(program: str, path: str | None = None) -> str:
802        '''Require that a program is installed before proceeding.
803
804        :param program: name of the program that is required,
805                        or path to a program binary.
806        :param path:    PATH where to search for the program binary.
807                        By default check on the system PATH.
808
809        If ``program`` is an absolute path to an existing program
810        binary, this call succeeds. Otherwise, try to find the program
811        by name on the system PATH or in the given PATH, if provided.
812
813        If the program can be found, its path is returned.
814        Otherwise, raises MissingProgram.'''
815        ret = shutil.which(program, path=path)
816        if ret is None:
817            raise MissingProgram(program)
818        return ret
819
820    def get_rtt_address(self) -> int | None:
821        '''Helper method for extracting a the RTT control block address.
822
823        If args.rtt_address was supplied, returns that.
824
825        Otherwise, attempt to locate an rtt block in the elf file.
826        If this is not found, None is returned'''
827        if self.cfg.rtt_address is not None:
828            return self.cfg.rtt_address
829        elif self.cfg.elf_file is not None:
830            return find_rtt_block(self.cfg.elf_file)
831        return None
832
833    def run_server_and_client(self, server, client, **kwargs):
834        '''Run a server that ignores SIGINT, and a client that handles it.
835
836        This routine portably:
837
838        - creates a Popen object for the ``server`` command which ignores
839          SIGINT
840        - runs ``client`` in a subprocess while temporarily ignoring SIGINT
841        - cleans up the server after the client exits.
842        - the keyword arguments, if any, will be passed down to both server and
843          client subprocess calls
844
845        It's useful to e.g. open a GDB server and client.'''
846        server_proc = self.popen_ignore_int(server, **kwargs)
847        try:
848            self.run_client(client, **kwargs)
849        finally:
850            server_proc.terminate()
851            server_proc.wait()
852
853    def run_client(self, client, **kwargs):
854        '''Run a client that handles SIGINT.'''
855        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
856        try:
857            self.check_call(client, **kwargs)
858        finally:
859            signal.signal(signal.SIGINT, previous)
860
861    def _log_cmd(self, cmd: list[str]):
862        escaped = ' '.join(shlex.quote(s) for s in cmd)
863        if not _DRY_RUN:
864            self.logger.debug(escaped)
865        else:
866            self.logger.info(escaped)
867
868    def call(self, cmd: list[str], **kwargs) -> int:
869        '''Subclass subprocess.call() wrapper.
870
871        Subclasses should use this method to run command in a
872        subprocess and get its return code, rather than
873        using subprocess directly, to keep accurate debug logs.
874        '''
875        self._log_cmd(cmd)
876        if _DRY_RUN:
877            return 0
878        return subprocess.call(cmd, **kwargs)
879
880    def check_call(self, cmd: list[str], **kwargs):
881        '''Subclass subprocess.check_call() wrapper.
882
883        Subclasses should use this method to run command in a
884        subprocess and check that it executed correctly, rather than
885        using subprocess directly, to keep accurate debug logs.
886        '''
887        self._log_cmd(cmd)
888        if _DRY_RUN:
889            return
890        subprocess.check_call(cmd, **kwargs)
891
892    def check_output(self, cmd: list[str], **kwargs) -> bytes:
893        '''Subclass subprocess.check_output() wrapper.
894
895        Subclasses should use this method to run command in a
896        subprocess and check that it executed correctly, rather than
897        using subprocess directly, to keep accurate debug logs.
898        '''
899        self._log_cmd(cmd)
900        if _DRY_RUN:
901            return b''
902        return subprocess.check_output(cmd, **kwargs)
903
904    def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen:
905        '''Spawn a child command, ensuring it ignores SIGINT.
906
907        The returned subprocess.Popen object must be manually terminated.'''
908        cflags = 0
909        preexec = None
910        system = platform.system()
911
912        if system == 'Windows':
913            # We can't type check this line on Unix operating systems:
914            # mypy thinks the subprocess module has no such attribute.
915            cflags |= subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore
916        elif system in {'Linux', 'Darwin'}:
917            # We can't type check this on Windows for the same reason.
918            preexec = os.setsid # type: ignore
919
920        self._log_cmd(cmd)
921        if _DRY_RUN:
922            return _DebugDummyPopen()  # type: ignore
923
924        return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs)
925
926    def ensure_output(self, output_type: str) -> None:
927        '''Ensure self.cfg has a particular output artifact.
928
929        For example, ensure_output('bin') ensures that self.cfg.bin_file
930        refers to an existing file. Errors out if it's missing or undefined.
931
932        :param output_type: string naming the output type
933        '''
934        output_file = getattr(self.cfg, f'{output_type}_file', None)
935
936        if output_file is None:
937            err = f'{output_type} file location is unknown.'
938        elif not os.path.isfile(output_file):
939            err = f'{output_file} does not exist.'
940        else:
941            return
942
943        if output_type in ('elf', 'hex', 'bin', 'uf2'):
944            err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.'
945
946        # RuntimeError avoids a stack trace saved in run_common.
947        raise RuntimeError(err)
948
949    def run_telnet_client(self, host: str, port: int, active_sock=None) -> None:
950        '''
951        Run a telnet client for user interaction.
952        '''
953        # If the caller passed in an active socket, use that
954        if active_sock is not None:
955            sock = active_sock
956        elif shutil.which('nc') is not None:
957            # If a `nc` command is available, run it, as it will provide the
958            # best support for CONFIG_SHELL_VT100_COMMANDS etc.
959            client_cmd = ['nc', host, str(port)]
960            # Note: netcat (nc) does not handle sigint, so cannot use run_client()
961            self.check_call(client_cmd)
962            return
963        else:
964            # Start a new socket connection
965            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
966            sock.connect((host, port))
967
968        # Otherwise, use a pure python implementation. This will work well for logging,
969        # but input is line based only.
970        sel = selectors.DefaultSelector()
971        sel.register(sys.stdin, selectors.EVENT_READ)
972        sel.register(sock, selectors.EVENT_READ)
973        while True:
974            events = sel.select()
975            for key, _ in events:
976                if key.fileobj == sys.stdin:
977                    text = sys.stdin.readline()
978                    if text:
979                        sock.send(text.encode())
980
981                elif key.fileobj == sock:
982                    resp = sock.recv(2048)
983                    if resp:
984                        print(resp.decode(), end='')
985