# Copyright (c) 2023 Nordic Semiconductor ASA.
#
# SPDX-License-Identifier: Apache-2.0

'''Runner for flashing with nrfutil.'''

import json
import subprocess
import sys
from pathlib import Path

from runners.core import _DRY_RUN
from runners.nrf_common import NrfBinaryRunner


class NrfUtilBinaryRunner(NrfBinaryRunner):
    '''Runner front-end for nrfutil.

    Operations are queued in ``self._ops`` and then written, one at a time,
    into a batch JSON file by ``nrfutil device ... --x-append-batch``. The
    resulting file is executed with ``nrfutil device x-execute-batch``.
    '''

    def __init__(self, cfg, family, softreset, pinreset, dev_id, erase=False,
                 erase_mode=None, ext_erase_mode=None, reset=True, tool_opt=None,
                 force=False, recover=False, suit_starter=False,
                 ext_mem_config_file=None):
        '''Initialize the runner.

        All arguments up to and including ``recover`` are forwarded to
        ``NrfBinaryRunner.__init__``; ``suit_starter`` and
        ``ext_mem_config_file`` are stored on the instance.
        '''

        super().__init__(cfg, family, softreset, pinreset, dev_id, erase,
                         erase_mode, ext_erase_mode, reset, tool_opt, force,
                         recover)

        self.suit_starter = suit_starter
        self.ext_mem_config_file = ext_mem_config_file

        # Operations queued for the next batch, and the id handed to the
        # next queued operation.
        self._ops = []
        self._op_id = 1

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'nrfutil'

    @classmethod
    def capabilities(cls):
        '''Return the runner capabilities, with multiple device ids enabled.'''
        return NrfBinaryRunner._capabilities(mult_dev_ids=True)

    @classmethod
    def dev_id_help(cls) -> str:
        '''Return the help text for the device id option.'''
        return NrfBinaryRunner._dev_id_help() + \
               '''.\n This option can be specified multiple times'''

    @classmethod
    def tool_opt_help(cls) -> str:
        '''Return the help text for the tool options argument.'''
        return 'Additional options for nrfutil, e.g. "--log-level"'

    @classmethod
    def do_create(cls, cfg, args):
        '''Build a runner instance from the parsed command line arguments.'''
        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
                                   args.pinreset, args.dev_id, erase=args.erase,
                                   erase_mode=args.erase_mode,
                                   ext_erase_mode=args.ext_erase_mode,
                                   reset=args.reset, tool_opt=args.tool_opt,
                                   force=args.force, recover=args.recover,
                                   suit_starter=args.suit_manifest_starter,
                                   ext_mem_config_file=args.ext_mem_config_file)

    @classmethod
    def do_add_parser(cls, parser):
        '''Add the nrfutil-specific options on top of the base class ones.'''
        super().do_add_parser(parser)
        parser.add_argument('--suit-manifest-starter', required=False,
                            action='store_true',
                            help='Use the SUIT manifest starter file')
        parser.add_argument('--ext-mem-config-file', required=False,
                            dest='ext_mem_config_file',
                            help='path to an JSON file with external memory configuration')

    def _exec(self, args):
        '''Run ``nrfutil --json device <args>`` and collect its JSON output.

        Each non-blank line of the tool's stdout is decoded as one JSON
        object (newline-delimited JSON). When ``args`` contains
        ``x-execute-batch``, the description of every task progress update
        reporting 0 percent is logged, and a ``batch_end`` object carrying an
        error aborts the run.

        :param args: list of arguments appended after ``nrfutil --json device``
        :return: list of decoded JSON objects, in output order; an empty list
                 in dry-run mode, where the command is only logged
        :raises subprocess.CalledProcessError: if the batch reports an error
                 (using the reported error code) or the process exits with a
                 non-zero return code
        '''
        jout_all = []

        cmd = ['nrfutil', '--json', 'device'] + args
        self._log_cmd(cmd)

        if _DRY_RUN:
            # Same (empty) list type as the regular return value.
            return jout_all

        # Loop invariant: only batch executions get progress/error handling.
        is_batch = 'x-execute-batch' in args

        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
            for line in iter(p.stdout.readline, b''):
                # https://github.com/ndjson/ndjson-spec
                if not line.strip():
                    # Blank lines carry no JSON object and would make
                    # json.loads() raise; ignore them.
                    continue
                jout = json.loads(line.decode(sys.getdefaultencoding()))
                jout_all.append(jout)

                if not is_batch:
                    continue

                if jout['type'] == 'batch_update':
                    pld = jout['data']['data']
                    if (
                        pld['type'] == 'task_progress' and
                        pld['data']['progress']['progressPercentage'] == 0
                    ):
                        self.logger.info(pld['data']['progress']['description'])
                elif jout['type'] == 'batch_end' and jout['data']['error']:
                    raise subprocess.CalledProcessError(
                        jout['data']['error']['code'], cmd
                    )
        # Leaving the 'with' block waits for the process, so returncode is set.
        if p.returncode != 0:
            raise subprocess.CalledProcessError(p.returncode, cmd)

        return jout_all

    def do_get_boards(self):
        '''Return the serial numbers of the listed devices with a J-Link trait.

        The device list is taken from the ``task_end`` object printed by
        ``nrfutil device list``; if several are printed, the last one wins.
        '''
        out = self._exec(['list'])
        devs = []
        for o in out:
            if o['type'] == 'task_end':
                devs = o['data']['data']['devices']
        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]

        self.logger.debug(f'Found boards: {snrs}')
        return snrs

    def do_require(self):
        '''Check that the nrfutil executable is available.'''
        self.require('nrfutil')

    def _insert_op(self, op):
        '''Tag ``op`` with the next operation id and queue it for the batch.'''
        op['operationId'] = f'{self._op_id}'
        self._op_id += 1
        self._ops.append(op)

    def _format_dev_ids(self):
        '''Return the device id(s); a list is joined into a comma-separated string.'''
        if isinstance(self.dev_id, list):
            return ','.join(self.dev_id)
        else:
            return self.dev_id

    def _append_batch(self, op, json_file):
        '''Append one operation to the batch file by invoking nrfutil.

        The operation dictionary is translated into the matching
        ``nrfutil device`` sub-command and options, and ``--x-append-batch``
        is passed so that the tool writes it to ``json_file``.

        :param op: operation dictionary; ``op['operation']['type']`` selects
                   the sub-command
        :param json_file: path of the batch file to append to
        '''
        _op = op['operation']
        op_type = _op['type']

        cmd = [f'{op_type}']

        if op_type == 'program':
            cmd += ['--firmware', _op['firmware']['file']]
            opts = _op['options']
            # populate the options
            cmd.append('--options')
            cli_opts = f"chip_erase_mode={opts['chip_erase_mode']}"
            if opts.get('ext_mem_erase_mode'):
                cli_opts += f",ext_mem_erase_mode={opts['ext_mem_erase_mode']}"
            if opts.get('verify'):
                cli_opts += f",verify={opts['verify']}"
            cmd.append(cli_opts)
        elif op_type == 'reset':
            cmd += ['--reset-kind', _op['kind']]
        elif op_type == 'erase':
            cmd.append(f'--{_op["kind"]}')
        elif op_type == 'x-provision-keys':
            cmd += ['--key-file', _op['keyfile']]

        cmd += ['--core', op['core']] if op.get('core') else []
        cmd += ['--x-family', f'{self.family}']
        cmd += ['--x-append-batch', f'{json_file}']
        self._exec(cmd)

    def _exec_batch(self):
        '''Generate the batch file from the queued operations and execute it.

        The batch file ``generated_nrfutil_batch.json`` is created in the
        directory containing ``self.hex_``; any previous file is removed
        first. The queue and the operation id counter are reset before the
        batch is executed.
        '''
        # Use x-append-batch to get the JSON from nrfutil itself
        json_file = Path(self.hex_).parent / 'generated_nrfutil_batch.json'
        json_file.unlink(missing_ok=True)
        for op in self._ops:
            self._append_batch(op, json_file)

        # reset first in case an exception is thrown
        self._ops = []
        self._op_id = 1
        self.logger.debug(f'Executing batch in: {json_file}')
        precmd = []
        if self.ext_mem_config_file:
            # This needs to be prepended, as it's a global option
            precmd = ['--x-ext-mem-config-file', self.ext_mem_config_file]

        self._exec(precmd + ['x-execute-batch', '--batch-path', f'{json_file}',
                             '--serial-number', self._format_dev_ids()])

    def do_exec_op(self, op, force=False):
        '''Execute ``op`` immediately when forced, otherwise defer it.

        :param op: operation dictionary
        :param force: when true, run ``op`` right away as a one-operation batch
        :return: True if the operation was executed, False if it was deferred
        :raises RuntimeError: if a forced execution is requested while other
                 operations are still queued
        '''
        self.logger.debug(f'Executing op: {op}')
        if force:
            if len(self._ops) != 0:
                raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
            self._insert_op(op)
            self._exec_batch()
            return True
        # Defer by default
        return False

    def flush_ops(self, force=True):
        '''Move every pending operation from ``self.ops`` into a batch and run it.

        Does nothing when ``force`` is false.
        '''
        if not force:
            return
        while self.ops:
            self._insert_op(self.ops.popleft())
        self._exec_batch()