1#! /usr/bin/env python3 2 3# Copyright (c) 2017 Linaro Limited. 4# Copyright (c) 2017 Open Source Foundries Limited. 5# 6# SPDX-License-Identifier: Apache-2.0 7 8"""Zephyr binary runner core interfaces 9 10This provides the core ZephyrBinaryRunner class meant for public use, 11as well as some other helpers for concrete runner classes. 12""" 13 14import abc 15import argparse 16import errno 17import logging 18import os 19import platform 20import re 21import selectors 22import shlex 23import shutil 24import signal 25import socket 26import subprocess 27import sys 28from dataclasses import dataclass, field 29from enum import Enum 30from functools import partial 31from inspect import isabstract 32from typing import NamedTuple, NoReturn 33 34try: 35 from elftools.elf.elffile import ELFFile 36 ELFTOOLS_MISSING = False 37except ImportError: 38 ELFTOOLS_MISSING = True 39 40 41# Turn on to enable just logging the commands that would be run (at 42# info rather than debug level), without actually running them. This 43# can break runners that are expecting output or if one command 44# depends on another, so it's just for debugging. 45_DRY_RUN = False 46 47_logger = logging.getLogger('runners') 48 49# FIXME: I assume this code belongs somewhere else, but i couldn't figure out 50# a good location for it, so i put it here for now 51# We could potentially search for RTT blocks in hex or bin files as well, 52# but since the magic string is "SEGGER RTT", i thought it might be better 53# to avoid, at the risk of false positives. 54def find_rtt_block(elf_file: str) -> int | None: 55 if ELFTOOLS_MISSING: 56 raise RuntimeError('the Python dependency elftools was missing; ' 57 'see the getting started guide for details on ' 58 'how to fix') 59 60 with open(elf_file, 'rb') as f: 61 elffile = ELFFile(f) 62 for sect in elffile.iter_sections('SHT_SYMTAB'): 63 symbols = sect.get_symbol_by_name('_SEGGER_RTT') 64 if symbols is None: 65 continue 66 for s in symbols: 67 return s.entry.get('st_value') 68 return None 69 70 71class _DebugDummyPopen: 72 73 def terminate(self): 74 pass 75 76 def wait(self): 77 pass 78 79 80MAX_PORT = 49151 81 82 83class NetworkPortHelper: 84 '''Helper class for dealing with local IP network ports.''' 85 86 def get_unused_ports(self, starting_from): 87 '''Find unused network ports, starting at given values. 88 89 starting_from is an iterable of ports the caller would like to use. 90 91 The return value is an iterable of ports, in the same order, using 92 the given values if they were unused, or the next sequentially 93 available unused port otherwise. 94 95 Ports may be bound between this call's check and actual usage, so 96 callers still need to handle errors involving returned ports.''' 97 start = list(starting_from) 98 used = self._used_now() 99 ret = [] 100 101 for desired in start: 102 port = desired 103 while port in used: 104 port += 1 105 if port > MAX_PORT: 106 msg = "ports above {} are in use" 107 raise ValueError(msg.format(desired)) 108 used.add(port) 109 ret.append(port) 110 111 return ret 112 113 def _used_now(self): 114 handlers = { 115 'Windows': self._used_now_windows, 116 'Linux': self._used_now_linux, 117 'Darwin': self._used_now_darwin, 118 } 119 handler = handlers[platform.system()] 120 return handler() 121 122 def _used_now_windows(self): 123 cmd = ['netstat', '-a', '-n', '-p', 'tcp'] 124 return self._parser_windows(cmd) 125 126 def _used_now_linux(self): 127 cmd = ['ss', '-a', '-n', '-t'] 128 return self._parser_linux(cmd) 129 130 def _used_now_darwin(self): 131 cmd = ['netstat', '-a', '-n', '-p', 'tcp'] 132 return self._parser_darwin(cmd) 133 134 @staticmethod 135 def _parser_windows(cmd): 136 out = subprocess.check_output(cmd).split(b'\r\n') 137 used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out 138 if x.startswith(b' TCP')] 139 return {int(b) for b in used_bytes} 140 141 @staticmethod 142 def _parser_linux(cmd): 143 out = subprocess.check_output(cmd).splitlines()[1:] 144 used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out] 145 return {int(b) for b in used_bytes} 146 147 @staticmethod 148 def _parser_darwin(cmd): 149 out = subprocess.check_output(cmd).split(b'\n') 150 used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out 151 if x.startswith(b'tcp')] 152 return {int(b) for b in used_bytes} 153 154 155class BuildConfiguration: 156 '''This helper class provides access to build-time configuration. 157 158 Configuration options can be read as if the object were a dict, 159 either object['CONFIG_FOO'] or object.get('CONFIG_FOO'). 160 161 Kconfig configuration values are available (parsed from .config).''' 162 163 config_prefix = 'CONFIG' 164 165 def __init__(self, build_dir: str): 166 self.build_dir = build_dir 167 self.options: dict[str, str | int] = {} 168 self.path = os.path.join(self.build_dir, 'zephyr', '.config') 169 self._parse() 170 171 def __contains__(self, item): 172 return item in self.options 173 174 def __getitem__(self, item): 175 return self.options[item] 176 177 def get(self, option, *args): 178 return self.options.get(option, *args) 179 180 def getboolean(self, option): 181 '''If a boolean option is explicitly set to y or n, 182 returns its value. Otherwise, falls back to False. 183 ''' 184 return self.options.get(option, False) 185 186 def _parse(self): 187 filename = self.path 188 189 opt_value = re.compile(f'^(?P<option>{self.config_prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$') 190 not_set = re.compile(f'^# (?P<option>{self.config_prefix}_[A-Za-z0-9_]+) is not set$') 191 192 with open(filename) as f: 193 for line in f: 194 match = opt_value.match(line) 195 if match: 196 value = match.group('value').rstrip() 197 if value.startswith('"') and value.endswith('"'): 198 # A string literal should have the quotes stripped, 199 # but otherwise be left as is. 200 value = value[1:-1] 201 elif value == 'y': 202 # The character 'y' is a boolean option 203 # that is set to True. 204 value = True 205 else: 206 # Neither a string nor 'y', so try to parse it 207 # as an integer. 208 try: 209 base = 16 if value.startswith('0x') else 10 210 self.options[match.group('option')] = int(value, base=base) 211 continue 212 except ValueError: 213 pass 214 215 self.options[match.group('option')] = value 216 continue 217 218 match = not_set.match(line) 219 if match: 220 # '# CONFIG_FOO is not set' means a boolean option is false. 221 self.options[match.group('option')] = False 222 223class SysbuildConfiguration(BuildConfiguration): 224 '''This helper class provides access to sysbuild-time configuration. 225 226 Configuration options can be read as if the object were a dict, 227 either object['SB_CONFIG_FOO'] or object.get('SB_CONFIG_FOO'). 228 229 Kconfig configuration values are available (parsed from .config).''' 230 231 config_prefix = 'SB_CONFIG' 232 233 def _parse(self): 234 # If the build does not use sysbuild, skip parsing the file. 235 if not os.path.exists(self.path): 236 return 237 super()._parse() 238 239class MissingProgram(FileNotFoundError): 240 '''FileNotFoundError subclass for missing program dependencies. 241 242 No significant changes from the parent FileNotFoundError; this is 243 useful for explicitly signaling that the file in question is a 244 program that some class requires to proceed. 245 246 The filename attribute contains the missing program.''' 247 248 def __init__(self, program): 249 super().__init__(errno.ENOENT, os.strerror(errno.ENOENT), program) 250 251 252_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'} 253 254@dataclass 255class RunnerCaps: 256 '''This class represents a runner class's capabilities. 257 258 Each capability is represented as an attribute with the same 259 name. Flag attributes are True or False. 260 261 Available capabilities: 262 263 - commands: set of supported commands; default is {'flash', 264 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}. 265 266 - dev_id: whether the runner supports device identifiers, in the form of an 267 -i, --dev-id option. This is useful when the user has multiple debuggers 268 connected to a single computer, in order to select which one will be used 269 with the command provided. 270 271 - mult_dev_ids: whether the runner supports multiple device identifiers 272 for a single operation, allowing for bulk flashing of devices. 273 274 - flash_addr: whether the runner supports flashing to an 275 arbitrary address. Default is False. If true, the runner 276 must honor the --dt-flash option. 277 278 - erase: whether the runner supports an --erase option, which 279 does a mass-erase of the entire addressable flash on the target 280 before flashing. On multi-core SoCs, this may only erase portions of 281 flash specific the actual target core. (This option can be useful for 282 things like clearing out old settings values or other subsystem state 283 that may affect the behavior of the zephyr image. It is also sometimes 284 needed by SoCs which have flash-like areas that can't be sector 285 erased by the underlying tool before flashing; UICR on nRF SoCs 286 is one example.) 287 288 - reset: whether the runner supports a --reset option, which 289 resets the device after a flash operation is complete. 290 291 - extload: whether the runner supports a --extload option, which 292 must be given one time and is passed on to the underlying tool 293 that the runner wraps. 294 295 - tool_opt: whether the runner supports a --tool-opt (-O) option, which 296 can be given multiple times and is passed on to the underlying tool 297 that the runner wraps. 298 299 - file: whether the runner supports a --file option, which specifies 300 exactly the file that should be used to flash, overriding any default 301 discovered in the build directory. 302 303 - hide_load_files: whether the elf/hex/bin file arguments should be hidden. 304 305 - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address 306 option. 307 ''' 308 309 commands: set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS)) 310 dev_id: bool = False 311 mult_dev_ids: bool = False 312 flash_addr: bool = False 313 erase: bool = False 314 reset: bool = False 315 extload: bool = False 316 tool_opt: bool = False 317 file: bool = False 318 hide_load_files: bool = False 319 rtt: bool = False # This capability exists separately from the rtt command 320 # to allow other commands to use the rtt address 321 322 def __post_init__(self): 323 if self.mult_dev_ids and not self.dev_id: 324 raise RuntimeError('dev_id must be set along mult_dev_ids') 325 if not self.commands.issubset(_RUNNERCAPS_COMMANDS): 326 raise ValueError(f'{self.commands=} contains invalid command') 327 328 329def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn: 330 # Helper function that's called when an option was given on the 331 # command line that corresponds to a missing capability in the 332 # runner class cls. 333 334 raise ValueError(f"{cls.name()} doesn't support {option} option") 335 336 337class FileType(Enum): 338 OTHER = 0 339 HEX = 1 340 BIN = 2 341 ELF = 3 342 MOT = 4 343 344 345class RunnerConfig(NamedTuple): 346 '''Runner execution-time configuration. 347 348 This is a common object shared by all runners. Individual runners 349 can register specific configuration options using their 350 do_add_parser() hooks. 351 ''' 352 build_dir: str # application build directory 353 board_dir: str # board definition directory 354 elf_file: str | None # zephyr.elf path, or None 355 exe_file: str | None # zephyr.exe path, or None 356 hex_file: str | None # zephyr.hex path, or None 357 bin_file: str | None # zephyr.bin path, or None 358 uf2_file: str | None # zephyr.uf2 path, or None 359 mot_file: str | None # zephyr.mot path 360 file: str | None # binary file path (provided by the user), or None 361 file_type: FileType | None = FileType.OTHER # binary file type 362 gdb: str | None = None # path to a usable gdb 363 openocd: str | None = None # path to a usable openocd 364 openocd_search: list[str] = [] # add these paths to the openocd search path 365 rtt_address: int | None = None # address of the rtt control block 366 367 368_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO'] 369 370 371class _DTFlashAction(argparse.Action): 372 373 def __call__(self, parser, namespace, values, option_string=None): 374 if values.lower().startswith('y'): 375 namespace.dt_flash = True 376 else: 377 namespace.dt_flash = False 378 379 380class _ToggleAction(argparse.Action): 381 382 def __call__(self, parser, args, ignored, option): 383 setattr(args, self.dest, not option.startswith('--no-')) 384 385class DeprecatedAction(argparse.Action): 386 387 def __call__(self, parser, namespace, values, option_string=None): 388 _logger.warning(f'Argument {self.option_strings[0]} is deprecated' + 389 (f' for your runner {self._cls.name()}' if self._cls is not None else '') + 390 f', use {self._replacement} instead.') 391 setattr(namespace, self.dest, values) 392 393def depr_action(*args, cls=None, replacement=None, **kwargs): 394 action = DeprecatedAction(*args, **kwargs) 395 action._cls = cls 396 action._replacement = replacement 397 return action 398 399class ZephyrBinaryRunner(abc.ABC): 400 '''Abstract superclass for binary runners (flashers, debuggers). 401 402 **Note**: this class's API has changed relatively rarely since it 403 as added, but it is not considered a stable Zephyr API, and may change 404 without notice. 405 406 With some exceptions, boards supported by Zephyr must provide 407 generic means to be flashed (have a Zephyr firmware binary 408 permanently installed on the device for running) and debugged 409 (have a breakpoint debugger and program loader on a host 410 workstation attached to a running target). 411 412 This is supported by four top-level commands managed by the 413 Zephyr build system: 414 415 - 'flash': flash a previously configured binary to the board, 416 start execution on the target, then return. 417 418 - 'debug': connect to the board via a debugging protocol, program 419 the flash, then drop the user into a debugger interface with 420 symbol tables loaded from the current binary, and block until it 421 exits. 422 423 - 'debugserver': connect via a board-specific debugging protocol, 424 then reset and halt the target. Ensure the user is now able to 425 connect to a debug server with symbol tables loaded from the 426 binary. 427 428 - 'attach': connect to the board via a debugging protocol, then drop 429 the user into a debugger interface with symbol tables loaded from 430 the current binary, and block until it exits. Unlike 'debug', this 431 command does not program the flash. 432 433 This class provides an API for these commands. Every subclass is 434 called a 'runner' for short. Each runner has a name (like 435 'pyocd'), and declares commands it can handle (like 436 'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s) 437 are compatible with them to the Zephyr build system, along with 438 information on how to configure the runner to work with the board. 439 440 The build system will then place enough information in the build 441 directory to create and use runners with this class's create() 442 method, which provides a command line argument parsing API. You 443 can also create runners by instantiating subclasses directly. 444 445 In order to define your own runner, you need to: 446 447 1. Define a ZephyrBinaryRunner subclass, and implement its 448 abstract methods. You may need to override capabilities(). 449 450 2. Make sure the Python module defining your runner class is 451 imported, e.g. by editing this package's __init__.py (otherwise, 452 get_runners() won't work). 453 454 3. Give your runner's name to the Zephyr build system in your 455 board's board.cmake. 456 457 Additional advice: 458 459 - If you need to import any non-standard-library modules, make sure 460 to catch ImportError and defer complaints about it to a RuntimeError 461 if one is missing. This avoids affecting users that don't require your 462 runner, while still making it clear what went wrong to users that do 463 require it that don't have the necessary modules installed. 464 465 - If you need to ask the user something (e.g. using input()), do it 466 in your create() classmethod, not do_run(). That ensures your 467 __init__() really has everything it needs to call do_run(), and also 468 avoids calling input() when not instantiating within a command line 469 application. 470 471 - Use self.logger to log messages using the standard library's 472 logging API; your logger is named "runner.<your-runner-name()>" 473 474 For command-line invocation from the Zephyr build system, runners 475 define their own argparse-based interface through the common 476 add_parser() (and runner-specific do_add_parser() it delegates 477 to), and provide a way to create instances of themselves from 478 a RunnerConfig and parsed runner-specific arguments via create(). 479 480 Runners use a variety of host tools and configuration values, the 481 user interface to which is abstracted by this class. Each runner 482 subclass should take any values it needs to execute one of these 483 commands in its constructor. The actual command execution is 484 handled in the run() method.''' 485 486 def __init__(self, cfg: RunnerConfig): 487 '''Initialize core runner state.''' 488 489 self.cfg = cfg 490 '''RunnerConfig for this instance.''' 491 492 self.logger = logging.getLogger(f'runners.{self.name()}') 493 '''logging.Logger for this instance.''' 494 495 @staticmethod 496 def get_runners() -> list[type['ZephyrBinaryRunner']]: 497 '''Get a list of all currently defined runner classes.''' 498 def inheritors(klass): 499 subclasses = set() 500 work = [klass] 501 while work: 502 parent = work.pop() 503 for child in parent.__subclasses__(): 504 if child not in subclasses: 505 if not isabstract(child): 506 subclasses.add(child) 507 work.append(child) 508 return subclasses 509 510 return inheritors(ZephyrBinaryRunner) 511 512 @classmethod 513 @abc.abstractmethod 514 def name(cls) -> str: 515 '''Return this runner's user-visible name. 516 517 When choosing a name, pick something short and lowercase, 518 based on the name of the tool (like openocd, jlink, etc.) or 519 the target architecture/board (like xtensa etc.).''' 520 521 @classmethod 522 def capabilities(cls) -> RunnerCaps: 523 '''Returns a RunnerCaps representing this runner's capabilities. 524 525 This implementation returns the default capabilities. 526 527 Subclasses should override appropriately if needed.''' 528 return RunnerCaps() 529 530 @classmethod 531 def add_parser(cls, parser): 532 '''Adds a sub-command parser for this runner. 533 534 The given object, parser, is a sub-command parser from the 535 argparse module. For more details, refer to the documentation 536 for argparse.ArgumentParser.add_subparsers(). 537 538 The lone common optional argument is: 539 540 * --dt-flash (if the runner capabilities includes flash_addr) 541 542 Runner-specific options are added through the do_add_parser() 543 hook.''' 544 # Unfortunately, the parser argument's type is not documented 545 # in typeshed, so we can't type annotate much here. 546 547 # Common options that depend on runner capabilities. If a 548 # capability is not supported, the option string or strings 549 # are added anyway, to prevent an individual runner class from 550 # using them to mean something else. 551 caps = cls.capabilities() 552 553 if caps.dev_id: 554 action = 'append' if caps.mult_dev_ids else 'store' 555 parser.add_argument('-i', '--dev-id', 556 action=action, 557 dest='dev_id', 558 help=cls.dev_id_help()) 559 else: 560 parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS) 561 562 if caps.flash_addr: 563 parser.add_argument('--dt-flash', default=False, choices=_YN_CHOICES, 564 action=_DTFlashAction, 565 help='''If 'yes', try to use flash address 566 information from devicetree when flash 567 addresses are unknown (e.g. when flashing a .bin)''') 568 else: 569 parser.add_argument('--dt-flash', help=argparse.SUPPRESS) 570 571 if caps.file: 572 parser.add_argument('-f', '--file', 573 dest='file', 574 help="path to binary file") 575 parser.add_argument('-t', '--file-type', 576 dest='file_type', 577 help="type of binary file") 578 else: 579 parser.add_argument('-f', '--file', help=argparse.SUPPRESS) 580 parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS) 581 582 if caps.hide_load_files: 583 parser.add_argument('--elf-file', help=argparse.SUPPRESS) 584 parser.add_argument('--hex-file', help=argparse.SUPPRESS) 585 parser.add_argument('--bin-file', help=argparse.SUPPRESS) 586 parser.add_argument('--mot-file', help=argparse.SUPPRESS) 587 else: 588 parser.add_argument('--elf-file', 589 metavar='FILE', 590 action=(partial(depr_action, cls=cls, 591 replacement='-f/--file') if caps.file else None), 592 help='path to zephyr.elf' 593 if not caps.file else 'Deprecated, use -f/--file instead.') 594 parser.add_argument('--hex-file', 595 metavar='FILE', 596 action=(partial(depr_action, cls=cls, 597 replacement='-f/--file') if caps.file else None), 598 help='path to zephyr.hex' 599 if not caps.file else 'Deprecated, use -f/--file instead.') 600 parser.add_argument('--bin-file', 601 metavar='FILE', 602 action=(partial(depr_action, cls=cls, 603 replacement='-f/--file') if caps.file else None), 604 help='path to zephyr.bin' 605 if not caps.file else 'Deprecated, use -f/--file instead.') 606 parser.add_argument('--mot-file', 607 metavar='FILE', 608 action=(partial(depr_action, cls=cls, 609 replacement='-f/--file') if caps.file else None), 610 help='path to zephyr.mot' 611 if not caps.file else 'Deprecated, use -f/--file instead.') 612 613 parser.add_argument('--erase', '--no-erase', nargs=0, 614 action=_ToggleAction, 615 help=("mass erase flash before loading, or don't. " 616 "Default action depends on each specific runner." 617 if caps.erase else argparse.SUPPRESS)) 618 619 parser.add_argument('--reset', '--no-reset', nargs=0, 620 action=_ToggleAction, 621 help=("reset device after flashing, or don't. " 622 "Default action depends on each specific runner." 623 if caps.reset else argparse.SUPPRESS)) 624 625 parser.add_argument('--extload', dest='extload', 626 help=(cls.extload_help() if caps.extload 627 else argparse.SUPPRESS)) 628 629 parser.add_argument('-O', '--tool-opt', dest='tool_opt', 630 default=[], action='append', 631 help=(cls.tool_opt_help() if caps.tool_opt 632 else argparse.SUPPRESS)) 633 634 if caps.rtt: 635 parser.add_argument('--rtt-address', dest='rtt_address', 636 type=lambda x: int(x, 0), 637 help="""address of RTT control block. If not supplied, 638 it will be autodetected if possible""") 639 else: 640 parser.add_argument('--rtt-address', help=argparse.SUPPRESS) 641 642 # Runner-specific options. 643 cls.do_add_parser(parser) 644 645 @classmethod 646 @abc.abstractmethod 647 def do_add_parser(cls, parser): 648 '''Hook for adding runner-specific options.''' 649 650 @classmethod # noqa: B027 651 def args_from_previous_runner(cls, previous_runner, 652 args: argparse.Namespace): 653 '''Update arguments from a previously created runner. 654 655 This is intended for propagating relevant user responses 656 between multiple runs of the same runner, for example a 657 JTAG serial number.''' 658 659 @classmethod 660 def create(cls, cfg: RunnerConfig, 661 args: argparse.Namespace) -> 'ZephyrBinaryRunner': 662 '''Create an instance from command-line arguments. 663 664 - ``cfg``: runner configuration (pass to superclass __init__) 665 - ``args``: arguments parsed from execution environment, as 666 specified by ``add_parser()``.''' 667 caps = cls.capabilities() 668 if args.dev_id and not caps.dev_id: 669 _missing_cap(cls, '--dev-id') 670 if args.dt_flash and not caps.flash_addr: 671 _missing_cap(cls, '--dt-flash') 672 if args.erase and not caps.erase: 673 _missing_cap(cls, '--erase') 674 if args.reset and not caps.reset: 675 _missing_cap(cls, '--reset') 676 if args.extload and not caps.extload: 677 _missing_cap(cls, '--extload') 678 if args.tool_opt and not caps.tool_opt: 679 _missing_cap(cls, '--tool-opt') 680 if args.file and not caps.file: 681 _missing_cap(cls, '--file') 682 if args.file_type and not args.file: 683 raise ValueError("--file-type requires --file") 684 if args.file_type and not caps.file: 685 _missing_cap(cls, '--file-type') 686 if args.rtt_address and not caps.rtt: 687 _missing_cap(cls, '--rtt-address') 688 689 ret = cls.do_create(cfg, args) 690 if args.erase: 691 ret.logger.info('mass erase requested') 692 if args.reset: 693 ret.logger.info('reset after flashing requested') 694 return ret 695 696 @classmethod 697 @abc.abstractmethod 698 def do_create(cls, cfg: RunnerConfig, 699 args: argparse.Namespace) -> 'ZephyrBinaryRunner': 700 '''Hook for instance creation from command line arguments.''' 701 702 @staticmethod 703 def get_flash_address(args: argparse.Namespace, 704 build_conf: BuildConfiguration, 705 default: int = 0x0) -> int: 706 '''Helper method for extracting a flash address. 707 708 If args.dt_flash is true, returns the address obtained from 709 ZephyrBinaryRunner.flash_address_from_build_conf(build_conf). 710 711 Otherwise (when args.dt_flash is False), the default value is 712 returned.''' 713 if args.dt_flash: 714 return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf) 715 else: 716 return default 717 718 @staticmethod 719 def flash_address_from_build_conf(build_conf: BuildConfiguration): 720 '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf, 721 return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return 722 CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET. 723 ''' 724 if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'): 725 return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] + 726 build_conf['CONFIG_FLASH_LOAD_OFFSET']) 727 else: 728 return build_conf['CONFIG_FLASH_BASE_ADDRESS'] 729 730 @staticmethod 731 def sram_address_from_build_conf(build_conf: BuildConfiguration): 732 '''return CONFIG_SRAM_BASE_ADDRESS. 733 ''' 734 return build_conf['CONFIG_SRAM_BASE_ADDRESS'] 735 736 def run(self, command: str, **kwargs): 737 '''Runs command ('flash', 'debug', 'debugserver', 'attach'). 738 739 This is the main entry point to this runner.''' 740 caps = self.capabilities() 741 if command not in caps.commands: 742 raise ValueError(f'runner {self.name()} does not implement command {command}') 743 self.do_run(command, **kwargs) 744 745 @abc.abstractmethod 746 def do_run(self, command: str, **kwargs): 747 '''Concrete runner; run() delegates to this. Implement in subclasses. 748 749 In case of an unsupported command, raise a ValueError.''' 750 751 @property 752 def build_conf(self) -> BuildConfiguration: 753 '''Get a BuildConfiguration for the build directory.''' 754 if not hasattr(self, '_build_conf'): 755 self._build_conf = BuildConfiguration(self.cfg.build_dir) 756 return self._build_conf 757 758 @property 759 def sysbuild_conf(self) -> SysbuildConfiguration: 760 '''Get a SysbuildConfiguration for the sysbuild directory.''' 761 if not hasattr(self, '_sysbuild_conf'): 762 self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir)) 763 return self._sysbuild_conf 764 765 @property 766 def thread_info_enabled(self) -> bool: 767 '''Returns True if self.build_conf has 768 CONFIG_DEBUG_THREAD_INFO enabled. 769 ''' 770 return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO') 771 772 @classmethod 773 def dev_id_help(cls) -> str: 774 ''' Get the ArgParse help text for the --dev-id option.''' 775 help = '''Device identifier. Use it to select 776 which debugger, device, node or instance to 777 target when multiple ones are available or 778 connected.''' 779 addendum = '''\nThis option can be present multiple times.''' if \ 780 cls.capabilities().mult_dev_ids else '' 781 return help + addendum 782 783 @classmethod 784 def extload_help(cls) -> str: 785 ''' Get the ArgParse help text for the --extload option.''' 786 return '''External loader to be used by stm32cubeprogrammer 787 to program the targeted external memory. 788 The runner requires the external loader (*.stldr) filename. 789 This external loader (*.stldr) must be located within 790 STM32CubeProgrammer/bin/ExternalLoader directory.''' 791 792 @classmethod 793 def tool_opt_help(cls) -> str: 794 ''' Get the ArgParse help text for the --tool-opt option.''' 795 return '''Option to pass on to the underlying tool used 796 by this runner. This can be given multiple times; 797 the resulting arguments will be given to the tool 798 in the order they appear on the command line.''' 799 800 @staticmethod 801 def require(program: str, path: str | None = None) -> str: 802 '''Require that a program is installed before proceeding. 803 804 :param program: name of the program that is required, 805 or path to a program binary. 806 :param path: PATH where to search for the program binary. 807 By default check on the system PATH. 808 809 If ``program`` is an absolute path to an existing program 810 binary, this call succeeds. Otherwise, try to find the program 811 by name on the system PATH or in the given PATH, if provided. 812 813 If the program can be found, its path is returned. 814 Otherwise, raises MissingProgram.''' 815 ret = shutil.which(program, path=path) 816 if ret is None: 817 raise MissingProgram(program) 818 return ret 819 820 def get_rtt_address(self) -> int | None: 821 '''Helper method for extracting a the RTT control block address. 822 823 If args.rtt_address was supplied, returns that. 824 825 Otherwise, attempt to locate an rtt block in the elf file. 826 If this is not found, None is returned''' 827 if self.cfg.rtt_address is not None: 828 return self.cfg.rtt_address 829 elif self.cfg.elf_file is not None: 830 return find_rtt_block(self.cfg.elf_file) 831 return None 832 833 def run_server_and_client(self, server, client, **kwargs): 834 '''Run a server that ignores SIGINT, and a client that handles it. 835 836 This routine portably: 837 838 - creates a Popen object for the ``server`` command which ignores 839 SIGINT 840 - runs ``client`` in a subprocess while temporarily ignoring SIGINT 841 - cleans up the server after the client exits. 842 - the keyword arguments, if any, will be passed down to both server and 843 client subprocess calls 844 845 It's useful to e.g. open a GDB server and client.''' 846 server_proc = self.popen_ignore_int(server, **kwargs) 847 try: 848 self.run_client(client, **kwargs) 849 finally: 850 server_proc.terminate() 851 server_proc.wait() 852 853 def run_client(self, client, **kwargs): 854 '''Run a client that handles SIGINT.''' 855 previous = signal.signal(signal.SIGINT, signal.SIG_IGN) 856 try: 857 self.check_call(client, **kwargs) 858 finally: 859 signal.signal(signal.SIGINT, previous) 860 861 def _log_cmd(self, cmd: list[str]): 862 escaped = ' '.join(shlex.quote(s) for s in cmd) 863 if not _DRY_RUN: 864 self.logger.debug(escaped) 865 else: 866 self.logger.info(escaped) 867 868 def call(self, cmd: list[str], **kwargs) -> int: 869 '''Subclass subprocess.call() wrapper. 870 871 Subclasses should use this method to run command in a 872 subprocess and get its return code, rather than 873 using subprocess directly, to keep accurate debug logs. 874 ''' 875 self._log_cmd(cmd) 876 if _DRY_RUN: 877 return 0 878 return subprocess.call(cmd, **kwargs) 879 880 def check_call(self, cmd: list[str], **kwargs): 881 '''Subclass subprocess.check_call() wrapper. 882 883 Subclasses should use this method to run command in a 884 subprocess and check that it executed correctly, rather than 885 using subprocess directly, to keep accurate debug logs. 886 ''' 887 self._log_cmd(cmd) 888 if _DRY_RUN: 889 return 890 subprocess.check_call(cmd, **kwargs) 891 892 def check_output(self, cmd: list[str], **kwargs) -> bytes: 893 '''Subclass subprocess.check_output() wrapper. 894 895 Subclasses should use this method to run command in a 896 subprocess and check that it executed correctly, rather than 897 using subprocess directly, to keep accurate debug logs. 898 ''' 899 self._log_cmd(cmd) 900 if _DRY_RUN: 901 return b'' 902 return subprocess.check_output(cmd, **kwargs) 903 904 def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen: 905 '''Spawn a child command, ensuring it ignores SIGINT. 906 907 The returned subprocess.Popen object must be manually terminated.''' 908 cflags = 0 909 preexec = None 910 system = platform.system() 911 912 if system == 'Windows': 913 # We can't type check this line on Unix operating systems: 914 # mypy thinks the subprocess module has no such attribute. 915 cflags |= subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore 916 elif system in {'Linux', 'Darwin'}: 917 # We can't type check this on Windows for the same reason. 918 preexec = os.setsid # type: ignore 919 920 self._log_cmd(cmd) 921 if _DRY_RUN: 922 return _DebugDummyPopen() # type: ignore 923 924 return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs) 925 926 def ensure_output(self, output_type: str) -> None: 927 '''Ensure self.cfg has a particular output artifact. 928 929 For example, ensure_output('bin') ensures that self.cfg.bin_file 930 refers to an existing file. Errors out if it's missing or undefined. 931 932 :param output_type: string naming the output type 933 ''' 934 output_file = getattr(self.cfg, f'{output_type}_file', None) 935 936 if output_file is None: 937 err = f'{output_type} file location is unknown.' 938 elif not os.path.isfile(output_file): 939 err = f'{output_file} does not exist.' 940 else: 941 return 942 943 if output_type in ('elf', 'hex', 'bin', 'uf2'): 944 err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.' 945 946 # RuntimeError avoids a stack trace saved in run_common. 947 raise RuntimeError(err) 948 949 def run_telnet_client(self, host: str, port: int, active_sock=None) -> None: 950 ''' 951 Run a telnet client for user interaction. 952 ''' 953 # If the caller passed in an active socket, use that 954 if active_sock is not None: 955 sock = active_sock 956 elif shutil.which('nc') is not None: 957 # If a `nc` command is available, run it, as it will provide the 958 # best support for CONFIG_SHELL_VT100_COMMANDS etc. 959 client_cmd = ['nc', host, str(port)] 960 # Note: netcat (nc) does not handle sigint, so cannot use run_client() 961 self.check_call(client_cmd) 962 return 963 else: 964 # Start a new socket connection 965 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 966 sock.connect((host, port)) 967 968 # Otherwise, use a pure python implementation. This will work well for logging, 969 # but input is line based only. 970 sel = selectors.DefaultSelector() 971 sel.register(sys.stdin, selectors.EVENT_READ) 972 sel.register(sock, selectors.EVENT_READ) 973 while True: 974 events = sel.select() 975 for key, _ in events: 976 if key.fileobj == sys.stdin: 977 text = sys.stdin.readline() 978 if text: 979 sock.send(text.encode()) 980 981 elif key.fileobj == sock: 982 resp = sock.recv(2048) 983 if resp: 984 print(resp.decode(), end='') 985