diff --git a/core/streaming/readers.py b/core/streaming/readers.py index d6a1272..b68a15b 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -170,27 +170,44 @@ class FFmpegRTSPReader: if self.frame_ready_event.wait(timeout=restart_check_interval): self.frame_ready_event.clear() - # Read current frame (file is always exactly one frame) + # Read current frame with concurrency safety try: - with open(self.temp_file, 'rb') as f: - frame_data = f.read(bytes_per_frame) + # Try to read frame multiple times to handle race conditions + frame_data = None + for attempt in range(3): + try: + with open(self.temp_file, 'rb') as f: + frame_data = f.read(bytes_per_frame) - if len(frame_data) == bytes_per_frame: - # Convert to numpy array - frame = np.frombuffer(frame_data, dtype=np.uint8) - frame = frame.reshape((self.height, self.width, 3)) + # Validate we got a complete frame + if len(frame_data) == bytes_per_frame: + break + else: + logger.debug(f"Camera {self.camera_id}: Partial read {len(frame_data)}/{bytes_per_frame}, attempt {attempt+1}") + time.sleep(0.01) # Brief wait before retry - # Call frame callback - if self.frame_callback: - self.frame_callback(self.camera_id, frame) + except (IOError, OSError) as e: + logger.debug(f"Camera {self.camera_id}: Read error on attempt {attempt+1}: {e}") + time.sleep(0.01) - frame_count += 1 + if frame_data and len(frame_data) == bytes_per_frame: + # Convert to numpy array + frame = np.frombuffer(frame_data, dtype=np.uint8) + frame = frame.reshape((self.height, self.width, 3)) - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed reactively") - last_log_time = current_time + # Call frame callback directly - trust the retry logic caught corruption + if self.frame_callback: + self.frame_callback(self.camera_id, frame) + + frame_count += 1 + + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed reactively") + last_log_time = current_time + else: + logger.debug(f"Camera {self.camera_id}: Failed to read complete frame after retries") except (IOError, OSError) as e: logger.debug(f"Camera {self.camera_id}: File read error: {e}") diff --git a/requirements.txt b/requirements.txt index 034d18e..2afeb0e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ fastapi[standard] redis urllib3<2.0.0 numpy -requests \ No newline at end of file +requests +watchdog \ No newline at end of file