1# Copyright (c) 2025 NXP 2# 3# SPDX-License-Identifier: Apache-2.0 4 5import hashlib 6import os 7import pickle 8from datetime import datetime 9 10import cv2 11import numpy as np 12 13from camera_shield.uvc_core.plugin_base import DetectionPlugin 14 15 16class VideoSignaturePlugin(DetectionPlugin): 17 """Plugin for generating and comparing video frame signatures and fingerprints""" 18 19 def __init__(self, name, config): 20 super().__init__(name, config) 21 self.fingerprint_cache = [] # Store video fingerprints 22 self.fingerprint = { 23 "method": config.get("method", "combined"), 24 "frame_signatures": [], 25 "metadata": config.get("metadata", {}), 26 } 27 self.frame_count = 0 28 self.operations = config.get("operations", "compare") 29 self.saved = False 30 self.result = [] 31 32 def initialize(self): 33 """Initialize detection resources""" 34 print("initialize") 35 if self.operations == "compare": 36 print("compare") 37 directory = self.config.get("directory", "./fingerprints") 38 if os.path.isdir(directory): 39 self.load_all_fingerprints(directory) 40 else: 41 print(f"{directory} not exist") 42 43 def generate_frame_signature(self, frame, method="combined") -> dict: 44 """Generate a robust signature for a given frame using multiple techniques 45 46 Args: 47 frame: The input frame 48 method: Signature method ('phash', 'dhash', 'histogram', 'combined') 49 50 Returns: 51 A dictionary containing signature data 52 """ 53 if frame is None: 54 return None 55 56 # Convert to grayscale for consistent processing 57 if len(frame.shape) == 3: 58 gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) 59 else: 60 gray = frame.copy() 61 62 # Resize to standard size for consistent signatures 63 resized = cv2.resize(gray, (64, 64)) 64 65 signature = {} 66 67 # Perceptual Hash (pHash) 68 if method in ["phash", "combined"]: 69 # DCT transform 70 dct = cv2.dct(np.float32(resized)) 71 # Keep only the top-left 8x8 coefficients 72 dct_low = dct[:8, :8] 73 # Compute median value 74 med = np.median(dct_low) 75 # Convert to binary hash 76 phash = (dct_low > med).flatten().astype(int) 77 signature["phash"] = phash.tolist() 78 # Generate a compact hex representation 79 phash_hex = "".join( 80 [ 81 hex(int("".join(map(str, phash[i : i + 4])), 2))[2:] 82 for i in range(0, len(phash), 4) 83 ] 84 ) 85 signature["phash_hex"] = phash_hex 86 87 # Difference Hash (dHash) 88 if method in ["dhash", "combined"]: 89 # Resize to 9x8 (one pixel larger horizontally) 90 resized_dhash = cv2.resize(gray, (9, 8)) 91 # Compute differences horizontally 92 diff = resized_dhash[:, 1:] > resized_dhash[:, :-1] 93 # Flatten to 1D array 94 dhash = diff.flatten().astype(int) 95 signature["dhash"] = dhash.tolist() 96 # Generate a compact hex representation 97 dhash_hex = "".join( 98 [ 99 hex(int("".join(map(str, dhash[i : i + 4])), 2))[2:] 100 for i in range(0, len(dhash), 4) 101 ] 102 ) 103 signature["dhash_hex"] = dhash_hex 104 105 # Color histogram features 106 if method in ["histogram", "combined"]: 107 if len(frame.shape) == 3: # Color image 108 hist_features = [] 109 for i in range(3): # For each color channel 110 hist = cv2.calcHist([frame], [i], None, [16], [0, 256]) 111 hist = cv2.normalize(hist, hist).flatten() 112 hist_features.extend(hist) 113 signature["color_hist"] = [float(x) for x in hist_features] 114 else: # Grayscale 115 hist = cv2.calcHist([gray], [0], None, [32], [0, 256]) 116 hist = cv2.normalize(hist, hist).flatten() 117 signature["gray_hist"] = [float(x) for x in hist] 118 119 # Edge features 120 if method in ["combined"]: 121 # Canny edge detection 122 edges = cv2.Canny(resized, 100, 200) 123 # Count percentage of edge pixels 124 edge_ratio = np.count_nonzero(edges) / edges.size 125 signature["edge_ratio"] = float(edge_ratio) 126 127 # Compute gradient magnitude histogram 128 sobelx = cv2.Sobel(resized, cv2.CV_64F, 1, 0, ksize=3) 129 sobely = cv2.Sobel(resized, cv2.CV_64F, 0, 1, ksize=3) 130 magnitude = np.sqrt(sobelx**2 + sobely**2) 131 mag_hist = np.histogram(magnitude, bins=8, range=(0, 255))[0] 132 mag_hist = mag_hist / np.sum(mag_hist) # Normalize 133 signature["gradient_hist"] = [float(x) for x in mag_hist] 134 135 # Generate a unique hash for the entire signature 136 signature_str = str(signature) 137 signature["hash"] = hashlib.sha256(signature_str.encode()).hexdigest() 138 139 return signature 140 141 def compare_signatures(self, sig1, sig2, method="combined") -> float: 142 """Compare two frame signatures and return similarity score (0-1) 143 144 Args: 145 sig1: First signature dictionary 146 sig2: Second signature dictionary 147 method: Comparison method matching the signature generation method 148 149 Returns: 150 Similarity score between 0 (different) and 1 (identical) 151 """ 152 if sig1 is None or sig2 is None: 153 return 0.0 154 155 scores = [] 156 157 # Compare perceptual hashes (Hamming distance) 158 if method in ["phash", "combined"] and "phash" in sig1 and "phash" in sig2: 159 phash1 = np.array(sig1["phash"]) 160 phash2 = np.array(sig2["phash"]) 161 hamming_dist = np.sum(phash1 != phash2) 162 max_dist = len(phash1) 163 phash_score = 1.0 - (hamming_dist / max_dist) 164 scores.append(phash_score) 165 166 # Compare difference hashes 167 if method in ["dhash", "combined"] and "dhash" in sig1 and "dhash" in sig2: 168 dhash1 = np.array(sig1["dhash"]) 169 dhash2 = np.array(sig2["dhash"]) 170 hamming_dist = np.sum(dhash1 != dhash2) 171 max_dist = len(dhash1) 172 dhash_score = 1.0 - (hamming_dist / max_dist) 173 scores.append(dhash_score) 174 175 # Compare color histograms 176 if method in ["histogram", "combined"]: 177 if "color_hist" in sig1 and "color_hist" in sig2: 178 hist1 = np.array(sig1["color_hist"]) 179 hist2 = np.array(sig2["color_hist"]) 180 # Bhattacharyya distance for histograms 181 hist_score = cv2.compareHist( 182 hist1.astype(np.float32), 183 hist2.astype(np.float32), 184 cv2.HISTCMP_BHATTACHARYYA, 185 ) 186 # Convert to similarity score (0-1) 187 hist_score = 1.0 - min(hist_score, 1.0) 188 scores.append(hist_score) 189 elif "gray_hist" in sig1 and "gray_hist" in sig2: 190 hist1 = np.array(sig1["gray_hist"]) 191 hist2 = np.array(sig2["gray_hist"]) 192 hist_score = cv2.compareHist( 193 hist1.astype(np.float32), 194 hist2.astype(np.float32), 195 cv2.HISTCMP_BHATTACHARYYA, 196 ) 197 hist_score = 1.0 - min(hist_score, 1.0) 198 scores.append(hist_score) 199 200 # Compare edge features 201 if method in ["combined"] and "edge_ratio" in sig1 and "edge_ratio" in sig2: 202 edge_diff = abs(sig1["edge_ratio"] - sig2["edge_ratio"]) 203 edge_score = 1.0 - min(edge_diff, 1.0) 204 scores.append(edge_score) 205 206 # Compare gradient histograms 207 if method in ["combined"] and "gradient_hist" in sig1 and "gradient_hist" in sig2: 208 grad1 = np.array(sig1["gradient_hist"]) 209 grad2 = np.array(sig2["gradient_hist"]) 210 grad_score = cv2.compareHist( 211 grad1.astype(np.float32), 212 grad2.astype(np.float32), 213 cv2.HISTCMP_BHATTACHARYYA, 214 ) 215 grad_score = 1.0 - min(grad_score, 1.0) 216 scores.append(grad_score) 217 218 # If no scores were calculated, return 0 219 if not scores: 220 return 0.0 221 222 # Weight the scores based on reliability (can be adjusted) 223 weights = { 224 "phash": self.config.get("phash_weight", 0.35), 225 "dhash": self.config.get("dhash_weight", 0.25), 226 "histogram": self.config.get("histogram_weight", 0.2), 227 "edge_ratio": self.config.get("edge_ratio_weight", 0.1), 228 "gradient_hist": self.config.get("gradient_hist_weight", 0.1), 229 } 230 231 # For combined method, use weighted average 232 if method == "combined": 233 final_score = 0.0 234 total_weight = 0.0 235 236 if "phash" in sig1 and "phash" in sig2: 237 final_score += scores[0] * weights["phash"] 238 total_weight += weights["phash"] 239 240 if "dhash" in sig1 and "dhash" in sig2: 241 final_score += scores[1] * weights["dhash"] 242 total_weight += weights["dhash"] 243 244 if ("color_hist" in sig1 and "color_hist" in sig2) or ( 245 "gray_hist" in sig1 and "gray_hist" in sig2 246 ): 247 final_score += scores[2] * weights["histogram"] 248 total_weight += weights["histogram"] 249 250 if "edge_ratio" in sig1 and "edge_ratio" in sig2: 251 final_score += scores[3] * weights["edge_ratio"] 252 total_weight += weights["edge_ratio"] 253 254 if "gradient_hist" in sig1 and "gradient_hist" in sig2: 255 final_score += scores[4] * weights["gradient_hist"] 256 total_weight += weights["gradient_hist"] 257 258 if total_weight > 0: 259 return final_score / total_weight 260 else: 261 return 0.0 262 else: 263 # For single methods, return the calculated score 264 return scores[0] 265 266 def generate_video_fingerprint(self, frame, method="combined") -> dict: 267 """Generate a fingerprint from a list of video frames 268 269 Args: 270 frames: List of video frames 271 device_id: Camera device ID 272 method: Signature method to use 273 274 Returns: 275 Dictionary containing video fingerprint data 276 """ 277 278 if self.frame_count < self.config.get("duration", 100): 279 # Generate signature for this frame 280 signature = self.generate_frame_signature(frame, method) 281 if signature: 282 self.fingerprint["frame_signatures"].append(signature) 283 284 self.frame_count += 1 285 286 return self.frame_count 287 288 def save_fingerprint(self, directory="fingerprints"): 289 """Save a video fingerprint to disk 290 291 Args: 292 fingerprint: The fingerprint dictionary to save 293 directory: Directory to save fingerprints in 294 295 Returns: 296 Path to the saved fingerprint file 297 """ 298 299 if not self.fingerprint["frame_signatures"]: 300 return "" 301 302 if self.frame_count < self.config.get("duration", 100): 303 return "" 304 305 if self.saved: 306 return "" 307 308 # Create directory if it doesn't exist 309 os.makedirs(directory, exist_ok=True) 310 311 # Create filename based on fingerprint ID and timestamp 312 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 313 filename = f"fingerprint_{timestamp}.pkl" 314 filepath = os.path.join(directory, filename) 315 316 # Save fingerprint using pickle 317 with open(filepath, "wb") as f: 318 pickle.dump(self.fingerprint, f) 319 320 self.saved = True 321 322 return filepath 323 324 def load_fingerprint(self, filepath): 325 """Load a fingerprint from disk 326 327 Args: 328 filepath: Path to the fingerprint file 329 330 Returns: 331 The loaded fingerprint dictionary or None if loading fails 332 """ 333 try: 334 with open(filepath, "rb") as f: 335 fingerprint = pickle.load(f) 336 337 return fingerprint 338 except Exception as e: 339 print(f"Error loading fingerprint from {filepath}: {str(e)}") 340 return None 341 342 def compare_video_fingerprints(self, fp1, fp2) -> dict: 343 """Compare two video fingerprints and return similarity metrics 344 345 Args: 346 fp1: First fingerprint dictionary 347 fp2: Second fingerprint dictionary 348 349 Returns: 350 Dictionary with similarity metrics 351 """ 352 if fp1 is None or fp2 is None: 353 return {"overall_similarity": 0.0, "error": "Invalid fingerprints"} 354 355 # Check if fingerprints use the same method 356 if fp1.get("method") != fp2.get("method"): 357 print( 358 f"Warning: Comparing fingerprints with different methods:\ 359 {fp1.get('method')} vs {fp2.get('method')}" 360 ) 361 362 method = fp1.get("method", "combined") 363 364 # Get frame signatures 365 sigs1 = fp1.get("frame_signatures", []) 366 sigs2 = fp2.get("frame_signatures", []) 367 368 if not sigs1 or not sigs2: 369 return {"overall_similarity": 0.0, "error": "Empty frame signatures"} 370 371 # Calculate frame-by-frame similarities 372 frame_similarities = [] 373 374 # Use dynamic time warping approach for different length fingerprints 375 if len(sigs1) != len(sigs2): 376 # Create similarity matrix 377 sim_matrix = np.zeros((len(sigs1), len(sigs2))) 378 379 for i, sig1 in enumerate(sigs1): 380 for j, sig2 in enumerate(sigs2): 381 sim_matrix[i, j] = self.compare_signatures(sig1, sig2, method) 382 383 # Find optimal path through similarity matrix (simplified DTW) 384 path_similarities = [] 385 386 # For each frame in the shorter fingerprint, find best match in longer one 387 if len(sigs1) <= len(sigs2): 388 for i in range(len(sigs1)): 389 best_match = np.max(sim_matrix[i, :]) 390 path_similarities.append(best_match) 391 else: 392 for j in range(len(sigs2)): 393 best_match = np.max(sim_matrix[:, j]) 394 path_similarities.append(best_match) 395 396 frame_similarities = path_similarities 397 else: 398 # Direct frame-by-frame comparison for same length fingerprints 399 for sig1, sig2 in zip(sigs1, sigs2, strict=False): 400 similarity = self.compare_signatures(sig1, sig2, method) 401 frame_similarities.append(similarity) 402 403 # Calculate overall metrics 404 overall_similarity = np.mean(frame_similarities) if frame_similarities else 0.0 405 min_similarity = np.min(frame_similarities) if frame_similarities else 0.0 406 max_similarity = np.max(frame_similarities) if frame_similarities else 0.0 407 408 # Calculate temporal consistency (how consistent the similarity is across frames) 409 temporal_consistency = 1.0 - np.std(frame_similarities) if frame_similarities else 0.0 410 411 return { 412 "overall_similarity": float(overall_similarity), 413 "min_similarity": float(min_similarity), 414 "max_similarity": float(max_similarity), 415 "temporal_consistency": float(temporal_consistency), 416 "frame_similarities": [float(s) for s in frame_similarities], 417 "name": fp2["metadata"]["name"], 418 "metadata": fp2["metadata"], 419 } 420 421 def identify_video(self, threshold=0.85) -> list[dict]: 422 """Identify a video sample against stored fingerprints 423 424 Args: 425 sample_fingerprint: Fingerprint of the video to identify 426 threshold: Minimum similarity threshold for a match 427 428 Returns: 429 List of matching fingerprints with similarity scores 430 """ 431 matches = [] 432 433 # Compare against all cached fingerprints 434 for fingerprint in self.fingerprint_cache: 435 comparison = self.compare_video_fingerprints(self.fingerprint, fingerprint) 436 437 if comparison["overall_similarity"] >= threshold: 438 matches.append( 439 { 440 "similarity": comparison["overall_similarity"], 441 "details": comparison, 442 } 443 ) 444 445 # Sort matches by similarity (highest first) 446 matches.sort(key=lambda x: x["similarity"], reverse=True) 447 448 return matches 449 450 def load_all_fingerprints(self, directory="fingerprints"): 451 """Load all fingerprints from a directory into cache 452 453 Args: 454 directory: Directory containing fingerprint files 455 456 Returns: 457 Number of fingerprints loaded 458 """ 459 if not os.path.exists(directory): 460 return 0 461 462 count = 0 463 for filename in os.listdir(directory): 464 if filename.endswith(".pkl"): 465 filepath = os.path.join(directory, filename) 466 fingerprint = self.load_fingerprint(filepath) 467 if fingerprint: 468 self.fingerprint_cache.append(fingerprint) 469 count += 1 470 471 return count 472 473 def process_frame(self, frame): 474 method = self.config.get("method", "combined") 475 count = self.generate_video_fingerprint(frame, method) 476 return { 477 "frame_count": count, 478 "frame": frame, 479 } 480 481 def handle_results(self, result, frame): 482 matched = [] 483 operations = self.config.get("operations", "compare") 484 if result["frame_count"] == self.config.get("duration", 100): 485 if operations == "compare": 486 matched = self.identify_video(self.config.get("threshold", 0.85)) 487 elif operations == "generate": 488 self.save_fingerprint(self.config.get("directory", "./fingerprint")) 489 else: 490 print("not supported operation") 491 492 if matched: 493 if matched[0]["details"]["name"] not in self.result: 494 self.result.append(matched[0]["details"]["name"]) 495 cv2.putText( 496 frame, 497 f"match with {matched[0]['details']['name']} :{matched[0]['similarity']:.2f}", 498 (150, frame.shape[0] - 90), 499 cv2.FONT_HERSHEY_SIMPLEX, 500 0.7, 501 (0, 0, 255), 502 2, 503 ) 504 else: 505 cv2.putText( 506 frame, 507 "signature done", 508 (150, frame.shape[0] - 90), 509 cv2.FONT_HERSHEY_SIMPLEX, 510 0.7, 511 (0, 0, 255), 512 2, 513 ) 514 else: 515 cv2.putText( 516 frame, 517 "signature in progress", 518 (150, frame.shape[0] - 90), 519 cv2.FONT_HERSHEY_SIMPLEX, 520 0.7, 521 (0, 0, 255), 522 2, 523 ) 524 525 def shutdown(self) -> list: 526 """Release plugin resources""" 527 if self.config.get("operations", "compare") == "compare": 528 if self.result: 529 print(f"{self.__class__.__name__} result: {self.result}\n") 530 else: 531 print(f"{self.__class__.__name__} result: no match\n") 532 533 return self.result 534