refactor: update FFmpegRTSPReader to use a temporary file for frame reading and improve error handling
This commit is contained in:
		
							parent
							
								
									c38b58e34c
								
							
						
					
					
						commit
						79a1189675
					
				
					 1 changed files with 81 additions and 31 deletions
				
			
		| 
						 | 
				
			
			@ -65,7 +65,12 @@ class FFmpegRTSPReader:
 | 
			
		|||
        logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}")
 | 
			
		||||
 | 
			
		||||
    def _start_ffmpeg_process(self):
 | 
			
		||||
        """Start FFmpeg subprocess with CUDA hardware acceleration."""
 | 
			
		||||
        """Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file."""
 | 
			
		||||
        # Create temp file path for this camera
 | 
			
		||||
        import tempfile
 | 
			
		||||
        self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw"
 | 
			
		||||
        os.makedirs("/tmp/claude", exist_ok=True)
 | 
			
		||||
 | 
			
		||||
        cmd = [
 | 
			
		||||
            'ffmpeg',
 | 
			
		||||
            '-hwaccel', 'cuda',
 | 
			
		||||
| 
						 | 
				
			
			@ -75,7 +80,8 @@ class FFmpegRTSPReader:
 | 
			
		|||
            '-f', 'rawvideo',
 | 
			
		||||
            '-pix_fmt', 'bgr24',
 | 
			
		||||
            '-an',  # No audio
 | 
			
		||||
            '-'  # Output to stdout
 | 
			
		||||
            '-y',  # Overwrite output file
 | 
			
		||||
            self.temp_file
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
| 
						 | 
				
			
			@ -85,18 +91,22 @@ class FFmpegRTSPReader:
 | 
			
		|||
                stderr=subprocess.PIPE,
 | 
			
		||||
                bufsize=0
 | 
			
		||||
            )
 | 
			
		||||
            logger.info(f"Started FFmpeg process for camera {self.camera_id}")
 | 
			
		||||
            logger.info(f"Started FFmpeg process for camera {self.camera_id} writing to {self.temp_file}")
 | 
			
		||||
 | 
			
		||||
            # Don't check process immediately - FFmpeg takes time to initialize
 | 
			
		||||
            logger.info(f"Waiting for FFmpeg to initialize for camera {self.camera_id}...")
 | 
			
		||||
            return True
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def _read_frames(self):
 | 
			
		||||
        """Read frames from FFmpeg stdout pipe."""
 | 
			
		||||
        """Read frames from FFmpeg temp file."""
 | 
			
		||||
        consecutive_errors = 0
 | 
			
		||||
        frame_count = 0
 | 
			
		||||
        last_log_time = time.time()
 | 
			
		||||
        bytes_per_frame = self.width * self.height * 3  # BGR = 3 bytes per pixel
 | 
			
		||||
        last_file_size = 0
 | 
			
		||||
 | 
			
		||||
        while not self.stop_event.is_set():
 | 
			
		||||
            try:
 | 
			
		||||
| 
						 | 
				
			
			@ -106,38 +116,72 @@ class FFmpegRTSPReader:
 | 
			
		|||
                        time.sleep(5.0)
 | 
			
		||||
                        continue
 | 
			
		||||
 | 
			
		||||
                # Read one frame worth of data
 | 
			
		||||
                frame_data = self.process.stdout.read(bytes_per_frame)
 | 
			
		||||
 | 
			
		||||
                if len(frame_data) != bytes_per_frame:
 | 
			
		||||
                    consecutive_errors += 1
 | 
			
		||||
                    if consecutive_errors >= 30:
 | 
			
		||||
                        logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg")
 | 
			
		||||
                        if self.process:
 | 
			
		||||
                            self.process.terminate()
 | 
			
		||||
                        consecutive_errors = 0
 | 
			
		||||
                # Wait for temp file to exist and have content
 | 
			
		||||
                if not os.path.exists(self.temp_file):
 | 
			
		||||
                    time.sleep(0.1)
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                # Convert raw bytes to numpy array
 | 
			
		||||
                frame = np.frombuffer(frame_data, dtype=np.uint8)
 | 
			
		||||
                frame = frame.reshape((self.height, self.width, 3))
 | 
			
		||||
                # Check if file size changed (new frame available)
 | 
			
		||||
                try:
 | 
			
		||||
                    current_file_size = os.path.getsize(self.temp_file)
 | 
			
		||||
                    if current_file_size <= last_file_size and current_file_size > 0:
 | 
			
		||||
                        # File size didn't increase, wait for next frame
 | 
			
		||||
                        time.sleep(0.05)  # ~20 FPS max
 | 
			
		||||
                        continue
 | 
			
		||||
                    last_file_size = current_file_size
 | 
			
		||||
                except OSError:
 | 
			
		||||
                    time.sleep(0.1)
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                # Frame is valid
 | 
			
		||||
                consecutive_errors = 0
 | 
			
		||||
                frame_count += 1
 | 
			
		||||
                # Read the latest frame from the end of file
 | 
			
		||||
                try:
 | 
			
		||||
                    with open(self.temp_file, 'rb') as f:
 | 
			
		||||
                        # Seek to last complete frame
 | 
			
		||||
                        file_size = f.seek(0, 2)  # Seek to end
 | 
			
		||||
                        if file_size < bytes_per_frame:
 | 
			
		||||
                            time.sleep(0.1)
 | 
			
		||||
                            continue
 | 
			
		||||
 | 
			
		||||
                # Call frame callback
 | 
			
		||||
                if self.frame_callback:
 | 
			
		||||
                    try:
 | 
			
		||||
                        self.frame_callback(self.camera_id, frame)
 | 
			
		||||
                    except Exception as e:
 | 
			
		||||
                        logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
 | 
			
		||||
                        # Read last complete frame
 | 
			
		||||
                        last_frame_offset = (file_size // bytes_per_frame - 1) * bytes_per_frame
 | 
			
		||||
                        f.seek(last_frame_offset)
 | 
			
		||||
                        frame_data = f.read(bytes_per_frame)
 | 
			
		||||
 | 
			
		||||
                # Log progress
 | 
			
		||||
                current_time = time.time()
 | 
			
		||||
                if current_time - last_log_time >= 30:
 | 
			
		||||
                    logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via FFmpeg")
 | 
			
		||||
                    last_log_time = current_time
 | 
			
		||||
                    if len(frame_data) != bytes_per_frame:
 | 
			
		||||
                        consecutive_errors += 1
 | 
			
		||||
                        if consecutive_errors >= 30:
 | 
			
		||||
                            logger.error(f"Camera {self.camera_id}: Too many read errors, restarting FFmpeg")
 | 
			
		||||
                            if self.process:
 | 
			
		||||
                                self.process.terminate()
 | 
			
		||||
                            consecutive_errors = 0
 | 
			
		||||
                        time.sleep(0.1)
 | 
			
		||||
                        continue
 | 
			
		||||
 | 
			
		||||
                    # Convert raw bytes to numpy array
 | 
			
		||||
                    frame = np.frombuffer(frame_data, dtype=np.uint8)
 | 
			
		||||
                    frame = frame.reshape((self.height, self.width, 3))
 | 
			
		||||
 | 
			
		||||
                    # Frame is valid
 | 
			
		||||
                    consecutive_errors = 0
 | 
			
		||||
                    frame_count += 1
 | 
			
		||||
 | 
			
		||||
                    # Call frame callback
 | 
			
		||||
                    if self.frame_callback:
 | 
			
		||||
                        try:
 | 
			
		||||
                            self.frame_callback(self.camera_id, frame)
 | 
			
		||||
                        except Exception as e:
 | 
			
		||||
                            logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
 | 
			
		||||
 | 
			
		||||
                    # Log progress
 | 
			
		||||
                    current_time = time.time()
 | 
			
		||||
                    if current_time - last_log_time >= 30:
 | 
			
		||||
                        logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via temp file")
 | 
			
		||||
                        last_log_time = current_time
 | 
			
		||||
 | 
			
		||||
                except IOError as e:
 | 
			
		||||
                    logger.debug(f"Camera {self.camera_id}: File read error: {e}")
 | 
			
		||||
                    time.sleep(0.1)
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Camera {self.camera_id}: FFmpeg read error: {e}")
 | 
			
		||||
| 
						 | 
				
			
			@ -151,6 +195,12 @@ class FFmpegRTSPReader:
 | 
			
		|||
        # Cleanup
 | 
			
		||||
        if self.process:
 | 
			
		||||
            self.process.terminate()
 | 
			
		||||
        # Clean up temp file
 | 
			
		||||
        try:
 | 
			
		||||
            if hasattr(self, 'temp_file') and os.path.exists(self.temp_file):
 | 
			
		||||
                os.remove(self.temp_file)
 | 
			
		||||
        except:
 | 
			
		||||
            pass
 | 
			
		||||
        logger.info(f"FFmpeg reader thread ended for camera {self.camera_id}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue