From 8d2a71fcd73daa8f6ddc156f72e20eb09b0bf3de Mon Sep 17 00:00:00 2001 From: ziesorx Date: Tue, 30 Sep 2025 14:21:29 +0700 Subject: [PATCH] fix: inference in reader thread --- core/streaming/manager.py | 223 +++++++++++++++++++++++++- core/streaming/readers/ffmpeg_rtsp.py | 4 +- 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/core/streaming/manager.py b/core/streaming/manager.py index e2f02d9..c082e70 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -5,6 +5,8 @@ Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. import logging import threading import time +import queue +import asyncio from typing import Dict, Set, Optional, List, Any from dataclasses import dataclass from collections import defaultdict @@ -50,6 +52,64 @@ class StreamManager: self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids self._lock = threading.RLock() + # Fair tracking queue system - per camera queues + self._tracking_queues: Dict[str, queue.Queue] = {} # camera_id -> queue + self._tracking_workers = [] + self._stop_workers = threading.Event() + self._dropped_frame_counts: Dict[str, int] = {} # per-camera drop counts + + # Round-robin scheduling state + self._camera_list = [] # Ordered list of active cameras + self._camera_round_robin_index = 0 + self._round_robin_lock = threading.Lock() + + # Start worker threads for tracking processing + num_workers = min(4, max_streams // 2 + 1) # Scale with streams + for i in range(num_workers): + worker = threading.Thread( + target=self._tracking_worker_loop, + name=f"TrackingWorker-{i}", + daemon=True + ) + worker.start() + self._tracking_workers.append(worker) + + logger.info(f"Started {num_workers} tracking worker threads") + + def _ensure_camera_queue(self, camera_id: str): + """Ensure a tracking queue exists for the camera.""" + if camera_id not in self._tracking_queues: + self._tracking_queues[camera_id] = queue.Queue(maxsize=10) # 10 frames per camera + self._dropped_frame_counts[camera_id] = 0 + + with self._round_robin_lock: + if camera_id not in self._camera_list: + self._camera_list.append(camera_id) + + logger.info(f"Created tracking queue for camera {camera_id}") + + def _remove_camera_queue(self, camera_id: str): + """Remove tracking queue for a camera that's no longer active.""" + if camera_id in self._tracking_queues: + # Clear any remaining items + while not self._tracking_queues[camera_id].empty(): + try: + self._tracking_queues[camera_id].get_nowait() + except queue.Empty: + break + + del self._tracking_queues[camera_id] + del self._dropped_frame_counts[camera_id] + + with self._round_robin_lock: + if camera_id in self._camera_list: + self._camera_list.remove(camera_id) + # Reset index if needed + if self._camera_round_robin_index >= len(self._camera_list): + self._camera_round_robin_index = 0 + + logger.info(f"Removed tracking queue for camera {camera_id}") + def add_subscription(self, subscription_id: str, stream_config: StreamConfig, crop_coords: Optional[tuple] = None, model_id: Optional[str] = None, @@ -139,6 +199,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader + self._ensure_camera_queue(camera_id) # Create tracking queue logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m") elif stream_config.snapshot_url: @@ -153,6 +214,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader + self._ensure_camera_queue(camera_id) # Create tracking queue logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m") else: @@ -171,6 +233,7 @@ class StreamManager: try: self._streams[camera_id].stop() del self._streams[camera_id] + self._remove_camera_queue(camera_id) # Remove tracking queue # DON'T clear frames - they should persist until replaced # shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)") @@ -193,8 +256,19 @@ class StreamManager: available_cameras = shared_cache_buffer.frame_buffer.get_camera_list() logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m") - # Process tracking for subscriptions with tracking integration - self._process_tracking_for_camera(camera_id, frame) + # Queue for tracking processing (non-blocking) - route to camera-specific queue + if camera_id in self._tracking_queues: + try: + self._tracking_queues[camera_id].put_nowait({ + 'frame': frame, + 'timestamp': time.time() + }) + except queue.Full: + # Drop frame if camera queue is full (maintain real-time) + self._dropped_frame_counts[camera_id] += 1 + + if self._dropped_frame_counts[camera_id] % 50 == 0: + logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue") except Exception as e: logger.error(f"Error in frame callback for camera {camera_id}: {e}") @@ -251,6 +325,127 @@ class StreamManager: except Exception as e: logger.error(f"Error processing tracking for camera {camera_id}: {e}") + def _tracking_worker_loop(self): + """Worker thread loop for round-robin processing of camera queues.""" + logger.info(f"Tracking worker {threading.current_thread().name} started") + + consecutive_empty = 0 + max_consecutive_empty = 10 # Sleep if all cameras empty this many times + + while not self._stop_workers.is_set(): + try: + # Get next camera in round-robin fashion + camera_id, item = self._get_next_camera_item() + + if camera_id is None: + # No cameras have items, sleep briefly + consecutive_empty += 1 + if consecutive_empty >= max_consecutive_empty: + time.sleep(0.1) # Sleep 100ms if nothing to process + consecutive_empty = 0 + continue + + consecutive_empty = 0 # Reset counter when we find work + + frame = item['frame'] + timestamp = item['timestamp'] + + # Check if frame is too old (drop if > 1 second old) + age = time.time() - timestamp + if age > 1.0: + logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") + continue + + # Process tracking for this camera's frame + self._process_tracking_for_camera_sync(camera_id, frame) + + except Exception as e: + logger.error(f"Error in tracking worker: {e}", exc_info=True) + + logger.info(f"Tracking worker {threading.current_thread().name} stopped") + + def _get_next_camera_item(self): + """Get next item from camera queues using round-robin scheduling.""" + with self._round_robin_lock: + if not self._camera_list: + return None, None + + attempts = 0 + max_attempts = len(self._camera_list) + + while attempts < max_attempts: + # Get current camera + if self._camera_round_robin_index >= len(self._camera_list): + self._camera_round_robin_index = 0 + + camera_id = self._camera_list[self._camera_round_robin_index] + + # Move to next camera for next call + self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(self._camera_list) + + # Try to get item from this camera's queue + if camera_id in self._tracking_queues: + try: + item = self._tracking_queues[camera_id].get_nowait() + return camera_id, item + except queue.Empty: + pass # Try next camera + + attempts += 1 + + return None, None # All cameras empty + + def _process_tracking_for_camera_sync(self, camera_id: str, frame): + """Synchronous version of tracking processing for worker threads.""" + try: + with self._lock: + subscription_ids = list(self._camera_subscribers.get(camera_id, [])) + + for subscription_id in subscription_ids: + subscription_info = self._subscriptions.get(subscription_id) + + if not subscription_info or not subscription_info.tracking_integration: + continue + + display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id + + try: + # Run async tracking in thread's event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete( + subscription_info.tracking_integration.process_frame( + frame, display_id, subscription_id + ) + ) + + # Log tracking results + if result: + tracked_count = len(result.get('tracked_vehicles', [])) + validated_vehicle = result.get('validated_vehicle') + pipeline_result = result.get('pipeline_result') + + if tracked_count > 0: + logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked") + + if validated_vehicle: + logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} " + f"validated as {validated_vehicle['state']} " + f"(confidence: {validated_vehicle['confidence']:.2f})") + + if pipeline_result: + logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - " + f"{pipeline_result.get('message', 'no message')}") + finally: + loop.close() + + except Exception as track_e: + logger.error(f"Error in tracking for {subscription_id}: {track_e}") + + except Exception as e: + logger.error(f"Error processing tracking for camera {camera_id}: {e}") + def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None): """Get the latest frame for a camera with optional cropping.""" return shared_cache_buffer.get_frame(camera_id, crop_coords) @@ -366,6 +561,30 @@ class StreamManager: def stop_all(self): """Stop all streams and clear all subscriptions.""" + # Signal workers to stop + self._stop_workers.set() + + # Clear all camera queues + for camera_id, camera_queue in list(self._tracking_queues.items()): + while not camera_queue.empty(): + try: + camera_queue.get_nowait() + except queue.Empty: + break + + # Wait for workers to finish + for worker in self._tracking_workers: + worker.join(timeout=2.0) + + # Clear queue management structures + self._tracking_queues.clear() + self._dropped_frame_counts.clear() + with self._round_robin_lock: + self._camera_list.clear() + self._camera_round_robin_index = 0 + + logger.info("Stopped all tracking worker threads") + with self._lock: # Stop all streams for camera_id in list(self._streams.keys()): diff --git a/core/streaming/readers/ffmpeg_rtsp.py b/core/streaming/readers/ffmpeg_rtsp.py index 88f45ae..e469c9e 100644 --- a/core/streaming/readers/ffmpeg_rtsp.py +++ b/core/streaming/readers/ffmpeg_rtsp.py @@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader): cmd = [ 'ffmpeg', # DO NOT REMOVE - '-hwaccel', 'cuda', - '-hwaccel_device', '0', + # '-hwaccel', 'cuda', + # '-hwaccel_device', '0', # Real-time input flags '-fflags', 'nobuffer+genpts', '-flags', 'low_delay',