""" Tracking-Pipeline Integration Module. Connects the tracking system with the main detection pipeline and manages the flow. """ import logging import time import uuid from typing import Dict, Optional, Any, List, Tuple from concurrent.futures import ThreadPoolExecutor import asyncio import numpy as np from .tracker import VehicleTracker, TrackedVehicle from .validator import StableCarValidator from ..models.inference import YOLOWrapper from ..models.pipeline import PipelineParser from ..detection.pipeline import DetectionPipeline logger = logging.getLogger(__name__) class TrackingPipelineIntegration: """ Integrates vehicle tracking with the detection pipeline. Manages tracking state transitions and pipeline execution triggers. """ def __init__(self, pipeline_parser: PipelineParser, model_manager: Any, model_id: int, message_sender=None): """ Initialize tracking-pipeline integration. Args: pipeline_parser: Pipeline parser with loaded configuration model_manager: Model manager for loading models model_id: The model ID to use for loading models message_sender: Optional callback function for sending WebSocket messages """ self.pipeline_parser = pipeline_parser self.model_manager = model_manager self.model_id = model_id self.message_sender = message_sender # Store subscription info for snapshot access self.subscription_info = None # Initialize tracking components tracking_config = pipeline_parser.tracking_config.__dict__ if pipeline_parser.tracking_config else {} self.tracker = VehicleTracker(tracking_config) self.validator = StableCarValidator() # Tracking model self.tracking_model: Optional[YOLOWrapper] = None self.tracking_model_id = None # Detection pipeline (Phase 5) self.detection_pipeline: Optional[DetectionPipeline] = None # Session management self.active_sessions: Dict[str, str] = {} # display_id -> session_id self.session_vehicles: Dict[str, int] = {} # session_id -> track_id self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID) self.pending_processing_data: Dict[str, Dict] = {} # display_id -> processing data (waiting for session ID) # Additional validators for enhanced flow control self.permanently_processed: Dict[str, float] = {} # "camera_id:track_id" -> process_time (never process again) self.progression_stages: Dict[str, str] = {} # session_id -> current_stage self.last_detection_time: Dict[str, float] = {} # display_id -> last_detection_timestamp self.abandonment_timeout = 3.0 # seconds to wait before declaring car abandoned # Thread pool for pipeline execution self.executor = ThreadPoolExecutor(max_workers=2) # Statistics self.stats = { 'frames_processed': 0, 'vehicles_detected': 0, 'vehicles_validated': 0, 'pipelines_executed': 0 } logger.info("TrackingPipelineIntegration initialized") async def initialize_tracking_model(self) -> bool: """ Load and initialize the tracking model. Returns: True if successful, False otherwise """ try: if not self.pipeline_parser.tracking_config: logger.warning("No tracking configuration found in pipeline") return False model_file = self.pipeline_parser.tracking_config.model_file model_id = self.pipeline_parser.tracking_config.model_id if not model_file: logger.warning("No tracking model file specified") return False # Load tracking model logger.info(f"Loading tracking model: {model_id} ({model_file})") self.tracking_model = self.model_manager.get_yolo_model(self.model_id, model_file) if not self.tracking_model: logger.error(f"Failed to load tracking model {model_file} from model {self.model_id}") return False self.tracking_model_id = model_id if self.tracking_model: logger.info(f"Tracking model {model_id} loaded successfully") # Initialize detection pipeline (Phase 5) await self._initialize_detection_pipeline() return True else: logger.error(f"Failed to load tracking model {model_id}") return False except Exception as e: logger.error(f"Error initializing tracking model: {e}", exc_info=True) return False async def _initialize_detection_pipeline(self) -> bool: """ Initialize the detection pipeline for main detection processing. Returns: True if successful, False otherwise """ try: if not self.pipeline_parser: logger.warning("No pipeline parser available for detection pipeline") return False # Create detection pipeline with message sender capability self.detection_pipeline = DetectionPipeline(self.pipeline_parser, self.model_manager, self.model_id, self.message_sender) # Initialize detection pipeline if await self.detection_pipeline.initialize(): logger.info("Detection pipeline initialized successfully") return True else: logger.error("Failed to initialize detection pipeline") return False except Exception as e: logger.error(f"Error initializing detection pipeline: {e}", exc_info=True) return False async def process_frame(self, frame: np.ndarray, display_id: str, subscription_id: str, session_id: Optional[str] = None) -> Dict[str, Any]: """ Process a frame through tracking and potentially the detection pipeline. Args: frame: Input frame to process display_id: Display identifier subscription_id: Full subscription identifier session_id: Optional session ID from backend Returns: Dictionary with processing results """ start_time = time.time() result = { 'tracked_vehicles': [], 'validated_vehicle': None, 'pipeline_result': None, 'session_id': session_id, 'processing_time': 0.0 } try: # Update stats self.stats['frames_processed'] += 1 # Run tracking model if self.tracking_model: # Run detection-only (tracking handled by our own tracker) tracking_results = self.tracking_model.track( frame, confidence_threshold=self.tracker.min_confidence, trigger_classes=self.tracker.trigger_classes, persist=True ) # Debug: Log raw detection results if tracking_results and hasattr(tracking_results, 'detections'): raw_detections = len(tracking_results.detections) if raw_detections > 0: class_names = [detection.class_name for detection in tracking_results.detections] logger.debug(f"Raw detections: {raw_detections}, classes: {class_names}") else: logger.debug(f"No raw detections found") else: logger.debug(f"No tracking results or detections attribute") # Process tracking results tracked_vehicles = self.tracker.process_detections( tracking_results, display_id, frame ) # Update last detection time for abandonment detection if tracked_vehicles: self.last_detection_time[display_id] = time.time() # Check for car abandonment (vehicle left after getting car_wait_staff stage) await self._check_car_abandonment(display_id, subscription_id) result['tracked_vehicles'] = [ { 'track_id': v.track_id, 'bbox': v.bbox, 'confidence': v.confidence, 'is_stable': v.is_stable, 'session_id': v.session_id } for v in tracked_vehicles ] # Log tracking info periodically if self.stats['frames_processed'] % 30 == 0: # Every 30 frames logger.debug(f"Tracking: {len(tracked_vehicles)} vehicles, " f"display={display_id}") # Get stable vehicles for validation stable_vehicles = self.tracker.get_stable_vehicles(display_id) # Validate and potentially process stable vehicles for vehicle in stable_vehicles: # Check if vehicle is already processed or has session if vehicle.processed_pipeline: continue # Check for session cleared (post-fueling) if session_id and vehicle.session_id == session_id: # Same vehicle with same session, skip continue # Check if this was a recently cleared session session_cleared = False if vehicle.session_id in self.cleared_sessions: clear_time = self.cleared_sessions[vehicle.session_id] if (time.time() - clear_time) < 30: # 30 second cooldown session_cleared = True # Skip same car after session clear or if permanently processed if self.validator.should_skip_same_car(vehicle, session_cleared, self.permanently_processed): continue # Validate vehicle validation_result = self.validator.validate_vehicle(vehicle, frame.shape) if validation_result.is_valid and validation_result.should_process: logger.info(f"Vehicle {vehicle.track_id} validated for processing: " f"{validation_result.reason}") result['validated_vehicle'] = { 'track_id': vehicle.track_id, 'state': validation_result.state.value, 'confidence': validation_result.confidence } # Execute detection pipeline - this will send real imageDetection when detection is found # Mark vehicle as pending session ID assignment self.pending_vehicles[display_id] = vehicle.track_id logger.info(f"Vehicle {vehicle.track_id} waiting for session ID from backend") # Execute detection pipeline (placeholder for Phase 5) pipeline_result = await self._execute_pipeline( frame, vehicle, display_id, None, # No session ID yet subscription_id ) result['pipeline_result'] = pipeline_result # No session_id in result yet - backend will provide it self.stats['pipelines_executed'] += 1 # Only process one vehicle per frame break self.stats['vehicles_detected'] = len(tracked_vehicles) self.stats['vehicles_validated'] = len(stable_vehicles) else: logger.warning("No tracking model available") except Exception as e: logger.error(f"Error in tracking pipeline: {e}", exc_info=True) result['processing_time'] = time.time() - start_time return result async def _execute_pipeline(self, frame: np.ndarray, vehicle: TrackedVehicle, display_id: str, session_id: str, subscription_id: str) -> Dict[str, Any]: """ Execute the main detection pipeline for a validated vehicle. Args: frame: Input frame vehicle: Validated tracked vehicle display_id: Display identifier session_id: Session identifier subscription_id: Full subscription identifier Returns: Pipeline execution results """ logger.info(f"Executing detection pipeline for vehicle {vehicle.track_id}, " f"session={session_id}, display={display_id}") try: # Check if detection pipeline is available if not self.detection_pipeline: logger.warning("Detection pipeline not initialized, using fallback") return { 'status': 'error', 'message': 'Detection pipeline not available', 'vehicle_id': vehicle.track_id, 'session_id': session_id } # Execute only the detection phase (first phase) # This will run detection and send imageDetection message to backend detection_result = await self.detection_pipeline.execute_detection_phase( frame=frame, display_id=display_id, subscription_id=subscription_id ) # Add vehicle information to result detection_result['vehicle_id'] = vehicle.track_id detection_result['vehicle_bbox'] = vehicle.bbox detection_result['vehicle_confidence'] = vehicle.confidence detection_result['phase'] = 'detection' logger.info(f"Detection phase executed for vehicle {vehicle.track_id}: " f"status={detection_result.get('status', 'unknown')}, " f"message_sent={detection_result.get('message_sent', False)}, " f"processing_time={detection_result.get('processing_time', 0):.3f}s") # Store frame and detection results for processing phase if detection_result['message_sent']: # Store for later processing when sessionId is received self.pending_processing_data[display_id] = { 'frame': frame.copy(), # Store copy of frame for processing phase 'vehicle': vehicle, 'subscription_id': subscription_id, 'detection_result': detection_result, 'timestamp': time.time() } logger.info(f"Stored processing data for {display_id}, waiting for sessionId from backend") return detection_result except Exception as e: logger.error(f"Error executing detection pipeline: {e}", exc_info=True) return { 'status': 'error', 'message': str(e), 'vehicle_id': vehicle.track_id, 'session_id': session_id, 'processing_time': 0.0 } async def _execute_processing_phase(self, processing_data: Dict[str, Any], session_id: str, display_id: str) -> None: """ Execute the processing phase after receiving sessionId from backend. This includes branch processing and database operations. Args: processing_data: Stored processing data from detection phase session_id: Session ID from backend display_id: Display identifier """ try: vehicle = processing_data['vehicle'] subscription_id = processing_data['subscription_id'] detection_result = processing_data['detection_result'] logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}") # Capture high-quality snapshot for pipeline processing frame = None if self.subscription_info and self.subscription_info.stream_config.snapshot_url: from ..streaming.readers import HTTPSnapshotReader logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}") snapshot_reader = HTTPSnapshotReader( camera_id=self.subscription_info.camera_id, snapshot_url=self.subscription_info.stream_config.snapshot_url, max_retries=3 ) frame = snapshot_reader.fetch_single_snapshot() if frame is not None: logger.info(f"[PROCESSING PHASE] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for pipeline") else: logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame") # Fall back to RTSP frame if snapshot fails frame = processing_data['frame'] else: logger.warning(f"[PROCESSING PHASE] No snapshot URL available, using RTSP frame") frame = processing_data['frame'] # Extract detected regions from detection phase result if available detected_regions = detection_result.get('detected_regions', {}) logger.info(f"[INTEGRATION] Passing detected_regions to processing phase: {list(detected_regions.keys())}") # Execute processing phase with detection pipeline if self.detection_pipeline: processing_result = await self.detection_pipeline.execute_processing_phase( frame=frame, display_id=display_id, session_id=session_id, subscription_id=subscription_id, detected_regions=detected_regions ) logger.info(f"Processing phase completed for session {session_id}: " f"status={processing_result.get('status', 'unknown')}, " f"branches={len(processing_result.get('branch_results', {}))}, " f"actions={len(processing_result.get('actions_executed', []))}, " f"processing_time={processing_result.get('processing_time', 0):.3f}s") # Update stats self.stats['pipelines_executed'] += 1 else: logger.error("Detection pipeline not available for processing phase") except Exception as e: logger.error(f"Error in processing phase for session {session_id}: {e}", exc_info=True) def set_subscription_info(self, subscription_info): """ Set subscription info to access snapshot URL and other stream details. Args: subscription_info: SubscriptionInfo object containing stream config """ self.subscription_info = subscription_info logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}") def set_session_id(self, display_id: str, session_id: str): """ Set session ID for a display (from backend). This is called when backend sends setSessionId after receiving imageDetection. Args: display_id: Display identifier session_id: Session identifier """ # Ensure session_id is always a string for consistent type handling session_id = str(session_id) if session_id is not None else None self.active_sessions[display_id] = session_id logger.info(f"Set session {session_id} for display {display_id}") # Check if we have a pending vehicle for this display if display_id in self.pending_vehicles: track_id = self.pending_vehicles[display_id] # Mark vehicle as processed with the session ID self.tracker.mark_processed(track_id, session_id) self.session_vehicles[session_id] = track_id # Mark vehicle as permanently processed (won't process again even after session clear) # Use composite key to distinguish same track IDs across different cameras camera_id = display_id # Using display_id as camera_id for isolation permanent_key = f"{camera_id}:{track_id}" self.permanently_processed[permanent_key] = time.time() # Remove from pending del self.pending_vehicles[display_id] logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as permanently processed") else: logger.warning(f"No pending vehicle found for display {display_id} when setting session {session_id}") # Check if we have pending processing data for this display if display_id in self.pending_processing_data: processing_data = self.pending_processing_data[display_id] # Trigger the processing phase asynchronously asyncio.create_task(self._execute_processing_phase( processing_data=processing_data, session_id=session_id, display_id=display_id )) # Remove from pending processing del self.pending_processing_data[display_id] logger.info(f"Triggered processing phase for session {session_id} on display {display_id}") else: logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}") def clear_session_id(self, session_id: str): """ Clear session ID (post-fueling). Args: session_id: Session identifier to clear """ # Mark session as cleared self.cleared_sessions[session_id] = time.time() # Clear from tracker self.tracker.clear_session(session_id) # Remove from active sessions display_to_remove = None for display_id, sess_id in self.active_sessions.items(): if sess_id == session_id: display_to_remove = display_id break if display_to_remove: del self.active_sessions[display_to_remove] if session_id in self.session_vehicles: del self.session_vehicles[session_id] logger.info(f"Cleared session {session_id}") # Clean old cleared sessions (older than 5 minutes) current_time = time.time() old_sessions = [ sid for sid, clear_time in self.cleared_sessions.items() if (current_time - clear_time) > 300 ] for sid in old_sessions: del self.cleared_sessions[sid] def get_session_for_display(self, display_id: str) -> Optional[str]: """Get active session for a display.""" return self.active_sessions.get(display_id) def reset_tracking(self): """Reset all tracking state.""" self.tracker.reset_tracking() self.active_sessions.clear() self.session_vehicles.clear() self.cleared_sessions.clear() self.pending_vehicles.clear() self.pending_processing_data.clear() self.permanently_processed.clear() self.progression_stages.clear() self.last_detection_time.clear() logger.info("Tracking pipeline integration reset") def get_statistics(self) -> Dict[str, Any]: """Get comprehensive statistics.""" tracker_stats = self.tracker.get_statistics() validator_stats = self.validator.get_statistics() return { 'integration': self.stats, 'tracker': tracker_stats, 'validator': validator_stats, 'active_sessions': len(self.active_sessions), 'cleared_sessions': len(self.cleared_sessions) } async def _check_car_abandonment(self, display_id: str, subscription_id: str): """ Check if a car has abandoned the fueling process (left after getting car_wait_staff stage). Args: display_id: Display identifier subscription_id: Subscription identifier """ current_time = time.time() # Check all sessions in car_wait_staff stage abandoned_sessions = [] for session_id, stage in self.progression_stages.items(): if stage == "car_wait_staff": # Check if we have recent detections for this session's display session_display = None for disp_id, sess_id in self.active_sessions.items(): if sess_id == session_id: session_display = disp_id break if session_display: last_detection = self.last_detection_time.get(session_display, 0) time_since_detection = current_time - last_detection if time_since_detection > self.abandonment_timeout: logger.info(f"Car abandonment detected: session {session_id}, " f"no detection for {time_since_detection:.1f}s") abandoned_sessions.append(session_id) # Send abandonment detection for each abandoned session for session_id in abandoned_sessions: await self._send_abandonment_detection(subscription_id, session_id) # Remove from progression stages to avoid repeated detection if session_id in self.progression_stages: del self.progression_stages[session_id] async def _send_abandonment_detection(self, subscription_id: str, session_id: str): """ Send imageDetection with null detection to indicate car abandonment. Args: subscription_id: Subscription identifier session_id: Session ID of the abandoned car """ try: # Import here to avoid circular imports from ..communication.messages import create_image_detection # Create abandonment detection message with null detection detection_message = create_image_detection( subscription_identifier=subscription_id, detection_data=None, # Null detection indicates abandonment model_id=self.model_id, model_name=self.pipeline_parser.tracking_config.model_id if self.pipeline_parser.tracking_config else "tracking_model" ) # Send to backend via WebSocket if sender is available if self.message_sender: await self.message_sender(detection_message) logger.info(f"[CAR ABANDONMENT] Sent null detection for session {session_id}") else: logger.info(f"[CAR ABANDONMENT] No message sender available, would send: {detection_message}") except Exception as e: logger.error(f"Error sending abandonment detection: {e}", exc_info=True) def set_progression_stage(self, session_id: str, stage: str): """ Set progression stage for a session (from backend setProgessionStage message). Args: session_id: Session identifier stage: Progression stage (e.g., "car_wait_staff") """ self.progression_stages[session_id] = stage logger.info(f"Set progression stage for session {session_id}: {stage}") # If car reaches car_wait_staff, start monitoring for abandonment if stage == "car_wait_staff": logger.info(f"Started monitoring session {session_id} for car abandonment") def cleanup(self): """Cleanup resources.""" self.executor.shutdown(wait=False) self.reset_tracking() # Cleanup detection pipeline if self.detection_pipeline: self.detection_pipeline.cleanup() logger.info("Tracking pipeline integration cleaned up")