fix: abandonment works
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				Build Worker Base and Application Images / deploy-stack (push) Blocked by required conditions
				
			
		
			
				
	
				Build Worker Base and Application Images / check-base-changes (push) Successful in 9s
				
			
		
			
				
	
				Build Worker Base and Application Images / build-base (push) Has been skipped
				
			
		
			
				
	
				Build Worker Base and Application Images / build-docker (push) Has been cancelled
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	Build Worker Base and Application Images / deploy-stack (push) Blocked by required conditions
				
			Build Worker Base and Application Images / check-base-changes (push) Successful in 9s
				
			Build Worker Base and Application Images / build-base (push) Has been skipped
				
			Build Worker Base and Application Images / build-docker (push) Has been cancelled
				
			This commit is contained in:
		
							parent
							
								
									793beb1571
								
							
						
					
					
						commit
						3ed7a2cd53
					
				
					 3 changed files with 14 additions and 38 deletions
				
			
		| 
						 | 
				
			
			@ -197,24 +197,18 @@ class WebSocketHandler:
 | 
			
		|||
 | 
			
		||||
    async def _handle_set_subscription_list(self, message: SetSubscriptionListMessage) -> None:
 | 
			
		||||
        """Handle setSubscriptionList message for declarative subscription management."""
 | 
			
		||||
        logger.info(f"🎯 [RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions")
 | 
			
		||||
        for i, sub in enumerate(message.subscriptions):
 | 
			
		||||
            logger.info(f"   📋 Sub {i+1}: {sub.subscriptionIdentifier} (model: {sub.modelId})")
 | 
			
		||||
        logger.info(f"[RX Processing] setSubscriptionList with {len(message.subscriptions)} subscriptions")
 | 
			
		||||
 | 
			
		||||
        # Update worker state with new subscriptions
 | 
			
		||||
        worker_state.set_subscriptions(message.subscriptions)
 | 
			
		||||
 | 
			
		||||
        # Phase 2: Download and manage models
 | 
			
		||||
        logger.info("📦 Starting model download phase...")
 | 
			
		||||
        await self._ensure_models(message.subscriptions)
 | 
			
		||||
        logger.info("✅ Model download phase complete")
 | 
			
		||||
 | 
			
		||||
        # Phase 3 & 4: Integrate with streaming management and tracking
 | 
			
		||||
        logger.info("🎬 Starting stream subscription update...")
 | 
			
		||||
        await self._update_stream_subscriptions(message.subscriptions)
 | 
			
		||||
        logger.info("✅ Stream subscription update complete")
 | 
			
		||||
 | 
			
		||||
        logger.info("🏁 Subscription list updated successfully")
 | 
			
		||||
        logger.info("Subscription list updated successfully")
 | 
			
		||||
 | 
			
		||||
    async def _ensure_models(self, subscriptions) -> None:
 | 
			
		||||
        """Ensure all required models are downloaded and available."""
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -85,9 +85,7 @@ class StreamManager:
 | 
			
		|||
            with self._round_robin_lock:
 | 
			
		||||
                if camera_id not in self._camera_list:
 | 
			
		||||
                    self._camera_list.append(camera_id)
 | 
			
		||||
                    logger.info(f"✅ Created tracking queue for camera {camera_id}, camera_list now has {len(self._camera_list)} cameras: {self._camera_list}")
 | 
			
		||||
                else:
 | 
			
		||||
                    logger.warning(f"Camera {camera_id} already in camera_list")
 | 
			
		||||
                    logger.info(f"Created tracking queue for camera {camera_id}")
 | 
			
		||||
        else:
 | 
			
		||||
            logger.debug(f"Camera {camera_id} already has tracking queue")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -195,7 +193,6 @@ class StreamManager:
 | 
			
		|||
    def _start_stream(self, camera_id: str, stream_config: StreamConfig) -> bool:
 | 
			
		||||
        """Start a stream for the given camera."""
 | 
			
		||||
        try:
 | 
			
		||||
            logger.info(f"🚀 _start_stream called for {camera_id}")
 | 
			
		||||
            if stream_config.rtsp_url:
 | 
			
		||||
                # RTSP stream using FFmpeg subprocess with CUDA acceleration
 | 
			
		||||
                logger.info(f"\033[94m[RTSP] Starting {camera_id}\033[0m")
 | 
			
		||||
| 
						 | 
				
			
			@ -207,9 +204,7 @@ class StreamManager:
 | 
			
		|||
                reader.set_frame_callback(self._frame_callback)
 | 
			
		||||
                reader.start()
 | 
			
		||||
                self._streams[camera_id] = reader
 | 
			
		||||
                logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}")
 | 
			
		||||
                self._ensure_camera_queue(camera_id)  # Create tracking queue
 | 
			
		||||
                logger.info(f"✅ _ensure_camera_queue completed for {camera_id}")
 | 
			
		||||
                logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m")
 | 
			
		||||
 | 
			
		||||
            elif stream_config.snapshot_url:
 | 
			
		||||
| 
						 | 
				
			
			@ -224,9 +219,7 @@ class StreamManager:
 | 
			
		|||
                reader.set_frame_callback(self._frame_callback)
 | 
			
		||||
                reader.start()
 | 
			
		||||
                self._streams[camera_id] = reader
 | 
			
		||||
                logger.info(f"🎬 About to call _ensure_camera_queue for {camera_id}")
 | 
			
		||||
                self._ensure_camera_queue(camera_id)  # Create tracking queue
 | 
			
		||||
                logger.info(f"✅ _ensure_camera_queue completed for {camera_id}")
 | 
			
		||||
                logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m")
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
| 
						 | 
				
			
			@ -346,22 +339,18 @@ class StreamManager:
 | 
			
		|||
 | 
			
		||||
        while not self._stop_workers.is_set():
 | 
			
		||||
            try:
 | 
			
		||||
                logger.debug(f"Worker {threading.current_thread().name} loop iteration, stop_event={self._stop_workers.is_set()}")
 | 
			
		||||
 | 
			
		||||
                # Get next camera in round-robin fashion
 | 
			
		||||
                camera_id, item = self._get_next_camera_item()
 | 
			
		||||
 | 
			
		||||
                if camera_id is None:
 | 
			
		||||
                    # No cameras have items, sleep briefly
 | 
			
		||||
                    consecutive_empty += 1
 | 
			
		||||
                    logger.debug(f"Worker {threading.current_thread().name}: All queues empty ({consecutive_empty}/{max_consecutive_empty})")
 | 
			
		||||
                    if consecutive_empty >= max_consecutive_empty:
 | 
			
		||||
                        time.sleep(0.1)  # Sleep 100ms if nothing to process
 | 
			
		||||
                        consecutive_empty = 0
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                consecutive_empty = 0  # Reset counter when we find work
 | 
			
		||||
                logger.info(f"Worker {threading.current_thread().name}: Processing frame from {camera_id}")
 | 
			
		||||
 | 
			
		||||
                frame = item['frame']
 | 
			
		||||
                timestamp = item['timestamp']
 | 
			
		||||
| 
						 | 
				
			
			@ -369,13 +358,11 @@ class StreamManager:
 | 
			
		|||
                # Check if frame is too old (drop if > 1 second old)
 | 
			
		||||
                age = time.time() - timestamp
 | 
			
		||||
                if age > 1.0:
 | 
			
		||||
                    logger.warning(f"Dropping old frame for {camera_id} (age: {age:.2f}s)")
 | 
			
		||||
                    logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)")
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                logger.info(f"Worker {threading.current_thread().name}: Calling tracking sync for {camera_id}")
 | 
			
		||||
                # Process tracking for this camera's frame
 | 
			
		||||
                self._process_tracking_for_camera_sync(camera_id, frame)
 | 
			
		||||
                logger.info(f"Worker {threading.current_thread().name}: Finished tracking sync for {camera_id}")
 | 
			
		||||
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Error in tracking worker: {e}", exc_info=True)
 | 
			
		||||
| 
						 | 
				
			
			@ -388,17 +375,9 @@ class StreamManager:
 | 
			
		|||
            # Get current list of cameras from actual tracking queues (central state)
 | 
			
		||||
            camera_list = list(self._tracking_queues.keys())
 | 
			
		||||
 | 
			
		||||
            # Debug: show ALL state
 | 
			
		||||
            logger.info(f"🔍 _tracking_queues keys: {list(self._tracking_queues.keys())}")
 | 
			
		||||
            logger.info(f"🔍 _streams keys: {list(self._streams.keys())}")
 | 
			
		||||
            logger.info(f"🔍 _subscriptions keys: {list(self._subscriptions.keys())}")
 | 
			
		||||
 | 
			
		||||
            if not camera_list:
 | 
			
		||||
                logger.warning("⚠️ _get_next_camera_item: No cameras have tracking queues yet, but streams/subscriptions exist!")
 | 
			
		||||
                return None, None
 | 
			
		||||
 | 
			
		||||
            logger.debug(f"_get_next_camera_item: {len(camera_list)} cameras with queues: {camera_list}")
 | 
			
		||||
 | 
			
		||||
            attempts = 0
 | 
			
		||||
            max_attempts = len(camera_list)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -408,25 +387,19 @@ class StreamManager:
 | 
			
		|||
                    self._camera_round_robin_index = 0
 | 
			
		||||
 | 
			
		||||
                camera_id = camera_list[self._camera_round_robin_index]
 | 
			
		||||
                logger.debug(f"_get_next_camera_item: Trying camera {camera_id} (attempt {attempts + 1}/{max_attempts})")
 | 
			
		||||
 | 
			
		||||
                # Move to next camera for next call
 | 
			
		||||
                self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(camera_list)
 | 
			
		||||
 | 
			
		||||
                # Try to get item from this camera's queue
 | 
			
		||||
                queue_size = self._tracking_queues[camera_id].qsize()
 | 
			
		||||
                logger.debug(f"_get_next_camera_item: Camera {camera_id} queue has {queue_size} items")
 | 
			
		||||
                try:
 | 
			
		||||
                    item = self._tracking_queues[camera_id].get_nowait()
 | 
			
		||||
                    logger.info(f"_get_next_camera_item: Got item from {camera_id}")
 | 
			
		||||
                    return camera_id, item
 | 
			
		||||
                except queue.Empty:
 | 
			
		||||
                    logger.debug(f"_get_next_camera_item: Camera {camera_id} queue empty")
 | 
			
		||||
                    pass  # Try next camera
 | 
			
		||||
 | 
			
		||||
                attempts += 1
 | 
			
		||||
 | 
			
		||||
            logger.debug("_get_next_camera_item: All cameras empty")
 | 
			
		||||
            return None, None  # All cameras empty
 | 
			
		||||
 | 
			
		||||
    def _process_tracking_for_camera_sync(self, camera_id: str, frame):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -220,8 +220,10 @@ class TrackingPipelineIntegration:
 | 
			
		|||
                )
 | 
			
		||||
 | 
			
		||||
                # Update last detection time for abandonment detection
 | 
			
		||||
                # Update when vehicles ARE detected, so when they leave, timestamp ages
 | 
			
		||||
                if tracked_vehicles:
 | 
			
		||||
                    self.last_detection_time[display_id] = time.time()
 | 
			
		||||
                    logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles")
 | 
			
		||||
 | 
			
		||||
                # Check for car abandonment (vehicle left after getting car_wait_staff stage)
 | 
			
		||||
                await self._check_car_abandonment(display_id, subscription_id)
 | 
			
		||||
| 
						 | 
				
			
			@ -632,10 +634,16 @@ class TrackingPipelineIntegration:
 | 
			
		|||
                    last_detection = self.last_detection_time.get(session_display, 0)
 | 
			
		||||
                    time_since_detection = current_time - last_detection
 | 
			
		||||
 | 
			
		||||
                    logger.info(f"[ABANDON CHECK] Session {session_id} (display: {session_display}): "
 | 
			
		||||
                              f"time_since_detection={time_since_detection:.1f}s, "
 | 
			
		||||
                              f"timeout={self.abandonment_timeout}s")
 | 
			
		||||
 | 
			
		||||
                    if time_since_detection > self.abandonment_timeout:
 | 
			
		||||
                        logger.info(f"Car abandonment detected: session {session_id}, "
 | 
			
		||||
                        logger.warning(f"🚨 Car abandonment detected: session {session_id}, "
 | 
			
		||||
                                  f"no detection for {time_since_detection:.1f}s")
 | 
			
		||||
                        abandoned_sessions.append(session_id)
 | 
			
		||||
                else:
 | 
			
		||||
                    logger.debug(f"[ABANDON CHECK] Session {session_id} has no associated display")
 | 
			
		||||
 | 
			
		||||
        # Send abandonment detection for each abandoned session
 | 
			
		||||
        for session_id in abandoned_sessions:
 | 
			
		||||
| 
						 | 
				
			
			@ -643,6 +651,7 @@ class TrackingPipelineIntegration:
 | 
			
		|||
            # Remove from progression stages to avoid repeated detection
 | 
			
		||||
            if session_id in self.progression_stages:
 | 
			
		||||
                del self.progression_stages[session_id]
 | 
			
		||||
                logger.info(f"[ABANDON] Removed session {session_id} from progression_stages after notification")
 | 
			
		||||
 | 
			
		||||
    async def _send_abandonment_detection(self, subscription_id: str, session_id: str):
 | 
			
		||||
        """
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue