python-detector-worker/core/streaming/readers.py
2025-09-23 21:21:27 +07:00

440 lines
No EOL
18 KiB
Python

"""
Frame readers for RTSP streams and HTTP snapshots.
Extracted from app.py for modular architecture.
"""
import cv2
import logging
import time
import threading
import requests
import numpy as np
from typing import Optional, Callable
from queue import Queue
# Module-level logger, named after this module, shared by every reader below.
logger = logging.getLogger(__name__)
class RTSPReader:
    """RTSP stream frame reader using OpenCV.

    A daemon thread opens the stream with ``cv2.VideoCapture``, reads frames
    in a loop and passes each one to the registered frame callback. The
    capture is reopened when it is closed, when no frame has been read for a
    while, or after repeated errors.
    """

    # Seconds without a successfully read frame before the capture is reopened.
    _STREAM_TIMEOUT_S = 30
    # Minimum number of seconds between two progress log lines.
    _LOG_INTERVAL_S = 30
    # Per-frame pause (~30 FPS) used when the stream reports no usable FPS.
    _DEFAULT_FRAME_DELAY_S = 0.033
    # Highest reported FPS that is still trusted for pacing the read loop.
    _MAX_TRUSTED_FPS = 120

    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
        """Create a reader; no connection is made until ``start()`` is called.

        Args:
            camera_id: Identifier passed to the frame callback and used in logs.
            rtsp_url: Stream URL handed to ``cv2.VideoCapture``.
            max_retries: Number of failures without an intervening successful
                read that is tolerated before the reader thread exits.
                ``-1`` disables the limit.
        """
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.max_retries = max_retries
        self.cap = None
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames.

        The callback is invoked on the reader thread as
        ``callback(camera_id, frame)``.
        """
        self.frame_callback = callback

    def start(self):
        """Start the RTSP reader thread (no-op if it is already running)."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"RTSP reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()
        logger.info(f"Started RTSP reader for camera {self.camera_id}")

    def stop(self):
        """Stop the RTSP reader thread and release the capture.

        Waits up to five seconds for the thread to finish. If it is still
        running after that, the capture is left alone: the thread releases it
        itself when it exits, and releasing it here would pull the capture out
        from under a thread that is still using it.
        """
        self.stop_event.set()
        if self.thread:
            self.thread.join(timeout=5.0)
            if self.thread.is_alive():
                logger.warning(
                    f"RTSP reader thread for camera {self.camera_id} did not stop within 5s; "
                    f"capture will be released when the thread exits"
                )
                return
        if self.cap:
            self.cap.release()
            self.cap = None
        logger.info(f"Stopped RTSP reader for camera {self.camera_id}")

    def _read_frames(self):
        """Main frame reading loop with improved error handling and stream recovery.

        Runs on the reader thread until ``stop_event`` is set or the retry
        limit is exceeded. Every pause waits on ``stop_event`` rather than
        sleeping, so a stop request interrupts it immediately.
        """
        retries = 0
        frame_count = 0
        last_log_time = time.time()
        consecutive_errors = 0
        last_successful_frame_time = time.time()

        try:
            # Initialize video capture with optimized parameters
            self._initialize_capture()

            while not self.stop_event.is_set():
                try:
                    # Check if stream needs recovery.
                    # NOTE: this path does not count against max_retries.
                    if not self.cap or not self.cap.isOpened():
                        logger.warning(f"Camera {self.camera_id} not open, reinitializing")
                        self._initialize_capture()
                        self.stop_event.wait(1)
                        continue

                    # Check for stream timeout (no frames for 30 seconds)
                    if time.time() - last_successful_frame_time > self._STREAM_TIMEOUT_S:
                        logger.warning(f"Camera {self.camera_id} stream timeout, reinitializing")
                        self._initialize_capture()
                        last_successful_frame_time = time.time()
                        continue

                    ret, frame = self.cap.read()

                    if not ret or frame is None:
                        consecutive_errors += 1
                        logger.warning(f"Failed to read frame from camera {self.camera_id} (consecutive errors: {consecutive_errors})")

                        # Force stream recovery after multiple consecutive errors
                        if consecutive_errors >= 5:
                            logger.warning(f"Camera {self.camera_id}: Too many consecutive errors, reinitializing stream")
                            self._initialize_capture()
                            consecutive_errors = 0
                            continue

                        retries += 1
                        if self.max_retries != -1 and retries > self.max_retries:
                            logger.error(f"Max retries reached for camera {self.camera_id}")
                            break
                        self.stop_event.wait(0.1)
                        continue

                    # Frame validation (_is_frame_valid) is intentionally skipped
                    # for now - let YOLO handle corrupted frames.

                    # Reset counters on successful read
                    retries = 0
                    consecutive_errors = 0
                    frame_count += 1
                    last_successful_frame_time = time.time()

                    # Call frame callback if set. An exception raised by the
                    # callback is handled by the except clause below and is
                    # therefore counted like a read error.
                    if self.frame_callback:
                        self.frame_callback(self.camera_id, frame)

                    # Log progress every 30 seconds
                    current_time = time.time()
                    if current_time - last_log_time >= self._LOG_INTERVAL_S:
                        logger.info(f"Camera {self.camera_id}: {frame_count} frames processed, {consecutive_errors} consecutive errors")
                        last_log_time = current_time

                    # Pace the loop to the stream's own frame rate.
                    self.stop_event.wait(self._frame_delay())

                except Exception as e:
                    logger.error(f"Error reading frame from camera {self.camera_id}: {e}")
                    consecutive_errors += 1
                    retries += 1

                    # Force reinitialization on severe errors
                    if consecutive_errors >= 3:
                        logger.warning(f"Camera {self.camera_id}: Severe errors detected, reinitializing stream")
                        self._initialize_capture()
                        consecutive_errors = 0

                    if self.max_retries != -1 and retries > self.max_retries:
                        break
                    self.stop_event.wait(1)

        except Exception as e:
            logger.error(f"Fatal error in RTSP reader for camera {self.camera_id}: {e}")
        finally:
            if self.cap:
                self.cap.release()
                self.cap = None
            logger.info(f"RTSP reader thread ended for camera {self.camera_id}")

    def _frame_delay(self) -> float:
        """Return the pause, in seconds, to insert after a successful read.

        Uses ``1 / FPS`` when the capture reports an FPS in ``(0, 120]``
        (e.g. 60 fps -> ~16.7 ms, 6 fps -> ~167 ms); otherwise falls back to
        33 ms (30 FPS).
        """
        try:
            actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
            if 0 < actual_fps <= self._MAX_TRUSTED_FPS:  # Reasonable bounds
                return 1.0 / actual_fps
        except Exception as e:
            logger.debug(f"Failed to get FPS for delay calculation: {e}")
        return self._DEFAULT_FRAME_DELAY_S

    def _initialize_capture(self):
        """Initialize or reinitialize video capture with optimized settings.

        Releases any existing capture, opens the stream (retrying once with
        the FFMPEG backend), and sets a buffer size chosen from the reported
        FPS.

        Returns:
            True if the capture is open afterwards, False otherwise. Errors
            are logged, never raised.
        """
        try:
            # Release previous capture if exists
            if self.cap:
                self.cap.release()
                time.sleep(0.1)

            enhanced_url = self._enhance_rtsp_url(self.rtsp_url)
            # The URL may embed credentials, so only a redacted form is logged.
            logger.debug(f"Initializing capture for camera {self.camera_id} with URL: {self._redact_url(enhanced_url)}")

            self.cap = cv2.VideoCapture(enhanced_url)

            if not self.cap.isOpened():
                # Try again with different backend
                logger.debug(f"Retrying capture initialization with different backend for camera {self.camera_id}")
                # Drop the failed capture before replacing it.
                self.cap.release()
                self.cap = cv2.VideoCapture(enhanced_url, cv2.CAP_FFMPEG)

            if not self.cap.isOpened():
                logger.error(f"Failed to initialize camera {self.camera_id}")
                return False

            # Get actual stream properties first
            width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = self.cap.get(cv2.CAP_PROP_FPS)

            # Adaptive buffer settings based on FPS
            # Mock cam: 1920x1080@60fps, Real cam: 1280x720@6fps
            if fps > 30:
                # High FPS streams (like mock cam) need larger buffer
                buffer_size = 5
            elif fps > 15:
                # Medium FPS streams
                buffer_size = 3
            else:
                # Low FPS streams (like real cam) can use smaller buffer
                buffer_size = 2

            # Setting the buffer size is best-effort; a failure is only logged.
            try:
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, buffer_size)
                actual_buffer = int(self.cap.get(cv2.CAP_PROP_BUFFERSIZE))
                logger.debug(f"Camera {self.camera_id}: Buffer size set to {buffer_size}, actual: {actual_buffer}")
            except Exception as e:
                logger.warning(f"Failed to set buffer size for camera {self.camera_id}: {e}")

            # Don't override FPS - let stream use its natural rate
            # This works for both mock cam (60fps) and real cam (6fps)
            logger.debug(f"Camera {self.camera_id}: Using native FPS {fps}")

            # High resolution streams are only reported here; no extra
            # settings are applied to them in this method.
            if width * height > 1920 * 1080:
                logger.info(f"Camera {self.camera_id}: High resolution stream detected, applying optimizations")

            logger.info(f"Camera {self.camera_id} initialized: {width}x{height}, FPS: {fps}, Buffer: {buffer_size}")
            return True

        except Exception as e:
            logger.error(f"Error initializing capture for camera {self.camera_id}: {e}")
            return False

    def _enhance_rtsp_url(self, rtsp_url: str) -> str:
        """Use RTSP URL exactly as provided by backend without modification."""
        return rtsp_url

    @staticmethod
    def _redact_url(url: str) -> str:
        """Return *url* with any ``user:password@`` part replaced by ``***@``.

        Intended for log output only. URLs without user info are returned
        unchanged; input that cannot be parsed yields a fixed placeholder.
        """
        from urllib.parse import urlsplit

        try:
            parts = urlsplit(url)
            _, has_userinfo, host_port = parts.netloc.rpartition("@")
            if not has_userinfo:
                return url
            return parts._replace(netloc=f"***@{host_port}").geturl()
        except (ValueError, TypeError, AttributeError):
            return "<unparseable url>"

    def _is_frame_valid(self, frame) -> bool:
        """Validate frame integrity to detect corrupted frames.

        A frame is rejected when it is ``None``, is not a non-empty array with
        at least two dimensions, is smaller than 10x10, is almost entirely
        black or white (mean < 1 or > 254), or is extremely noisy (standard
        deviation > 100).
        """
        if frame is None:
            return False

        # Without this guard an empty array slips through: its mean and std
        # are NaN, and every NaN comparison below evaluates to False.
        if getattr(frame, "ndim", 0) < 2 or frame.size == 0:
            return False

        # Check frame dimensions
        if frame.shape[0] < 10 or frame.shape[1] < 10:
            return False

        # Check if frame is completely black or completely white (possible corruption)
        mean_val = np.mean(frame)
        if mean_val < 1 or mean_val > 254:
            return False

        # Check for excessive noise/corruption (very high standard deviation)
        std_val = np.std(frame)
        if std_val > 100:  # Threshold for detecting very noisy/corrupted frames
            return False

        return True
class HTTPSnapshotReader:
    """HTTP snapshot reader for periodic image capture.

    A daemon thread fetches an image from ``snapshot_url`` every
    ``interval_ms`` milliseconds, decodes it with OpenCV and passes it to the
    registered frame callback.
    """

    def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
        """Create a reader; nothing is fetched until ``start()`` is called.

        Args:
            camera_id: Identifier passed to the frame callback and used in logs.
            snapshot_url: HTTP(S) URL of the snapshot image. May embed
                ``user:password@`` credentials.
            interval_ms: Target time between two snapshots, in milliseconds.
            max_retries: Number of failures without an intervening successful
                fetch that is tolerated before the reader thread exits.
                ``-1`` disables the limit.
        """
        self.camera_id = camera_id
        self.snapshot_url = snapshot_url
        self.interval_ms = interval_ms
        self.max_retries = max_retries
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames.

        The callback is invoked on the reader thread as
        ``callback(camera_id, frame)``.
        """
        self.frame_callback = callback

    def start(self):
        """Start the snapshot reader thread (no-op if it is already running)."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"Snapshot reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
        self.thread.start()
        logger.info(f"Started snapshot reader for camera {self.camera_id}")

    def stop(self):
        """Stop the snapshot reader thread, waiting up to five seconds for it."""
        self.stop_event.set()
        if self.thread:
            self.thread.join(timeout=5.0)
        logger.info(f"Stopped snapshot reader for camera {self.camera_id}")

    def _read_snapshots(self):
        """Main snapshot reading loop.

        Runs on the reader thread until ``stop_event`` is set or the retry
        limit is exceeded. Every pause waits on ``stop_event`` rather than
        sleeping, so a stop request interrupts it immediately.
        """
        retries = 0
        frame_count = 0
        last_log_time = time.time()
        interval_seconds = self.interval_ms / 1000.0

        logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")

        try:
            while not self.stop_event.is_set():
                try:
                    start_time = time.time()
                    frame = self._fetch_snapshot()

                    if frame is None:
                        logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries+1}/{self.max_retries}")
                        retries += 1
                        if self.max_retries != -1 and retries > self.max_retries:
                            logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
                            break
                        self.stop_event.wait(1)
                        continue

                    # Reset retry counter on successful fetch
                    retries = 0
                    frame_count += 1

                    # Call frame callback if set. An exception raised by the
                    # callback is handled by the except clause below and is
                    # therefore counted like a fetch error.
                    if self.frame_callback:
                        self.frame_callback(self.camera_id, frame)

                    # Log progress every 30 seconds
                    current_time = time.time()
                    if current_time - last_log_time >= 30:
                        logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
                        last_log_time = current_time

                    # Wait for next interval, accounting for processing time
                    elapsed = time.time() - start_time
                    sleep_time = max(0, interval_seconds - elapsed)
                    if sleep_time > 0:
                        self.stop_event.wait(sleep_time)

                except Exception as e:
                    logger.error(f"Error fetching snapshot for camera {self.camera_id}: {e}")
                    retries += 1
                    if self.max_retries != -1 and retries > self.max_retries:
                        break
                    self.stop_event.wait(1)

        except Exception as e:
            logger.error(f"Fatal error in snapshot reader for camera {self.camera_id}: {e}")
        finally:
            logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")

    @staticmethod
    def _split_url_credentials(url: str) -> tuple:
        """Separate embedded credentials from *url*.

        Returns:
            ``(clean_url, username, password)``. ``clean_url`` is *url* with
            the ``user:password@`` part removed and everything else (host,
            port, path, query) kept verbatim. ``username`` and ``password``
            are percent-decoded, or ``None`` when absent.
        """
        from urllib.parse import unquote, urlsplit

        parts = urlsplit(url)
        _, has_userinfo, host_port = parts.netloc.rpartition("@")
        if not has_userinfo:
            return url, None, None

        clean_url = parts._replace(netloc=host_port).geturl()
        # urlsplit leaves user info percent-encoded (e.g. "%40" for "@"); the
        # auth helpers need the real characters.
        username = unquote(parts.username) if parts.username else None
        password = unquote(parts.password) if parts.password else None
        return clean_url, username, password

    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """Fetch a single snapshot from HTTP URL.

        When the URL embeds both a username and a password, the request is
        sent to the credential-free URL with HTTP Basic auth, and retried with
        Digest auth if the server answers 401. Otherwise the URL is requested
        as-is.

        Returns:
            The decoded image, or ``None`` on a non-200 response, a request
            error or a decoding failure. Errors are logged, never raised.
        """
        # Credential-free URL for log messages; replaced once parsing succeeds.
        log_url = "<unparsed url>"
        try:
            clean_url, username, password = self._split_url_credentials(self.snapshot_url)
            log_url = clean_url

            headers = {}

            if username and password:
                from requests.auth import HTTPBasicAuth, HTTPDigestAuth

                # Try with Basic Auth first
                response = requests.get(clean_url, auth=HTTPBasicAuth(username, password), timeout=10, headers=headers)

                # If Basic Auth fails, try Digest Auth (common for IP cameras)
                if response.status_code == 401:
                    response = requests.get(clean_url, auth=HTTPDigestAuth(username, password), timeout=10, headers=headers)
            else:
                # No complete credentials in URL, use as-is
                response = requests.get(self.snapshot_url, timeout=10, headers=headers)

            if response.status_code != 200:
                logger.warning(f"HTTP {response.status_code} from {log_url}")
                return None

            # Convert bytes to numpy array, then decode as image
            image_array = np.frombuffer(response.content, np.uint8)
            frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
            if frame is None:
                logger.warning(f"Failed to decode snapshot image from {log_url}")
            return frame

        except requests.RequestException as e:
            logger.error(f"Request error fetching snapshot: {e}")
            return None
        except Exception as e:
            logger.error(f"Error decoding snapshot: {e}")
            return None
def fetch_snapshot(url: str) -> Optional[np.ndarray]:
    """Standalone function to fetch a snapshot (for compatibility).

    When *url* embeds both a username and a password, the request is sent to
    the credential-free URL with HTTP Basic auth, and retried with Digest auth
    if the server answers 401. Otherwise *url* is requested as-is.

    Args:
        url: HTTP(S) URL of the snapshot image, optionally containing
            ``user:password@`` credentials (percent-encoded where needed).

    Returns:
        The decoded image, or ``None`` on a non-200 response or any error.
        Errors are logged, never raised.
    """
    from urllib.parse import unquote, urlsplit

    # Credential-free URL for the error log; replaced once parsing succeeds.
    log_url = "<unparsed url>"
    try:
        parts = urlsplit(url)

        # Strip only the user info; host, port, path and query stay verbatim
        # (this keeps IPv6 brackets intact).
        _, has_userinfo, host_port = parts.netloc.rpartition("@")
        clean_url = parts._replace(netloc=host_port).geturl() if has_userinfo else url
        log_url = clean_url

        if parts.username and parts.password:
            from requests.auth import HTTPBasicAuth, HTTPDigestAuth

            # urlsplit leaves user info percent-encoded (e.g. "%40" for "@");
            # the auth helpers need the real characters.
            username = unquote(parts.username)
            password = unquote(parts.password)

            # Try with Basic Auth first
            response = requests.get(clean_url, auth=HTTPBasicAuth(username, password), timeout=10)

            # If Basic Auth fails, try Digest Auth (common for IP cameras)
            if response.status_code == 401:
                response = requests.get(clean_url, auth=HTTPDigestAuth(username, password), timeout=10)
        else:
            # No complete credentials in URL, use as-is
            response = requests.get(url, timeout=10)

        if response.status_code != 200:
            return None

        image_array = np.frombuffer(response.content, np.uint8)
        return cv2.imdecode(image_array, cv2.IMREAD_COLOR)

    except Exception as e:
        logger.error(f"Error fetching snapshot from {log_url}: {e}")
        return None