Merge pull request 'dev' (#18) from dev into main
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Successful in 11m28s
Build Worker Base and Application Images / build-docker (push) Successful in 4m6s
Build Worker Base and Application Images / deploy-stack (push) Has been cancelled

Reviewed-on: #18
This commit is contained in:
Siwat Sirichai 2025-09-26 13:50:19 +00:00
commit 8c08c815ce
9 changed files with 677 additions and 790 deletions

View file

@ -2,13 +2,12 @@
Streaming system for RTSP and HTTP camera feeds. Streaming system for RTSP and HTTP camera feeds.
Provides modular frame readers, buffers, and stream management. Provides modular frame readers, buffers, and stream management.
""" """
from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader from .readers import HTTPSnapshotReader, FFmpegRTSPReader
from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer
from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager
__all__ = [ __all__ = [
# Readers # Readers
'RTSPReader',
'HTTPSnapshotReader', 'HTTPSnapshotReader',
'FFmpegRTSPReader', 'FFmpegRTSPReader',

View file

@ -9,7 +9,7 @@ from typing import Dict, Set, Optional, List, Any
from dataclasses import dataclass from dataclasses import dataclass
from collections import defaultdict from collections import defaultdict
from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader from .readers import HTTPSnapshotReader, FFmpegRTSPReader
from .buffers import shared_cache_buffer from .buffers import shared_cache_buffer
from ..tracking.integration import TrackingPipelineIntegration from ..tracking.integration import TrackingPipelineIntegration

View file

@ -1,786 +0,0 @@
"""
Frame readers for RTSP streams and HTTP snapshots.
Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
"""
import cv2
import logging
import time
import threading
import requests
import numpy as np
import os
import subprocess
# import fcntl # No longer needed with atomic file operations
from typing import Optional, Callable
# Removed watchdog imports - no longer using file watching
# Suppress FFMPEG/H.264 error messages if needed
# Set this environment variable to reduce noise from decoder errors
os.environ["OPENCV_LOG_LEVEL"] = "ERROR"
os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings
logger = logging.getLogger(__name__)
# Color codes for pretty logging
class Colors:
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BLUE = '\033[94m'
PURPLE = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
BOLD = '\033[1m'
END = '\033[0m'
def log_success(camera_id: str, message: str):
"""Log success messages in green"""
logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}")
def log_warning(camera_id: str, message: str):
"""Log warnings in yellow"""
logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}")
def log_error(camera_id: str, message: str):
"""Log errors in red"""
logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}")
def log_info(camera_id: str, message: str):
"""Log info in cyan"""
logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}")
# Removed watchdog logging configuration - no longer using file watching
# Removed FrameFileHandler - no longer using file watching
class FFmpegRTSPReader:
"""RTSP stream reader using subprocess FFmpeg piping frames directly to buffer."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.max_retries = max_retries
self.process = None
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Expected stream specs (for reference, actual dimensions read from PPM header)
self.width = 1280
self.height = 720
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the FFmpeg subprocess reader."""
if self.thread and self.thread.is_alive():
logger.warning(f"FFmpeg reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
log_success(self.camera_id, "Stream started")
def stop(self):
"""Stop the FFmpeg subprocess reader."""
self.stop_event.set()
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
if self.thread:
self.thread.join(timeout=5.0)
log_info(self.camera_id, "Stream stopped")
# Removed _probe_stream_info - BMP headers contain dimensions
def _start_ffmpeg_process(self):
"""Start FFmpeg subprocess outputting BMP frames to stdout pipe."""
cmd = [
'ffmpeg',
# DO NOT REMOVE
'-hwaccel', 'cuda',
'-hwaccel_device', '0',
'-rtsp_transport', 'tcp',
'-i', self.rtsp_url,
'-f', 'image2pipe', # Output images to pipe
'-vcodec', 'bmp', # BMP format with header containing dimensions
# Use native stream resolution and framerate
'-an', # No audio
'-' # Output to stdout
]
try:
# Start FFmpeg with stdout pipe to read frames directly
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, # Capture stdout for frame data
stderr=subprocess.DEVNULL,
bufsize=0 # Unbuffered for real-time processing
)
return True
except Exception as e:
log_error(self.camera_id, f"FFmpeg startup failed: {e}")
return False
def _read_bmp_frame(self, pipe):
"""Read BMP frame from pipe - BMP header contains dimensions."""
try:
# Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum)
header_data = b''
bytes_to_read = 54
while len(header_data) < bytes_to_read:
chunk = pipe.read(bytes_to_read - len(header_data))
if not chunk:
return None # Silent end of stream
header_data += chunk
# Parse BMP header
if header_data[:2] != b'BM':
return None # Invalid format, skip frame silently
# Extract file size from header (bytes 2-5)
import struct
file_size = struct.unpack('<L', header_data[2:6])[0]
# Extract width and height from info header (bytes 18-21 and 22-25)
width = struct.unpack('<L', header_data[18:22])[0]
height = struct.unpack('<L', header_data[22:26])[0]
# Read remaining file data
remaining_size = file_size - 54
remaining_data = b''
while len(remaining_data) < remaining_size:
chunk = pipe.read(remaining_size - len(remaining_data))
if not chunk:
return None # Stream ended silently
remaining_data += chunk
# Complete BMP data
bmp_data = header_data + remaining_data
# Use OpenCV to decode BMP directly from memory
frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
if frame is None:
return None # Decode failed silently
return frame
except Exception:
return None # Error reading frame silently
def _read_frames(self):
"""Read frames directly from FFmpeg stdout pipe."""
frame_count = 0
last_log_time = time.time()
while not self.stop_event.is_set():
try:
# Start FFmpeg if not running
if not self.process or self.process.poll() is not None:
if self.process and self.process.poll() is not None:
log_warning(self.camera_id, "Stream disconnected, reconnecting...")
if not self._start_ffmpeg_process():
time.sleep(5.0)
continue
# Read frames directly from FFmpeg stdout
try:
if self.process and self.process.stdout:
# Read BMP frame data
frame = self._read_bmp_frame(self.process.stdout)
if frame is None:
continue
# Call frame callback
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
frame_count += 1
# Log progress every 60 seconds (quieter)
current_time = time.time()
if current_time - last_log_time >= 60:
log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
last_log_time = current_time
except Exception:
# Process might have died, let it restart on next iteration
if self.process:
self.process.terminate()
self.process = None
time.sleep(1.0)
except Exception:
time.sleep(1.0)
# Cleanup
if self.process:
self.process.terminate()
logger = logging.getLogger(__name__)
class RTSPReader:
"""RTSP stream frame reader optimized for 1280x720 @ 6fps streams."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.max_retries = max_retries
self.cap = None
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Expected stream specifications
self.expected_width = 1280
self.expected_height = 720
self.expected_fps = 6
# Frame processing parameters
self.error_recovery_delay = 5.0 # Increased from 2.0 for stability
self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter
self.stream_timeout = 30.0
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the RTSP reader thread."""
if self.thread and self.thread.is_alive():
logger.warning(f"RTSP reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
logger.info(f"Started RTSP reader for camera {self.camera_id}")
def stop(self):
"""Stop the RTSP reader thread."""
self.stop_event.set()
if self.thread:
self.thread.join(timeout=5.0)
if self.cap:
self.cap.release()
logger.info(f"Stopped RTSP reader for camera {self.camera_id}")
def _read_frames(self):
"""Main frame reading loop with H.264 error recovery."""
consecutive_errors = 0
frame_count = 0
last_log_time = time.time()
last_successful_frame_time = time.time()
while not self.stop_event.is_set():
try:
# Initialize/reinitialize capture if needed
if not self.cap or not self.cap.isOpened():
if not self._initialize_capture():
time.sleep(self.error_recovery_delay)
continue
last_successful_frame_time = time.time()
# Check for stream timeout
if time.time() - last_successful_frame_time > self.stream_timeout:
logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing")
self._reinitialize_capture()
last_successful_frame_time = time.time()
continue
# Read frame immediately without rate limiting for minimum latency
try:
ret, frame = self.cap.read()
if ret and frame is None:
# Grab succeeded but retrieve failed - decoder issue
logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed")
except Exception as read_error:
logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}")
ret, frame = False, None
if not ret or frame is None:
consecutive_errors += 1
# Enhanced logging to diagnose the issue
logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}")
# Try to get more info from the capture
try:
if self.cap and self.cap.isOpened():
backend = self.cap.getBackendName()
pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}")
else:
logger.error(f"Camera {self.camera_id}: Capture is closed or None!")
except Exception as info_error:
logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}")
if consecutive_errors >= self.max_consecutive_errors:
logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing")
self._reinitialize_capture()
consecutive_errors = 0
time.sleep(self.error_recovery_delay)
else:
# Skip corrupted frame and continue with exponential backoff
if consecutive_errors <= 5:
logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
elif consecutive_errors % 10 == 0: # Log every 10th error after 5
logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})")
# Exponential backoff with cap at 1 second
sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0)
time.sleep(sleep_time)
continue
# Accept any valid frame dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
consecutive_errors += 1
continue
# Check for corrupted frames (all black, all white, excessive noise)
if self._is_frame_corrupted(frame):
logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping")
consecutive_errors += 1
continue
# Frame is valid
consecutive_errors = 0
frame_count += 1
last_successful_frame_time = time.time()
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
last_log_time = current_time
except Exception as e:
logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}")
consecutive_errors += 1
if consecutive_errors >= self.max_consecutive_errors:
self._reinitialize_capture()
consecutive_errors = 0
time.sleep(self.error_recovery_delay)
# Cleanup
if self.cap:
self.cap.release()
logger.info(f"RTSP reader thread ended for camera {self.camera_id}")
def _initialize_capture(self) -> bool:
"""Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps."""
try:
# Release previous capture if exists
if self.cap:
self.cap.release()
time.sleep(0.5)
logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration")
hw_accel_success = False
# Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support)
if not hw_accel_success:
try:
# Check if OpenCV was built with CUDA codec support
build_info = cv2.getBuildInformation()
if 'cudacodec' in build_info or 'CUVID' in build_info:
logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}")
# Use OpenCV's CUDA backend
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [
cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY
])
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration")
else:
logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}")
# Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC)
if not hw_accel_success:
try:
from core.utils.ffmpeg_detector import get_optimal_rtsp_options
import os
# Get optimal FFmpeg options based on detected capabilities
optimal_options = get_optimal_rtsp_options(self.rtsp_url)
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options
logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}")
logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
# Try to get backend info to confirm hardware acceleration
backend = self.cap.getBackendName()
logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}")
# Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060)
if not hw_accel_success:
try:
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp'
logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}")
# Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs)
if not hw_accel_success:
try:
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp'
logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}")
# Fallback: Standard FFmpeg with software decoding
if not hw_accel_success:
logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding")
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp'
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if not self.cap.isOpened():
logger.error(f"Failed to open stream for camera {self.camera_id}")
return False
# Don't force resolution/fps - let the stream determine its natural specs
# The camera will provide whatever resolution/fps it supports
# Set FFMPEG options for better H.264 handling
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
# Verify stream properties
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps")
# Read and discard first few frames to stabilize stream
for _ in range(5):
ret, _ = self.cap.read()
if not ret:
logger.warning(f"Camera {self.camera_id}: Failed to read initial frames")
time.sleep(0.1)
return True
except Exception as e:
logger.error(f"Error initializing capture for camera {self.camera_id}: {e}")
return False
def _reinitialize_capture(self):
"""Reinitialize capture after errors with retry logic."""
logger.info(f"Reinitializing capture for camera {self.camera_id}")
if self.cap:
self.cap.release()
self.cap = None
# Longer delay before reconnection to avoid rapid reconnect loops
time.sleep(3.0)
# Retry initialization up to 3 times
for attempt in range(3):
if self._initialize_capture():
logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}")
break
else:
logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}")
time.sleep(2.0)
def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
"""Check if frame is corrupted (all black, all white, or excessive noise)."""
if frame is None or frame.size == 0:
return True
# Check mean and standard deviation
mean = np.mean(frame)
std = np.std(frame)
# All black or all white
if mean < 5 or mean > 250:
return True
# No variation (stuck frame)
if std < 1:
return True
# Excessive noise (corrupted H.264 decode)
# Calculate edge density as corruption indicator
edges = cv2.Canny(frame, 50, 150)
edge_density = np.sum(edges > 0) / edges.size
# Too many edges indicate corruption
if edge_density > 0.5:
return True
return False
class HTTPSnapshotReader:
"""HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""
def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
self.camera_id = camera_id
self.snapshot_url = snapshot_url
self.interval_ms = interval_ms
self.max_retries = max_retries
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Expected snapshot specifications
self.expected_width = 2560
self.expected_height = 1440
self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the snapshot reader thread."""
if self.thread and self.thread.is_alive():
logger.warning(f"Snapshot reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
self.thread.start()
logger.info(f"Started snapshot reader for camera {self.camera_id}")
def stop(self):
"""Stop the snapshot reader thread."""
self.stop_event.set()
if self.thread:
self.thread.join(timeout=5.0)
logger.info(f"Stopped snapshot reader for camera {self.camera_id}")
def _read_snapshots(self):
"""Main snapshot reading loop for high quality 2K images."""
retries = 0
frame_count = 0
last_log_time = time.time()
interval_seconds = self.interval_ms / 1000.0
logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
while not self.stop_event.is_set():
try:
start_time = time.time()
frame = self._fetch_snapshot()
if frame is None:
retries += 1
logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")
if self.max_retries != -1 and retries > self.max_retries:
logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
break
time.sleep(min(2.0, interval_seconds))
continue
# Accept any valid image dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
continue
# Reset retry counter on successful fetch
retries = 0
frame_count += 1
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
last_log_time = current_time
# Wait for next interval
elapsed = time.time() - start_time
sleep_time = max(0, interval_seconds - elapsed)
if sleep_time > 0:
self.stop_event.wait(sleep_time)
except Exception as e:
logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
retries += 1
if self.max_retries != -1 and retries > self.max_retries:
break
time.sleep(min(2.0, interval_seconds))
logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")
def _fetch_snapshot(self) -> Optional[np.ndarray]:
"""Fetch a single high quality snapshot from HTTP URL."""
try:
# Parse URL for authentication
from urllib.parse import urlparse
parsed_url = urlparse(self.snapshot_url)
headers = {
'User-Agent': 'Python-Detector-Worker/1.0',
'Accept': 'image/jpeg, image/png, image/*'
}
auth = None
if parsed_url.username and parsed_url.password:
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)
# Reconstruct URL without credentials
clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}"
if parsed_url.port:
clean_url += f":{parsed_url.port}"
clean_url += parsed_url.path
if parsed_url.query:
clean_url += f"?{parsed_url.query}"
# Try Basic Auth first
response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
stream=True, verify=False)
# If Basic Auth fails, try Digest Auth
if response.status_code == 401:
auth = HTTPDigestAuth(parsed_url.username, parsed_url.password)
response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
stream=True, verify=False)
else:
response = requests.get(self.snapshot_url, timeout=15, headers=headers,
stream=True, verify=False)
if response.status_code == 200:
# Check content size
content_length = int(response.headers.get('content-length', 0))
if content_length > self.max_file_size:
logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
return None
# Read content
content = response.content
# Convert to numpy array
image_array = np.frombuffer(content, np.uint8)
# Decode as high quality image
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
return None
logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
return frame
else:
logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
return None
except requests.RequestException as e:
logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
return None
except Exception as e:
logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
return None
def fetch_single_snapshot(self) -> Optional[np.ndarray]:
"""
Fetch a single high-quality snapshot on demand for pipeline processing.
This method is for one-time fetch from HTTP URL, not continuous streaming.
Returns:
High quality 2K snapshot frame or None if failed
"""
logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}")
# Try to fetch snapshot with retries
for attempt in range(self.max_retries):
frame = self._fetch_snapshot()
if frame is not None:
logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
return frame
if attempt < self.max_retries - 1:
logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...")
time.sleep(0.5)
logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts")
return None
def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
"""Resize image while maintaining aspect ratio for high quality."""
h, w = frame.shape[:2]
aspect = w / h
target_aspect = target_width / target_height
if aspect > target_aspect:
# Image is wider
new_width = target_width
new_height = int(target_width / aspect)
else:
# Image is taller
new_height = target_height
new_width = int(target_height * aspect)
# Use INTER_LANCZOS4 for high quality downsampling
resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
# Pad to target size if needed
if new_width < target_width or new_height < target_height:
top = (target_height - new_height) // 2
bottom = target_height - new_height - top
left = (target_width - new_width) // 2
right = target_width - new_width - left
resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
return resized

View file

@ -0,0 +1,18 @@
"""
Stream readers for RTSP and HTTP camera feeds.
"""
from .base import VideoReader
from .ffmpeg_rtsp import FFmpegRTSPReader
from .http_snapshot import HTTPSnapshotReader
from .utils import log_success, log_warning, log_error, log_info, Colors
__all__ = [
'VideoReader',
'FFmpegRTSPReader',
'HTTPSnapshotReader',
'log_success',
'log_warning',
'log_error',
'log_info',
'Colors'
]

View file

@ -0,0 +1,65 @@
"""
Abstract base class for video stream readers.
"""
from abc import ABC, abstractmethod
from typing import Optional, Callable
import numpy as np
class VideoReader(ABC):
"""Abstract base class for video stream readers."""
def __init__(self, camera_id: str, source_url: str, max_retries: int = 3):
"""
Initialize the video reader.
Args:
camera_id: Unique identifier for the camera
source_url: URL or path to the video source
max_retries: Maximum number of retry attempts
"""
self.camera_id = camera_id
self.source_url = source_url
self.max_retries = max_retries
self.frame_callback: Optional[Callable[[str, np.ndarray], None]] = None
@abstractmethod
def start(self) -> None:
"""Start the video reader."""
pass
@abstractmethod
def stop(self) -> None:
"""Stop the video reader."""
pass
@abstractmethod
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]) -> None:
"""
Set callback function to handle captured frames.
Args:
callback: Function that takes (camera_id, frame) as arguments
"""
pass
@property
@abstractmethod
def is_running(self) -> bool:
"""Check if the reader is currently running."""
pass
@property
@abstractmethod
def reader_type(self) -> str:
"""Get the type of reader (e.g., 'rtsp', 'http_snapshot')."""
pass
def __enter__(self):
"""Context manager entry."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.stop()

View file

@ -0,0 +1,302 @@
"""
FFmpeg RTSP stream reader using subprocess piping frames directly to buffer.
"""
import cv2
import time
import threading
import numpy as np
import subprocess
import struct
from typing import Optional, Callable
from .base import VideoReader
from .utils import log_success, log_warning, log_error, log_info
class FFmpegRTSPReader(VideoReader):
"""RTSP stream reader using subprocess FFmpeg piping frames directly to buffer."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
super().__init__(camera_id, rtsp_url, max_retries)
self.rtsp_url = rtsp_url
self.process = None
self.stop_event = threading.Event()
self.thread = None
self.stderr_thread = None
# Expected stream specs (for reference, actual dimensions read from PPM header)
self.width = 1280
self.height = 720
# Watchdog timers for stream reliability
self.process_start_time = None
self.last_frame_time = None
self.is_restart = False # Track if this is a restart (shorter timeout)
self.first_start_timeout = 30.0 # 30s timeout on first start
self.restart_timeout = 15.0 # 15s timeout after restart
@property
def is_running(self) -> bool:
"""Check if the reader is currently running."""
return self.thread is not None and self.thread.is_alive()
@property
def reader_type(self) -> str:
"""Get the type of reader."""
return "rtsp_ffmpeg"
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the FFmpeg subprocess reader."""
if self.thread and self.thread.is_alive():
log_warning(self.camera_id, "FFmpeg reader already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
log_success(self.camera_id, "Stream started")
def stop(self):
"""Stop the FFmpeg subprocess reader."""
self.stop_event.set()
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
if self.thread:
self.thread.join(timeout=5.0)
if self.stderr_thread:
self.stderr_thread.join(timeout=2.0)
log_info(self.camera_id, "Stream stopped")
def _start_ffmpeg_process(self):
"""Start FFmpeg subprocess outputting BMP frames to stdout pipe."""
cmd = [
'ffmpeg',
# DO NOT REMOVE
'-hwaccel', 'cuda',
'-hwaccel_device', '0',
'-rtsp_transport', 'tcp',
'-i', self.rtsp_url,
'-f', 'image2pipe', # Output images to pipe
'-vcodec', 'bmp', # BMP format with header containing dimensions
# Use native stream resolution and framerate
'-an', # No audio
'-' # Output to stdout
]
try:
# Start FFmpeg with stdout pipe to read frames directly
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, # Capture stdout for frame data
stderr=subprocess.PIPE, # Capture stderr for error logging
bufsize=0 # Unbuffered for real-time processing
)
# Start stderr reading thread
if self.stderr_thread and self.stderr_thread.is_alive():
# Stop previous stderr thread
try:
self.stderr_thread.join(timeout=1.0)
except:
pass
self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
self.stderr_thread.start()
# Set process start time for watchdog
self.process_start_time = time.time()
self.last_frame_time = None # Reset frame time
# After successful restart, next timeout will be back to 30s
if self.is_restart:
log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s")
self.is_restart = False
return True
except Exception as e:
log_error(self.camera_id, f"FFmpeg startup failed: {e}")
return False
def _read_bmp_frame(self, pipe):
"""Read BMP frame from pipe - BMP header contains dimensions."""
try:
# Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum)
header_data = b''
bytes_to_read = 54
while len(header_data) < bytes_to_read:
chunk = pipe.read(bytes_to_read - len(header_data))
if not chunk:
return None # Silent end of stream
header_data += chunk
# Parse BMP header
if header_data[:2] != b'BM':
return None # Invalid format, skip frame silently
# Extract file size from header (bytes 2-5)
file_size = struct.unpack('<L', header_data[2:6])[0]
# Extract width and height from info header (bytes 18-21 and 22-25)
width = struct.unpack('<L', header_data[18:22])[0]
height = struct.unpack('<L', header_data[22:26])[0]
# Read remaining file data
remaining_size = file_size - 54
remaining_data = b''
while len(remaining_data) < remaining_size:
chunk = pipe.read(remaining_size - len(remaining_data))
if not chunk:
return None # Stream ended silently
remaining_data += chunk
# Complete BMP data
bmp_data = header_data + remaining_data
# Use OpenCV to decode BMP directly from memory
frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
if frame is None:
return None # Decode failed silently
return frame
except Exception:
return None # Error reading frame silently
def _read_stderr(self):
"""Read and log FFmpeg stderr output in background thread."""
if not self.process or not self.process.stderr:
return
try:
while self.process and self.process.poll() is None:
try:
line = self.process.stderr.readline()
if line:
error_msg = line.decode('utf-8', errors='ignore').strip()
if error_msg and not self.stop_event.is_set():
# Filter out common noise but log actual errors
if any(keyword in error_msg.lower() for keyword in ['error', 'failed', 'cannot', 'invalid']):
log_error(self.camera_id, f"FFmpeg: {error_msg}")
elif 'warning' in error_msg.lower():
log_warning(self.camera_id, f"FFmpeg: {error_msg}")
except Exception:
break
except Exception:
pass
def _check_watchdog_timeout(self) -> bool:
"""Check if watchdog timeout has been exceeded."""
if not self.process_start_time:
return False
current_time = time.time()
time_since_start = current_time - self.process_start_time
# Determine timeout based on whether this is a restart
timeout = self.restart_timeout if self.is_restart else self.first_start_timeout
# If no frames received yet, check against process start time
if not self.last_frame_time:
if time_since_start > timeout:
log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_start:.1f}s (limit: {timeout}s)")
return True
else:
# Check time since last frame
time_since_frame = current_time - self.last_frame_time
if time_since_frame > timeout:
log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_frame:.1f}s (limit: {timeout}s)")
return True
return False
def _restart_ffmpeg_process(self):
"""Restart FFmpeg process due to watchdog timeout."""
log_warning(self.camera_id, "Watchdog triggered FFmpeg restart")
# Terminate current process
if self.process:
try:
self.process.terminate()
self.process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.process.kill()
except Exception:
pass
self.process = None
# Mark as restart for shorter timeout
self.is_restart = True
# Small delay before restart
time.sleep(1.0)
def _read_frames(self):
"""Read frames directly from FFmpeg stdout pipe."""
frame_count = 0
last_log_time = time.time()
while not self.stop_event.is_set():
try:
# Check watchdog timeout if process is running
if self.process and self.process.poll() is None:
if self._check_watchdog_timeout():
self._restart_ffmpeg_process()
continue
# Start FFmpeg if not running
if not self.process or self.process.poll() is not None:
if self.process and self.process.poll() is not None:
log_warning(self.camera_id, "Stream disconnected, reconnecting...")
if not self._start_ffmpeg_process():
time.sleep(5.0)
continue
# Read frames directly from FFmpeg stdout
try:
if self.process and self.process.stdout:
# Read BMP frame data
frame = self._read_bmp_frame(self.process.stdout)
if frame is None:
continue
# Update watchdog - we got a frame
self.last_frame_time = time.time()
# Call frame callback
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
frame_count += 1
# Log progress every 60 seconds (quieter)
current_time = time.time()
if current_time - last_log_time >= 60:
log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
last_log_time = current_time
except Exception:
# Process might have died, let it restart on next iteration
if self.process:
self.process.terminate()
self.process = None
time.sleep(1.0)
except Exception:
time.sleep(1.0)
# Cleanup
if self.process:
self.process.terminate()

View file

@ -0,0 +1,249 @@
"""
HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.
"""
import cv2
import logging
import time
import threading
import requests
import numpy as np
from typing import Optional, Callable
from .base import VideoReader
from .utils import log_success, log_warning, log_error, log_info
logger = logging.getLogger(__name__)
class HTTPSnapshotReader(VideoReader):
"""HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""
def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
super().__init__(camera_id, snapshot_url, max_retries)
self.snapshot_url = snapshot_url
self.interval_ms = interval_ms
self.stop_event = threading.Event()
self.thread = None
# Expected snapshot specifications
self.expected_width = 2560
self.expected_height = 1440
self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image
@property
def is_running(self) -> bool:
"""Check if the reader is currently running."""
return self.thread is not None and self.thread.is_alive()
@property
def reader_type(self) -> str:
"""Get the type of reader."""
return "http_snapshot"
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the snapshot reader thread."""
if self.thread and self.thread.is_alive():
logger.warning(f"Snapshot reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
self.thread.start()
logger.info(f"Started snapshot reader for camera {self.camera_id}")
def stop(self):
"""Stop the snapshot reader thread."""
self.stop_event.set()
if self.thread:
self.thread.join(timeout=5.0)
logger.info(f"Stopped snapshot reader for camera {self.camera_id}")
def _read_snapshots(self):
"""Main snapshot reading loop for high quality 2K images."""
retries = 0
frame_count = 0
last_log_time = time.time()
interval_seconds = self.interval_ms / 1000.0
logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
while not self.stop_event.is_set():
try:
start_time = time.time()
frame = self._fetch_snapshot()
if frame is None:
retries += 1
logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")
if self.max_retries != -1 and retries > self.max_retries:
logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
break
time.sleep(min(2.0, interval_seconds))
continue
# Accept any valid image dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
continue
# Reset retry counter on successful fetch
retries = 0
frame_count += 1
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
last_log_time = current_time
# Wait for next interval
elapsed = time.time() - start_time
sleep_time = max(0, interval_seconds - elapsed)
if sleep_time > 0:
self.stop_event.wait(sleep_time)
except Exception as e:
logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
retries += 1
if self.max_retries != -1 and retries > self.max_retries:
break
time.sleep(min(2.0, interval_seconds))
logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")
def _fetch_snapshot(self) -> Optional[np.ndarray]:
"""Fetch a single high quality snapshot from HTTP URL."""
try:
# Parse URL for authentication
from urllib.parse import urlparse
parsed_url = urlparse(self.snapshot_url)
headers = {
'User-Agent': 'Python-Detector-Worker/1.0',
'Accept': 'image/jpeg, image/png, image/*'
}
auth = None
if parsed_url.username and parsed_url.password:
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)
# Reconstruct URL without credentials
clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}"
if parsed_url.port:
clean_url += f":{parsed_url.port}"
clean_url += parsed_url.path
if parsed_url.query:
clean_url += f"?{parsed_url.query}"
# Try Basic Auth first
response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
stream=True, verify=False)
# If Basic Auth fails, try Digest Auth
if response.status_code == 401:
auth = HTTPDigestAuth(parsed_url.username, parsed_url.password)
response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
stream=True, verify=False)
else:
response = requests.get(self.snapshot_url, timeout=15, headers=headers,
stream=True, verify=False)
if response.status_code == 200:
# Check content size
content_length = int(response.headers.get('content-length', 0))
if content_length > self.max_file_size:
logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
return None
# Read content
content = response.content
# Convert to numpy array
image_array = np.frombuffer(content, np.uint8)
# Decode as high quality image
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
return None
logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
return frame
else:
logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
return None
except requests.RequestException as e:
logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
return None
except Exception as e:
logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
return None
def fetch_single_snapshot(self) -> Optional[np.ndarray]:
"""
Fetch a single high-quality snapshot on demand for pipeline processing.
This method is for one-time fetch from HTTP URL, not continuous streaming.
Returns:
High quality 2K snapshot frame or None if failed
"""
logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}")
# Try to fetch snapshot with retries
for attempt in range(self.max_retries):
frame = self._fetch_snapshot()
if frame is not None:
logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
return frame
if attempt < self.max_retries - 1:
logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...")
time.sleep(0.5)
logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts")
return None
def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
"""Resize image while maintaining aspect ratio for high quality."""
h, w = frame.shape[:2]
aspect = w / h
target_aspect = target_width / target_height
if aspect > target_aspect:
# Image is wider
new_width = target_width
new_height = int(target_width / aspect)
else:
# Image is taller
new_height = target_height
new_width = int(target_height * aspect)
# Use INTER_LANCZOS4 for high quality downsampling
resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
# Pad to target size if needed
if new_width < target_width or new_height < target_height:
top = (target_height - new_height) // 2
bottom = target_height - new_height - top
left = (target_width - new_width) // 2
right = target_width - new_width - left
resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
return resized

View file

@ -0,0 +1,38 @@
"""
Utility functions for stream readers.
"""
import logging
import os
# Keep OpenCV errors visible but allow FFmpeg stderr logging
os.environ["OPENCV_LOG_LEVEL"] = "ERROR"
logger = logging.getLogger(__name__)
# Color codes for pretty logging
class Colors:
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BLUE = '\033[94m'
PURPLE = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
BOLD = '\033[1m'
END = '\033[0m'
def log_success(camera_id: str, message: str):
"""Log success messages in green"""
logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}")
def log_warning(camera_id: str, message: str):
"""Log warnings in yellow"""
logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}")
def log_error(camera_id: str, message: str):
"""Log errors in red"""
logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}")
def log_info(camera_id: str, message: str):
"""Log info in cyan"""
logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}")

View file

@ -7,4 +7,6 @@ filterpy
psycopg2-binary psycopg2-binary
lap>=0.5.12 lap>=0.5.12
pynvml pynvml
PyTurboJPEG PyTurboJPEG
PyNvVideoCodec
cupy-cuda12x