""" Frame readers for RTSP streams and HTTP snapshots. Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. """ import cv2 import logging import time import threading import requests import numpy as np import os import subprocess from typing import Optional, Callable from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # Suppress FFMPEG/H.264 error messages if needed # Set this environment variable to reduce noise from decoder errors os.environ["OPENCV_LOG_LEVEL"] = "ERROR" os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings logger = logging.getLogger(__name__) class FrameFileHandler(FileSystemEventHandler): """File system event handler for frame file changes.""" def __init__(self, callback): self.callback = callback self.last_modified = 0 def on_modified(self, event): if event.is_directory: return # Debounce rapid file changes current_time = time.time() if current_time - self.last_modified > 0.01: # 10ms debounce self.last_modified = current_time self.callback() class FFmpegRTSPReader: """RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration and file watching.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id self.rtsp_url = rtsp_url self.max_retries = max_retries self.process = None self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None self.observer = None self.frame_ready_event = threading.Event() # Stream specs self.width = 1280 self.height = 720 def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the FFmpeg subprocess reader.""" if self.thread and self.thread.is_alive(): logger.warning(f"FFmpeg reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_frames, daemon=True) self.thread.start() logger.info(f"Started FFmpeg reader for camera {self.camera_id}") def stop(self): """Stop the FFmpeg subprocess reader.""" self.stop_event.set() if self.process: self.process.terminate() try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: self.process.kill() if self.thread: self.thread.join(timeout=5.0) logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}") def _start_ffmpeg_process(self): """Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file.""" # Create temp file path for this camera self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw" os.makedirs("/tmp/claude", exist_ok=True) cmd = [ 'ffmpeg', '-hwaccel', 'cuda', '-hwaccel_device', '0', '-rtsp_transport', 'tcp', '-i', self.rtsp_url, '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-an', # No audio '-update', '1', # Update single frame in place '-y', # Overwrite output file self.temp_file ] try: # Start FFmpeg detached - we don't need to communicate with it self.process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}") return True except Exception as e: logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}") return False def _setup_file_watcher(self): """Setup file system watcher for temp file.""" if not os.path.exists(self.temp_file): return # Setup file watcher handler = FrameFileHandler(self._on_file_changed) self.observer = Observer() self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False) self.observer.start() logger.info(f"Started file watcher for {self.temp_file}") def _on_file_changed(self): """Called when temp file is modified.""" if os.path.basename(self.temp_file) in str(self.temp_file): self.frame_ready_event.set() def _read_frames(self): """Reactively read frames when file changes.""" frame_count = 0 last_log_time = time.time() bytes_per_frame = self.width * self.height * 3 restart_check_interval = 10 # Check FFmpeg status every 10 seconds while not self.stop_event.is_set(): try: # Start FFmpeg if not running if not self.process or self.process.poll() is not None: if self.process and self.process.poll() is not None: logger.warning(f"FFmpeg process died for camera {self.camera_id}, restarting...") if not self._start_ffmpeg_process(): time.sleep(5.0) continue # Wait for temp file to be created wait_count = 0 while not os.path.exists(self.temp_file) and wait_count < 30: time.sleep(1.0) wait_count += 1 if not os.path.exists(self.temp_file): logger.error(f"Temp file not created after 30s for {self.camera_id}") continue # Setup file watcher self._setup_file_watcher() # Wait for file change event (or timeout for health check) if self.frame_ready_event.wait(timeout=restart_check_interval): self.frame_ready_event.clear() # Read current frame with concurrency safety try: # Try to read frame multiple times to handle race conditions frame_data = None for attempt in range(3): try: with open(self.temp_file, 'rb') as f: frame_data = f.read(bytes_per_frame) # Validate we got a complete frame if len(frame_data) == bytes_per_frame: break else: logger.debug(f"Camera {self.camera_id}: Partial read {len(frame_data)}/{bytes_per_frame}, attempt {attempt+1}") time.sleep(0.01) # Brief wait before retry except (IOError, OSError) as e: logger.debug(f"Camera {self.camera_id}: Read error on attempt {attempt+1}: {e}") time.sleep(0.01) if frame_data and len(frame_data) == bytes_per_frame: # Convert to numpy array frame = np.frombuffer(frame_data, dtype=np.uint8) frame = frame.reshape((self.height, self.width, 3)) # Call frame callback directly - trust the retry logic caught corruption if self.frame_callback: self.frame_callback(self.camera_id, frame) frame_count += 1 # Log progress current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} frames processed reactively") last_log_time = current_time else: logger.debug(f"Camera {self.camera_id}: Failed to read complete frame after retries") except (IOError, OSError) as e: logger.debug(f"Camera {self.camera_id}: File read error: {e}") except Exception as e: logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}") time.sleep(1.0) # Cleanup if self.observer: self.observer.stop() self.observer.join() if self.process: self.process.terminate() # Clean up temp file try: if hasattr(self, 'temp_file') and os.path.exists(self.temp_file): os.remove(self.temp_file) except: pass logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}") logger = logging.getLogger(__name__) class RTSPReader: """RTSP stream frame reader optimized for 1280x720 @ 6fps streams.""" def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): self.camera_id = camera_id self.rtsp_url = rtsp_url self.max_retries = max_retries self.cap = None self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected stream specifications self.expected_width = 1280 self.expected_height = 720 self.expected_fps = 6 # Frame processing parameters self.error_recovery_delay = 5.0 # Increased from 2.0 for stability self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter self.stream_timeout = 30.0 def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the RTSP reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"RTSP reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_frames, daemon=True) self.thread.start() logger.info(f"Started RTSP reader for camera {self.camera_id}") def stop(self): """Stop the RTSP reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) if self.cap: self.cap.release() logger.info(f"Stopped RTSP reader for camera {self.camera_id}") def _read_frames(self): """Main frame reading loop with H.264 error recovery.""" consecutive_errors = 0 frame_count = 0 last_log_time = time.time() last_successful_frame_time = time.time() while not self.stop_event.is_set(): try: # Initialize/reinitialize capture if needed if not self.cap or not self.cap.isOpened(): if not self._initialize_capture(): time.sleep(self.error_recovery_delay) continue last_successful_frame_time = time.time() # Check for stream timeout if time.time() - last_successful_frame_time > self.stream_timeout: logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing") self._reinitialize_capture() last_successful_frame_time = time.time() continue # Read frame immediately without rate limiting for minimum latency try: ret, frame = self.cap.read() if ret and frame is None: # Grab succeeded but retrieve failed - decoder issue logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed") except Exception as read_error: logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}") ret, frame = False, None if not ret or frame is None: consecutive_errors += 1 # Enhanced logging to diagnose the issue logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}") # Try to get more info from the capture try: if self.cap and self.cap.isOpened(): backend = self.cap.getBackendName() pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES) logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}") else: logger.error(f"Camera {self.camera_id}: Capture is closed or None!") except Exception as info_error: logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}") if consecutive_errors >= self.max_consecutive_errors: logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing") self._reinitialize_capture() consecutive_errors = 0 time.sleep(self.error_recovery_delay) else: # Skip corrupted frame and continue with exponential backoff if consecutive_errors <= 5: logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})") elif consecutive_errors % 10 == 0: # Log every 10th error after 5 logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})") # Exponential backoff with cap at 1 second sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0) time.sleep(sleep_time) continue # Accept any valid frame dimensions - don't force specific resolution if frame.shape[1] <= 0 or frame.shape[0] <= 0: consecutive_errors += 1 continue # Check for corrupted frames (all black, all white, excessive noise) if self._is_frame_corrupted(frame): logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping") consecutive_errors += 1 continue # Frame is valid consecutive_errors = 0 frame_count += 1 last_successful_frame_time = time.time() # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") last_log_time = current_time except Exception as e: logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}") consecutive_errors += 1 if consecutive_errors >= self.max_consecutive_errors: self._reinitialize_capture() consecutive_errors = 0 time.sleep(self.error_recovery_delay) # Cleanup if self.cap: self.cap.release() logger.info(f"RTSP reader thread ended for camera {self.camera_id}") def _initialize_capture(self) -> bool: """Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps.""" try: # Release previous capture if exists if self.cap: self.cap.release() time.sleep(0.5) logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration") hw_accel_success = False # Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support) if not hw_accel_success: try: # Check if OpenCV was built with CUDA codec support build_info = cv2.getBuildInformation() if 'cudacodec' in build_info or 'CUVID' in build_info: logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}") # Use OpenCV's CUDA backend self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [ cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY ]) if self.cap.isOpened(): hw_accel_success = True logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration") else: logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support") except Exception as e: logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}") # Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC) if not hw_accel_success: try: from core.utils.ffmpeg_detector import get_optimal_rtsp_options import os # Get optimal FFmpeg options based on detected capabilities optimal_options = get_optimal_rtsp_options(self.rtsp_url) os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}") logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}") self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if self.cap.isOpened(): hw_accel_success = True # Try to get backend info to confirm hardware acceleration backend = self.cap.getBackendName() logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})") except Exception as e: logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}") # Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060) if not hw_accel_success: try: import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp' logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}") self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if self.cap.isOpened(): hw_accel_success = True logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration") except Exception as e: logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}") # Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs) if not hw_accel_success: try: import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp' logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}") self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if self.cap.isOpened(): hw_accel_success = True logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration") except Exception as e: logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}") # Fallback: Standard FFmpeg with software decoding if not hw_accel_success: logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding") import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if not self.cap.isOpened(): logger.error(f"Failed to open stream for camera {self.camera_id}") return False # Don't force resolution/fps - let the stream determine its natural specs # The camera will provide whatever resolution/fps it supports # Set FFMPEG options for better H.264 handling self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # Verify stream properties actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.cap.get(cv2.CAP_PROP_FPS) logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps") # Read and discard first few frames to stabilize stream for _ in range(5): ret, _ = self.cap.read() if not ret: logger.warning(f"Camera {self.camera_id}: Failed to read initial frames") time.sleep(0.1) return True except Exception as e: logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") return False def _reinitialize_capture(self): """Reinitialize capture after errors with retry logic.""" logger.info(f"Reinitializing capture for camera {self.camera_id}") if self.cap: self.cap.release() self.cap = None # Longer delay before reconnection to avoid rapid reconnect loops time.sleep(3.0) # Retry initialization up to 3 times for attempt in range(3): if self._initialize_capture(): logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}") break else: logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}") time.sleep(2.0) def _is_frame_corrupted(self, frame: np.ndarray) -> bool: """Check if frame is corrupted (all black, all white, or excessive noise).""" if frame is None or frame.size == 0: return True # Check mean and standard deviation mean = np.mean(frame) std = np.std(frame) # All black or all white if mean < 5 or mean > 250: return True # No variation (stuck frame) if std < 1: return True # Excessive noise (corrupted H.264 decode) # Calculate edge density as corruption indicator edges = cv2.Canny(frame, 50, 150) edge_density = np.sum(edges > 0) / edges.size # Too many edges indicate corruption if edge_density > 0.5: return True return False class HTTPSnapshotReader: """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): self.camera_id = camera_id self.snapshot_url = snapshot_url self.interval_ms = interval_ms self.max_retries = max_retries self.stop_event = threading.Event() self.thread = None self.frame_callback: Optional[Callable] = None # Expected snapshot specifications self.expected_width = 2560 self.expected_height = 1440 self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): """Set callback function to handle captured frames.""" self.frame_callback = callback def start(self): """Start the snapshot reader thread.""" if self.thread and self.thread.is_alive(): logger.warning(f"Snapshot reader for {self.camera_id} already running") return self.stop_event.clear() self.thread = threading.Thread(target=self._read_snapshots, daemon=True) self.thread.start() logger.info(f"Started snapshot reader for camera {self.camera_id}") def stop(self): """Stop the snapshot reader thread.""" self.stop_event.set() if self.thread: self.thread.join(timeout=5.0) logger.info(f"Stopped snapshot reader for camera {self.camera_id}") def _read_snapshots(self): """Main snapshot reading loop for high quality 2K images.""" retries = 0 frame_count = 0 last_log_time = time.time() interval_seconds = self.interval_ms / 1000.0 logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") while not self.stop_event.is_set(): try: start_time = time.time() frame = self._fetch_snapshot() if frame is None: retries += 1 logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}") if self.max_retries != -1 and retries > self.max_retries: logger.error(f"Max retries reached for snapshot camera {self.camera_id}") break time.sleep(min(2.0, interval_seconds)) continue # Accept any valid image dimensions - don't force specific resolution if frame.shape[1] <= 0 or frame.shape[0] <= 0: logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}") continue # Reset retry counter on successful fetch retries = 0 frame_count += 1 # Call frame callback if self.frame_callback: try: self.frame_callback(self.camera_id, frame) except Exception as e: logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") # Log progress every 30 seconds current_time = time.time() if current_time - last_log_time >= 30: logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") last_log_time = current_time # Wait for next interval elapsed = time.time() - start_time sleep_time = max(0, interval_seconds - elapsed) if sleep_time > 0: self.stop_event.wait(sleep_time) except Exception as e: logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}") retries += 1 if self.max_retries != -1 and retries > self.max_retries: break time.sleep(min(2.0, interval_seconds)) logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") def _fetch_snapshot(self) -> Optional[np.ndarray]: """Fetch a single high quality snapshot from HTTP URL.""" try: # Parse URL for authentication from urllib.parse import urlparse parsed_url = urlparse(self.snapshot_url) headers = { 'User-Agent': 'Python-Detector-Worker/1.0', 'Accept': 'image/jpeg, image/png, image/*' } auth = None if parsed_url.username and parsed_url.password: from requests.auth import HTTPBasicAuth, HTTPDigestAuth auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) # Reconstruct URL without credentials clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}" if parsed_url.port: clean_url += f":{parsed_url.port}" clean_url += parsed_url.path if parsed_url.query: clean_url += f"?{parsed_url.query}" # Try Basic Auth first response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) # If Basic Auth fails, try Digest Auth if response.status_code == 401: auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, stream=True, verify=False) else: response = requests.get(self.snapshot_url, timeout=15, headers=headers, stream=True, verify=False) if response.status_code == 200: # Check content size content_length = int(response.headers.get('content-length', 0)) if content_length > self.max_file_size: logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes") return None # Read content content = response.content # Convert to numpy array image_array = np.frombuffer(content, np.uint8) # Decode as high quality image frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if frame is None: logger.error(f"Failed to decode snapshot for camera {self.camera_id}") return None logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}") return frame else: logger.warning(f"HTTP {response.status_code} from {self.camera_id}") return None except requests.RequestException as e: logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}") return None except Exception as e: logger.error(f"Error decoding snapshot for {self.camera_id}: {e}") return None def fetch_single_snapshot(self) -> Optional[np.ndarray]: """ Fetch a single high-quality snapshot on demand for pipeline processing. This method is for one-time fetch from HTTP URL, not continuous streaming. Returns: High quality 2K snapshot frame or None if failed """ logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}") # Try to fetch snapshot with retries for attempt in range(self.max_retries): frame = self._fetch_snapshot() if frame is not None: logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}") return frame if attempt < self.max_retries - 1: logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...") time.sleep(0.5) logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts") return None def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray: """Resize image while maintaining aspect ratio for high quality.""" h, w = frame.shape[:2] aspect = w / h target_aspect = target_width / target_height if aspect > target_aspect: # Image is wider new_width = target_width new_height = int(target_width / aspect) else: # Image is taller new_height = target_height new_width = int(target_height * aspect) # Use INTER_LANCZOS4 for high quality downsampling resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) # Pad to target size if needed if new_width < target_width or new_height < target_height: top = (target_height - new_height) // 2 bottom = target_height - new_height - top left = (target_width - new_width) // 2 right = target_width - new_width - left resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) return resized