"""
 | 
						|
Tracking-Pipeline Integration Module.
 | 
						|
Connects the tracking system with the main detection pipeline and manages the flow.
 | 
						|
"""
 | 
						|
import logging
 | 
						|
import time
 | 
						|
import uuid
 | 
						|
from typing import Dict, Optional, Any, List, Tuple
 | 
						|
from concurrent.futures import ThreadPoolExecutor
 | 
						|
import asyncio
 | 
						|
import numpy as np
 | 
						|
 | 
						|
from .tracker import VehicleTracker, TrackedVehicle
 | 
						|
from .validator import StableCarValidator
 | 
						|
from ..models.inference import YOLOWrapper
 | 
						|
from ..models.pipeline import PipelineParser
 | 
						|
from ..detection.pipeline import DetectionPipeline
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class TrackingPipelineIntegration:
 | 
						|
    """
 | 
						|
    Integrates vehicle tracking with the detection pipeline.
 | 
						|
    Manages tracking state transitions and pipeline execution triggers.
 | 
						|
    """
 | 
						|
 | 
						|
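
    # Typical wiring (an illustrative sketch, not lifted from this repo's entrypoint;
    # the surrounding objects -- parser, model_manager, send_message -- are assumptions
    # about the caller):
    #
    #   integration = TrackingPipelineIntegration(parser, model_manager, model_id=52,
    #                                             message_sender=send_message)
    #   await integration.initialize_tracking_model()
    #   integration.set_subscription_info(subscription_info)
    #   result = await integration.process_frame(frame, display_id, subscription_id)
    #
    # The backend later answers the detection with setSessionId, which the caller
    # forwards via set_session_id(display_id, session_id, subscription_id).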

    def __init__(self, pipeline_parser: PipelineParser, model_manager: Any, model_id: int, message_sender=None):
        """
        Initialize tracking-pipeline integration.

        Args:
            pipeline_parser: Pipeline parser with loaded configuration
            model_manager: Model manager for loading models
            model_id: The model ID to use for loading models
            message_sender: Optional callback function for sending WebSocket messages
        """
        self.pipeline_parser = pipeline_parser
        self.model_manager = model_manager
        self.model_id = model_id
        self.message_sender = message_sender

        # Store subscription info for snapshot access
        self.subscription_info = None

        # Initialize tracking components
        tracking_config = pipeline_parser.tracking_config.__dict__ if pipeline_parser.tracking_config else {}
        self.tracker = VehicleTracker(tracking_config)
        self.validator = StableCarValidator()

        # Tracking model
        self.tracking_model: Optional[YOLOWrapper] = None
        self.tracking_model_id = None

        # Detection pipeline (Phase 5)
        self.detection_pipeline: Optional[DetectionPipeline] = None

        # Session management
        self.active_sessions: Dict[str, str] = {}  # display_id -> session_id
        self.session_vehicles: Dict[str, int] = {}  # session_id -> track_id
        self.cleared_sessions: Dict[str, float] = {}  # session_id -> clear_time
        self.pending_vehicles: Dict[str, int] = {}  # display_id -> track_id (waiting for session ID)
        self.pending_processing_data: Dict[str, Dict] = {}  # display_id -> processing data (waiting for session ID)
        self.display_to_subscription: Dict[str, str] = {}  # display_id -> subscription_id (for fallback)

        # Additional validators for enhanced flow control
        self.permanently_processed: Dict[str, float] = {}  # "camera_id:track_id" -> process_time (never process again)
        self.progression_stages: Dict[str, str] = {}  # session_id -> current_stage
        self.last_detection_time: Dict[str, float] = {}  # display_id -> last_detection_timestamp
        self.abandonment_timeout = 3.0  # seconds to wait before declaring car abandoned

        # Thread pool for pipeline execution
        self.executor = ThreadPoolExecutor(max_workers=2)

        # Min bbox filtering configuration
        # TODO: Make this configurable via pipeline.json in the future
        self.min_bbox_area_percentage = 3.5  # 3.5% of frame area minimum

        # Statistics
        self.stats = {
            'frames_processed': 0,
            'vehicles_detected': 0,
            'vehicles_validated': 0,
            'pipelines_executed': 0,
            'frontals_filtered_small': 0  # Track filtered detections
        }

        logger.info("TrackingPipelineIntegration initialized")

    async def initialize_tracking_model(self) -> bool:
        """
        Load and initialize the tracking model.

        Returns:
            True if successful, False otherwise
        """
        try:
            if not self.pipeline_parser.tracking_config:
                logger.warning("No tracking configuration found in pipeline")
                return False

            model_file = self.pipeline_parser.tracking_config.model_file
            model_id = self.pipeline_parser.tracking_config.model_id

            if not model_file:
                logger.warning("No tracking model file specified")
                return False

            # Load tracking model
            logger.info(f"Loading tracking model: {model_id} ({model_file})")
            self.tracking_model = self.model_manager.get_yolo_model(self.model_id, model_file)
            if not self.tracking_model:
                logger.error(f"Failed to load tracking model {model_file} from model {self.model_id}")
                return False

            self.tracking_model_id = model_id
            logger.info(f"Tracking model {model_id} loaded successfully")

            # Initialize detection pipeline (Phase 5)
            await self._initialize_detection_pipeline()

            return True

        except Exception as e:
            logger.error(f"Error initializing tracking model: {e}", exc_info=True)
            return False
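
    # Illustrative shape of the tracking config consumed above (a sketch only; the
    # authoritative schema lives in pipeline.json and PipelineParser.tracking_config):
    #   model_id:   logical model name, used in the log lines above
    #   model_file: weights file resolved through model_manager.get_yolo_model()
    # VehicleTracker additionally exposes min_confidence and trigger_classes, which
    # process_frame() forwards to the tracking model's track() call.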

    async def _initialize_detection_pipeline(self) -> bool:
        """
        Initialize the detection pipeline for main detection processing.

        Returns:
            True if successful, False otherwise
        """
        try:
            if not self.pipeline_parser:
                logger.warning("No pipeline parser available for detection pipeline")
                return False

            # Create detection pipeline with message sender capability
            self.detection_pipeline = DetectionPipeline(self.pipeline_parser, self.model_manager, self.model_id, self.message_sender)

            # Initialize detection pipeline
            if await self.detection_pipeline.initialize():
                logger.info("Detection pipeline initialized successfully")
                return True
            else:
                logger.error("Failed to initialize detection pipeline")
                return False

        except Exception as e:
            logger.error(f"Error initializing detection pipeline: {e}", exc_info=True)
            return False

    async def process_frame(self,
                          frame: np.ndarray,
                          display_id: str,
                          subscription_id: str,
                          session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Process a frame through tracking and potentially the detection pipeline.

        Args:
            frame: Input frame to process
            display_id: Display identifier
            subscription_id: Full subscription identifier
            session_id: Optional session ID from backend

        Returns:
            Dictionary with processing results
        """
        start_time = time.time()
        result = {
            'tracked_vehicles': [],
            'validated_vehicle': None,
            'pipeline_result': None,
            'session_id': session_id,
            'processing_time': 0.0
        }

        try:
            # Update stats
            self.stats['frames_processed'] += 1

            # Run tracking model
            if self.tracking_model:
                # Run detection-only (tracking handled by our own tracker)
                tracking_results = self.tracking_model.track(
                    frame,
                    confidence_threshold=self.tracker.min_confidence,
                    trigger_classes=self.tracker.trigger_classes,
                    persist=True
                )

                # Debug: Log raw detection results
                if tracking_results and hasattr(tracking_results, 'detections'):
                    raw_detections = len(tracking_results.detections)
                    if raw_detections > 0:
                        class_names = [detection.class_name for detection in tracking_results.detections]
                        logger.debug(f"Raw detections: {raw_detections}, classes: {class_names}")
                    else:
                        logger.debug("No raw detections found")
                else:
                    logger.debug("No tracking results or detections attribute")

                # Filter out small frontal detections (neighboring pumps/distant cars)
                if tracking_results and hasattr(tracking_results, 'detections'):
                    tracking_results = self._filter_small_frontals(tracking_results, frame)

                # Process tracking results
                tracked_vehicles = self.tracker.process_detections(
                    tracking_results,
                    display_id,
                    frame
                )

                # Update last detection time for abandonment detection.
                # Updated only when vehicles ARE detected, so the timestamp ages once they leave.
                if tracked_vehicles:
                    self.last_detection_time[display_id] = time.time()
                    logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles")

                # Check for car abandonment (vehicle left after reaching the car_wait_staff stage)
                await self._check_car_abandonment(display_id, subscription_id)

                result['tracked_vehicles'] = [
                    {
                        'track_id': v.track_id,
                        'bbox': v.bbox,
                        'confidence': v.confidence,
                        'is_stable': v.is_stable,
                        'session_id': v.session_id
                    }
                    for v in tracked_vehicles
                ]

                # Log tracking info periodically
                if self.stats['frames_processed'] % 30 == 0:  # Every 30 frames
                    logger.debug(f"Tracking: {len(tracked_vehicles)} vehicles, "
                               f"display={display_id}")

                # Get stable vehicles for validation
                stable_vehicles = self.tracker.get_stable_vehicles(display_id)

                # Validate and potentially process stable vehicles
                for vehicle in stable_vehicles:
                    # Skip vehicles that have already been through the pipeline
                    if vehicle.processed_pipeline:
                        continue

                    # Check for session cleared (post-fueling)
                    if session_id and vehicle.session_id == session_id:
                        # Same vehicle with same session, skip
                        continue

                    # Check if this was a recently cleared session
                    session_cleared = False
                    if vehicle.session_id in self.cleared_sessions:
                        clear_time = self.cleared_sessions[vehicle.session_id]
                        if (time.time() - clear_time) < 30:  # 30 second cooldown
                            session_cleared = True

                    # Skip same car after session clear or if permanently processed
                    if self.validator.should_skip_same_car(vehicle, session_cleared, self.permanently_processed):
                        continue

                    # Validate vehicle
                    validation_result = self.validator.validate_vehicle(vehicle, frame.shape)

                    if validation_result.is_valid and validation_result.should_process:
                        logger.info(f"Vehicle {vehicle.track_id} validated for processing: "
                                  f"{validation_result.reason}")

                        result['validated_vehicle'] = {
                            'track_id': vehicle.track_id,
                            'state': validation_result.state.value,
                            'confidence': validation_result.confidence
                        }

                        # Mark vehicle as pending session ID assignment
                        self.pending_vehicles[display_id] = vehicle.track_id
                        logger.info(f"Vehicle {vehicle.track_id} waiting for session ID from backend")

                        # Execute the detection phase; the real imageDetection message is
                        # sent from the detection pipeline once a detection is found
                        pipeline_result = await self._execute_pipeline(
                            frame,
                            vehicle,
                            display_id,
                            None,  # No session ID yet
                            subscription_id
                        )

                        result['pipeline_result'] = pipeline_result
                        # No session_id in result yet - backend will provide it
                        self.stats['pipelines_executed'] += 1

                        # Only process one vehicle per frame
                        break

                self.stats['vehicles_detected'] = len(tracked_vehicles)
                self.stats['vehicles_validated'] = len(stable_vehicles)

            else:
                logger.warning("No tracking model available")

        except Exception as e:
            logger.error(f"Error in tracking pipeline: {e}", exc_info=True)

        result['processing_time'] = time.time() - start_time
        return result
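
    # Per-frame call pattern (illustrative; the asyncio loop and frame source belong
    # to the caller and are assumptions here):
    #
    #   result = await integration.process_frame(frame, display_id, subscription_id)
    #   for tv in result['tracked_vehicles']:
    #       ...  # each entry carries track_id, bbox, confidence, is_stable, session_id
    #   if result['validated_vehicle']:
    #       ...  # a detection phase was triggered; wait for setSessionId from the backend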

    async def _execute_pipeline(self,
                              frame: np.ndarray,
                              vehicle: TrackedVehicle,
                              display_id: str,
                              session_id: str,
                              subscription_id: str) -> Dict[str, Any]:
        """
        Execute the main detection pipeline for a validated vehicle.

        Args:
            frame: Input frame
            vehicle: Validated tracked vehicle
            display_id: Display identifier
            session_id: Session identifier (may be None until the backend assigns one)
            subscription_id: Full subscription identifier

        Returns:
            Pipeline execution results
        """
        logger.info(f"Executing detection pipeline for vehicle {vehicle.track_id}, "
                   f"session={session_id}, display={display_id}")

        try:
            # Check if detection pipeline is available
            if not self.detection_pipeline:
                logger.warning("Detection pipeline not initialized, using fallback")
                return {
                    'status': 'error',
                    'message': 'Detection pipeline not available',
                    'vehicle_id': vehicle.track_id,
                    'session_id': session_id
                }

            # Fetch a high-quality 2K snapshot for the detection phase (not the RTSP frame).
            # This ensures bbox coordinates match the frame used in the processing phase.
            logger.info(f"[DETECTION PHASE] Fetching 2K snapshot for vehicle {vehicle.track_id}")
            snapshot_frame = self._fetch_snapshot()

            if snapshot_frame is None:
                logger.warning("[DETECTION PHASE] Failed to fetch snapshot, falling back to RTSP frame")
                snapshot_frame = frame  # Fallback to RTSP if snapshot fails
            else:
                logger.info(f"[DETECTION PHASE] Using {snapshot_frame.shape[1]}x{snapshot_frame.shape[0]} snapshot for detection")

            # Execute only the detection phase (first phase).
            # This will run detection and send the imageDetection message to the backend.
            detection_result = await self.detection_pipeline.execute_detection_phase(
                frame=snapshot_frame,  # Use 2K snapshot instead of RTSP frame
                display_id=display_id,
                subscription_id=subscription_id
            )

            # Add vehicle information to result
            detection_result['vehicle_id'] = vehicle.track_id
            detection_result['vehicle_bbox'] = vehicle.bbox
            detection_result['vehicle_confidence'] = vehicle.confidence
            detection_result['phase'] = 'detection'

            logger.info(f"Detection phase executed for vehicle {vehicle.track_id}: "
                       f"status={detection_result.get('status', 'unknown')}, "
                       f"message_sent={detection_result.get('message_sent', False)}, "
                       f"processing_time={detection_result.get('processing_time', 0):.3f}s")

            # Store frame and detection results for the processing phase
            if detection_result.get('message_sent'):
                # Store for later processing when sessionId is received
                self.pending_processing_data[display_id] = {
                    'frame': snapshot_frame.copy(),  # Store copy of 2K snapshot (not RTSP frame!)
                    'vehicle': vehicle,
                    'subscription_id': subscription_id,
                    'detection_result': detection_result,
                    'timestamp': time.time()
                }
                logger.info(f"Stored processing data ({snapshot_frame.shape[1]}x{snapshot_frame.shape[0]} frame) for {display_id}, waiting for sessionId from backend")

            return detection_result

        except Exception as e:
            logger.error(f"Error executing detection pipeline: {e}", exc_info=True)
            return {
                'status': 'error',
                'message': str(e),
                'vehicle_id': vehicle.track_id,
                'session_id': session_id,
                'processing_time': 0.0
            }

    async def _execute_processing_phase(self,
                                      processing_data: Dict[str, Any],
                                      session_id: str,
                                      display_id: str) -> None:
        """
        Execute the processing phase after receiving sessionId from the backend.
        This includes branch processing and database operations.

        Args:
            processing_data: Stored processing data from detection phase
            session_id: Session ID from backend
            display_id: Display identifier
        """
        try:
            vehicle = processing_data['vehicle']
            subscription_id = processing_data['subscription_id']
            detection_result = processing_data['detection_result']

            logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")

            # Reuse the snapshot from the detection phase, or fetch a fresh one if detection used the RTSP fallback
            detection_frame = processing_data['frame']
            frame_height = detection_frame.shape[0]

            # Check whether the detection phase used a 2K snapshot (height >= 1000) or the RTSP fallback (e.g., 720p)
            if frame_height >= 1000:
                # Detection used 2K snapshot - reuse it for consistent coordinates
                logger.info(f"[PROCESSING PHASE] Reusing 2K snapshot from detection phase ({detection_frame.shape[1]}x{detection_frame.shape[0]})")
                frame = detection_frame
            else:
                # Detection used RTSP fallback - need to fetch a fresh 2K snapshot
                logger.warning(f"[PROCESSING PHASE] Detection used RTSP fallback ({detection_frame.shape[1]}x{detection_frame.shape[0]}), fetching fresh 2K snapshot")
                frame = self._fetch_snapshot()

                if frame is None:
                    logger.error("[PROCESSING PHASE] Failed to fetch snapshot and detection used RTSP - coordinate mismatch would occur")
                    logger.error("[PROCESSING PHASE] Cannot proceed with mismatched coordinates. Aborting processing phase.")
                    return  # Cannot process safely - bbox coordinates won't match frame resolution
                else:
                    logger.warning(f"[PROCESSING PHASE] Fetched fresh 2K snapshot ({frame.shape[1]}x{frame.shape[0]}), but coordinates may not match exactly")
                    logger.warning("[PROCESSING PHASE] Re-running detection on the fresh snapshot is recommended but not implemented yet")

            # Extract detected regions from the detection phase result if available
            detected_regions = detection_result.get('detected_regions', {})
            logger.info(f"[INTEGRATION] Passing detected_regions to processing phase: {list(detected_regions.keys())}")

            # Execute processing phase with detection pipeline
            if self.detection_pipeline:
                processing_result = await self.detection_pipeline.execute_processing_phase(
                    frame=frame,
                    display_id=display_id,
                    session_id=session_id,
                    subscription_id=subscription_id,
                    detected_regions=detected_regions
                )

                logger.info(f"Processing phase completed for session {session_id}: "
                           f"status={processing_result.get('status', 'unknown')}, "
                           f"branches={len(processing_result.get('branch_results', {}))}, "
                           f"actions={len(processing_result.get('actions_executed', []))}, "
                           f"processing_time={processing_result.get('processing_time', 0):.3f}s")

                # Update stats
                self.stats['pipelines_executed'] += 1

            else:
                logger.error("Detection pipeline not available for processing phase")

        except Exception as e:
            logger.error(f"Error in processing phase for session {session_id}: {e}", exc_info=True)
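
    # Two-phase flow recap (descriptive only): _execute_pipeline() runs the detection
    # phase on a 2K snapshot and sends imageDetection, parking the frame and detection
    # result in pending_processing_data; once the backend replies with setSessionId,
    # set_session_id() schedules _execute_processing_phase(), which reuses that stored
    # snapshot so branch processing sees the same coordinate space.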

    def set_subscription_info(self, subscription_info):
        """
        Set subscription info to access snapshot URL and other stream details.

        Args:
            subscription_info: SubscriptionInfo object containing stream config
        """
        self.subscription_info = subscription_info
        logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}")

    def set_session_id(self, display_id: str, session_id: str, subscription_id: Optional[str] = None):
        """
        Set session ID for a display (from the backend).
        This is called when the backend sends setSessionId after receiving imageDetection.

        Args:
            display_id: Display identifier
            session_id: Session identifier
            subscription_id: Subscription identifier (displayId;cameraId) - needed for fallback
        """
        # Ensure session_id is always a string for consistent type handling
        session_id = str(session_id) if session_id is not None else None
        self.active_sessions[display_id] = session_id

        # Store subscription_id for fallback usage
        if subscription_id:
            self.display_to_subscription[display_id] = subscription_id
            logger.info(f"Set session {session_id} for display {display_id} with subscription {subscription_id}")
        else:
            logger.info(f"Set session {session_id} for display {display_id}")

        # Check if we have a pending vehicle for this display
        if display_id in self.pending_vehicles:
            track_id = self.pending_vehicles[display_id]

            # Mark vehicle as processed with the session ID
            self.tracker.mark_processed(track_id, session_id)
            self.session_vehicles[session_id] = track_id

            # Mark vehicle as permanently processed (won't process again even after session clear).
            # Use a composite key to distinguish same track IDs across different cameras.
            camera_id = display_id  # Using display_id as camera_id for isolation
            permanent_key = f"{camera_id}:{track_id}"
            self.permanently_processed[permanent_key] = time.time()

            # Remove from pending
            del self.pending_vehicles[display_id]

            logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as permanently processed")
        else:
            logger.warning(f"No pending vehicle found for display {display_id} when setting session {session_id}")

        # Check if we have pending processing data for this display
        if display_id in self.pending_processing_data:
            processing_data = self.pending_processing_data[display_id]

            # Trigger the processing phase asynchronously
            asyncio.create_task(self._execute_processing_phase(
                processing_data=processing_data,
                session_id=session_id,
                display_id=display_id
            ))

            # Remove from pending processing
            del self.pending_processing_data[display_id]

            logger.info(f"Triggered processing phase for session {session_id} on display {display_id}")
        else:
            logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")

            # FALLBACK: Execute pipeline for POS-initiated sessions.
            # Skip if session_id is None (no car present or car has left).
            if session_id is not None:
                # Use the stored subscription_id instead of creating a fake one
                stored_subscription_id = self.display_to_subscription.get(display_id)
                if stored_subscription_id:
                    logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}")

                    # Trigger the fallback pipeline asynchronously with the real subscription_id
                    asyncio.create_task(self._execute_fallback_pipeline(
                        display_id=display_id,
                        session_id=session_id,
                        subscription_id=stored_subscription_id
                    ))
                else:
                    logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline")
            else:
                logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}")
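
    # Example of the permanent-processing key built above (values illustrative):
    # display_id "display-001" and track_id 7 produce the key "display-001:7",
    # keeping identical track IDs from different cameras/displays apart.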

    def clear_session_id(self, session_id: str):
        """
        Clear session ID (post-fueling).

        Args:
            session_id: Session identifier to clear
        """
        # Mark session as cleared
        self.cleared_sessions[session_id] = time.time()

        # Clear from tracker
        self.tracker.clear_session(session_id)

        # Remove from active sessions
        display_to_remove = None
        for display_id, sess_id in self.active_sessions.items():
            if sess_id == session_id:
                display_to_remove = display_id
                break

        if display_to_remove:
            del self.active_sessions[display_to_remove]

        if session_id in self.session_vehicles:
            del self.session_vehicles[session_id]

        logger.info(f"Cleared session {session_id}")

        # Clean old cleared sessions (older than 5 minutes)
        current_time = time.time()
        old_sessions = [
            sid for sid, clear_time in self.cleared_sessions.items()
            if (current_time - clear_time) > 300
        ]
        for sid in old_sessions:
            del self.cleared_sessions[sid]

    def get_session_for_display(self, display_id: str) -> Optional[str]:
        """Get active session for a display."""
        return self.active_sessions.get(display_id)

    def reset_tracking(self):
        """Reset all tracking state."""
        self.tracker.reset_tracking()
        self.active_sessions.clear()
        self.session_vehicles.clear()
        self.cleared_sessions.clear()
        self.pending_vehicles.clear()
        self.pending_processing_data.clear()
        self.display_to_subscription.clear()
        self.permanently_processed.clear()
        self.progression_stages.clear()
        self.last_detection_time.clear()
        logger.info("Tracking pipeline integration reset")

    def get_statistics(self) -> Dict[str, Any]:
        """Get comprehensive statistics."""
        tracker_stats = self.tracker.get_statistics()
        validator_stats = self.validator.get_statistics()

        return {
            'integration': self.stats,
            'tracker': tracker_stats,
            'validator': validator_stats,
            'active_sessions': len(self.active_sessions),
            'cleared_sessions': len(self.cleared_sessions)
        }

    async def _check_car_abandonment(self, display_id: str, subscription_id: str):
        """
        Check if a car has abandoned the fueling process (left after reaching the car_wait_staff stage).

        Args:
            display_id: Display identifier
            subscription_id: Subscription identifier
        """
        current_time = time.time()

        # Check all sessions in the car_wait_staff stage
        abandoned_sessions = []
        for session_id, stage in self.progression_stages.items():
            if stage == "car_wait_staff":
                # Check if we have recent detections for this session's display
                session_display = None
                for disp_id, sess_id in self.active_sessions.items():
                    if sess_id == session_id:
                        session_display = disp_id
                        break

                if session_display:
                    last_detection = self.last_detection_time.get(session_display, 0)
                    time_since_detection = current_time - last_detection

                    logger.info(f"[ABANDON CHECK] Session {session_id} (display: {session_display}): "
                              f"time_since_detection={time_since_detection:.1f}s, "
                              f"timeout={self.abandonment_timeout}s")

                    if time_since_detection > self.abandonment_timeout:
                        logger.warning(f"🚨 Car abandonment detected: session {session_id}, "
                                  f"no detection for {time_since_detection:.1f}s")
                        abandoned_sessions.append(session_id)
                else:
                    logger.debug(f"[ABANDON CHECK] Session {session_id} has no associated display")

        # Send abandonment detection for each abandoned session
        for session_id in abandoned_sessions:
            await self._send_abandonment_detection(subscription_id, session_id)
            # Remove from progression stages to avoid repeated detection
            if session_id in self.progression_stages:
                del self.progression_stages[session_id]
                logger.info(f"[ABANDON] Removed session {session_id} from progression_stages after notification")
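
    # Timing example for the check above (arithmetic only, using the default
    # abandonment_timeout of 3.0s): last detection at t=100.0s and a check at
    # t=103.5s gives time_since_detection = 3.5s > 3.0s, so the session is
    # reported as abandoned via a null imageDetection.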

    async def _send_abandonment_detection(self, subscription_id: str, session_id: str):
        """
        Send imageDetection with null detection to indicate car abandonment.

        Args:
            subscription_id: Subscription identifier
            session_id: Session ID of the abandoned car
        """
        try:
            # Import here to avoid circular imports
            from ..communication.messages import create_image_detection

            # Create abandonment detection message with null detection
            detection_message = create_image_detection(
                subscription_identifier=subscription_id,
                detection_data=None,  # Null detection indicates abandonment
                model_id=self.model_id,
                model_name=self.pipeline_parser.tracking_config.model_id if self.pipeline_parser.tracking_config else "tracking_model"
            )

            # Send to backend via WebSocket if sender is available
            if self.message_sender:
                await self.message_sender(detection_message)
                logger.info(f"[CAR ABANDONMENT] Sent null detection for session {session_id}")
            else:
                logger.info(f"[CAR ABANDONMENT] No message sender available, would send: {detection_message}")

        except Exception as e:
            logger.error(f"Error sending abandonment detection: {e}", exc_info=True)

    def set_progression_stage(self, session_id: str, stage: str):
        """
        Set progression stage for a session (from backend setProgessionStage message).

        Args:
            session_id: Session identifier
            stage: Progression stage (e.g., "car_wait_staff")
        """
        self.progression_stages[session_id] = stage
        logger.info(f"Set progression stage for session {session_id}: {stage}")

        # If car reaches car_wait_staff, start monitoring for abandonment
        if stage == "car_wait_staff":
            logger.info(f"Started monitoring session {session_id} for car abandonment")

    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """
        Fetch a high-quality snapshot from the camera's snapshot URL.
        Reusable helper used by the detection phase, the processing phase, and the fallback pipeline.

        Returns:
            Snapshot frame or None if unavailable
        """
        if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url):
            logger.warning("[SNAPSHOT] No subscription info or snapshot URL available")
            return None

        try:
            from ..streaming.readers import HTTPSnapshotReader

            logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}")
            snapshot_reader = HTTPSnapshotReader(
                camera_id=self.subscription_info.camera_id,
                snapshot_url=self.subscription_info.stream_config.snapshot_url,
                max_retries=3
            )

            frame = snapshot_reader.fetch_single_snapshot()

            if frame is not None:
                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot")
                return frame
            else:
                logger.warning("[SNAPSHOT] Failed to fetch snapshot")
                return None

        except Exception as e:
            logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True)
            return None
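
    # Note: _fetch_snapshot() only works after set_subscription_info() has provided a
    # SubscriptionInfo whose stream_config.snapshot_url is set; without it the detection
    # phase falls back to the RTSP frame and the processing phase aborts rather than
    # mixing coordinate spaces.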

    async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str):
        """
        Execute the fallback pipeline when a sessionId is received without prior detection.
        This handles POS-initiated sessions where the backend starts a transaction before car detection.

        Args:
            display_id: Display identifier
            session_id: Session ID from backend
            subscription_id: Subscription identifier for pipeline execution
        """
        try:
            logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}")

            # Fetch fresh snapshot from camera
            frame = self._fetch_snapshot()

            if frame is None:
                logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline")
                return

            logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}")

            # Check if detection pipeline is available
            if not self.detection_pipeline:
                logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}")
                return

            # Execute detection phase to get detected regions
            detection_result = await self.detection_pipeline.execute_detection_phase(
                frame=frame,
                display_id=display_id,
                subscription_id=subscription_id
            )

            logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: "
                       f"status={detection_result.get('status', 'unknown')}, "
                       f"regions={list(detection_result.get('detected_regions', {}).keys())}")

            # If detection found regions, execute the processing phase
            detected_regions = detection_result.get('detected_regions', {})
            if detected_regions:
                processing_result = await self.detection_pipeline.execute_processing_phase(
                    frame=frame,
                    display_id=display_id,
                    session_id=session_id,
                    subscription_id=subscription_id,
                    detected_regions=detected_regions
                )

                logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: "
                           f"status={processing_result.get('status', 'unknown')}, "
                           f"branches={len(processing_result.get('branch_results', {}))}, "
                           f"actions={len(processing_result.get('actions_executed', []))}")

                # Update statistics
                self.stats['pipelines_executed'] += 1

            else:
                logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}")

        except Exception as e:
            logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True)

    def _filter_small_frontals(self, tracking_results, frame):
        """
        Filter out frontal detections that are smaller than the minimum bbox area percentage.
        This prevents processing of cars from neighboring pumps that appear in the camera view.

        Args:
            tracking_results: YOLO tracking results with detections
            frame: Input frame for calculating frame area

        Returns:
            Modified tracking_results with small frontals removed
        """
        if not hasattr(tracking_results, 'detections') or not tracking_results.detections:
            return tracking_results

        # Calculate frame area and minimum bbox area threshold
        frame_area = frame.shape[0] * frame.shape[1]  # height * width
        min_bbox_area = frame_area * (self.min_bbox_area_percentage / 100.0)

        # Filter detections
        filtered_detections = []
        filtered_count = 0

        for detection in tracking_results.detections:
            # Calculate detection bbox area
            bbox = detection.bbox  # Assuming bbox is [x1, y1, x2, y2]
            bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

            if bbox_area >= min_bbox_area:
                # Keep detection - bbox is large enough
                filtered_detections.append(detection)
            else:
                # Filter out small detection
                filtered_count += 1
                area_percentage = (bbox_area / frame_area) * 100
                logger.debug(f"Filtered small frontal: area={bbox_area:.0f}px² ({area_percentage:.1f}% of frame, "
                           f"min required: {self.min_bbox_area_percentage}%)")

        # Update tracking results with filtered detections
        tracking_results.detections = filtered_detections

        # Update statistics
        if filtered_count > 0:
            self.stats['frontals_filtered_small'] += filtered_count
            logger.info(f"Filtered {filtered_count} small frontal detections, "
                       f"{len(filtered_detections)} remaining (total filtered: {self.stats['frontals_filtered_small']})")

        return tracking_results
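
    # Threshold arithmetic (worked example): with min_bbox_area_percentage = 3.5 and a
    # 1920x1080 frame, frame_area = 2,073,600 px², so min_bbox_area ≈ 72,576 px² --
    # roughly a 269x269 px box. Smaller frontal detections are dropped before tracking.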

    def cleanup(self):
        """Cleanup resources."""
        self.executor.shutdown(wait=False)
        self.reset_tracking()

        # Cleanup detection pipeline
        if self.detection_pipeline:
            self.detection_pipeline.cleanup()

        logger.info("Tracking pipeline integration cleaned up")