python-detector-worker/core/tracking/validator.py
2025-09-26 14:50:45 +07:00

402 lines
No EOL
15 KiB
Python

"""
Vehicle Validation Module - Stable car detection and validation logic.
Differentiates between stable (fueling) cars and passing-by vehicles.
"""
import logging
import time
import numpy as np
from typing import List, Optional, Tuple, Dict, Any
from dataclasses import dataclass
from enum import Enum
from .tracker import TrackedVehicle
logger = logging.getLogger(__name__)
class VehicleState(Enum):
"""Vehicle state classification."""
UNKNOWN = "unknown"
ENTERING = "entering"
STABLE = "stable"
LEAVING = "leaving"
PASSING_BY = "passing_by"
@dataclass
class ValidationResult:
"""Result of vehicle validation."""
is_valid: bool
state: VehicleState
confidence: float
reason: str
should_process: bool = False
track_id: Optional[int] = None
class StableCarValidator:
"""
Validates whether a tracked vehicle should be processed through the pipeline.
Updated for BoT-SORT integration: Trusts the sophisticated BoT-SORT tracking algorithm
for stability determination and focuses on business logic validation:
- Duration requirements for processing
- Confidence thresholds
- Session management and cooldowns
- Camera isolation with composite keys
"""
def __init__(self, config: Optional[Dict] = None):
"""
Initialize the validator with configuration.
Args:
config: Optional configuration dictionary
"""
self.config = config or {}
# Validation thresholds
self.min_stable_duration = self.config.get('min_stable_duration', 3.0) # seconds
self.min_stable_frames = self.config.get('min_stable_frames', 10)
self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0) # pixels
self.min_confidence = self.config.get('min_confidence', 0.7)
self.velocity_threshold = self.config.get('velocity_threshold', 5.0) # pixels/frame
self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3) # 30% of frame
self.leaving_zone_ratio = self.config.get('leaving_zone_ratio', 0.3)
# Frame dimensions (will be updated on first frame)
self.frame_width = 1920
self.frame_height = 1080
# History for validation
self.validation_history: Dict[int, List[VehicleState]] = {}
self.last_processed_vehicles: Dict[int, float] = {} # track_id -> last_process_time
logger.info(f"StableCarValidator initialized with min_duration={self.min_stable_duration}s, "
f"min_frames={self.min_stable_frames}, position_variance={self.position_variance_threshold}")
def update_frame_dimensions(self, width: int, height: int):
"""Update frame dimensions for zone calculations."""
self.frame_width = width
self.frame_height = height
# Commented out verbose frame dimension logging
# logger.debug(f"Updated frame dimensions: {width}x{height}")
def validate_vehicle(self, vehicle: TrackedVehicle, frame_shape: Optional[Tuple] = None) -> ValidationResult:
"""
Validate whether a tracked vehicle is stable and should be processed.
Args:
vehicle: The tracked vehicle to validate
frame_shape: Optional frame shape (height, width, channels)
Returns:
ValidationResult with validation status and reasoning
"""
# Update frame dimensions if provided
if frame_shape:
self.update_frame_dimensions(frame_shape[1], frame_shape[0])
# Initialize validation history for new vehicles
if vehicle.track_id not in self.validation_history:
self.validation_history[vehicle.track_id] = []
# Check if already processed
if vehicle.processed_pipeline:
return ValidationResult(
is_valid=False,
state=VehicleState.STABLE,
confidence=1.0,
reason="Already processed through pipeline",
should_process=False,
track_id=vehicle.track_id
)
# Check if recently processed (cooldown period)
if vehicle.track_id in self.last_processed_vehicles:
time_since_process = time.time() - self.last_processed_vehicles[vehicle.track_id]
if time_since_process < 10.0: # 10 second cooldown
return ValidationResult(
is_valid=False,
state=VehicleState.STABLE,
confidence=1.0,
reason=f"Recently processed ({time_since_process:.1f}s ago)",
should_process=False,
track_id=vehicle.track_id
)
# Determine vehicle state
state = self._determine_vehicle_state(vehicle)
# Update history
self.validation_history[vehicle.track_id].append(state)
if len(self.validation_history[vehicle.track_id]) > 20:
self.validation_history[vehicle.track_id].pop(0)
# Validate based on state
if state == VehicleState.STABLE:
return self._validate_stable_vehicle(vehicle)
elif state == VehicleState.PASSING_BY:
return ValidationResult(
is_valid=False,
state=state,
confidence=0.8,
reason="Vehicle is passing by",
should_process=False,
track_id=vehicle.track_id
)
elif state == VehicleState.ENTERING:
return ValidationResult(
is_valid=False,
state=state,
confidence=0.5,
reason="Vehicle is entering, waiting for stability",
should_process=False,
track_id=vehicle.track_id
)
elif state == VehicleState.LEAVING:
return ValidationResult(
is_valid=False,
state=state,
confidence=0.5,
reason="Vehicle is leaving",
should_process=False,
track_id=vehicle.track_id
)
else:
return ValidationResult(
is_valid=False,
state=state,
confidence=0.0,
reason="Unknown vehicle state",
should_process=False,
track_id=vehicle.track_id
)
def _determine_vehicle_state(self, vehicle: TrackedVehicle) -> VehicleState:
"""
Determine the current state of the vehicle based on BoT-SORT tracking results.
BoT-SORT provides sophisticated tracking, so we trust its stability determination
and focus on business logic validation.
Args:
vehicle: The tracked vehicle
Returns:
Current vehicle state
"""
# Trust BoT-SORT's stability determination
if vehicle.is_stable:
# Check if it's been stable long enough for processing
duration = time.time() - vehicle.first_seen
if duration >= self.min_stable_duration:
return VehicleState.STABLE
else:
return VehicleState.ENTERING
# For non-stable vehicles, use simplified state determination
if len(vehicle.last_position_history) < 2:
return VehicleState.UNKNOWN
# Calculate velocity for movement classification
velocity = self._calculate_velocity(vehicle)
# Basic movement classification
if velocity > self.velocity_threshold:
# Vehicle is moving - classify as passing by or entering/leaving
x_position = vehicle.center[0] / self.frame_width
# Simple heuristic: vehicles near edges are entering/leaving, center vehicles are passing
if x_position < 0.2 or x_position > 0.8:
return VehicleState.ENTERING
else:
return VehicleState.PASSING_BY
# Low velocity but not marked stable by tracker - likely entering
return VehicleState.ENTERING
def _validate_stable_vehicle(self, vehicle: TrackedVehicle) -> ValidationResult:
"""
Perform business logic validation of a stable vehicle.
Since BoT-SORT already determined the vehicle is stable, we focus on:
- Duration requirements for processing
- Confidence thresholds
- Business logic constraints
Args:
vehicle: The stable vehicle to validate
Returns:
Detailed validation result
"""
# Check duration (business requirement)
duration = time.time() - vehicle.first_seen
if duration < self.min_stable_duration:
return ValidationResult(
is_valid=False,
state=VehicleState.STABLE,
confidence=0.6,
reason=f"Not stable long enough ({duration:.1f}s < {self.min_stable_duration}s)",
should_process=False,
track_id=vehicle.track_id
)
# Check confidence (business requirement)
if vehicle.avg_confidence < self.min_confidence:
return ValidationResult(
is_valid=False,
state=VehicleState.STABLE,
confidence=vehicle.avg_confidence,
reason=f"Confidence too low ({vehicle.avg_confidence:.2f} < {self.min_confidence})",
should_process=False,
track_id=vehicle.track_id
)
# Trust BoT-SORT's stability determination - skip position variance check
# BoT-SORT's sophisticated tracking already ensures consistent positioning
# Simplified state history check - just ensure recent stability
if vehicle.track_id in self.validation_history:
history = self.validation_history[vehicle.track_id][-3:] # Last 3 states
stable_count = sum(1 for s in history if s == VehicleState.STABLE)
if len(history) >= 2 and stable_count == 0: # Only fail if clear instability
return ValidationResult(
is_valid=False,
state=VehicleState.STABLE,
confidence=0.7,
reason="Recent state history shows instability",
should_process=False,
track_id=vehicle.track_id
)
# All checks passed - vehicle is valid for processing
self.last_processed_vehicles[vehicle.track_id] = time.time()
return ValidationResult(
is_valid=True,
state=VehicleState.STABLE,
confidence=vehicle.avg_confidence,
reason="Vehicle is stable and ready for processing (BoT-SORT validated)",
should_process=True,
track_id=vehicle.track_id
)
def _calculate_velocity(self, vehicle: TrackedVehicle) -> float:
"""
Calculate the velocity of the vehicle based on position history.
Args:
vehicle: The tracked vehicle
Returns:
Velocity in pixels per frame
"""
if len(vehicle.last_position_history) < 2:
return 0.0
positions = np.array(vehicle.last_position_history)
if len(positions) < 2:
return 0.0
# Calculate velocity over last 3 frames
recent_positions = positions[-min(3, len(positions)):]
velocities = []
for i in range(1, len(recent_positions)):
dx = recent_positions[i][0] - recent_positions[i-1][0]
dy = recent_positions[i][1] - recent_positions[i-1][1]
velocity = np.sqrt(dx**2 + dy**2)
velocities.append(velocity)
return np.mean(velocities) if velocities else 0.0
def _calculate_position_variance(self, vehicle: TrackedVehicle) -> float:
"""
Calculate the position variance of the vehicle.
Args:
vehicle: The tracked vehicle
Returns:
Position variance in pixels
"""
if len(vehicle.last_position_history) < 2:
return 0.0
positions = np.array(vehicle.last_position_history)
variance_x = np.var(positions[:, 0])
variance_y = np.var(positions[:, 1])
return np.sqrt(variance_x + variance_y)
def should_skip_same_car(self,
vehicle: TrackedVehicle,
session_cleared: bool = False,
permanently_processed: Dict[str, float] = None) -> bool:
"""
Determine if we should skip processing for the same car after session clear.
Args:
vehicle: The tracked vehicle
session_cleared: Whether the session was recently cleared
permanently_processed: Dict of permanently processed vehicles (camera_id:track_id -> time)
Returns:
True if we should skip this vehicle
"""
# Check if this vehicle was permanently processed (never process again)
if permanently_processed:
# Create composite key using camera_id and track_id
permanent_key = f"{vehicle.camera_id}:{vehicle.track_id}"
if permanent_key in permanently_processed:
process_time = permanently_processed[permanent_key]
time_since = time.time() - process_time
logger.debug(f"Skipping permanently processed vehicle {vehicle.track_id} on camera {vehicle.camera_id} "
f"(processed {time_since:.1f}s ago)")
return True
# If vehicle has a session_id but it was cleared, skip for a period
if vehicle.session_id is None and vehicle.processed_pipeline and session_cleared:
# Check if enough time has passed since processing
if vehicle.track_id in self.last_processed_vehicles:
time_since = time.time() - self.last_processed_vehicles[vehicle.track_id]
if time_since < 30.0: # 30 second cooldown after session clear
logger.debug(f"Skipping same car {vehicle.track_id} after session clear "
f"({time_since:.1f}s since processing)")
return True
return False
def reset_vehicle(self, track_id: int):
"""
Reset validation state for a specific vehicle.
Args:
track_id: Track ID of the vehicle to reset
"""
if track_id in self.validation_history:
del self.validation_history[track_id]
if track_id in self.last_processed_vehicles:
del self.last_processed_vehicles[track_id]
logger.debug(f"Reset validation state for vehicle {track_id}")
def get_statistics(self) -> Dict:
"""Get validation statistics."""
return {
'vehicles_in_history': len(self.validation_history),
'recently_processed': len(self.last_processed_vehicles),
'state_distribution': self._get_state_distribution()
}
def _get_state_distribution(self) -> Dict[str, int]:
"""Get distribution of current vehicle states."""
distribution = {state.value: 0 for state in VehicleState}
for history in self.validation_history.values():
if history:
current_state = history[-1]
distribution[current_state.value] += 1
return distribution