diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 4b5c8ba..d17a229 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -12,8 +12,7 @@ import os import subprocess # import fcntl # No longer needed with atomic file operations from typing import Optional, Callable -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler +# Removed watchdog imports - no longer using file watching # Suppress FFMPEG/H.264 error messages if needed # Set this environment variable to reduce noise from decoder errors @@ -22,31 +21,14 @@ os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) -# Suppress noisy watchdog debug logs -logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL) -logging.getLogger('watchdog.observers.fsevents').setLevel(logging.CRITICAL) -logging.getLogger('fsevents').setLevel(logging.CRITICAL) +# Removed watchdog logging configuration - no longer using file watching -class FrameFileHandler(FileSystemEventHandler): - """File system event handler for frame file changes.""" - - def __init__(self, callback): - self.callback = callback - self.last_modified = 0 - - def on_modified(self, event): - if event.is_directory: - return - # Debounce rapid file changes - current_time = time.time() - if current_time - self.last_modified > 0.01: # 10ms debounce - self.last_modified = current_time - self.callback() +# Removed FrameFileHandler - no longer using file watching class FFmpegRTSPReader: - """RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration and file watching.""" + """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id @@ -56,10 +38,8 @@ class FFmpegRTSPReader: self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None - self.observer = None - self.frame_ready_event = threading.Event() - # Stream specs + # Expected stream specs (for reference, actual dimensions read from PPM header) self.width = 1280 self.height = 720 @@ -91,18 +71,58 @@ class FFmpegRTSPReader: self.thread.join(timeout=5.0) logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") - def _start_ffmpeg_process(self): - """Start FFmpeg subprocess writing timestamped frames for atomic reads.""" - # Create temp file paths for this camera - self.frame_dir = "/tmp/frame" - os.makedirs(self.frame_dir, exist_ok=True) + def _probe_stream_info(self): + """Probe stream to get resolution and other info.""" + try: + cmd = [ + 'ffprobe', + '-v', 'quiet', + '-print_format', 'json', + '-show_streams', + '-select_streams', 'v:0', # First video stream + '-rtsp_transport', 'tcp', + self.rtsp_url + ] - # Use strftime pattern - FFmpeg writes each frame with unique timestamp - # This ensures each file is complete when written - camera_id_safe = self.camera_id.replace(' ', '_') - self.frame_prefix = f"camera_{camera_id_safe}" - # Using strftime pattern with seconds for unique filenames (avoid %f which may not work) - self.frame_pattern = f"{self.frame_dir}/{self.frame_prefix}_%Y%m%d_%H%M%S.ppm" + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode != 0: + logger.error(f"Camera {self.camera_id}: ffprobe failed (code {result.returncode})") + if result.stderr: + logger.error(f"Camera {self.camera_id}: ffprobe stderr: {result.stderr}") + if result.stdout: + logger.debug(f"Camera {self.camera_id}: ffprobe stdout: {result.stdout}") + return None + + import json + data = json.loads(result.stdout) + if not data.get('streams'): + logger.error(f"Camera {self.camera_id}: No video streams found") + return None + + stream = data['streams'][0] + width = stream.get('width') + height = stream.get('height') + + if not width or not height: + logger.error(f"Camera {self.camera_id}: Could not determine resolution") + return None + + logger.info(f"Camera {self.camera_id}: Detected resolution {width}x{height}") + return width, height + + except Exception as e: + logger.error(f"Camera {self.camera_id}: Error probing stream: {e}") + return None + + def _start_ffmpeg_process(self): + """Start FFmpeg subprocess outputting raw RGB frames to stdout pipe.""" + # First probe the stream to get resolution + probe_result = self._probe_stream_info() + if not probe_result: + logger.error(f"Camera {self.camera_id}: Failed to probe stream info") + return False + + self.actual_width, self.actual_height = probe_result cmd = [ 'ffmpeg', @@ -111,50 +131,69 @@ class FFmpegRTSPReader: # '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, - '-f', 'image2', - '-strftime', '1', # Enable strftime pattern expansion - '-pix_fmt', 'rgb24', # PPM uses RGB not BGR - '-an', # No audio - '-y', # Overwrite output file - self.frame_pattern # Write timestamped frames + '-f', 'rawvideo', # Raw video output instead of PPM + '-pix_fmt', 'rgb24', # Raw RGB24 format + # Use native stream resolution and framerate + '-an', # No audio + '-' # Output to stdout ] try: # Log the FFmpeg command for debugging logger.info(f"Starting FFmpeg for camera {self.camera_id} with command: {' '.join(cmd)}") - # Start FFmpeg detached - we don't need to communicate with it + # Start FFmpeg with stdout pipe to read frames directly self.process = subprocess.Popen( cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stdout=subprocess.PIPE, # Capture stdout for frame data + stderr=subprocess.DEVNULL, + bufsize=0 # Unbuffered for real-time processing ) - logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.frame_pattern}") + logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> stdout pipe (resolution: {self.actual_width}x{self.actual_height})") return True except Exception as e: logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") return False - def _setup_file_watcher(self): - """Setup file system watcher for frame directory.""" - # Setup file watcher for the frame directory - handler = FrameFileHandler(lambda: self._on_file_changed()) - self.observer = Observer() - self.observer.schedule(handler, self.frame_dir, recursive=False) - self.observer.start() - logger.info(f"Started file watcher for {self.frame_dir} with pattern {self.frame_prefix}*.ppm") + def _read_raw_frame(self, pipe): + """Read raw RGB frame data from pipe with proper buffering.""" + try: + # Calculate frame size using actual detected dimensions + frame_size = self.actual_width * self.actual_height * 3 - def _on_file_changed(self): - """Called when a new frame file is created.""" - # Signal that a new frame might be available - self.frame_ready_event.set() + # Read frame data in chunks until we have the complete frame + frame_data = b'' + bytes_remaining = frame_size + + while bytes_remaining > 0: + chunk = pipe.read(bytes_remaining) + if not chunk: # EOF + if len(frame_data) == 0: + logger.debug(f"Camera {self.camera_id}: No more data (stream ended)") + else: + logger.warning(f"Camera {self.camera_id}: Stream ended mid-frame: {len(frame_data)}/{frame_size} bytes") + return None + + frame_data += chunk + bytes_remaining -= len(chunk) + + # Convert raw RGB data to numpy array using actual dimensions + frame_array = np.frombuffer(frame_data, dtype=np.uint8) + frame_rgb = frame_array.reshape((self.actual_height, self.actual_width, 3)) + + # Convert RGB to BGR for OpenCV compatibility + frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR) + + return frame_bgr + + except Exception as e: + logger.error(f"Camera {self.camera_id}: Error reading raw frame: {e}") + return None def _read_frames(self): - """Reactively read frames when file changes.""" + """Read frames directly from FFmpeg stdout pipe.""" frame_count = 0 last_log_time = time.time() - # Remove unused variable: bytes_per_frame = self.width * self.height * 3 - restart_check_interval = 10 # Check FFmpeg status every 10 seconds while not self.stop_event.is_set(): try: @@ -167,100 +206,45 @@ class FFmpegRTSPReader: time.sleep(5.0) continue - # Wait for FFmpeg to start writing frame files - wait_count = 0 - while wait_count < 30: - # Check if any frame files exist - import glob - frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") - if frame_files: - logger.info(f"Found {len(frame_files)} initial frame files for {self.camera_id}") - break - time.sleep(1.0) - wait_count += 1 + logger.info(f"FFmpeg started for camera {self.camera_id}, reading frames from pipe...") - if wait_count >= 30: - logger.error(f"No frame files created after 30s for {self.camera_id}") - logger.error(f"Expected pattern: {self.frame_dir}/{self.frame_prefix}*.ppm") - continue + # Read frames directly from FFmpeg stdout + try: + if self.process and self.process.stdout: + # Read raw frame data + frame = self._read_raw_frame(self.process.stdout) + if frame is None: + continue - # Setup file watcher - self._setup_file_watcher() + # Call frame callback + if self.frame_callback: + self.frame_callback(self.camera_id, frame) + logger.debug(f"Camera {self.camera_id}: Called frame callback with shape {frame.shape}") - # Wait for file change event (or timeout for health check) - if self.frame_ready_event.wait(timeout=restart_check_interval): - self.frame_ready_event.clear() + frame_count += 1 - # Read latest complete frame file - try: - import glob - # Find all frame files for this camera - frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") + # Log progress + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed via pipe") + last_log_time = current_time - if frame_files: - # Sort by filename (which includes timestamp) and get the latest - frame_files.sort() - latest_frame = frame_files[-1] - logger.debug(f"Camera {self.camera_id}: Found {len(frame_files)} frames, processing latest: {latest_frame}") - - # Read the latest frame (it's complete since FFmpeg wrote it atomically) - frame = cv2.imread(latest_frame) - - if frame is not None: - logger.debug(f"Camera {self.camera_id}: Successfully read frame {frame.shape} from {latest_frame}") - # Accept any frame dimensions initially for debugging - if self.frame_callback: - self.frame_callback(self.camera_id, frame) - logger.debug(f"Camera {self.camera_id}: Called frame callback") - - frame_count += 1 - - # Log progress - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") - last_log_time = current_time - else: - logger.warning(f"Camera {self.camera_id}: Failed to read frame from {latest_frame}") - - # Clean up old frame files to prevent disk filling - # Keep only the latest 5 frames - if len(frame_files) > 5: - for old_file in frame_files[:-5]: - try: - os.remove(old_file) - except: - pass - else: - logger.warning(f"Camera {self.camera_id}: No frame files found in {self.frame_dir} with pattern {self.frame_prefix}*.ppm") - - except Exception as e: - logger.debug(f"Camera {self.camera_id}: Error reading frames: {e}") - pass + except Exception as e: + logger.error(f"Camera {self.camera_id}: Error reading from pipe: {e}") + # Process might have died, let it restart on next iteration + if self.process: + self.process.terminate() + self.process = None + time.sleep(1.0) except Exception as e: - logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}") + logger.error(f"Camera {self.camera_id}: Error in pipe frame reading: {e}") time.sleep(1.0) # Cleanup - if self.observer: - self.observer.stop() - self.observer.join() if self.process: self.process.terminate() - # Clean up all frame files for this camera - try: - if hasattr(self, 'frame_prefix') and hasattr(self, 'frame_dir'): - import glob - frame_files = glob.glob(f"{self.frame_dir}/{self.frame_prefix}*.ppm") - for frame_file in frame_files: - try: - os.remove(frame_file) - except: - pass - except: - pass - logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}") + logger.info(f"FFmpeg pipe reader ended for camera {self.camera_id}") logger = logging.getLogger(__name__)