""" Vehicle Validation Module - Stable car detection and validation logic. Differentiates between stable (fueling) cars and passing-by vehicles. """ import logging import time import numpy as np from typing import List, Optional, Tuple, Dict, Any from dataclasses import dataclass from enum import Enum from .tracker import TrackedVehicle logger = logging.getLogger(__name__) class VehicleState(Enum): """Vehicle state classification.""" UNKNOWN = "unknown" ENTERING = "entering" STABLE = "stable" LEAVING = "leaving" PASSING_BY = "passing_by" @dataclass class ValidationResult: """Result of vehicle validation.""" is_valid: bool state: VehicleState confidence: float reason: str should_process: bool = False track_id: Optional[int] = None class StableCarValidator: """ Validates whether a tracked vehicle is stable (fueling) or just passing by. Uses multiple criteria including position stability, duration, and movement patterns. """ def __init__(self, config: Optional[Dict] = None): """ Initialize the validator with configuration. Args: config: Optional configuration dictionary """ self.config = config or {} # Validation thresholds self.min_stable_duration = self.config.get('min_stable_duration', 3.0) # seconds self.min_stable_frames = self.config.get('min_stable_frames', 10) self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0) # pixels self.min_confidence = self.config.get('min_confidence', 0.7) self.velocity_threshold = self.config.get('velocity_threshold', 5.0) # pixels/frame self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3) # 30% of frame self.leaving_zone_ratio = self.config.get('leaving_zone_ratio', 0.3) # Frame dimensions (will be updated on first frame) self.frame_width = 1920 self.frame_height = 1080 # History for validation self.validation_history: Dict[int, List[VehicleState]] = {} self.last_processed_vehicles: Dict[int, float] = {} # track_id -> last_process_time logger.info(f"StableCarValidator initialized with min_duration={self.min_stable_duration}s, " f"min_frames={self.min_stable_frames}, position_variance={self.position_variance_threshold}") def update_frame_dimensions(self, width: int, height: int): """Update frame dimensions for zone calculations.""" self.frame_width = width self.frame_height = height # Commented out verbose frame dimension logging # logger.debug(f"Updated frame dimensions: {width}x{height}") def validate_vehicle(self, vehicle: TrackedVehicle, frame_shape: Optional[Tuple] = None) -> ValidationResult: """ Validate whether a tracked vehicle is stable and should be processed. Args: vehicle: The tracked vehicle to validate frame_shape: Optional frame shape (height, width, channels) Returns: ValidationResult with validation status and reasoning """ # Update frame dimensions if provided if frame_shape: self.update_frame_dimensions(frame_shape[1], frame_shape[0]) # Initialize validation history for new vehicles if vehicle.track_id not in self.validation_history: self.validation_history[vehicle.track_id] = [] # Check if already processed if vehicle.processed_pipeline: return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=1.0, reason="Already processed through pipeline", should_process=False, track_id=vehicle.track_id ) # Check if recently processed (cooldown period) if vehicle.track_id in self.last_processed_vehicles: time_since_process = time.time() - self.last_processed_vehicles[vehicle.track_id] if time_since_process < 10.0: # 10 second cooldown return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=1.0, reason=f"Recently processed ({time_since_process:.1f}s ago)", should_process=False, track_id=vehicle.track_id ) # Determine vehicle state state = self._determine_vehicle_state(vehicle) # Update history self.validation_history[vehicle.track_id].append(state) if len(self.validation_history[vehicle.track_id]) > 20: self.validation_history[vehicle.track_id].pop(0) # Validate based on state if state == VehicleState.STABLE: return self._validate_stable_vehicle(vehicle) elif state == VehicleState.PASSING_BY: return ValidationResult( is_valid=False, state=state, confidence=0.8, reason="Vehicle is passing by", should_process=False, track_id=vehicle.track_id ) elif state == VehicleState.ENTERING: return ValidationResult( is_valid=False, state=state, confidence=0.5, reason="Vehicle is entering, waiting for stability", should_process=False, track_id=vehicle.track_id ) elif state == VehicleState.LEAVING: return ValidationResult( is_valid=False, state=state, confidence=0.5, reason="Vehicle is leaving", should_process=False, track_id=vehicle.track_id ) else: return ValidationResult( is_valid=False, state=state, confidence=0.0, reason="Unknown vehicle state", should_process=False, track_id=vehicle.track_id ) def _determine_vehicle_state(self, vehicle: TrackedVehicle) -> VehicleState: """ Determine the current state of the vehicle based on movement patterns. Args: vehicle: The tracked vehicle Returns: Current vehicle state """ # Not enough data if len(vehicle.last_position_history) < 3: return VehicleState.UNKNOWN # Calculate velocity velocity = self._calculate_velocity(vehicle) # Get position zones x_position = vehicle.center[0] / self.frame_width y_position = vehicle.center[1] / self.frame_height # Check if vehicle is stable stability = vehicle.calculate_stability() if stability > 0.7 and velocity < self.velocity_threshold: # Check if it's been stable long enough duration = time.time() - vehicle.first_seen if duration > self.min_stable_duration and vehicle.stable_frames >= self.min_stable_frames: return VehicleState.STABLE else: return VehicleState.ENTERING # Check if vehicle is entering or leaving if velocity > self.velocity_threshold: # Determine direction based on position history positions = np.array(vehicle.last_position_history) if len(positions) >= 2: direction = positions[-1] - positions[0] # Entering: moving towards center if x_position < self.entering_zone_ratio or x_position > (1 - self.entering_zone_ratio): if abs(direction[0]) > abs(direction[1]): # Horizontal movement if (x_position < 0.5 and direction[0] > 0) or (x_position > 0.5 and direction[0] < 0): return VehicleState.ENTERING # Leaving: moving away from center if 0.3 < x_position < 0.7: # In center zone if abs(direction[0]) > abs(direction[1]): # Horizontal movement if abs(direction[0]) > 10: # Significant movement return VehicleState.LEAVING return VehicleState.PASSING_BY return VehicleState.UNKNOWN def _validate_stable_vehicle(self, vehicle: TrackedVehicle) -> ValidationResult: """ Perform detailed validation of a stable vehicle. Args: vehicle: The stable vehicle to validate Returns: Detailed validation result """ # Check duration duration = time.time() - vehicle.first_seen if duration < self.min_stable_duration: return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=0.6, reason=f"Not stable long enough ({duration:.1f}s < {self.min_stable_duration}s)", should_process=False, track_id=vehicle.track_id ) # Check frame count if vehicle.stable_frames < self.min_stable_frames: return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=0.6, reason=f"Not enough stable frames ({vehicle.stable_frames} < {self.min_stable_frames})", should_process=False, track_id=vehicle.track_id ) # Check confidence if vehicle.avg_confidence < self.min_confidence: return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=vehicle.avg_confidence, reason=f"Confidence too low ({vehicle.avg_confidence:.2f} < {self.min_confidence})", should_process=False, track_id=vehicle.track_id ) # Check position variance variance = self._calculate_position_variance(vehicle) if variance > self.position_variance_threshold: return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=0.7, reason=f"Position variance too high ({variance:.1f} > {self.position_variance_threshold})", should_process=False, track_id=vehicle.track_id ) # Check state history consistency if vehicle.track_id in self.validation_history: history = self.validation_history[vehicle.track_id][-5:] # Last 5 states stable_count = sum(1 for s in history if s == VehicleState.STABLE) if stable_count < 3: return ValidationResult( is_valid=False, state=VehicleState.STABLE, confidence=0.7, reason="Inconsistent state history", should_process=False, track_id=vehicle.track_id ) # All checks passed - vehicle is valid for processing self.last_processed_vehicles[vehicle.track_id] = time.time() return ValidationResult( is_valid=True, state=VehicleState.STABLE, confidence=vehicle.avg_confidence, reason="Vehicle is stable and ready for processing", should_process=True, track_id=vehicle.track_id ) def _calculate_velocity(self, vehicle: TrackedVehicle) -> float: """ Calculate the velocity of the vehicle based on position history. Args: vehicle: The tracked vehicle Returns: Velocity in pixels per frame """ if len(vehicle.last_position_history) < 2: return 0.0 positions = np.array(vehicle.last_position_history) if len(positions) < 2: return 0.0 # Calculate velocity over last 3 frames recent_positions = positions[-min(3, len(positions)):] velocities = [] for i in range(1, len(recent_positions)): dx = recent_positions[i][0] - recent_positions[i-1][0] dy = recent_positions[i][1] - recent_positions[i-1][1] velocity = np.sqrt(dx**2 + dy**2) velocities.append(velocity) return np.mean(velocities) if velocities else 0.0 def _calculate_position_variance(self, vehicle: TrackedVehicle) -> float: """ Calculate the position variance of the vehicle. Args: vehicle: The tracked vehicle Returns: Position variance in pixels """ if len(vehicle.last_position_history) < 2: return 0.0 positions = np.array(vehicle.last_position_history) variance_x = np.var(positions[:, 0]) variance_y = np.var(positions[:, 1]) return np.sqrt(variance_x + variance_y) def should_skip_same_car(self, vehicle: TrackedVehicle, session_cleared: bool = False, permanently_processed: Dict[int, float] = None) -> bool: """ Determine if we should skip processing for the same car after session clear. Args: vehicle: The tracked vehicle session_cleared: Whether the session was recently cleared permanently_processed: Dict of permanently processed vehicles Returns: True if we should skip this vehicle """ # Check if this vehicle was permanently processed (never process again) if permanently_processed and vehicle.track_id in permanently_processed: process_time = permanently_processed[vehicle.track_id] time_since = time.time() - process_time logger.debug(f"Skipping permanently processed vehicle {vehicle.track_id} " f"(processed {time_since:.1f}s ago)") return True # If vehicle has a session_id but it was cleared, skip for a period if vehicle.session_id is None and vehicle.processed_pipeline and session_cleared: # Check if enough time has passed since processing if vehicle.track_id in self.last_processed_vehicles: time_since = time.time() - self.last_processed_vehicles[vehicle.track_id] if time_since < 30.0: # 30 second cooldown after session clear logger.debug(f"Skipping same car {vehicle.track_id} after session clear " f"({time_since:.1f}s since processing)") return True return False def reset_vehicle(self, track_id: int): """ Reset validation state for a specific vehicle. Args: track_id: Track ID of the vehicle to reset """ if track_id in self.validation_history: del self.validation_history[track_id] if track_id in self.last_processed_vehicles: del self.last_processed_vehicles[track_id] logger.debug(f"Reset validation state for vehicle {track_id}") def get_statistics(self) -> Dict: """Get validation statistics.""" return { 'vehicles_in_history': len(self.validation_history), 'recently_processed': len(self.last_processed_vehicles), 'state_distribution': self._get_state_distribution() } def _get_state_distribution(self) -> Dict[str, int]: """Get distribution of current vehicle states.""" distribution = {state.value: 0 for state in VehicleState} for history in self.validation_history.values(): if history: current_state = history[-1] distribution[current_state.value] += 1 return distribution