""" Multiprocessing-based RTSP stream management for scalability. Handles multiple camera streams using separate processes to bypass GIL limitations. """ import multiprocessing as mp import time import logging import cv2 import numpy as np import queue import threading import os import psutil from typing import Dict, Optional, Tuple, Any, Callable from dataclasses import dataclass from multiprocessing import Process, Queue, Lock, Value, Array, Manager from multiprocessing.shared_memory import SharedMemory import signal import sys # Ensure proper multiprocessing context for uvicorn compatibility try: mp.set_start_method('spawn', force=True) except RuntimeError: pass # Already set logger = logging.getLogger("detector_worker.process_manager") # Frame dimensions (1280x720 RGB) FRAME_WIDTH = 1280 FRAME_HEIGHT = 720 FRAME_CHANNELS = 3 FRAME_SIZE = FRAME_WIDTH * FRAME_HEIGHT * FRAME_CHANNELS @dataclass class ProcessConfig: """Configuration for camera process.""" camera_id: str rtsp_url: str expected_fps: int = 6 buffer_size: int = 3 max_retries: int = 30 reconnect_delay: float = 5.0 class SharedFrameBuffer: """Thread-safe shared memory frame buffer with double buffering.""" def __init__(self, camera_id: str): self.camera_id = camera_id self.lock = mp.Lock() # Double buffering for lock-free reads self.buffer_a = mp.Array('B', FRAME_SIZE, lock=False) self.buffer_b = mp.Array('B', FRAME_SIZE, lock=False) # Atomic index for current read buffer (0 or 1) self.read_buffer_idx = mp.Value('i', 0) # Frame metadata (atomic access) self.timestamp = mp.Value('d', 0.0) self.frame_number = mp.Value('L', 0) self.is_valid = mp.Value('b', False) # Statistics self.frames_written = mp.Value('L', 0) self.frames_dropped = mp.Value('L', 0) def write_frame(self, frame: np.ndarray, timestamp: float) -> bool: """Write frame to buffer with atomic swap.""" if frame is None or frame.size == 0: return False # Resize if needed if frame.shape != (FRAME_HEIGHT, FRAME_WIDTH, FRAME_CHANNELS): frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT)) # Get write buffer (opposite of read buffer) write_idx = 1 - self.read_buffer_idx.value write_buffer = self.buffer_a if write_idx == 0 else self.buffer_b try: # Write to buffer without lock (safe because of double buffering) frame_flat = frame.flatten() write_buffer[:] = frame_flat.astype(np.uint8) # Update metadata self.timestamp.value = timestamp self.frame_number.value += 1 # Atomic swap of buffers with self.lock: self.read_buffer_idx.value = write_idx self.is_valid.value = True self.frames_written.value += 1 return True except Exception as e: logger.error(f"Error writing frame for {self.camera_id}: {e}") self.frames_dropped.value += 1 return False def read_frame(self) -> Optional[Tuple[np.ndarray, float]]: """Read frame from buffer without blocking writers.""" if not self.is_valid.value: return None # Get current read buffer index (atomic read) read_idx = self.read_buffer_idx.value read_buffer = self.buffer_a if read_idx == 0 else self.buffer_b # Read timestamp (atomic) timestamp = self.timestamp.value # Copy frame data (no lock needed for read) try: frame_data = np.array(read_buffer, dtype=np.uint8) frame = frame_data.reshape((FRAME_HEIGHT, FRAME_WIDTH, FRAME_CHANNELS)) return frame.copy(), timestamp except Exception as e: logger.error(f"Error reading frame for {self.camera_id}: {e}") return None def get_stats(self) -> Dict[str, int]: """Get buffer statistics.""" return { 'frames_written': self.frames_written.value, 'frames_dropped': self.frames_dropped.value, 'frame_number': self.frame_number.value, 'is_valid': self.is_valid.value } def camera_worker_process( config: ProcessConfig, frame_buffer: SharedFrameBuffer, command_queue: Queue, status_queue: Queue, stop_event: mp.Event ): """ Worker process for individual camera stream. Runs in separate process to bypass GIL. """ # Set process name for debugging mp.current_process().name = f"Camera-{config.camera_id}" # Configure logging for subprocess logging.basicConfig( level=logging.INFO, format=f'%(asctime)s [%(levelname)s] Camera-{config.camera_id}: %(message)s' ) logger.info(f"Starting camera worker for {config.camera_id}") cap = None consecutive_errors = 0 frame_interval = 1.0 / config.expected_fps last_frame_time = 0 def initialize_capture(): """Initialize OpenCV capture with optimized settings.""" nonlocal cap try: # Set RTSP transport to TCP for reliability os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' # Create capture cap = cv2.VideoCapture(config.rtsp_url, cv2.CAP_FFMPEG) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream") return False # Set capture properties cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT) cap.set(cv2.CAP_PROP_FPS, config.expected_fps) cap.set(cv2.CAP_PROP_BUFFERSIZE, config.buffer_size) # Read initial frames to stabilize for _ in range(3): ret, _ = cap.read() if not ret: logger.warning("Failed to read initial frames") time.sleep(0.1) logger.info(f"Successfully initialized capture") return True except Exception as e: logger.error(f"Error initializing capture: {e}") return False # Main processing loop while not stop_event.is_set(): try: # Check for commands (non-blocking) try: command = command_queue.get_nowait() if command == "reinit": logger.info("Received reinit command") if cap: cap.release() cap = None consecutive_errors = 0 except queue.Empty: pass # Initialize capture if needed if cap is None or not cap.isOpened(): if not initialize_capture(): time.sleep(config.reconnect_delay) consecutive_errors += 1 if consecutive_errors > config.max_retries and config.max_retries > 0: logger.error("Max retries reached, exiting") break continue else: consecutive_errors = 0 # Read frame with timing control current_time = time.time() if current_time - last_frame_time < frame_interval: time.sleep(0.01) # Small sleep to prevent busy waiting continue ret, frame = cap.read() if not ret or frame is None: consecutive_errors += 1 if consecutive_errors >= config.max_retries: logger.error(f"Too many consecutive errors ({consecutive_errors}), reinitializing") if cap: cap.release() cap = None consecutive_errors = 0 time.sleep(config.reconnect_delay) else: if consecutive_errors <= 5: logger.debug(f"Frame read failed (error {consecutive_errors})") elif consecutive_errors % 10 == 0: logger.warning(f"Continuing frame failures (error {consecutive_errors})") # Exponential backoff sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0) time.sleep(sleep_time) continue # Frame read successful consecutive_errors = 0 last_frame_time = current_time # Write to shared buffer if frame_buffer.write_frame(frame, current_time): # Send status update periodically if frame_buffer.frame_number.value % 30 == 0: # Every 30 frames status_queue.put({ 'camera_id': config.camera_id, 'status': 'running', 'frames': frame_buffer.frame_number.value, 'timestamp': current_time }) except KeyboardInterrupt: logger.info("Received interrupt signal") break except Exception as e: logger.error(f"Error in camera worker: {e}") consecutive_errors += 1 time.sleep(1.0) # Cleanup if cap: cap.release() logger.info(f"Camera worker stopped") status_queue.put({ 'camera_id': config.camera_id, 'status': 'stopped', 'frames': frame_buffer.frame_number.value }) class RTSPProcessManager: """ Manages multiple camera processes with health monitoring and auto-restart. """ def __init__(self, max_processes: int = None): self.max_processes = max_processes or (mp.cpu_count() - 2) self.processes: Dict[str, Process] = {} self.frame_buffers: Dict[str, SharedFrameBuffer] = {} self.command_queues: Dict[str, Queue] = {} self.status_queue = mp.Queue() self.stop_events: Dict[str, mp.Event] = {} self.configs: Dict[str, ProcessConfig] = {} # Manager for shared objects self.manager = Manager() self.process_stats = self.manager.dict() # Health monitoring thread self.monitor_thread = None self.monitor_stop = threading.Event() logger.info(f"RTSPProcessManager initialized with max_processes={self.max_processes}") def add_camera(self, config: ProcessConfig) -> bool: """Add a new camera stream.""" if config.camera_id in self.processes: logger.warning(f"Camera {config.camera_id} already exists") return False if len(self.processes) >= self.max_processes: logger.error(f"Max processes ({self.max_processes}) reached") return False try: # Create shared resources frame_buffer = SharedFrameBuffer(config.camera_id) command_queue = mp.Queue() stop_event = mp.Event() # Store resources self.frame_buffers[config.camera_id] = frame_buffer self.command_queues[config.camera_id] = command_queue self.stop_events[config.camera_id] = stop_event self.configs[config.camera_id] = config # Start process process = mp.Process( target=camera_worker_process, args=(config, frame_buffer, command_queue, self.status_queue, stop_event), name=f"Camera-{config.camera_id}" ) process.start() self.processes[config.camera_id] = process logger.info(f"Started process for camera {config.camera_id} (PID: {process.pid})") return True except Exception as e: logger.error(f"Error adding camera {config.camera_id}: {e}") self._cleanup_camera(config.camera_id) return False def remove_camera(self, camera_id: str) -> bool: """Remove a camera stream.""" if camera_id not in self.processes: return False logger.info(f"Removing camera {camera_id}") # Signal stop if camera_id in self.stop_events: self.stop_events[camera_id].set() # Wait for process to stop process = self.processes.get(camera_id) if process and process.is_alive(): process.join(timeout=5.0) if process.is_alive(): logger.warning(f"Force terminating process for {camera_id}") process.terminate() process.join(timeout=2.0) # Cleanup self._cleanup_camera(camera_id) return True def _cleanup_camera(self, camera_id: str): """Clean up camera resources.""" for collection in [self.processes, self.frame_buffers, self.command_queues, self.stop_events, self.configs]: collection.pop(camera_id, None) def get_frame(self, camera_id: str) -> Optional[Tuple[np.ndarray, float]]: """Get latest frame from camera.""" buffer = self.frame_buffers.get(camera_id) if buffer: return buffer.read_frame() return None def get_stats(self) -> Dict[str, Any]: """Get statistics for all cameras.""" stats = {} for camera_id, buffer in self.frame_buffers.items(): process = self.processes.get(camera_id) stats[camera_id] = { 'buffer_stats': buffer.get_stats(), 'process_alive': process.is_alive() if process else False, 'process_pid': process.pid if process else None } return stats def start_monitoring(self): """Start health monitoring thread.""" if self.monitor_thread and self.monitor_thread.is_alive(): return self.monitor_stop.clear() self.monitor_thread = threading.Thread(target=self._monitor_processes) self.monitor_thread.start() logger.info("Started process monitoring") def _monitor_processes(self): """Monitor process health and restart if needed.""" while not self.monitor_stop.is_set(): try: # Check status queue try: while True: status = self.status_queue.get_nowait() self.process_stats[status['camera_id']] = status except queue.Empty: pass # Check process health for camera_id in list(self.processes.keys()): process = self.processes.get(camera_id) if process and not process.is_alive(): logger.warning(f"Process for {camera_id} died, restarting") config = self.configs.get(camera_id) if config: self.remove_camera(camera_id) time.sleep(1.0) self.add_camera(config) time.sleep(5.0) # Check every 5 seconds except Exception as e: logger.error(f"Error in monitor thread: {e}") time.sleep(5.0) def stop_all(self): """Stop all camera processes.""" logger.info("Stopping all camera processes") # Stop monitoring if self.monitor_thread: self.monitor_stop.set() self.monitor_thread.join(timeout=5.0) # Stop all cameras for camera_id in list(self.processes.keys()): self.remove_camera(camera_id) logger.info("All processes stopped")