1# Copyright 2023, 2025 NXP
2# SPDX-License-Identifier: Apache-2.0
3"""
4Runner for NXP S32 Debug Probe.
5"""
6
7import argparse
8import os
9import platform
10import re
11import shlex
12import subprocess
13import sys
14import tempfile
15from dataclasses import dataclass
16from pathlib import Path
17
18from runners.core import BuildConfiguration, RunnerCaps, RunnerConfig, ZephyrBinaryRunner
19
20NXP_S32DBG_USB_VID = 0x1FC9
21NXP_S32DBG_USB_PID = 0x014D
22
23@dataclass
24class NXPS32DebugProbeConfig:
25    """NXP S32 Debug Probe configuration parameters."""
26    conn_str: str = 's32dbg'
27    server_port: int = 45000
28    speed: int = 16000
29    remote_timeout: int = 30
30    reset_type: str | None = 'default'
31    reset_delay: int = 0
32
33
34class NXPS32DebugProbeRunner(ZephyrBinaryRunner):
35    """Runner front-end for NXP S32 Debug Probe."""
36
37    def __init__(self,
38                 runner_cfg: RunnerConfig,
39                 probe_cfg: NXPS32DebugProbeConfig,
40                 core_name: str,
41                 soc_name: str,
42                 soc_family_name: str,
43                 start_all_cores: bool,
44                 s32ds_path: str | None = None,
45                 tool_opt: list[str] | None = None) -> None:
46        super().__init__(runner_cfg)
47        self.elf_file: str = runner_cfg.elf_file or ''
48        self.probe_cfg: NXPS32DebugProbeConfig = probe_cfg
49        self.core_name: str = core_name
50        self.soc_name: str = soc_name
51        self.soc_family_name: str = soc_family_name
52        self.start_all_cores: bool = start_all_cores
53        self.s32ds_path_override: str | None = s32ds_path
54
55        self.tool_opt: list[str] = []
56        if tool_opt:
57            for opt in tool_opt:
58                self.tool_opt.extend(shlex.split(opt))
59
60        build_cfg = BuildConfiguration(runner_cfg.build_dir)
61        self.arch = build_cfg.get('CONFIG_ARCH').replace('"', '')
62
63    @classmethod
64    def name(cls) -> str:
65        return 'nxp_s32dbg'
66
67    @classmethod
68    def capabilities(cls) -> RunnerCaps:
69        return RunnerCaps(commands={'debug', 'debugserver', 'attach'},
70                          dev_id=True, tool_opt=True)
71
72    @classmethod
73    def dev_id_help(cls) -> str:
74        return '''Debug probe connection string as in "s32dbg[:<address>]"
75                  where <address> can be the IP address if TAP is available via Ethernet,
76                  the serial ID of the probe or empty if TAP is available via USB.'''
77
78    @classmethod
79    def tool_opt_help(cls) -> str:
80        return '''Additional options for GDB client when used with "debug" or "attach" commands
81                  or for GTA server when used with "debugserver" command.'''
82
83    @classmethod
84    def do_add_parser(cls, parser: argparse.ArgumentParser) -> None:
85        parser.add_argument('--core-name',
86                            required=True,
87                            help='Core name as supported by the debug probe (e.g. "R52_0_0")')
88        parser.add_argument('--soc-name',
89                            required=True,
90                            help='SoC name as supported by the debug probe (e.g. "S32Z270")')
91        parser.add_argument('--soc-family-name',
92                            required=True,
93                            help='SoC family name as supported by the debug probe (e.g. "s32z2e2")')
94        parser.add_argument('--start-all-cores',
95                            action='store_true',
96                            help='Start all SoC cores and not just the one being debugged. '
97                                 'Use together with "debug" command.')
98        parser.add_argument('--s32ds-path',
99                            help='Override the path to NXP S32 Design Studio installation. '
100                                 'By default, this runner will try to obtain it from the system '
101                                 'path, if available.')
102        parser.add_argument('--server-port',
103                            default=NXPS32DebugProbeConfig.server_port,
104                            type=int,
105                            help='GTA server port')
106        parser.add_argument('--speed',
107                            default=NXPS32DebugProbeConfig.speed,
108                            type=int,
109                            help='JTAG interface speed')
110        parser.add_argument('--remote-timeout',
111                            default=NXPS32DebugProbeConfig.remote_timeout,
112                            type=int,
113                            help='Number of seconds to wait for the remote target responses')
114
115    @classmethod
116    def do_create(cls, cfg: RunnerConfig, args: argparse.Namespace) -> 'NXPS32DebugProbeRunner':
117        probe_cfg = NXPS32DebugProbeConfig(args.dev_id,
118                                           server_port=args.server_port,
119                                           speed=args.speed,
120                                           remote_timeout=args.remote_timeout)
121
122        return NXPS32DebugProbeRunner(cfg, probe_cfg, args.core_name, args.soc_name,
123                                      args.soc_family_name, args.start_all_cores,
124                                      s32ds_path=args.s32ds_path, tool_opt=args.tool_opt)
125
126    @staticmethod
127    def find_usb_probes() -> list[str]:
128        """Return a list of debug probe serial numbers connected via USB to this host."""
129        # use system's native commands to enumerate and retrieve the USB serial ID
130        # to avoid bloating this runner with third-party dependencies that often
131        # require priviledged permissions to access the device info
132        macaddr_pattern = r'(?:[0-9a-f]{2}[:]){5}[0-9a-f]{2}'
133        if platform.system() == 'Windows':
134            cmd = 'pnputil /enum-devices /connected'
135            serialid_pattern = 'instance id: +usb\\\\' \
136                               f'VID_{NXP_S32DBG_USB_VID:04X}&PID_{NXP_S32DBG_USB_PID:04X}\\\\' \
137                               f'({macaddr_pattern})'.lower()
138        else:
139            cmd = f'lsusb -v -d {NXP_S32DBG_USB_VID:x}:{NXP_S32DBG_USB_PID:x}'
140            serialid_pattern = f'iserial +.*({macaddr_pattern})'
141
142        try:
143            outb = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL)
144            out = outb.decode('utf-8').strip().lower()
145        except subprocess.CalledProcessError as err:
146            raise RuntimeError('error while looking for debug probes connected') from err
147
148        devices: list[str] = []
149        if out and 'no devices were found' not in out:
150            devices = re.findall(serialid_pattern, out)
151
152        return sorted(devices)
153
154    @classmethod
155    def select_probe(cls) -> str:
156        """
157        Find debugger probes connected and return the serial number of the one selected.
158
159        If there are multiple debugger probes connected and this runner is being executed
160        in a interactive prompt, ask the user to select one of the probes.
161        """
162        probes_snr = cls.find_usb_probes()
163        if not probes_snr:
164            raise RuntimeError('there are no debug probes connected')
165        elif len(probes_snr) == 1:
166            return probes_snr[0]
167        else:
168            if not sys.stdin.isatty():
169                raise RuntimeError(
170                    f'refusing to guess which of {len(probes_snr)} connected probes to use '
171                    '(Interactive prompts disabled since standard input is not a terminal). '
172                    'Please specify a device ID on the command line.')
173
174            print('There are multiple debug probes connected')
175            for i, probe in enumerate(probes_snr, 1):
176                print(f'{i}. {probe}')
177
178            prompt = f'Please select one with desired serial number (1-{len(probes_snr)}): '
179            while True:
180                try:
181                    value: int = int(input(prompt))
182                except EOFError:
183                    sys.exit(0)
184                except ValueError:
185                    continue
186                if 1 <= value <= len(probes_snr):
187                    break
188            return probes_snr[value - 1]
189
190    @property
191    def runtime_environment(self) -> dict[str, str] | None:
192        """Execution environment used for the client process."""
193        if platform.system() == 'Windows':
194            python_lib = (self.s32ds_path / 'S32DS' / 'build_tools' / 'msys32'
195                / 'mingw64' / 'lib' / 'python3.10')
196            return {
197                **os.environ,
198                'PYTHONPATH': f'{python_lib}{os.pathsep}{python_lib / "site-packages"}{os.pathsep}'
199                              f'{python_lib / "lib-dynload"}'
200            }
201
202        return None
203
204    @property
205    def script_globals(self) -> dict[str, str | int | None]:
206        """Global variables required by the debugger scripts."""
207        return {
208            '_PROBE_IP': self.probe_cfg.conn_str,
209            '_JTAG_SPEED': self.probe_cfg.speed,
210            '_GDB_SERVER_PORT': self.probe_cfg.server_port,
211            '_RESET_TYPE': self.probe_cfg.reset_type,
212            '_RESET_DELAY': self.probe_cfg.reset_delay,
213            '_REMOTE_TIMEOUT': self.probe_cfg.remote_timeout,
214            '_CORE_NAME': f'{self.soc_name}_{self.core_name}',
215            '_SOC_NAME': self.soc_name,
216            '_IS_LOGGING_ENABLED': False,
217            '_FLASH_NAME': None,    # not supported
218            '_SECURE_TYPE': None,   # not supported
219            '_SECURE_KEY': None,    # not supported
220        }
221
222    def server_commands(self) -> list[str]:
223        """Get launch commands to start the GTA server."""
224        server_exec = str(self.s32ds_path / 'S32DS' / 'tools' / 'S32Debugger'
225                          / 'Debugger' / 'Server' / 'gta' / 'gta')
226        cmd = [server_exec, '-p', str(self.probe_cfg.server_port)]
227        return cmd
228
229    def client_commands(self) -> list[str]:
230        """Get launch commands to start the GDB client."""
231        if self.arch == 'arm':
232            client_exec_name = 'arm-none-eabi-gdb-py'
233        elif self.arch == 'arm64':
234            client_exec_name = 'aarch64-none-elf-gdb-py'
235        else:
236            raise RuntimeError(f'architecture {self.arch} not supported')
237
238        client_exec = str(self.s32ds_path / 'S32DS' / 'tools' / 'gdb-arm'
239                          / 'arm32-eabi' / 'bin' / client_exec_name)
240        cmd = [client_exec]
241        return cmd
242
243    def get_script(self, name: str) -> Path:
244        """
245        Get the file path of a debugger script with the given name.
246
247        :param name: name of the script, without the SoC family name prefix
248        :returns: path to the script
249        :raises RuntimeError: if file does not exist
250        """
251        script = (self.s32ds_path / 'S32DS' / 'tools' / 'S32Debugger' / 'Debugger' / 'scripts'
252                  / self.soc_family_name / f'{self.soc_family_name}_{name}.py')
253        if not script.exists():
254            raise RuntimeError(f'script not found: {script}')
255        return script
256
257    def do_run(self, command: str, **kwargs) -> None:
258        """
259        Execute the given command.
260
261        :param command: command name to execute
262        :raises RuntimeError: if target architecture or host OS is not supported
263        :raises MissingProgram: if required tools are not found in the host
264        """
265        if platform.system() not in ('Windows', 'Linux'):
266            raise RuntimeError(f'runner not supported on {platform.system()} systems')
267
268        if self.arch not in ('arm', 'arm64'):
269            raise RuntimeError(f'architecture {self.arch} not supported')
270
271        app_name = 's32ds' if platform.system() == 'Windows' else 's32ds.sh'
272        self.s32ds_path = Path(self.require(app_name, path=self.s32ds_path_override)).parent
273
274        if not self.probe_cfg.conn_str:
275            self.probe_cfg.conn_str = f's32dbg:{self.select_probe()}'
276            self.logger.info(f'using debug probe {self.probe_cfg.conn_str}')
277
278        if command in ('attach', 'debug'):
279            self.ensure_output('elf')
280            self.do_attach_debug(command, **kwargs)
281        else:
282            self.do_debugserver(**kwargs)
283
284    def do_attach_debug(self, command: str, **kwargs) -> None:
285        """
286        Launch the GTA server and GDB client to start a debugging session.
287
288        :param command: command name to execute
289        """
290        gdb_script: list[str] = []
291
292        # setup global variables required for the scripts before sourcing them
293        for name, val in self.script_globals.items():
294            gdb_script.append(f'py {name} = {repr(val)}')
295
296        # load platform-specific debugger script
297        if command == 'debug':
298            if self.start_all_cores:
299                startup_script = self.get_script('generic_bareboard_all_cores')
300            else:
301                startup_script = self.get_script('generic_bareboard')
302        else:
303            startup_script = self.get_script('attach')
304        gdb_script.append(f'source {startup_script}')
305
306        # executes the SoC and board initialization sequence
307        if command == 'debug':
308            gdb_script.append('py board_init()')
309
310        # initializes the debugger connection to the core specified
311        gdb_script.append('py core_init()')
312
313        gdb_script.append(f'file {Path(self.elf_file).as_posix()}')
314        if command == 'debug':
315            gdb_script.append('load')
316
317        with tempfile.TemporaryDirectory(suffix='nxp_s32dbg') as tmpdir:
318            gdb_cmds = Path(tmpdir) / 'runner.nxp_s32dbg'
319            gdb_cmds.write_text('\n'.join(gdb_script), encoding='utf-8')
320            self.logger.debug(gdb_cmds.read_text(encoding='utf-8'))
321
322            server_cmd = self.server_commands()
323            client_cmd = self.client_commands()
324            client_cmd.extend(['-x', gdb_cmds.as_posix()])
325            client_cmd.extend(self.tool_opt)
326
327            self.run_server_and_client(server_cmd, client_cmd, env=self.runtime_environment)
328
329    def do_debugserver(self, **kwargs) -> None:
330        """Start the GTA server on a given port with the given extra parameters from cli."""
331        server_cmd = self.server_commands()
332        server_cmd.extend(self.tool_opt)
333        self.check_call(server_cmd)
334