From 354ed9ce3cfae296450b2e747ac77e963d3080a4 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 15:46:32 +0700 Subject: [PATCH 1/3] fix: fallback when there is sessionId --- core/detection/pipeline.py | 92 ++++++++++++++++++++++++++++-------- core/tracking/integration.py | 26 +++++----- 2 files changed, 88 insertions(+), 30 deletions(-) diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index 076cdc9..d395f3a 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -64,6 +64,10 @@ class DetectionPipeline: # SessionId to processing results mapping (for combining with license plate results) self.session_processing_results = {} + # Field mappings from parallelActions (e.g., {"car_brand": "{car_brand_cls_v3.brand}"}) + self.field_mappings = {} + self._parse_field_mappings() + # Statistics self.stats = { 'detections_processed': 0, @@ -74,6 +78,25 @@ class DetectionPipeline: logger.info("DetectionPipeline initialized") + def _parse_field_mappings(self): + """ + Parse field mappings from parallelActions.postgresql_update_combined.fields. + Extracts mappings like {"car_brand": "{car_brand_cls_v3.brand}"} for dynamic field resolution. + """ + try: + if not self.pipeline_config or not hasattr(self.pipeline_config, 'parallel_actions'): + return + + for action in self.pipeline_config.parallel_actions: + if action.type.value == 'postgresql_update_combined': + fields = action.params.get('fields', {}) + self.field_mappings = fields + logger.info(f"[FIELD MAPPINGS] Parsed from pipeline config: {self.field_mappings}") + break + + except Exception as e: + logger.error(f"Error parsing field mappings: {e}", exc_info=True) + async def initialize(self) -> bool: """ Initialize all pipeline components including models, Redis, and database. @@ -165,6 +188,44 @@ class DetectionPipeline: logger.error(f"Error initializing detection model: {e}", exc_info=True) return False + def _extract_fields_from_branches(self, branch_results: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract fields dynamically from branch results using field mappings. + + Args: + branch_results: Dictionary of branch execution results + + Returns: + Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"}) + """ + extracted = {} + + try: + for db_field_name, template in self.field_mappings.items(): + # Parse template like "{car_brand_cls_v3.brand}" -> branch_id="car_brand_cls_v3", field="brand" + if template.startswith('{') and template.endswith('}'): + var_name = template[1:-1] + if '.' in var_name: + branch_id, field_name = var_name.split('.', 1) + + # Look up value in branch_results + if branch_id in branch_results: + branch_data = branch_results[branch_id] + if isinstance(branch_data, dict) and 'result' in branch_data: + result_data = branch_data['result'] + if isinstance(result_data, dict) and field_name in result_data: + extracted[field_name] = result_data[field_name] + logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}") + else: + logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}") + else: + logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results") + + except Exception as e: + logger.error(f"Error extracting fields from branches: {e}", exc_info=True) + + return extracted + async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]): """ Callback for handling license plate results from LPR service. @@ -272,12 +333,12 @@ class DetectionPipeline: branch_results = self.session_processing_results[session_id_for_lookup] logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}") - if 'car_brand_cls_v2' in branch_results: - brand_result = branch_results['car_brand_cls_v2'].get('result', {}) - car_brand = brand_result.get('brand') - if 'car_bodytype_cls_v1' in branch_results: - bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) - body_type = bodytype_result.get('body_type') + # Extract fields dynamically using field mappings from pipeline config + extracted_fields = self._extract_fields_from_branches(branch_results) + car_brand = extracted_fields.get('brand') + body_type = extracted_fields.get('body_type') + + logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}") # Clean up stored results after use del self.session_processing_results[session_id_for_lookup] @@ -1003,7 +1064,7 @@ class DetectionPipeline: Resolve field template using branch results and context. Args: - template: Template string like "{car_brand_cls_v2.brand}" + template: Template string like "{car_brand_cls_v3.brand}" branch_results: Dictionary of branch execution results context: Detection context @@ -1015,7 +1076,7 @@ class DetectionPipeline: if template.startswith('{') and template.endswith('}'): var_name = template[1:-1] - # Check for branch result reference (e.g., "car_brand_cls_v2.brand") + # Check for branch result reference (e.g., "car_brand_cls_v3.brand") if '.' in var_name: branch_id, field_name = var_name.split('.', 1) if branch_id in branch_results: @@ -1061,17 +1122,10 @@ class DetectionPipeline: logger.warning("No session_id in context for processing results") return - # Extract car brand from car_brand_cls_v2 results - car_brand = None - if 'car_brand_cls_v2' in branch_results: - brand_result = branch_results['car_brand_cls_v2'].get('result', {}) - car_brand = brand_result.get('brand') - - # Extract body type from car_bodytype_cls_v1 results - body_type = None - if 'car_bodytype_cls_v1' in branch_results: - bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) - body_type = bodytype_result.get('body_type') + # Extract fields dynamically using field mappings from pipeline config + extracted_fields = self._extract_fields_from_branches(branch_results) + car_brand = extracted_fields.get('brand') + body_type = extracted_fields.get('body_type') logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: " f"brand={car_brand}, bodyType={body_type}") diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 58afcec..8e0d8fa 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -521,19 +521,23 @@ class TrackingPipelineIntegration: logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}") # FALLBACK: Execute pipeline for POS-initiated sessions - # Use stored subscription_id instead of creating fake one - stored_subscription_id = self.display_to_subscription.get(display_id) - if stored_subscription_id: - logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}") + # Skip if session_id is None (no car present or car has left) + if session_id is not None: + # Use stored subscription_id instead of creating fake one + stored_subscription_id = self.display_to_subscription.get(display_id) + if stored_subscription_id: + logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}") - # Trigger the fallback pipeline asynchronously with real subscription_id - asyncio.create_task(self._execute_fallback_pipeline( - display_id=display_id, - session_id=session_id, - subscription_id=stored_subscription_id - )) + # Trigger the fallback pipeline asynchronously with real subscription_id + asyncio.create_task(self._execute_fallback_pipeline( + display_id=display_id, + session_id=session_id, + subscription_id=stored_subscription_id + )) + else: + logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline") else: - logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline") + logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}") def clear_session_id(self, session_id: str): """ From 793beb15710cb46605a754a83b08abb0e4fe1d92 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 16:04:24 +0700 Subject: [PATCH 2/3] fix: tracking works but absent not work --- app.py | 9 +++-- core/communication/websocket.py | 10 ++++- core/streaming/manager.py | 71 +++++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 22 deletions(-) diff --git a/app.py b/app.py index eb1440f..7b82d23 100644 --- a/app.py +++ b/app.py @@ -201,10 +201,11 @@ else: os.makedirs("models", exist_ok=True) logger.info("Ensured models directory exists") -# Initialize stream manager with config value -from core.streaming import initialize_stream_manager -initialize_stream_manager(max_streams=config.get('max_streams', 10)) -logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}") +# Stream manager already initialized at module level with max_streams=20 +# Calling initialize_stream_manager() creates a NEW instance, breaking references +# from core.streaming import initialize_stream_manager +# initialize_stream_manager(max_streams=config.get('max_streams', 10)) +logger.info(f"Using stream manager with max_streams=20 (module-level initialization)") # Frames are now stored in the shared cache buffer from core.streaming.buffers # latest_frames = {} # Deprecated - using shared_cache_buffer instead diff --git a/core/communication/websocket.py b/core/communication/websocket.py index e53096a..d20ee32 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -197,18 +197,24 @@ class WebSocketHandler: async def _handle_set_subscription_list(self, message: SetSubscriptionListMessage) -> None: """Handle setSubscriptionList message for declarative subscription management.""" - logger.info(f"[RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions") + logger.info(f"🎯 [RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions") + for i, sub in enumerate(message.subscriptions): + logger.info(f" 📋 Sub {i+1}: {sub.subscriptionIdentifier} (model: {sub.modelId})") # Update worker state with new subscriptions worker_state.set_subscriptions(message.subscriptions) # Phase 2: Download and manage models + logger.info("📦 Starting model download phase...") await self._ensure_models(message.subscriptions) + logger.info("✅ Model download phase complete") # Phase 3 & 4: Integrate with streaming management and tracking + logger.info("🎬 Starting stream subscription update...") await self._update_stream_subscriptions(message.subscriptions) + logger.info("✅ Stream subscription update complete") - logger.info("Subscription list updated successfully") + logger.info("🏁 Subscription list updated successfully") async def _ensure_models(self, subscriptions) -> None: """Ensure all required models are downloaded and available.""" diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 497f1b8..2de86e4 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -85,8 +85,11 @@ class StreamManager: with self._round_robin_lock: if camera_id not in self._camera_list: self._camera_list.append(camera_id) - - logger.info(f"Created tracking queue for camera {camera_id}") + logger.info(f"✅ Created tracking queue for camera {camera_id}, camera_list now has {len(self._camera_list)} cameras: {self._camera_list}") + else: + logger.warning(f"Camera {camera_id} already in camera_list") + else: + logger.debug(f"Camera {camera_id} already has tracking queue") def _remove_camera_queue(self, camera_id: str): """Remove tracking queue for a camera that's no longer active.""" @@ -153,6 +156,10 @@ class StreamManager: if not success: self._remove_subscription_internal(subscription_id) return False + else: + # Stream already exists, but ensure queue exists too + logger.info(f"Stream already exists for {camera_id}, ensuring queue exists") + self._ensure_camera_queue(camera_id) logger.info(f"Added subscription {subscription_id} for camera {camera_id} " f"({len(self._camera_subscribers[camera_id])} total subscribers)") @@ -188,6 +195,7 @@ class StreamManager: def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool: """Start a stream for the given camera.""" try: + logger.info(f"🚀 _start_stream called for {camera_id}") if stream_config.rtsp_url: # RTSP stream using FFmpeg subprocess with CUDA acceleration logger.info(f"\033[94m[RTSP] Starting {camera_id}\033[0m") @@ -199,7 +207,9 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader + logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}") self._ensure_camera_queue(camera_id) # Create tracking queue + logger.info(f"✅ _ensure_camera_queue completed for {camera_id}") logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m") elif stream_config.snapshot_url: @@ -214,7 +224,9 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader + logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}") self._ensure_camera_queue(camera_id) # Create tracking queue + logger.info(f"✅ _ensure_camera_queue completed for {camera_id}") logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m") else: @@ -334,18 +346,22 @@ class StreamManager: while not self._stop_workers.is_set(): try: + logger.debug(f"Worker {threading.current_thread().name} loop iteration, stop_event={self._stop_workers.is_set()}") + # Get next camera in round-robin fashion camera_id, item = self._get_next_camera_item() if camera_id is None: # No cameras have items, sleep briefly consecutive_empty += 1 + logger.debug(f"Worker {threading.current_thread().name}: All queues empty ({consecutive_empty}/{max_consecutive_empty})") if consecutive_empty >= max_consecutive_empty: time.sleep(0.1) # Sleep 100ms if nothing to process consecutive_empty = 0 continue consecutive_empty = 0 # Reset counter when we find work + logger.info(f"Worker {threading.current_thread().name}: Processing frame from {camera_id}") frame = item['frame'] timestamp = item['timestamp'] @@ -353,11 +369,13 @@ class StreamManager: # Check if frame is too old (drop if > 1 second old) age = time.time() - timestamp if age > 1.0: - logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") + logger.warning(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") continue + logger.info(f"Worker {threading.current_thread().name}: Calling tracking sync for {camera_id}") # Process tracking for this camera's frame self._process_tracking_for_camera_sync(camera_id, frame) + logger.info(f"Worker {threading.current_thread().name}: Finished tracking sync for {camera_id}") except Exception as e: logger.error(f"Error in tracking worker: {e}", exc_info=True) @@ -367,32 +385,48 @@ class StreamManager: def _get_next_camera_item(self): """Get next item from camera queues using round-robin scheduling.""" with self._round_robin_lock: - if not self._camera_list: + # Get current list of cameras from actual tracking queues (central state) + camera_list = list(self._tracking_queues.keys()) + + # Debug: show ALL state + logger.info(f"🔍 _tracking_queues keys: {list(self._tracking_queues.keys())}") + logger.info(f"🔍 _streams keys: {list(self._streams.keys())}") + logger.info(f"🔍 _subscriptions keys: {list(self._subscriptions.keys())}") + + if not camera_list: + logger.warning("⚠️ _get_next_camera_item: No cameras have tracking queues yet, but streams/subscriptions exist!") return None, None + logger.debug(f"_get_next_camera_item: {len(camera_list)} cameras with queues: {camera_list}") + attempts = 0 - max_attempts = len(self._camera_list) + max_attempts = len(camera_list) while attempts < max_attempts: - # Get current camera - if self._camera_round_robin_index >= len(self._camera_list): + # Get current camera using round-robin index + if self._camera_round_robin_index >= len(camera_list): self._camera_round_robin_index = 0 - camera_id = self._camera_list[self._camera_round_robin_index] + camera_id = camera_list[self._camera_round_robin_index] + logger.debug(f"_get_next_camera_item: Trying camera {camera_id} (attempt {attempts + 1}/{max_attempts})") # Move to next camera for next call - self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(self._camera_list) + self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(camera_list) # Try to get item from this camera's queue - if camera_id in self._tracking_queues: - try: - item = self._tracking_queues[camera_id].get_nowait() - return camera_id, item - except queue.Empty: - pass # Try next camera + queue_size = self._tracking_queues[camera_id].qsize() + logger.debug(f"_get_next_camera_item: Camera {camera_id} queue has {queue_size} items") + try: + item = self._tracking_queues[camera_id].get_nowait() + logger.info(f"_get_next_camera_item: Got item from {camera_id}") + return camera_id, item + except queue.Empty: + logger.debug(f"_get_next_camera_item: Camera {camera_id} queue empty") + pass # Try next camera attempts += 1 + logger.debug("_get_next_camera_item: All cameras empty") return None, None # All cameras empty def _process_tracking_for_camera_sync(self, camera_id: str, frame): @@ -404,7 +438,12 @@ class StreamManager: for subscription_id in subscription_ids: subscription_info = self._subscriptions.get(subscription_id) - if not subscription_info or not subscription_info.tracking_integration: + if not subscription_info: + logger.warning(f"No subscription info found for {subscription_id}") + continue + + if not subscription_info.tracking_integration: + logger.debug(f"No tracking integration for {subscription_id} (camera {camera_id}), skipping inference") continue display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id From 3ed7a2cd53dbf3fd06055fc189f3b3f1368770d7 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 16:20:39 +0700 Subject: [PATCH 3/3] fix: abandonment works --- core/communication/websocket.py | 10 ++-------- core/streaming/manager.py | 31 ++----------------------------- core/tracking/integration.py | 11 ++++++++++- 3 files changed, 14 insertions(+), 38 deletions(-) diff --git a/core/communication/websocket.py b/core/communication/websocket.py index d20ee32..e53096a 100644 --- a/core/communication/websocket.py +++ b/core/communication/websocket.py @@ -197,24 +197,18 @@ class WebSocketHandler: async def _handle_set_subscription_list(self, message: SetSubscriptionListMessage) -> None: """Handle setSubscriptionList message for declarative subscription management.""" - logger.info(f"🎯 [RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions") - for i, sub in enumerate(message.subscriptions): - logger.info(f" 📋 Sub {i+1}: {sub.subscriptionIdentifier} (model: {sub.modelId})") + logger.info(f"[RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions") # Update worker state with new subscriptions worker_state.set_subscriptions(message.subscriptions) # Phase 2: Download and manage models - logger.info("📦 Starting model download phase...") await self._ensure_models(message.subscriptions) - logger.info("✅ Model download phase complete") # Phase 3 & 4: Integrate with streaming management and tracking - logger.info("🎬 Starting stream subscription update...") await self._update_stream_subscriptions(message.subscriptions) - logger.info("✅ Stream subscription update complete") - logger.info("🏁 Subscription list updated successfully") + logger.info("Subscription list updated successfully") async def _ensure_models(self, subscriptions) -> None: """Ensure all required models are downloaded and available.""" diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 2de86e4..c4ebd77 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -85,9 +85,7 @@ class StreamManager: with self._round_robin_lock: if camera_id not in self._camera_list: self._camera_list.append(camera_id) - logger.info(f"✅ Created tracking queue for camera {camera_id}, camera_list now has {len(self._camera_list)} cameras: {self._camera_list}") - else: - logger.warning(f"Camera {camera_id} already in camera_list") + logger.info(f"Created tracking queue for camera {camera_id}") else: logger.debug(f"Camera {camera_id} already has tracking queue") @@ -195,7 +193,6 @@ class StreamManager: def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool: """Start a stream for the given camera.""" try: - logger.info(f"🚀 _start_stream called for {camera_id}") if stream_config.rtsp_url: # RTSP stream using FFmpeg subprocess with CUDA acceleration logger.info(f"\033[94m[RTSP] Starting {camera_id}\033[0m") @@ -207,9 +204,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}") self._ensure_camera_queue(camera_id) # Create tracking queue - logger.info(f"✅ _ensure_camera_queue completed for {camera_id}") logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m") elif stream_config.snapshot_url: @@ -224,9 +219,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}") self._ensure_camera_queue(camera_id) # Create tracking queue - logger.info(f"✅ _ensure_camera_queue completed for {camera_id}") logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m") else: @@ -346,22 +339,18 @@ class StreamManager: while not self._stop_workers.is_set(): try: - logger.debug(f"Worker {threading.current_thread().name} loop iteration, stop_event={self._stop_workers.is_set()}") - # Get next camera in round-robin fashion camera_id, item = self._get_next_camera_item() if camera_id is None: # No cameras have items, sleep briefly consecutive_empty += 1 - logger.debug(f"Worker {threading.current_thread().name}: All queues empty ({consecutive_empty}/{max_consecutive_empty})") if consecutive_empty >= max_consecutive_empty: time.sleep(0.1) # Sleep 100ms if nothing to process consecutive_empty = 0 continue consecutive_empty = 0 # Reset counter when we find work - logger.info(f"Worker {threading.current_thread().name}: Processing frame from {camera_id}") frame = item['frame'] timestamp = item['timestamp'] @@ -369,13 +358,11 @@ class StreamManager: # Check if frame is too old (drop if > 1 second old) age = time.time() - timestamp if age > 1.0: - logger.warning(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") + logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") continue - logger.info(f"Worker {threading.current_thread().name}: Calling tracking sync for {camera_id}") # Process tracking for this camera's frame self._process_tracking_for_camera_sync(camera_id, frame) - logger.info(f"Worker {threading.current_thread().name}: Finished tracking sync for {camera_id}") except Exception as e: logger.error(f"Error in tracking worker: {e}", exc_info=True) @@ -388,17 +375,9 @@ class StreamManager: # Get current list of cameras from actual tracking queues (central state) camera_list = list(self._tracking_queues.keys()) - # Debug: show ALL state - logger.info(f"🔍 _tracking_queues keys: {list(self._tracking_queues.keys())}") - logger.info(f"🔍 _streams keys: {list(self._streams.keys())}") - logger.info(f"🔍 _subscriptions keys: {list(self._subscriptions.keys())}") - if not camera_list: - logger.warning("⚠️ _get_next_camera_item: No cameras have tracking queues yet, but streams/subscriptions exist!") return None, None - logger.debug(f"_get_next_camera_item: {len(camera_list)} cameras with queues: {camera_list}") - attempts = 0 max_attempts = len(camera_list) @@ -408,25 +387,19 @@ class StreamManager: self._camera_round_robin_index = 0 camera_id = camera_list[self._camera_round_robin_index] - logger.debug(f"_get_next_camera_item: Trying camera {camera_id} (attempt {attempts + 1}/{max_attempts})") # Move to next camera for next call self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(camera_list) # Try to get item from this camera's queue - queue_size = self._tracking_queues[camera_id].qsize() - logger.debug(f"_get_next_camera_item: Camera {camera_id} queue has {queue_size} items") try: item = self._tracking_queues[camera_id].get_nowait() - logger.info(f"_get_next_camera_item: Got item from {camera_id}") return camera_id, item except queue.Empty: - logger.debug(f"_get_next_camera_item: Camera {camera_id} queue empty") pass # Try next camera attempts += 1 - logger.debug("_get_next_camera_item: All cameras empty") return None, None # All cameras empty def _process_tracking_for_camera_sync(self, camera_id: str, frame): diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 8e0d8fa..28e7d3a 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -220,8 +220,10 @@ class TrackingPipelineIntegration: ) # Update last detection time for abandonment detection + # Update when vehicles ARE detected, so when they leave, timestamp ages if tracked_vehicles: self.last_detection_time[display_id] = time.time() + logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles") # Check for car abandonment (vehicle left after getting car_wait_staff stage) await self._check_car_abandonment(display_id, subscription_id) @@ -632,10 +634,16 @@ class TrackingPipelineIntegration: last_detection = self.last_detection_time.get(session_display, 0) time_since_detection = current_time - last_detection + logger.info(f"[ABANDON CHECK] Session {session_id} (display: {session_display}): " + f"time_since_detection={time_since_detection:.1f}s, " + f"timeout={self.abandonment_timeout}s") + if time_since_detection > self.abandonment_timeout: - logger.info(f"Car abandonment detected: session {session_id}, " + logger.warning(f"🚨 Car abandonment detected: session {session_id}, " f"no detection for {time_since_detection:.1f}s") abandoned_sessions.append(session_id) + else: + logger.debug(f"[ABANDON CHECK] Session {session_id} has no associated display") # Send abandonment detection for each abandoned session for session_id in abandoned_sessions: @@ -643,6 +651,7 @@ class TrackingPipelineIntegration: # Remove from progression stages to avoid repeated detection if session_id in self.progression_stages: del self.progression_stages[session_id] + logger.info(f"[ABANDON] Removed session {session_id} from progression_stages after notification") async def _send_abandonment_detection(self, subscription_id: str, session_id: str): """