diff --git a/core/streaming/readers.py b/core/streaming/readers.py index e6eed55..d8d4b4d 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -10,6 +10,7 @@ import requests import numpy as np import os import subprocess +# import fcntl # No longer needed with atomic file operations from typing import Optional, Callable from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler @@ -23,6 +24,8 @@ logger = logging.getLogger(__name__) # Suppress noisy watchdog debug logs logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL) +logging.getLogger('watchdog.observers.fsevents').setLevel(logging.CRITICAL) +logging.getLogger('fsevents').setLevel(logging.CRITICAL) class FrameFileHandler(FileSystemEventHandler): @@ -89,63 +92,68 @@ class FFmpegRTSPReader: logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") def _start_ffmpeg_process(self): - """Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file.""" - # Create temp file path for this camera - self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw" - os.makedirs("/tmp/claude", exist_ok=True) + """Start FFmpeg subprocess writing timestamped frames for atomic reads.""" + # Create temp file paths for this camera + self.frame_dir = "/tmp/frame" + os.makedirs(self.frame_dir, exist_ok=True) - # Use PPM format - uncompressed with header, supports -update 1 - self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.ppm" + # Use strftime pattern - FFmpeg writes each frame with unique timestamp + # This ensures each file is complete when written + camera_id_safe = self.camera_id.replace(' ', '_') + self.frame_prefix = f"camera_{camera_id_safe}" + # Using strftime pattern with microseconds for unique filenames + self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S_%f.ppm" cmd = [ 'ffmpeg', + # DO NOT REMOVE '-hwaccel', 'cuda', '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, '-f', 'image2', - '-update', '1', # Works with image2 format + '-strftime', '1', # Enable strftime pattern expansion '-pix_fmt', 'rgb24', # PPM uses RGB not BGR '-an', # No audio '-y', # Overwrite output file - self.temp_file + self.frame_pattern # Write timestamped frames ] try: + # Log the FFmpeg command for debugging + logger.info(f"Starting FFmpeg for camera {self.camera_id} with command: {' '.join(cmd)}") + # Start FFmpeg detached - we don't need to communicate with it self.process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) - logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}") + logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.frame_pattern}") return True except Exception as e: logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") return False def _setup_file_watcher(self): - """Setup file system watcher for temp file.""" - if not os.path.exists(self.temp_file): - return - - # Setup file watcher - handler = FrameFileHandler(self._on_file_changed) + """Setup file system watcher for frame directory.""" + # Setup file watcher for the frame directory + handler = FrameFileHandler(lambda: self._on_file_changed()) self.observer = Observer() - self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False) + self.observer.schedule(handler, self.frame_dir, recursive=False) self.observer.start() - logger.info(f"Started file watcher for {self.temp_file}") + logger.info(f"Started file watcher for {self.frame_dir} with pattern {self.frame_prefix}*.ppm") def _on_file_changed(self): - """Called when temp file is modified.""" - if os.path.basename(self.temp_file) in str(self.temp_file): - self.frame_ready_event.set() + """Called when a new frame file is created.""" + # Signal that a new frame might be available + self.frame_ready_event.set() def _read_frames(self): """Reactively read frames when file changes.""" frame_count = 0 last_log_time = time.time() - bytes_per_frame = self.width * self.height * 3 + # Remove unused variable: bytes_per_frame = self.width * self.height * 3 restart_check_interval = 10 # Check FFmpeg status every 10 seconds while not self.stop_event.is_set(): @@ -159,14 +167,21 @@ class FFmpegRTSPReader: time.sleep(5.0) continue - # Wait for temp file to be created + # Wait for FFmpeg to start writing frame files wait_count = 0 - while not os.path.exists(self.temp_file) and wait_count < 30: + while wait_count < 30: + # Check if any frame files exist + import glob + frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + if frame_files: + logger.info(f"Found {len(frame_files)} initial frame files for {self.camera_id}") + break time.sleep(1.0) wait_count += 1 - if not os.path.exists(self.temp_file): - logger.error(f"Temp file not created after 30s for {self.camera_id}") + if wait_count >= 30: + logger.error(f"No frame files created after 30s for {self.camera_id}") + logger.error(f"Expected pattern: {self.frame_dir}/{self.frame_prefix}*.ppm") continue # Setup file watcher @@ -176,11 +191,19 @@ class FFmpegRTSPReader: if self.frame_ready_event.wait(timeout=restart_check_interval): self.frame_ready_event.clear() - # Read PPM frame (uncompressed with header) + # Read latest complete frame file try: - if os.path.exists(self.temp_file): - # Read PPM with OpenCV (handles RGB->BGR conversion automatically) - frame = cv2.imread(self.temp_file) + import glob + # Find all frame files for this camera + frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + + if frame_files: + # Sort by filename (which includes timestamp) and get the latest + frame_files.sort() + latest_frame = frame_files[-1] + + # Read the latest frame (it's complete since FFmpeg wrote it atomically) + frame = cv2.imread(latest_frame) if frame is not None and frame.shape == (self.height, self.width, 3): # Call frame callback directly @@ -192,15 +215,21 @@ class FFmpegRTSPReader: # Log progress current_time = time.time() if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed reactively") + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") last_log_time = current_time - else: - logger.debug(f"Camera {self.camera_id}: Invalid PPM frame") - else: - logger.debug(f"Camera {self.camera_id}: PPM file not found yet") - except (IOError, OSError) as e: - logger.debug(f"Camera {self.camera_id}: File read error: {e}") + # Clean up old frame files to prevent disk filling + # Keep only the latest 5 frames + if len(frame_files) > 5: + for old_file in frame_files[:-5]: + try: + os.remove(old_file) + except: + pass + + except Exception as e: + logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}") + pass except Exception as e: logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}") @@ -212,10 +241,16 @@ class FFmpegRTSPReader: self.observer.join() if self.process: self.process.terminate() - # Clean up temp file + # Clean up all frame files for this camera try: - if hasattr(self, 'temp_file') and os.path.exists(self.temp_file): - os.remove(self.temp_file) + if hasattr(self, 'frame_prefix') and hasattr(self, 'frame_dir'): + import glob + frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + for frame_file in frame_files: + try: + os.remove(frame_file) + except: + pass except: pass logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")