""" Vehicle Tracking Module - BoT-SORT based tracking with camera isolation Implements vehicle identification, persistence, and motion analysis using external tracker. """ import logging import time import uuid from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass, field import numpy as np from threading import Lock from .bot_sort_tracker import MultiCameraBoTSORT logger = logging.getLogger(__name__) @dataclass class TrackedVehicle: """Represents a tracked vehicle with all its state information.""" track_id: int camera_id: str first_seen: float last_seen: float session_id: Optional[str] = None display_id: Optional[str] = None confidence: float = 0.0 bbox: Tuple[int, int, int, int] = (0, 0, 0, 0) # x1, y1, x2, y2 center: Tuple[float, float] = (0.0, 0.0) stable_frames: int = 0 total_frames: int = 0 is_stable: bool = False processed_pipeline: bool = False last_position_history: List[Tuple[float, float]] = field(default_factory=list) avg_confidence: float = 0.0 hit_streak: int = 0 age: int = 0 def update_position(self, bbox: Tuple[int, int, int, int], confidence: float): """Update vehicle position and confidence.""" self.bbox = bbox self.center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2) self.last_seen = time.time() self.confidence = confidence self.total_frames += 1 # Update confidence average self.avg_confidence = ((self.avg_confidence * (self.total_frames - 1)) + confidence) / self.total_frames # Maintain position history (last 10 positions) self.last_position_history.append(self.center) if len(self.last_position_history) > 10: self.last_position_history.pop(0) def calculate_stability(self) -> float: """Calculate stability score based on position history.""" if len(self.last_position_history) < 2: return 0.0 # Calculate movement variance positions = np.array(self.last_position_history) if len(positions) < 2: return 0.0 # Calculate standard deviation of positions std_x = np.std(positions[:, 0]) std_y = np.std(positions[:, 1]) # Lower variance means more stable (inverse relationship) # Normalize to 0-1 range (assuming max reasonable std is 50 pixels) stability = max(0, 1 - (std_x + std_y) / 100) return stability def is_expired(self, timeout_seconds: float = 2.0) -> bool: """Check if vehicle tracking has expired.""" return (time.time() - self.last_seen) > timeout_seconds class VehicleTracker: """ Main vehicle tracking implementation using BoT-SORT with camera isolation. Manages continuous tracking, vehicle identification, and state persistence. """ def __init__(self, tracking_config: Optional[Dict] = None): """ Initialize the vehicle tracker. Args: tracking_config: Configuration from pipeline.json tracking section """ self.config = tracking_config or {} self.trigger_classes = self.config.get('trigger_classes', self.config.get('triggerClasses', ['frontal'])) self.min_confidence = self.config.get('minConfidence', 0.6) # BoT-SORT multi-camera tracker self.bot_sort = MultiCameraBoTSORT(self.trigger_classes, self.min_confidence) # Tracking state - maintain compatibility with existing code self.tracked_vehicles: Dict[str, Dict[int, TrackedVehicle]] = {} # camera_id -> {track_id: vehicle} self.lock = Lock() # Tracking parameters self.stability_threshold = 0.7 self.min_stable_frames = 5 self.timeout_seconds = 2.0 logger.info(f"VehicleTracker initialized with BoT-SORT: trigger_classes={self.trigger_classes}, " f"min_confidence={self.min_confidence}") def process_detections(self, results: Any, display_id: str, frame: np.ndarray) -> List[TrackedVehicle]: """ Process detection results using BoT-SORT tracking. Args: results: Detection results (InferenceResult) display_id: Display identifier for this stream frame: Current frame being processed Returns: List of currently tracked vehicles """ current_time = time.time() # Extract camera_id from display_id for tracking isolation camera_id = display_id # Using display_id as camera_id for isolation with self.lock: # Update BoT-SORT tracker track_results = self.bot_sort.update(camera_id, results) # Ensure camera tracking dict exists if camera_id not in self.tracked_vehicles: self.tracked_vehicles[camera_id] = {} # Update tracked vehicles based on BoT-SORT results current_tracks = {} active_tracks = [] for track_result in track_results: track_id = track_result['track_id'] # Create or update TrackedVehicle if track_id in self.tracked_vehicles[camera_id]: # Update existing vehicle vehicle = self.tracked_vehicles[camera_id][track_id] vehicle.update_position(track_result['bbox'], track_result['confidence']) vehicle.hit_streak = track_result['hit_streak'] vehicle.age = track_result['age'] # Update stability based on hit_streak if vehicle.hit_streak >= self.min_stable_frames: vehicle.is_stable = True vehicle.stable_frames = vehicle.hit_streak logger.debug(f"Updated track {track_id}: conf={vehicle.confidence:.2f}, " f"stable={vehicle.is_stable}, hit_streak={vehicle.hit_streak}") else: # Create new vehicle x1, y1, x2, y2 = track_result['bbox'] vehicle = TrackedVehicle( track_id=track_id, camera_id=camera_id, first_seen=current_time, last_seen=current_time, display_id=display_id, confidence=track_result['confidence'], bbox=tuple(track_result['bbox']), center=((x1 + x2) / 2, (y1 + y2) / 2), total_frames=1, hit_streak=track_result['hit_streak'], age=track_result['age'] ) vehicle.last_position_history.append(vehicle.center) logger.info(f"New vehicle tracked: ID={track_id}, camera={camera_id}, display={display_id}") current_tracks[track_id] = vehicle active_tracks.append(vehicle) # Update the camera's tracked vehicles self.tracked_vehicles[camera_id] = current_tracks return active_tracks def get_stable_vehicles(self, display_id: Optional[str] = None) -> List[TrackedVehicle]: """ Get all stable vehicles, optionally filtered by display. Args: display_id: Optional display ID to filter by Returns: List of stable tracked vehicles """ with self.lock: stable = [] camera_id = display_id # Using display_id as camera_id if camera_id in self.tracked_vehicles: for vehicle in self.tracked_vehicles[camera_id].values(): if (vehicle.is_stable and not vehicle.is_expired(self.timeout_seconds) and (display_id is None or vehicle.display_id == display_id)): stable.append(vehicle) return stable def get_vehicle_by_session(self, session_id: str) -> Optional[TrackedVehicle]: """ Get a tracked vehicle by its session ID. Args: session_id: Session ID to look up Returns: Tracked vehicle if found, None otherwise """ with self.lock: # Search across all cameras for camera_vehicles in self.tracked_vehicles.values(): for vehicle in camera_vehicles.values(): if vehicle.session_id == session_id: return vehicle return None def mark_processed(self, track_id: int, session_id: str): """ Mark a vehicle as processed through the pipeline. Args: track_id: Track ID of the vehicle session_id: Session ID assigned to this vehicle """ with self.lock: # Search across all cameras for the track_id for camera_vehicles in self.tracked_vehicles.values(): if track_id in camera_vehicles: vehicle = camera_vehicles[track_id] vehicle.processed_pipeline = True vehicle.session_id = session_id logger.info(f"Marked vehicle {track_id} as processed with session {session_id}") return def clear_session(self, session_id: str): """ Clear session ID from a tracked vehicle (post-fueling). Args: session_id: Session ID to clear """ with self.lock: # Search across all cameras for camera_vehicles in self.tracked_vehicles.values(): for vehicle in camera_vehicles.values(): if vehicle.session_id == session_id: logger.info(f"Clearing session {session_id} from vehicle {vehicle.track_id}") vehicle.session_id = None # Keep processed_pipeline=True to prevent re-processing def reset_tracking(self): """Reset all tracking state.""" with self.lock: self.tracked_vehicles.clear() self.bot_sort.reset_all() logger.info("Vehicle tracking state reset") def get_statistics(self) -> Dict: """Get tracking statistics.""" with self.lock: total = 0 stable = 0 processed = 0 all_confidences = [] # Aggregate stats across all cameras for camera_vehicles in self.tracked_vehicles.values(): total += len(camera_vehicles) for vehicle in camera_vehicles.values(): if vehicle.is_stable: stable += 1 if vehicle.processed_pipeline: processed += 1 all_confidences.append(vehicle.avg_confidence) return { 'total_tracked': total, 'stable_vehicles': stable, 'processed_vehicles': processed, 'avg_confidence': np.mean(all_confidences) if all_confidences else 0.0, 'bot_sort_stats': self.bot_sort.get_statistics() }