From 79a1189675e430e093d971565776b5ad01809eb0 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 26 Sep 2025 02:15:06 +0700 Subject: [PATCH] refactor: update FFmpegRTSPReader to use a temporary file for frame reading and improve error handling --- core/streaming/readers.py | 112 +++++++++++++++++++++++++++----------- 1 file changed, 81 insertions(+), 31 deletions(-) diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 243f088..7478e38 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -65,7 +65,12 @@ class FFmpegRTSPReader: logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") def _start_ffmpeg_process(self): - """Start FFmpeg subprocess with CUDA hardware acceleration.""" + """Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file.""" + # Create temp file path for this camera + import tempfile + self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw" + os.makedirs("/tmp/claude", exist_ok=True) + cmd = [ 'ffmpeg', '-hwaccel', 'cuda', @@ -75,7 +80,8 @@ class FFmpegRTSPReader: '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-an', # No audio - '-' # Output to stdout + '-y', # Overwrite output file + self.temp_file ] try: @@ -85,18 +91,22 @@ class FFmpegRTSPReader: stderr=subprocess.PIPE, bufsize=0 ) - logger.info(f"Started FFmpeg process for camera {self.camera_id}") + logger.info(f"Started FFmpeg process for camera {self.camera_id} writing to {self.temp_file}") + + # Don't check process immediately - FFmpeg takes time to initialize + logger.info(f"Waiting for FFmpeg to initialize for camera {self.camera_id}...") return True except Exception as e: logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") return False def _read_frames(self): - """Read frames from FFmpeg stdout pipe.""" + """Read frames from FFmpeg temp file.""" consecutive_errors = 0 frame_count = 0 last_log_time = time.time() bytes_per_frame = self.width * self.height * 3 # BGR = 3 bytes per pixel + last_file_size = 0 while not self.stop_event.is_set(): try: @@ -106,38 +116,72 @@ class FFmpegRTSPReader: time.sleep(5.0) continue - # Read one frame worth of data - frame_data = self.process.stdout.read(bytes_per_frame) - - if len(frame_data) != bytes_per_frame: - consecutive_errors += 1 - if consecutive_errors >= 30: - logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg") - if self.process: - self.process.terminate() - consecutive_errors = 0 + # Wait for temp file to exist and have content + if not os.path.exists(self.temp_file): + time.sleep(0.1) continue - # Convert raw bytes to numpy array - frame = np.frombuffer(frame_data, dtype=np.uint8) - frame = frame.reshape((self.height, self.width, 3)) + # Check if file size changed (new frame available) + try: + current_file_size = os.path.getsize(self.temp_file) + if current_file_size <= last_file_size and current_file_size > 0: + # File size didn't increase, wait for next frame + time.sleep(0.05) # ~20 FPS max + continue + last_file_size = current_file_size + except OSError: + time.sleep(0.1) + continue - # Frame is valid - consecutive_errors = 0 - frame_count += 1 + # Read the latest frame from the end of file + try: + with open(self.temp_file, 'rb') as f: + # Seek to last complete frame + file_size = f.seek(0, 2) # Seek to end + if file_size < bytes_per_frame: + time.sleep(0.1) + continue - # Call frame callback - if self.frame_callback: - try: - self.frame_callback(self.camera_id, frame) - except Exception as e: - logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + # Read last complete frame + last_frame_offset = (file_size // bytes_per_frame - 1) * bytes_per_frame + f.seek(last_frame_offset) + frame_data = f.read(bytes_per_frame) - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via FFmpeg") - last_log_time = current_time + if len(frame_data) != bytes_per_frame: + consecutive_errors += 1 + if consecutive_errors >= 30: + logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg") + if self.process: + self.process.terminate() + consecutive_errors = 0 + time.sleep(0.1) + continue + + # Convert raw bytes to numpy array + frame = np.frombuffer(frame_data, dtype=np.uint8) + frame = frame.reshape((self.height, self.width, 3)) + + # Frame is valid + consecutive_errors = 0 + frame_count += 1 + + # Call frame callback + if self.frame_callback: + try: + self.frame_callback(self.camera_id, frame) + except Exception as e: + logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via temp file") + last_log_time = current_time + + except IOError as e: + logger.debug(f"Camera {self.camera_id}: File read error: {e}") + time.sleep(0.1) + continue except Exception as e: logger.error(f"Camera {self.camera_id}: FFmpeg read error: {e}") @@ -151,6 +195,12 @@ class FFmpegRTSPReader: # Cleanup if self.process: self.process.terminate() + # Clean up temp file + try: + if hasattr(self, 'temp_file') and os.path.exists(self.temp_file): + os.remove(self.temp_file) + except: + pass logger.info(f"FFmpeg reader thread ended for camera {self.camera_id}")