""" Frame readers for RTSP streams and HTTP snapshots. Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. NOTE: This module provides threading-based readers for fallback compatibility. For RTSP streams, the new multiprocessing implementation in process_manager.py is preferred and used by default for better scalability and performance. """ import cv2 import logging import time import threading import requests import numpy as np import os from typing import Optional, Callable # Suppress FFMPEG/H.264 error messages if needed # Set this environment variable to reduce noise from decoder errors os.environ["OPENCV_LOG_LEVEL"] = "ERROR" os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) class RTSPReader: """RTSP stream frame reader optimized for 1280x720 @ 6fps streams.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id self.rtsp_url = rtsp_url self.max_retries = max_retries self.cap = None self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected stream specifications self.expected_width = 1280 self.expected_height = 720 self.expected_fps = 6 # Frame processing parameters self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps self.error_recovery_delay = 5.0 # Increased from 2.0 for stability self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter self.stream_timeout = 30.0 def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the RTSP reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"RTSP reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_frames, daemon=True) self.thread.start() logger.info(f"Started RTSP reader for camera {self.camera_id}") def stop(self): """Stop the RTSP reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) if self.cap: self.cap.release() logger.info(f"Stopped RTSP reader for camera {self.camera_id}") def _read_frames(self): """Main frame reading loop with H.264 error recovery.""" consecutive_errors = 0 frame_count = 0 last_log_time = time.time() last_successful_frame_time = time.time() last_frame_time = 0 while not self.stop_event.is_set(): try: # Initialize/reinitialize capture if needed if not self.cap or not self.cap.isOpened(): if not self._initialize_capture(): time.sleep(self.error_recovery_delay) continue last_successful_frame_time = time.time() # Check for stream timeout if time.time() - last_successful_frame_time > self.stream_timeout: logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing") self._reinitialize_capture() last_successful_frame_time = time.time() continue # Rate limiting for 6fps current_time = time.time() if current_time - last_frame_time < self.frame_interval: time.sleep(0.01) # Small sleep to avoid busy waiting continue ret, frame = self.cap.read() if not ret or frame is None: consecutive_errors += 1 if consecutive_errors >= self.max_consecutive_errors: logger.error(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing") self._reinitialize_capture() consecutive_errors = 0 time.sleep(self.error_recovery_delay) else: # Skip corrupted frame and continue with exponential backoff if consecutive_errors <= 5: logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})") elif consecutive_errors % 10 == 0: # Log every 10th error after 5 logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})") # Exponential backoff with cap at 1 second sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0) time.sleep(sleep_time) continue # Validate frame dimensions if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: logger.warning(f"Camera {self.camera_id}: Unexpected frame dimensions {frame.shape[1]}x{frame.shape[0]}") # Try to resize if dimensions are wrong if frame.shape[1] > 0 and frame.shape[0] > 0: frame = cv2.resize(frame, (self.expected_width, self.expected_height)) else: consecutive_errors += 1 continue # Check for corrupted frames (all black, all white, excessive noise) if self._is_frame_corrupted(frame): logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping") consecutive_errors += 1 continue # Frame is valid consecutive_errors = 0 frame_count += 1 last_successful_frame_time = time.time() last_frame_time = current_time # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") last_log_time = current_time except Exception as e: logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}") consecutive_errors += 1 if consecutive_errors >= self.max_consecutive_errors: self._reinitialize_capture() consecutive_errors = 0 time.sleep(self.error_recovery_delay) # Cleanup if self.cap: self.cap.release() logger.info(f"RTSP reader thread ended for camera {self.camera_id}") def _initialize_capture(self) -> bool: """Initialize video capture with hardware acceleration (NVDEC) for 1280x720@6fps.""" try: # Release previous capture if exists if self.cap: self.cap.release() time.sleep(0.5) logger.info(f"Initializing capture for camera {self.camera_id} with hardware acceleration") hw_accel_success = False # Method 1: Try GStreamer with NVDEC (most efficient on NVIDIA GPUs) if not hw_accel_success: try: # Build GStreamer pipeline for NVIDIA hardware decoding gst_pipeline = ( f"rtspsrc location={self.rtsp_url} protocols=tcp latency=100 ! " "rtph264depay ! h264parse ! " "nvv4l2decoder ! " # NVIDIA hardware decoder "nvvideoconvert ! " # NVIDIA hardware color conversion "video/x-raw,format=BGRx,width=1280,height=720 ! " "videoconvert ! " "video/x-raw,format=BGR ! " "appsink max-buffers=1 drop=true sync=false" ) logger.info(f"Attempting GStreamer NVDEC pipeline for camera {self.camera_id}") self.cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) if self.cap.isOpened(): hw_accel_success = True logger.info(f"Camera {self.camera_id}: Successfully using GStreamer with NVDEC hardware acceleration") except Exception as e: logger.debug(f"Camera {self.camera_id}: GStreamer NVDEC not available: {e}") # Method 2: Try FFMPEG with NVIDIA CUVID hardware decoder if not hw_accel_success: try: import os # Set FFMPEG to use NVIDIA CUVID decoder os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'video_codec;h264_cuvid|rtsp_transport;tcp|hwaccel;cuda' logger.info(f"Attempting FFMPEG with h264_cuvid for camera {self.camera_id}") self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if self.cap.isOpened(): hw_accel_success = True logger.info(f"Camera {self.camera_id}: Using FFMPEG with CUVID hardware acceleration") except Exception as e: logger.debug(f"Camera {self.camera_id}: FFMPEG CUVID not available: {e}") # Method 3: Try VAAPI hardware acceleration (for Intel/AMD GPUs) if not hw_accel_success: try: gst_pipeline = ( f"rtspsrc location={self.rtsp_url} protocols=tcp latency=100 ! " "rtph264depay ! h264parse ! " "vaapih264dec ! " # VAAPI hardware decoder "vaapipostproc ! " "video/x-raw,format=BGRx,width=1280,height=720 ! " "videoconvert ! " "video/x-raw,format=BGR ! " "appsink max-buffers=1 drop=true sync=false" ) logger.info(f"Attempting GStreamer VAAPI pipeline for camera {self.camera_id}") self.cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) if self.cap.isOpened(): hw_accel_success = True logger.info(f"Camera {self.camera_id}: Successfully using GStreamer with VAAPI hardware acceleration") except Exception as e: logger.debug(f"Camera {self.camera_id}: GStreamer VAAPI not available: {e}") # Fallback: Standard FFMPEG with software decoding if not hw_accel_success: logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding") import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if not self.cap.isOpened(): logger.error(f"Failed to open stream for camera {self.camera_id}") return False # Set capture properties for 1280x720@6fps self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.expected_width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height) self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps) # Set moderate buffer to handle network jitter while avoiding excessive latency # Buffer of 3 frames provides resilience without major delay self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3) # Set FFMPEG options for better H.264 handling self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # Verify stream properties actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.cap.get(cv2.CAP_PROP_FPS) logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps") # Read and discard first few frames to stabilize stream for _ in range(5): ret, _ = self.cap.read() if not ret: logger.warning(f"Camera {self.camera_id}: Failed to read initial frames") time.sleep(0.1) return True except Exception as e: logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") return False def _reinitialize_capture(self): """Reinitialize capture after errors with retry logic.""" logger.info(f"Reinitializing capture for camera {self.camera_id}") if self.cap: self.cap.release() self.cap = None # Longer delay before reconnection to avoid rapid reconnect loops time.sleep(3.0) # Retry initialization up to 3 times for attempt in range(3): if self._initialize_capture(): logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}") break else: logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}") time.sleep(2.0) def _is_frame_corrupted(self, frame: np.ndarray) -> bool: """Check if frame is corrupted (all black, all white, or excessive noise).""" if frame is None or frame.size == 0: return True # Check mean and standard deviation mean = np.mean(frame) std = np.std(frame) # All black or all white if mean < 5 or mean > 250: return True # No variation (stuck frame) if std < 1: return True # Excessive noise (corrupted H.264 decode) # Calculate edge density as corruption indicator edges = cv2.Canny(frame, 50, 150) edge_density = np.sum(edges > 0) / edges.size # Too many edges indicate corruption if edge_density > 0.5: return True return False class HTTPSnapshotReader: """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): self.camera_id = camera_id self.snapshot_url = snapshot_url self.interval_ms = interval_ms self.max_retries = max_retries self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected snapshot specifications self.expected_width = 2560 self.expected_height = 1440 self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the snapshot reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"Snapshot reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_snapshots, daemon=True) self.thread.start() logger.info(f"Started snapshot reader for camera {self.camera_id}") def stop(self): """Stop the snapshot reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) logger.info(f"Stopped snapshot reader for camera {self.camera_id}") def _read_snapshots(self): """Main snapshot reading loop for high quality 2K images.""" retries = 0 frame_count = 0 last_log_time = time.time() interval_seconds = self.interval_ms / 1000.0 logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") while not self.stop_event.is_set(): try: start_time = time.time() frame = self._fetch_snapshot() if frame is None: retries += 1 logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}") if self.max_retries != -1 and retries > self.max_retries: logger.error(f"Max retries reached for snapshot camera {self.camera_id}") break time.sleep(min(2.0, interval_seconds)) continue # Validate image dimensions if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: logger.info(f"Camera {self.camera_id}: Snapshot dimensions {frame.shape[1]}x{frame.shape[0]} " f"(expected {self.expected_width}x{self.expected_height})") # Resize if needed (maintaining aspect ratio for high quality) if frame.shape[1] > 0 and frame.shape[0] > 0: # Only resize if significantly different if abs(frame.shape[1] - self.expected_width) > 100: frame = self._resize_maintain_aspect(frame, self.expected_width, self.expected_height) # Reset retry counter on successful fetch retries = 0 frame_count += 1 # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") last_log_time = current_time # Wait for next interval elapsed = time.time() - start_time sleep_time = max(0, interval_seconds - elapsed) if sleep_time > 0: self.stop_event.wait(sleep_time) except Exception as e: logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}") retries += 1 if self.max_retries != -1 and retries > self.max_retries: break time.sleep(min(2.0, interval_seconds)) logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") def _fetch_snapshot(self) -> Optional[np.ndarray]: """Fetch a single high quality snapshot from HTTP URL.""" try: # Parse URL for authentication from urllib.parse import urlparse parsed_url = urlparse(self.snapshot_url) headers = { 'User-Agent': 'Python-Detector-Worker/1.0', 'Accept': 'image/jpeg, image/png, image/*' } auth = None if parsed_url.username and parsed_url.password: from requests.auth import HTTPBasicAuth, HTTPDigestAuth auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) # Reconstruct URL without credentials clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}" if parsed_url.port: clean_url += f":{parsed_url.port}" clean_url += parsed_url.path if parsed_url.query: clean_url += f"?{parsed_url.query}" # Try Basic Auth first response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) # If Basic Auth fails, try Digest Auth if response.status_code == 401: auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) else: response = requests.get(self.snapshot_url, timeout=15, headers=headers, stream=True, verify=False) if response.status_code == 200: # Check content size content_length = int(response.headers.get('content-length', 0)) if content_length > self.max_file_size: logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes") return None # Read content content = response.content # Convert to numpy array image_array = np.frombuffer(content, np.uint8) # Decode as high quality image frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if frame is None: logger.error(f"Failed to decode snapshot for camera {self.camera_id}") return None logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}") return frame else: logger.warning(f"HTTP {response.status_code} from {self.camera_id}") return None except requests.RequestException as e: logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}") return None except Exception as e: logger.error(f"Error decoding snapshot for {self.camera_id}: {e}") return None def fetch_single_snapshot(self) -> Optional[np.ndarray]: """ Fetch a single high-quality snapshot on demand for pipeline processing. This method is for one-time fetch from HTTP URL, not continuous streaming. Returns: High quality 2K snapshot frame or None if failed """ logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}") # Try to fetch snapshot with retries for attempt in range(self.max_retries): frame = self._fetch_snapshot() if frame is not None: logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}") return frame if attempt < self.max_retries - 1: logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...") time.sleep(0.5) logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts") return None def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray: """Resize image while maintaining aspect ratio for high quality.""" h, w = frame.shape[:2] aspect = w / h target_aspect = target_width / target_height if aspect > target_aspect: # Image is wider new_width = target_width new_height = int(target_width / aspect) else: # Image is taller new_height = target_height new_width = int(target_height * aspect) # Use INTER_LANCZOS4 for high quality downsampling resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) # Pad to target size if needed if new_width < target_width or new_height < target_height: top = (target_height - new_height) // 2 bottom = target_height - new_height - top left = (target_width - new_width) // 2 right = target_width - new_width - left resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) return resized