diff --git a/core/streaming/__init__.py b/core/streaming/__init__.py index c4c40dc..d878aac 100644 --- a/core/streaming/__init__.py +++ b/core/streaming/__init__.py @@ -2,7 +2,7 @@ Streaming system for RTSP and HTTP camera feeds. Provides modular frame readers, buffers, and stream management. """ -from .readers import RTSPReader, HTTPSnapshotReader +from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager @@ -10,6 +10,7 @@ __all__ = [ # Readers 'RTSPReader', 'HTTPSnapshotReader', + 'FFmpegRTSPReader', # Buffers 'FrameBuffer', diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 1e3719f..156daf1 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -9,7 +9,7 @@ from typing import Dict, Set, Optional, List, Any from dataclasses import dataclass from collections import defaultdict -from .readers import RTSPReader, HTTPSnapshotReader +from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader from .buffers import shared_cache_buffer from ..tracking.integration import TrackingPipelineIntegration @@ -129,8 +129,8 @@ class StreamManager: """Start a stream for the given camera.""" try: if stream_config.rtsp_url: - # RTSP stream - reader = RTSPReader( + # RTSP stream using FFmpeg subprocess with CUDA acceleration + reader = FFmpegRTSPReader( camera_id=camera_id, rtsp_url=stream_config.rtsp_url, max_retries=stream_config.max_retries @@ -138,7 +138,7 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - logger.info(f"Started RTSP stream for camera {camera_id}") + logger.info(f"Started FFmpeg RTSP stream for camera {camera_id}") elif stream_config.snapshot_url: # HTTP snapshot stream diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 6f31cf1..243f088 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -9,6 +9,7 @@ import threading import requests import numpy as np import os +import subprocess from typing import Optional, Callable # Suppress FFMPEG/H.264 error messages if needed @@ -19,6 +20,143 @@ os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) +class FFmpegRTSPReader: + """RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration.""" + + def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): + self.camera_id = camera_id + self.rtsp_url = rtsp_url + self.max_retries = max_retries + self.process = None + self.stop_event = threading.Event() + self.thread = None + self.frame_callback: Optional[Callable] = None + + # Stream specs + self.width = 1280 + self.height = 720 + + def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): + """Set callback function to handle captured frames.""" + self.frame_callback = callback + + def start(self): + """Start the FFmpeg subprocess reader.""" + if self.thread and self.thread.is_alive(): + logger.warning(f"FFmpeg reader for {self.camera_id} already running") + return + + self.stop_event.clear() + self.thread = threading.Thread(target=self._read_frames, daemon=True) + self.thread.start() + logger.info(f"Started FFmpeg reader for camera {self.camera_id}") + + def stop(self): + """Stop the FFmpeg subprocess reader.""" + self.stop_event.set() + if self.process: + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + if self.thread: + self.thread.join(timeout=5.0) + logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") + + def _start_ffmpeg_process(self): + """Start FFmpeg subprocess with CUDA hardware acceleration.""" + cmd = [ + 'ffmpeg', + '-hwaccel', 'cuda', + '-hwaccel_device', '0', + '-rtsp_transport', 'tcp', + '-i', self.rtsp_url, + '-f', 'rawvideo', + '-pix_fmt', 'bgr24', + '-an', # No audio + '-' # Output to stdout + ] + + try: + self.process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0 + ) + logger.info(f"Started FFmpeg process for camera {self.camera_id}") + return True + except Exception as e: + logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") + return False + + def _read_frames(self): + """Read frames from FFmpeg stdout pipe.""" + consecutive_errors = 0 + frame_count = 0 + last_log_time = time.time() + bytes_per_frame = self.width * self.height * 3 # BGR = 3 bytes per pixel + + while not self.stop_event.is_set(): + try: + # Start/restart FFmpeg process if needed + if not self.process or self.process.poll() is not None: + if not self._start_ffmpeg_process(): + time.sleep(5.0) + continue + + # Read one frame worth of data + frame_data = self.process.stdout.read(bytes_per_frame) + + if len(frame_data) != bytes_per_frame: + consecutive_errors += 1 + if consecutive_errors >= 30: + logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg") + if self.process: + self.process.terminate() + consecutive_errors = 0 + continue + + # Convert raw bytes to numpy array + frame = np.frombuffer(frame_data, dtype=np.uint8) + frame = frame.reshape((self.height, self.width, 3)) + + # Frame is valid + consecutive_errors = 0 + frame_count += 1 + + # Call frame callback + if self.frame_callback: + try: + self.frame_callback(self.camera_id, frame) + except Exception as e: + logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via FFmpeg") + last_log_time = current_time + + except Exception as e: + logger.error(f"Camera {self.camera_id}: FFmpeg read error: {e}") + consecutive_errors += 1 + if consecutive_errors >= 30: + if self.process: + self.process.terminate() + consecutive_errors = 0 + time.sleep(1.0) + + # Cleanup + if self.process: + self.process.terminate() + logger.info(f"FFmpeg reader thread ended for camera {self.camera_id}") + + +logger = logging.getLogger(__name__) + + class RTSPReader: """RTSP stream frame reader optimized for 1280x720 @ 6fps streams.""" @@ -90,14 +228,12 @@ class RTSPReader: # Read frame immediately without rate limiting for minimum latency try: - # Force grab then retrieve for better error handling - ret = self.cap.grab() - if ret: - ret, frame = self.cap.retrieve() - else: - frame = None + ret, frame = self.cap.read() + if ret and frame is None: + # Grab succeeded but retrieve failed - decoder issue + logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed") except Exception as read_error: - logger.error(f"Camera {self.camera_id}: cap.grab/retrieve threw exception: {type(read_error).__name__}: {read_error}") + logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}") ret, frame = False, None if not ret or frame is None: