fix: remove unused RTSPReader import and related code
Some checks failed
Build Worker Base and Application Images / deploy-stack (push) Blocked by required conditions
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Has been cancelled

This commit is contained in:
Siwat Sirichai 2025-09-26 19:42:41 +07:00
parent cd1359f5d2
commit 2808316e94
3 changed files with 112 additions and 337 deletions

View file

@ -8,16 +8,10 @@ import time
import threading
import requests
import numpy as np
import os
import subprocess
# import fcntl # No longer needed with atomic file operations
from typing import Optional, Callable
# Removed watchdog imports - no longer using file watching
# Suppress FFMPEG/H.264 error messages if needed
# Set this environment variable to reduce noise from decoder errors
os.environ["OPENCV_LOG_LEVEL"] = "ERROR"
os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings
logger = logging.getLogger(__name__)
@ -65,12 +59,20 @@ class FFmpegRTSPReader:
self.process = None
self.stop_event = threading.Event()
self.thread = None
self.stderr_thread = None
self.frame_callback: Optional[Callable] = None
# Expected stream specs (for reference, actual dimensions read from PPM header)
self.width = 1280
self.height = 720
# Watchdog timers for stream reliability
self.process_start_time = None
self.last_frame_time = None
self.is_restart = False # Track if this is a restart (shorter timeout)
self.first_start_timeout = 30.0 # 30s timeout on first start
self.restart_timeout = 15.0 # 15s timeout after restart
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
@ -97,6 +99,8 @@ class FFmpegRTSPReader:
self.process.kill()
if self.thread:
self.thread.join(timeout=5.0)
if self.stderr_thread:
self.stderr_thread.join(timeout=2.0)
log_info(self.camera_id, "Stream stopped")
# Removed _probe_stream_info - BMP headers contain dimensions
@ -122,9 +126,30 @@ class FFmpegRTSPReader:
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, # Capture stdout for frame data
stderr=subprocess.DEVNULL,
stderr=subprocess.PIPE, # Capture stderr for error logging
bufsize=0 # Unbuffered for real-time processing
)
# Start stderr reading thread
if self.stderr_thread and self.stderr_thread.is_alive():
# Stop previous stderr thread
try:
self.stderr_thread.join(timeout=1.0)
except:
pass
self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
self.stderr_thread.start()
# Set process start time for watchdog
self.process_start_time = time.time()
self.last_frame_time = None # Reset frame time
# After successful restart, next timeout will be back to 30s
if self.is_restart:
log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s")
self.is_restart = False
return True
except Exception as e:
log_error(self.camera_id, f"FFmpeg startup failed: {e}")
@ -180,6 +205,74 @@ class FFmpegRTSPReader:
except Exception:
return None # Error reading frame silently
def _read_stderr(self):
"""Read and log FFmpeg stderr output in background thread."""
if not self.process or not self.process.stderr:
return
try:
while self.process and self.process.poll() is None:
try:
line = self.process.stderr.readline()
if line:
error_msg = line.decode('utf-8', errors='ignore').strip()
if error_msg and not self.stop_event.is_set():
# Filter out common noise but log actual errors
if any(keyword in error_msg.lower() for keyword in ['error', 'failed', 'cannot', 'invalid']):
log_error(self.camera_id, f"FFmpeg: {error_msg}")
elif 'warning' in error_msg.lower():
log_warning(self.camera_id, f"FFmpeg: {error_msg}")
except Exception:
break
except Exception:
pass
def _check_watchdog_timeout(self) -> bool:
"""Check if watchdog timeout has been exceeded."""
if not self.process_start_time:
return False
current_time = time.time()
time_since_start = current_time - self.process_start_time
# Determine timeout based on whether this is a restart
timeout = self.restart_timeout if self.is_restart else self.first_start_timeout
# If no frames received yet, check against process start time
if not self.last_frame_time:
if time_since_start > timeout:
log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_start:.1f}s (limit: {timeout}s)")
return True
else:
# Check time since last frame
time_since_frame = current_time - self.last_frame_time
if time_since_frame > timeout:
log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_frame:.1f}s (limit: {timeout}s)")
return True
return False
def _restart_ffmpeg_process(self):
"""Restart FFmpeg process due to watchdog timeout."""
log_warning(self.camera_id, "Watchdog triggered FFmpeg restart")
# Terminate current process
if self.process:
try:
self.process.terminate()
self.process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.process.kill()
except Exception:
pass
self.process = None
# Mark as restart for shorter timeout
self.is_restart = True
# Small delay before restart
time.sleep(1.0)
def _read_frames(self):
"""Read frames directly from FFmpeg stdout pipe."""
frame_count = 0
@ -187,6 +280,12 @@ class FFmpegRTSPReader:
while not self.stop_event.is_set():
try:
# Check watchdog timeout if process is running
if self.process and self.process.poll() is None:
if self._check_watchdog_timeout():
self._restart_ffmpeg_process()
continue
# Start FFmpeg if not running
if not self.process or self.process.poll() is not None:
if self.process and self.process.poll() is not None:
@ -204,6 +303,9 @@ class FFmpegRTSPReader:
if frame is None:
continue
# Update watchdog - we got a frame
self.last_frame_time = time.time()
# Call frame callback
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
@ -234,332 +336,6 @@ class FFmpegRTSPReader:
logger = logging.getLogger(__name__)
class RTSPReader:
"""RTSP stream frame reader optimized for 1280x720 @ 6fps streams."""
def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
self.camera_id = camera_id
self.rtsp_url = rtsp_url
self.max_retries = max_retries
self.cap = None
self.stop_event = threading.Event()
self.thread = None
self.frame_callback: Optional[Callable] = None
# Expected stream specifications
self.expected_width = 1280
self.expected_height = 720
self.expected_fps = 6
# Frame processing parameters
self.error_recovery_delay = 5.0 # Increased from 2.0 for stability
self.max_consecutive_errors = 30 # Increased from 10 to handle network jitter
self.stream_timeout = 30.0
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
"""Set callback function to handle captured frames."""
self.frame_callback = callback
def start(self):
"""Start the RTSP reader thread."""
if self.thread and self.thread.is_alive():
logger.warning(f"RTSP reader for {self.camera_id} already running")
return
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
logger.info(f"Started RTSP reader for camera {self.camera_id}")
def stop(self):
"""Stop the RTSP reader thread."""
self.stop_event.set()
if self.thread:
self.thread.join(timeout=5.0)
if self.cap:
self.cap.release()
logger.info(f"Stopped RTSP reader for camera {self.camera_id}")
def _read_frames(self):
"""Main frame reading loop with H.264 error recovery."""
consecutive_errors = 0
frame_count = 0
last_log_time = time.time()
last_successful_frame_time = time.time()
while not self.stop_event.is_set():
try:
# Initialize/reinitialize capture if needed
if not self.cap or not self.cap.isOpened():
if not self._initialize_capture():
time.sleep(self.error_recovery_delay)
continue
last_successful_frame_time = time.time()
# Check for stream timeout
if time.time() - last_successful_frame_time > self.stream_timeout:
logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing")
self._reinitialize_capture()
last_successful_frame_time = time.time()
continue
# Read frame immediately without rate limiting for minimum latency
try:
ret, frame = self.cap.read()
if ret and frame is None:
# Grab succeeded but retrieve failed - decoder issue
logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed")
except Exception as read_error:
logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}")
ret, frame = False, None
if not ret or frame is None:
consecutive_errors += 1
# Enhanced logging to diagnose the issue
logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}")
# Try to get more info from the capture
try:
if self.cap and self.cap.isOpened():
backend = self.cap.getBackendName()
pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}")
else:
logger.error(f"Camera {self.camera_id}: Capture is closed or None!")
except Exception as info_error:
logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}")
if consecutive_errors >= self.max_consecutive_errors:
logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing")
self._reinitialize_capture()
consecutive_errors = 0
time.sleep(self.error_recovery_delay)
else:
# Skip corrupted frame and continue with exponential backoff
if consecutive_errors <= 5:
logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
elif consecutive_errors % 10 == 0: # Log every 10th error after 5
logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})")
# Exponential backoff with cap at 1 second
sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0)
time.sleep(sleep_time)
continue
# Accept any valid frame dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
consecutive_errors += 1
continue
# Check for corrupted frames (all black, all white, excessive noise)
if self._is_frame_corrupted(frame):
logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping")
consecutive_errors += 1
continue
# Frame is valid
consecutive_errors = 0
frame_count += 1
last_successful_frame_time = time.time()
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
last_log_time = current_time
except Exception as e:
logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}")
consecutive_errors += 1
if consecutive_errors >= self.max_consecutive_errors:
self._reinitialize_capture()
consecutive_errors = 0
time.sleep(self.error_recovery_delay)
# Cleanup
if self.cap:
self.cap.release()
logger.info(f"RTSP reader thread ended for camera {self.camera_id}")
def _initialize_capture(self) -> bool:
"""Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps."""
try:
# Release previous capture if exists
if self.cap:
self.cap.release()
time.sleep(0.5)
logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration")
hw_accel_success = False
# Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support)
if not hw_accel_success:
try:
# Check if OpenCV was built with CUDA codec support
build_info = cv2.getBuildInformation()
if 'cudacodec' in build_info or 'CUVID' in build_info:
logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}")
# Use OpenCV's CUDA backend
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [
cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY
])
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration")
else:
logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}")
# Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC)
if not hw_accel_success:
try:
from core.utils.ffmpeg_detector import get_optimal_rtsp_options
import os
# Get optimal FFmpeg options based on detected capabilities
optimal_options = get_optimal_rtsp_options(self.rtsp_url)
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options
logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}")
logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
# Try to get backend info to confirm hardware acceleration
backend = self.cap.getBackendName()
logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}")
# Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060)
if not hw_accel_success:
try:
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp'
logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}")
# Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs)
if not hw_accel_success:
try:
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp'
logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}")
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
hw_accel_success = True
logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}")
# Fallback: Standard FFmpeg with software decoding
if not hw_accel_success:
logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding")
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp'
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if not self.cap.isOpened():
logger.error(f"Failed to open stream for camera {self.camera_id}")
return False
# Don't force resolution/fps - let the stream determine its natural specs
# The camera will provide whatever resolution/fps it supports
# Set FFMPEG options for better H.264 handling
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
# Verify stream properties
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps")
# Read and discard first few frames to stabilize stream
for _ in range(5):
ret, _ = self.cap.read()
if not ret:
logger.warning(f"Camera {self.camera_id}: Failed to read initial frames")
time.sleep(0.1)
return True
except Exception as e:
logger.error(f"Error initializing capture for camera {self.camera_id}: {e}")
return False
def _reinitialize_capture(self):
"""Reinitialize capture after errors with retry logic."""
logger.info(f"Reinitializing capture for camera {self.camera_id}")
if self.cap:
self.cap.release()
self.cap = None
# Longer delay before reconnection to avoid rapid reconnect loops
time.sleep(3.0)
# Retry initialization up to 3 times
for attempt in range(3):
if self._initialize_capture():
logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}")
break
else:
logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}")
time.sleep(2.0)
def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
"""Check if frame is corrupted (all black, all white, or excessive noise)."""
if frame is None or frame.size == 0:
return True
# Check mean and standard deviation
mean = np.mean(frame)
std = np.std(frame)
# All black or all white
if mean < 5 or mean > 250:
return True
# No variation (stuck frame)
if std < 1:
return True
# Excessive noise (corrupted H.264 decode)
# Calculate edge density as corruption indicator
edges = cv2.Canny(frame, 50, 150)
edge_density = np.sum(edges > 0) / edges.size
# Too many edges indicate corruption
if edge_density > 0.5:
return True
return False
class HTTPSnapshotReader:
"""HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""