From cb31633cc107a5156b4c81d975823989f42e416c Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 26 Sep 2025 02:18:20 +0700 Subject: [PATCH] refactor: enhance FFmpegRTSPReader with file watching and reactive frame reading --- .claude/settings.local.json | 3 +- core/streaming/readers.py | 179 ++++++++++++++++++++---------------- 2 files changed, 101 insertions(+), 81 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index b06024d..97cf5c1 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,7 +1,8 @@ { "permissions": { "allow": [ - "Bash(dir:*)" + "Bash(dir:*)", + "WebSearch" ], "deny": [], "ask": [] diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 7478e38..e221c4a 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -11,6 +11,8 @@ import numpy as np import os import subprocess from typing import Optional, Callable +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler # Suppress FFMPEG/H.264 error messages if needed # Set this environment variable to reduce noise from decoder errors @@ -20,8 +22,25 @@ os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) +class FrameFileHandler(FileSystemEventHandler): + """File system event handler for frame file changes.""" + + def __init__(self, callback): + self.callback = callback + self.last_modified = 0 + + def on_modified(self, event): + if event.is_directory: + return + # Debounce rapid file changes + current_time = time.time() + if current_time - self.last_modified > 0.01: # 10ms debounce + self.last_modified = current_time + self.callback() + + class FFmpegRTSPReader: - """RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration.""" + """RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration and file watching.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id @@ -31,6 +50,8 @@ class FFmpegRTSPReader: self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None + self.observer = None + self.frame_ready_event = threading.Event() # Stream specs self.width = 1280 @@ -67,7 +88,6 @@ class FFmpegRTSPReader: def _start_ffmpeg_process(self): """Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file.""" # Create temp file path for this camera - import tempfile self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw" os.makedirs("/tmp/claude", exist_ok=True) @@ -85,114 +105,113 @@ class FFmpegRTSPReader: ] try: + # Start FFmpeg detached - we don't need to communicate with it self.process = subprocess.Popen( cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=0 + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL ) - logger.info(f"Started FFmpeg process for camera {self.camera_id} writing to {self.temp_file}") - - # Don't check process immediately - FFmpeg takes time to initialize - logger.info(f"Waiting for FFmpeg to initialize for camera {self.camera_id}...") + logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}") return True except Exception as e: logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") return False + def _setup_file_watcher(self): + """Setup file system watcher for temp file.""" + if not os.path.exists(self.temp_file): + return + + # Setup file watcher + handler = FrameFileHandler(self._on_file_changed) + self.observer = Observer() + self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False) + self.observer.start() + logger.info(f"Started file watcher for {self.temp_file}") + + def _on_file_changed(self): + """Called when temp file is modified.""" + if os.path.basename(self.temp_file) in str(self.temp_file): + self.frame_ready_event.set() + def _read_frames(self): - """Read frames from FFmpeg temp file.""" - consecutive_errors = 0 + """Reactively read frames when file changes.""" frame_count = 0 last_log_time = time.time() - bytes_per_frame = self.width * self.height * 3 # BGR = 3 bytes per pixel - last_file_size = 0 + bytes_per_frame = self.width * self.height * 3 + restart_check_interval = 10 # Check FFmpeg status every 10 seconds while not self.stop_event.is_set(): try: - # Start/restart FFmpeg process if needed + # Start FFmpeg if not running if not self.process or self.process.poll() is not None: + if self.process and self.process.poll() is not None: + logger.warning(f"FFmpeg process died for camera {self.camera_id}, restarting...") + if not self._start_ffmpeg_process(): time.sleep(5.0) continue - # Wait for temp file to exist and have content - if not os.path.exists(self.temp_file): - time.sleep(0.1) - continue + # Wait for temp file to be created + wait_count = 0 + while not os.path.exists(self.temp_file) and wait_count < 30: + time.sleep(1.0) + wait_count += 1 - # Check if file size changed (new frame available) - try: - current_file_size = os.path.getsize(self.temp_file) - if current_file_size <= last_file_size and current_file_size > 0: - # File size didn't increase, wait for next frame - time.sleep(0.05) # ~20 FPS max - continue - last_file_size = current_file_size - except OSError: - time.sleep(0.1) - continue - - # Read the latest frame from the end of file - try: - with open(self.temp_file, 'rb') as f: - # Seek to last complete frame - file_size = f.seek(0, 2) # Seek to end - if file_size < bytes_per_frame: - time.sleep(0.1) - continue - - # Read last complete frame - last_frame_offset = (file_size // bytes_per_frame - 1) * bytes_per_frame - f.seek(last_frame_offset) - frame_data = f.read(bytes_per_frame) - - if len(frame_data) != bytes_per_frame: - consecutive_errors += 1 - if consecutive_errors >= 30: - logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg") - if self.process: - self.process.terminate() - consecutive_errors = 0 - time.sleep(0.1) + if not os.path.exists(self.temp_file): + logger.error(f"Temp file not created after 30s for {self.camera_id}") continue - # Convert raw bytes to numpy array - frame = np.frombuffer(frame_data, dtype=np.uint8) - frame = frame.reshape((self.height, self.width, 3)) + # Setup file watcher + self._setup_file_watcher() - # Frame is valid - consecutive_errors = 0 - frame_count += 1 + # Wait for file change event (or timeout for health check) + if self.frame_ready_event.wait(timeout=restart_check_interval): + self.frame_ready_event.clear() - # Call frame callback - if self.frame_callback: - try: - self.frame_callback(self.camera_id, frame) - except Exception as e: - logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + # Read latest frame + try: + with open(self.temp_file, 'rb') as f: + # Get file size + f.seek(0, 2) + file_size = f.tell() - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via temp file") - last_log_time = current_time + if file_size < bytes_per_frame: + continue - except IOError as e: - logger.debug(f"Camera {self.camera_id}: File read error: {e}") - time.sleep(0.1) - continue + # Read last complete frame + last_frame_offset = (file_size // bytes_per_frame - 1) * bytes_per_frame + f.seek(last_frame_offset) + frame_data = f.read(bytes_per_frame) + + if len(frame_data) == bytes_per_frame: + # Convert to numpy array + frame = np.frombuffer(frame_data, dtype=np.uint8) + frame = frame.reshape((self.height, self.width, 3)) + + # Call frame callback + if self.frame_callback: + self.frame_callback(self.camera_id, frame) + + frame_count += 1 + + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed reactively") + last_log_time = current_time + + except (IOError, OSError) as e: + logger.debug(f"Camera {self.camera_id}: File read error: {e}") except Exception as e: - logger.error(f"Camera {self.camera_id}: FFmpeg read error: {e}") - consecutive_errors += 1 - if consecutive_errors >= 30: - if self.process: - self.process.terminate() - consecutive_errors = 0 + logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}") time.sleep(1.0) # Cleanup + if self.observer: + self.observer.stop() + self.observer.join() if self.process: self.process.terminate() # Clean up temp file @@ -201,7 +220,7 @@ class FFmpegRTSPReader: os.remove(self.temp_file) except: pass - logger.info(f"FFmpeg reader thread ended for camera {self.camera_id}") + logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}") logger = logging.getLogger(__name__)