import threading
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field
from collections import defaultdict, deque

import torch
import numpy as np


@dataclass
class TrackedObject:
    """
    Represents a tracked object with persistent ID and metadata.

    Attributes:
        track_id: Unique persistent tracking ID
        class_id: Object class ID from detection model
        class_name: Object class name (if available)
        confidence: Detection confidence score (0-1)
        bbox: Bounding box in format [x1, y1, x2, y2] (normalized or absolute)
        last_seen_frame: Frame number when object was last detected
        first_seen_frame: Frame number when object was first detected
        track_history: Deque of historical bboxes for trajectory tracking
        state: Custom state dict for additional tracking data
    """
    track_id: int
    class_id: int
    class_name: str = "unknown"
    confidence: float = 0.0
    bbox: List[float] = field(default_factory=list)
    last_seen_frame: int = 0
    first_seen_frame: int = 0
    track_history: deque = field(default_factory=lambda: deque(maxlen=30))
    state: Dict[str, Any] = field(default_factory=dict)

    def update(self, bbox: List[float], confidence: float, frame_num: int):
        """Update tracked object with new detection"""
        self.bbox = bbox
        self.confidence = confidence
        self.last_seen_frame = frame_num
        self.track_history.append((frame_num, bbox, confidence))

    def age(self, current_frame: int) -> int:
        """Get age of track in frames since last seen"""
        return current_frame - self.last_seen_frame

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization"""
        return {
            'track_id': self.track_id,
            'class_id': self.class_id,
            'class_name': self.class_name,
            'confidence': self.confidence,
            'bbox': self.bbox,
            'last_seen_frame': self.last_seen_frame,
            'first_seen_frame': self.first_seen_frame,
            # Note: 'age' here is the track's duration (first to last sighting),
            # not the staleness value returned by age()
            'age': self.last_seen_frame - self.first_seen_frame,
            'history_length': len(self.track_history),
            'state': self.state
        }


@dataclass
class Detection:
    """
    Represents a single detection from an object detection model.

    Attributes:
        bbox: Bounding box [x1, y1, x2, y2]
        confidence: Detection confidence (0-1)
        class_id: Object class ID
        class_name: Object class name (optional)
    """
    bbox: List[float]
    confidence: float
    class_id: int
    class_name: str = "unknown"
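
# A minimal usage sketch for the two dataclasses above (illustrative values
# only; real detections come from an upstream detector such as the
# ModelController mentioned in ObjectTracker's docstring, not shown here):
#
#   det = Detection(bbox=[10.0, 20.0, 50.0, 80.0], confidence=0.92, class_id=0)
#   obj = TrackedObject(track_id=0, class_id=det.class_id,
#                       confidence=det.confidence, bbox=det.bbox,
#                       first_seen_frame=1, last_seen_frame=1)
#   obj.update(bbox=[12.0, 21.0, 52.0, 81.0], confidence=0.90, frame_num=2)
#   assert obj.age(current_frame=5) == 3   # frames since last seen
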

class ObjectTracker:
    """
    Lightweight GPU-accelerated object tracker (decoupled from inference).

    This class only handles tracking logic - associating detections with
    existing tracks, maintaining track IDs, and managing track lifecycle.
    It does NOT perform inference.

    Architecture (Event-Driven Mode):
    - Receives pre-computed detections (from ModelController)
    - Maintains persistent tracking state (track IDs, histories)
    - GPU-accelerated IoU computation for track association
    - Thread-safe for concurrent operations

    Tracking Flow:
        Detections → Track Association (GPU IoU) → Update Tracks → Return Tracked Objects

    Features:
    - Lightweight: No model_repository dependency (zero VRAM overhead)
    - GPU-accelerated: IoU computation on GPU for performance
    - Persistent IDs: Tracks maintain consistent IDs across frames
    - Track History: Maintains trajectory history for each object
    - Thread-safe: Mutex-based locking for concurrent access

    Example:
        # Event-driven mode (no model dependency)
        tracker = ObjectTracker(
            gpu_id=0,
            tracker_type="iou",
            max_age=30,
            iou_threshold=0.3,
            class_names=COCO_CLASSES
        )

        # Update with pre-computed detections
        detections = [Detection(bbox=[x1, y1, x2, y2], confidence=0.9, class_id=0)]
        tracked_objects = tracker.update(detections)
    """

    def __init__(self,
                 gpu_id: int = 0,
                 tracker_type: str = "iou",
                 max_age: int = 30,
                 iou_threshold: float = 0.3,
                 class_names: Optional[Dict[int, str]] = None):
        """
        Initialize ObjectTracker (no model dependency).

        Args:
            gpu_id: GPU device ID for IoU computation
            tracker_type: Tracking algorithm type ("iou")
            max_age: Maximum frames to keep a track without a detection
            iou_threshold: IoU threshold for track association
            class_names: Optional mapping of class IDs to names
        """
        self.gpu_id = gpu_id
        self.device = torch.device(f'cuda:{gpu_id}')
        self.tracker_type = tracker_type
        self.max_age = max_age
        self.iou_threshold = iou_threshold
        self.class_names = class_names or {}

        # Tracking state
        self._tracks: Dict[int, TrackedObject] = {}
        self._next_track_id: int = 0
        self._frame_count: int = 0
        self._lock = threading.RLock()

        # Statistics
        self._total_detections = 0
        self._total_tracks_created = 0

    def _compute_iou_gpu(self, boxes1: torch.Tensor, boxes2: torch.Tensor) -> torch.Tensor:
        """
        Compute IoU between two sets of boxes on GPU.

        Args:
            boxes1: Tensor of shape (N, 4) in format [x1, y1, x2, y2]
            boxes2: Tensor of shape (M, 4) in format [x1, y1, x2, y2]

        Returns:
            IoU matrix of shape (N, M)
        """
        # Ensure on GPU
        boxes1 = boxes1.to(self.device)
        boxes2 = boxes2.to(self.device)

        # Compute intersection corners via broadcasting
        x1_max = torch.max(boxes1[:, None, 0], boxes2[:, 0])  # (N, M)
        y1_max = torch.max(boxes1[:, None, 1], boxes2[:, 1])  # (N, M)
        x2_min = torch.min(boxes1[:, None, 2], boxes2[:, 2])  # (N, M)
        y2_min = torch.min(boxes1[:, None, 3], boxes2[:, 3])  # (N, M)

        intersection_width = torch.clamp(x2_min - x1_max, min=0)
        intersection_height = torch.clamp(y2_min - y1_max, min=0)
        intersection_area = intersection_width * intersection_height

        # Compute areas
        boxes1_area = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
        boxes2_area = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

        # Compute union; epsilon guards against division by zero for degenerate boxes
        union_area = boxes1_area[:, None] + boxes2_area - intersection_area

        # Compute IoU
        iou = intersection_area / (union_area + 1e-6)

        return iou
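
    # Worked example for _compute_iou_gpu (values chosen for illustration):
    #   boxes1 = [[0, 0, 10, 10]], boxes2 = [[5, 5, 15, 15]]
    #   intersection: width  = min(10, 15) - max(0, 5) = 5
    #                 height = min(10, 15) - max(0, 5) = 5  -> area 25
    #   areas: 100 and 100, union = 100 + 100 - 25 = 175
    #   IoU = 25 / 175 ≈ 0.143, which is below the default iou_threshold of
    #   0.3, so these two boxes would NOT be associated.
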

    def _iou_tracking(self, detections: torch.Tensor) -> List[Tuple[int, int]]:
        """
        Simple IoU-based tracking algorithm (on GPU).

        Args:
            detections: Tensor of shape (N, 6) with [x1, y1, x2, y2, conf, class_id]

        Returns:
            List of (detection_idx, track_id) associations; track_id == -1
            means the detection starts a new track
        """
        if len(self._tracks) == 0:
            # No existing tracks: every detection starts a new track
            return [(det_idx, -1) for det_idx in range(len(detections))]

        # Get existing track bboxes
        track_ids = list(self._tracks.keys())
        track_bboxes = torch.tensor(
            [self._tracks[tid].bbox for tid in track_ids],
            dtype=torch.float32,
            device=self.device
        )

        # Extract detection bboxes
        det_bboxes = detections[:, :4]  # (N, 4)

        # Compute IoU matrix (GPU)
        iou_matrix = self._compute_iou_gpu(det_bboxes, track_bboxes)  # (N, M)

        # Greedy matching: assign each detection to its best unmatched track
        associations = []
        matched_tracks = set()

        # Convert to CPU for matching logic (small matrix)
        iou_cpu = iou_matrix.cpu().numpy()

        for det_idx in range(len(detections)):
            best_iou = self.iou_threshold
            best_track_idx = -1

            for track_idx, track_id in enumerate(track_ids):
                if track_idx in matched_tracks:
                    continue
                if iou_cpu[det_idx, track_idx] > best_iou:
                    best_iou = iou_cpu[det_idx, track_idx]
                    best_track_idx = track_idx

            if best_track_idx >= 0:
                associations.append((det_idx, track_ids[best_track_idx]))
                matched_tracks.add(best_track_idx)
            else:
                associations.append((det_idx, -1))  # New track

        return associations
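
    # Greedy matching is order-dependent: detections are processed in index
    # order and each claims its best still-unmatched track. Illustrative
    # example (made-up IoU matrix, threshold 0.3):
    #
    #                  track 7   track 9
    #   detection 0      0.60      0.55
    #   detection 1      0.00      0.50
    #
    # Detection 0 takes track 7 (0.60 > 0.55), then detection 1 takes track 9.
    # Had detection 0 taken track 9 instead, detection 1 would have fallen
    # below threshold against track 7 and spawned a spurious new track;
    # globally optimal (Hungarian) matching would avoid such cases but is
    # intentionally not used here for simplicity.
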

    def _create_track(self, bbox: List[float], confidence: float,
                      class_id: int, frame_num: int) -> TrackedObject:
        """Create a new tracked object"""
        track_id = self._next_track_id
        self._next_track_id += 1
        self._total_tracks_created += 1

        class_name = self.class_names.get(class_id, f"class_{class_id}")

        track = TrackedObject(
            track_id=track_id,
            class_id=class_id,
            class_name=class_name,
            confidence=confidence,
            bbox=bbox,
            last_seen_frame=frame_num,
            first_seen_frame=frame_num
        )
        track.track_history.append((frame_num, bbox, confidence))

        return track

    def _cleanup_stale_tracks(self):
        """Remove tracks that haven't been seen for more than max_age frames"""
        stale_track_ids = [
            tid for tid, track in self._tracks.items()
            if track.age(self._frame_count) > self.max_age
        ]
        for tid in stale_track_ids:
            del self._tracks[tid]

    def update(self,
               detections: List[Detection],
               frame_shape: Optional[Tuple[int, int, int]] = None,
               model_input_size: int = 640) -> List[TrackedObject]:
        """
        Update tracker with new detections (decoupled from inference).

        Args:
            detections: List of Detection objects from model inference
            frame_shape: Original frame shape (C, H, W) for scaling bboxes back
                from model space; note that the bboxes of the passed Detection
                objects are rescaled in place
            model_input_size: Model input size (default: 640 for YOLOv8)

        Returns:
            List of currently tracked objects
        """
        with self._lock:
            self._frame_count += 1
            self._total_detections += len(detections)

            # No detections: just clean up stale tracks
            if len(detections) == 0:
                self._cleanup_stale_tracks()
                return list(self._tracks.values())

            # Scale detections from model space (e.g. 640x640) to frame space (H x W)
            if frame_shape is not None:
                _, frame_h, frame_w = frame_shape
                scale_x = frame_w / model_input_size
                scale_y = frame_h / model_input_size

                # Scale all detection bboxes (in place)
                for det in detections:
                    x1, y1, x2, y2 = det.bbox
                    det.bbox = [
                        x1 * scale_x,
                        y1 * scale_y,
                        x2 * scale_x,
                        y2 * scale_y
                    ]

            # Convert detections to a tensor for GPU processing
            det_tensor = torch.tensor(
                [[*det.bbox, det.confidence, det.class_id] for det in detections],
                dtype=torch.float32,
                device=self.device
            )

            # Run tracking algorithm
            if self.tracker_type == "iou":
                associations = self._iou_tracking(det_tensor)
            else:
                raise NotImplementedError(f"Tracker type '{self.tracker_type}' not implemented")

            # Update tracks based on associations
            for det_idx, track_id in associations:
                det = detections[det_idx]

                if track_id == -1:
                    # Create new track
                    new_track = self._create_track(
                        det.bbox, det.confidence, det.class_id, self._frame_count
                    )
                    self._tracks[new_track.track_id] = new_track
                else:
                    # Update existing track
                    self._tracks[track_id].update(det.bbox, det.confidence, self._frame_count)

            # Cleanup stale tracks
            self._cleanup_stale_tracks()

            return list(self._tracks.values())
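
    # Worked example for the bbox scaling in update() above: for a 1920x1080
    # frame (frame_shape = (3, 1080, 1920)) and model_input_size = 640,
    # scale_x = 1920 / 640 = 3.0 and scale_y = 1080 / 640 = 1.6875, so a
    # model-space bbox [100, 100, 200, 200] becomes [300.0, 168.75, 600.0, 337.5]
    # in frame space. This assumes the frame was plainly resized to the model
    # input; letterboxed preprocessing would need padding-aware unscaling instead.
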

    def get_all_tracks(self, active_only: bool = True) -> List[TrackedObject]:
        """
        Get all tracked objects.

        Args:
            active_only: If True, only return tracks seen in recent frames

        Returns:
            List of tracked objects
        """
        with self._lock:
            if active_only:
                return [
                    track for track in self._tracks.values()
                    if track.age(self._frame_count) <= self.max_age
                ]
            else:
                return list(self._tracks.values())

    def get_track_by_id(self, track_id: int) -> Optional[TrackedObject]:
        """
        Get a specific track by ID.

        Args:
            track_id: Track ID to retrieve

        Returns:
            TrackedObject or None if not found
        """
        with self._lock:
            return self._tracks.get(track_id)

    def get_tracks_by_class(self, class_id: int, active_only: bool = True) -> List[TrackedObject]:
        """
        Get all tracks of a specific class.

        Args:
            class_id: Class ID to filter by
            active_only: If True, only return active tracks

        Returns:
            List of tracked objects
        """
        all_tracks = self.get_all_tracks(active_only=active_only)
        return [track for track in all_tracks if track.class_id == class_id]

    def get_track_count(self, active_only: bool = True) -> int:
        """
        Get number of tracked objects.

        Args:
            active_only: If True, only count active tracks

        Returns:
            Number of tracks
        """
        return len(self.get_all_tracks(active_only=active_only))

    def get_class_counts(self, active_only: bool = True) -> Dict[int, int]:
        """
        Get count of tracked objects per class.

        Args:
            active_only: If True, only count active tracks

        Returns:
            Dictionary mapping class_id to count
        """
        tracks = self.get_all_tracks(active_only=active_only)
        counts = defaultdict(int)
        for track in tracks:
            counts[track.class_id] += 1
        return dict(counts)

    def reset_tracks(self):
        """Clear all tracking state"""
        with self._lock:
            self._tracks.clear()
            self._next_track_id = 0
            self._frame_count = 0
            print("Tracking state reset")

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get tracking statistics.

        Returns:
            Dictionary with tracking stats
        """
        with self._lock:
            return {
                'frame_count': self._frame_count,
                'active_tracks': len(self._tracks),
                'total_tracks_created': self._total_tracks_created,
                'total_detections': self._total_detections,
                'avg_detections_per_frame': self._total_detections / max(self._frame_count, 1),
                'tracker_type': self.tracker_type,
                'class_counts': self.get_class_counts(active_only=True)
            }

    def export_tracks(self, format: str = "dict") -> Any:
        """
        Export all tracks in the specified format.

        Args:
            format: Export format ("dict", "json", "numpy")

        Returns:
            Tracks in the specified format
        """
        with self._lock:
            tracks = self.get_all_tracks(active_only=False)

            if format == "dict":
                return {track.track_id: track.to_dict() for track in tracks}

            elif format == "json":
                import json
                return json.dumps(
                    {track.track_id: track.to_dict() for track in tracks},
                    indent=2
                )

            elif format == "numpy":
                # Export as numpy array: [track_id, class_id, x1, y1, x2, y2, conf]
                data = []
                for track in tracks:
                    data.append([
                        track.track_id,
                        track.class_id,
                        *track.bbox,
                        track.confidence
                    ])
                # Keep a consistent (0, 7) shape when there are no tracks
                return np.array(data) if data else np.empty((0, 7))

            else:
                raise ValueError(f"Unknown export format: {format}")

    def __repr__(self):
        with self._lock:
            return (f"ObjectTracker(tracker={self.tracker_type}, "
                    f"frame={self._frame_count}, "
                    f"tracks={len(self._tracks)})")
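
# A minimal end-to-end sketch of the event-driven flow described in the class
# docstring. The detections are fabricated for illustration; a real pipeline
# would receive them from an upstream detector (e.g. the ModelController).
# Note this requires a CUDA-capable GPU, since the tracker pins its IoU
# computation to cuda:<gpu_id>.
if __name__ == "__main__":
    tracker = ObjectTracker(gpu_id=0, tracker_type="iou",
                            max_age=30, iou_threshold=0.3,
                            class_names={0: "person"})

    # Frame 1: one detection -> one new track
    tracker.update([Detection(bbox=[100.0, 100.0, 200.0, 200.0],
                              confidence=0.9, class_id=0)])

    # Frame 2: a slightly shifted detection (IoU ≈ 0.86 with the existing
    # track, well above the 0.3 threshold) -> associated with the same track
    tracked = tracker.update([Detection(bbox=[105.0, 102.0, 205.0, 203.0],
                                        confidence=0.88, class_id=0)])

    for obj in tracked:
        print(obj.to_dict())
    print(tracker.get_statistics())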