refactor: enhance FFmpegRTSPReader with file watching and reactive frame reading
Some checks failed
Build Worker Base and Application Images / deploy-stack (push) Blocked by required conditions
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Has been cancelled

This commit is contained in:
Siwat Sirichai 2025-09-26 02:18:20 +07:00
parent 79a1189675
commit cb31633cc1
2 changed files with 101 additions and 81 deletions

View file

@ -11,6 +11,8 @@ import numpy as np
import os
import subprocess
from typing import Optional, Callable
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Suppress FFMPEG/H.264 error messages if needed
# Set this environment variable to reduce noise from decoder errors
@ -20,8 +22,25 @@ os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings
logger = logging.getLogger(__name__)
class FrameFileHandler(FileSystemEventHandler):
"""File system event handler for frame file changes."""
def __init__(self, callback):
self.callback = callback
self.last_modified = 0
def on_modified(self, event):
if event.is_directory:
return
# Debounce rapid file changes
current_time = time.time()
if current_time - self.last_modified > 0.01: # 10ms debounce
self.last_modified = current_time
self.callback()
class FFmpegRTSPReader:
"""RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration."""
"""RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration and file watching."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
@ -31,6 +50,8 @@ class FFmpegRTSPReader:
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
self.observer = None
self.frame_ready_event = threading.Event()
# Stream specs
self.width = 1280
@ -67,7 +88,6 @@ class FFmpegRTSPReader:
def _start_ffmpeg_process(self):
"""Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file."""
# Create temp file path for this camera
import tempfile
self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw"
os.makedirs("/tmp/claude", exist_ok=True)
@ -85,114 +105,113 @@ class FFmpegRTSPReader:
]
try:
# Start FFmpeg detached - we don't need to communicate with it
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
logger.info(f"Started FFmpeg process for camera {self.camera_id} writing to {self.temp_file}")
# Don't check process immediately - FFmpeg takes time to initialize
logger.info(f"Waiting for FFmpeg to initialize for camera {self.camera_id}...")
logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}")
return True
except Exception as e:
logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
return False
def _setup_file_watcher(self):
"""Setup file system watcher for temp file."""
if not os.path.exists(self.temp_file):
return
# Setup file watcher
handler = FrameFileHandler(self._on_file_changed)
self.observer = Observer()
self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False)
self.observer.start()
logger.info(f"Started file watcher for {self.temp_file}")
def _on_file_changed(self):
"""Called when temp file is modified."""
if os.path.basename(self.temp_file) in str(self.temp_file):
self.frame_ready_event.set()
def _read_frames(self):
"""Read frames from FFmpeg temp file."""
consecutive_errors = 0
"""Reactively read frames when file changes."""
frame_count = 0
last_log_time = time.time()
bytes_per_frame = self.width * self.height * 3 # BGR = 3 bytes per pixel
last_file_size = 0
bytes_per_frame = self.width * self.height * 3
restart_check_interval = 10 # Check FFmpeg status every 10 seconds
while not self.stop_event.is_set():
try:
# Start/restart FFmpeg process if needed
# Start FFmpeg if not running
if not self.process or self.process.poll() is not None:
if self.process and self.process.poll() is not None:
logger.warning(f"FFmpeg process died for camera {self.camera_id}, restarting...")
if not self._start_ffmpeg_process():
time.sleep(5.0)
continue
# Wait for temp file to exist and have content
if not os.path.exists(self.temp_file):
time.sleep(0.1)
continue
# Wait for temp file to be created
wait_count = 0
while not os.path.exists(self.temp_file) and wait_count < 30:
time.sleep(1.0)
wait_count += 1
# Check if file size changed (new frame available)
try:
current_file_size = os.path.getsize(self.temp_file)
if current_file_size <= last_file_size and current_file_size > 0:
# File size didn't increase, wait for next frame
time.sleep(0.05) # ~20 FPS max
continue
last_file_size = current_file_size
except OSError:
time.sleep(0.1)
continue
# Read the latest frame from the end of file
try:
with open(self.temp_file, 'rb') as f:
# Seek to last complete frame
file_size = f.seek(0, 2) # Seek to end
if file_size < bytes_per_frame:
time.sleep(0.1)
continue
# Read last complete frame
last_frame_offset = (file_size // bytes_per_frame - 1) * bytes_per_frame
f.seek(last_frame_offset)
frame_data = f.read(bytes_per_frame)
if len(frame_data) != bytes_per_frame:
consecutive_errors += 1
if consecutive_errors >= 30:
logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg")
if self.process:
self.process.terminate()
consecutive_errors = 0
time.sleep(0.1)
if not os.path.exists(self.temp_file):
logger.error(f"Temp file not created after 30s for {self.camera_id}")
continue
# Convert raw bytes to numpy array
frame = np.frombuffer(frame_data, dtype=np.uint8)
frame = frame.reshape((self.height, self.width, 3))
# Setup file watcher
self._setup_file_watcher()
# Frame is valid
consecutive_errors = 0
frame_count += 1
# Wait for file change event (or timeout for health check)
if self.frame_ready_event.wait(timeout=restart_check_interval):
self.frame_ready_event.clear()
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Read latest frame
try:
with open(self.temp_file, 'rb') as f:
# Get file size
f.seek(0, 2)
file_size = f.tell()
# Log progress
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via temp file")
last_log_time = current_time
if file_size < bytes_per_frame:
continue
except IOError as e:
logger.debug(f"Camera {self.camera_id}: File read error: {e}")
time.sleep(0.1)
continue
# Read last complete frame
last_frame_offset = (file_size // bytes_per_frame - 1) * bytes_per_frame
f.seek(last_frame_offset)
frame_data = f.read(bytes_per_frame)
if len(frame_data) == bytes_per_frame:
# Convert to numpy array
frame = np.frombuffer(frame_data, dtype=np.uint8)
frame = frame.reshape((self.height, self.width, 3))
# Call frame callback
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
frame_count += 1
# Log progress
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed reactively")
last_log_time = current_time
except (IOError, OSError) as e:
logger.debug(f"Camera {self.camera_id}: File read error: {e}")
except Exception as e:
logger.error(f"Camera {self.camera_id}: FFmpeg read error: {e}")
consecutive_errors += 1
if consecutive_errors >= 30:
if self.process:
self.process.terminate()
consecutive_errors = 0
logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}")
time.sleep(1.0)
# Cleanup
if self.observer:
self.observer.stop()
self.observer.join()
if self.process:
self.process.terminate()
# Clean up temp file
@ -201,7 +220,7 @@ class FFmpegRTSPReader:
os.remove(self.temp_file)
except:
pass
logger.info(f"FFmpeg reader thread ended for camera {self.camera_id}")
logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")
logger = logging.getLogger(__name__)