fix: inference in reader thread #22
2 changed files with 223 additions and 4 deletions
|
@ -5,6 +5,8 @@ Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
|
|||
import logging
|
||||
import threading
|
||||
import time
|
||||
import queue
|
||||
import asyncio
|
||||
from typing import Dict, Set, Optional, List, Any
|
||||
from dataclasses import dataclass
|
||||
from collections import defaultdict
|
||||
|
@ -50,6 +52,64 @@ class StreamManager:
|
|||
self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids
|
||||
self._lock = threading.RLock()
|
||||
|
||||
# Fair tracking queue system - per camera queues
|
||||
self._tracking_queues: Dict[str, queue.Queue] = {} # camera_id -> queue
|
||||
self._tracking_workers = []
|
||||
self._stop_workers = threading.Event()
|
||||
self._dropped_frame_counts: Dict[str, int] = {} # per-camera drop counts
|
||||
|
||||
# Round-robin scheduling state
|
||||
self._camera_list = [] # Ordered list of active cameras
|
||||
self._camera_round_robin_index = 0
|
||||
self._round_robin_lock = threading.Lock()
|
||||
|
||||
# Start worker threads for tracking processing
|
||||
num_workers = min(4, max_streams // 2 + 1) # Scale with streams
|
||||
for i in range(num_workers):
|
||||
worker = threading.Thread(
|
||||
target=self._tracking_worker_loop,
|
||||
name=f"TrackingWorker-{i}",
|
||||
daemon=True
|
||||
)
|
||||
worker.start()
|
||||
self._tracking_workers.append(worker)
|
||||
|
||||
logger.info(f"Started {num_workers} tracking worker threads")
|
||||
|
||||
def _ensure_camera_queue(self, camera_id: str):
|
||||
"""Ensure a tracking queue exists for the camera."""
|
||||
if camera_id not in self._tracking_queues:
|
||||
self._tracking_queues[camera_id] = queue.Queue(maxsize=10) # 10 frames per camera
|
||||
self._dropped_frame_counts[camera_id] = 0
|
||||
|
||||
with self._round_robin_lock:
|
||||
if camera_id not in self._camera_list:
|
||||
self._camera_list.append(camera_id)
|
||||
|
||||
logger.info(f"Created tracking queue for camera {camera_id}")
|
||||
|
||||
def _remove_camera_queue(self, camera_id: str):
|
||||
"""Remove tracking queue for a camera that's no longer active."""
|
||||
if camera_id in self._tracking_queues:
|
||||
# Clear any remaining items
|
||||
while not self._tracking_queues[camera_id].empty():
|
||||
try:
|
||||
self._tracking_queues[camera_id].get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
del self._tracking_queues[camera_id]
|
||||
del self._dropped_frame_counts[camera_id]
|
||||
|
||||
with self._round_robin_lock:
|
||||
if camera_id in self._camera_list:
|
||||
self._camera_list.remove(camera_id)
|
||||
# Reset index if needed
|
||||
if self._camera_round_robin_index >= len(self._camera_list):
|
||||
self._camera_round_robin_index = 0
|
||||
|
||||
logger.info(f"Removed tracking queue for camera {camera_id}")
|
||||
|
||||
def add_subscription(self, subscription_id: str, stream_config: StreamConfig,
|
||||
crop_coords: Optional[tuple] = None,
|
||||
model_id: Optional[str] = None,
|
||||
|
@ -139,6 +199,7 @@ class StreamManager:
|
|||
reader.set_frame_callback(self._frame_callback)
|
||||
reader.start()
|
||||
self._streams[camera_id] = reader
|
||||
self._ensure_camera_queue(camera_id) # Create tracking queue
|
||||
logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m")
|
||||
|
||||
elif stream_config.snapshot_url:
|
||||
|
@ -153,6 +214,7 @@ class StreamManager:
|
|||
reader.set_frame_callback(self._frame_callback)
|
||||
reader.start()
|
||||
self._streams[camera_id] = reader
|
||||
self._ensure_camera_queue(camera_id) # Create tracking queue
|
||||
logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m")
|
||||
|
||||
else:
|
||||
|
@ -171,6 +233,7 @@ class StreamManager:
|
|||
try:
|
||||
self._streams[camera_id].stop()
|
||||
del self._streams[camera_id]
|
||||
self._remove_camera_queue(camera_id) # Remove tracking queue
|
||||
# DON'T clear frames - they should persist until replaced
|
||||
# shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist
|
||||
logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)")
|
||||
|
@ -193,8 +256,19 @@ class StreamManager:
|
|||
available_cameras = shared_cache_buffer.frame_buffer.get_camera_list()
|
||||
logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m")
|
||||
|
||||
# Process tracking for subscriptions with tracking integration
|
||||
self._process_tracking_for_camera(camera_id, frame)
|
||||
# Queue for tracking processing (non-blocking) - route to camera-specific queue
|
||||
if camera_id in self._tracking_queues:
|
||||
try:
|
||||
self._tracking_queues[camera_id].put_nowait({
|
||||
'frame': frame,
|
||||
'timestamp': time.time()
|
||||
})
|
||||
except queue.Full:
|
||||
# Drop frame if camera queue is full (maintain real-time)
|
||||
self._dropped_frame_counts[camera_id] += 1
|
||||
|
||||
if self._dropped_frame_counts[camera_id] % 50 == 0:
|
||||
logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in frame callback for camera {camera_id}: {e}")
|
||||
|
@ -251,6 +325,127 @@ class StreamManager:
|
|||
except Exception as e:
|
||||
logger.error(f"Error processing tracking for camera {camera_id}: {e}")
|
||||
|
||||
def _tracking_worker_loop(self):
|
||||
"""Worker thread loop for round-robin processing of camera queues."""
|
||||
logger.info(f"Tracking worker {threading.current_thread().name} started")
|
||||
|
||||
consecutive_empty = 0
|
||||
max_consecutive_empty = 10 # Sleep if all cameras empty this many times
|
||||
|
||||
while not self._stop_workers.is_set():
|
||||
try:
|
||||
# Get next camera in round-robin fashion
|
||||
camera_id, item = self._get_next_camera_item()
|
||||
|
||||
if camera_id is None:
|
||||
# No cameras have items, sleep briefly
|
||||
consecutive_empty += 1
|
||||
if consecutive_empty >= max_consecutive_empty:
|
||||
time.sleep(0.1) # Sleep 100ms if nothing to process
|
||||
consecutive_empty = 0
|
||||
continue
|
||||
|
||||
consecutive_empty = 0 # Reset counter when we find work
|
||||
|
||||
frame = item['frame']
|
||||
timestamp = item['timestamp']
|
||||
|
||||
# Check if frame is too old (drop if > 1 second old)
|
||||
age = time.time() - timestamp
|
||||
if age > 1.0:
|
||||
logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)")
|
||||
continue
|
||||
|
||||
# Process tracking for this camera's frame
|
||||
self._process_tracking_for_camera_sync(camera_id, frame)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in tracking worker: {e}", exc_info=True)
|
||||
|
||||
logger.info(f"Tracking worker {threading.current_thread().name} stopped")
|
||||
|
||||
def _get_next_camera_item(self):
|
||||
"""Get next item from camera queues using round-robin scheduling."""
|
||||
with self._round_robin_lock:
|
||||
if not self._camera_list:
|
||||
return None, None
|
||||
|
||||
attempts = 0
|
||||
max_attempts = len(self._camera_list)
|
||||
|
||||
while attempts < max_attempts:
|
||||
# Get current camera
|
||||
if self._camera_round_robin_index >= len(self._camera_list):
|
||||
self._camera_round_robin_index = 0
|
||||
|
||||
camera_id = self._camera_list[self._camera_round_robin_index]
|
||||
|
||||
# Move to next camera for next call
|
||||
self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(self._camera_list)
|
||||
|
||||
# Try to get item from this camera's queue
|
||||
if camera_id in self._tracking_queues:
|
||||
try:
|
||||
item = self._tracking_queues[camera_id].get_nowait()
|
||||
return camera_id, item
|
||||
except queue.Empty:
|
||||
pass # Try next camera
|
||||
|
||||
attempts += 1
|
||||
|
||||
return None, None # All cameras empty
|
||||
|
||||
def _process_tracking_for_camera_sync(self, camera_id: str, frame):
|
||||
"""Synchronous version of tracking processing for worker threads."""
|
||||
try:
|
||||
with self._lock:
|
||||
subscription_ids = list(self._camera_subscribers.get(camera_id, []))
|
||||
|
||||
for subscription_id in subscription_ids:
|
||||
subscription_info = self._subscriptions.get(subscription_id)
|
||||
|
||||
if not subscription_info or not subscription_info.tracking_integration:
|
||||
continue
|
||||
|
||||
display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id
|
||||
|
||||
try:
|
||||
# Run async tracking in thread's event loop
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
result = loop.run_until_complete(
|
||||
subscription_info.tracking_integration.process_frame(
|
||||
frame, display_id, subscription_id
|
||||
)
|
||||
)
|
||||
|
||||
# Log tracking results
|
||||
if result:
|
||||
tracked_count = len(result.get('tracked_vehicles', []))
|
||||
validated_vehicle = result.get('validated_vehicle')
|
||||
pipeline_result = result.get('pipeline_result')
|
||||
|
||||
if tracked_count > 0:
|
||||
logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked")
|
||||
|
||||
if validated_vehicle:
|
||||
logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} "
|
||||
f"validated as {validated_vehicle['state']} "
|
||||
f"(confidence: {validated_vehicle['confidence']:.2f})")
|
||||
|
||||
if pipeline_result:
|
||||
logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - "
|
||||
f"{pipeline_result.get('message', 'no message')}")
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
except Exception as track_e:
|
||||
logger.error(f"Error in tracking for {subscription_id}: {track_e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing tracking for camera {camera_id}: {e}")
|
||||
|
||||
def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None):
|
||||
"""Get the latest frame for a camera with optional cropping."""
|
||||
return shared_cache_buffer.get_frame(camera_id, crop_coords)
|
||||
|
@ -366,6 +561,30 @@ class StreamManager:
|
|||
|
||||
def stop_all(self):
|
||||
"""Stop all streams and clear all subscriptions."""
|
||||
# Signal workers to stop
|
||||
self._stop_workers.set()
|
||||
|
||||
# Clear all camera queues
|
||||
for camera_id, camera_queue in list(self._tracking_queues.items()):
|
||||
while not camera_queue.empty():
|
||||
try:
|
||||
camera_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
# Wait for workers to finish
|
||||
for worker in self._tracking_workers:
|
||||
worker.join(timeout=2.0)
|
||||
|
||||
# Clear queue management structures
|
||||
self._tracking_queues.clear()
|
||||
self._dropped_frame_counts.clear()
|
||||
with self._round_robin_lock:
|
||||
self._camera_list.clear()
|
||||
self._camera_round_robin_index = 0
|
||||
|
||||
logger.info("Stopped all tracking worker threads")
|
||||
|
||||
with self._lock:
|
||||
# Stop all streams
|
||||
for camera_id in list(self._streams.keys()):
|
||||
|
|
|
@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader):
|
|||
cmd = [
|
||||
'ffmpeg',
|
||||
# DO NOT REMOVE
|
||||
'-hwaccel', 'cuda',
|
||||
'-hwaccel_device', '0',
|
||||
# '-hwaccel', 'cuda',
|
||||
# '-hwaccel_device', '0',
|
||||
# Real-time input flags
|
||||
'-fflags', 'nobuffer+genpts',
|
||||
'-flags', 'low_delay',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue