python-detector-worker/core/tracking/integration.py
ziesorx 402f7732a8
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m53s
Build Worker Base and Application Images / deploy-stack (push) Successful in 15s
fix: change min bbox size for frontal
2025-09-30 17:24:33 +07:00

858 lines
No EOL
37 KiB
Python

"""
Tracking-Pipeline Integration Module.
Connects the tracking system with the main detection pipeline and manages the flow.
"""
import logging
import time
import uuid
from typing import Dict, Optional, Any, List, Tuple
from concurrent.futures import ThreadPoolExecutor
import asyncio
import numpy as np
from .tracker import VehicleTracker, TrackedVehicle
from .validator import StableCarValidator
from ..models.inference import YOLOWrapper
from ..models.pipeline import PipelineParser
from ..detection.pipeline import DetectionPipeline
logger = logging.getLogger(__name__)


class TrackingPipelineIntegration:
    """
    Integrates vehicle tracking with the detection pipeline.

    Manages tracking state transitions and pipeline execution triggers:
    frames are run through the tracking model, stable vehicles are validated,
    the detection phase is executed for a validated vehicle, and the
    processing phase is executed once the backend supplies a session ID.
    """

    # Seconds after a session is cleared during which its vehicle is treated
    # as "just cleared" when deciding whether to skip it.
    SESSION_CLEAR_COOLDOWN_SECONDS = 30

    # Seconds after which entries in ``cleared_sessions`` are pruned.
    CLEARED_SESSION_RETENTION_SECONDS = 300

    def __init__(self, pipeline_parser: "PipelineParser", model_manager: Any, model_id: int, message_sender=None):
        """
        Initialize tracking-pipeline integration.

        Args:
            pipeline_parser: Pipeline parser with loaded configuration
            model_manager: Model manager for loading models
            model_id: The model ID to use for loading models
            message_sender: Optional callback function for sending WebSocket messages
                (awaited with the message as its only argument)
        """
        self.pipeline_parser = pipeline_parser
        self.model_manager = model_manager
        self.model_id = model_id
        self.message_sender = message_sender

        # Store subscription info for snapshot access
        self.subscription_info = None

        # Initialize tracking components
        tracking_config = pipeline_parser.tracking_config.__dict__ if pipeline_parser.tracking_config else {}
        self.tracker = VehicleTracker(tracking_config)
        self.validator = StableCarValidator()

        # Tracking model
        self.tracking_model: Optional[YOLOWrapper] = None
        self.tracking_model_id = None

        # Detection pipeline
        self.detection_pipeline: Optional[DetectionPipeline] = None

        # Session management
        self.active_sessions: Dict[str, str] = {}  # display_id -> session_id
        self.session_vehicles: Dict[str, int] = {}  # session_id -> track_id
        self.cleared_sessions: Dict[str, float] = {}  # session_id -> clear_time
        self.pending_vehicles: Dict[str, int] = {}  # display_id -> track_id (waiting for session ID)
        self.pending_processing_data: Dict[str, Dict] = {}  # display_id -> processing data (waiting for session ID)
        self.display_to_subscription: Dict[str, str] = {}  # display_id -> subscription_id (for fallback)

        # Additional validators for enhanced flow control
        self.permanently_processed: Dict[str, float] = {}  # "camera_id:track_id" -> process_time (never process again)
        self.progression_stages: Dict[str, str] = {}  # session_id -> current_stage
        self.last_detection_time: Dict[str, float] = {}  # display_id -> last_detection_timestamp
        self.abandonment_timeout = 3.0  # seconds to wait before declaring car abandoned

        # Thread pool used to keep blocking snapshot fetches off the event loop
        self.executor = ThreadPoolExecutor(max_workers=2)

        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks = set()

        # Min bbox filtering configuration
        # TODO: Make this configurable via pipeline.json in the future
        self.min_bbox_area_percentage = 3.5  # 3.5% of frame area minimum

        # Statistics
        self.stats = {
            'frames_processed': 0,
            'vehicles_detected': 0,
            'vehicles_validated': 0,
            'pipelines_executed': 0,
            'frontals_filtered_small': 0  # Track filtered detections
        }

        logger.info("TrackingPipelineIntegration initialized")

    async def initialize_tracking_model(self) -> bool:
        """
        Load and initialize the tracking model, then the detection pipeline.

        Returns:
            True if the tracking model was loaded, False otherwise. The result
            of the detection pipeline initialization is not propagated here.
        """
        try:
            tracking_config = self.pipeline_parser.tracking_config
            if not tracking_config:
                logger.warning("No tracking configuration found in pipeline")
                return False

            model_file = tracking_config.model_file
            model_id = tracking_config.model_id

            if not model_file:
                logger.warning("No tracking model file specified")
                return False

            # Load tracking model
            logger.info(f"Loading tracking model: {model_id} ({model_file})")
            self.tracking_model = self.model_manager.get_yolo_model(self.model_id, model_file)

            if not self.tracking_model:
                logger.error(f"Failed to load tracking model {model_file} from model {self.model_id}")
                return False

            self.tracking_model_id = model_id
            logger.info(f"Tracking model {model_id} loaded successfully")

            # Initialize detection pipeline. A failure there is logged by the
            # callee and does not change this method's return value.
            await self._initialize_detection_pipeline()
            return True

        except Exception as e:
            logger.error(f"Error initializing tracking model: {e}", exc_info=True)
            return False

    async def _initialize_detection_pipeline(self) -> bool:
        """
        Initialize the detection pipeline for main detection processing.

        Returns:
            True if successful, False otherwise
        """
        try:
            if not self.pipeline_parser:
                logger.warning("No pipeline parser available for detection pipeline")
                return False

            # Create detection pipeline with message sender capability
            # NOTE(review): the pipeline is stored before initialize() succeeds,
            # so it stays set even when initialization fails - confirm intended.
            self.detection_pipeline = DetectionPipeline(self.pipeline_parser, self.model_manager, self.model_id, self.message_sender)

            # Initialize detection pipeline
            if await self.detection_pipeline.initialize():
                logger.info("Detection pipeline initialized successfully")
                return True

            logger.error("Failed to initialize detection pipeline")
            return False

        except Exception as e:
            logger.error(f"Error initializing detection pipeline: {e}", exc_info=True)
            return False

    async def process_frame(self,
                            frame: np.ndarray,
                            display_id: str,
                            subscription_id: str,
                            session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Process a frame through tracking and potentially the detection pipeline.

        Args:
            frame: Input frame to process
            display_id: Display identifier
            subscription_id: Full subscription identifier
            session_id: Optional session ID from backend

        Returns:
            Dictionary with keys 'tracked_vehicles', 'validated_vehicle',
            'pipeline_result', 'session_id' and 'processing_time'. Errors are
            logged and swallowed, so the dictionary is always returned.
        """
        start_time = time.time()
        result = {
            'tracked_vehicles': [],
            'validated_vehicle': None,
            'pipeline_result': None,
            'session_id': session_id,
            'processing_time': 0.0
        }

        try:
            # Update stats
            self.stats['frames_processed'] += 1

            # Run tracking model
            if self.tracking_model:
                tracking_results = self.tracking_model.track(
                    frame,
                    confidence_threshold=self.tracker.min_confidence,
                    trigger_classes=self.tracker.trigger_classes,
                    persist=True
                )

                if tracking_results and hasattr(tracking_results, 'detections'):
                    # Debug: Log raw detection results
                    raw_detections = len(tracking_results.detections)
                    if raw_detections > 0:
                        class_names = [detection.class_name for detection in tracking_results.detections]
                        logger.debug(f"Raw detections: {raw_detections}, classes: {class_names}")
                    else:
                        logger.debug("No raw detections found")

                    # Filter out small detections (neighboring pumps/distant cars)
                    tracking_results = self._filter_small_frontals(tracking_results, frame)
                else:
                    logger.debug("No tracking results or detections attribute")

                # Process tracking results
                tracked_vehicles = self.tracker.process_detections(
                    tracking_results,
                    display_id,
                    frame
                )

                # Update last detection time for abandonment detection
                # Update when vehicles ARE detected, so when they leave, timestamp ages
                if tracked_vehicles:
                    self.last_detection_time[display_id] = time.time()
                    logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles")

                # Check for car abandonment (vehicle left after getting car_wait_staff stage)
                await self._check_car_abandonment(display_id, subscription_id)

                result['tracked_vehicles'] = [
                    {
                        'track_id': v.track_id,
                        'bbox': v.bbox,
                        'confidence': v.confidence,
                        'is_stable': v.is_stable,
                        'session_id': v.session_id
                    }
                    for v in tracked_vehicles
                ]

                # Log tracking info periodically
                if self.stats['frames_processed'] % 30 == 0:  # Every 30 frames
                    logger.debug(f"Tracking: {len(tracked_vehicles)} vehicles, "
                                 f"display={display_id}")

                # Get stable vehicles for validation
                stable_vehicles = self.tracker.get_stable_vehicles(display_id)

                # Validate and potentially process stable vehicles
                for vehicle in stable_vehicles:
                    # Check if vehicle is already processed or has session
                    if vehicle.processed_pipeline:
                        continue

                    # Check for session cleared (post-fueling)
                    if session_id and vehicle.session_id == session_id:
                        # Same vehicle with same session, skip
                        continue

                    # Check if this was a recently cleared session
                    session_cleared = False
                    if vehicle.session_id in self.cleared_sessions:
                        clear_time = self.cleared_sessions[vehicle.session_id]
                        if (time.time() - clear_time) < self.SESSION_CLEAR_COOLDOWN_SECONDS:
                            session_cleared = True

                    # Skip same car after session clear or if permanently processed
                    if self.validator.should_skip_same_car(vehicle, session_cleared, self.permanently_processed):
                        continue

                    # Validate vehicle
                    validation_result = self.validator.validate_vehicle(vehicle, frame.shape)

                    if validation_result.is_valid and validation_result.should_process:
                        logger.info(f"Vehicle {vehicle.track_id} validated for processing: "
                                    f"{validation_result.reason}")

                        result['validated_vehicle'] = {
                            'track_id': vehicle.track_id,
                            'state': validation_result.state.value,
                            'confidence': validation_result.confidence
                        }

                        # Mark vehicle as pending session ID assignment; the
                        # backend answers the imageDetection with setSessionId.
                        self.pending_vehicles[display_id] = vehicle.track_id
                        logger.info(f"Vehicle {vehicle.track_id} waiting for session ID from backend")

                        # Execute the detection phase for this vehicle
                        pipeline_result = await self._execute_pipeline(
                            frame,
                            vehicle,
                            display_id,
                            None,  # No session ID yet
                            subscription_id
                        )

                        result['pipeline_result'] = pipeline_result
                        # No session_id in result yet - backend will provide it
                        self.stats['pipelines_executed'] += 1

                        # Only process one vehicle per frame
                        break

                # These two are overwritten each frame (latest-frame values, not
                # running totals); 'vehicles_validated' holds the stable count.
                self.stats['vehicles_detected'] = len(tracked_vehicles)
                self.stats['vehicles_validated'] = len(stable_vehicles)

            else:
                logger.warning("No tracking model available")

        except Exception as e:
            logger.error(f"Error in tracking pipeline: {e}", exc_info=True)

        result['processing_time'] = time.time() - start_time
        return result

    async def _execute_pipeline(self,
                                frame: np.ndarray,
                                vehicle: "TrackedVehicle",
                                display_id: str,
                                session_id: Optional[str],
                                subscription_id: str) -> Dict[str, Any]:
        """
        Execute the detection phase of the main pipeline for a validated vehicle.

        When the detection phase reports that a message was sent, the frame and
        results are stored in ``pending_processing_data`` until the backend
        supplies a session ID.

        Args:
            frame: Input frame
            vehicle: Validated tracked vehicle
            display_id: Display identifier
            session_id: Session identifier, or None when not yet assigned. Only
                used for logging and in error results.
            subscription_id: Full subscription identifier

        Returns:
            Detection phase results extended with vehicle information, or an
            error dictionary (status 'error') on failure.
        """
        logger.info(f"Executing detection pipeline for vehicle {vehicle.track_id}, "
                    f"session={session_id}, display={display_id}")

        try:
            # Check if detection pipeline is available
            if not self.detection_pipeline:
                logger.warning("Detection pipeline not initialized, using fallback")
                return {
                    'status': 'error',
                    'message': 'Detection pipeline not available',
                    'vehicle_id': vehicle.track_id,
                    'session_id': session_id
                }

            # Execute only the detection phase (first phase)
            # This will run detection and send imageDetection message to backend
            detection_result = await self.detection_pipeline.execute_detection_phase(
                frame=frame,
                display_id=display_id,
                subscription_id=subscription_id
            )

            # Add vehicle information to result
            detection_result['vehicle_id'] = vehicle.track_id
            detection_result['vehicle_bbox'] = vehicle.bbox
            detection_result['vehicle_confidence'] = vehicle.confidence
            detection_result['phase'] = 'detection'

            logger.info(f"Detection phase executed for vehicle {vehicle.track_id}: "
                        f"status={detection_result.get('status', 'unknown')}, "
                        f"message_sent={detection_result.get('message_sent', False)}, "
                        f"processing_time={detection_result.get('processing_time', 0):.3f}s")

            # 'message_sent' is optional (the log line above already treats it
            # as such); a missing key must not turn a good result into an error.
            if detection_result.get('message_sent', False):
                # Store for later processing when sessionId is received
                self.pending_processing_data[display_id] = {
                    'frame': frame.copy(),  # Store copy of frame for processing phase
                    'vehicle': vehicle,
                    'subscription_id': subscription_id,
                    'detection_result': detection_result,
                    'timestamp': time.time()
                }
                logger.info(f"Stored processing data for {display_id}, waiting for sessionId from backend")

            return detection_result

        except Exception as e:
            logger.error(f"Error executing detection pipeline: {e}", exc_info=True)
            return {
                'status': 'error',
                'message': str(e),
                'vehicle_id': vehicle.track_id,
                'session_id': session_id,
                'processing_time': 0.0
            }

    async def _execute_processing_phase(self,
                                        processing_data: Dict[str, Any],
                                        session_id: str,
                                        display_id: str) -> None:
        """
        Execute the processing phase after receiving sessionId from backend.

        Args:
            processing_data: Stored processing data from detection phase
                (keys 'frame', 'vehicle', 'subscription_id', 'detection_result')
            session_id: Session ID from backend
            display_id: Display identifier
        """
        try:
            vehicle = processing_data['vehicle']
            subscription_id = processing_data['subscription_id']
            detection_result = processing_data['detection_result']

            logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")

            # Capture high-quality snapshot for pipeline processing (fetched in
            # the thread pool so the event loop is not blocked)
            logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
            frame = await self._fetch_snapshot_async()

            if frame is None:
                logger.warning("[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
                # Fall back to RTSP frame if snapshot fails
                frame = processing_data['frame']

            # Extract detected regions from detection phase result if available
            detected_regions = detection_result.get('detected_regions', {})
            logger.info(f"[INTEGRATION] Passing detected_regions to processing phase: {list(detected_regions.keys())}")

            # Execute processing phase with detection pipeline
            if self.detection_pipeline:
                processing_result = await self.detection_pipeline.execute_processing_phase(
                    frame=frame,
                    display_id=display_id,
                    session_id=session_id,
                    subscription_id=subscription_id,
                    detected_regions=detected_regions
                )

                logger.info(f"Processing phase completed for session {session_id}: "
                            f"status={processing_result.get('status', 'unknown')}, "
                            f"branches={len(processing_result.get('branch_results', {}))}, "
                            f"actions={len(processing_result.get('actions_executed', []))}, "
                            f"processing_time={processing_result.get('processing_time', 0):.3f}s")

                # Update stats
                self.stats['pipelines_executed'] += 1
            else:
                logger.error("Detection pipeline not available for processing phase")

        except Exception as e:
            logger.error(f"Error in processing phase for session {session_id}: {e}", exc_info=True)

    def set_subscription_info(self, subscription_info):
        """
        Set subscription info to access snapshot URL and other stream details.

        Args:
            subscription_info: SubscriptionInfo object containing stream config
        """
        self.subscription_info = subscription_info
        logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}")

    def _spawn_background_task(self, coro) -> asyncio.Task:
        """
        Schedule a coroutine as a background task and keep it referenced until done.

        Must be called while an event loop is running (asyncio.create_task
        requirement).

        Args:
            coro: Coroutine object to schedule

        Returns:
            The created task
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # Drop the reference once the task has finished
        task.add_done_callback(self._background_tasks.discard)
        return task

    def set_session_id(self, display_id: str, session_id: Optional[str], subscription_id: Optional[str] = None):
        """
        Set session ID for a display (from backend).

        This is called when backend sends setSessionId after receiving imageDetection.
        Background tasks are scheduled here, so this must be called from a
        running event loop.

        Args:
            display_id: Display identifier
            session_id: Session identifier (converted to str unless None)
            subscription_id: Subscription identifier (displayId;cameraId) - needed for fallback
        """
        # Ensure session_id is always a string for consistent type handling
        # NOTE(review): a None session_id is still recorded below and still
        # marks a pending vehicle as processed - confirm that is intended.
        session_id = str(session_id) if session_id is not None else None
        self.active_sessions[display_id] = session_id

        # Store subscription_id for fallback usage
        if subscription_id:
            self.display_to_subscription[display_id] = subscription_id
            logger.info(f"Set session {session_id} for display {display_id} with subscription {subscription_id}")
        else:
            logger.info(f"Set session {session_id} for display {display_id}")

        # Check if we have a pending vehicle for this display
        if display_id in self.pending_vehicles:
            track_id = self.pending_vehicles[display_id]

            # Mark vehicle as processed with the session ID
            self.tracker.mark_processed(track_id, session_id)
            self.session_vehicles[session_id] = track_id

            # Mark vehicle as permanently processed (won't process again even after session clear)
            # Use composite key to distinguish same track IDs across different cameras
            camera_id = display_id  # Using display_id as camera_id for isolation
            permanent_key = f"{camera_id}:{track_id}"
            self.permanently_processed[permanent_key] = time.time()

            # Remove from pending
            del self.pending_vehicles[display_id]

            logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as permanently processed")
        else:
            logger.warning(f"No pending vehicle found for display {display_id} when setting session {session_id}")

        # Check if we have pending processing data for this display
        if display_id in self.pending_processing_data:
            # Remove from pending processing and hand it to the processing phase
            processing_data = self.pending_processing_data.pop(display_id)

            # Trigger the processing phase asynchronously
            self._spawn_background_task(self._execute_processing_phase(
                processing_data=processing_data,
                session_id=session_id,
                display_id=display_id
            ))

            logger.info(f"Triggered processing phase for session {session_id} on display {display_id}")
        else:
            logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")

            # FALLBACK: Execute pipeline for POS-initiated sessions
            # Skip if session_id is None (no car present or car has left)
            if session_id is not None:
                # Use stored subscription_id instead of creating fake one
                stored_subscription_id = self.display_to_subscription.get(display_id)
                if stored_subscription_id:
                    logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}")

                    # Trigger the fallback pipeline asynchronously with real subscription_id
                    self._spawn_background_task(self._execute_fallback_pipeline(
                        display_id=display_id,
                        session_id=session_id,
                        subscription_id=stored_subscription_id
                    ))
                else:
                    logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline")
            else:
                logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}")

    def clear_session_id(self, session_id: str):
        """
        Clear session ID (post-fueling).

        Records the clear time, clears the session in the tracker, removes the
        session from the active-session, session-vehicle and progression-stage
        maps, and prunes old cleared-session entries.

        Args:
            session_id: Session identifier to clear
        """
        # Mark session as cleared
        self.cleared_sessions[session_id] = time.time()

        # Clear from tracker
        self.tracker.clear_session(session_id)

        # Remove from active sessions (first display holding this session)
        display_to_remove = next(
            (display_id for display_id, sess_id in self.active_sessions.items() if sess_id == session_id),
            None
        )
        if display_to_remove is not None:
            del self.active_sessions[display_to_remove]

        self.session_vehicles.pop(session_id, None)

        # A cleared session must no longer be monitored for abandonment;
        # leaving the entry behind would also leak it until reset_tracking().
        self.progression_stages.pop(session_id, None)

        logger.info(f"Cleared session {session_id}")

        # Clean old cleared sessions
        current_time = time.time()
        old_sessions = [
            sid for sid, clear_time in self.cleared_sessions.items()
            if (current_time - clear_time) > self.CLEARED_SESSION_RETENTION_SECONDS
        ]
        for sid in old_sessions:
            del self.cleared_sessions[sid]

    def get_session_for_display(self, display_id: str) -> Optional[str]:
        """Get active session for a display, or None if there is none."""
        return self.active_sessions.get(display_id)

    def reset_tracking(self):
        """Reset all tracking state (tracker and every session/bookkeeping map)."""
        self.tracker.reset_tracking()
        self.active_sessions.clear()
        self.session_vehicles.clear()
        self.cleared_sessions.clear()
        self.pending_vehicles.clear()
        self.pending_processing_data.clear()
        self.display_to_subscription.clear()
        self.permanently_processed.clear()
        self.progression_stages.clear()
        self.last_detection_time.clear()
        logger.info("Tracking pipeline integration reset")

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive statistics.

        Returns:
            Dictionary with integration, tracker and validator statistics plus
            the number of active and cleared sessions. 'integration' is the
            live stats dictionary, not a copy.
        """
        tracker_stats = self.tracker.get_statistics()
        validator_stats = self.validator.get_statistics()

        return {
            'integration': self.stats,
            'tracker': tracker_stats,
            'validator': validator_stats,
            'active_sessions': len(self.active_sessions),
            'cleared_sessions': len(self.cleared_sessions)
        }

    async def _check_car_abandonment(self, display_id: str, subscription_id: str):
        """
        Check if a car has abandoned the fueling process (left after getting car_wait_staff stage).

        Every session in the car_wait_staff stage is checked, not only the one
        on ``display_id``. A session is abandoned when its display has had no
        detection for longer than ``abandonment_timeout``.

        Args:
            display_id: Display identifier of the frame being processed
            subscription_id: Subscription identifier used for the notification
                when none is stored for the session's display
        """
        if not self.progression_stages:
            return

        current_time = time.time()

        # Build the session -> display lookup once instead of scanning
        # active_sessions per session; the first matching display wins.
        session_to_display: Dict[str, str] = {}
        for disp_id, sess_id in self.active_sessions.items():
            session_to_display.setdefault(sess_id, disp_id)

        # Check all sessions in car_wait_staff stage
        abandoned_sessions: List[Tuple[str, str]] = []  # (session_id, display_id)
        for session_id, stage in self.progression_stages.items():
            if stage != "car_wait_staff":
                continue

            session_display = session_to_display.get(session_id)
            if not session_display:
                logger.debug(f"[ABANDON CHECK] Session {session_id} has no associated display")
                continue

            # A display that never had a detection defaults to 0, i.e. it is
            # treated as not seen for a very long time.
            last_detection = self.last_detection_time.get(session_display, 0)
            time_since_detection = current_time - last_detection

            logger.info(f"[ABANDON CHECK] Session {session_id} (display: {session_display}): "
                        f"time_since_detection={time_since_detection:.1f}s, "
                        f"timeout={self.abandonment_timeout}s")

            if time_since_detection > self.abandonment_timeout:
                logger.warning(f"🚨 Car abandonment detected: session {session_id}, "
                               f"no detection for {time_since_detection:.1f}s")
                abandoned_sessions.append((session_id, session_display))

        # Send abandonment detection for each abandoned session
        for session_id, session_display in abandoned_sessions:
            # Remove before awaiting so a frame processed while the message is
            # being sent cannot report the same session a second time.
            if self.progression_stages.pop(session_id, None) is not None:
                logger.info(f"[ABANDON] Removed session {session_id} from progression_stages after notification")

            # Prefer the subscription registered for the session's own display;
            # the current frame may belong to a different display.
            target_subscription = self.display_to_subscription.get(session_display, subscription_id)
            await self._send_abandonment_detection(target_subscription, session_id)

    async def _send_abandonment_detection(self, subscription_id: str, session_id: str):
        """
        Send imageDetection with null detection to indicate car abandonment.

        Errors are logged and not raised.

        Args:
            subscription_id: Subscription identifier
            session_id: Session ID of the abandoned car (used for logging only)
        """
        try:
            # Import here to avoid circular imports
            from ..communication.messages import create_image_detection

            # Create abandonment detection message with null detection
            detection_message = create_image_detection(
                subscription_identifier=subscription_id,
                detection_data=None,  # Null detection indicates abandonment
                model_id=self.model_id,
                model_name=self.pipeline_parser.tracking_config.model_id if self.pipeline_parser.tracking_config else "tracking_model"
            )

            # Send to backend via WebSocket if sender is available
            if self.message_sender:
                await self.message_sender(detection_message)
                logger.info(f"[CAR ABANDONMENT] Sent null detection for session {session_id}")
            else:
                logger.info(f"[CAR ABANDONMENT] No message sender available, would send: {detection_message}")

        except Exception as e:
            logger.error(f"Error sending abandonment detection: {e}", exc_info=True)

    def set_progression_stage(self, session_id: str, stage: str):
        """
        Set progression stage for a session (from backend setProgessionStage message).

        Args:
            session_id: Session identifier
            stage: Progression stage (e.g., "car_wait_staff")
        """
        self.progression_stages[session_id] = stage
        logger.info(f"Set progression stage for session {session_id}: {stage}")

        # If car reaches car_wait_staff, start monitoring for abandonment
        if stage == "car_wait_staff":
            logger.info(f"Started monitoring session {session_id} for car abandonment")

    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """
        Fetch high-quality snapshot from camera's snapshot URL.

        This is a synchronous call; coroutines should use
        ``_fetch_snapshot_async`` instead. Errors are logged and not raised.

        Returns:
            Snapshot frame or None if unavailable
        """
        if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url):
            logger.warning("[SNAPSHOT] No subscription info or snapshot URL available")
            return None

        try:
            from ..streaming.readers import HTTPSnapshotReader

            logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}")
            snapshot_reader = HTTPSnapshotReader(
                camera_id=self.subscription_info.camera_id,
                snapshot_url=self.subscription_info.stream_config.snapshot_url,
                max_retries=3
            )

            frame = snapshot_reader.fetch_single_snapshot()

            if frame is not None:
                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot")
                return frame

            logger.warning("[SNAPSHOT] Failed to fetch snapshot")
            return None

        except Exception as e:
            logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True)
            return None

    async def _fetch_snapshot_async(self) -> Optional[np.ndarray]:
        """
        Fetch a snapshot without blocking the event loop.

        Runs ``_fetch_snapshot`` in ``self.executor``.

        Returns:
            Snapshot frame, or None if unavailable or if the executor has
            already been shut down.
        """
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, self._fetch_snapshot)
        except RuntimeError as e:
            # Submitting to an executor that was shut down (see cleanup())
            # raises RuntimeError; treat it as "no snapshot available".
            logger.warning(f"[SNAPSHOT] Snapshot executor unavailable: {e}")
            return None

    async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str):
        """
        Execute fallback pipeline when sessionId is received without prior detection.

        This handles POS-initiated sessions where backend starts transaction before car detection.
        Runs the detection phase on a fresh snapshot and, if regions were
        detected, the processing phase. Errors are logged and not raised.

        Args:
            display_id: Display identifier
            session_id: Session ID from backend
            subscription_id: Subscription identifier for pipeline execution
        """
        try:
            logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}")

            # Check pipeline availability first so no snapshot is fetched in vain
            if not self.detection_pipeline:
                logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}")
                return

            # Fetch fresh snapshot from camera (in the thread pool)
            frame = await self._fetch_snapshot_async()

            if frame is None:
                logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline")
                return

            logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}")

            # Execute detection phase to get detected regions
            detection_result = await self.detection_pipeline.execute_detection_phase(
                frame=frame,
                display_id=display_id,
                subscription_id=subscription_id
            )

            logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: "
                        f"status={detection_result.get('status', 'unknown')}, "
                        f"regions={list(detection_result.get('detected_regions', {}).keys())}")

            # If detection found regions, execute processing phase
            detected_regions = detection_result.get('detected_regions', {})
            if detected_regions:
                processing_result = await self.detection_pipeline.execute_processing_phase(
                    frame=frame,
                    display_id=display_id,
                    session_id=session_id,
                    subscription_id=subscription_id,
                    detected_regions=detected_regions
                )

                logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: "
                            f"status={processing_result.get('status', 'unknown')}, "
                            f"branches={len(processing_result.get('branch_results', {}))}, "
                            f"actions={len(processing_result.get('actions_executed', []))}")

                # Update statistics
                self.stats['pipelines_executed'] += 1
            else:
                logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}")

        except Exception as e:
            logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True)

    def _filter_small_frontals(self, tracking_results, frame):
        """
        Filter out detections that are smaller than minimum bbox area percentage.

        This prevents processing of cars from neighboring pumps that appear in camera view.
        The filter applies to every detection in ``tracking_results``; no class
        check is performed here. ``tracking_results.detections`` is replaced
        in place.

        Args:
            tracking_results: Tracking results exposing a ``detections`` list
                whose items have a ``bbox`` indexable as [x1, y1, x2, y2]
            frame: Input frame for calculating frame area

        Returns:
            The same tracking_results object with small detections removed
        """
        if not hasattr(tracking_results, 'detections') or not tracking_results.detections:
            return tracking_results

        # Calculate frame area and minimum bbox area threshold
        frame_area = frame.shape[0] * frame.shape[1]  # height * width
        if frame_area <= 0:
            # Nothing meaningful to compare against; also avoids dividing by
            # zero when computing the percentage below.
            return tracking_results
        min_bbox_area = frame_area * (self.min_bbox_area_percentage / 100.0)

        # Filter detections
        filtered_detections = []
        filtered_count = 0

        for detection in tracking_results.detections:
            # Calculate detection bbox area
            bbox = detection.bbox  # Assuming bbox is [x1, y1, x2, y2]
            # Clamp each extent at zero: an inverted box would otherwise yield
            # a positive product of two negative extents and pass the filter.
            bbox_width = max(0, bbox[2] - bbox[0])
            bbox_height = max(0, bbox[3] - bbox[1])
            bbox_area = bbox_width * bbox_height

            if bbox_area >= min_bbox_area:
                # Keep detection - bbox is large enough
                filtered_detections.append(detection)
            else:
                # Filter out small detection
                filtered_count += 1
                area_percentage = (bbox_area / frame_area) * 100
                logger.debug(f"Filtered small frontal: area={bbox_area:.0f}px² ({area_percentage:.1f}% of frame, "
                             f"min required: {self.min_bbox_area_percentage}%)")

        # Update tracking results with filtered detections
        tracking_results.detections = filtered_detections

        # Update statistics
        if filtered_count > 0:
            self.stats['frontals_filtered_small'] += filtered_count
            logger.info(f"Filtered {filtered_count} small frontal detections, "
                        f"{len(filtered_detections)} remaining (total filtered: {self.stats['frontals_filtered_small']})")

        return tracking_results

    def cleanup(self):
        """Cleanup resources: shut down the thread pool, reset state, clean up the detection pipeline."""
        # wait=False: do not block on snapshot fetches that are still running
        self.executor.shutdown(wait=False)
        self.reset_tracking()

        # Cleanup detection pipeline
        if self.detection_pipeline:
            self.detection_pipeline.cleanup()

        logger.info("Tracking pipeline integration cleaned up")