1#
2# Copyright (c) 2024 Nordic Semiconductor ASA
3#
4# SPDX-License-Identifier: Apache-2.0
5#
6
7import logging
8import re
9import subprocess
10from dataclasses import dataclass
11from pathlib import Path
12from time import sleep
13
14import psutil
15from twister_harness import DeviceAdapter
16
17logger = logging.getLogger(__name__)
18
19SB_CONFIG_APP_CPUPPR_RUN = None
20SB_CONFIG_APP_CPUFLPR_RUN = None
21
22# See definition of stm_m_id[] and stm_m_name[] in
23# https://github.com/zephyrproject-rtos/zephyr/blob/main/drivers/misc/coresight/nrf_etr.c
24STM_M_ID = {
25    "sec": 33,
26    "app": 34,
27    "rad": 35,
28    "mod": 36,
29    "sys": 44,
30    "flpr": 45,
31    "ppr": 46,
32    "hw": 128,
33}
34
35
36@dataclass
37class STMLimits:
38    log_0_arg: float | None
39    log_1_arg: float | None
40    log_2_arg: float | None
41    log_3_arg: float | None
42    log_str: float | None
43    tracepoint: float | None
44    tracepoint_d32: float | None
45    tolerance: float | None
46
47
48def analyse_autoconf(filepath: str) -> None:
49    global SB_CONFIG_APP_CPUPPR_RUN
50    global SB_CONFIG_APP_CPUFLPR_RUN
51
52    SB_CONFIG_APP_CPUPPR_RUN = False
53    SB_CONFIG_APP_CPUFLPR_RUN = False
54
55    # Parse contents of {BUILD_DIR}/_sysbuild/autoconf.h
56    with open(f"{filepath}", errors="ignore") as autoconf:
57        for line in autoconf:
58            if "SB_CONFIG_APP_CPUPPR_RUN 1" in line:
59                SB_CONFIG_APP_CPUPPR_RUN = True
60                continue
61            if "SB_CONFIG_APP_CPUFLPR_RUN 1" in line:
62                SB_CONFIG_APP_CPUFLPR_RUN = True
63    logger.debug(f"{SB_CONFIG_APP_CPUPPR_RUN=}")
64    logger.debug(f"{SB_CONFIG_APP_CPUFLPR_RUN=}")
65
66
67def check_console_output(output: str, core: str) -> None:
68    """
69    Use regular expressions to parse 'output' string.
70    Search for benchmark results related to 'core' coprocessor.
71    """
72
73    latency_msg_0_str = re.search(
74        rf"{core}: Timing for log message with 0 arguments: (.+)us", output
75    ).group(1)
76    assert latency_msg_0_str is not None, "Timing for log message with 0 arguments NOT found"
77
78    latency_msg_1_str = re.search(
79        rf"{core}: Timing for log message with 1 argument: (.+)us", output
80    ).group(1)
81    assert latency_msg_1_str is not None, "Timing for log message with 1 argument NOT found"
82
83    latency_msg_2_str = re.search(
84        rf"{core}: Timing for log message with 2 arguments: (.+)us", output
85    ).group(1)
86    assert latency_msg_2_str is not None, "Timing for log message with 2 arguments NOT found"
87
88    latency_msg_3_str = re.search(
89        rf"{core}: Timing for log message with 3 arguments: (.+)us", output
90    ).group(1)
91    assert latency_msg_3_str is not None, "Timing for log message with 3 arguments NOT found"
92
93    latency_msg_string_str = re.search(
94        rf"{core}: Timing for log_message with string: (.+)us", output
95    ).group(1)
96    assert latency_msg_string_str is not None, "Timing for log_message with string NOT found"
97
98    latency_tracepoint_str = re.search(
99        rf"{core}: Timing for tracepoint: (.+)us", output
100    ).group(1)
101    assert latency_tracepoint_str is not None, "Timing for tracepoint NOT found"
102
103    latency_tracepoint_d32_str = re.search(
104        rf"{core}: Timing for tracepoint_d32: (.+)us", output
105    ).group(1)
106    assert latency_tracepoint_d32_str is not None, "Timing for tracepoint_d32 NOT found"
107
108
109def check_benchmark_results(output: str, core: str, constraints: STMLimits) -> None:
110    """
111    Use regular expressions to parse 'output' string.
112    Search for benchmark results related to 'core' coprocessor.
113    Check that benchamrk results are lower than limits provided in 'constraints'.
114    """
115
116    cfg = {
117        "log message with 0 arguments": {
118            "regex": rf"{core}: Timing for log message with 0 arguments: (.+)us",
119            "expected": constraints.log_0_arg,
120        },
121        "log message with 1 argument": {
122            "regex": rf"{core}: Timing for log message with 1 argument: (.+)us",
123            "expected": constraints.log_1_arg,
124        },
125        "log message with 2 arguments": {
126            "regex": rf"{core}: Timing for log message with 2 arguments: (.+)us",
127            "expected": constraints.log_2_arg,
128        },
129        "log message with 3 arguments": {
130            "regex": rf"{core}: Timing for log message with 3 arguments: (.+)us",
131            "expected": constraints.log_3_arg,
132        },
133        "log message with string": {
134            "regex": rf"{core}: Timing for log_message with string: (.+)us",
135            "expected": constraints.log_str,
136        },
137        "tracepoint": {
138            "regex": rf"{core}: Timing for tracepoint: (.+)us",
139            "expected": constraints.tracepoint,
140        },
141        "tracepoint_d32": {
142            "regex": rf"{core}: Timing for tracepoint_d32: (.+)us",
143            "expected": constraints.tracepoint_d32,
144        },
145    }
146
147    for check in cfg:
148        observed_str = re.search(cfg[check]["regex"], output).group(1)
149        assert observed_str is not None, f"Timing for {check} NOT found"
150        # check value
151        observed = float(observed_str)
152        threshold = cfg[check]["expected"] * (1 + constraints.tolerance)
153        assert (
154            observed < threshold
155        ), f"{core}: Timing for {check} - {observed} us exceeds {threshold} us"
156
157
158# nrfutil starts children processes
159# when subprocess.terminate(nrfutil_process) is executed, only the parent terminates
160# this blocks serial port for other uses
161def _kill(proc):
162    try:
163        for child in psutil.Process(proc.pid).children(recursive=True):
164            child.kill()
165        proc.kill()
166    except Exception as e:
167        logger.exception(f'Could not kill nrfutil - {e}')
168
169
170def nrfutil_dictionary_from_serial(
171    dut: DeviceAdapter,
172    decoded_file_name: str = "output.txt",
173    collect_time: float = 60.0,
174) -> None:
175    UART_PATH = dut.device_config.serial
176    dut.close()
177
178    logger.debug(f"Using serial: {UART_PATH}")
179
180    try:
181        Path(f"{decoded_file_name}").unlink()
182        logger.info("Old output file was deleted")
183    except FileNotFoundError:
184        pass
185
186    # prepare database config string
187    BUILD_DIR = str(dut.device_config.build_dir)
188    logger.debug(f"{BUILD_DIR=}")
189    config_str = f"{STM_M_ID['app']}:{BUILD_DIR}/coresight_stm/zephyr/log_dictionary.json"
190    config_str += f",{STM_M_ID['rad']}:{BUILD_DIR}/remote_rad/zephyr/log_dictionary.json"
191    if SB_CONFIG_APP_CPUPPR_RUN:
192        config_str += f",{STM_M_ID['ppr']}:{BUILD_DIR}/remote_ppr/zephyr/log_dictionary.json"
193    if SB_CONFIG_APP_CPUFLPR_RUN:
194        config_str += f",{STM_M_ID['flpr']}:{BUILD_DIR}/remote_flpr/zephyr/log_dictionary.json"
195    logger.debug(f"{config_str=}")
196
197    cmd = (
198        "nrfutil trace stm --database-config "
199        f"{config_str} "
200        f"--input-serialport {UART_PATH} "
201        f"--output-ascii {decoded_file_name}"
202    )
203    try:
204        # run nrfutil trace in background non-blocking
205        logger.info(f"Executing:\n{cmd}")
206        proc = subprocess.Popen(cmd.split(), stdout=subprocess.DEVNULL)
207    except OSError as exc:
208        logger.error(f"Unable to start nrfutil trace:\n{cmd}\n{exc}")
209    try:
210        proc.wait(collect_time)
211    except subprocess.TimeoutExpired:
212        pass
213    finally:
214        _kill(proc)
215
216
217def test_sample_STM_decoded(dut: DeviceAdapter):
218    """
219    Run sample.boards.nrf.coresight_stm from samples/boards/nordic/coresight_stm sample.
220    Application, Radio, PPR and FLPR cores use STM for logging.
221    STM proxy (Application core) decodes logs from all cores.
222    After reset, coprocessors execute code in expected way and Application core
223    outputs STM traces on UART port.
224    """
225    BUILD_DIR = str(dut.device_config.build_dir)
226    autoconf_file = f"{BUILD_DIR}/_sysbuild/autoconf.h"
227
228    # nrf54h20 prints immediately after it is flashed.
229    # Wait a bit to skipp logs from previous test.
230    sleep(5)
231
232    # Get output from serial port
233    output = "\n".join(dut.readlines())
234
235    # set SB_CONFIG_APP_CPUPPR_RUN, SB_CONFIG_APP_CPUFLPR_RUN
236    analyse_autoconf(autoconf_file)
237
238    # check that LOGs from Application core are present
239    check_console_output(
240        output=output,
241        core='app',
242    )
243
244    # check that LOGs from Radio core are present
245    check_console_output(
246        output=output,
247        core='rad',
248    )
249
250    if SB_CONFIG_APP_CPUPPR_RUN:
251        # check that LOGs from PPR core are present
252        check_console_output(
253            output=output,
254            core='ppr',
255        )
256
257    if SB_CONFIG_APP_CPUFLPR_RUN:
258        # check that LOGs from FLPR core are present
259        check_console_output(
260            output=output,
261            core='flpr',
262        )
263
264
265def test_STM_decoded(dut: DeviceAdapter):
266    """
267    Run boards.nrf.coresight_stm from tests/boards/nrf/coresight_stm test suite.
268    Application, Radio, PPR and FLPR cores use STM for logging.
269    STM proxy (Application core) decodes logs from all cores.
270    After reset, coprocessors execute code in expected way and Application core
271    outputs STM traces on UART port.
272    """
273    BUILD_DIR = str(dut.device_config.build_dir)
274    autoconf_file = f"{BUILD_DIR}/_sysbuild/autoconf.h"
275
276    app_constraints = STMLimits(
277        # all values in us
278        log_0_arg=1.8,
279        log_1_arg=2.1,
280        log_2_arg=2.0,
281        log_3_arg=2.1,
282        log_str=4.5,
283        tracepoint=0.5,
284        tracepoint_d32=0.5,
285        tolerance=0.5,  # 50 %
286    )
287    rad_constraints = STMLimits(
288        # all values in us
289        log_0_arg=4.6,
290        log_1_arg=5.0,
291        log_2_arg=5.2,
292        log_3_arg=5.6,
293        log_str=6.3,
294        tracepoint=0.5,
295        tracepoint_d32=0.5,
296        tolerance=0.5,
297    )
298    ppr_constraints = STMLimits(
299        # all values in us
300        log_0_arg=25.7,
301        log_1_arg=27.1,
302        log_2_arg=27.3,
303        log_3_arg=30.4,
304        log_str=65.7,
305        tracepoint=0.55,
306        tracepoint_d32=0.25,
307        tolerance=0.5,
308    )
309    flpr_constraints = STMLimits(
310        # all values in us
311        log_0_arg=1.3,
312        log_1_arg=1.6,
313        log_2_arg=1.6,
314        log_3_arg=1.7,
315        log_str=3.0,
316        tracepoint=0.5,
317        tracepoint_d32=0.5,
318        tolerance=0.5,
319    )
320
321    # nrf54h20 prints immediately after it is flashed.
322    # Wait a bit to skipp logs from previous test.
323    sleep(5)
324
325    # Get output from serial port
326    output = "\n".join(dut.readlines())
327
328    # set SB_CONFIG_APP_CPUPPR_RUN, SB_CONFIG_APP_CPUFLPR_RUN
329    analyse_autoconf(autoconf_file)
330
331    # check results on Application core
332    check_benchmark_results(output=output, core='app', constraints=app_constraints)
333
334    # check results on Radio core
335    check_benchmark_results(output=output, core='rad', constraints=rad_constraints)
336
337    if SB_CONFIG_APP_CPUPPR_RUN:
338        # check results on PPR core
339        check_benchmark_results(output=output, core='ppr', constraints=ppr_constraints)
340
341    if SB_CONFIG_APP_CPUFLPR_RUN:
342        # check results on FLPR core
343        check_benchmark_results(output=output, core='flpr', constraints=flpr_constraints)
344
345
346def test_sample_STM_dictionary_mode(dut: DeviceAdapter):
347    """
348    Run sample.boards.nrf.coresight_stm.dict from samples/boards/nordic/coresight_stm sample.
349    Application, Radio, PPR and FLPR cores use STM for logging.
350    STM proxy (Application core) prints on serial port raw logs from all domains.
351    Nrfutil trace is used to decode STM logs.
352    After reset, coprocessors execute code in expected way and Application core
353    outputs STM traces on UART port.
354    """
355    BUILD_DIR = str(dut.device_config.build_dir)
356    test_filename = f"{BUILD_DIR}/coresight_stm_dictionary.txt"
357    autoconf_file = f"{BUILD_DIR}/_sysbuild/autoconf.h"
358    COLLECT_TIMEOUT = 10.0
359
360    # set SB_CONFIG_APP_CPUPPR_RUN, SB_CONFIG_APP_CPUFLPR_RUN
361    # this information is needed to build nrfutil database-config
362    analyse_autoconf(autoconf_file)
363
364    # use nrfutil trace to decode logs
365    nrfutil_dictionary_from_serial(
366        dut=dut,
367        decoded_file_name=f"{test_filename}",
368        collect_time=COLLECT_TIMEOUT,
369    )
370
371    # read decoded logs
372    with open(f"{test_filename}", errors="ignore") as decoded_file:
373        decoded_file_content = decoded_file.read()
374
375    # if nothing in decoded_file, stop test
376    assert(
377        len(decoded_file_content) > 0
378    ), f"File {test_filename} is empty"
379
380    # check that LOGs from Application core are present
381    check_console_output(
382        output=decoded_file_content,
383        core='app',
384    )
385
386    # check that LOGs from Radio core are present
387    check_console_output(
388        output=decoded_file_content,
389        core='rad',
390    )
391
392    if SB_CONFIG_APP_CPUPPR_RUN:
393        # check that LOGs from PPR core are present
394        check_console_output(
395            output=decoded_file_content,
396            core='ppr',
397        )
398
399    if SB_CONFIG_APP_CPUFLPR_RUN:
400        # check that LOGs from FLPR core are present
401        check_console_output(
402            output=decoded_file_content,
403            core='flpr',
404        )
405
406
407def test_STM_dictionary_mode(dut: DeviceAdapter):
408    """
409    Run boards.nrf.coresight_stm.dict from tests/boards/nrf/coresight_stm test suite.
410    Application, Radio, PPR and FLPR cores use STM for logging.
411    STM proxy (Application core) prints on serial port raw logs from all domains.
412    Nrfutil trace is used to decode STM logs.
413    After reset, coprocessors execute code in expected way and Application core
414    outputs STM traces on UART port.
415    """
416    BUILD_DIR = str(dut.device_config.build_dir)
417    test_filename = f"{BUILD_DIR}/coresight_stm_dictionary.txt"
418    autoconf_file = f"{BUILD_DIR}/_sysbuild/autoconf.h"
419    COLLECT_TIMEOUT = 10.0
420
421    app_constraints = STMLimits(
422        # all values in us
423        log_0_arg=0.7,
424        log_1_arg=0.8,
425        log_2_arg=0.8,
426        log_3_arg=1.3,
427        log_str=3.2,
428        tracepoint=0.5,
429        tracepoint_d32=0.5,
430        tolerance=0.5,  # 50 %
431    )
432    rad_constraints = STMLimits(
433        # all values in us
434        log_0_arg=0.8,
435        log_1_arg=0.9,
436        log_2_arg=1.0,
437        log_3_arg=1.5,
438        log_str=3.6,
439        tracepoint=0.5,
440        tracepoint_d32=0.5,
441        tolerance=0.5,
442    )
443    ppr_constraints = STMLimits(
444        # all values in us
445        log_0_arg=7.5,
446        log_1_arg=8.5,
447        log_2_arg=8.6,
448        log_3_arg=17.4,
449        log_str=45.2,
450        tracepoint=0.5,
451        tracepoint_d32=0.5,
452        tolerance=0.5,
453    )
454    flpr_constraints = STMLimits(
455        # all values in us
456        log_0_arg=0.3,
457        log_1_arg=0.4,
458        log_2_arg=0.5,
459        log_3_arg=0.8,
460        log_str=2.6,
461        tracepoint=0.5,
462        tracepoint_d32=0.5,
463        tolerance=0.5,  # 50 %
464    )
465
466    # set SB_CONFIG_APP_CPUPPR_RUN, SB_CONFIG_APP_CPUFLPR_RUN
467    # this information is needed to build nrfutil database-config
468    analyse_autoconf(autoconf_file)
469
470    # use nrfutil trace to decode logs
471    nrfutil_dictionary_from_serial(
472        dut=dut,
473        decoded_file_name=f"{test_filename}",
474        collect_time=COLLECT_TIMEOUT,
475    )
476
477    # read decoded logs
478    with open(f"{test_filename}", errors="ignore") as decoded_file:
479        decoded_file_content = decoded_file.read()
480
481    # if nothing in decoded_file, stop test
482    assert len(decoded_file_content) > 0, f"File {test_filename} is empty"
483
484    # check results on Application core
485    check_benchmark_results(output=decoded_file_content, core='app', constraints=app_constraints)
486
487    # check results on Radio core
488    check_benchmark_results(output=decoded_file_content, core='rad', constraints=rad_constraints)
489
490    if SB_CONFIG_APP_CPUPPR_RUN:
491        # check results on PPR core
492        check_benchmark_results(
493            output=decoded_file_content, core='ppr', constraints=ppr_constraints
494        )
495
496    if SB_CONFIG_APP_CPUFLPR_RUN:
497        # check results on FLPR core
498        check_benchmark_results(
499            output=decoded_file_content, core='flpr', constraints=flpr_constraints
500        )
501