- Added RecoveryManager for automatic handling of health issues, including circuit-breaker patterns, automatic restarts, and graceful degradation.
- Introduced StreamHealthTracker to monitor video stream metrics, including frame production, connection health, and error rates.
- Developed ThreadHealthMonitor for detecting unresponsive and deadlocked threads, providing liveness detection and responsiveness testing.
- Integrated health checks for streams and threads, reporting metrics and recovery actions to the health monitor.
- Enhanced logging for recovery attempts, errors, and health checks to improve observability and debugging.
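To illustrate how these pieces are intended to fit together, here is a minimal, hypothetical sketch of a stream component wiring itself into the shared monitor. It uses only the HealthMonitor API defined in the file below; the module path, the "camera-01" component ID, and the restart_stream/stream_checks helpers are assumptions, and the real RecoveryManager and StreamHealthTracker integration may differ.

from typing import List

# Assumed import path; adjust to the actual package layout.
from health_monitor import health_monitor, HealthCheck, HealthStatus


def restart_stream(component: str, check: HealthCheck) -> bool:
    """Hypothetical recovery callback: restart the stream and record it."""
    # Real code would tear down and reopen the capture here.
    health_monitor.report_restart(component)
    return True


def stream_checks() -> List[HealthCheck]:
    """Hypothetical checker: escalate a stream that has gone critical."""
    status = health_monitor.get_health_status("camera-01")
    if status["status"] == HealthStatus.CRITICAL.value:
        return [HealthCheck(
            name="camera-01",
            status=HealthStatus.CRITICAL,
            message=status["message"],
            recovery_action="restart_stream",
        )]
    return []


health_monitor.register_recovery_callback("camera-01", restart_stream)
health_monitor.register_health_checker(stream_checks)
health_monitor.start()

When the checker flags camera-01 as critical, the monitor loop invokes restart_stream through the registered callback and records the attempt in recovery_actions.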
456 lines · No EOL · 17 KiB · Python
"""
|
|
Core health monitoring system for comprehensive stream and system health tracking.
|
|
Provides centralized health status, alerting, and recovery coordination.
|
|
"""
|
|
import time
|
|
import threading
|
|
import logging
|
|
import psutil
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from collections import defaultdict, deque
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HealthStatus(Enum):
|
|
"""Health status levels."""
|
|
HEALTHY = "healthy"
|
|
WARNING = "warning"
|
|
CRITICAL = "critical"
|
|
UNKNOWN = "unknown"
|
|
|
|
|
|
@dataclass
|
|
class HealthCheck:
|
|
"""Individual health check result."""
|
|
name: str
|
|
status: HealthStatus
|
|
message: str
|
|
timestamp: float = field(default_factory=time.time)
|
|
details: Dict[str, Any] = field(default_factory=dict)
|
|
recovery_action: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class HealthMetrics:
|
|
"""Health metrics for a component."""
|
|
component_id: str
|
|
last_update: float
|
|
frame_count: int = 0
|
|
error_count: int = 0
|
|
warning_count: int = 0
|
|
restart_count: int = 0
|
|
avg_frame_interval: float = 0.0
|
|
last_frame_time: Optional[float] = None
|
|
thread_alive: bool = True
|
|
connection_healthy: bool = True
|
|
memory_usage_mb: float = 0.0
|
|
cpu_usage_percent: float = 0.0
|
|
|
|
|
|
class HealthMonitor:
|
|
"""Comprehensive health monitoring system."""
|
|
|
|
def __init__(self, check_interval: float = 30.0):
|
|
"""
|
|
Initialize health monitor.
|
|
|
|
Args:
|
|
check_interval: Interval between health checks in seconds
|
|
"""
|
|
self.check_interval = check_interval
|
|
self.running = False
|
|
self.monitor_thread = None
|
|
self._lock = threading.RLock()
|
|
|
|
# Health data storage
|
|
self.health_checks: Dict[str, HealthCheck] = {}
|
|
self.metrics: Dict[str, HealthMetrics] = {}
|
|
self.alert_history: deque = deque(maxlen=1000)
|
|
self.recovery_actions: deque = deque(maxlen=500)
|
|
|
|
# Thresholds (configurable)
|
|
self.thresholds = {
|
|
'frame_stale_warning_seconds': 120, # 2 minutes
|
|
'frame_stale_critical_seconds': 300, # 5 minutes
|
|
'thread_unresponsive_seconds': 60, # 1 minute
|
|
'memory_warning_mb': 500, # 500MB per stream
|
|
'memory_critical_mb': 1000, # 1GB per stream
|
|
'cpu_warning_percent': 80, # 80% CPU
|
|
'cpu_critical_percent': 95, # 95% CPU
|
|
'error_rate_warning': 0.1, # 10% error rate
|
|
'error_rate_critical': 0.3, # 30% error rate
|
|
'restart_threshold': 3 # Max restarts per hour
|
|
}
|
|
|
|
# Health check functions
|
|
self.health_checkers: List[Callable[[], List[HealthCheck]]] = []
|
|
self.recovery_callbacks: Dict[str, Callable[[str, HealthCheck], bool]] = {}
|
|
|
|
# System monitoring
|
|
self.process = psutil.Process()
|
|
self.system_start_time = time.time()
|
|
|
|
def start(self):
|
|
"""Start health monitoring."""
|
|
if self.running:
|
|
logger.warning("Health monitor already running")
|
|
return
|
|
|
|
self.running = True
|
|
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
|
|
self.monitor_thread.start()
|
|
logger.info(f"Health monitor started (check interval: {self.check_interval}s)")
|
|
|
|
def stop(self):
|
|
"""Stop health monitoring."""
|
|
self.running = False
|
|
if self.monitor_thread:
|
|
self.monitor_thread.join(timeout=5.0)
|
|
logger.info("Health monitor stopped")
|
|
|
|
def register_health_checker(self, checker: Callable[[], List[HealthCheck]]):
|
|
"""Register a health check function."""
|
|
self.health_checkers.append(checker)
|
|
logger.debug(f"Registered health checker: {checker.__name__}")
|
|
|
|
def register_recovery_callback(self, component: str, callback: Callable[[str, HealthCheck], bool]):
|
|
"""Register a recovery callback for a component."""
|
|
self.recovery_callbacks[component] = callback
|
|
logger.debug(f"Registered recovery callback for {component}")
|
|
|
|
def update_metrics(self, component_id: str, **kwargs):
|
|
"""Update metrics for a component."""
|
|
with self._lock:
|
|
if component_id not in self.metrics:
|
|
self.metrics[component_id] = HealthMetrics(
|
|
component_id=component_id,
|
|
last_update=time.time()
|
|
)
|
|
|
|
metrics = self.metrics[component_id]
|
|
metrics.last_update = time.time()
|
|
|
|
# Update provided metrics
|
|
for key, value in kwargs.items():
|
|
if hasattr(metrics, key):
|
|
setattr(metrics, key, value)
|
|
|
|
def report_frame_received(self, component_id: str):
|
|
"""Report that a frame was received for a component."""
|
|
current_time = time.time()
|
|
with self._lock:
|
|
if component_id not in self.metrics:
|
|
self.metrics[component_id] = HealthMetrics(
|
|
component_id=component_id,
|
|
last_update=current_time
|
|
)
|
|
|
|
metrics = self.metrics[component_id]
|
|
|
|
# Update frame metrics
|
|
if metrics.last_frame_time:
|
|
interval = current_time - metrics.last_frame_time
|
|
# Moving average of frame intervals
|
|
if metrics.avg_frame_interval == 0:
|
|
metrics.avg_frame_interval = interval
|
|
else:
|
|
metrics.avg_frame_interval = (metrics.avg_frame_interval * 0.9) + (interval * 0.1)
|
|
|
|
metrics.last_frame_time = current_time
|
|
metrics.frame_count += 1
|
|
metrics.last_update = current_time
|
|
|
|
def report_error(self, component_id: str, error_type: str = "general"):
|
|
"""Report an error for a component."""
|
|
with self._lock:
|
|
if component_id not in self.metrics:
|
|
self.metrics[component_id] = HealthMetrics(
|
|
component_id=component_id,
|
|
last_update=time.time()
|
|
)
|
|
|
|
self.metrics[component_id].error_count += 1
|
|
self.metrics[component_id].last_update = time.time()
|
|
|
|
logger.debug(f"Error reported for {component_id}: {error_type}")
|
|
|
|
def report_warning(self, component_id: str, warning_type: str = "general"):
|
|
"""Report a warning for a component."""
|
|
with self._lock:
|
|
if component_id not in self.metrics:
|
|
self.metrics[component_id] = HealthMetrics(
|
|
component_id=component_id,
|
|
last_update=time.time()
|
|
)
|
|
|
|
self.metrics[component_id].warning_count += 1
|
|
self.metrics[component_id].last_update = time.time()
|
|
|
|
logger.debug(f"Warning reported for {component_id}: {warning_type}")
|
|
|
|
def report_restart(self, component_id: str):
|
|
"""Report that a component was restarted."""
|
|
with self._lock:
|
|
if component_id not in self.metrics:
|
|
self.metrics[component_id] = HealthMetrics(
|
|
component_id=component_id,
|
|
last_update=time.time()
|
|
)
|
|
|
|
self.metrics[component_id].restart_count += 1
|
|
self.metrics[component_id].last_update = time.time()
|
|
|
|
# Log recovery action
|
|
recovery_action = {
|
|
'timestamp': time.time(),
|
|
'component': component_id,
|
|
'action': 'restart',
|
|
'reason': 'manual_restart'
|
|
}
|
|
|
|
with self._lock:
|
|
self.recovery_actions.append(recovery_action)
|
|
|
|
logger.info(f"Restart reported for {component_id}")
|
|
|
|
def get_health_status(self, component_id: Optional[str] = None) -> Dict[str, Any]:
|
|
"""Get comprehensive health status."""
|
|
with self._lock:
|
|
if component_id:
|
|
# Get health for specific component
|
|
return self._get_component_health(component_id)
|
|
else:
|
|
# Get overall health status
|
|
return self._get_overall_health()
|
|
|
|
def _get_component_health(self, component_id: str) -> Dict[str, Any]:
|
|
"""Get health status for a specific component."""
|
|
if component_id not in self.metrics:
|
|
return {
|
|
'component_id': component_id,
|
|
'status': HealthStatus.UNKNOWN.value,
|
|
'message': 'No metrics available',
|
|
'metrics': {}
|
|
}
|
|
|
|
metrics = self.metrics[component_id]
|
|
current_time = time.time()
|
|
|
|
# Determine health status
|
|
status = HealthStatus.HEALTHY
|
|
issues = []
|
|
|
|
# Check frame freshness
|
|
if metrics.last_frame_time:
|
|
frame_age = current_time - metrics.last_frame_time
|
|
if frame_age > self.thresholds['frame_stale_critical_seconds']:
|
|
status = HealthStatus.CRITICAL
|
|
issues.append(f"Frames stale for {frame_age:.1f}s")
|
|
elif frame_age > self.thresholds['frame_stale_warning_seconds']:
|
|
if status == HealthStatus.HEALTHY:
|
|
status = HealthStatus.WARNING
|
|
issues.append(f"Frames aging ({frame_age:.1f}s)")
|
|
|
|
# Check error rates
|
|
if metrics.frame_count > 0:
|
|
error_rate = metrics.error_count / metrics.frame_count
|
|
if error_rate > self.thresholds['error_rate_critical']:
|
|
status = HealthStatus.CRITICAL
|
|
issues.append(f"High error rate ({error_rate:.1%})")
|
|
elif error_rate > self.thresholds['error_rate_warning']:
|
|
if status == HealthStatus.HEALTHY:
|
|
status = HealthStatus.WARNING
|
|
issues.append(f"Elevated error rate ({error_rate:.1%})")
|
|
|
|
# Check restart frequency
|
|
restart_rate = metrics.restart_count / max(1, (current_time - self.system_start_time) / 3600)
|
|
if restart_rate > self.thresholds['restart_threshold']:
|
|
status = HealthStatus.CRITICAL
|
|
issues.append(f"Frequent restarts ({restart_rate:.1f}/hour)")
|
|
|
|
# Check thread health
|
|
if not metrics.thread_alive:
|
|
status = HealthStatus.CRITICAL
|
|
issues.append("Thread not alive")
|
|
|
|
# Check connection health
|
|
if not metrics.connection_healthy:
|
|
if status == HealthStatus.HEALTHY:
|
|
status = HealthStatus.WARNING
|
|
issues.append("Connection unhealthy")
|
|
|
|
return {
|
|
'component_id': component_id,
|
|
'status': status.value,
|
|
'message': '; '.join(issues) if issues else 'All checks passing',
|
|
'metrics': {
|
|
'frame_count': metrics.frame_count,
|
|
'error_count': metrics.error_count,
|
|
'warning_count': metrics.warning_count,
|
|
'restart_count': metrics.restart_count,
|
|
'avg_frame_interval': metrics.avg_frame_interval,
|
|
'last_frame_age': current_time - metrics.last_frame_time if metrics.last_frame_time else None,
|
|
'thread_alive': metrics.thread_alive,
|
|
'connection_healthy': metrics.connection_healthy,
|
|
'memory_usage_mb': metrics.memory_usage_mb,
|
|
'cpu_usage_percent': metrics.cpu_usage_percent,
|
|
'uptime_seconds': current_time - self.system_start_time
|
|
},
|
|
'last_update': metrics.last_update
|
|
}
|
|
|
|
def _get_overall_health(self) -> Dict[str, Any]:
|
|
"""Get overall system health status."""
|
|
current_time = time.time()
|
|
components = {}
|
|
overall_status = HealthStatus.HEALTHY
|
|
|
|
# Get health for all components
|
|
for component_id in self.metrics.keys():
|
|
component_health = self._get_component_health(component_id)
|
|
components[component_id] = component_health
|
|
|
|
# Determine overall status
|
|
component_status = HealthStatus(component_health['status'])
|
|
if component_status == HealthStatus.CRITICAL:
|
|
overall_status = HealthStatus.CRITICAL
|
|
elif component_status == HealthStatus.WARNING and overall_status == HealthStatus.HEALTHY:
|
|
overall_status = HealthStatus.WARNING
|
|
|
|
# System metrics
|
|
try:
|
|
system_memory = self.process.memory_info()
|
|
system_cpu = self.process.cpu_percent()
|
|
except Exception:
|
|
system_memory = None
|
|
system_cpu = 0.0
|
|
|
|
return {
|
|
'overall_status': overall_status.value,
|
|
'timestamp': current_time,
|
|
'uptime_seconds': current_time - self.system_start_time,
|
|
'total_components': len(self.metrics),
|
|
'components': components,
|
|
'system_metrics': {
|
|
'memory_mb': system_memory.rss / (1024 * 1024) if system_memory else 0,
|
|
'cpu_percent': system_cpu,
|
|
'process_id': self.process.pid
|
|
},
|
|
'recent_alerts': list(self.alert_history)[-10:], # Last 10 alerts
|
|
'recent_recoveries': list(self.recovery_actions)[-10:] # Last 10 recovery actions
|
|
}
|
|
|
|
def _monitor_loop(self):
|
|
"""Main health monitoring loop."""
|
|
logger.info("Health monitor loop started")
|
|
|
|
while self.running:
|
|
try:
|
|
start_time = time.time()
|
|
|
|
# Run all registered health checks
|
|
all_checks = []
|
|
for checker in self.health_checkers:
|
|
try:
|
|
checks = checker()
|
|
all_checks.extend(checks)
|
|
except Exception as e:
|
|
logger.error(f"Error in health checker {checker.__name__}: {e}")
|
|
|
|
# Process health checks and trigger recovery if needed
|
|
for check in all_checks:
|
|
self._process_health_check(check)
|
|
|
|
# Update system metrics
|
|
self._update_system_metrics()
|
|
|
|
# Sleep until next check
|
|
elapsed = time.time() - start_time
|
|
sleep_time = max(0, self.check_interval - elapsed)
|
|
if sleep_time > 0:
|
|
time.sleep(sleep_time)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in health monitor loop: {e}")
|
|
time.sleep(5.0) # Fallback sleep
|
|
|
|
logger.info("Health monitor loop ended")
|
|
|
|
def _process_health_check(self, check: HealthCheck):
|
|
"""Process a health check result and trigger recovery if needed."""
|
|
with self._lock:
|
|
# Store health check
|
|
self.health_checks[check.name] = check
|
|
|
|
# Log alerts for non-healthy status
|
|
if check.status != HealthStatus.HEALTHY:
|
|
alert = {
|
|
'timestamp': check.timestamp,
|
|
'component': check.name,
|
|
'status': check.status.value,
|
|
'message': check.message,
|
|
'details': check.details
|
|
}
|
|
self.alert_history.append(alert)
|
|
|
|
logger.warning(f"Health alert [{check.status.value.upper()}] {check.name}: {check.message}")
|
|
|
|
# Trigger recovery if critical and recovery action available
|
|
if check.status == HealthStatus.CRITICAL and check.recovery_action:
|
|
self._trigger_recovery(check.name, check)
|
|
|
|
def _trigger_recovery(self, component: str, check: HealthCheck):
|
|
"""Trigger recovery action for a component."""
|
|
if component in self.recovery_callbacks:
|
|
try:
|
|
logger.info(f"Triggering recovery for {component}: {check.recovery_action}")
|
|
|
|
success = self.recovery_callbacks[component](component, check)
|
|
|
|
recovery_action = {
|
|
'timestamp': time.time(),
|
|
'component': component,
|
|
'action': check.recovery_action,
|
|
'reason': check.message,
|
|
'success': success
|
|
}
|
|
|
|
with self._lock:
|
|
self.recovery_actions.append(recovery_action)
|
|
|
|
if success:
|
|
logger.info(f"Recovery successful for {component}")
|
|
else:
|
|
logger.error(f"Recovery failed for {component}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in recovery callback for {component}: {e}")
|
|
|
|
def _update_system_metrics(self):
|
|
"""Update system-level metrics."""
|
|
try:
|
|
# Update process metrics for all components
|
|
current_time = time.time()
|
|
|
|
with self._lock:
|
|
for component_id, metrics in self.metrics.items():
|
|
# Update CPU and memory if available
|
|
try:
|
|
# This is a simplified approach - in practice you'd want
|
|
# per-thread or per-component resource tracking
|
|
metrics.cpu_usage_percent = self.process.cpu_percent() / len(self.metrics)
|
|
memory_info = self.process.memory_info()
|
|
metrics.memory_usage_mb = memory_info.rss / (1024 * 1024) / len(self.metrics)
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating system metrics: {e}")
|
|
|
|
|
|
# Global health monitor instance
|
|
health_monitor = HealthMonitor() |
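
For reference, a minimal usage sketch of the module-level singleton, based only on the API above; the import path, the "camera-01" component ID, and the error type are illustrative assumptions, not part of the module.

# Assumed import path; adjust to the actual package layout.
from health_monitor import health_monitor

health_monitor.start()

# A capture worker reports activity and failures as they happen.
health_monitor.report_frame_received("camera-01")
health_monitor.report_error("camera-01", error_type="decode_failure")  # hypothetical error type
health_monitor.update_metrics("camera-01", connection_healthy=True, thread_alive=True)

# Aggregated view, e.g. for a health endpoint.
overall = health_monitor.get_health_status()
print(overall["overall_status"], overall["total_components"])

# Per-component view.
camera = health_monitor.get_health_status("camera-01")
print(camera["status"], camera["message"])

health_monitor.stop()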