python-detector-worker/core/monitoring/recovery.py
Siwat Sirichai b08ce27de2
Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m7s
Build Worker Base and Application Images / deploy-stack (push) Has been cancelled
Implement comprehensive health monitoring for streams and threads
- Added RecoveryManager for automatic handling of health issues, including circuit breaker patterns, automatic restarts, and graceful degradation.
- Introduced StreamHealthTracker to monitor video stream metrics, including frame production, connection health, and error rates.
- Developed ThreadHealthMonitor for detecting unresponsive and deadlocked threads, providing liveness detection and responsiveness testing.
- Integrated health checks for streams and threads, reporting metrics and recovery actions to the health monitor.
- Enhanced logging for recovery attempts, errors, and health checks to improve observability and debugging.
2025-09-27 12:27:38 +07:00

385 lines
No EOL
14 KiB
Python

"""
Recovery manager for automatic handling of health issues.
Provides circuit breaker patterns, automatic restarts, and graceful degradation.
"""
import time
import logging
import threading
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass
from enum import Enum
from collections import defaultdict, deque
from .health import HealthCheck, HealthStatus, health_monitor
logger = logging.getLogger(__name__)
class RecoveryAction(Enum):
    """Enumeration of the recovery actions known to this module.

    Each member's string value is used as the lookup key under which a
    recovery handler is registered.
    """

    # Restart-style actions
    RESTART_STREAM = "restart_stream"
    RESTART_THREAD = "restart_thread"

    # Lighter-weight corrective actions
    CLEAR_BUFFER = "clear_buffer"
    RECONNECT = "reconnect"
    THROTTLE = "throttle"
    DISABLE = "disable"
@dataclass
class RecoveryAttempt:
    """Record of a single recovery attempt.

    Instances are appended to the manager's recovery history after a
    recovery action has been executed.
    """

    # Wall-clock time (time.time()) at which the attempt was made.
    timestamp: float
    # Identifier of the component the recovery targeted.
    component: str
    # The recovery action that was executed.
    action: RecoveryAction
    # Human-readable reason that triggered the recovery.
    reason: str
    # Whether the recovery handler reported success.
    success: bool
    # Optional extra context; the default is None, so the annotation must
    # be Optional (the original declared a plain Dict with a None default).
    details: Optional[Dict[str, Any]] = None
@dataclass
class RecoveryState:
    """Per-component recovery bookkeeping (simplified: no circuit breaker)."""

    # Running counters of failed / successful recovery attempts.
    failure_count: int = 0
    success_count: int = 0

    # Timestamps of the most recent failure / success; None until one occurs.
    last_failure_time: Optional[float] = None
    last_success_time: Optional[float] = None
class RecoveryManager:
    """Manages automatic recovery actions for health issues.

    Recovery handlers are registered per ``RecoveryAction``. Automatic
    recoveries are gated by a per-component cooldown and an hourly rate
    limit; ``force_recovery`` bypasses both checks. Every executed attempt
    is recorded in a bounded history and summarised per component.
    """

    # Length (in seconds) of the sliding window used for the hourly rate
    # limit and for the "last hour" statistics.
    _WINDOW_SECONDS = 3600

    def __init__(self):
        # Handlers are keyed by RecoveryAction.value and are invoked as
        # handler(component, details) -- see _execute_recovery_action.
        self.recovery_handlers: Dict[str, Callable[[str, Dict[str, Any]], bool]] = {}
        self.recovery_states: Dict[str, RecoveryState] = {}
        self.recovery_history: deque = deque(maxlen=1000)
        # Re-entrant so that methods holding the lock can call each other.
        self._lock = threading.RLock()

        # Configuration - simplified without circuit breaker
        self.recovery_cooldown = 30  # 30 seconds between recovery attempts
        self.max_attempts_per_hour = 20  # Still limit to prevent spam, but much higher

        # Timestamps of recovery attempts per component (bounded per component)
        self.recovery_attempts: Dict[str, deque] = defaultdict(lambda: deque(maxlen=50))

        # Register with health monitor
        health_monitor.register_recovery_callback("stream", self._handle_stream_recovery)
        health_monitor.register_recovery_callback("thread", self._handle_thread_recovery)
        health_monitor.register_recovery_callback("buffer", self._handle_buffer_recovery)

    def register_recovery_handler(self, action: RecoveryAction,
                                  handler: Callable[[str, Dict[str, Any]], bool]):
        """
        Register a recovery handler for a specific action.

        A later registration for the same action replaces the earlier one.

        Args:
            action: Type of recovery action
            handler: Function called as ``handler(component, details)`` that
                performs the recovery and returns True on success
        """
        self.recovery_handlers[action.value] = handler
        logger.info(f"Registered recovery handler for {action.value}")

    def can_attempt_recovery(self, component: str) -> bool:
        """
        Check if recovery can be attempted for a component.

        Args:
            component: Component identifier

        Returns:
            True unless the component has reached ``max_attempts_per_hour``
            attempts within the last hour or its most recent attempt is
            younger than ``recovery_cooldown`` seconds
        """
        with self._lock:
            current_time = time.time()

            # Use .get() so that merely asking about a component does not
            # create an empty entry in the defaultdict.
            recent_attempts = [
                attempt for attempt in self.recovery_attempts.get(component, ())
                if current_time - attempt <= self._WINDOW_SECONDS  # Last hour
            ]

            # Only block if truly excessive attempts
            if len(recent_attempts) >= self.max_attempts_per_hour:
                logger.warning(f"Recovery rate limit exceeded for {component} "
                               f"({len(recent_attempts)} attempts in last hour)")
                return False

            # Check cooldown period (shorter cooldown)
            if recent_attempts:
                last_attempt = max(recent_attempts)
                if current_time - last_attempt < self.recovery_cooldown:
                    logger.debug(f"Recovery cooldown active for {component} "
                                 f"(last attempt {current_time - last_attempt:.1f}s ago)")
                    return False

            return True

    def attempt_recovery(self, component: str, action: RecoveryAction, reason: str,
                         details: Optional[Dict[str, Any]] = None) -> bool:
        """
        Attempt recovery for a component.

        Args:
            component: Component identifier
            action: Recovery action to perform
            reason: Reason for recovery
            details: Additional details passed to the handler (an empty dict
                is passed when omitted)

        Returns:
            True if recovery was successful; False if it failed, raised, or
            was skipped because of the cooldown / rate limit
        """
        current_time = time.time()

        # Check the limits and record the attempt under one lock acquisition
        # so two concurrent callers cannot both pass the cooldown check.
        with self._lock:
            if not self.can_attempt_recovery(component):
                return False
            self.recovery_attempts[component].append(current_time)

        logger.info(f"Attempting recovery for {component}: {action.value} ({reason})")

        try:
            # Perform recovery action (outside the lock: handlers may be slow)
            success = self._execute_recovery_action(component, action, details or {})

            # Record recovery result
            attempt = RecoveryAttempt(
                timestamp=current_time,
                component=component,
                action=action,
                reason=reason,
                success=success,
                details=details
            )
            with self._lock:
                self.recovery_history.append(attempt)

            # Update recovery state
            self._update_recovery_state(component, success)

            if success:
                logger.info(f"Recovery successful for {component}: {action.value}")
            else:
                logger.error(f"Recovery failed for {component}: {action.value}")
            return success
        except Exception as e:
            logger.error(f"Error during recovery for {component}: {e}")
            self._update_recovery_state(component, False)
            return False

    def _execute_recovery_action(self, component: str, action: RecoveryAction,
                                 details: Dict[str, Any]) -> bool:
        """Execute a specific recovery action.

        Looks up the handler registered for ``action`` and calls it with
        ``(component, details)``. Returns False when no handler is registered
        or when the handler raises; otherwise returns the handler's result
        coerced to bool.
        """
        handler_key = action.value
        handler = self.recovery_handlers.get(handler_key)
        if handler is None:
            logger.error(f"No recovery handler registered for action: {handler_key}")
            return False

        try:
            # Coerce so a handler that forgets to return a value counts as a
            # failure (False) rather than leaking None to callers.
            return bool(handler(component, details))
        except Exception as e:
            logger.error(f"Error executing recovery action {handler_key} for {component}: {e}")
            return False

    def _update_recovery_state(self, component: str, success: bool):
        """Update recovery state based on recovery result."""
        current_time = time.time()
        with self._lock:
            state = self.recovery_states.setdefault(component, RecoveryState())

            if success:
                state.success_count += 1
                state.last_success_time = current_time
                # Decay (not reset) the failure count: each success offsets
                # one earlier failure, never going below zero.
                state.failure_count = max(0, state.failure_count - 1)
                logger.debug(f"Recovery success for {component} (total successes: {state.success_count})")
            else:
                state.failure_count += 1
                state.last_failure_time = current_time
                logger.debug(f"Recovery failure for {component} (total failures: {state.failure_count})")

    def _handle_stream_recovery(self, component: str, health_check: HealthCheck) -> bool:
        """Handle recovery for stream-related issues.

        The action is chosen from substrings of ``health_check.name``:
        "frames" -> restart stream, "connection" -> reconnect,
        "errors" -> throttle, anything else -> restart stream.
        """
        check_name = health_check.name
        if "frames" in check_name:
            # Frame-related issue - restart stream
            action = RecoveryAction.RESTART_STREAM
        elif "connection" in check_name:
            # Connection issue - reconnect
            action = RecoveryAction.RECONNECT
        elif "errors" in check_name:
            # High error rate - throttle
            action = RecoveryAction.THROTTLE
        else:
            # Generic stream issue - restart
            action = RecoveryAction.RESTART_STREAM

        return self.attempt_recovery(
            component,
            action,
            health_check.message,
            health_check.details
        )

    def _handle_thread_recovery(self, component: str, health_check: HealthCheck) -> bool:
        """Handle recovery for thread-related issues.

        Every thread issue (deadlock, unresponsive, or generic) is handled
        the same way: by restarting the thread.
        """
        return self.attempt_recovery(
            component,
            RecoveryAction.RESTART_THREAD,
            health_check.message,
            health_check.details
        )

    def _handle_buffer_recovery(self, component: str, health_check: HealthCheck) -> bool:
        """Handle recovery for buffer-related issues by clearing the buffer."""
        return self.attempt_recovery(
            component,
            RecoveryAction.CLEAR_BUFFER,
            health_check.message,
            health_check.details
        )

    def get_recovery_stats(self) -> Dict[str, Any]:
        """Get recovery statistics.

        Returns:
            Dict with the number of recoveries in the last hour, per-component
            counters for that hour, the current recovery state of every
            component, and the ten most recent history entries
        """
        current_time = time.time()
        with self._lock:
            # Calculate stats from history
            recent_recoveries = [
                attempt for attempt in self.recovery_history
                if current_time - attempt.timestamp <= self._WINDOW_SECONDS  # Last hour
            ]

            stats_by_component = defaultdict(lambda: {
                'attempts': 0,
                'successes': 0,
                'failures': 0,
                'last_attempt': None,
                'last_success': None
            })

            for attempt in recent_recoveries:
                stats = stats_by_component[attempt.component]
                stats['attempts'] += 1

                if attempt.success:
                    stats['successes'] += 1
                    # Compare against None explicitly so a 0.0 timestamp is
                    # not mistaken for "no value yet".
                    if stats['last_success'] is None or attempt.timestamp > stats['last_success']:
                        stats['last_success'] = attempt.timestamp
                else:
                    stats['failures'] += 1

                # Track the latest attempt regardless of its outcome.
                if stats['last_attempt'] is None or attempt.timestamp > stats['last_attempt']:
                    stats['last_attempt'] = attempt.timestamp

            return {
                'total_recoveries_last_hour': len(recent_recoveries),
                'recovery_by_component': dict(stats_by_component),
                'recovery_states': {
                    component: {
                        'failure_count': state.failure_count,
                        'success_count': state.success_count,
                        'last_failure_time': state.last_failure_time,
                        'last_success_time': state.last_success_time
                    }
                    for component, state in self.recovery_states.items()
                },
                'recent_history': [
                    {
                        'timestamp': attempt.timestamp,
                        'component': attempt.component,
                        'action': attempt.action.value,
                        'reason': attempt.reason,
                        'success': attempt.success
                    }
                    for attempt in list(self.recovery_history)[-10:]  # Last 10 attempts
                ]
            }

    def force_recovery(self, component: str, action: RecoveryAction, reason: str = "manual") -> bool:
        """
        Force recovery for a component, bypassing rate limiting.

        The forced attempt is still recorded, so it counts towards the
        cooldown and hourly limit of later automatic attempts.

        Args:
            component: Component identifier
            action: Recovery action to perform
            reason: Reason for forced recovery

        Returns:
            True if recovery was successful
        """
        logger.info(f"Forcing recovery for {component}: {action.value} ({reason})")
        current_time = time.time()

        try:
            # Execute recovery action directly
            success = self._execute_recovery_action(component, action, {})

            # Record forced recovery
            attempt = RecoveryAttempt(
                timestamp=current_time,
                component=component,
                action=action,
                reason=f"forced: {reason}",
                success=success,
                details={'forced': True}
            )
            with self._lock:
                self.recovery_history.append(attempt)
                self.recovery_attempts[component].append(current_time)

            # Update recovery state
            self._update_recovery_state(component, success)
            return success
        except Exception as e:
            logger.error(f"Error during forced recovery for {component}: {e}")
            # Keep state bookkeeping consistent with attempt_recovery.
            self._update_recovery_state(component, False)
            return False
# Global recovery manager instance.
# NOTE: constructing it here runs RecoveryManager.__init__ at import time,
# which registers the "stream", "thread" and "buffer" recovery callbacks on
# the shared health_monitor.
recovery_manager = RecoveryManager()