From 83aaf95f594c83180353f37f305490a08c890524 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Fri, 26 Sep 2025 11:24:48 +0700 Subject: [PATCH] fix: can read, track, and detect frame --- core/streaming/readers.py | 144 +++++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 65 deletions(-) diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 44fee34..d8d4b4d 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -10,7 +10,7 @@ import requests import numpy as np import os import subprocess -import fcntl +# import fcntl # No longer needed with atomic file operations from typing import Optional, Callable from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler @@ -24,6 +24,8 @@ logger = logging.getLogger(__name__) # Suppress noisy watchdog debug logs logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL) +logging.getLogger('watchdog.observers.fsevents').setLevel(logging.CRITICAL) +logging.getLogger('fsevents').setLevel(logging.CRITICAL) class FrameFileHandler(FileSystemEventHandler): @@ -90,63 +92,68 @@ class FFmpegRTSPReader: logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") def _start_ffmpeg_process(self): - """Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file.""" - # Create temp file path for this camera - self.temp_file = f"/tmp/frame/camera_{self.camera_id.replace(' ', '_')}.raw" - os.makedirs("/tmp/frame", exist_ok=True) + """Start FFmpeg subprocess writing timestamped frames for atomic reads.""" + # Create temp file paths for this camera + self.frame_dir = "/tmp/frame" + os.makedirs(self.frame_dir, exist_ok=True) - # Use PPM format with single file (will use file locking for concurrency) - self.temp_file = f"/tmp/frame/camera_{self.camera_id.replace(' ', '_')}.ppm" + # Use strftime pattern - FFmpeg writes each frame with unique timestamp + # This ensures each file is complete when written + camera_id_safe = self.camera_id.replace(' ', '_') + self.frame_prefix = f"camera_{camera_id_safe}" + # Using strftime pattern with microseconds for unique filenames + self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S_%f.ppm" cmd = [ 'ffmpeg', + # DO NOT REMOVE '-hwaccel', 'cuda', '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, '-f', 'image2', - '-update', '1', # Works with image2 format + '-strftime', '1', # Enable strftime pattern expansion '-pix_fmt', 'rgb24', # PPM uses RGB not BGR '-an', # No audio '-y', # Overwrite output file - self.temp_file + self.frame_pattern # Write timestamped frames ] try: + # Log the FFmpeg command for debugging + logger.info(f"Starting FFmpeg for camera {self.camera_id} with command: {' '.join(cmd)}") + # Start FFmpeg detached - we don't need to communicate with it self.process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) - logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}") + logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.frame_pattern}") return True except Exception as e: logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") return False def _setup_file_watcher(self): - """Setup file system watcher for temp file.""" - if not os.path.exists(self.temp_file): - return - - # Setup file watcher - handler = FrameFileHandler(self._on_file_changed) + """Setup file system watcher for frame directory.""" + # Setup file watcher for the frame directory + handler = FrameFileHandler(lambda: self._on_file_changed()) self.observer = Observer() - self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False) + self.observer.schedule(handler, self.frame_dir, recursive=False) self.observer.start() - logger.info(f"Started file watcher for {self.temp_file}") + logger.info(f"Started file watcher for {self.frame_dir} with pattern {self.frame_prefix}*.ppm") def _on_file_changed(self): - """Called when temp file is modified.""" - if os.path.basename(self.temp_file) in str(self.temp_file): - self.frame_ready_event.set() + """Called when a new frame file is created.""" + # Signal that a new frame might be available + self.frame_ready_event.set() def _read_frames(self): """Reactively read frames when file changes.""" frame_count = 0 last_log_time = time.time() - bytes_per_frame = self.width * self.height * 3 + # Remove unused variable: bytes_per_frame = self.width * self.height * 3 restart_check_interval = 10 # Check FFmpeg status every 10 seconds while not self.stop_event.is_set(): @@ -160,14 +167,21 @@ class FFmpegRTSPReader: time.sleep(5.0) continue - # Wait for temp file to be created + # Wait for FFmpeg to start writing frame files wait_count = 0 - while not os.path.exists(self.temp_file) and wait_count < 30: + while wait_count < 30: + # Check if any frame files exist + import glob + frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + if frame_files: + logger.info(f"Found {len(frame_files)} initial frame files for {self.camera_id}") + break time.sleep(1.0) wait_count += 1 - if not os.path.exists(self.temp_file): - logger.error(f"Temp file not created after 30s for {self.camera_id}") + if wait_count >= 30: + logger.error(f"No frame files created after 30s for {self.camera_id}") + logger.error(f"Expected pattern: {self.frame_dir}/{self.frame_prefix}*.ppm") continue # Setup file watcher @@ -177,50 +191,44 @@ class FFmpegRTSPReader: if self.frame_ready_event.wait(timeout=restart_check_interval): self.frame_ready_event.clear() - # Read PPM frame with persistent lock attempts until new inotify + # Read latest complete frame file try: - if os.path.exists(self.temp_file): - # Keep trying to acquire lock until new inotify event or success - max_attempts = 50 # ~500ms worth of attempts - for attempt in range(max_attempts): - # Check if new inotify event arrived (cancel current attempt) - if self.frame_ready_event.is_set(): - break + import glob + # Find all frame files for this camera + frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") - try: - with open(self.temp_file, 'rb') as f: - # Try to acquire shared lock (non-blocking) - fcntl.flock(f.fileno(), fcntl.LOCK_SH | fcntl.LOCK_NB) + if frame_files: + # Sort by filename (which includes timestamp) and get the latest + frame_files.sort() + latest_frame = frame_files[-1] - # Success! File is locked, safe to read - frame = cv2.imread(self.temp_file) + # Read the latest frame (it's complete since FFmpeg wrote it atomically) + frame = cv2.imread(latest_frame) - if frame is not None and frame.shape == (self.height, self.width, 3): - # Call frame callback directly - if self.frame_callback: - self.frame_callback(self.camera_id, frame) + if frame is not None and frame.shape == (self.height, self.width, 3): + # Call frame callback directly + if self.frame_callback: + self.frame_callback(self.camera_id, frame) - frame_count += 1 + frame_count += 1 - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} PPM frames processed with persistent locking") - last_log_time = current_time - # Invalid frame - just skip, no logging needed + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") + last_log_time = current_time - # Successfully processed frame - break + # Clean up old frame files to prevent disk filling + # Keep only the latest 5 frames + if len(frame_files) > 5: + for old_file in frame_files[:-5]: + try: + os.remove(old_file) + except: + pass - except (OSError, IOError): - # File is still locked, wait a bit and try again - time.sleep(0.01) # 10ms wait between attempts - continue - - # If we get here, exhausted attempts or file not ready - just continue - - except (IOError, OSError): - # File errors are routine, just continue + except Exception as e: + logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}") pass except Exception as e: @@ -233,10 +241,16 @@ class FFmpegRTSPReader: self.observer.join() if self.process: self.process.terminate() - # Clean up temp file + # Clean up all frame files for this camera try: - if hasattr(self, 'temp_file') and os.path.exists(self.temp_file): - os.remove(self.temp_file) + if hasattr(self, 'frame_prefix') and hasattr(self, 'frame_dir'): + import glob + frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + for frame_file in frame_files: + try: + os.remove(frame_file) + except: + pass except: pass logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")