Merge pull request 'fix: custom subscriptionIdentifier' (#23) from dev into main
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m48s
Build Worker Base and Application Images / deploy-stack (push) Successful in 17s

Reviewed-on: #23
This commit is contained in:
Chawanwit Pornnatwuttigul 2025-09-30 08:14:48 +00:00
commit 116a75d861
2 changed files with 30 additions and 14 deletions

View file

@ -606,8 +606,13 @@ class StreamManager:
# Check if this subscription matches the display_id
subscription_display_id = subscription_info.subscription_id.split(';')[0]
if subscription_display_id == display_id and subscription_info.tracking_integration:
subscription_info.tracking_integration.set_session_id(display_id, session_id)
logger.debug(f"Set session {session_id} for display {display_id}")
# Pass the full subscription_id (displayId;cameraId) to the tracking integration
subscription_info.tracking_integration.set_session_id(
display_id,
session_id,
subscription_id=subscription_info.subscription_id
)
logger.debug(f"Set session {session_id} for display {display_id} with subscription {subscription_info.subscription_id}")
def clear_session_id(self, session_id: str):
"""Clear session ID from the specific tracking integration handling this session."""

View file

@ -61,6 +61,7 @@ class TrackingPipelineIntegration:
self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time
self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID)
self.pending_processing_data: Dict[str, Dict] = {} # display_id -> processing data (waiting for session ID)
self.display_to_subscription: Dict[str, str] = {} # display_id -> subscription_id (for fallback)
# Additional validators for enhanced flow control
self.permanently_processed: Dict[str, float] = {} # "camera_id:track_id" -> process_time (never process again)
@ -459,7 +460,7 @@ class TrackingPipelineIntegration:
self.subscription_info = subscription_info
logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}")
def set_session_id(self, display_id: str, session_id: str):
def set_session_id(self, display_id: str, session_id: str, subscription_id: str = None):
"""
Set session ID for a display (from backend).
This is called when backend sends setSessionId after receiving imageDetection.
@ -467,10 +468,17 @@ class TrackingPipelineIntegration:
Args:
display_id: Display identifier
session_id: Session identifier
subscription_id: Subscription identifier (displayId;cameraId) - needed for fallback
"""
# Ensure session_id is always a string for consistent type handling
session_id = str(session_id) if session_id is not None else None
self.active_sessions[display_id] = session_id
# Store subscription_id for fallback usage
if subscription_id:
self.display_to_subscription[display_id] = subscription_id
logger.info(f"Set session {session_id} for display {display_id} with subscription {subscription_id}")
else:
logger.info(f"Set session {session_id} for display {display_id}")
# Check if we have a pending vehicle for this display
@ -513,17 +521,19 @@ class TrackingPipelineIntegration:
logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")
# FALLBACK: Execute pipeline for POS-initiated sessions
logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id}")
# Use stored subscription_id instead of creating fake one
stored_subscription_id = self.display_to_subscription.get(display_id)
if stored_subscription_id:
logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}")
# Create subscription_id for fallback (needed for pipeline execution)
fallback_subscription_id = f"{display_id};fallback"
# Trigger the fallback pipeline asynchronously
# Trigger the fallback pipeline asynchronously with real subscription_id
asyncio.create_task(self._execute_fallback_pipeline(
display_id=display_id,
session_id=session_id,
subscription_id=fallback_subscription_id
subscription_id=stored_subscription_id
))
else:
logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline")
def clear_session_id(self, session_id: str):
"""
@ -574,6 +584,7 @@ class TrackingPipelineIntegration:
self.cleared_sessions.clear()
self.pending_vehicles.clear()
self.pending_processing_data.clear()
self.display_to_subscription.clear()
self.permanently_processed.clear()
self.progression_stages.clear()
self.last_detection_time.clear()