python-detector-worker/core/streaming/readers.py
Siwat Sirichai 95c39a008f
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m53s
Build Worker Base and Application Images / deploy-stack (push) Failing after 12s
refactor: suppress noisy watchdog debug logs for cleaner output
2025-09-26 02:35:27 +07:00

789 lines
No EOL
33 KiB
Python

"""
Frame readers for RTSP streams and HTTP snapshots.
Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
"""
import cv2
import logging
import time
import threading
import requests
import numpy as np
import os
import subprocess
from typing import Optional, Callable
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Suppress FFMPEG/H.264 error messages if needed
# Set this environment variable to reduce noise from decoder errors
os.environ["OPENCV_LOG_LEVEL"] = "ERROR"
os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings
logger = logging.getLogger(__name__)
# Suppress noisy watchdog debug logs
logging.getLogger('watchdog.observers.inotify_buffer').setLevel(logging.CRITICAL)
class FrameFileHandler(FileSystemEventHandler):
"""File system event handler for frame file changes."""
def __init__(self, callback):
self.callback = callback
self.last_modified = 0
def on_modified(self, event):
if event.is_directory:
return
# Debounce rapid file changes
current_time = time.time()
if current_time - self.last_modified > 0.01: # 10ms debounce
self.last_modified = current_time
self.callback()
class FFmpegRTSPReader:
"""RTSP stream reader using subprocess FFmpeg with CUDA hardware acceleration and file watching."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.max_retries = max_retries
self.process = None
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
self.observer = None
self.frame_ready_event = threading.Event()
# Stream specs
self.width = 1280
self.height = 720
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the FFmpeg subprocess reader."""
if self.thread and self.thread.is_alive():
logger.warning(f"FFmpeg reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
logger.info(f"Started FFmpeg reader for camera {self.camera_id}")
def stop(self):
"""Stop the FFmpeg subprocess reader."""
self.stop_event.set()
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
if self.thread:
self.thread.join(timeout=5.0)
logger.info(f"Stopped FFmpeg reader for camera {self.camera_id}")
def _start_ffmpeg_process(self):
"""Start FFmpeg subprocess with CUDA hardware acceleration writing to temp file."""
# Create temp file path for this camera
self.temp_file = f"/tmp/claude/camera_{self.camera_id.replace(' ', '_')}.raw"
os.makedirs("/tmp/claude", exist_ok=True)
cmd = [
'ffmpeg',
'-hwaccel', 'cuda',
'-hwaccel_device', '0',
'-rtsp_transport', 'tcp',
'-i', self.rtsp_url,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24',
'-an', # No audio
'-update', '1', # Update single frame in place
'-y', # Overwrite output file
self.temp_file
]
try:
# Start FFmpeg detached - we don't need to communicate with it
self.process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
logger.info(f"Started FFmpeg process PID {self.process.pid} for camera {self.camera_id} -> {self.temp_file}")
return True
except Exception as e:
logger.error(f"Failed to start FFmpeg for camera {self.camera_id}: {e}")
return False
def _setup_file_watcher(self):
"""Setup file system watcher for temp file."""
if not os.path.exists(self.temp_file):
return
# Setup file watcher
handler = FrameFileHandler(self._on_file_changed)
self.observer = Observer()
self.observer.schedule(handler, os.path.dirname(self.temp_file), recursive=False)
self.observer.start()
logger.info(f"Started file watcher for {self.temp_file}")
def _on_file_changed(self):
"""Called when temp file is modified."""
if os.path.basename(self.temp_file) in str(self.temp_file):
self.frame_ready_event.set()
def _read_frames(self):
"""Reactively read frames when file changes."""
frame_count = 0
last_log_time = time.time()
bytes_per_frame = self.width * self.height * 3
restart_check_interval = 10 # Check FFmpeg status every 10 seconds
while not self.stop_event.is_set():
try:
# Start FFmpeg if not running
if not self.process or self.process.poll() is not None:
if self.process and self.process.poll() is not None:
logger.warning(f"FFmpeg process died for camera {self.camera_id}, restarting...")
if not self._start_ffmpeg_process():
time.sleep(5.0)
continue
# Wait for temp file to be created
wait_count = 0
while not os.path.exists(self.temp_file) and wait_count < 30:
time.sleep(1.0)
wait_count += 1
if not os.path.exists(self.temp_file):
logger.error(f"Temp file not created after 30s for {self.camera_id}")
continue
# Setup file watcher
self._setup_file_watcher()
# Wait for file change event (or timeout for health check)
if self.frame_ready_event.wait(timeout=restart_check_interval):
self.frame_ready_event.clear()
# Read current frame with concurrency safety
try:
# Try to read frame multiple times to handle race conditions
frame_data = None
for attempt in range(3):
try:
with open(self.temp_file, 'rb') as f:
frame_data = f.read(bytes_per_frame)
# Validate we got a complete frame
if len(frame_data) == bytes_per_frame:
break
else:
logger.debug(f"Camera {self.camera_id}: Partial read {len(frame_data)}/{bytes_per_frame}, attempt {attempt+1}")
time.sleep(0.01) # Brief wait before retry
except (IOError, OSError) as e:
logger.debug(f"Camera {self.camera_id}: Read error on attempt {attempt+1}: {e}")
time.sleep(0.01)
if frame_data and len(frame_data) == bytes_per_frame:
# Convert to numpy array
frame = np.frombuffer(frame_data, dtype=np.uint8)
frame = frame.reshape((self.height, self.width, 3))
# Call frame callback directly - trust the retry logic caught corruption
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
frame_count += 1
# Log progress
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed reactively")
last_log_time = current_time
else:
logger.debug(f"Camera {self.camera_id}: Failed to read complete frame after retries")
except (IOError, OSError) as e:
logger.debug(f"Camera {self.camera_id}: File read error: {e}")
except Exception as e:
logger.error(f"Camera {self.camera_id}: Error in reactive frame reading: {e}")
time.sleep(1.0)
# Cleanup
if self.observer:
self.observer.stop()
self.observer.join()
if self.process:
self.process.terminate()
# Clean up temp file
try:
if hasattr(self, 'temp_file') and os.path.exists(self.temp_file):
os.remove(self.temp_file)
except:
pass
logger.info(f"Reactive FFmpeg reader ended for camera {self.camera_id}")
logger = logging.getLogger(__name__)
class RTSPReader:
"""RTSP stream frame reader optimized for 1280x720 @ 6fps streams."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.max_retries = max_retries
self.cap = None
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Expected stream specifications
self.expected_width = 1280
self.expected_height = 720
self.expected_fps = 6
# Frame processing parameters
self.error_recovery_delay = 5.0 # Increased from 2.0 for stability
self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter
self.stream_timeout = 30.0
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the RTSP reader thread."""
if self.thread and self.thread.is_alive():
logger.warning(f"RTSP reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
logger.info(f"Started RTSP reader for camera {self.camera_id}")
def stop(self):
"""Stop the RTSP reader thread."""
self.stop_event.set()
if self.thread:
self.thread.join(timeout=5.0)
if self.cap:
self.cap.release()
logger.info(f"Stopped RTSP reader for camera {self.camera_id}")
def _read_frames(self):
"""Main frame reading loop with H.264 error recovery."""
consecutive_errors = 0
frame_count = 0
last_log_time = time.time()
last_successful_frame_time = time.time()
while not self.stop_event.is_set():
try:
# Initialize/reinitialize capture if needed
if not self.cap or not self.cap.isOpened():
if not self._initialize_capture():
time.sleep(self.error_recovery_delay)
continue
last_successful_frame_time = time.time()
# Check for stream timeout
if time.time() - last_successful_frame_time > self.stream_timeout:
logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing")
self._reinitialize_capture()
last_successful_frame_time = time.time()
continue
# Read frame immediately without rate limiting for minimum latency
try:
ret, frame = self.cap.read()
if ret and frame is None:
# Grab succeeded but retrieve failed - decoder issue
logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed")
except Exception as read_error:
logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}")
ret, frame = False, None
if not ret or frame is None:
consecutive_errors += 1
# Enhanced logging to diagnose the issue
logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}")
# Try to get more info from the capture
try:
if self.cap and self.cap.isOpened():
backend = self.cap.getBackendName()
pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}")
else:
logger.error(f"Camera {self.camera_id}: Capture is closed or None!")
except Exception as info_error:
logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}")
if consecutive_errors >= self.max_consecutive_errors:
logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing")
self._reinitialize_capture()
consecutive_errors = 0
time.sleep(self.error_recovery_delay)
else:
# Skip corrupted frame and continue with exponential backoff
if consecutive_errors <= 5:
logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
elif consecutive_errors % 10 == 0: # Log every 10th error after 5
logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})")
# Exponential backoff with cap at 1 second
sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0)
time.sleep(sleep_time)
continue
# Accept any valid frame dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
consecutive_errors += 1
continue
# Check for corrupted frames (all black, all white, excessive noise)
if self._is_frame_corrupted(frame):
logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping")
consecutive_errors += 1
continue
# Frame is valid
consecutive_errors = 0
frame_count += 1
last_successful_frame_time = time.time()
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
last_log_time = current_time
except Exception as e:
logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}")
consecutive_errors += 1
if consecutive_errors >= self.max_consecutive_errors:
self._reinitialize_capture()
consecutive_errors = 0
time.sleep(self.error_recovery_delay)
# Cleanup
if self.cap:
self.cap.release()
logger.info(f"RTSP reader thread ended for camera {self.camera_id}")
def _initialize_capture(self) -> bool:
"""Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps."""
try:
# Release previous capture if exists
if self.cap:
self.cap.release()
time.sleep(0.5)
logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration")
hw_accel_success = False
# Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support)
if not hw_accel_success:
try:
# Check if OpenCV was built with CUDA codec support
build_info = cv2.getBuildInformation()
if 'cudacodec' in build_info or 'CUVID' in build_info:
logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}")
# Use OpenCV's CUDA backend
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [
cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY
])
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration")
else:
logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}")
# Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC)
if not hw_accel_success:
try:
from core.utils.ffmpeg_detector import get_optimal_rtsp_options
import os
# Get optimal FFmpeg options based on detected capabilities
optimal_options = get_optimal_rtsp_options(self.rtsp_url)
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options
logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}")
logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
# Try to get backend info to confirm hardware acceleration
backend = self.cap.getBackendName()
logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}")
# Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060)
if not hw_accel_success:
try:
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp'
logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}")
# Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs)
if not hw_accel_success:
try:
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp'
logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}")
# Fallback: Standard FFmpeg with software decoding
if not hw_accel_success:
logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding")
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp'
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if not self.cap.isOpened():
logger.error(f"Failed to open stream for camera {self.camera_id}")
return False
# Don't force resolution/fps - let the stream determine its natural specs
# The camera will provide whatever resolution/fps it supports
# Set FFMPEG options for better H.264 handling
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
# Verify stream properties
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps")
# Read and discard first few frames to stabilize stream
for _ in range(5):
ret, _ = self.cap.read()
if not ret:
logger.warning(f"Camera {self.camera_id}: Failed to read initial frames")
time.sleep(0.1)
return True
except Exception as e:
logger.error(f"Error initializing capture for camera {self.camera_id}: {e}")
return False
def _reinitialize_capture(self):
"""Reinitialize capture after errors with retry logic."""
logger.info(f"Reinitializing capture for camera {self.camera_id}")
if self.cap:
self.cap.release()
self.cap = None
# Longer delay before reconnection to avoid rapid reconnect loops
time.sleep(3.0)
# Retry initialization up to 3 times
for attempt in range(3):
if self._initialize_capture():
logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}")
break
else:
logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}")
time.sleep(2.0)
def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
"""Check if frame is corrupted (all black, all white, or excessive noise)."""
if frame is None or frame.size == 0:
return True
# Check mean and standard deviation
mean = np.mean(frame)
std = np.std(frame)
# All black or all white
if mean < 5 or mean > 250:
return True
# No variation (stuck frame)
if std < 1:
return True
# Excessive noise (corrupted H.264 decode)
# Calculate edge density as corruption indicator
edges = cv2.Canny(frame, 50, 150)
edge_density = np.sum(edges > 0) / edges.size
# Too many edges indicate corruption
if edge_density > 0.5:
return True
return False
class HTTPSnapshotReader:
"""HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""
def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
self.camera_id = camera_id
self.snapshot_url = snapshot_url
self.interval_ms = interval_ms
self.max_retries = max_retries
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Expected snapshot specifications
self.expected_width = 2560
self.expected_height = 1440
self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the snapshot reader thread."""
if self.thread and self.thread.is_alive():
logger.warning(f"Snapshot reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
self.thread.start()
logger.info(f"Started snapshot reader for camera {self.camera_id}")
def stop(self):
"""Stop the snapshot reader thread."""
self.stop_event.set()
if self.thread:
self.thread.join(timeout=5.0)
logger.info(f"Stopped snapshot reader for camera {self.camera_id}")
def _read_snapshots(self):
"""Main snapshot reading loop for high quality 2K images."""
retries = 0
frame_count = 0
last_log_time = time.time()
interval_seconds = self.interval_ms / 1000.0
logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
while not self.stop_event.is_set():
try:
start_time = time.time()
frame = self._fetch_snapshot()
if frame is None:
retries += 1
logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")
if self.max_retries != -1 and retries > self.max_retries:
logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
break
time.sleep(min(2.0, interval_seconds))
continue
# Accept any valid image dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
continue
# Reset retry counter on successful fetch
retries = 0
frame_count += 1
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
last_log_time = current_time
# Wait for next interval
elapsed = time.time() - start_time
sleep_time = max(0, interval_seconds - elapsed)
if sleep_time > 0:
self.stop_event.wait(sleep_time)
except Exception as e:
logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
retries += 1
if self.max_retries != -1 and retries > self.max_retries:
break
time.sleep(min(2.0, interval_seconds))
logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")
def _fetch_snapshot(self) -> Optional[np.ndarray]:
"""Fetch a single high quality snapshot from HTTP URL."""
try:
# Parse URL for authentication
from urllib.parse import urlparse
parsed_url = urlparse(self.snapshot_url)
headers = {
'User-Agent': 'Python-Detector-Worker/1.0',
'Accept': 'image/jpeg, image/png, image/*'
}
auth = None
if parsed_url.username and parsed_url.password:
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)
# Reconstruct URL without credentials
clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}"
if parsed_url.port:
clean_url += f":{parsed_url.port}"
clean_url += parsed_url.path
if parsed_url.query:
clean_url += f"?{parsed_url.query}"
# Try Basic Auth first
response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
stream=True, verify=False)
# If Basic Auth fails, try Digest Auth
if response.status_code == 401:
auth = HTTPDigestAuth(parsed_url.username, parsed_url.password)
response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
stream=True, verify=False)
else:
response = requests.get(self.snapshot_url, timeout=15, headers=headers,
stream=True, verify=False)
if response.status_code == 200:
# Check content size
content_length = int(response.headers.get('content-length', 0))
if content_length > self.max_file_size:
logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
return None
# Read content
content = response.content
# Convert to numpy array
image_array = np.frombuffer(content, np.uint8)
# Decode as high quality image
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
return None
logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
return frame
else:
logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
return None
except requests.RequestException as e:
logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
return None
except Exception as e:
logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
return None
def fetch_single_snapshot(self) -> Optional[np.ndarray]:
"""
Fetch a single high-quality snapshot on demand for pipeline processing.
This method is for one-time fetch from HTTP URL, not continuous streaming.
Returns:
High quality 2K snapshot frame or None if failed
"""
logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}")
# Try to fetch snapshot with retries
for attempt in range(self.max_retries):
frame = self._fetch_snapshot()
if frame is not None:
logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
return frame
if attempt < self.max_retries - 1:
logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...")
time.sleep(0.5)
logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts")
return None
def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
"""Resize image while maintaining aspect ratio for high quality."""
h, w = frame.shape[:2]
aspect = w / h
target_aspect = target_width / target_height
if aspect > target_aspect:
# Image is wider
new_width = target_width
new_height = int(target_width / aspect)
else:
# Image is taller
new_height = target_height
new_width = int(target_height * aspect)
# Use INTER_LANCZOS4 for high quality downsampling
resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
# Pad to target size if needed
if new_width < target_width or new_height < target_height:
top = (target_height - new_height) // 2
bottom = target_height - new_height - top
left = (target_width - new_width) // 2
right = target_width - new_width - left
resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
return resized