refactor: add FFmpegRTSPReader for enhanced RTSP stream handling with CUDA acceleration
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m57s
Build Worker Base and Application Images / deploy-stack (push) Successful in 20s

This commit is contained in:
Siwat Sirichai 2025-09-26 02:07:17 +07:00
parent 08cb4eafc4
commit c38b58e34c
3 changed files with 149 additions and 12 deletions

View file

@ -2,7 +2,7 @@
Streaming system for RTSP and HTTP camera feeds.
Provides modular frame readers, buffers, and stream management.
"""
from .readers import RTSPReader, HTTPSnapshotReader
from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader
from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer
from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager
@ -10,6 +10,7 @@ __all__ = [
# Readers
'RTSPReader',
'HTTPSnapshotReader',
'FFmpegRTSPReader',
# Buffers
'FrameBuffer',

View file

@ -9,7 +9,7 @@ from typing import Dict, Set, Optional, List, Any
from dataclasses import dataclass
from collections import defaultdict
from .readers import RTSPReader, HTTPSnapshotReader
from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader
from .buffers import shared_cache_buffer
from ..tracking.integration import TrackingPipelineIntegration
@ -129,8 +129,8 @@ class StreamManager:
"""Start a stream for the given camera."""
try:
if stream_config.rtsp_url:
# RTSP stream
reader = RTSPReader(
# RTSP stream using FFmpeg subprocess with CUDA acceleration
reader = FFmpegRTSPReader(
camera_id=camera_id,
rtsp_url=stream_config.rtsp_url,
max_retries=stream_config.max_retries
@ -138,7 +138,7 @@ class StreamManager:
reader.set_frame_callback(self._frame_callback)
reader.start()
self._streams[camera_id] = reader
logger.info(f"Started RTSP stream for camera {camera_id}")
logger.info(f"Started FFmpeg RTSP stream for camera {camera_id}")
elif stream_config.snapshot_url:
# HTTP snapshot stream

View file

@ -9,6 +9,7 @@ import threading
import requests
import numpy as np
import os
import subprocess
from typing import Optional, Callable
# Suppress FFMPEG/H.264 error messages if needed
@ -19,6 +20,143 @@ os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings
logger = logging.getLogger(__name__)
class FFmpegRTSPReader:
"""RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.max_retries = max_retries
self.process = None
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Stream specs
self.width = 1280
self.height = 720
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the FFmpeg subprocess reader."""
if self.thread and self.thread.is_alive():
logger.warning(f"FFmpeg reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
logger.info(f"Started FFmpeg reader for camera {self.camera_id}")
def stop(self):
"""Stop the FFmpeg subprocess reader."""
self.stop_event.set()
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
if self.thread:
self.thread.join(timeout=5.0)
logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}")
def _start_ffmpeg_process(self):
"""Start FFmpeg subprocess with CUDA hardware acceleration."""
cmd = [
'ffmpeg',
'-hwaccel', 'cuda',
'-hwaccel_device', '0',
'-rtsp_transport', 'tcp',
'-i', self.rtsp_url,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24',
'-an', # No audio
'-' # Output to stdout
]
try:
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0
)
logger.info(f"Started FFmpeg process for camera {self.camera_id}")
return True
except Exception as e:
logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
return False
def _read_frames(self):
"""Read frames from FFmpeg stdout pipe."""
consecutive_errors = 0
frame_count = 0
last_log_time = time.time()
bytes_per_frame = self.width * self.height * 3 # BGR = 3 bytes per pixel
while not self.stop_event.is_set():
try:
# Start/restart FFmpeg process if needed
if not self.process or self.process.poll() is not None:
if not self._start_ffmpeg_process():
time.sleep(5.0)
continue
# Read one frame worth of data
frame_data = self.process.stdout.read(bytes_per_frame)
if len(frame_data) != bytes_per_frame:
consecutive_errors += 1
if consecutive_errors >= 30:
logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg")
if self.process:
self.process.terminate()
consecutive_errors = 0
continue
# Convert raw bytes to numpy array
frame = np.frombuffer(frame_data, dtype=np.uint8)
frame = frame.reshape((self.height, self.width, 3))
# Frame is valid
consecutive_errors = 0
frame_count += 1
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via FFmpeg")
last_log_time = current_time
except Exception as e:
logger.error(f"Camera {self.camera_id}: FFmpeg read error: {e}")
consecutive_errors += 1
if consecutive_errors >= 30:
if self.process:
self.process.terminate()
consecutive_errors = 0
time.sleep(1.0)
# Cleanup
if self.process:
self.process.terminate()
logger.info(f"FFmpeg reader thread ended for camera {self.camera_id}")
logger = logging.getLogger(__name__)
class RTSPReader:
"""RTSP stream frame reader optimized for 1280x720 @ 6fps streams."""
@ -90,14 +228,12 @@ class RTSPReader:
# Read frame immediately without rate limiting for minimum latency
try:
# Force grab then retrieve for better error handling
ret = self.cap.grab()
if ret:
ret, frame = self.cap.retrieve()
else:
frame = None
ret, frame = self.cap.read()
if ret and frame is None:
# Grab succeeded but retrieve failed - decoder issue
logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed")
except Exception as read_error:
logger.error(f"Camera {self.camera_id}: cap.grab/retrieve threw exception: {type(read_error).__name__}: {read_error}")
logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}")
ret, frame = False, None
if not ret or frame is None: