""" Frame readers for RTSP streams and HTTP snapshots. Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import cv2 import logging import time import threading import requests import numpy as np import subprocess from typing import Optional, Callable logger = logging.getLogger(__name__) # Color codes for pretty logging class Colors: GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BLUE = '\033[94m' PURPLE = '\033[95m' CYAN = '\033[96m' WHITE = '\033[97m' BOLD = '\033[1m' END = '\033[0m' def log_success(camera_id: str, message: str): """Log success messages in green""" logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}") def log_warning(camera_id: str, message: str): """Log warnings in yellow""" logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}") def log_error(camera_id: str, message: str): """Log errors in red""" logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}") def log_info(camera_id: str, message: str): """Log info in cyan""" logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}") class FFmpegRTSPReader: """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id self.rtsp_url = rtsp_url self.max_retries = max_retries self.process = None self.stop_event = threading.Event() self.thread = None self.stderr_thread = None self.frame_callback: Optional[Callable] = None # Expected stream specs (for reference, actual dimensions read from PPM header) self.width = 1280 self.height = 720 # Watchdog timers for stream reliability self.process_start_time = None self.last_frame_time = None self.is_restart = False # Track if this is a restart (shorter timeout) self.first_start_timeout = 30.0 # 30s timeout on first start self.restart_timeout = 15.0 # 15s timeout after restart def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the FFmpeg subprocess reader.""" if self.thread and self.thread.is_alive(): logger.warning(f"FFmpeg reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_frames, daemon=True) self.thread.start() log_success(self.camera_id, "Stream started") def stop(self): """Stop the FFmpeg subprocess reader.""" self.stop_event.set() if self.process: self.process.terminate() try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: self.process.kill() if self.thread: self.thread.join(timeout=5.0) if self.stderr_thread: self.stderr_thread.join(timeout=2.0) log_info(self.camera_id, "Stream stopped") # Removed _probe_stream_info - BMP headers contain dimensions def _start_ffmpeg_process(self): """Start FFmpeg subprocess outputting BMP frames to stdout pipe.""" cmd = [ 'ffmpeg', # DO NOT REMOVE '-hwaccel', 'cuda', '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, '-f', 'image2pipe', # Output images to pipe '-vcodec', 'bmp', # BMP format with header containing dimensions # Use native stream resolution and framerate '-an', # No audio '-' # Output to stdout ] try: # Start FFmpeg with stdout pipe to read frames directly self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, # Capture stdout for frame data stderr=subprocess.PIPE, # Capture stderr for error logging bufsize=0 # Unbuffered for real-time processing ) # Start stderr reading thread if self.stderr_thread and self.stderr_thread.is_alive(): # Stop previous stderr thread try: self.stderr_thread.join(timeout=1.0) except: pass self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True) self.stderr_thread.start() # Set process start time for watchdog self.process_start_time = time.time() self.last_frame_time = None # Reset frame time # After successful restart, next timeout will be back to 30s if self.is_restart: log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s") self.is_restart = False return True except Exception as e: log_error(self.camera_id, f"FFmpeg startup failed: {e}") return False def _read_bmp_frame(self, pipe): """Read BMP frame from pipe - BMP header contains dimensions.""" try: # Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum) header_data = b'' bytes_to_read = 54 while len(header_data) < bytes_to_read: chunk = pipe.read(bytes_to_read - len(header_data)) if not chunk: return None # Silent end of stream header_data += chunk # Parse BMP header if header_data[:2] != b'BM': return None # Invalid format, skip frame silently # Extract file size from header (bytes 2-5) import struct file_size = struct.unpack(' bool: """Check if watchdog timeout has been exceeded.""" if not self.process_start_time: return False current_time = time.time() time_since_start = current_time - self.process_start_time # Determine timeout based on whether this is a restart timeout = self.restart_timeout if self.is_restart else self.first_start_timeout # If no frames received yet, check against process start time if not self.last_frame_time: if time_since_start > timeout: log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_start:.1f}s (limit: {timeout}s)") return True else: # Check time since last frame time_since_frame = current_time - self.last_frame_time if time_since_frame > timeout: log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_frame:.1f}s (limit: {timeout}s)") return True return False def _restart_ffmpeg_process(self): """Restart FFmpeg process due to watchdog timeout.""" log_warning(self.camera_id, "Watchdog triggered FFmpeg restart") # Terminate current process if self.process: try: self.process.terminate() self.process.wait(timeout=3) except subprocess.TimeoutExpired: self.process.kill() except Exception: pass self.process = None # Mark as restart for shorter timeout self.is_restart = True # Small delay before restart time.sleep(1.0) def _read_frames(self): """Read frames directly from FFmpeg stdout pipe.""" frame_count = 0 last_log_time = time.time() while not self.stop_event.is_set(): try: # Check watchdog timeout if process is running if self.process and self.process.poll() is None: if self._check_watchdog_timeout(): self._restart_ffmpeg_process() continue # Start FFmpeg if not running if not self.process or self.process.poll() is not None: if self.process and self.process.poll() is not None: log_warning(self.camera_id, "Stream disconnected, reconnecting...") if not self._start_ffmpeg_process(): time.sleep(5.0) continue # Read frames directly from FFmpeg stdout try: if self.process and self.process.stdout: # Read BMP frame data frame = self._read_bmp_frame(self.process.stdout) if frame is None: continue # Update watchdog - we got a frame self.last_frame_time = time.time() # Call frame callback if self.frame_callback: self.frame_callback(self.camera_id, frame) frame_count += 1 # Log progress every 60 seconds (quieter) current_time = time.time() if current_time - last_log_time >= 60: log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})") last_log_time = current_time except Exception: # Process might have died, let it restart on next iteration if self.process: self.process.terminate() self.process = None time.sleep(1.0) except Exception: time.sleep(1.0) # Cleanup if self.process: self.process.terminate() logger = logging.getLogger(__name__) class HTTPSnapshotReader: """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): self.camera_id = camera_id self.snapshot_url = snapshot_url self.interval_ms = interval_ms self.max_retries = max_retries self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected snapshot specifications self.expected_width = 2560 self.expected_height = 1440 self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the snapshot reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"Snapshot reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_snapshots, daemon=True) self.thread.start() logger.info(f"Started snapshot reader for camera {self.camera_id}") def stop(self): """Stop the snapshot reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) logger.info(f"Stopped snapshot reader for camera {self.camera_id}") def _read_snapshots(self): """Main snapshot reading loop for high quality 2K images.""" retries = 0 frame_count = 0 last_log_time = time.time() interval_seconds = self.interval_ms / 1000.0 logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") while not self.stop_event.is_set(): try: start_time = time.time() frame = self._fetch_snapshot() if frame is None: retries += 1 logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}") if self.max_retries != -1 and retries > self.max_retries: logger.error(f"Max retries reached for snapshot camera {self.camera_id}") break time.sleep(min(2.0, interval_seconds)) continue # Accept any valid image dimensions - don't force specific resolution if frame.shape[1] <= 0 or frame.shape[0] <= 0: logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}") continue # Reset retry counter on successful fetch retries = 0 frame_count += 1 # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") last_log_time = current_time # Wait for next interval elapsed = time.time() - start_time sleep_time = max(0, interval_seconds - elapsed) if sleep_time > 0: self.stop_event.wait(sleep_time) except Exception as e: logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}") retries += 1 if self.max_retries != -1 and retries > self.max_retries: break time.sleep(min(2.0, interval_seconds)) logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") def _fetch_snapshot(self) -> Optional[np.ndarray]: """Fetch a single high quality snapshot from HTTP URL.""" try: # Parse URL for authentication from urllib.parse import urlparse parsed_url = urlparse(self.snapshot_url) headers = { 'User-Agent': 'Python-Detector-Worker/1.0', 'Accept': 'image/jpeg, image/png, image/*' } auth = None if parsed_url.username and parsed_url.password: from requests.auth import HTTPBasicAuth, HTTPDigestAuth auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) # Reconstruct URL without credentials clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}" if parsed_url.port: clean_url += f":{parsed_url.port}" clean_url += parsed_url.path if parsed_url.query: clean_url += f"?{parsed_url.query}" # Try Basic Auth first response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) # If Basic Auth fails, try Digest Auth if response.status_code == 401: auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) else: response = requests.get(self.snapshot_url, timeout=15, headers=headers, stream=True, verify=False) if response.status_code == 200: # Check content size content_length = int(response.headers.get('content-length', 0)) if content_length > self.max_file_size: logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes") return None # Read content content = response.content # Convert to numpy array image_array = np.frombuffer(content, np.uint8) # Decode as high quality image frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if frame is None: logger.error(f"Failed to decode snapshot for camera {self.camera_id}") return None logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}") return frame else: logger.warning(f"HTTP {response.status_code} from {self.camera_id}") return None except requests.RequestException as e: logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}") return None except Exception as e: logger.error(f"Error decoding snapshot for {self.camera_id}: {e}") return None def fetch_single_snapshot(self) -> Optional[np.ndarray]: """ Fetch a single high-quality snapshot on demand for pipeline processing. This method is for one-time fetch from HTTP URL, not continuous streaming. Returns: High quality 2K snapshot frame or None if failed """ logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}") # Try to fetch snapshot with retries for attempt in range(self.max_retries): frame = self._fetch_snapshot() if frame is not None: logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}") return frame if attempt < self.max_retries - 1: logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...") time.sleep(0.5) logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts") return None def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray: """Resize image while maintaining aspect ratio for high quality.""" h, w = frame.shape[:2] aspect = w / h target_aspect = target_width / target_height if aspect > target_aspect: # Image is wider new_width = target_width new_height = int(target_width / aspect) else: # Image is taller new_height = target_height new_width = int(target_height * aspect) # Use INTER_LANCZOS4 for high quality downsampling resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) # Pad to target size if needed if new_width < target_width or new_height < target_height: top = (target_height - new_height) // 2 bottom = target_height - new_height - top left = (target_width - new_width) // 2 right = target_width - new_width - left resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) return resized