1# Copyright (c) 2025 NXP
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import hashlib
6import os
7import pickle
8from datetime import datetime
9
10import cv2
11import numpy as np
12
13from camera_shield.uvc_core.plugin_base import DetectionPlugin
14
15
16class VideoSignaturePlugin(DetectionPlugin):
17    """Plugin for generating and comparing video frame signatures and fingerprints"""
18
19    def __init__(self, name, config):
20        super().__init__(name, config)
21        self.fingerprint_cache = []  # Store video fingerprints
22        self.fingerprint = {
23            "method": config.get("method", "combined"),
24            "frame_signatures": [],
25            "metadata": config.get("metadata", {}),
26        }
27        self.frame_count = 0
28        self.operations = config.get("operations", "compare")
29        self.saved = False
30        self.result = []
31
32    def initialize(self):
33        """Initialize detection resources"""
34        print("initialize")
35        if self.operations == "compare":
36            print("compare")
37            directory = self.config.get("directory", "./fingerprints")
38            if os.path.isdir(directory):
39                self.load_all_fingerprints(directory)
40            else:
41                print(f"{directory} not exist")
42
43    def generate_frame_signature(self, frame, method="combined") -> dict:
44        """Generate a robust signature for a given frame using multiple techniques
45
46        Args:
47            frame: The input frame
48            method: Signature method ('phash', 'dhash', 'histogram', 'combined')
49
50        Returns:
51            A dictionary containing signature data
52        """
53        if frame is None:
54            return None
55
56        # Convert to grayscale for consistent processing
57        if len(frame.shape) == 3:
58            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
59        else:
60            gray = frame.copy()
61
62        # Resize to standard size for consistent signatures
63        resized = cv2.resize(gray, (64, 64))
64
65        signature = {}
66
67        # Perceptual Hash (pHash)
68        if method in ["phash", "combined"]:
69            # DCT transform
70            dct = cv2.dct(np.float32(resized))
71            # Keep only the top-left 8x8 coefficients
72            dct_low = dct[:8, :8]
73            # Compute median value
74            med = np.median(dct_low)
75            # Convert to binary hash
76            phash = (dct_low > med).flatten().astype(int)
77            signature["phash"] = phash.tolist()
78            # Generate a compact hex representation
79            phash_hex = "".join(
80                [
81                    hex(int("".join(map(str, phash[i : i + 4])), 2))[2:]
82                    for i in range(0, len(phash), 4)
83                ]
84            )
85            signature["phash_hex"] = phash_hex
86
87        # Difference Hash (dHash)
88        if method in ["dhash", "combined"]:
89            # Resize to 9x8 (one pixel larger horizontally)
90            resized_dhash = cv2.resize(gray, (9, 8))
91            # Compute differences horizontally
92            diff = resized_dhash[:, 1:] > resized_dhash[:, :-1]
93            # Flatten to 1D array
94            dhash = diff.flatten().astype(int)
95            signature["dhash"] = dhash.tolist()
96            # Generate a compact hex representation
97            dhash_hex = "".join(
98                [
99                    hex(int("".join(map(str, dhash[i : i + 4])), 2))[2:]
100                    for i in range(0, len(dhash), 4)
101                ]
102            )
103            signature["dhash_hex"] = dhash_hex
104
105        # Color histogram features
106        if method in ["histogram", "combined"]:
107            if len(frame.shape) == 3:  # Color image
108                hist_features = []
109                for i in range(3):  # For each color channel
110                    hist = cv2.calcHist([frame], [i], None, [16], [0, 256])
111                    hist = cv2.normalize(hist, hist).flatten()
112                    hist_features.extend(hist)
113                signature["color_hist"] = [float(x) for x in hist_features]
114            else:  # Grayscale
115                hist = cv2.calcHist([gray], [0], None, [32], [0, 256])
116                hist = cv2.normalize(hist, hist).flatten()
117                signature["gray_hist"] = [float(x) for x in hist]
118
119        # Edge features
120        if method in ["combined"]:
121            # Canny edge detection
122            edges = cv2.Canny(resized, 100, 200)
123            # Count percentage of edge pixels
124            edge_ratio = np.count_nonzero(edges) / edges.size
125            signature["edge_ratio"] = float(edge_ratio)
126
127            # Compute gradient magnitude histogram
128            sobelx = cv2.Sobel(resized, cv2.CV_64F, 1, 0, ksize=3)
129            sobely = cv2.Sobel(resized, cv2.CV_64F, 0, 1, ksize=3)
130            magnitude = np.sqrt(sobelx**2 + sobely**2)
131            mag_hist = np.histogram(magnitude, bins=8, range=(0, 255))[0]
132            mag_hist = mag_hist / np.sum(mag_hist)  # Normalize
133            signature["gradient_hist"] = [float(x) for x in mag_hist]
134
135        # Generate a unique hash for the entire signature
136        signature_str = str(signature)
137        signature["hash"] = hashlib.sha256(signature_str.encode()).hexdigest()
138
139        return signature
140
141    def compare_signatures(self, sig1, sig2, method="combined") -> float:
142        """Compare two frame signatures and return similarity score (0-1)
143
144        Args:
145            sig1: First signature dictionary
146            sig2: Second signature dictionary
147            method: Comparison method matching the signature generation method
148
149        Returns:
150            Similarity score between 0 (different) and 1 (identical)
151        """
152        if sig1 is None or sig2 is None:
153            return 0.0
154
155        scores = []
156
157        # Compare perceptual hashes (Hamming distance)
158        if method in ["phash", "combined"] and "phash" in sig1 and "phash" in sig2:
159            phash1 = np.array(sig1["phash"])
160            phash2 = np.array(sig2["phash"])
161            hamming_dist = np.sum(phash1 != phash2)
162            max_dist = len(phash1)
163            phash_score = 1.0 - (hamming_dist / max_dist)
164            scores.append(phash_score)
165
166        # Compare difference hashes
167        if method in ["dhash", "combined"] and "dhash" in sig1 and "dhash" in sig2:
168            dhash1 = np.array(sig1["dhash"])
169            dhash2 = np.array(sig2["dhash"])
170            hamming_dist = np.sum(dhash1 != dhash2)
171            max_dist = len(dhash1)
172            dhash_score = 1.0 - (hamming_dist / max_dist)
173            scores.append(dhash_score)
174
175        # Compare color histograms
176        if method in ["histogram", "combined"]:
177            if "color_hist" in sig1 and "color_hist" in sig2:
178                hist1 = np.array(sig1["color_hist"])
179                hist2 = np.array(sig2["color_hist"])
180                # Bhattacharyya distance for histograms
181                hist_score = cv2.compareHist(
182                    hist1.astype(np.float32),
183                    hist2.astype(np.float32),
184                    cv2.HISTCMP_BHATTACHARYYA,
185                )
186                # Convert to similarity score (0-1)
187                hist_score = 1.0 - min(hist_score, 1.0)
188                scores.append(hist_score)
189            elif "gray_hist" in sig1 and "gray_hist" in sig2:
190                hist1 = np.array(sig1["gray_hist"])
191                hist2 = np.array(sig2["gray_hist"])
192                hist_score = cv2.compareHist(
193                    hist1.astype(np.float32),
194                    hist2.astype(np.float32),
195                    cv2.HISTCMP_BHATTACHARYYA,
196                )
197                hist_score = 1.0 - min(hist_score, 1.0)
198                scores.append(hist_score)
199
200        # Compare edge features
201        if method in ["combined"] and "edge_ratio" in sig1 and "edge_ratio" in sig2:
202            edge_diff = abs(sig1["edge_ratio"] - sig2["edge_ratio"])
203            edge_score = 1.0 - min(edge_diff, 1.0)
204            scores.append(edge_score)
205
206        # Compare gradient histograms
207        if method in ["combined"] and "gradient_hist" in sig1 and "gradient_hist" in sig2:
208            grad1 = np.array(sig1["gradient_hist"])
209            grad2 = np.array(sig2["gradient_hist"])
210            grad_score = cv2.compareHist(
211                grad1.astype(np.float32),
212                grad2.astype(np.float32),
213                cv2.HISTCMP_BHATTACHARYYA,
214            )
215            grad_score = 1.0 - min(grad_score, 1.0)
216            scores.append(grad_score)
217
218        # If no scores were calculated, return 0
219        if not scores:
220            return 0.0
221
222        # Weight the scores based on reliability (can be adjusted)
223        weights = {
224            "phash": self.config.get("phash_weight", 0.35),
225            "dhash": self.config.get("dhash_weight", 0.25),
226            "histogram": self.config.get("histogram_weight", 0.2),
227            "edge_ratio": self.config.get("edge_ratio_weight", 0.1),
228            "gradient_hist": self.config.get("gradient_hist_weight", 0.1),
229        }
230
231        # For combined method, use weighted average
232        if method == "combined":
233            final_score = 0.0
234            total_weight = 0.0
235
236            if "phash" in sig1 and "phash" in sig2:
237                final_score += scores[0] * weights["phash"]
238                total_weight += weights["phash"]
239
240            if "dhash" in sig1 and "dhash" in sig2:
241                final_score += scores[1] * weights["dhash"]
242                total_weight += weights["dhash"]
243
244            if ("color_hist" in sig1 and "color_hist" in sig2) or (
245                "gray_hist" in sig1 and "gray_hist" in sig2
246            ):
247                final_score += scores[2] * weights["histogram"]
248                total_weight += weights["histogram"]
249
250            if "edge_ratio" in sig1 and "edge_ratio" in sig2:
251                final_score += scores[3] * weights["edge_ratio"]
252                total_weight += weights["edge_ratio"]
253
254            if "gradient_hist" in sig1 and "gradient_hist" in sig2:
255                final_score += scores[4] * weights["gradient_hist"]
256                total_weight += weights["gradient_hist"]
257
258            if total_weight > 0:
259                return final_score / total_weight
260            else:
261                return 0.0
262        else:
263            # For single methods, return the calculated score
264            return scores[0]
265
266    def generate_video_fingerprint(self, frame, method="combined") -> dict:
267        """Generate a fingerprint from a list of video frames
268
269        Args:
270            frames: List of video frames
271            device_id: Camera device ID
272            method: Signature method to use
273
274        Returns:
275            Dictionary containing video fingerprint data
276        """
277
278        if self.frame_count < self.config.get("duration", 100):
279            # Generate signature for this frame
280            signature = self.generate_frame_signature(frame, method)
281            if signature:
282                self.fingerprint["frame_signatures"].append(signature)
283
284            self.frame_count += 1
285
286        return self.frame_count
287
288    def save_fingerprint(self, directory="fingerprints"):
289        """Save a video fingerprint to disk
290
291        Args:
292            fingerprint: The fingerprint dictionary to save
293            directory: Directory to save fingerprints in
294
295        Returns:
296            Path to the saved fingerprint file
297        """
298
299        if not self.fingerprint["frame_signatures"]:
300            return ""
301
302        if self.frame_count < self.config.get("duration", 100):
303            return ""
304
305        if self.saved:
306            return ""
307
308        # Create directory if it doesn't exist
309        os.makedirs(directory, exist_ok=True)
310
311        # Create filename based on fingerprint ID and timestamp
312        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
313        filename = f"fingerprint_{timestamp}.pkl"
314        filepath = os.path.join(directory, filename)
315
316        # Save fingerprint using pickle
317        with open(filepath, "wb") as f:
318            pickle.dump(self.fingerprint, f)
319
320        self.saved = True
321
322        return filepath
323
324    def load_fingerprint(self, filepath):
325        """Load a fingerprint from disk
326
327        Args:
328            filepath: Path to the fingerprint file
329
330        Returns:
331            The loaded fingerprint dictionary or None if loading fails
332        """
333        try:
334            with open(filepath, "rb") as f:
335                fingerprint = pickle.load(f)
336
337            return fingerprint
338        except Exception as e:
339            print(f"Error loading fingerprint from {filepath}: {str(e)}")
340            return None
341
342    def compare_video_fingerprints(self, fp1, fp2) -> dict:
343        """Compare two video fingerprints and return similarity metrics
344
345        Args:
346            fp1: First fingerprint dictionary
347            fp2: Second fingerprint dictionary
348
349        Returns:
350            Dictionary with similarity metrics
351        """
352        if fp1 is None or fp2 is None:
353            return {"overall_similarity": 0.0, "error": "Invalid fingerprints"}
354
355        # Check if fingerprints use the same method
356        if fp1.get("method") != fp2.get("method"):
357            print(
358                f"Warning: Comparing fingerprints with different methods:\
359                    {fp1.get('method')} vs {fp2.get('method')}"
360            )
361
362        method = fp1.get("method", "combined")
363
364        # Get frame signatures
365        sigs1 = fp1.get("frame_signatures", [])
366        sigs2 = fp2.get("frame_signatures", [])
367
368        if not sigs1 or not sigs2:
369            return {"overall_similarity": 0.0, "error": "Empty frame signatures"}
370
371        # Calculate frame-by-frame similarities
372        frame_similarities = []
373
374        # Use dynamic time warping approach for different length fingerprints
375        if len(sigs1) != len(sigs2):
376            # Create similarity matrix
377            sim_matrix = np.zeros((len(sigs1), len(sigs2)))
378
379            for i, sig1 in enumerate(sigs1):
380                for j, sig2 in enumerate(sigs2):
381                    sim_matrix[i, j] = self.compare_signatures(sig1, sig2, method)
382
383            # Find optimal path through similarity matrix (simplified DTW)
384            path_similarities = []
385
386            # For each frame in the shorter fingerprint, find best match in longer one
387            if len(sigs1) <= len(sigs2):
388                for i in range(len(sigs1)):
389                    best_match = np.max(sim_matrix[i, :])
390                    path_similarities.append(best_match)
391            else:
392                for j in range(len(sigs2)):
393                    best_match = np.max(sim_matrix[:, j])
394                    path_similarities.append(best_match)
395
396            frame_similarities = path_similarities
397        else:
398            # Direct frame-by-frame comparison for same length fingerprints
399            for sig1, sig2 in zip(sigs1, sigs2, strict=False):
400                similarity = self.compare_signatures(sig1, sig2, method)
401                frame_similarities.append(similarity)
402
403        # Calculate overall metrics
404        overall_similarity = np.mean(frame_similarities) if frame_similarities else 0.0
405        min_similarity = np.min(frame_similarities) if frame_similarities else 0.0
406        max_similarity = np.max(frame_similarities) if frame_similarities else 0.0
407
408        # Calculate temporal consistency (how consistent the similarity is across frames)
409        temporal_consistency = 1.0 - np.std(frame_similarities) if frame_similarities else 0.0
410
411        return {
412            "overall_similarity": float(overall_similarity),
413            "min_similarity": float(min_similarity),
414            "max_similarity": float(max_similarity),
415            "temporal_consistency": float(temporal_consistency),
416            "frame_similarities": [float(s) for s in frame_similarities],
417            "name": fp2["metadata"]["name"],
418            "metadata": fp2["metadata"],
419        }
420
421    def identify_video(self, threshold=0.85) -> list[dict]:
422        """Identify a video sample against stored fingerprints
423
424        Args:
425            sample_fingerprint: Fingerprint of the video to identify
426            threshold: Minimum similarity threshold for a match
427
428        Returns:
429            List of matching fingerprints with similarity scores
430        """
431        matches = []
432
433        # Compare against all cached fingerprints
434        for fingerprint in self.fingerprint_cache:
435            comparison = self.compare_video_fingerprints(self.fingerprint, fingerprint)
436
437            if comparison["overall_similarity"] >= threshold:
438                matches.append(
439                    {
440                        "similarity": comparison["overall_similarity"],
441                        "details": comparison,
442                    }
443                )
444
445        # Sort matches by similarity (highest first)
446        matches.sort(key=lambda x: x["similarity"], reverse=True)
447
448        return matches
449
450    def load_all_fingerprints(self, directory="fingerprints"):
451        """Load all fingerprints from a directory into cache
452
453        Args:
454            directory: Directory containing fingerprint files
455
456        Returns:
457            Number of fingerprints loaded
458        """
459        if not os.path.exists(directory):
460            return 0
461
462        count = 0
463        for filename in os.listdir(directory):
464            if filename.endswith(".pkl"):
465                filepath = os.path.join(directory, filename)
466                fingerprint = self.load_fingerprint(filepath)
467                if fingerprint:
468                    self.fingerprint_cache.append(fingerprint)
469                    count += 1
470
471        return count
472
473    def process_frame(self, frame):
474        method = self.config.get("method", "combined")
475        count = self.generate_video_fingerprint(frame, method)
476        return {
477            "frame_count": count,
478            "frame": frame,
479        }
480
481    def handle_results(self, result, frame):
482        matched = []
483        operations = self.config.get("operations", "compare")
484        if result["frame_count"] == self.config.get("duration", 100):
485            if operations == "compare":
486                matched = self.identify_video(self.config.get("threshold", 0.85))
487            elif operations == "generate":
488                self.save_fingerprint(self.config.get("directory", "./fingerprint"))
489            else:
490                print("not supported operation")
491
492            if matched:
493                if matched[0]["details"]["name"] not in self.result:
494                    self.result.append(matched[0]["details"]["name"])
495                cv2.putText(
496                    frame,
497                    f"match with {matched[0]['details']['name']} :{matched[0]['similarity']:.2f}",
498                    (150, frame.shape[0] - 90),
499                    cv2.FONT_HERSHEY_SIMPLEX,
500                    0.7,
501                    (0, 0, 255),
502                    2,
503                )
504            else:
505                cv2.putText(
506                    frame,
507                    "signature done",
508                    (150, frame.shape[0] - 90),
509                    cv2.FONT_HERSHEY_SIMPLEX,
510                    0.7,
511                    (0, 0, 255),
512                    2,
513                )
514        else:
515            cv2.putText(
516                frame,
517                "signature in progress",
518                (150, frame.shape[0] - 90),
519                cv2.FONT_HERSHEY_SIMPLEX,
520                0.7,
521                (0, 0, 255),
522                2,
523            )
524
525    def shutdown(self) -> list:
526        """Release plugin resources"""
527        if self.config.get("operations", "compare") == "compare":
528            if self.result:
529                print(f"{self.__class__.__name__} result: {self.result}\n")
530            else:
531                print(f"{self.__class__.__name__} result: no match\n")
532
533        return self.result
534