diff --git a/core/streaming/readers.py b/core/streaming/readers.py index e6eed55..35a7213 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -10,6 +10,7 @@ import requests import numpy as np import os import subprocess +import fcntl from typing import Optional, Callable from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler @@ -94,7 +95,7 @@ class FFmpegRTSPReader: self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw" os.makedirs("/tmp/claude", exist_ok=True) - # Use PPM format - uncompressed with header, supports -update 1 + # Use PPM format with single file (will use file locking for concurrency) self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.ppm" cmd = [ @@ -176,31 +177,51 @@ class FFmpegRTSPReader: if self.frame_ready_event.wait(timeout=restart_check_interval): self.frame_ready_event.clear() - # Read PPM frame (uncompressed with header) + # Read PPM frame with persistent lock attempts until new inotify try: if os.path.exists(self.temp_file): - # Read PPM with OpenCV (handles RGB->BGR conversion automatically) - frame = cv2.imread(self.temp_file) + # Keep trying to acquire lock until new inotify event or success + max_attempts = 50 # ~500ms worth of attempts + for attempt in range(max_attempts): + # Check if new inotify event arrived (cancel current attempt) + if self.frame_ready_event.is_set(): + break - if frame is not None and frame.shape == (self.height, self.width, 3): - # Call frame callback directly - if self.frame_callback: - self.frame_callback(self.camera_id, frame) + try: + with open(self.temp_file, 'rb') as f: + # Try to acquire shared lock (non-blocking) + fcntl.flock(f.fileno(), fcntl.LOCK_SH | fcntl.LOCK_NB) - frame_count += 1 + # Success! File is locked, safe to read + frame = cv2.imread(self.temp_file) - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed reactively") - last_log_time = current_time - else: - logger.debug(f"Camera {self.camera_id}: Invalid PPM frame") - else: - logger.debug(f"Camera {self.camera_id}: PPM file not found yet") + if frame is not None and frame.shape == (self.height, self.width, 3): + # Call frame callback directly + if self.frame_callback: + self.frame_callback(self.camera_id, frame) - except (IOError, OSError) as e: - logger.debug(f"Camera {self.camera_id}: File read error: {e}") + frame_count += 1 + + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed with persistent locking") + last_log_time = current_time + # Invalid frame - just skip, no logging needed + + # Successfully processed frame + break + + except (OSError, IOError): + # File is still locked, wait a bit and try again + time.sleep(0.01) # 10ms wait between attempts + continue + + # If we get here, exhausted attempts or file not ready - just continue + + except (IOError, OSError): + # File errors are routine, just continue + pass except Exception as e: logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}")