Implement comprehensive health monitoring for streams and threads
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m7s
Build Worker Base and Application Images / deploy-stack (push) Has been cancelled

- Added RecoveryManager for automatic handling of health issues, including circuit breaker patterns, automatic restarts, and graceful degradation.
- Introduced StreamHealthTracker to monitor video stream metrics, including frame production, connection health, and error rates.
- Developed ThreadHealthMonitor for detecting unresponsive and deadlocked threads, providing liveness detection and responsiveness testing.
- Integrated health checks for streams and threads, reporting metrics and recovery actions to the health monitor.
- Enhanced logging for recovery attempts, errors, and health checks to improve observability and debugging.
This commit is contained in:
Siwat Sirichai 2025-09-27 12:27:38 +07:00
parent 8c08c815ce
commit b08ce27de2
9 changed files with 2173 additions and 11 deletions

View file

@ -1,5 +1,6 @@
"""
FFmpeg RTSP stream reader using subprocess piping frames directly to buffer.
Enhanced with comprehensive health monitoring and automatic recovery.
"""
import cv2
import time
@ -7,10 +8,13 @@ import threading
import numpy as np
import subprocess
import struct
from typing import Optional, Callable
from typing import Optional, Callable, Dict, Any
from .base import VideoReader
from .utils import log_success, log_warning, log_error, log_info
from ..monitoring.stream_health import stream_health_tracker
from ..monitoring.thread_health import thread_health_monitor
from ..monitoring.recovery import recovery_manager, RecoveryAction
class FFmpegRTSPReader(VideoReader):
@ -35,6 +39,21 @@ class FFmpegRTSPReader(VideoReader):
self.first_start_timeout = 30.0 # 30s timeout on first start
self.restart_timeout = 15.0 # 15s timeout after restart
# Health monitoring setup
self.last_heartbeat = time.time()
self.consecutive_errors = 0
self.ffmpeg_restart_count = 0
# Register recovery handlers
recovery_manager.register_recovery_handler(
RecoveryAction.RESTART_STREAM,
self._handle_restart_recovery
)
recovery_manager.register_recovery_handler(
RecoveryAction.RECONNECT,
self._handle_reconnect_recovery
)
@property
def is_running(self) -> bool:
"""Check if the reader is currently running."""
@ -58,21 +77,35 @@ class FFmpegRTSPReader(VideoReader):
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
log_success(self.camera_id, "Stream started")
# Register with health monitoring
stream_health_tracker.register_stream(self.camera_id, "rtsp_ffmpeg", self.rtsp_url)
thread_health_monitor.register_thread(self.thread, self._heartbeat_callback)
log_success(self.camera_id, "Stream started with health monitoring")
def stop(self):
    """Stop the FFmpeg subprocess reader.

    Signals the reader loop to exit, unregisters the reader thread from
    thread health monitoring, shuts down the FFmpeg child process
    (terminate first, kill if it has not exited within 5 seconds), joins
    the worker threads with bounded timeouts, and finally unregisters the
    stream from stream health tracking.
    """
    self.stop_event.set()

    # Unregister from health monitoring
    if self.thread:
        thread_health_monitor.unregister_thread(self.thread.ident)

    # Capture the handle once: the reader thread may reset self.process to
    # None while this method is running.
    proc = self.process
    if proc:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie;
            # keep the wait bounded so stop() can never hang forever.
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                log_warning(self.camera_id, "FFmpeg process did not exit after kill")

    if self.thread:
        self.thread.join(timeout=5.0)
    if self.stderr_thread:
        self.stderr_thread.join(timeout=2.0)

    stream_health_tracker.unregister_stream(self.camera_id)
    log_info(self.camera_id, "Stream stopped")
def _start_ffmpeg_process(self):
@ -249,6 +282,9 @@ class FFmpegRTSPReader(VideoReader):
while not self.stop_event.is_set():
try:
# Send heartbeat for thread health monitoring
self._send_heartbeat("reading_frames")
# Check watchdog timeout if process is running
if self.process and self.process.poll() is None:
if self._check_watchdog_timeout():
@ -259,8 +295,17 @@ class FFmpegRTSPReader(VideoReader):
if not self.process or self.process.poll() is not None:
if self.process and self.process.poll() is not None:
log_warning(self.camera_id, "Stream disconnected, reconnecting...")
stream_health_tracker.report_error(
self.camera_id,
"FFmpeg process disconnected"
)
if not self._start_ffmpeg_process():
self.consecutive_errors += 1
stream_health_tracker.report_error(
self.camera_id,
"Failed to start FFmpeg process"
)
time.sleep(5.0)
continue
@ -275,9 +320,22 @@ class FFmpegRTSPReader(VideoReader):
# Update watchdog - we got a frame
self.last_frame_time = time.time()
# Reset error counter on successful frame
self.consecutive_errors = 0
# Report successful frame to health monitoring
frame_size = frame.nbytes
stream_health_tracker.report_frame_received(self.camera_id, frame_size)
# Call frame callback
if self.frame_callback:
self.frame_callback(self.camera_id, frame)
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
stream_health_tracker.report_error(
self.camera_id,
f"Frame callback error: {e}"
)
frame_count += 1
@ -287,16 +345,85 @@ class FFmpegRTSPReader(VideoReader):
log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
last_log_time = current_time
except Exception:
except Exception as e:
# Process might have died, let it restart on next iteration
stream_health_tracker.report_error(
self.camera_id,
f"Frame reading error: {e}"
)
if self.process:
self.process.terminate()
self.process = None
time.sleep(1.0)
except Exception:
except Exception as e:
stream_health_tracker.report_error(
self.camera_id,
f"Main loop error: {e}"
)
time.sleep(1.0)
# Cleanup
if self.process:
self.process.terminate()
self.process.terminate()
# Health monitoring methods
def _send_heartbeat(self, activity: str = "running"):
    """Record a liveness timestamp and notify the thread health monitor.

    Args:
        activity: Label for what the thread is currently doing; passed
            through unchanged to ``thread_health_monitor.heartbeat``.
    """
    now = time.time()
    # Kept on the instance so _heartbeat_callback can measure staleness.
    self.last_heartbeat = now
    thread_health_monitor.heartbeat(activity=activity)
def _heartbeat_callback(self) -> bool:
"""Heartbeat callback for thread responsiveness testing."""
try:
# Check if thread is responsive by checking recent heartbeat
current_time = time.time()
age = current_time - self.last_heartbeat
# Thread is responsive if heartbeat is recent
return age < 30.0 # 30 second responsiveness threshold
except Exception:
return False
def _handle_restart_recovery(self, component: str, details: Dict[str, Any]) -> bool:
    """Recovery handler that performs a full stop/start cycle of this reader.

    Args:
        component: Identifier of the component being recovered (not
            inspected by this handler).
        details: Additional recovery context (not inspected by this handler).

    Returns:
        True when the restart sequence completed without raising,
        False when any step raised an exception.
    """
    # NOTE(review): `component` is ignored, so this reader restarts no matter
    # which component triggered the action; confirm how recovery_manager
    # dispatches handlers.
    try:
        log_info(self.camera_id, "Restarting FFmpeg RTSP reader for health recovery")

        self.stop()
        time.sleep(2.0)  # brief pause between stop and start
        self.start()

        # Record the successful restart
        stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_restart")
        self.ffmpeg_restart_count += 1
    except Exception as exc:
        log_error(self.camera_id, f"Failed to restart FFmpeg RTSP reader: {exc}")
        return False
    return True
def _handle_reconnect_recovery(self, component: str, details: Dict[str, Any]) -> bool:
    """Recovery handler that restarts only the FFmpeg process.

    Args:
        component: Identifier of the component being recovered (not
            inspected by this handler).
        details: Additional recovery context (not inspected by this handler).

    Returns:
        True when the process restart and bookkeeping completed without
        raising, False when any step raised an exception.
    """
    try:
        log_info(self.camera_id, "Reconnecting FFmpeg RTSP reader for health recovery")

        # Force a fresh FFmpeg process
        self._restart_ffmpeg_process()

        # A new connection starts with a clean error count
        self.consecutive_errors = 0
        stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_reconnect")
    except Exception as exc:
        log_error(self.camera_id, f"Failed to reconnect FFmpeg RTSP reader: {exc}")
        return False
    return True

View file

@ -1,5 +1,6 @@
"""
HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.
Enhanced with comprehensive health monitoring and automatic recovery.
"""
import cv2
import logging
@ -7,10 +8,13 @@ import time
import threading
import requests
import numpy as np
from typing import Optional, Callable
from typing import Optional, Callable, Dict, Any
from .base import VideoReader
from .utils import log_success, log_warning, log_error, log_info
from ..monitoring.stream_health import stream_health_tracker
from ..monitoring.thread_health import thread_health_monitor
from ..monitoring.recovery import recovery_manager, RecoveryAction
logger = logging.getLogger(__name__)
@ -30,6 +34,22 @@ class HTTPSnapshotReader(VideoReader):
self.expected_height = 1440
self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image
# Health monitoring setup
self.last_heartbeat = time.time()
self.consecutive_errors = 0
self.connection_test_interval = 300 # Test connection every 5 minutes
self.last_connection_test = None
# Register recovery handlers
recovery_manager.register_recovery_handler(
RecoveryAction.RESTART_STREAM,
self._handle_restart_recovery
)
recovery_manager.register_recovery_handler(
RecoveryAction.RECONNECT,
self._handle_reconnect_recovery
)
@property
def is_running(self) -> bool:
"""Check if the reader is currently running."""
@ -53,13 +73,24 @@ class HTTPSnapshotReader(VideoReader):
self.stop_event.clear()
self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
self.thread.start()
logger.info(f"Started snapshot reader for camera {self.camera_id}")
# Register with health monitoring
stream_health_tracker.register_stream(self.camera_id, "http_snapshot", self.snapshot_url)
thread_health_monitor.register_thread(self.thread, self._heartbeat_callback)
logger.info(f"Started snapshot reader for camera {self.camera_id} with health monitoring")
def stop(self):
    """Stop the snapshot reader thread.

    Signals the loop to exit, unregisters the thread from thread health
    monitoring, waits up to 5 seconds for it to finish, then unregisters
    the stream from stream health tracking.
    """
    self.stop_event.set()

    reader_thread = self.thread
    if reader_thread:
        # Unregister from health monitoring before waiting for the thread
        thread_health_monitor.unregister_thread(reader_thread.ident)
        reader_thread.join(timeout=5.0)

    stream_health_tracker.unregister_stream(self.camera_id)
    logger.info(f"Stopped snapshot reader for camera {self.camera_id}")
def _read_snapshots(self):
@ -67,17 +98,29 @@ class HTTPSnapshotReader(VideoReader):
retries = 0
frame_count = 0
last_log_time = time.time()
last_connection_test = time.time()
interval_seconds = self.interval_ms / 1000.0
logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
while not self.stop_event.is_set():
try:
# Send heartbeat for thread health monitoring
self._send_heartbeat("fetching_snapshot")
start_time = time.time()
frame = self._fetch_snapshot()
if frame is None:
retries += 1
self.consecutive_errors += 1
# Report error to health monitoring
stream_health_tracker.report_error(
self.camera_id,
f"Failed to fetch snapshot (retry {retries}/{self.max_retries})"
)
logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")
if self.max_retries != -1 and retries > self.max_retries:
@ -90,21 +133,36 @@ class HTTPSnapshotReader(VideoReader):
# Accept any valid image dimensions - don't force specific resolution
if frame.shape[1] <= 0 or frame.shape[0] <= 0:
logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
stream_health_tracker.report_error(
self.camera_id,
f"Invalid frame dimensions: {frame.shape[1]}x{frame.shape[0]}"
)
continue
# Reset retry counter on successful fetch
retries = 0
self.consecutive_errors = 0
frame_count += 1
# Report successful frame to health monitoring
frame_size = frame.nbytes
stream_health_tracker.report_frame_received(self.camera_id, frame_size)
# Call frame callback
if self.frame_callback:
try:
self.frame_callback(self.camera_id, frame)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
stream_health_tracker.report_error(self.camera_id, f"Frame callback error: {e}")
# Periodic connection health test
current_time = time.time()
if current_time - last_connection_test >= self.connection_test_interval:
self._test_connection_health()
last_connection_test = current_time
# Log progress every 30 seconds
current_time = time.time()
if current_time - last_log_time >= 30:
logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
last_log_time = current_time
@ -117,6 +175,7 @@ class HTTPSnapshotReader(VideoReader):
except Exception as e:
logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
stream_health_tracker.report_error(self.camera_id, f"Snapshot loop error: {e}")
retries += 1
if self.max_retries != -1 and retries > self.max_retries:
break
@ -246,4 +305,74 @@ class HTTPSnapshotReader(VideoReader):
right = target_width - new_width - left
resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
return resized
return resized
# Health monitoring methods
def _send_heartbeat(self, activity: str = "running"):
    """Record a liveness timestamp and notify the thread health monitor.

    Args:
        activity: Label for what the thread is currently doing; passed
            through unchanged to ``thread_health_monitor.heartbeat``.
    """
    now = time.time()
    # Kept on the instance so _heartbeat_callback can measure staleness.
    self.last_heartbeat = now
    thread_health_monitor.heartbeat(activity=activity)
def _heartbeat_callback(self) -> bool:
"""Heartbeat callback for thread responsiveness testing."""
try:
# Check if thread is responsive by checking recent heartbeat
current_time = time.time()
age = current_time - self.last_heartbeat
# Thread is responsive if heartbeat is recent
return age < 30.0 # 30 second responsiveness threshold
except Exception:
return False
def _test_connection_health(self):
    """Run an HTTP connection test for this camera's snapshot URL.

    The check is delegated to ``stream_health_tracker.test_http_connection``
    and its result is discarded. Any exception is logged and not propagated.
    """
    try:
        stream_health_tracker.test_http_connection(self.camera_id, self.snapshot_url)
    except Exception as exc:
        logger.error(f"Error testing connection health for {self.camera_id}: {exc}")
def _handle_restart_recovery(self, component: str, details: Dict[str, Any]) -> bool:
    """Recovery handler that performs a full stop/start cycle of this reader.

    Args:
        component: Identifier of the component being recovered (not
            inspected by this handler).
        details: Additional recovery context (not inspected by this handler).

    Returns:
        True when the restart sequence completed without raising,
        False when any step raised an exception.
    """
    # NOTE(review): `component` is ignored, so this reader restarts no matter
    # which component triggered the action; confirm how recovery_manager
    # dispatches handlers.
    try:
        logger.info(f"Restarting HTTP snapshot reader for {self.camera_id}")

        self.stop()
        time.sleep(2.0)  # brief pause between stop and start
        self.start()

        # Record the successful restart
        stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_restart")
    except Exception as exc:
        logger.error(f"Failed to restart HTTP snapshot reader for {self.camera_id}: {exc}")
        return False
    return True
def _handle_reconnect_recovery(self, component: str, details: Dict[str, Any]) -> bool:
    """Recovery handler that re-validates the HTTP connection.

    Args:
        component: Identifier of the component being recovered (not
            inspected by this handler).
        details: Additional recovery context (not inspected by this handler).

    Returns:
        True when the connection test succeeded and the reconnect was
        reported. False when the test failed or any step raised.
    """
    try:
        logger.info(f"Reconnecting HTTP snapshot reader for {self.camera_id}")

        # Probe the endpoint before declaring the stream recovered
        if not stream_health_tracker.test_http_connection(self.camera_id, self.snapshot_url):
            logger.warning(f"Connection test failed during recovery for {self.camera_id}")
            return False

        # A healthy connection starts with a clean error count
        self.consecutive_errors = 0
        stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_reconnect")
        return True
    except Exception as exc:
        logger.error(f"Failed to reconnect HTTP snapshot reader for {self.camera_id}: {exc}")
        return False