"""
Vehicle Tracking Module - Continuous tracking with front_rear_detection model
Implements vehicle identification, persistence, and motion analysis.
"""
import logging
import time
import uuid
from dataclasses import dataclass, field
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)

# Tunables for the position-based stability analysis in TrackedVehicle.
_POSITION_HISTORY_LIMIT = 15        # number of recent centers retained
_MIN_HISTORY_FOR_STABILITY = 5      # below this, stability is reported as 0.0
_STABLE_SCORE_THRESHOLD = 0.7       # position score above which a frame counts as stable
_MAX_REASONABLE_STD = 150           # For HD resolution
_MAX_REASONABLE_MOVEMENT = 20       # 20 pixels max reasonable movement
_STABLE_DURATION_BONUS_SECONDS = 2.0
_RECENT_TRACK_CHANGE_SECONDS = 1.0


@dataclass
class TrackedVehicle:
    """Represents a tracked vehicle with all its state information."""

    track_id: int
    first_seen: float
    last_seen: float
    session_id: Optional[str] = None
    display_id: Optional[str] = None
    confidence: float = 0.0
    bbox: Tuple[int, int, int, int] = (0, 0, 0, 0)  # x1, y1, x2, y2
    center: Tuple[float, float] = (0.0, 0.0)
    stable_frames: int = 0
    total_frames: int = 0
    is_stable: bool = False
    processed_pipeline: bool = False
    last_position_history: List[Tuple[float, float]] = field(default_factory=list)
    avg_confidence: float = 0.0

    # Hybrid validation fields
    track_id_changes: int = 0  # Number of times track ID changed for same position
    position_stability_score: float = 0.0  # Independent position-based stability
    continuous_stable_duration: float = 0.0  # Time continuously stable (ignoring track ID changes)
    last_track_id_change: Optional[float] = None  # When track ID last changed
    original_track_id: Optional[int] = None  # First track ID seen at this position

    # Start time of the current uninterrupted stable streak (None while unstable).
    # Kept out of __init__/repr/eq so the constructor and comparisons are unchanged.
    stable_since: Optional[float] = field(default=None, init=False, repr=False, compare=False)

    def update_position(self, bbox: Tuple[int, int, int, int], confidence: float,
                        new_track_id: Optional[int] = None):
        """Update vehicle position and confidence.

        Args:
            bbox: Bounding box as (x1, y1, x2, y2).
            confidence: Detection confidence for this frame.
            new_track_id: Track ID reported for this frame; when it differs from
                the current one the change is counted and the ID is replaced.
        """
        self.bbox = bbox
        self.center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)

        current_time = time.time()
        self.last_seen = current_time
        self.confidence = confidence
        self.total_frames += 1

        # Track ID change detection
        if new_track_id is not None and new_track_id != self.track_id:
            self.track_id_changes += 1
            self.last_track_id_change = current_time
            logger.debug("Track ID changed from %s to %s for same vehicle",
                         self.track_id, new_track_id)
            self.track_id = new_track_id

        # Set original track ID if not set
        if self.original_track_id is None:
            self.original_track_id = self.track_id

        # Incremental running mean of the confidence over all frames seen
        self.avg_confidence = (
            (self.avg_confidence * (self.total_frames - 1)) + confidence
        ) / self.total_frames

        # Maintain position history (most recent positions only)
        self.last_position_history.append(self.center)
        del self.last_position_history[:-_POSITION_HISTORY_LIMIT]

        # Update position-based stability
        self._update_position_stability()

    def _update_position_stability(self):
        """Update position-based stability score independent of track ID."""
        if len(self.last_position_history) < _MIN_HISTORY_FOR_STABILITY:
            self.position_stability_score = 0.0
            return

        positions = np.array(self.last_position_history)

        # Position variance (lower = more stable)
        std_x = np.std(positions[:, 0])
        std_y = np.std(positions[:, 1])

        # Mean length of the last three frame-to-frame displacements. The
        # history holds at least _MIN_HISTORY_FOR_STABILITY points here, so the
        # four most recent positions always exist.
        steps = np.diff(positions[-4:], axis=0)
        recent_movement = np.mean(np.hypot(steps[:, 0], steps[:, 1]))

        # Position-based stability (0-1 where 1 = perfectly stable)
        variance_score = max(0, 1 - (std_x + std_y) / _MAX_REASONABLE_STD)
        velocity_score = max(0, 1 - recent_movement / _MAX_REASONABLE_MOVEMENT)
        self.position_stability_score = float(variance_score * 0.7 + velocity_score * 0.3)

        # Measure how long the vehicle has been *continuously* stable: the
        # clock starts when the score first crosses the threshold and is reset
        # as soon as it drops below it again.
        if self.position_stability_score > _STABLE_SCORE_THRESHOLD:
            now = time.time()
            if self.stable_since is None:
                self.stable_since = now
            self.continuous_stable_duration = now - self.stable_since
        else:
            self.stable_since = None
            self.continuous_stable_duration = 0.0

    def calculate_stability(self) -> float:
        """Calculate stability score based on position history."""
        return self.position_stability_score

    def calculate_hybrid_stability(self) -> Tuple[float, str]:
        """
        Calculate hybrid stability considering both track ID continuity and position stability.

        Returns:
            Tuple of (stability_score, reasoning)
        """
        if len(self.last_position_history) < _MIN_HISTORY_FOR_STABILITY:
            return 0.0, "Insufficient position history"

        has_stable_duration = self.continuous_stable_duration > _STABLE_DURATION_BONUS_SECONDS
        recent_track_change = (
            self.last_track_id_change is not None
            and (time.time() - self.last_track_id_change) < _RECENT_TRACK_CHANGE_SECONDS
        )

        # Base stability from position
        base_score = self.position_stability_score

        # Penalties and bonuses
        if self.track_id_changes > 3:
            # Too many track ID changes - likely tracking issues
            base_score *= 0.8
            reason = f"Multiple track ID changes ({self.track_id_changes})"
        elif recent_track_change:
            # Recent track change - be cautious
            base_score *= 0.9
            reason = "Recent track ID change"
        else:
            reason = "Position-based stability"

        # Bonus for long continuous stability regardless of track ID changes
        if has_stable_duration:
            base_score = min(1.0, base_score + 0.1)
            reason += f" + {self.continuous_stable_duration:.1f}s continuous"

        return base_score, reason

    def is_expired(self, timeout_seconds: float = 2.0) -> bool:
        """Check if vehicle tracking has expired."""
        return (time.time() - self.last_seen) > timeout_seconds


class VehicleTracker:
    """
    Main vehicle tracking implementation using YOLO tracking capabilities.
    Manages continuous tracking, vehicle identification, and state persistence.
    """

    def __init__(self, tracking_config: Optional[Dict] = None):
        """
        Initialize the vehicle tracker.

        Args:
            tracking_config: Configuration from pipeline.json tracking section
        """
        self.config = tracking_config or {}
        self.trigger_classes = self.config.get(
            'trigger_classes', self.config.get('triggerClasses', ['frontal'])
        )
        # Accept both spellings, mirroring how trigger_classes is read.
        self.min_confidence = self.config.get(
            'min_confidence', self.config.get('minConfidence', 0.6)
        )

        # Tracking state
        self.tracked_vehicles: Dict[int, TrackedVehicle] = {}
        self.position_registry: Dict[str, TrackedVehicle] = {}  # Position-based vehicle registry
        self.next_track_id = 1
        self.lock = Lock()

        # Tracking parameters
        self.stability_threshold = 0.65  # Lowered for gas station scenarios
        self.min_stable_frames = 8  # Increased for 4fps processing
        self.position_tolerance = 80  # pixels - increased for gas station scenarios
        self.timeout_seconds = 8.0  # Increased for gas station scenarios

        logger.info("VehicleTracker initialized with trigger_classes=%s, min_confidence=%s",
                    self.trigger_classes, self.min_confidence)

    def process_detections(self,
                           results: Any,
                           display_id: str,
                           frame: np.ndarray) -> List[TrackedVehicle]:
        """
        Process YOLO detection results and update tracking state.

        Each accepted detection is associated with a vehicle in this order:
        same grid cell in the position registry, same track ID, nearest live
        track (only when the detection carries no track ID), and finally a
        newly created vehicle.

        Args:
            results: YOLO detection results with tracking; only its
                ``detections`` attribute is read, and each detection must expose
                ``confidence``, ``class_name``, ``bbox`` and ``track_id``
            display_id: Display identifier for this stream
            frame: Current frame being processed (not used by this method)

        Returns:
            List of currently tracked vehicles (one entry per accepted detection)
        """
        current_time = time.time()
        active_tracks: List[TrackedVehicle] = []

        with self.lock:
            self._purge_expired_tracks()

            for detection in (getattr(results, 'detections', None) or []):
                # Skip if confidence is too low
                if detection.confidence < self.min_confidence:
                    continue

                # Check if class is in trigger classes
                if detection.class_name not in self.trigger_classes:
                    continue

                # Get bounding box and center from Detection object
                x1, y1, x2, y2 = detection.bbox
                bbox = (int(x1), int(y1), int(x2), int(y2))
                center = ((x1 + x2) / 2, (y1 + y2) / 2)
                confidence = detection.confidence

                # Hybrid approach: Try position-based association first, then track ID
                track_id = detection.track_id
                position_key = self._get_position_key(center)
                vehicle: Optional[TrackedVehicle] = self.position_registry.get(position_key)

                if vehicle is not None:
                    # 1. Same physical location as a registered vehicle.
                    # Remember the ID *before* update_position overwrites it so
                    # the stale dictionary key can be removed.
                    previous_id = vehicle.track_id
                    if track_id is not None and track_id != previous_id:
                        vehicle.update_position(bbox, confidence, track_id)
                        logger.debug("Track ID changed %s->%s at same position",
                                     previous_id, track_id)
                        if self.tracked_vehicles.get(previous_id) is vehicle:
                            del self.tracked_vehicles[previous_id]
                        displaced = self.tracked_vehicles.get(track_id)
                        if displaced is not None and displaced is not vehicle:
                            # The entry about to be overwritten would otherwise
                            # leave orphaned registry cells behind.
                            self._unregister_positions(displaced)
                        self.tracked_vehicles[track_id] = vehicle
                    else:
                        # Same position, same/no track ID
                        vehicle.update_position(bbox, confidence)
                        track_id = previous_id

                elif track_id is not None and track_id in self.tracked_vehicles:
                    # 2. Known track ID that moved to an unregistered grid cell
                    vehicle = self.tracked_vehicles[track_id]
                    self._register_position(vehicle, position_key)
                    vehicle.update_position(bbox, confidence)

                elif track_id is None:
                    # 3. Try closest track association (fallback)
                    closest_track = self._find_closest_track(center)
                    if closest_track is not None:
                        vehicle = closest_track
                        track_id = closest_track.track_id
                        vehicle.update_position(bbox, confidence)
                        self._register_position(vehicle, position_key)
                        logger.debug(
                            "Associated detection with existing track %s based on proximity",
                            track_id)

                if vehicle is None:
                    # 4. Create new vehicle if no associations found
                    if track_id is None:
                        track_id = self._allocate_track_id()
                    elif track_id == self.next_track_id:
                        self.next_track_id += 1

                    vehicle = TrackedVehicle(
                        track_id=track_id,
                        first_seen=current_time,
                        last_seen=current_time,
                        display_id=display_id,
                        confidence=confidence,
                        bbox=bbox,
                        center=center,
                        total_frames=1,
                        # Seed the running mean with the first frame so later
                        # updates do not average in a phantom 0.0 sample.
                        avg_confidence=confidence,
                        original_track_id=track_id
                    )
                    vehicle.last_position_history.append(center)
                    self.tracked_vehicles[track_id] = vehicle
                    self.position_registry[position_key] = vehicle
                    logger.info("New vehicle tracked: ID=%s, display=%s", track_id, display_id)

                # Check stability using hybrid approach
                stability_score, reason = vehicle.calculate_hybrid_stability()
                if stability_score > self.stability_threshold:
                    vehicle.stable_frames += 1
                    if vehicle.stable_frames >= self.min_stable_frames:
                        vehicle.is_stable = True
                else:
                    vehicle.stable_frames = max(0, vehicle.stable_frames - 1)
                    if vehicle.stable_frames < self.min_stable_frames:
                        vehicle.is_stable = False

                logger.debug(
                    "Updated track %s: conf=%.2f, stable=%s, hybrid_stability=%.2f (%s)",
                    track_id, confidence, vehicle.is_stable, stability_score, reason)

                active_tracks.append(vehicle)

        return active_tracks

    def _purge_expired_tracks(self):
        """Drop tracks not seen for ``timeout_seconds`` and their registry cells.

        Must be called with ``self.lock`` held.
        """
        expired = [
            (track_id, vehicle)
            for track_id, vehicle in self.tracked_vehicles.items()
            if vehicle.is_expired(self.timeout_seconds)
        ]
        for track_id, vehicle in expired:
            self._unregister_positions(vehicle)
            logger.debug("Removing expired track %s", track_id)
            del self.tracked_vehicles[track_id]

    def _unregister_positions(self, vehicle: TrackedVehicle):
        """Remove every position-registry cell that refers to ``vehicle``.

        Matching is by identity, not dataclass equality, so only cells that
        hold this exact object are removed.
        """
        stale_keys = [key for key, owner in self.position_registry.items() if owner is vehicle]
        for key in stale_keys:
            del self.position_registry[key]

    def _register_position(self, vehicle: TrackedVehicle, position_key: str):
        """Make ``position_key`` the only registry cell that refers to ``vehicle``."""
        self._unregister_positions(vehicle)
        self.position_registry[position_key] = vehicle

    def _allocate_track_id(self) -> int:
        """Return the next internal track ID that is not already in use."""
        while self.next_track_id in self.tracked_vehicles:
            self.next_track_id += 1
        allocated = self.next_track_id
        self.next_track_id += 1
        return allocated

    def _get_position_key(self, center: Tuple[float, float]) -> str:
        """
        Generate a position-based key for vehicle registry.
        Groups nearby positions into the same key for association.

        Args:
            center: Center position (x, y)

        Returns:
            Position key string
        """
        # Grid-based quantization - 60 pixel grid for gas station scenarios
        grid_size = 60
        grid_x = int(center[0] // grid_size)
        grid_y = int(center[1] // grid_size)
        return f"{grid_x}_{grid_y}"

    def _find_closest_track(self, center: Tuple[float, float]) -> Optional[TrackedVehicle]:
        """
        Find the closest existing track to a given position.

        Args:
            center: Center position to match

        Returns:
            Closest tracked vehicle if within tolerance, None otherwise
        """
        min_distance = float('inf')
        closest_track = None

        for vehicle in self.tracked_vehicles.values():
            # Only tracks seen within the last second are candidates
            if vehicle.is_expired(1.0):
                continue

            distance = np.sqrt(
                (center[0] - vehicle.center[0]) ** 2 +
                (center[1] - vehicle.center[1]) ** 2
            )

            if distance < min_distance and distance < self.position_tolerance:
                min_distance = distance
                closest_track = vehicle

        return closest_track

    def get_stable_vehicles(self, display_id: Optional[str] = None) -> List[TrackedVehicle]:
        """
        Get all stable vehicles, optionally filtered by display.

        Args:
            display_id: Optional display ID to filter by

        Returns:
            List of stable tracked vehicles
        """
        with self.lock:
            return [
                v for v in self.tracked_vehicles.values()
                if v.is_stable
                and not v.is_expired(self.timeout_seconds)
                and (display_id is None or v.display_id == display_id)
            ]

    def get_vehicle_by_session(self, session_id: str) -> Optional[TrackedVehicle]:
        """
        Get a tracked vehicle by its session ID.

        Args:
            session_id: Session ID to look up

        Returns:
            Tracked vehicle if found, None otherwise
        """
        with self.lock:
            for vehicle in self.tracked_vehicles.values():
                if vehicle.session_id == session_id:
                    return vehicle
        return None

    def mark_processed(self, track_id: int, session_id: str):
        """
        Mark a vehicle as processed through the pipeline.

        Args:
            track_id: Track ID of the vehicle
            session_id: Session ID assigned to this vehicle
        """
        with self.lock:
            vehicle = self.tracked_vehicles.get(track_id)
            if vehicle is not None:
                vehicle.processed_pipeline = True
                vehicle.session_id = session_id
                logger.info("Marked vehicle %s as processed with session %s",
                            track_id, session_id)

    def clear_session(self, session_id: str):
        """
        Clear session ID from a tracked vehicle (post-fueling).

        Args:
            session_id: Session ID to clear
        """
        with self.lock:
            for vehicle in self.tracked_vehicles.values():
                if vehicle.session_id == session_id:
                    logger.info("Clearing session %s from vehicle %s",
                                session_id, vehicle.track_id)
                    vehicle.session_id = None
                    # Keep processed_pipeline=True to prevent re-processing

    def reset_tracking(self):
        """Reset all tracking state."""
        with self.lock:
            self.tracked_vehicles.clear()
            self.position_registry.clear()
            self.next_track_id = 1
            logger.info("Vehicle tracking state reset")

    def get_statistics(self) -> Dict:
        """Get tracking statistics.

        Returns:
            Dict with ``total_tracked``, ``stable_vehicles``,
            ``processed_vehicles`` and ``avg_confidence`` (0.0 when nothing is
            tracked).
        """
        with self.lock:
            vehicles = list(self.tracked_vehicles.values())
            return {
                'total_tracked': len(vehicles),
                'stable_vehicles': sum(1 for v in vehicles if v.is_stable),
                'processed_vehicles': sum(1 for v in vehicles if v.processed_pipeline),
                'avg_confidence': (
                    float(np.mean([v.avg_confidence for v in vehicles])) if vehicles else 0.0
                ),
            }