diff --git a/core/communication/websocket.py b/core/communication/websocket.py index c40c912..e5cbe72 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -471,10 +471,14 @@ class WebSocketHandler: # Update worker state worker_state.set_progression_stage(display_identifier, stage) + # Update tracking integration for car abandonment detection + session_id = worker_state.get_session_id(display_identifier) + if session_id: + shared_stream_manager.set_progression_stage(session_id, stage) + # If stage indicates session is cleared/finished, clear from tracking if stage in ['finished', 'cleared', 'idle']: # Get session ID for this display and clear it - session_id = worker_state.get_session_id(display_identifier) if session_id: shared_stream_manager.clear_session_id(session_id) logger.info(f"[Tracking] Cleared session {session_id} due to progression stage: {stage}") diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 2e381e9..893f128 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -389,6 +389,14 @@ class StreamManager: subscription_info.tracking_integration.clear_session_id(session_id) logger.debug(f"Cleared session {session_id}") + def set_progression_stage(self, session_id: str, stage: str): + """Set progression stage for tracking integrations.""" + with self._lock: + for subscription_info in self._subscriptions.values(): + if subscription_info.tracking_integration: + subscription_info.tracking_integration.set_progression_stage(session_id, stage) + logger.debug(f"Set progression stage for session {session_id}: {stage}") + def get_tracking_stats(self) -> Dict[str, Any]: """Get tracking statistics from all subscriptions.""" stats = {} diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 5427e29..961fab4 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -52,6 +52,12 @@ class TrackingPipelineIntegration: self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID) + # Additional validators for enhanced flow control + self.permanently_processed: Dict[int, float] = {} # track_id -> process_time (never process again) + self.progression_stages: Dict[str, str] = {} # session_id -> current_stage + self.last_detection_time: Dict[str, float] = {} # display_id -> last_detection_timestamp + self.abandonment_timeout = 3.0 # seconds to wait before declaring car abandoned + # Thread pool for pipeline execution self.executor = ThreadPoolExecutor(max_workers=2) @@ -170,6 +176,13 @@ class TrackingPipelineIntegration: frame ) + # Update last detection time for abandonment detection + if tracked_vehicles: + self.last_detection_time[display_id] = time.time() + + # Check for car abandonment (vehicle left after getting car_wait_staff stage) + await self._check_car_abandonment(display_id, subscription_id) + result['tracked_vehicles'] = [ { 'track_id': v.track_id, @@ -207,8 +220,8 @@ class TrackingPipelineIntegration: if (time.time() - clear_time) < 30: # 30 second cooldown session_cleared = True - # Skip same car after session clear - if self.validator.should_skip_same_car(vehicle, session_cleared): + # Skip same car after session clear or if permanently processed + if self.validator.should_skip_same_car(vehicle, session_cleared, self.permanently_processed): continue # Validate vehicle @@ -370,10 +383,13 @@ class TrackingPipelineIntegration: self.tracker.mark_processed(track_id, session_id) self.session_vehicles[session_id] = track_id + # Mark vehicle as permanently processed (won't process again even after session clear) + self.permanently_processed[track_id] = time.time() + # Remove from pending del self.pending_vehicles[display_id] - logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as processed") + logger.info(f"Assigned session {session_id} to vehicle {track_id}, marked as permanently processed") else: logger.warning(f"No pending vehicle found for display {display_id} when setting session {session_id}") @@ -425,6 +441,9 @@ class TrackingPipelineIntegration: self.session_vehicles.clear() self.cleared_sessions.clear() self.pending_vehicles.clear() + self.permanently_processed.clear() + self.progression_stages.clear() + self.last_detection_time.clear() logger.info("Tracking pipeline integration reset") def get_statistics(self) -> Dict[str, Any]: @@ -440,6 +459,88 @@ class TrackingPipelineIntegration: 'cleared_sessions': len(self.cleared_sessions) } + async def _check_car_abandonment(self, display_id: str, subscription_id: str): + """ + Check if a car has abandoned the fueling process (left after getting car_wait_staff stage). + + Args: + display_id: Display identifier + subscription_id: Subscription identifier + """ + current_time = time.time() + + # Check all sessions in car_wait_staff stage + abandoned_sessions = [] + for session_id, stage in self.progression_stages.items(): + if stage == "car_wait_staff": + # Check if we have recent detections for this session's display + session_display = None + for disp_id, sess_id in self.active_sessions.items(): + if sess_id == session_id: + session_display = disp_id + break + + if session_display: + last_detection = self.last_detection_time.get(session_display, 0) + time_since_detection = current_time - last_detection + + if time_since_detection > self.abandonment_timeout: + logger.info(f"Car abandonment detected: session {session_id}, " + f"no detection for {time_since_detection:.1f}s") + abandoned_sessions.append(session_id) + + # Send abandonment detection for each abandoned session + for session_id in abandoned_sessions: + await self._send_abandonment_detection(subscription_id, session_id) + # Remove from progression stages to avoid repeated detection + if session_id in self.progression_stages: + del self.progression_stages[session_id] + + async def _send_abandonment_detection(self, subscription_id: str, session_id: str): + """ + Send imageDetection with null detection to indicate car abandonment. + + Args: + subscription_id: Subscription identifier + session_id: Session ID of the abandoned car + """ + try: + # Import here to avoid circular imports + from ..communication.messages import create_image_detection + + # Create abandonment detection message with null detection + detection_message = create_image_detection( + subscription_identifier=subscription_id, + detection_data=None, # Null detection indicates abandonment + model_id=52, + model_name="front_rear_detection_v1" + ) + + # Send to backend via WebSocket if sender is available + if self.message_sender: + await self.message_sender(detection_message) + logger.info(f"[CAR ABANDONMENT] Sent null detection for session {session_id}") + else: + logger.info(f"[CAR ABANDONMENT] No message sender available, would send: {detection_message}") + + except Exception as e: + logger.error(f"Error sending abandonment detection: {e}", exc_info=True) + + def set_progression_stage(self, session_id: str, stage: str): + """ + Set progression stage for a session (from backend setProgessionStage message). + + Args: + session_id: Session identifier + stage: Progression stage (e.g., "car_wait_staff") + """ + self.progression_stages[session_id] = stage + logger.info(f"Set progression stage for session {session_id}: {stage}") + + # If car reaches car_wait_staff, start monitoring for abandonment + if stage == "car_wait_staff": + logger.info(f"Started monitoring session {session_id} for car abandonment") + def cleanup(self): """Cleanup resources.""" self.executor.shutdown(wait=False) diff --git a/core/tracking/validator.py b/core/tracking/validator.py index e39386f..f4e5cd7 100644 --- a/core/tracking/validator.py +++ b/core/tracking/validator.py @@ -352,17 +352,27 @@ class StableCarValidator: def should_skip_same_car(self, vehicle: TrackedVehicle, - session_cleared: bool = False) -> bool: + session_cleared: bool = False, + permanently_processed: Dict[int, float] = None) -> bool: """ Determine if we should skip processing for the same car after session clear. Args: vehicle: The tracked vehicle session_cleared: Whether the session was recently cleared + permanently_processed: Dict of permanently processed vehicles Returns: True if we should skip this vehicle """ + # Check if this vehicle was permanently processed (never process again) + if permanently_processed and vehicle.track_id in permanently_processed: + process_time = permanently_processed[vehicle.track_id] + time_since = time.time() - process_time + logger.debug(f"Skipping permanently processed vehicle {vehicle.track_id} " + f"(processed {time_since:.1f}s ago)") + return True + # If vehicle has a session_id but it was cleared, skip for a period if vehicle.session_id is None and vehicle.processed_pipeline and session_cleared: # Check if enough time has passed since processing