# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>

import logging
import re

import serial


class SerialHandler:
    """
    Small wrapper around ``serial.Serial`` for sending text commands to a
    device and reading back its textual response.
    """

    # Single byte that receive_cmd() treats as the end-of-response marker.
    # It must be a bytes object: Serial.read() returns bytes, and comparing
    # bytes with the int 0xF0 is always False.
    _END_OF_RESPONSE = b"\xf0"

    # Text removed from every response: NUL characters, carriage returns and
    # runs of three consecutive newlines.
    _NOISE_PATTERN = re.compile(r'(\0|\r|\n\n\n)')

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        """
        Initializes the class for serial communication.

        No connection is opened here; call :meth:`open` to do so.

        :param port: The serial port name (e.g., 'COM1', '/dev/ttyUSB0').
        :param baudrate: The baud rate for the connection (default is 9600).
        :param timeout: The timeout for read operations in seconds (default is 1.0).
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.serial_connection = None

    def open(self):
        """
        Opens the serial connection.

        Does nothing when the connection is already open. A connection that
        was previously closed with :meth:`close` is replaced by a fresh one,
        so the handler can be reopened.

        :raises serial.SerialException: If the port cannot be opened. The
            error is logged and re-raised; ``serial_connection`` is reset to
            ``None``.
        """
        if self.is_open():
            return

        try:
            self.serial_connection = serial.Serial(
                self.port, self.baudrate, timeout=self.timeout
            )
        except serial.SerialException as e:
            logging.error("Error opening serial port %s: %s", self.port, str(e))
            self.serial_connection = None
            raise

        logging.info(
            "Connection to %s at %d baud opened successfully.", self.port, self.baudrate
        )

    def close(self):
        """Closes the serial connection if it is currently open."""
        if self.is_open():
            self.serial_connection.close()
            logging.info("Serial connection closed.")

    def send_cmd(self, cmd: str):
        """
        Sends a command to the serial device, terminated with CR+LF.

        The command is ASCII-encoded before it is written. If the connection
        is not open, nothing is sent. A ``serial.SerialException`` raised by
        the write is logged and not propagated (best-effort send).

        :param cmd: The command to be sent (without line terminator).
        """
        if not self.is_open():
            return

        try:
            self.serial_connection.write((cmd + "\r\n").encode('ascii'))
        except serial.SerialException as e:
            logging.error("Error sending command: %s", e)

    def read_bytes(self, count: int):
        """
        Reads up to ``count`` raw bytes from the serial device.

        :param count: Maximum number of bytes to read.
        :return: The bytes returned by the underlying ``read`` call, or
            ``None`` when no connection object exists.
        """
        if self.serial_connection:
            return self.serial_connection.read(count)
        return None

    def receive_cmd(self) -> str:
        """
        Reads data from the serial device until no more data is available.

        Bytes are read one at a time until either a read returns nothing
        (read timeout) or the end-of-response byte 0xF0 is received. The
        collected bytes are decoded as ASCII (non-ASCII bytes are dropped),
        NUL characters, carriage returns and triple newlines are removed, and
        surrounding whitespace is stripped.

        :return: The processed received data as a string; an empty string
            when the connection is not open.
        """
        if not self.is_open():
            return ""

        received = bytearray()
        while True:
            chunk = self.serial_connection.read()
            # An empty read means the timeout expired with nothing pending.
            if not chunk or chunk == self._END_OF_RESPONSE:
                break
            received += chunk

        # Decode once at the end instead of concatenating a str per byte.
        text = received.decode('ascii', errors='ignore')
        return self._NOISE_PATTERN.sub('', text).strip()

    def is_open(self) -> bool:
        """
        Checks if the connection is open.

        :return: ``True`` when a connection object exists and reports itself
            open, otherwise ``False`` (always a real ``bool``).
        """
        return bool(self.serial_connection and self.serial_connection.is_open)