"""
Frame readers for RTSP streams and HTTP snapshots.
Extracted from app.py for modular architecture.
"""
|
|
import cv2
|
|
import logging
|
|
import time
|
|
import threading
|
|
import requests
|
|
import numpy as np
|
|
from typing import Optional, Callable
|
|
from queue import Queue
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RTSPReader:
    """RTSP stream frame reader using OpenCV.

    A background daemon thread opens the stream with ``cv2.VideoCapture``,
    reads frames in a loop and hands each one to the registered callback as
    ``callback(camera_id, frame)``. The loop reinitializes the capture when
    the stream is not open, when no frame has arrived for 30 seconds, or
    after repeated read errors, and gives up once ``max_retries`` consecutive
    failures are exceeded (``max_retries == -1`` disables that limit).
    """

    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
        """Store the configuration; no connection is opened until ``start()``.

        Args:
            camera_id: Identifier passed to the frame callback and used in logs.
            rtsp_url: Stream URL handed to ``cv2.VideoCapture``.
            max_retries: Consecutive failures tolerated before the reader
                thread exits; ``-1`` means retry forever.
        """
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.max_retries = max_retries
        self.cap = None
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames."""
        self.frame_callback = callback

    def start(self):
        """Start the RTSP reader thread (no-op if it is already running)."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"RTSP reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()
        logger.info(f"Started RTSP reader for camera {self.camera_id}")

    def stop(self):
        """Signal the reader thread to stop and wait up to 5 seconds for it.

        The capture is only released here once the thread has actually
        finished. If the join times out, the thread may still be inside
        ``cap.read()``; releasing the capture underneath it is unsafe, so the
        release is left to the thread's own ``finally`` block.
        """
        self.stop_event.set()
        if self.thread:
            self.thread.join(timeout=5.0)
            if self.thread.is_alive():
                logger.warning(
                    f"RTSP reader thread for camera {self.camera_id} did not stop within 5s; "
                    f"leaving capture release to the reader thread"
                )
                return
        if self.cap:
            self.cap.release()
        logger.info(f"Stopped RTSP reader for camera {self.camera_id}")

    def _read_frames(self):
        """Main frame reading loop with error handling and stream recovery.

        Runs on the reader thread until ``stop_event`` is set or the retry
        budget is exhausted. All pauses wait on ``stop_event`` instead of
        sleeping so that ``stop()`` takes effect promptly.
        """
        retries = 0
        frame_count = 0
        consecutive_errors = 0
        last_log_time = time.time()
        last_successful_frame_time = time.time()

        try:
            self._initialize_capture()

            while not self.stop_event.is_set():
                try:
                    # Recover a capture that failed to open or was closed.
                    if not self.cap or not self.cap.isOpened():
                        logger.warning(f"Camera {self.camera_id} not open, reinitializing")
                        self._initialize_capture()
                        self.stop_event.wait(1)
                        continue

                    # Stream timeout: no frames for 30 seconds.
                    if time.time() - last_successful_frame_time > 30:
                        logger.warning(f"Camera {self.camera_id} stream timeout, reinitializing")
                        self._initialize_capture()
                        last_successful_frame_time = time.time()
                        continue

                    ret, frame = self.cap.read()

                    if not ret or frame is None:
                        consecutive_errors += 1
                        logger.warning(f"Failed to read frame from camera {self.camera_id} (consecutive errors: {consecutive_errors})")

                        # Force stream recovery after multiple consecutive errors.
                        if consecutive_errors >= 5:
                            logger.warning(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing stream")
                            self._initialize_capture()
                            consecutive_errors = 0
                            continue

                        retries += 1
                        if retries > self.max_retries and self.max_retries != -1:
                            logger.error(f"Max retries reached for camera {self.camera_id}")
                            break
                        self.stop_event.wait(0.1)
                        continue

                    # Frame content is not validated here; downstream
                    # consumers are expected to cope with corrupted frames
                    # (see _is_frame_valid for an optional check).

                    # Reset counters on successful read.
                    retries = 0
                    consecutive_errors = 0
                    frame_count += 1
                    last_successful_frame_time = time.time()

                    if self.frame_callback:
                        self.frame_callback(self.camera_id, frame)

                    # Log progress every 30 seconds.
                    current_time = time.time()
                    if current_time - last_log_time >= 30:
                        logger.info(f"Camera {self.camera_id}: {frame_count} frames processed, {consecutive_errors} consecutive errors")
                        last_log_time = current_time

                    # Pace the loop to the stream's reported frame rate.
                    self.stop_event.wait(self._frame_delay())

                except Exception as e:
                    logger.error(f"Error reading frame from camera {self.camera_id}: {e}")
                    consecutive_errors += 1
                    retries += 1

                    # Force reinitialization on repeated exceptions.
                    if consecutive_errors >= 3:
                        logger.warning(f"Camera {self.camera_id}: Severe errors detected, reinitializing stream")
                        self._initialize_capture()
                        consecutive_errors = 0

                    if retries > self.max_retries and self.max_retries != -1:
                        logger.error(f"Max retries reached for camera {self.camera_id}")
                        break
                    self.stop_event.wait(1)

        except Exception as e:
            logger.error(f"Fatal error in RTSP reader for camera {self.camera_id}: {e}")
        finally:
            if self.cap:
                self.cap.release()
            logger.info(f"RTSP reader thread ended for camera {self.camera_id}")

    def _frame_delay(self) -> float:
        """Return the pause between reads, in seconds, from the stream's FPS.

        Uses ``1 / fps`` when the reported FPS is in ``(0, 120]`` and falls
        back to 0.033 s (about 30 FPS) when it is not or cannot be queried.
        """
        try:
            actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
            if 0 < actual_fps <= 120:  # Reasonable bounds
                return 1.0 / actual_fps
        except Exception as e:
            logger.debug(f"Failed to get FPS for delay calculation: {e}")
        return 0.033

    def _initialize_capture(self):
        """Initialize or reinitialize the video capture.

        Releases any previous capture, opens the stream (retrying once with
        the FFMPEG backend), and picks a buffer size from the reported FPS.

        Returns:
            True when the capture is open afterwards, False otherwise.
        """
        try:
            # Release previous capture if exists
            if self.cap:
                self.cap.release()
                time.sleep(0.1)

            enhanced_url = self._enhance_rtsp_url(self.rtsp_url)
            # The URL may embed credentials; never write them to the log.
            logger.debug(f"Initializing capture for camera {self.camera_id} with URL: {self._redact_url(enhanced_url)}")

            self.cap = cv2.VideoCapture(enhanced_url)

            if not self.cap.isOpened():
                # Try again with an explicit backend; release the failed
                # handle first so it is not leaked.
                logger.debug(f"Retrying capture initialization with different backend for camera {self.camera_id}")
                self.cap.release()
                self.cap = cv2.VideoCapture(enhanced_url, cv2.CAP_FFMPEG)

            if not self.cap.isOpened():
                logger.error(f"Failed to initialize camera {self.camera_id}")
                return False

            # Get actual stream properties first
            width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = self.cap.get(cv2.CAP_PROP_FPS)

            # Larger buffer for faster streams, smaller for slow ones.
            if fps > 30:
                buffer_size = 5
            elif fps > 15:
                buffer_size = 3
            else:
                buffer_size = 2

            try:
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, buffer_size)
                actual_buffer = int(self.cap.get(cv2.CAP_PROP_BUFFERSIZE))
                logger.debug(f"Camera {self.camera_id}: Buffer size set to {buffer_size}, actual: {actual_buffer}")
            except Exception as e:
                logger.warning(f"Failed to set buffer size for camera {self.camera_id}: {e}")

            # FPS is not overridden - the stream keeps its native rate.
            logger.debug(f"Camera {self.camera_id}: Using native FPS {fps}")

            if width * height > 1920 * 1080:
                logger.info(f"Camera {self.camera_id}: High resolution stream detected, applying optimizations")

            logger.info(f"Camera {self.camera_id} initialized: {width}x{height}, FPS: {fps}, Buffer: {buffer_size}")
            return True

        except Exception as e:
            logger.error(f"Error initializing capture for camera {self.camera_id}: {e}")
            return False

    def _enhance_rtsp_url(self, rtsp_url: str) -> str:
        """Use RTSP URL exactly as provided by backend without modification."""
        return rtsp_url

    @staticmethod
    def _redact_url(url: str) -> str:
        """Return ``url`` with any ``user:password@`` part replaced by ``***@``.

        URLs without user info are returned unchanged.
        """
        from urllib.parse import urlsplit

        try:
            parts = urlsplit(url)
        except ValueError:
            return "<unparseable url>"
        if "@" not in parts.netloc:
            return url
        host = parts.netloc.rpartition("@")[2]
        return parts._replace(netloc=f"***@{host}").geturl()

    def _is_frame_valid(self, frame) -> bool:
        """Validate frame integrity to detect corrupted frames.

        A frame is rejected when it is None, has fewer than two dimensions,
        is smaller than 10 pixels on either axis, has a mean below 1 or above
        254 (all black / all white), or has a standard deviation above 100.
        """
        if frame is None:
            return False

        # Check frame dimensions
        if frame.ndim < 2 or frame.shape[0] < 10 or frame.shape[1] < 10:
            return False

        # Completely black or completely white (possible corruption)
        mean_val = np.mean(frame)
        if mean_val < 1 or mean_val > 254:
            return False

        # Excessive noise/corruption (very high standard deviation)
        std_val = np.std(frame)
        if std_val > 100:
            return False

        return True
|
|
|
|
|
|
class HTTPSnapshotReader:
    """HTTP snapshot reader for periodic image capture.

    A background daemon thread fetches ``snapshot_url`` every ``interval_ms``
    milliseconds, decodes the response as an image and passes it to the
    registered callback as ``callback(camera_id, frame)``. The thread exits
    once ``max_retries`` consecutive failures are exceeded
    (``max_retries == -1`` disables that limit).
    """

    def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
        """Store the configuration; nothing is fetched until ``start()``.

        Args:
            camera_id: Identifier passed to the frame callback and used in logs.
            snapshot_url: HTTP(S) URL of the snapshot; may embed
                ``user:password@`` credentials.
            interval_ms: Target time between fetches, in milliseconds.
            max_retries: Consecutive failures tolerated before the reader
                thread exits; ``-1`` means retry forever.
        """
        self.camera_id = camera_id
        self.snapshot_url = snapshot_url
        self.interval_ms = interval_ms
        self.max_retries = max_retries
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames."""
        self.frame_callback = callback

    def start(self):
        """Start the snapshot reader thread (no-op if it is already running)."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"Snapshot reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
        self.thread.start()
        logger.info(f"Started snapshot reader for camera {self.camera_id}")

    def stop(self):
        """Signal the snapshot reader thread to stop and wait up to 5 seconds."""
        self.stop_event.set()
        if self.thread:
            self.thread.join(timeout=5.0)
        logger.info(f"Stopped snapshot reader for camera {self.camera_id}")

    def _read_snapshots(self):
        """Main snapshot reading loop.

        Runs on the reader thread until ``stop_event`` is set or the retry
        budget is exhausted. Retry pauses wait on ``stop_event`` rather than
        sleeping so that ``stop()`` is honoured without delay.
        """
        retries = 0
        frame_count = 0
        last_log_time = time.time()
        interval_seconds = self.interval_ms / 1000.0

        logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")

        try:
            while not self.stop_event.is_set():
                try:
                    start_time = time.time()
                    frame = self._fetch_snapshot()

                    if frame is None:
                        logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries+1}/{self.max_retries}")
                        retries += 1
                        if retries > self.max_retries and self.max_retries != -1:
                            logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
                            break
                        self.stop_event.wait(1)
                        continue

                    # Reset retry counter on successful fetch
                    retries = 0
                    frame_count += 1

                    if self.frame_callback:
                        self.frame_callback(self.camera_id, frame)

                    # Log progress every 30 seconds
                    current_time = time.time()
                    if current_time - last_log_time >= 30:
                        logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
                        last_log_time = current_time

                    # Wait for next interval, accounting for processing time
                    elapsed = time.time() - start_time
                    sleep_time = max(0, interval_seconds - elapsed)
                    if sleep_time > 0:
                        self.stop_event.wait(sleep_time)

                except Exception as e:
                    logger.error(f"Error fetching snapshot for camera {self.camera_id}: {e}")
                    retries += 1
                    if retries > self.max_retries and self.max_retries != -1:
                        logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
                        break
                    self.stop_event.wait(1)

        except Exception as e:
            logger.error(f"Fatal error in snapshot reader for camera {self.camera_id}: {e}")
        finally:
            logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")

    @staticmethod
    def _split_credentials(url: str):
        """Split ``url`` into ``(clean_url, username, password)``.

        ``clean_url`` is ``url`` with the ``user:password@`` part removed from
        the network location; everything else (IPv6 brackets, port, path,
        params, query) is kept as written. ``username`` and ``password`` are
        percent-decoded, or None when absent.
        """
        from urllib.parse import unquote, urlparse

        parsed = urlparse(url)
        username = unquote(parsed.username) if parsed.username else None
        password = unquote(parsed.password) if parsed.password else None
        host = parsed.netloc.rpartition("@")[2]
        return parsed._replace(netloc=host).geturl(), username, password

    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """Fetch a single snapshot from the HTTP URL.

        When the URL embeds both a username and a password, they are removed
        from the request URL and sent via HTTP Basic auth, falling back to
        Digest auth on a 401 response.

        Returns:
            The decoded image, or None when the request fails, the status is
            not 200, the body is empty, or the body cannot be decoded.
        """
        try:
            from requests.auth import HTTPBasicAuth, HTTPDigestAuth

            clean_url, username, password = self._split_credentials(self.snapshot_url)

            if username and password:
                # Try with Basic Auth first
                response = requests.get(clean_url, auth=HTTPBasicAuth(username, password), timeout=10)

                # If Basic Auth fails, try Digest Auth (common for IP cameras)
                if response.status_code == 401:
                    response = requests.get(clean_url, auth=HTTPDigestAuth(username, password), timeout=10)
            else:
                # No complete credentials in URL, use as-is
                response = requests.get(self.snapshot_url, timeout=10)

            if response.status_code != 200:
                # Log the credential-free URL only.
                logger.warning(f"HTTP {response.status_code} from {clean_url}")
                return None

            if not response.content:
                logger.warning(f"Empty snapshot body from {clean_url}")
                return None

            # Convert bytes to numpy array and decode as image
            image_array = np.frombuffer(response.content, np.uint8)
            return cv2.imdecode(image_array, cv2.IMREAD_COLOR)

        except requests.RequestException as e:
            logger.error(f"Request error fetching snapshot: {e}")
            return None
        except Exception as e:
            logger.error(f"Error decoding snapshot: {e}")
            return None
|
|
|
|
|
|
def fetch_snapshot(url: str) -> Optional[np.ndarray]:
    """Standalone function to fetch a snapshot (for compatibility).

    When ``url`` embeds both a username and a password, they are removed from
    the request URL, percent-decoded and sent via HTTP Basic auth, falling
    back to Digest auth on a 401 response.

    Args:
        url: HTTP(S) snapshot URL, optionally with ``user:password@``.

    Returns:
        The decoded image, or None on any request error, non-200 status,
        empty body or undecodable body.
    """
    # Credential-free form of the URL for logging; set once parsing succeeds.
    safe_url = "<unparsed url>"
    try:
        from urllib.parse import unquote, urlparse
        from requests.auth import HTTPBasicAuth, HTTPDigestAuth

        parsed_url = urlparse(url)
        # Drop only the user-info part so IPv6 brackets, port, path params
        # and query survive exactly as written.
        host = parsed_url.netloc.rpartition("@")[2]
        clean_url = parsed_url._replace(netloc=host).geturl()
        safe_url = clean_url

        if parsed_url.username and parsed_url.password:
            # Credentials in a URL are percent-encoded; decode before use.
            username = unquote(parsed_url.username)
            password = unquote(parsed_url.password)

            # Try with Basic Auth first
            response = requests.get(clean_url, auth=HTTPBasicAuth(username, password), timeout=10)

            # If Basic Auth fails, try Digest Auth (common for IP cameras)
            if response.status_code == 401:
                response = requests.get(clean_url, auth=HTTPDigestAuth(username, password), timeout=10)
        else:
            # No complete credentials in URL, use as-is
            response = requests.get(url, timeout=10)

        if response.status_code != 200 or not response.content:
            return None

        image_array = np.frombuffer(response.content, np.uint8)
        return cv2.imdecode(image_array, cv2.IMREAD_COLOR)
    except Exception as e:
        logger.error(f"Error fetching snapshot from {safe_url}: {e}")
        return None