1# Copyright (c) 2023 Nordic Semiconductor ASA.
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for flashing with nrfutil.'''
6
7import json
8import subprocess
9import sys
10from pathlib import Path
11
12from runners.core import _DRY_RUN
13from runners.nrf_common import NrfBinaryRunner
14
15
class NrfUtilBinaryRunner(NrfBinaryRunner):
    '''Runner front-end for nrfutil.

    Operations are not sent to the device one by one. Each operation is
    first appended to a JSON batch file by nrfutil itself (through its
    ``--x-append-batch`` option), and the resulting file is then run
    with ``nrfutil device x-execute-batch``.
    '''

    def __init__(self, cfg, family, softreset, pinreset, dev_id, erase=False,
                 erase_mode=None, ext_erase_mode=None, reset=True, tool_opt=None,
                 force=False, recover=False, suit_starter=False,
                 ext_mem_config_file=None):
        '''Create the runner.

        All arguments up to and including ``recover`` are forwarded
        unchanged to ``NrfBinaryRunner.__init__``.

        :param suit_starter: stored on the instance as ``suit_starter``;
            it is not otherwise used by this class.
        :param ext_mem_config_file: if set, passed to nrfutil with
            ``--x-ext-mem-config-file`` when the batch is executed.
        '''
        super().__init__(cfg, family, softreset, pinreset, dev_id, erase,
                         erase_mode, ext_erase_mode, reset, tool_opt, force,
                         recover)

        self.suit_starter = suit_starter
        self.ext_mem_config_file = ext_mem_config_file

        # Operations staged for the next batch, and the ID that will be
        # given to the next staged operation.
        self._ops = []
        self._op_id = 1

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'nrfutil'

    @classmethod
    def capabilities(cls):
        '''Return the common nRF capabilities with ``mult_dev_ids`` enabled.'''
        return NrfBinaryRunner._capabilities(mult_dev_ids=True)

    @classmethod
    def dev_id_help(cls) -> str:
        '''Return the common device ID help text plus a note that the
        option may be repeated.'''
        return NrfBinaryRunner._dev_id_help() + \
               '''.\n This option can be specified multiple times'''

    @classmethod
    def tool_opt_help(cls) -> str:
        '''Return the help text for the tool options argument.'''
        return 'Additional options for nrfutil, e.g. "--log-level"'

    @classmethod
    def do_create(cls, cfg, args):
        '''Build a runner instance from the parsed command line ``args``.'''
        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
                                   args.pinreset, args.dev_id, erase=args.erase,
                                   erase_mode=args.erase_mode,
                                   ext_erase_mode=args.ext_erase_mode,
                                   reset=args.reset, tool_opt=args.tool_opt,
                                   force=args.force, recover=args.recover,
                                   suit_starter=args.suit_manifest_starter,
                                   ext_mem_config_file=args.ext_mem_config_file)

    @classmethod
    def do_add_parser(cls, parser):
        '''Add the nrfutil-specific options on top of the inherited ones.'''
        super().do_add_parser(parser)
        parser.add_argument('--suit-manifest-starter', required=False,
                            action='store_true',
                            help='Use the SUIT manifest starter file')
        parser.add_argument('--ext-mem-config-file', required=False,
                            dest='ext_mem_config_file',
                            help='path to an JSON file with external memory configuration')

    def _exec(self, args):
        '''Run ``nrfutil --json device <args>`` and collect its output.

        The standard output of nrfutil is read line by line and every
        non-blank line is parsed as one JSON document. While a batch is
        being executed (``'x-execute-batch'`` is in ``args``), the
        description of each task is logged when its progress is reported
        as 0 percent, and a ``batch_end`` message carrying an error
        aborts the run.

        :param args: list of arguments following ``nrfutil --json device``.
        :returns: list with all parsed JSON documents, in output order.
            In dry-run mode nothing is executed and an empty dict is
            returned.
        :raises subprocess.CalledProcessError: if the batch ends with an
            error or nrfutil exits with a non-zero return code.
        '''
        jout_all = []

        cmd = ['nrfutil', '--json', 'device'] + args
        self._log_cmd(cmd)

        if _DRY_RUN:
            return {}

        # Loop-invariant: only batch execution emits batch_* messages
        # that need to be inspected below.
        is_batch = 'x-execute-batch' in args

        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
            for line in iter(p.stdout.readline, b''):
                # https://github.com/ndjson/ndjson-spec
                # The spec allows a parser to ignore empty lines; feeding
                # one to json.loads() would raise and abort the operation.
                if not line.strip():
                    continue

                jout = json.loads(line.decode(sys.getdefaultencoding()))
                jout_all.append(jout)

                if not is_batch:
                    continue

                if jout['type'] == 'batch_update':
                    pld = jout['data']['data']
                    if (
                        pld['type'] == 'task_progress' and
                        pld['data']['progress']['progressPercentage'] == 0
                    ):
                        self.logger.info(pld['data']['progress']['description'])
                elif jout['type'] == 'batch_end' and jout['data']['error']:
                    raise subprocess.CalledProcessError(
                        jout['data']['error']['code'], cmd
                    )

        # Leaving the "with" block has waited for the process to exit.
        if p.returncode != 0:
            raise subprocess.CalledProcessError(p.returncode, cmd)

        return jout_all

    def do_get_boards(self):
        '''Return the serial numbers of the connected J-Link devices.

        Runs ``nrfutil device list`` and takes the device list from the
        last ``task_end`` message; only devices whose ``jlink`` trait is
        truthy are kept.
        '''
        devs = []
        for msg in self._exec(['list']):
            if msg['type'] == 'task_end':
                devs = msg['data']['data']['devices']
        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]

        self.logger.debug(f'Found boards: {snrs}')
        return snrs

    def do_require(self):
        '''Require the ``nrfutil`` executable.'''
        self.require('nrfutil')

    def _insert_op(self, op):
        '''Tag ``op`` with the next operation ID and stage it for the batch.'''
        op['operationId'] = f'{self._op_id}'
        self._op_id += 1
        self._ops.append(op)

    def _format_dev_ids(self):
        '''Return the device IDs in the form expected by ``--serial-number``.

        A list of IDs is joined with commas; any other value is returned
        unchanged.
        '''
        if isinstance(self.dev_id, list):
            return ','.join(self.dev_id)
        return self.dev_id

    def _append_batch(self, op, json_file):
        '''Have nrfutil append ``op`` to the batch file ``json_file``.

        The operation is translated into the equivalent nrfutil device
        command line, which is run with ``--x-append-batch`` so that
        nrfutil records it in the batch file.

        :param op: operation dict; its ``'operation'`` entry holds the
            ``'type'`` and the type-specific fields read below, and an
            optional ``'core'`` entry selects the core.
        :param json_file: path of the batch file to append to.
        '''
        _op = op['operation']
        op_type = _op['type']

        cmd = [f'{op_type}']

        if op_type == 'program':
            cmd += ['--firmware', _op['firmware']['file']]
            opts = _op['options']
            # The options go in as a single comma-separated key=value
            # list; the optional ones are left out when unset.
            cli_opts = [f"chip_erase_mode={opts['chip_erase_mode']}"]
            if opts.get('ext_mem_erase_mode'):
                cli_opts.append(
                    f"ext_mem_erase_mode={opts['ext_mem_erase_mode']}")
            if opts.get('verify'):
                cli_opts.append(f"verify={opts['verify']}")
            cmd += ['--options', ','.join(cli_opts)]
        elif op_type == 'reset':
            cmd += ['--reset-kind', _op['kind']]
        elif op_type == 'erase':
            cmd.append(f'--{_op["kind"]}')
        elif op_type == 'x-provision-keys':
            cmd += ['--key-file', _op['keyfile']]

        if op.get('core'):
            cmd += ['--core', op['core']]
        cmd += ['--x-family', f'{self.family}']
        cmd += ['--x-append-batch', f'{json_file}']
        self._exec(cmd)

    def _exec_batch(self):
        '''Generate the batch file from the staged operations and run it.

        The batch file is created next to ``self.hex_``; any previous
        file with the same name is removed first.
        '''
        # Use x-append-batch to get the JSON from nrfutil itself
        json_file = Path(self.hex_).parent / 'generated_nrfutil_batch.json'
        json_file.unlink(missing_ok=True)
        for op in self._ops:
            self._append_batch(op, json_file)

        # reset first in case an exception is thrown
        self._ops = []
        self._op_id = 1
        self.logger.debug(f'Executing batch in: {json_file}')
        precmd = []
        if self.ext_mem_config_file:
            # This needs to be prepended, as it's a global option
            precmd = ['--x-ext-mem-config-file', self.ext_mem_config_file]

        self._exec(precmd + ['x-execute-batch', '--batch-path', f'{json_file}',
                             '--serial-number', self._format_dev_ids()])

    def do_exec_op(self, op, force=False):
        '''Execute ``op`` right away if ``force`` is set, defer it otherwise.

        :returns: True if the operation was executed, False if it was
            deferred.
        :raises RuntimeError: if a forced execution is requested while
            other operations are still staged.
        '''
        self.logger.debug(f'Executing op: {op}')
        if not force:
            # Defer by default
            return False

        if self._ops:
            raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
        self._insert_op(op)
        self._exec_batch()
        return True

    def flush_ops(self, force=True):
        '''Stage every operation pending in ``self.ops`` and run the batch.

        Does nothing when ``force`` is false, or when there is no
        operation to execute.
        '''
        if not force:
            return
        while self.ops:
            self._insert_op(self.ops.popleft())
        if not self._ops:
            # With nothing staged no batch file would be generated, and
            # asking nrfutil to execute a missing file can only fail.
            self.logger.debug('No operations to flush')
            return
        self._exec_batch()
194