1# Copyright: (c)  2025, Intel Corporation
2# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
3
4import logging
5import re
6
7import serial
8
9
class SerialHandler:
    """
    Small wrapper around a ``serial.Serial`` connection.

    The handler stores the connection parameters, opens the port on demand and
    offers helpers to send a text command and to read the reply back.
    """

    # Single byte that makes receive_cmd() stop reading. It is compared against
    # the ``bytes`` object returned by ``read()``, so it must be ``bytes`` too
    # (a ``bytes`` value never compares equal to an ``int``).
    _STOP_BYTE = b"\xf0"

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        """
        Initializes the class for serial communication.

        The port is not opened here; call :meth:`open` to do so.

        :param port: The serial port name (e.g., 'COM1', '/dev/ttyUSB0').
        :param baudrate: The baud rate for the connection (default is 9600).
        :param timeout: The timeout for read operations in seconds (default is 1.0).
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.serial_connection = None

    def __enter__(self):
        """Opens the connection and returns the handler for use in ``with``."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Closes the connection when leaving the ``with`` block."""
        self.close()

    def open(self):
        """
        Opens the serial connection.

        Does nothing when a connection is already open. A connection that was
        previously closed is replaced by a new one, so the handler can be reused.

        :raises serial.SerialException: If the port cannot be opened.
        """
        if self.is_open():
            return

        try:
            self.serial_connection = serial.Serial(
                self.port, self.baudrate, timeout=self.timeout
            )
        except serial.SerialException as e:
            logging.error("Error opening serial port %s: %s", self.port, e)
            self.serial_connection = None
            raise

        logging.info(
            "Connection to %s at %d baud opened successfully.", self.port, self.baudrate
        )

    def close(self):
        """
        Closes the serial connection.

        The stored connection object is dropped afterwards so that a later
        :meth:`open` creates a new one.
        """
        connection = self.serial_connection
        if connection is None:
            return

        if connection.is_open:
            connection.close()
            logging.info("Serial connection closed.")
        self.serial_connection = None

    def send_cmd(self, cmd: str):
        """
        Sends a command to the serial device, terminated with CR LF.

        Nothing is sent when the connection is not open. A
        ``serial.SerialException`` raised while writing is logged and swallowed.

        :param cmd: The command to be sent (encoded as ASCII).
        """
        if not self.is_open():
            return

        try:
            self.serial_connection.write((cmd + "\r\n").encode('ascii'))
        except serial.SerialException as e:
            logging.error("Error sending command: %s", e)

    def read_bytes(self, count: int):
        """
        Reads up to ``count`` raw bytes from the serial device.

        :param count: The number of bytes to request from the connection.
        :return: The bytes returned by the connection's ``read`` call, or
                 ``None`` when no connection object exists.
        """
        if self.serial_connection:
            return self.serial_connection.read(count)
        return None

    def receive_cmd(self) -> str:
        """
        Reads data from the serial device until no more data is available.

        Bytes are read one at a time until a read returns nothing (timeout) or
        the stop byte ``0xF0`` is received. The collected bytes are decoded as
        ASCII (undecodable bytes are dropped); NUL characters, carriage returns
        and runs of three newlines are removed and the result is stripped.

        :return: The processed received data as a string; an empty string when
                 the connection is not open.
        """
        if not self.is_open():
            return ""

        received = bytearray()
        while True:
            chunk = self.serial_connection.read()
            if not chunk or chunk == self._STOP_BYTE:
                break
            received += chunk

        # Decode once at the end instead of growing a str byte by byte.
        text = received.decode('ascii', errors='ignore')
        return re.sub(r'(\0|\r|\n\n\n)', '', text).strip()

    def is_open(self) -> bool:
        """
        Checks if the connection is open.

        :return: ``True`` when a connection exists and reports itself open,
                 ``False`` otherwise.
        """
        connection = self.serial_connection
        return connection is not None and bool(connection.is_open)
81