""" Frame readers for RTSP streams and HTTP snapshots. Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import cv2 import logging import time import threading import requests import numpy as np import os from typing import Optional, Callable # Suppress FFMPEG/H.264 error messages if needed # Set this environment variable to reduce noise from decoder errors os.environ["OPENCV_LOG_LEVEL"] = "ERROR" os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) class RTSPReader: """RTSP stream frame reader optimized for 1280x720 @ 6fps streams.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id self.rtsp_url = rtsp_url self.max_retries = max_retries self.cap = None self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected stream specifications self.expected_width = 1280 self.expected_height = 720 self.expected_fps = 6 # Frame processing parameters self.frame_interval = 1.0 / self.expected_fps # ~167ms for 6fps self.error_recovery_delay = 5.0 # Increased from 2.0 for stability self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter self.stream_timeout = 30.0 def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the RTSP reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"RTSP reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_frames, daemon=True) self.thread.start() logger.info(f"Started RTSP reader for camera {self.camera_id}") def stop(self): """Stop the RTSP reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) if self.cap: self.cap.release() logger.info(f"Stopped RTSP reader for camera {self.camera_id}") def _read_frames(self): """Main frame reading loop with H.264 error recovery.""" consecutive_errors = 0 frame_count = 0 last_log_time = time.time() last_successful_frame_time = time.time() last_frame_time = 0 while not self.stop_event.is_set(): try: # Initialize/reinitialize capture if needed if not self.cap or not self.cap.isOpened(): if not self._initialize_capture(): time.sleep(self.error_recovery_delay) continue last_successful_frame_time = time.time() # Check for stream timeout if time.time() - last_successful_frame_time > self.stream_timeout: logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing") self._reinitialize_capture() last_successful_frame_time = time.time() continue # Rate limiting for 6fps current_time = time.time() if current_time - last_frame_time < self.frame_interval: time.sleep(0.01) # Small sleep to avoid busy waiting continue ret, frame = self.cap.read() if not ret or frame is None: consecutive_errors += 1 if consecutive_errors >= self.max_consecutive_errors: logger.error(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing") self._reinitialize_capture() consecutive_errors = 0 time.sleep(self.error_recovery_delay) else: # Skip corrupted frame and continue with exponential backoff if consecutive_errors <= 5: logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})") elif consecutive_errors % 10 == 0: # Log every 10th error after 5 logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})") # Exponential backoff with cap at 1 second sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0) time.sleep(sleep_time) continue # Validate frame dimensions if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: logger.warning(f"Camera {self.camera_id}: Unexpected frame dimensions {frame.shape[1]}x{frame.shape[0]}") # Try to resize if dimensions are wrong if frame.shape[1] > 0 and frame.shape[0] > 0: frame = cv2.resize(frame, (self.expected_width, self.expected_height)) else: consecutive_errors += 1 continue # Check for corrupted frames (all black, all white, excessive noise) if self._is_frame_corrupted(frame): logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping") consecutive_errors += 1 continue # Frame is valid consecutive_errors = 0 frame_count += 1 last_successful_frame_time = time.time() last_frame_time = current_time # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") last_log_time = current_time except Exception as e: logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}") consecutive_errors += 1 if consecutive_errors >= self.max_consecutive_errors: self._reinitialize_capture() consecutive_errors = 0 time.sleep(self.error_recovery_delay) # Cleanup if self.cap: self.cap.release() logger.info(f"RTSP reader thread ended for camera {self.camera_id}") def _initialize_capture(self) -> bool: """Initialize video capture with optimized settings for 1280x720@6fps.""" try: # Release previous capture if exists if self.cap: self.cap.release() time.sleep(0.5) logger.info(f"Initializing capture for camera {self.camera_id}") # Create capture with FFMPEG backend and TCP transport for reliability # Use TCP instead of UDP to prevent packet loss rtsp_url_tcp = self.rtsp_url.replace('rtsp://', 'rtsp://') if '?' in rtsp_url_tcp: rtsp_url_tcp += '&tcp' else: rtsp_url_tcp += '?tcp' # Alternative: Set environment variable for RTSP transport import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if not self.cap.isOpened(): logger.error(f"Failed to open stream for camera {self.camera_id}") return False # Set capture properties for 1280x720@6fps self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.expected_width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.expected_height) self.cap.set(cv2.CAP_PROP_FPS, self.expected_fps) # Set moderate buffer to handle network jitter while avoiding excessive latency # Buffer of 3 frames provides resilience without major delay self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 3) # Set FFMPEG options for better H.264 handling self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # Verify stream properties actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.cap.get(cv2.CAP_PROP_FPS) logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps") # Read and discard first few frames to stabilize stream for _ in range(5): ret, _ = self.cap.read() if not ret: logger.warning(f"Camera {self.camera_id}: Failed to read initial frames") time.sleep(0.1) return True except Exception as e: logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") return False def _reinitialize_capture(self): """Reinitialize capture after errors with retry logic.""" logger.info(f"Reinitializing capture for camera {self.camera_id}") if self.cap: self.cap.release() self.cap = None # Longer delay before reconnection to avoid rapid reconnect loops time.sleep(3.0) # Retry initialization up to 3 times for attempt in range(3): if self._initialize_capture(): logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}") break else: logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}") time.sleep(2.0) def _is_frame_corrupted(self, frame: np.ndarray) -> bool: """Check if frame is corrupted (all black, all white, or excessive noise).""" if frame is None or frame.size == 0: return True # Check mean and standard deviation mean = np.mean(frame) std = np.std(frame) # All black or all white if mean < 5 or mean > 250: return True # No variation (stuck frame) if std < 1: return True # Excessive noise (corrupted H.264 decode) # Calculate edge density as corruption indicator edges = cv2.Canny(frame, 50, 150) edge_density = np.sum(edges > 0) / edges.size # Too many edges indicate corruption if edge_density > 0.5: return True return False class HTTPSnapshotReader: """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): self.camera_id = camera_id self.snapshot_url = snapshot_url self.interval_ms = interval_ms self.max_retries = max_retries self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected snapshot specifications self.expected_width = 2560 self.expected_height = 1440 self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the snapshot reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"Snapshot reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_snapshots, daemon=True) self.thread.start() logger.info(f"Started snapshot reader for camera {self.camera_id}") def stop(self): """Stop the snapshot reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) logger.info(f"Stopped snapshot reader for camera {self.camera_id}") def _read_snapshots(self): """Main snapshot reading loop for high quality 2K images.""" retries = 0 frame_count = 0 last_log_time = time.time() interval_seconds = self.interval_ms / 1000.0 logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") while not self.stop_event.is_set(): try: start_time = time.time() frame = self._fetch_snapshot() if frame is None: retries += 1 logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}") if self.max_retries != -1 and retries > self.max_retries: logger.error(f"Max retries reached for snapshot camera {self.camera_id}") break time.sleep(min(2.0, interval_seconds)) continue # Validate image dimensions if frame.shape[1] != self.expected_width or frame.shape[0] != self.expected_height: logger.info(f"Camera {self.camera_id}: Snapshot dimensions {frame.shape[1]}x{frame.shape[0]} " f"(expected {self.expected_width}x{self.expected_height})") # Resize if needed (maintaining aspect ratio for high quality) if frame.shape[1] > 0 and frame.shape[0] > 0: # Only resize if significantly different if abs(frame.shape[1] - self.expected_width) > 100: frame = self._resize_maintain_aspect(frame, self.expected_width, self.expected_height) # Reset retry counter on successful fetch retries = 0 frame_count += 1 # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") last_log_time = current_time # Wait for next interval elapsed = time.time() - start_time sleep_time = max(0, interval_seconds - elapsed) if sleep_time > 0: self.stop_event.wait(sleep_time) except Exception as e: logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}") retries += 1 if self.max_retries != -1 and retries > self.max_retries: break time.sleep(min(2.0, interval_seconds)) logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") def _fetch_snapshot(self) -> Optional[np.ndarray]: """Fetch a single high quality snapshot from HTTP URL.""" try: # Parse URL for authentication from urllib.parse import urlparse parsed_url = urlparse(self.snapshot_url) headers = { 'User-Agent': 'Python-Detector-Worker/1.0', 'Accept': 'image/jpeg, image/png, image/*' } auth = None if parsed_url.username and parsed_url.password: from requests.auth import HTTPBasicAuth, HTTPDigestAuth auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) # Reconstruct URL without credentials clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}" if parsed_url.port: clean_url += f":{parsed_url.port}" clean_url += parsed_url.path if parsed_url.query: clean_url += f"?{parsed_url.query}" # Try Basic Auth first response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) # If Basic Auth fails, try Digest Auth if response.status_code == 401: auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) else: response = requests.get(self.snapshot_url, timeout=15, headers=headers, stream=True, verify=False) if response.status_code == 200: # Check content size content_length = int(response.headers.get('content-length', 0)) if content_length > self.max_file_size: logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes") return None # Read content content = response.content # Convert to numpy array image_array = np.frombuffer(content, np.uint8) # Decode as high quality image frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if frame is None: logger.error(f"Failed to decode snapshot for camera {self.camera_id}") return None logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}") return frame else: logger.warning(f"HTTP {response.status_code} from {self.camera_id}") return None except requests.RequestException as e: logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}") return None except Exception as e: logger.error(f"Error decoding snapshot for {self.camera_id}: {e}") return None def fetch_single_snapshot(self) -> Optional[np.ndarray]: """ Fetch a single high-quality snapshot on demand for pipeline processing. This method is for one-time fetch from HTTP URL, not continuous streaming. Returns: High quality 2K snapshot frame or None if failed """ logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}") # Try to fetch snapshot with retries for attempt in range(self.max_retries): frame = self._fetch_snapshot() if frame is not None: logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}") return frame if attempt < self.max_retries - 1: logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...") time.sleep(0.5) logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts") return None def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray: """Resize image while maintaining aspect ratio for high quality.""" h, w = frame.shape[:2] aspect = w / h target_aspect = target_width / target_height if aspect > target_aspect: # Image is wider new_width = target_width new_height = int(target_width / aspect) else: # Image is taller new_height = target_height new_width = int(target_height * aspect) # Use INTER_LANCZOS4 for high quality downsampling resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) # Pad to target size if needed if new_width < target_width or new_height < target_height: top = (target_height - new_height) // 2 bottom = target_height - new_height - top left = (target_width - new_width) // 2 right = target_width - new_width - left resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) return resized