All checks were successful
		
		
	
	Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
				
			Build Worker Base and Application Images / build-base (push) Has been skipped
				
			Build Worker Base and Application Images / build-docker (push) Successful in 3m39s
				
			Build Worker Base and Application Images / deploy-stack (push) Successful in 16s
				
			
		
			
				
	
	
		
			407 lines
		
	
	
		
			No EOL
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			407 lines
		
	
	
		
			No EOL
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
Vehicle Validation Module - Stable car detection and validation logic.
 | 
						|
Differentiates between stable (fueling) cars and passing-by vehicles.
 | 
						|
"""
 | 
						|
import logging
 | 
						|
import time
 | 
						|
import numpy as np
 | 
						|
from typing import List, Optional, Tuple, Dict, Any
 | 
						|
from dataclasses import dataclass
 | 
						|
from enum import Enum
 | 
						|
 | 
						|
from .tracker import TrackedVehicle
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class VehicleState(Enum):
 | 
						|
    """Vehicle state classification."""
 | 
						|
    UNKNOWN = "unknown"
 | 
						|
    ENTERING = "entering"
 | 
						|
    STABLE = "stable"
 | 
						|
    LEAVING = "leaving"
 | 
						|
    PASSING_BY = "passing_by"
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class ValidationResult:
 | 
						|
    """Result of vehicle validation."""
 | 
						|
    is_valid: bool
 | 
						|
    state: VehicleState
 | 
						|
    confidence: float
 | 
						|
    reason: str
 | 
						|
    should_process: bool = False
 | 
						|
    track_id: Optional[int] = None
 | 
						|
 | 
						|
 | 
						|
class StableCarValidator:
 | 
						|
    """
 | 
						|
    Validates whether a tracked vehicle should be processed through the pipeline.
 | 
						|
 | 
						|
    Updated for BoT-SORT integration: Trusts the sophisticated BoT-SORT tracking algorithm
 | 
						|
    for stability determination and focuses on business logic validation:
 | 
						|
    - Duration requirements for processing
 | 
						|
    - Confidence thresholds
 | 
						|
    - Session management and cooldowns
 | 
						|
    - Camera isolation with composite keys
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, config: Optional[Dict] = None):
 | 
						|
        """
 | 
						|
        Initialize the validator with configuration.
 | 
						|
 | 
						|
        Args:
 | 
						|
            config: Optional configuration dictionary
 | 
						|
        """
 | 
						|
        self.config = config or {}
 | 
						|
 | 
						|
        # Validation thresholds
 | 
						|
        # Optimized for 6 FPS RTSP source with 8 concurrent cameras on GPU
 | 
						|
        # GPU contention reduces effective FPS to ~3-5 per camera
 | 
						|
        # Reduced from 3.0s to 1.5s to achieve ~2.75s total validation time (was ~4.25s)
 | 
						|
        self.min_stable_duration = self.config.get('min_stable_duration', 1.5)  # seconds
 | 
						|
        # Reduced from 10 to 5 to align with tracker requirement and reduce validation time
 | 
						|
        self.min_stable_frames = self.config.get('min_stable_frames', 5)
 | 
						|
        self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0)  # pixels
 | 
						|
        # Reduced from 0.7 to 0.45 to be more permissive under GPU load
 | 
						|
        self.min_confidence = self.config.get('min_confidence', 0.45)
 | 
						|
        self.velocity_threshold = self.config.get('velocity_threshold', 5.0)  # pixels/frame
 | 
						|
        self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3)  # 30% of frame
 | 
						|
        self.leaving_zone_ratio = self.config.get('leaving_zone_ratio', 0.3)
 | 
						|
 | 
						|
        # Frame dimensions (will be updated on first frame)
 | 
						|
        self.frame_width = 1920
 | 
						|
        self.frame_height = 1080
 | 
						|
 | 
						|
        # History for validation
 | 
						|
        self.validation_history: Dict[int, List[VehicleState]] = {}
 | 
						|
        self.last_processed_vehicles: Dict[int, float] = {}  # track_id -> last_process_time
 | 
						|
 | 
						|
        logger.info(f"StableCarValidator initialized with min_duration={self.min_stable_duration}s, "
 | 
						|
                   f"min_frames={self.min_stable_frames}, position_variance={self.position_variance_threshold}")
 | 
						|
 | 
						|
    def update_frame_dimensions(self, width: int, height: int):
 | 
						|
        """Update frame dimensions for zone calculations."""
 | 
						|
        self.frame_width = width
 | 
						|
        self.frame_height = height
 | 
						|
        # Commented out verbose frame dimension logging
 | 
						|
        # logger.debug(f"Updated frame dimensions: {width}x{height}")
 | 
						|
 | 
						|
    def validate_vehicle(self, vehicle: TrackedVehicle, frame_shape: Optional[Tuple] = None) -> ValidationResult:
 | 
						|
        """
 | 
						|
        Validate whether a tracked vehicle is stable and should be processed.
 | 
						|
 | 
						|
        Args:
 | 
						|
            vehicle: The tracked vehicle to validate
 | 
						|
            frame_shape: Optional frame shape (height, width, channels)
 | 
						|
 | 
						|
        Returns:
 | 
						|
            ValidationResult with validation status and reasoning
 | 
						|
        """
 | 
						|
        # Update frame dimensions if provided
 | 
						|
        if frame_shape:
 | 
						|
            self.update_frame_dimensions(frame_shape[1], frame_shape[0])
 | 
						|
 | 
						|
        # Initialize validation history for new vehicles
 | 
						|
        if vehicle.track_id not in self.validation_history:
 | 
						|
            self.validation_history[vehicle.track_id] = []
 | 
						|
 | 
						|
        # Check if already processed
 | 
						|
        if vehicle.processed_pipeline:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=VehicleState.STABLE,
 | 
						|
                confidence=1.0,
 | 
						|
                reason="Already processed through pipeline",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
 | 
						|
        # Check if recently processed (cooldown period)
 | 
						|
        if vehicle.track_id in self.last_processed_vehicles:
 | 
						|
            time_since_process = time.time() - self.last_processed_vehicles[vehicle.track_id]
 | 
						|
            if time_since_process < 10.0:  # 10 second cooldown
 | 
						|
                return ValidationResult(
 | 
						|
                    is_valid=False,
 | 
						|
                    state=VehicleState.STABLE,
 | 
						|
                    confidence=1.0,
 | 
						|
                    reason=f"Recently processed ({time_since_process:.1f}s ago)",
 | 
						|
                    should_process=False,
 | 
						|
                    track_id=vehicle.track_id
 | 
						|
                )
 | 
						|
 | 
						|
        # Determine vehicle state
 | 
						|
        state = self._determine_vehicle_state(vehicle)
 | 
						|
 | 
						|
        # Update history
 | 
						|
        self.validation_history[vehicle.track_id].append(state)
 | 
						|
        if len(self.validation_history[vehicle.track_id]) > 20:
 | 
						|
            self.validation_history[vehicle.track_id].pop(0)
 | 
						|
 | 
						|
        # Validate based on state
 | 
						|
        if state == VehicleState.STABLE:
 | 
						|
            return self._validate_stable_vehicle(vehicle)
 | 
						|
        elif state == VehicleState.PASSING_BY:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=state,
 | 
						|
                confidence=0.8,
 | 
						|
                reason="Vehicle is passing by",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
        elif state == VehicleState.ENTERING:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=state,
 | 
						|
                confidence=0.5,
 | 
						|
                reason="Vehicle is entering, waiting for stability",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
        elif state == VehicleState.LEAVING:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=state,
 | 
						|
                confidence=0.5,
 | 
						|
                reason="Vehicle is leaving",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=state,
 | 
						|
                confidence=0.0,
 | 
						|
                reason="Unknown vehicle state",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
 | 
						|
    def _determine_vehicle_state(self, vehicle: TrackedVehicle) -> VehicleState:
 | 
						|
        """
 | 
						|
        Determine the current state of the vehicle based on BoT-SORT tracking results.
 | 
						|
 | 
						|
        BoT-SORT provides sophisticated tracking, so we trust its stability determination
 | 
						|
        and focus on business logic validation.
 | 
						|
 | 
						|
        Args:
 | 
						|
            vehicle: The tracked vehicle
 | 
						|
 | 
						|
        Returns:
 | 
						|
            Current vehicle state
 | 
						|
        """
 | 
						|
        # Trust BoT-SORT's stability determination
 | 
						|
        if vehicle.is_stable:
 | 
						|
            # Check if it's been stable long enough for processing
 | 
						|
            duration = time.time() - vehicle.first_seen
 | 
						|
            if duration >= self.min_stable_duration:
 | 
						|
                return VehicleState.STABLE
 | 
						|
            else:
 | 
						|
                return VehicleState.ENTERING
 | 
						|
 | 
						|
        # For non-stable vehicles, use simplified state determination
 | 
						|
        if len(vehicle.last_position_history) < 2:
 | 
						|
            return VehicleState.UNKNOWN
 | 
						|
 | 
						|
        # Calculate velocity for movement classification
 | 
						|
        velocity = self._calculate_velocity(vehicle)
 | 
						|
 | 
						|
        # Basic movement classification
 | 
						|
        if velocity > self.velocity_threshold:
 | 
						|
            # Vehicle is moving - classify as passing by or entering/leaving
 | 
						|
            x_position = vehicle.center[0] / self.frame_width
 | 
						|
 | 
						|
            # Simple heuristic: vehicles near edges are entering/leaving, center vehicles are passing
 | 
						|
            if x_position < 0.2 or x_position > 0.8:
 | 
						|
                return VehicleState.ENTERING
 | 
						|
            else:
 | 
						|
                return VehicleState.PASSING_BY
 | 
						|
 | 
						|
        # Low velocity but not marked stable by tracker - likely entering
 | 
						|
        return VehicleState.ENTERING
 | 
						|
 | 
						|
    def _validate_stable_vehicle(self, vehicle: TrackedVehicle) -> ValidationResult:
 | 
						|
        """
 | 
						|
        Perform business logic validation of a stable vehicle.
 | 
						|
 | 
						|
        Since BoT-SORT already determined the vehicle is stable, we focus on:
 | 
						|
        - Duration requirements for processing
 | 
						|
        - Confidence thresholds
 | 
						|
        - Business logic constraints
 | 
						|
 | 
						|
        Args:
 | 
						|
            vehicle: The stable vehicle to validate
 | 
						|
 | 
						|
        Returns:
 | 
						|
            Detailed validation result
 | 
						|
        """
 | 
						|
        # Check duration (business requirement)
 | 
						|
        duration = time.time() - vehicle.first_seen
 | 
						|
        if duration < self.min_stable_duration:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=VehicleState.STABLE,
 | 
						|
                confidence=0.6,
 | 
						|
                reason=f"Not stable long enough ({duration:.1f}s < {self.min_stable_duration}s)",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
 | 
						|
        # Check confidence (business requirement)
 | 
						|
        if vehicle.avg_confidence < self.min_confidence:
 | 
						|
            return ValidationResult(
 | 
						|
                is_valid=False,
 | 
						|
                state=VehicleState.STABLE,
 | 
						|
                confidence=vehicle.avg_confidence,
 | 
						|
                reason=f"Confidence too low ({vehicle.avg_confidence:.2f} < {self.min_confidence})",
 | 
						|
                should_process=False,
 | 
						|
                track_id=vehicle.track_id
 | 
						|
            )
 | 
						|
 | 
						|
        # Trust BoT-SORT's stability determination - skip position variance check
 | 
						|
        # BoT-SORT's sophisticated tracking already ensures consistent positioning
 | 
						|
 | 
						|
        # Simplified state history check - just ensure recent stability
 | 
						|
        if vehicle.track_id in self.validation_history:
 | 
						|
            history = self.validation_history[vehicle.track_id][-3:]  # Last 3 states
 | 
						|
            stable_count = sum(1 for s in history if s == VehicleState.STABLE)
 | 
						|
            if len(history) >= 2 and stable_count == 0:  # Only fail if clear instability
 | 
						|
                return ValidationResult(
 | 
						|
                    is_valid=False,
 | 
						|
                    state=VehicleState.STABLE,
 | 
						|
                    confidence=0.7,
 | 
						|
                    reason="Recent state history shows instability",
 | 
						|
                    should_process=False,
 | 
						|
                    track_id=vehicle.track_id
 | 
						|
                )
 | 
						|
 | 
						|
        # All checks passed - vehicle is valid for processing
 | 
						|
        self.last_processed_vehicles[vehicle.track_id] = time.time()
 | 
						|
 | 
						|
        return ValidationResult(
 | 
						|
            is_valid=True,
 | 
						|
            state=VehicleState.STABLE,
 | 
						|
            confidence=vehicle.avg_confidence,
 | 
						|
            reason="Vehicle is stable and ready for processing (BoT-SORT validated)",
 | 
						|
            should_process=True,
 | 
						|
            track_id=vehicle.track_id
 | 
						|
        )
 | 
						|
 | 
						|
    def _calculate_velocity(self, vehicle: TrackedVehicle) -> float:
 | 
						|
        """
 | 
						|
        Calculate the velocity of the vehicle based on position history.
 | 
						|
 | 
						|
        Args:
 | 
						|
            vehicle: The tracked vehicle
 | 
						|
 | 
						|
        Returns:
 | 
						|
            Velocity in pixels per frame
 | 
						|
        """
 | 
						|
        if len(vehicle.last_position_history) < 2:
 | 
						|
            return 0.0
 | 
						|
 | 
						|
        positions = np.array(vehicle.last_position_history)
 | 
						|
        if len(positions) < 2:
 | 
						|
            return 0.0
 | 
						|
 | 
						|
        # Calculate velocity over last 3 frames
 | 
						|
        recent_positions = positions[-min(3, len(positions)):]
 | 
						|
        velocities = []
 | 
						|
 | 
						|
        for i in range(1, len(recent_positions)):
 | 
						|
            dx = recent_positions[i][0] - recent_positions[i-1][0]
 | 
						|
            dy = recent_positions[i][1] - recent_positions[i-1][1]
 | 
						|
            velocity = np.sqrt(dx**2 + dy**2)
 | 
						|
            velocities.append(velocity)
 | 
						|
 | 
						|
        return np.mean(velocities) if velocities else 0.0
 | 
						|
 | 
						|
    def _calculate_position_variance(self, vehicle: TrackedVehicle) -> float:
 | 
						|
        """
 | 
						|
        Calculate the position variance of the vehicle.
 | 
						|
 | 
						|
        Args:
 | 
						|
            vehicle: The tracked vehicle
 | 
						|
 | 
						|
        Returns:
 | 
						|
            Position variance in pixels
 | 
						|
        """
 | 
						|
        if len(vehicle.last_position_history) < 2:
 | 
						|
            return 0.0
 | 
						|
 | 
						|
        positions = np.array(vehicle.last_position_history)
 | 
						|
        variance_x = np.var(positions[:, 0])
 | 
						|
        variance_y = np.var(positions[:, 1])
 | 
						|
 | 
						|
        return np.sqrt(variance_x + variance_y)
 | 
						|
 | 
						|
    def should_skip_same_car(self,
 | 
						|
                            vehicle: TrackedVehicle,
 | 
						|
                            session_cleared: bool = False,
 | 
						|
                            permanently_processed: Dict[str, float] = None) -> bool:
 | 
						|
        """
 | 
						|
        Determine if we should skip processing for the same car after session clear.
 | 
						|
 | 
						|
        Args:
 | 
						|
            vehicle: The tracked vehicle
 | 
						|
            session_cleared: Whether the session was recently cleared
 | 
						|
            permanently_processed: Dict of permanently processed vehicles (camera_id:track_id -> time)
 | 
						|
 | 
						|
        Returns:
 | 
						|
            True if we should skip this vehicle
 | 
						|
        """
 | 
						|
        # Check if this vehicle was permanently processed (never process again)
 | 
						|
        if permanently_processed:
 | 
						|
            # Create composite key using camera_id and track_id
 | 
						|
            permanent_key = f"{vehicle.camera_id}:{vehicle.track_id}"
 | 
						|
            if permanent_key in permanently_processed:
 | 
						|
                process_time = permanently_processed[permanent_key]
 | 
						|
                time_since = time.time() - process_time
 | 
						|
                logger.debug(f"Skipping permanently processed vehicle {vehicle.track_id} on camera {vehicle.camera_id} "
 | 
						|
                            f"(processed {time_since:.1f}s ago)")
 | 
						|
                return True
 | 
						|
 | 
						|
        # If vehicle has a session_id but it was cleared, skip for a period
 | 
						|
        if vehicle.session_id is None and vehicle.processed_pipeline and session_cleared:
 | 
						|
            # Check if enough time has passed since processing
 | 
						|
            if vehicle.track_id in self.last_processed_vehicles:
 | 
						|
                time_since = time.time() - self.last_processed_vehicles[vehicle.track_id]
 | 
						|
                if time_since < 30.0:  # 30 second cooldown after session clear
 | 
						|
                    logger.debug(f"Skipping same car {vehicle.track_id} after session clear "
 | 
						|
                               f"({time_since:.1f}s since processing)")
 | 
						|
                    return True
 | 
						|
 | 
						|
        return False
 | 
						|
 | 
						|
    def reset_vehicle(self, track_id: int):
 | 
						|
        """
 | 
						|
        Reset validation state for a specific vehicle.
 | 
						|
 | 
						|
        Args:
 | 
						|
            track_id: Track ID of the vehicle to reset
 | 
						|
        """
 | 
						|
        if track_id in self.validation_history:
 | 
						|
            del self.validation_history[track_id]
 | 
						|
        if track_id in self.last_processed_vehicles:
 | 
						|
            del self.last_processed_vehicles[track_id]
 | 
						|
        logger.debug(f"Reset validation state for vehicle {track_id}")
 | 
						|
 | 
						|
    def get_statistics(self) -> Dict:
 | 
						|
        """Get validation statistics."""
 | 
						|
        return {
 | 
						|
            'vehicles_in_history': len(self.validation_history),
 | 
						|
            'recently_processed': len(self.last_processed_vehicles),
 | 
						|
            'state_distribution': self._get_state_distribution()
 | 
						|
        }
 | 
						|
 | 
						|
    def _get_state_distribution(self) -> Dict[str, int]:
 | 
						|
        """Get distribution of current vehicle states."""
 | 
						|
        distribution = {state.value: 0 for state in VehicleState}
 | 
						|
 | 
						|
        for history in self.validation_history.values():
 | 
						|
            if history:
 | 
						|
                current_state = history[-1]
 | 
						|
                distribution[current_state.value] += 1
 | 
						|
 | 
						|
        return distribution |