From e92efdbe11e6fe9254d2f44581fab2fc92546eb1 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 15:14:28 +0700 Subject: [PATCH] fix: custom subscriptionIdentifier --- core/streaming/manager.py | 9 +++++++-- core/tracking/integration.py | 35 +++++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/core/streaming/manager.py b/core/streaming/manager.py index c082e70..497f1b8 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -606,8 +606,13 @@ class StreamManager: # Check if this subscription matches the display_id subscription_display_id = subscription_info.subscription_id.split(';')[0] if subscription_display_id == display_id and subscription_info.tracking_integration: - subscription_info.tracking_integration.set_session_id(display_id, session_id) - logger.debug(f"Set session {session_id} for display {display_id}") + # Pass the full subscription_id (displayId;cameraId) to the tracking integration + subscription_info.tracking_integration.set_session_id( + display_id, + session_id, + subscription_id=subscription_info.subscription_id + ) + logger.debug(f"Set session {session_id} for display {display_id} with subscription {subscription_info.subscription_id}") def clear_session_id(self, session_id: str): """Clear session ID from the specific tracking integration handling this session.""" diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 7d5f3f8..58afcec 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -61,6 +61,7 @@ class TrackingPipelineIntegration: self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID) self.pending_processing_data: Dict[str, Dict] = {} # display_id -> processing data (waiting for session ID) + self.display_to_subscription: Dict[str, str] = {} # display_id -> subscription_id (for fallback) # Additional validators for enhanced flow control self.permanently_processed: Dict[str, float] = {} # "camera_id:track_id" -> process_time (never process again) @@ -459,7 +460,7 @@ class TrackingPipelineIntegration: self.subscription_info = subscription_info logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}") - def set_session_id(self, display_id: str, session_id: str): + def set_session_id(self, display_id: str, session_id: str, subscription_id: str = None): """ Set session ID for a display (from backend). This is called when backend sends setSessionId after receiving imageDetection. @@ -467,11 +468,18 @@ class TrackingPipelineIntegration: Args: display_id: Display identifier session_id: Session identifier + subscription_id: Subscription identifier (displayId;cameraId) - needed for fallback """ # Ensure session_id is always a string for consistent type handling session_id = str(session_id) if session_id is not None else None self.active_sessions[display_id] = session_id - logger.info(f"Set session {session_id} for display {display_id}") + + # Store subscription_id for fallback usage + if subscription_id: + self.display_to_subscription[display_id] = subscription_id + logger.info(f"Set session {session_id} for display {display_id} with subscription {subscription_id}") + else: + logger.info(f"Set session {session_id} for display {display_id}") # Check if we have a pending vehicle for this display if display_id in self.pending_vehicles: @@ -513,17 +521,19 @@ class TrackingPipelineIntegration: logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}") # FALLBACK: Execute pipeline for POS-initiated sessions - logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id}") + # Use stored subscription_id instead of creating fake one + stored_subscription_id = self.display_to_subscription.get(display_id) + if stored_subscription_id: + logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}") - # Create subscription_id for fallback (needed for pipeline execution) - fallback_subscription_id = f"{display_id};fallback" - - # Trigger the fallback pipeline asynchronously - asyncio.create_task(self._execute_fallback_pipeline( - display_id=display_id, - session_id=session_id, - subscription_id=fallback_subscription_id - )) + # Trigger the fallback pipeline asynchronously with real subscription_id + asyncio.create_task(self._execute_fallback_pipeline( + display_id=display_id, + session_id=session_id, + subscription_id=stored_subscription_id + )) + else: + logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline") def clear_session_id(self, session_id: str): """ @@ -574,6 +584,7 @@ class TrackingPipelineIntegration: self.cleared_sessions.clear() self.pending_vehicles.clear() self.pending_processing_data.clear() + self.display_to_subscription.clear() self.permanently_processed.clear() self.progression_stages.clear() self.last_detection_time.clear()