From 2808316e94f09db23ef3a922b95aae97a9aec847 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 26 Sep 2025 19:42:41 +0700 Subject: [PATCH] fix: remove unused RTSPReader import and related code --- core/streaming/__init__.py | 3 +- core/streaming/manager.py | 2 +- core/streaming/readers.py | 444 +++++++++---------------------------- 3 files changed, 112 insertions(+), 337 deletions(-) diff --git a/core/streaming/__init__.py b/core/streaming/__init__.py index d878aac..93005ab 100644 --- a/core/streaming/__init__.py +++ b/core/streaming/__init__.py @@ -2,13 +2,12 @@ Streaming system for RTSP and HTTP camera feeds. Provides modular frame readers, buffers, and stream management. """ -from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader +from .readers import HTTPSnapshotReader, FFmpegRTSPReader from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager __all__ = [ # Readers - 'RTSPReader', 'HTTPSnapshotReader', 'FFmpegRTSPReader', diff --git a/core/streaming/manager.py b/core/streaming/manager.py index 0c026e7..5b4637c 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -9,7 +9,7 @@ from typing import Dict, Set, Optional, List, Any from dataclasses import dataclass from collections import defaultdict -from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader +from .readers import HTTPSnapshotReader, FFmpegRTSPReader from .buffers import shared_cache_buffer from ..tracking.integration import TrackingPipelineIntegration diff --git a/core/streaming/readers.py b/core/streaming/readers.py index 6a1dab8..5684997 100644 --- a/core/streaming/readers.py +++ b/core/streaming/readers.py @@ -8,16 +8,10 @@ import time import threading import requests import numpy as np -import os import subprocess -# import fcntl # No longer needed with atomic file operations from typing import Optional, Callable -# Removed watchdog imports - no longer using file watching -# Suppress FFMPEG/H.264 error messages if needed -# Set this environment variable to reduce noise from decoder errors -os.environ["OPENCV_LOG_LEVEL"] = "ERROR" -os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings + logger = logging.getLogger(__name__) @@ -65,12 +59,20 @@ class FFmpegRTSPReader: self.process = None self.stop_event = threading.Event() self.thread = None + self.stderr_thread = None self.frame_callback: Optional[Callable] = None # Expected stream specs (for reference, actual dimensions read from PPM header) self.width = 1280 self.height = 720 + # Watchdog timers for stream reliability + self.process_start_time = None + self.last_frame_time = None + self.is_restart = False # Track if this is a restart (shorter timeout) + self.first_start_timeout = 30.0 # 30s timeout on first start + self.restart_timeout = 15.0 # 15s timeout after restart + def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback @@ -97,6 +99,8 @@ class FFmpegRTSPReader: self.process.kill() if self.thread: self.thread.join(timeout=5.0) + if self.stderr_thread: + self.stderr_thread.join(timeout=2.0) log_info(self.camera_id, "Stream stopped") # Removed _probe_stream_info - BMP headers contain dimensions @@ -122,9 +126,30 @@ class FFmpegRTSPReader: self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, # Capture stdout for frame data - stderr=subprocess.DEVNULL, + stderr=subprocess.PIPE, # Capture stderr for error logging bufsize=0 # Unbuffered for real-time processing ) + + # Start stderr reading thread + if self.stderr_thread and self.stderr_thread.is_alive(): + # Stop previous stderr thread + try: + self.stderr_thread.join(timeout=1.0) + except: + pass + + self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True) + self.stderr_thread.start() + + # Set process start time for watchdog + self.process_start_time = time.time() + self.last_frame_time = None # Reset frame time + + # After successful restart, next timeout will be back to 30s + if self.is_restart: + log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s") + self.is_restart = False + return True except Exception as e: log_error(self.camera_id, f"FFmpeg startup failed: {e}") @@ -180,6 +205,74 @@ class FFmpegRTSPReader: except Exception: return None # Error reading frame silently + def _read_stderr(self): + """Read and log FFmpeg stderr output in background thread.""" + if not self.process or not self.process.stderr: + return + + try: + while self.process and self.process.poll() is None: + try: + line = self.process.stderr.readline() + if line: + error_msg = line.decode('utf-8', errors='ignore').strip() + if error_msg and not self.stop_event.is_set(): + # Filter out common noise but log actual errors + if any(keyword in error_msg.lower() for keyword in ['error', 'failed', 'cannot', 'invalid']): + log_error(self.camera_id, f"FFmpeg: {error_msg}") + elif 'warning' in error_msg.lower(): + log_warning(self.camera_id, f"FFmpeg: {error_msg}") + except Exception: + break + except Exception: + pass + + def _check_watchdog_timeout(self) -> bool: + """Check if watchdog timeout has been exceeded.""" + if not self.process_start_time: + return False + + current_time = time.time() + time_since_start = current_time - self.process_start_time + + # Determine timeout based on whether this is a restart + timeout = self.restart_timeout if self.is_restart else self.first_start_timeout + + # If no frames received yet, check against process start time + if not self.last_frame_time: + if time_since_start > timeout: + log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_start:.1f}s (limit: {timeout}s)") + return True + else: + # Check time since last frame + time_since_frame = current_time - self.last_frame_time + if time_since_frame > timeout: + log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_frame:.1f}s (limit: {timeout}s)") + return True + + return False + + def _restart_ffmpeg_process(self): + """Restart FFmpeg process due to watchdog timeout.""" + log_warning(self.camera_id, "Watchdog triggered FFmpeg restart") + + # Terminate current process + if self.process: + try: + self.process.terminate() + self.process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.process.kill() + except Exception: + pass + self.process = None + + # Mark as restart for shorter timeout + self.is_restart = True + + # Small delay before restart + time.sleep(1.0) + def _read_frames(self): """Read frames directly from FFmpeg stdout pipe.""" frame_count = 0 @@ -187,6 +280,12 @@ class FFmpegRTSPReader: while not self.stop_event.is_set(): try: + # Check watchdog timeout if process is running + if self.process and self.process.poll() is None: + if self._check_watchdog_timeout(): + self._restart_ffmpeg_process() + continue + # Start FFmpeg if not running if not self.process or self.process.poll() is not None: if self.process and self.process.poll() is not None: @@ -204,6 +303,9 @@ class FFmpegRTSPReader: if frame is None: continue + # Update watchdog - we got a frame + self.last_frame_time = time.time() + # Call frame callback if self.frame_callback: self.frame_callback(self.camera_id, frame) @@ -234,332 +336,6 @@ class FFmpegRTSPReader: logger = logging.getLogger(__name__) -class RTSPReader: - """RTSP stream frame reader optimized for 1280x720 @ 6fps streams.""" - - def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): - self.camera_id = camera_id - self.rtsp_url = rtsp_url - self.max_retries = max_retries - self.cap = None - self.stop_event = threading.Event() - self.thread = None - self.frame_callback: Optional[Callable] = None - - # Expected stream specifications - self.expected_width = 1280 - self.expected_height = 720 - self.expected_fps = 6 - - # Frame processing parameters - self.error_recovery_delay = 5.0 # Increased from 2.0 for stability - self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter - self.stream_timeout = 30.0 - - def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): - """Set callback function to handle captured frames.""" - self.frame_callback = callback - - def start(self): - """Start the RTSP reader thread.""" - if self.thread and self.thread.is_alive(): - logger.warning(f"RTSP reader for {self.camera_id} already running") - return - - self.stop_event.clear() - self.thread = threading.Thread(target=self._read_frames, daemon=True) - self.thread.start() - logger.info(f"Started RTSP reader for camera {self.camera_id}") - - def stop(self): - """Stop the RTSP reader thread.""" - self.stop_event.set() - if self.thread: - self.thread.join(timeout=5.0) - if self.cap: - self.cap.release() - logger.info(f"Stopped RTSP reader for camera {self.camera_id}") - - def _read_frames(self): - """Main frame reading loop with H.264 error recovery.""" - consecutive_errors = 0 - frame_count = 0 - last_log_time = time.time() - last_successful_frame_time = time.time() - - while not self.stop_event.is_set(): - try: - # Initialize/reinitialize capture if needed - if not self.cap or not self.cap.isOpened(): - if not self._initialize_capture(): - time.sleep(self.error_recovery_delay) - continue - last_successful_frame_time = time.time() - - # Check for stream timeout - if time.time() - last_successful_frame_time > self.stream_timeout: - logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing") - self._reinitialize_capture() - last_successful_frame_time = time.time() - continue - - # Read frame immediately without rate limiting for minimum latency - try: - ret, frame = self.cap.read() - if ret and frame is None: - # Grab succeeded but retrieve failed - decoder issue - logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed") - except Exception as read_error: - logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}") - ret, frame = False, None - - if not ret or frame is None: - consecutive_errors += 1 - - # Enhanced logging to diagnose the issue - logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}") - - # Try to get more info from the capture - try: - if self.cap and self.cap.isOpened(): - backend = self.cap.getBackendName() - pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES) - logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}") - else: - logger.error(f"Camera {self.camera_id}: Capture is closed or None!") - except Exception as info_error: - logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}") - - if consecutive_errors >= self.max_consecutive_errors: - logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing") - self._reinitialize_capture() - consecutive_errors = 0 - time.sleep(self.error_recovery_delay) - else: - # Skip corrupted frame and continue with exponential backoff - if consecutive_errors <= 5: - logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})") - elif consecutive_errors % 10 == 0: # Log every 10th error after 5 - logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})") - - # Exponential backoff with cap at 1 second - sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0) - time.sleep(sleep_time) - continue - - # Accept any valid frame dimensions - don't force specific resolution - if frame.shape[1] <= 0 or frame.shape[0] <= 0: - consecutive_errors += 1 - continue - - # Check for corrupted frames (all black, all white, excessive noise) - if self._is_frame_corrupted(frame): - logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping") - consecutive_errors += 1 - continue - - # Frame is valid - consecutive_errors = 0 - frame_count += 1 - last_successful_frame_time = time.time() - - # Call frame callback - if self.frame_callback: - try: - self.frame_callback(self.camera_id, frame) - except Exception as e: - logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") - - # Log progress every 30 seconds - current_time = time.time() - if current_time - last_log_time >= 30: - logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") - last_log_time = current_time - - except Exception as e: - logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}") - consecutive_errors += 1 - if consecutive_errors >= self.max_consecutive_errors: - self._reinitialize_capture() - consecutive_errors = 0 - time.sleep(self.error_recovery_delay) - - # Cleanup - if self.cap: - self.cap.release() - logger.info(f"RTSP reader thread ended for camera {self.camera_id}") - - def _initialize_capture(self) -> bool: - """Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps.""" - try: - # Release previous capture if exists - if self.cap: - self.cap.release() - time.sleep(0.5) - - logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration") - hw_accel_success = False - - # Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support) - if not hw_accel_success: - try: - # Check if OpenCV was built with CUDA codec support - build_info = cv2.getBuildInformation() - if 'cudacodec' in build_info or 'CUVID' in build_info: - logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}") - - # Use OpenCV's CUDA backend - self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [ - cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY - ]) - - if self.cap.isOpened(): - hw_accel_success = True - logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration") - else: - logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support") - except Exception as e: - logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}") - - # Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC) - if not hw_accel_success: - try: - from core.utils.ffmpeg_detector import get_optimal_rtsp_options - import os - - # Get optimal FFmpeg options based on detected capabilities - optimal_options = get_optimal_rtsp_options(self.rtsp_url) - os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options - - logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}") - logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}") - - self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) - - if self.cap.isOpened(): - hw_accel_success = True - # Try to get backend info to confirm hardware acceleration - backend = self.cap.getBackendName() - logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})") - except Exception as e: - logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}") - - # Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060) - if not hw_accel_success: - try: - import os - os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp' - - logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}") - self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) - - if self.cap.isOpened(): - hw_accel_success = True - logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration") - except Exception as e: - logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}") - - # Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs) - if not hw_accel_success: - try: - import os - os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp' - - logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}") - self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) - - if self.cap.isOpened(): - hw_accel_success = True - logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration") - except Exception as e: - logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}") - - # Fallback: Standard FFmpeg with software decoding - if not hw_accel_success: - logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding") - import os - os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' - self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) - - if not self.cap.isOpened(): - logger.error(f"Failed to open stream for camera {self.camera_id}") - return False - - # Don't force resolution/fps - let the stream determine its natural specs - # The camera will provide whatever resolution/fps it supports - - - # Set FFMPEG options for better H.264 handling - self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) - - # Verify stream properties - actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - actual_fps = self.cap.get(cv2.CAP_PROP_FPS) - - logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps") - - # Read and discard first few frames to stabilize stream - for _ in range(5): - ret, _ = self.cap.read() - if not ret: - logger.warning(f"Camera {self.camera_id}: Failed to read initial frames") - time.sleep(0.1) - - return True - - except Exception as e: - logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") - return False - - def _reinitialize_capture(self): - """Reinitialize capture after errors with retry logic.""" - logger.info(f"Reinitializing capture for camera {self.camera_id}") - if self.cap: - self.cap.release() - self.cap = None - - # Longer delay before reconnection to avoid rapid reconnect loops - time.sleep(3.0) - - # Retry initialization up to 3 times - for attempt in range(3): - if self._initialize_capture(): - logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}") - break - else: - logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}") - time.sleep(2.0) - - def _is_frame_corrupted(self, frame: np.ndarray) -> bool: - """Check if frame is corrupted (all black, all white, or excessive noise).""" - if frame is None or frame.size == 0: - return True - - # Check mean and standard deviation - mean = np.mean(frame) - std = np.std(frame) - - # All black or all white - if mean < 5 or mean > 250: - return True - - # No variation (stuck frame) - if std < 1: - return True - - # Excessive noise (corrupted H.264 decode) - # Calculate edge density as corruption indicator - edges = cv2.Canny(frame, 50, 150) - edge_density = np.sum(edges > 0) / edges.size - - # Too many edges indicate corruption - if edge_density > 0.5: - return True - - return False - - class HTTPSnapshotReader: """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""