# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>

import numpy as np
from scipy import signal


def convert_acq_time(value):
    """
    Convert an acquisition time in seconds to a rounded value plus unit.

    Values below 1e-3 are scaled to microseconds, values below 1 are scaled to
    milliseconds, and everything else is left in seconds. The number is
    formatted with no decimal places.

    :param value: The acquisition time in seconds.
    :return: A tuple ``(text, unit)`` where ``text`` is the formatted number
        (a string) and ``unit`` is ``"us"``, ``"ms"`` or ``"s"``.
    """
    if value < 1e-3:
        return f"{value * 1e6:.0f}", "us"
    if value < 1:
        return f"{value * 1e3:.0f}", "ms"
    return f"{value:.0f}", "s"


def calculate_rms(data):
    """
    Calculate the Root Mean Square (RMS) of a given data array.

    :param data: List or numpy array containing the data.
    :return: RMS value (NaN, with a numpy warning, when ``data`` is empty).
    """
    # float64 avoids integer overflow / dtype surprises when squaring.
    data_array = np.asarray(data, dtype=np.float64)
    return np.sqrt(np.mean(np.square(data_array)))


def bytes_to_twobyte_values(data):
    """
    Combine the first two elements of ``data`` into one 16-bit value.

    ``data[0]`` becomes the high byte and ``data[1]`` the low byte. Each
    element may be a bytes-like object (decoded big-endian) or a plain int,
    so both a list of single-byte reads and a ``bytes``/``bytearray`` buffer
    (whose items are ints) are accepted.

    :param data: Indexable holding at least two bytes-like or int elements.
    :return: ``(data[0] << 8) | data[1]`` as an int.
    """

    def _to_int(item):
        # Indexing bytes/bytearray yields ints, which int.from_bytes rejects.
        if isinstance(item, int):
            return item
        return int.from_bytes(item, 'big')

    return (_to_int(data[0]) << 8) | _to_int(data[1])


def convert_to_amps(value):
    """
    Decode a packed raw reading into a numeric current value.

    The low 12 bits of ``value`` are the mantissa and the bits above them are
    an exponent applied as a negative power of 16:
    ``(value & 0xFFF) * 16 ** -(value >> 12)``.

    NOTE(review): the unit (amps) is taken from the function name only;
    confirm against the device protocol.

    :param value: Packed integer reading.
    :return: The decoded value (int when the exponent is 0, float otherwise).
    """
    mantissa = value & 4095
    exponent = value >> 12
    return mantissa * (16 ** (0 - exponent))


def convert_to_scientific_notation(time: int, unit: str) -> str:
    """
    Format a time value as ``<time><power-of-ten suffix>`` for the given unit.

    The suffix is ``-6`` for ``'us'``, ``-3`` for ``'ms'`` and empty for
    ``'s'`` (e.g. ``500, 'us'`` gives ``"500-6"``).

    NOTE(review): the result contains no ``e``; presumably the consumer
    expects this mantissa/exponent shorthand - confirm against the caller.

    :param time: The time value to convert.
    :param unit: The unit of the time ('us', 'ms', or 's').
    :return: A string combining the time and the exponent suffix.
    :raises ValueError: If ``unit`` is not one of the supported units.
    """
    if unit == 'us':  # microseconds
        return f"{time}-6"
    if unit == 'ms':  # milliseconds
        return f"{time}-3"
    if unit == 's':  # seconds
        return f"{time}"
    raise ValueError("Invalid unit. Use 'us', 'ms', or 's'.")


def current_RMS(data, trim=100, num_peaks=1, peak_height=0.008, peak_distance=40, padding=40):
    """
    Split a signal at its first detected peaks and return the RMS of each chunk.

    The first ``trim`` samples are discarded, peaks are located with
    ``scipy.signal.find_peaks`` and the first ``num_peaks`` of them are used as
    split points. Every chunk is shrunk by ``padding`` samples at both ends
    before its RMS is computed.

    Args:
    - data (list or numpy array): The input data array for RMS calculation.
    - trim (int): The number of initial elements to exclude
        from the data before processing (default is 100).
    - num_peaks (int): The number of peaks to consider for splitting
        the data into chunks (default is 1).
    - peak_height (float): The minimum height of the peaks to consider
        when detecting them (default is 0.008).
    - peak_distance (int): The minimum distance (in samples) between
        consecutive peaks (default is 40).
    - padding (int): The number of samples dropped at both ends of every
        chunk (default is 40).

    Returns:
    - rms_values (list): ``num_peaks + 1`` RMS values, one per chunk, in
        order. A chunk left with no samples after padding yields NaN.

    Raises:
    - ValueError: If fewer than ``num_peaks`` peaks are detected.
    """
    # Drop the leading samples and normalise everything to float.
    samples = [float(x) for x in data[trim:]]

    peaks = signal.find_peaks(samples, distance=peak_distance, height=peak_height)[0]

    if len(peaks) < num_peaks:
        raise ValueError(
            f"Not enough peaks detected. Expected at least {num_peaks}, but found {len(peaks)}."
        )

    # Chunk boundaries: start of data, the selected peaks, end of data.
    boundaries = [0, *(int(p) for p in peaks[:num_peaks]), len(samples)]

    rms_values = []
    for left, right in zip(boundaries, boundaries[1:]):
        # Clamp both bounds: a negative index would make the slice wrap
        # around and silently measure samples from the wrong region.
        start = max(left + padding, 0)
        stop = max(right - padding, start)
        chunk = samples[start:stop]
        # An empty chunk has no RMS; report NaN instead of averaging nothing.
        rms_values.append(calculate_rms(chunk) if chunk else np.nan)

    return rms_values