Merge pull request 'fix: inference in reader thread' (#22) from dev into main
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
				
			
		
			
				
	
				Build Worker Base and Application Images / build-base (push) Has been skipped
				
			
		
			
				
	
				Build Worker Base and Application Images / build-docker (push) Successful in 2m47s
				
			
		
			
				
	
				Build Worker Base and Application Images / deploy-stack (push) Successful in 1m3s
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
				
			Build Worker Base and Application Images / build-base (push) Has been skipped
				
			Build Worker Base and Application Images / build-docker (push) Successful in 2m47s
				
			Build Worker Base and Application Images / deploy-stack (push) Successful in 1m3s
				
			Reviewed-on: #22
This commit is contained in:
		
						commit
						e37d88a570
					
				
					 2 changed files with 223 additions and 4 deletions
				
			
		| 
						 | 
				
			
			@ -5,6 +5,8 @@ Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
 | 
			
		|||
import logging
 | 
			
		||||
import threading
 | 
			
		||||
import time
 | 
			
		||||
import queue
 | 
			
		||||
import asyncio
 | 
			
		||||
from typing import Dict, Set, Optional, List, Any
 | 
			
		||||
from dataclasses import dataclass
 | 
			
		||||
from collections import defaultdict
 | 
			
		||||
| 
						 | 
				
			
			@ -50,6 +52,64 @@ class StreamManager:
 | 
			
		|||
        self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set)  # camera_id -> set of subscription_ids
 | 
			
		||||
        self._lock = threading.RLock()
 | 
			
		||||
 | 
			
		||||
        # Fair tracking queue system - per camera queues
 | 
			
		||||
        self._tracking_queues: Dict[str, queue.Queue] = {}  # camera_id -> queue
 | 
			
		||||
        self._tracking_workers = []
 | 
			
		||||
        self._stop_workers = threading.Event()
 | 
			
		||||
        self._dropped_frame_counts: Dict[str, int] = {}  # per-camera drop counts
 | 
			
		||||
 | 
			
		||||
        # Round-robin scheduling state
 | 
			
		||||
        self._camera_list = []  # Ordered list of active cameras
 | 
			
		||||
        self._camera_round_robin_index = 0
 | 
			
		||||
        self._round_robin_lock = threading.Lock()
 | 
			
		||||
 | 
			
		||||
        # Start worker threads for tracking processing
 | 
			
		||||
        num_workers = min(4, max_streams // 2 + 1)  # Scale with streams
 | 
			
		||||
        for i in range(num_workers):
 | 
			
		||||
            worker = threading.Thread(
 | 
			
		||||
                target=self._tracking_worker_loop,
 | 
			
		||||
                name=f"TrackingWorker-{i}",
 | 
			
		||||
                daemon=True
 | 
			
		||||
            )
 | 
			
		||||
            worker.start()
 | 
			
		||||
            self._tracking_workers.append(worker)
 | 
			
		||||
 | 
			
		||||
        logger.info(f"Started {num_workers} tracking worker threads")
 | 
			
		||||
 | 
			
		||||
    def _ensure_camera_queue(self, camera_id: str):
 | 
			
		||||
        """Ensure a tracking queue exists for the camera."""
 | 
			
		||||
        if camera_id not in self._tracking_queues:
 | 
			
		||||
            self._tracking_queues[camera_id] = queue.Queue(maxsize=10)  # 10 frames per camera
 | 
			
		||||
            self._dropped_frame_counts[camera_id] = 0
 | 
			
		||||
 | 
			
		||||
            with self._round_robin_lock:
 | 
			
		||||
                if camera_id not in self._camera_list:
 | 
			
		||||
                    self._camera_list.append(camera_id)
 | 
			
		||||
 | 
			
		||||
            logger.info(f"Created tracking queue for camera {camera_id}")
 | 
			
		||||
 | 
			
		||||
    def _remove_camera_queue(self, camera_id: str):
 | 
			
		||||
        """Remove tracking queue for a camera that's no longer active."""
 | 
			
		||||
        if camera_id in self._tracking_queues:
 | 
			
		||||
            # Clear any remaining items
 | 
			
		||||
            while not self._tracking_queues[camera_id].empty():
 | 
			
		||||
                try:
 | 
			
		||||
                    self._tracking_queues[camera_id].get_nowait()
 | 
			
		||||
                except queue.Empty:
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
            del self._tracking_queues[camera_id]
 | 
			
		||||
            del self._dropped_frame_counts[camera_id]
 | 
			
		||||
 | 
			
		||||
            with self._round_robin_lock:
 | 
			
		||||
                if camera_id in self._camera_list:
 | 
			
		||||
                    self._camera_list.remove(camera_id)
 | 
			
		||||
                    # Reset index if needed
 | 
			
		||||
                    if self._camera_round_robin_index >= len(self._camera_list):
 | 
			
		||||
                        self._camera_round_robin_index = 0
 | 
			
		||||
 | 
			
		||||
            logger.info(f"Removed tracking queue for camera {camera_id}")
 | 
			
		||||
 | 
			
		||||
    def add_subscription(self, subscription_id: str, stream_config: StreamConfig,
 | 
			
		||||
                        crop_coords: Optional[tuple] = None,
 | 
			
		||||
                        model_id: Optional[str] = None,
 | 
			
		||||
| 
						 | 
				
			
			@ -139,6 +199,7 @@ class StreamManager:
 | 
			
		|||
                reader.set_frame_callback(self._frame_callback)
 | 
			
		||||
                reader.start()
 | 
			
		||||
                self._streams[camera_id] = reader
 | 
			
		||||
                self._ensure_camera_queue(camera_id)  # Create tracking queue
 | 
			
		||||
                logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m")
 | 
			
		||||
 | 
			
		||||
            elif stream_config.snapshot_url:
 | 
			
		||||
| 
						 | 
				
			
			@ -153,6 +214,7 @@ class StreamManager:
 | 
			
		|||
                reader.set_frame_callback(self._frame_callback)
 | 
			
		||||
                reader.start()
 | 
			
		||||
                self._streams[camera_id] = reader
 | 
			
		||||
                self._ensure_camera_queue(camera_id)  # Create tracking queue
 | 
			
		||||
                logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m")
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
| 
						 | 
				
			
			@ -171,6 +233,7 @@ class StreamManager:
 | 
			
		|||
            try:
 | 
			
		||||
                self._streams[camera_id].stop()
 | 
			
		||||
                del self._streams[camera_id]
 | 
			
		||||
                self._remove_camera_queue(camera_id)  # Remove tracking queue
 | 
			
		||||
                # DON'T clear frames - they should persist until replaced
 | 
			
		||||
                # shared_cache_buffer.clear_camera(camera_id)  # REMOVED - frames should persist
 | 
			
		||||
                logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)")
 | 
			
		||||
| 
						 | 
				
			
			@ -193,8 +256,19 @@ class StreamManager:
 | 
			
		|||
                available_cameras = shared_cache_buffer.frame_buffer.get_camera_list()
 | 
			
		||||
                logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m")
 | 
			
		||||
 | 
			
		||||
            # Process tracking for subscriptions with tracking integration
 | 
			
		||||
            self._process_tracking_for_camera(camera_id, frame)
 | 
			
		||||
            # Queue for tracking processing (non-blocking) - route to camera-specific queue
 | 
			
		||||
            if camera_id in self._tracking_queues:
 | 
			
		||||
                try:
 | 
			
		||||
                    self._tracking_queues[camera_id].put_nowait({
 | 
			
		||||
                        'frame': frame,
 | 
			
		||||
                        'timestamp': time.time()
 | 
			
		||||
                    })
 | 
			
		||||
                except queue.Full:
 | 
			
		||||
                    # Drop frame if camera queue is full (maintain real-time)
 | 
			
		||||
                    self._dropped_frame_counts[camera_id] += 1
 | 
			
		||||
 | 
			
		||||
                    if self._dropped_frame_counts[camera_id] % 50 == 0:
 | 
			
		||||
                        logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue")
 | 
			
		||||
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error in frame callback for camera {camera_id}: {e}")
 | 
			
		||||
| 
						 | 
				
			
			@ -251,6 +325,127 @@ class StreamManager:
 | 
			
		|||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error processing tracking for camera {camera_id}: {e}")
 | 
			
		||||
 | 
			
		||||
    def _tracking_worker_loop(self):
 | 
			
		||||
        """Worker thread loop for round-robin processing of camera queues."""
 | 
			
		||||
        logger.info(f"Tracking worker {threading.current_thread().name} started")
 | 
			
		||||
 | 
			
		||||
        consecutive_empty = 0
 | 
			
		||||
        max_consecutive_empty = 10  # Sleep if all cameras empty this many times
 | 
			
		||||
 | 
			
		||||
        while not self._stop_workers.is_set():
 | 
			
		||||
            try:
 | 
			
		||||
                # Get next camera in round-robin fashion
 | 
			
		||||
                camera_id, item = self._get_next_camera_item()
 | 
			
		||||
 | 
			
		||||
                if camera_id is None:
 | 
			
		||||
                    # No cameras have items, sleep briefly
 | 
			
		||||
                    consecutive_empty += 1
 | 
			
		||||
                    if consecutive_empty >= max_consecutive_empty:
 | 
			
		||||
                        time.sleep(0.1)  # Sleep 100ms if nothing to process
 | 
			
		||||
                        consecutive_empty = 0
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                consecutive_empty = 0  # Reset counter when we find work
 | 
			
		||||
 | 
			
		||||
                frame = item['frame']
 | 
			
		||||
                timestamp = item['timestamp']
 | 
			
		||||
 | 
			
		||||
                # Check if frame is too old (drop if > 1 second old)
 | 
			
		||||
                age = time.time() - timestamp
 | 
			
		||||
                if age > 1.0:
 | 
			
		||||
                    logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)")
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                # Process tracking for this camera's frame
 | 
			
		||||
                self._process_tracking_for_camera_sync(camera_id, frame)
 | 
			
		||||
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Error in tracking worker: {e}", exc_info=True)
 | 
			
		||||
 | 
			
		||||
        logger.info(f"Tracking worker {threading.current_thread().name} stopped")
 | 
			
		||||
 | 
			
		||||
    def _get_next_camera_item(self):
 | 
			
		||||
        """Get next item from camera queues using round-robin scheduling."""
 | 
			
		||||
        with self._round_robin_lock:
 | 
			
		||||
            if not self._camera_list:
 | 
			
		||||
                return None, None
 | 
			
		||||
 | 
			
		||||
            attempts = 0
 | 
			
		||||
            max_attempts = len(self._camera_list)
 | 
			
		||||
 | 
			
		||||
            while attempts < max_attempts:
 | 
			
		||||
                # Get current camera
 | 
			
		||||
                if self._camera_round_robin_index >= len(self._camera_list):
 | 
			
		||||
                    self._camera_round_robin_index = 0
 | 
			
		||||
 | 
			
		||||
                camera_id = self._camera_list[self._camera_round_robin_index]
 | 
			
		||||
 | 
			
		||||
                # Move to next camera for next call
 | 
			
		||||
                self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(self._camera_list)
 | 
			
		||||
 | 
			
		||||
                # Try to get item from this camera's queue
 | 
			
		||||
                if camera_id in self._tracking_queues:
 | 
			
		||||
                    try:
 | 
			
		||||
                        item = self._tracking_queues[camera_id].get_nowait()
 | 
			
		||||
                        return camera_id, item
 | 
			
		||||
                    except queue.Empty:
 | 
			
		||||
                        pass  # Try next camera
 | 
			
		||||
 | 
			
		||||
                attempts += 1
 | 
			
		||||
 | 
			
		||||
            return None, None  # All cameras empty
 | 
			
		||||
 | 
			
		||||
    def _process_tracking_for_camera_sync(self, camera_id: str, frame):
 | 
			
		||||
        """Synchronous version of tracking processing for worker threads."""
 | 
			
		||||
        try:
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                subscription_ids = list(self._camera_subscribers.get(camera_id, []))
 | 
			
		||||
 | 
			
		||||
            for subscription_id in subscription_ids:
 | 
			
		||||
                subscription_info = self._subscriptions.get(subscription_id)
 | 
			
		||||
 | 
			
		||||
                if not subscription_info or not subscription_info.tracking_integration:
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    # Run async tracking in thread's event loop
 | 
			
		||||
                    loop = asyncio.new_event_loop()
 | 
			
		||||
                    asyncio.set_event_loop(loop)
 | 
			
		||||
                    try:
 | 
			
		||||
                        result = loop.run_until_complete(
 | 
			
		||||
                            subscription_info.tracking_integration.process_frame(
 | 
			
		||||
                                frame, display_id, subscription_id
 | 
			
		||||
                            )
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                        # Log tracking results
 | 
			
		||||
                        if result:
 | 
			
		||||
                            tracked_count = len(result.get('tracked_vehicles', []))
 | 
			
		||||
                            validated_vehicle = result.get('validated_vehicle')
 | 
			
		||||
                            pipeline_result = result.get('pipeline_result')
 | 
			
		||||
 | 
			
		||||
                            if tracked_count > 0:
 | 
			
		||||
                                logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked")
 | 
			
		||||
 | 
			
		||||
                            if validated_vehicle:
 | 
			
		||||
                                logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} "
 | 
			
		||||
                                          f"validated as {validated_vehicle['state']} "
 | 
			
		||||
                                          f"(confidence: {validated_vehicle['confidence']:.2f})")
 | 
			
		||||
 | 
			
		||||
                            if pipeline_result:
 | 
			
		||||
                                logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - "
 | 
			
		||||
                                          f"{pipeline_result.get('message', 'no message')}")
 | 
			
		||||
                    finally:
 | 
			
		||||
                        loop.close()
 | 
			
		||||
 | 
			
		||||
                except Exception as track_e:
 | 
			
		||||
                    logger.error(f"Error in tracking for {subscription_id}: {track_e}")
 | 
			
		||||
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error processing tracking for camera {camera_id}: {e}")
 | 
			
		||||
 | 
			
		||||
    def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None):
 | 
			
		||||
        """Get the latest frame for a camera with optional cropping."""
 | 
			
		||||
        return shared_cache_buffer.get_frame(camera_id, crop_coords)
 | 
			
		||||
| 
						 | 
				
			
			@ -366,6 +561,30 @@ class StreamManager:
 | 
			
		|||
 | 
			
		||||
    def stop_all(self):
 | 
			
		||||
        """Stop all streams and clear all subscriptions."""
 | 
			
		||||
        # Signal workers to stop
 | 
			
		||||
        self._stop_workers.set()
 | 
			
		||||
 | 
			
		||||
        # Clear all camera queues
 | 
			
		||||
        for camera_id, camera_queue in list(self._tracking_queues.items()):
 | 
			
		||||
            while not camera_queue.empty():
 | 
			
		||||
                try:
 | 
			
		||||
                    camera_queue.get_nowait()
 | 
			
		||||
                except queue.Empty:
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
        # Wait for workers to finish
 | 
			
		||||
        for worker in self._tracking_workers:
 | 
			
		||||
            worker.join(timeout=2.0)
 | 
			
		||||
 | 
			
		||||
        # Clear queue management structures
 | 
			
		||||
        self._tracking_queues.clear()
 | 
			
		||||
        self._dropped_frame_counts.clear()
 | 
			
		||||
        with self._round_robin_lock:
 | 
			
		||||
            self._camera_list.clear()
 | 
			
		||||
            self._camera_round_robin_index = 0
 | 
			
		||||
 | 
			
		||||
        logger.info("Stopped all tracking worker threads")
 | 
			
		||||
 | 
			
		||||
        with self._lock:
 | 
			
		||||
            # Stop all streams
 | 
			
		||||
            for camera_id in list(self._streams.keys()):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader):
 | 
			
		|||
        cmd = [
 | 
			
		||||
            'ffmpeg',
 | 
			
		||||
            # DO NOT REMOVE
 | 
			
		||||
            '-hwaccel', 'cuda',
 | 
			
		||||
            '-hwaccel_device', '0',
 | 
			
		||||
            # '-hwaccel', 'cuda',
 | 
			
		||||
            # '-hwaccel_device', '0',
 | 
			
		||||
            # Real-time input flags
 | 
			
		||||
            '-fflags', 'nobuffer+genpts',
 | 
			
		||||
            '-flags', 'low_delay',
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue