1#!/usr/bin/env python3 2# 3# Copyright (c) 2024 Nordic Semiconductor ASA 4# 5# SPDX-License-Identifier: Apache-2.0 6 7""" 8Log Parser for Dictionary-based Logging 9 10This uses the JSON database file to decode the binary 11log data taken directly from input serialport and print 12the log messages. 13""" 14 15import argparse 16import contextlib 17import logging 18import os 19import select 20import sys 21 22import parserlib 23import serial 24 25LOGGER_FORMAT = "%(message)s" 26logger = logging.getLogger("parser") 27 28 29class SerialReader: 30 """Class to read data from serial port and parse it""" 31 32 def __init__(self, serial_port, baudrate): 33 self.serial_port = serial_port 34 self.baudrate = baudrate 35 self.serial = None 36 37 @contextlib.contextmanager 38 def open(self): 39 try: 40 self.serial = serial.Serial(self.serial_port, self.baudrate) 41 yield 42 finally: 43 self.serial.close() 44 45 def fileno(self): 46 return self.serial.fileno() 47 48 def read_non_blocking(self): 49 size = self.serial.in_waiting 50 return self.serial.read(size) 51 52 53class FileReader: 54 """Class to read data from serial port and parse it""" 55 56 def __init__(self, filepath): 57 self.filepath = filepath 58 self.file = None 59 60 @contextlib.contextmanager 61 def open(self): 62 if self.filepath is not None: 63 with open(self.filepath, 'rb') as f: 64 self.file = f 65 yield 66 else: 67 sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0) 68 self.file = sys.stdin 69 yield 70 71 def fileno(self): 72 return self.file.fileno() 73 74 def read_non_blocking(self): 75 # Read available data using a reasonable buffer size (without buffer size, this blocks 76 # forever, but with buffer size it returns even when less data than the buffer read was 77 # available). 78 return self.file.read(1024) 79 80 81def parse_args(): 82 """Parse command line arguments""" 83 parser = argparse.ArgumentParser(allow_abbrev=False) 84 85 parser.add_argument("dbfile", help="Dictionary Logging Database file") 86 parser.add_argument("--debug", action="store_true", help="Print extra debugging information") 87 88 # Create subparsers for different input modes 89 subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode") 90 91 # Serial subparser 92 serial_parser = subparsers.add_parser("serial", help="Read from serial port") 93 serial_parser.add_argument("port", help="Serial port") 94 serial_parser.add_argument("baudrate", type=int, help="Baudrate") 95 96 # File subparser 97 file_parser = subparsers.add_parser("file", help="Read from file") 98 file_parser.add_argument( 99 "filepath", nargs="?", default=None, help="Input file path, leave empty for stdin" 100 ) 101 102 return parser.parse_args() 103 104 105def main(): 106 """function of serial parser""" 107 args = parse_args() 108 109 if args.dbfile is None or '.json' not in args.dbfile: 110 logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile) 111 sys.exit(1) 112 113 logging.basicConfig(format=LOGGER_FORMAT) 114 115 if args.debug: 116 logger.setLevel(logging.DEBUG) 117 else: 118 logger.setLevel(logging.INFO) 119 120 log_parser = parserlib.get_log_parser(args.dbfile, logger) 121 122 data = b'' 123 124 if args.mode == "serial": 125 reader = SerialReader(args.port, args.baudrate) 126 elif args.mode == "file": 127 reader = FileReader(args.filepath) 128 else: 129 raise ValueError("Invalid mode selected. Use 'serial' or 'file'.") 130 131 with reader.open(): 132 while True: 133 ready, _, _ = select.select([reader], [], []) 134 if ready: 135 data += reader.read_non_blocking() 136 parsed_data_offset = parserlib.parser(data, log_parser, logger) 137 data = data[parsed_data_offset:] 138 139 140if __name__ == "__main__": 141 main() 142