diff --git a/.claude/settings.local.json b/.claude/settings.local.json
index 9e296ac..97cf5c1 100644
--- a/.claude/settings.local.json
+++ b/.claude/settings.local.json
@@ -2,8 +2,7 @@
   "permissions": {
     "allow": [
       "Bash(dir:*)",
-      "WebSearch",
-      "Bash(mkdir:*)"
+      "WebSearch"
     ],
     "deny": [],
     "ask": []
diff --git a/app.py b/app.py
index 21d89db..605aa0b 100644
--- a/app.py
+++ b/app.py
@@ -8,7 +8,6 @@ import os
 import time
 import cv2
 from contextlib import asynccontextmanager
-from typing import Dict, Any
 
 from fastapi import FastAPI, WebSocket, HTTPException
 from fastapi.responses import Response
@@ -32,135 +31,21 @@ logger.setLevel(logging.DEBUG)
 # Frames are now stored in the shared cache buffer from core.streaming.buffers
 # latest_frames = {}  # Deprecated - using shared_cache_buffer instead
 
-
-# Health monitoring recovery handlers
-def _handle_stream_restart_recovery(component: str, details: Dict[str, Any]) -> bool:
-    """Handle stream restart recovery at the application level."""
-    try:
-        from core.streaming.manager import shared_stream_manager
-
-        # Extract camera ID from component name (e.g., "stream_cam-001" -> "cam-001")
-        if component.startswith("stream_"):
-            camera_id = component[7:]  # Remove "stream_" prefix
-        else:
-            camera_id = component
-
-        logger.info(f"Attempting stream restart recovery for {camera_id}")
-
-        # Find and restart the subscription
-        subscriptions = shared_stream_manager.get_all_subscriptions()
-        for sub_info in subscriptions:
-            if sub_info.camera_id == camera_id:
-                # Remove and re-add the subscription
-                shared_stream_manager.remove_subscription(sub_info.subscription_id)
-                time.sleep(1.0)  # Brief delay
-
-                # Re-add subscription
-                success = shared_stream_manager.add_subscription(
-                    sub_info.subscription_id,
-                    sub_info.stream_config,
-                    sub_info.crop_coords,
-                    sub_info.model_id,
-                    sub_info.model_url,
-                    sub_info.tracking_integration
-                )
-
-                if success:
-                    logger.info(f"Stream restart recovery successful for {camera_id}")
-                    return True
-                else:
-                    logger.error(f"Stream restart recovery failed for {camera_id}")
-                    return False
-
-        logger.warning(f"No subscription found for camera {camera_id} during recovery")
-        return False
-
-    except Exception as e:
-        logger.error(f"Error in stream restart recovery for {component}: {e}")
-        return False
-
-
-def _handle_stream_reconnect_recovery(component: str, details: Dict[str, Any]) -> bool:
-    """Handle stream reconnect recovery at the application level."""
-    try:
-        from core.streaming.manager import shared_stream_manager
-
-        # Extract camera ID from component name
-        if component.startswith("stream_"):
-            camera_id = component[7:]
-        else:
-            camera_id = component
-
-        logger.info(f"Attempting stream reconnect recovery for {camera_id}")
-
-        # For reconnect, we just need to trigger the stream's internal reconnect
-        # The stream readers handle their own reconnection logic
-        active_cameras = shared_stream_manager.get_active_cameras()
-
-        if camera_id in active_cameras:
-            logger.info(f"Stream reconnect recovery triggered for {camera_id}")
-            return True
-        else:
-            logger.warning(f"Camera {camera_id} not found in active cameras during reconnect recovery")
-            return False
-
-    except Exception as e:
-        logger.error(f"Error in stream reconnect recovery for {component}: {e}")
-        return False
-
 # Lifespan event handler (modern FastAPI approach)
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Application lifespan management."""
     # Startup
     logger.info("Detector Worker started successfully")
-
-    # Initialize health monitoring system
-    try:
-        from core.monitoring.health import health_monitor
-        from core.monitoring.stream_health import stream_health_tracker
-        from core.monitoring.thread_health import thread_health_monitor
-        from core.monitoring.recovery import recovery_manager
-
-        # Start health monitoring
-        health_monitor.start()
-        logger.info("Health monitoring system started")
-
-        # Register recovery handlers for stream management
-        from core.streaming.manager import shared_stream_manager
-        recovery_manager.register_recovery_handler(
-            "restart_stream",
-            _handle_stream_restart_recovery
-        )
-        recovery_manager.register_recovery_handler(
-            "reconnect",
-            _handle_stream_reconnect_recovery
-        )
-
-        logger.info("Recovery handlers registered")
-
-    except Exception as e:
-        logger.error(f"Failed to initialize health monitoring: {e}")
-
     logger.info("WebSocket endpoint available at: ws://0.0.0.0:8001/")
     logger.info("HTTP camera endpoint available at: http://0.0.0.0:8001/camera/{camera_id}/image")
     logger.info("Health check available at: http://0.0.0.0:8001/health")
-    logger.info("Detailed health monitoring available at: http://0.0.0.0:8001/health/detailed")
     logger.info("Ready and waiting for backend WebSocket connections")
 
     yield
 
     # Shutdown
     logger.info("Detector Worker shutting down...")
-
-    # Stop health monitoring
-    try:
-        from core.monitoring.health import health_monitor
-        health_monitor.stop()
-        logger.info("Health monitoring system stopped")
-    except Exception as e:
-        logger.error(f"Error stopping health monitoring: {e}")
-
     # Clear all state
     worker_state.set_subscriptions([])
     worker_state.session_ids.clear()
@@ -201,11 +86,10 @@ else:
     os.makedirs("models", exist_ok=True)
     logger.info("Ensured models directory exists")
 
-# Stream manager already initialized at module level with max_streams=20
-# Calling initialize_stream_manager() creates a NEW instance, breaking references
-# from core.streaming import initialize_stream_manager
-# initialize_stream_manager(max_streams=config.get('max_streams', 10))
-logger.info(f"Using stream manager with max_streams=20 (module-level initialization)")
+# Initialize stream manager with config value
+from core.streaming import initialize_stream_manager
+initialize_stream_manager(max_streams=config.get('max_streams', 10))
+logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}")
 
 # Frames are now stored in the shared cache buffer from core.streaming.buffers
 # latest_frames = {}  # Deprecated - using shared_cache_buffer instead
@@ -302,63 +186,6 @@ async def get_camera_image(camera_id: str):
         raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
 
 
-@app.get("/session-image/{session_id}")
-async def get_session_image(session_id: int):
-    """
-    HTTP endpoint to retrieve the saved session image by session ID.
-
-    Args:
-        session_id: The session ID to retrieve the image for
-
-    Returns:
-        JPEG image as binary response
-
-    Raises:
-        HTTPException: 404 if no image found for the session
-        HTTPException: 500 if reading image fails
-    """
-    try:
-        from pathlib import Path
-        import glob
-
-        # Images directory
-        images_dir = Path("images")
-
-        if not images_dir.exists():
-            logger.warning(f"Images directory does not exist")
-            raise HTTPException(
-                status_code=404,
-                detail=f"No images directory found"
-            )
-
-        # Search for files matching session ID pattern: {session_id}_*
-        pattern = str(images_dir / f"{session_id}_*.jpg")
-        matching_files = glob.glob(pattern)
-
-        if not matching_files:
-            logger.warning(f"No image found for session {session_id}")
-            raise HTTPException(
-                status_code=404,
-                detail=f"No image found for session {session_id}"
-            )
-
-        # Get the most recent file if multiple exist
-        most_recent_file = max(matching_files, key=os.path.getmtime)
-        logger.info(f"Found session image for session {session_id}: {most_recent_file}")
-
-        # Read the image file
-        image_data = open(most_recent_file, 'rb').read()
-
-        # Return image as binary response
-        return Response(content=image_data, media_type="image/jpeg")
-
-    except HTTPException:
-        raise
-    except Exception as e:
-        logger.error(f"Error retrieving session image for session {session_id}: {str(e)}", exc_info=True)
-        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
-
-
 @app.get("/health")
 async def health_check():
     """Health check endpoint for monitoring."""
@@ -370,205 +197,6 @@ async def health_check():
     }
 
 
-@app.get("/health/detailed")
-async def detailed_health_check():
-    """Comprehensive health status with detailed monitoring data."""
-    try:
-        from core.monitoring.health import health_monitor
-        from core.monitoring.stream_health import stream_health_tracker
-        from core.monitoring.thread_health import thread_health_monitor
-        from core.monitoring.recovery import recovery_manager
-
-        # Get comprehensive health status
-        overall_health = health_monitor.get_health_status()
-        stream_metrics = stream_health_tracker.get_all_metrics()
-        thread_info = thread_health_monitor.get_all_thread_info()
-        recovery_stats = recovery_manager.get_recovery_stats()
-
-        return {
-            "timestamp": time.time(),
-            "overall_health": overall_health,
-            "stream_metrics": stream_metrics,
-            "thread_health": thread_info,
-            "recovery_stats": recovery_stats,
-            "system_info": {
-                "active_subscriptions": len(worker_state.subscriptions),
-                "active_sessions": len(worker_state.session_ids),
-                "version": "2.0.0"
-            }
-        }
-
-    except Exception as e:
-        logger.error(f"Error generating detailed health report: {e}")
-        raise HTTPException(status_code=500, detail=f"Health monitoring error: {str(e)}")
-
-
-@app.get("/health/streams")
-async def stream_health_status():
-    """Stream-specific health monitoring."""
-    try:
-        from core.monitoring.stream_health import stream_health_tracker
-        from core.streaming.buffers import shared_cache_buffer
-
-        stream_metrics = stream_health_tracker.get_all_metrics()
-        buffer_stats = shared_cache_buffer.get_stats()
-
-        return {
-            "timestamp": time.time(),
-            "stream_count": len(stream_metrics),
-            "stream_metrics": stream_metrics,
-            "buffer_stats": buffer_stats,
-            "frame_ages": {
-                camera_id: {
-                    "age_seconds": time.time() - info["last_frame_time"] if info and info.get("last_frame_time") else None,
-                    "total_frames": info.get("frame_count", 0) if info else 0
-                }
-                for camera_id, info in stream_metrics.items()
-            }
-        }
-
-    except Exception as e:
-        logger.error(f"Error generating stream health report: {e}")
-        raise HTTPException(status_code=500, detail=f"Stream health error: {str(e)}")
-
-
-@app.get("/health/threads")
-async def thread_health_status():
-    """Thread-specific health monitoring."""
-    try:
-        from core.monitoring.thread_health import thread_health_monitor
-
-        thread_info = thread_health_monitor.get_all_thread_info()
-        deadlocks = thread_health_monitor.detect_deadlocks()
-
-        return {
-            "timestamp": time.time(),
-            "thread_count": len(thread_info),
-            "thread_info": thread_info,
-            "potential_deadlocks": deadlocks,
-            "summary": {
-                "responsive_threads": sum(1 for info in thread_info.values() if info.get("is_responsive", False)),
-                "unresponsive_threads": sum(1 for info in thread_info.values() if not info.get("is_responsive", True)),
-                "deadlock_count": len(deadlocks)
-            }
-        }
-
-    except Exception as e:
-        logger.error(f"Error generating thread health report: {e}")
-        raise HTTPException(status_code=500, detail=f"Thread health error: {str(e)}")
-
-
-@app.get("/health/recovery")
-async def recovery_status():
-    """Recovery system status and history."""
-    try:
-        from core.monitoring.recovery import recovery_manager
-
-        recovery_stats = recovery_manager.get_recovery_stats()
-
-        return {
-            "timestamp": time.time(),
-            "recovery_stats": recovery_stats,
-            "summary": {
-                "total_recoveries_last_hour": recovery_stats.get("total_recoveries_last_hour", 0),
-                "components_with_recovery_state": len(recovery_stats.get("recovery_states", {})),
-                "total_recovery_failures": sum(
-                    state.get("failure_count", 0)
-                    for state in recovery_stats.get("recovery_states", {}).values()
-                ),
-                "total_recovery_successes": sum(
-                    state.get("success_count", 0)
-                    for state in recovery_stats.get("recovery_states", {}).values()
-                )
-            }
-        }
-
-    except Exception as e:
-        logger.error(f"Error generating recovery status report: {e}")
-        raise HTTPException(status_code=500, detail=f"Recovery status error: {str(e)}")
-
-
-@app.post("/health/recovery/force/{component}")
-async def force_recovery(component: str, action: str = "restart_stream"):
-    """Force recovery action for a specific component."""
-    try:
-        from core.monitoring.recovery import recovery_manager, RecoveryAction
-
-        # Validate action
-        try:
-            recovery_action = RecoveryAction(action)
-        except ValueError:
-            raise HTTPException(
-                status_code=400,
-                detail=f"Invalid recovery action: {action}. Valid actions: {[a.value for a in RecoveryAction]}"
-            )
-
-        # Force recovery
-        success = recovery_manager.force_recovery(component, recovery_action, "manual_api_request")
-
-        return {
-            "timestamp": time.time(),
-            "component": component,
-            "action": action,
-            "success": success,
-            "message": f"Recovery {'successful' if success else 'failed'} for component {component}"
-        }
-
-    except HTTPException:
-        raise
-    except Exception as e:
-        logger.error(f"Error forcing recovery for {component}: {e}")
-        raise HTTPException(status_code=500, detail=f"Recovery error: {str(e)}")
-
-
-@app.get("/health/metrics")
-async def health_metrics():
-    """Performance and health metrics in a format suitable for monitoring systems."""
-    try:
-        from core.monitoring.health import health_monitor
-        from core.monitoring.stream_health import stream_health_tracker
-        from core.streaming.buffers import shared_cache_buffer
-
-        # Get basic metrics
-        overall_health = health_monitor.get_health_status()
-        stream_metrics = stream_health_tracker.get_all_metrics()
-        buffer_stats = shared_cache_buffer.get_stats()
-
-        # Format for monitoring systems (Prometheus-style)
-        metrics = {
-            "detector_worker_up": 1,
-            "detector_worker_streams_total": len(stream_metrics),
-            "detector_worker_subscriptions_total": len(worker_state.subscriptions),
-            "detector_worker_sessions_total": len(worker_state.session_ids),
-            "detector_worker_memory_mb": buffer_stats.get("total_memory_mb", 0),
-            "detector_worker_health_status": {
-                "healthy": 1,
-                "warning": 2,
-                "critical": 3,
-                "unknown": 4
-            }.get(overall_health.get("overall_status", "unknown"), 4)
-        }
-
-        # Add per-stream metrics
-        for camera_id, stream_info in stream_metrics.items():
-            safe_camera_id = camera_id.replace("-", "_").replace(".", "_")
-            metrics.update({
-                f"detector_worker_stream_frames_total{{camera=\"{safe_camera_id}\"}}": stream_info.get("frame_count", 0),
-                f"detector_worker_stream_errors_total{{camera=\"{safe_camera_id}\"}}": stream_info.get("error_count", 0),
-                f"detector_worker_stream_fps{{camera=\"{safe_camera_id}\"}}": stream_info.get("frames_per_second", 0),
-                f"detector_worker_stream_frame_age_seconds{{camera=\"{safe_camera_id}\"}}": stream_info.get("last_frame_age_seconds") or 0
-            })
-
-        return {
-            "timestamp": time.time(),
-            "metrics": metrics
-        }
-
-    except Exception as e:
-        logger.error(f"Error generating health metrics: {e}")
-        raise HTTPException(status_code=500, detail=f"Metrics error: {str(e)}")
-
-
 if __name__ == "__main__":
diff --git a/core/communication/websocket.py b/core/communication/websocket.py
index e53096a..4e40d2a 100644
--- a/core/communication/websocket.py
+++ b/core/communication/websocket.py
@@ -539,7 +539,7 @@ class WebSocketHandler:
     async def _handle_set_session_id(self, message: SetSessionIdMessage) -> None:
         """Handle setSessionId message."""
         display_identifier = message.payload.displayIdentifier
-        session_id = str(message.payload.sessionId) if message.payload.sessionId is not None else None
+        session_id = message.payload.sessionId
 
         logger.info(f"[RX Processing] setSessionId for display {display_identifier}: {session_id}")
 
diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py
index d395f3a..076cdc9 100644
--- a/core/detection/pipeline.py
+++ b/core/detection/pipeline.py
@@ -64,10 +64,6 @@ class DetectionPipeline:
         # SessionId to processing results mapping (for combining with license plate results)
         self.session_processing_results = {}
 
-        # Field mappings from parallelActions (e.g., {"car_brand": "{car_brand_cls_v3.brand}"})
-        self.field_mappings = {}
-        self._parse_field_mappings()
-
         # Statistics
         self.stats = {
             'detections_processed': 0,
@@ -78,25 +74,6 @@ class DetectionPipeline:
 
         logger.info("DetectionPipeline initialized")
 
-    def _parse_field_mappings(self):
-        """
-        Parse field mappings from parallelActions.postgresql_update_combined.fields.
-        Extracts mappings like {"car_brand": "{car_brand_cls_v3.brand}"} for dynamic field resolution.
-        """
-        try:
-            if not self.pipeline_config or not hasattr(self.pipeline_config, 'parallel_actions'):
-                return
-
-            for action in self.pipeline_config.parallel_actions:
-                if action.type.value == 'postgresql_update_combined':
-                    fields = action.params.get('fields', {})
-                    self.field_mappings = fields
-                    logger.info(f"[FIELD MAPPINGS] Parsed from pipeline config: {self.field_mappings}")
-                    break
-
-        except Exception as e:
-            logger.error(f"Error parsing field mappings: {e}", exc_info=True)
-
     async def initialize(self) -> bool:
         """
         Initialize all pipeline components including models, Redis, and database.
@@ -188,44 +165,6 @@ class DetectionPipeline:
             logger.error(f"Error initializing detection model: {e}", exc_info=True)
             return False
 
-    def _extract_fields_from_branches(self, branch_results: Dict[str, Any]) -> Dict[str, Any]:
-        """
-        Extract fields dynamically from branch results using field mappings.
-
-        Args:
-            branch_results: Dictionary of branch execution results
-
-        Returns:
-            Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"})
-        """
-        extracted = {}
-
-        try:
-            for db_field_name, template in self.field_mappings.items():
-                # Parse template like "{car_brand_cls_v3.brand}" -> branch_id="car_brand_cls_v3", field="brand"
-                if template.startswith('{') and template.endswith('}'):
-                    var_name = template[1:-1]
-                    if '.' in var_name:
-                        branch_id, field_name = var_name.split('.', 1)
-
-                        # Look up value in branch_results
-                        if branch_id in branch_results:
-                            branch_data = branch_results[branch_id]
-                            if isinstance(branch_data, dict) and 'result' in branch_data:
-                                result_data = branch_data['result']
-                                if isinstance(result_data, dict) and field_name in result_data:
-                                    extracted[field_name] = result_data[field_name]
-                                    logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}")
-                                else:
-                                    logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}")
-                        else:
-                            logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results")
-
-        except Exception as e:
-            logger.error(f"Error extracting fields from branches: {e}", exc_info=True)
-
-        return extracted
-
     async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]):
         """
         Callback for handling license plate results from LPR service.
@@ -333,12 +272,12 @@ class DetectionPipeline:
                     branch_results = self.session_processing_results[session_id_for_lookup]
                     logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}")
 
-                    # Extract fields dynamically using field mappings from pipeline config
-                    extracted_fields = self._extract_fields_from_branches(branch_results)
-                    car_brand = extracted_fields.get('brand')
-                    body_type = extracted_fields.get('body_type')
-
-                    logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}")
+                    if 'car_brand_cls_v2' in branch_results:
+                        brand_result = branch_results['car_brand_cls_v2'].get('result', {})
+                        car_brand = brand_result.get('brand')
+                    if 'car_bodytype_cls_v1' in branch_results:
+                        bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
+                        body_type = bodytype_result.get('body_type')
 
                     # Clean up stored results after use
                     del self.session_processing_results[session_id_for_lookup]
@@ -1064,7 +1003,7 @@ class DetectionPipeline:
         Resolve field template using branch results and context.
 
         Args:
-            template: Template string like "{car_brand_cls_v3.brand}"
+            template: Template string like "{car_brand_cls_v2.brand}"
            branch_results: Dictionary of branch execution results
            context: Detection context
 
@@ -1076,7 +1015,7 @@ class DetectionPipeline:
             if template.startswith('{') and template.endswith('}'):
                 var_name = template[1:-1]
 
-                # Check for branch result reference (e.g., "car_brand_cls_v3.brand")
+                # Check for branch result reference (e.g., "car_brand_cls_v2.brand")
                 if '.' in var_name:
                     branch_id, field_name = var_name.split('.', 1)
                     if branch_id in branch_results:
@@ -1122,10 +1061,17 @@ class DetectionPipeline:
             logger.warning("No session_id in context for processing results")
             return
 
-        # Extract fields dynamically using field mappings from pipeline config
-        extracted_fields = self._extract_fields_from_branches(branch_results)
-        car_brand = extracted_fields.get('brand')
-        body_type = extracted_fields.get('body_type')
+        # Extract car brand from car_brand_cls_v2 results
+        car_brand = None
+        if 'car_brand_cls_v2' in branch_results:
+            brand_result = branch_results['car_brand_cls_v2'].get('result', {})
+            car_brand = brand_result.get('brand')
+
+        # Extract body type from car_bodytype_cls_v1 results
+        body_type = None
+        if 'car_bodytype_cls_v1' in branch_results:
+            bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
+            body_type = bodytype_result.get('body_type')
 
         logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: "
                     f"brand={car_brand}, bodyType={body_type}")
diff --git a/core/monitoring/__init__.py b/core/monitoring/__init__.py
deleted file mode 100644
index 2ad32ed..0000000
--- a/core/monitoring/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-"""
-Comprehensive health monitoring system for detector worker.
-Tracks stream health, thread responsiveness, and system performance.
-"""
-
-from .health import HealthMonitor, HealthStatus, HealthCheck
-from .stream_health import StreamHealthTracker
-from .thread_health import ThreadHealthMonitor
-from .recovery import RecoveryManager
-
-__all__ = [
-    'HealthMonitor',
-    'HealthStatus',
-    'HealthCheck',
-    'StreamHealthTracker',
-    'ThreadHealthMonitor',
-    'RecoveryManager'
-]
\ No newline at end of file
diff --git a/core/monitoring/health.py b/core/monitoring/health.py
deleted file mode 100644
index be094f3..0000000
--- a/core/monitoring/health.py
+++ /dev/null
@@ -1,456 +0,0 @@
-"""
-Core health monitoring system for comprehensive stream and system health tracking.
-Provides centralized health status, alerting, and recovery coordination.
-"""
-import time
-import threading
-import logging
-import psutil
-from typing import Dict, List, Optional, Any, Callable
-from dataclasses import dataclass, field
-from enum import Enum
-from collections import defaultdict, deque
-
-
-logger = logging.getLogger(__name__)
-
-
-class HealthStatus(Enum):
-    """Health status levels."""
-    HEALTHY = "healthy"
-    WARNING = "warning"
-    CRITICAL = "critical"
-    UNKNOWN = "unknown"
-
-
-@dataclass
-class HealthCheck:
-    """Individual health check result."""
-    name: str
-    status: HealthStatus
-    message: str
-    timestamp: float = field(default_factory=time.time)
-    details: Dict[str, Any] = field(default_factory=dict)
-    recovery_action: Optional[str] = None
-
-
-@dataclass
-class HealthMetrics:
-    """Health metrics for a component."""
-    component_id: str
-    last_update: float
-    frame_count: int = 0
-    error_count: int = 0
-    warning_count: int = 0
-    restart_count: int = 0
-    avg_frame_interval: float = 0.0
-    last_frame_time: Optional[float] = None
-    thread_alive: bool = True
-    connection_healthy: bool = True
-    memory_usage_mb: float = 0.0
-    cpu_usage_percent: float = 0.0
-
-
-class HealthMonitor:
-    """Comprehensive health monitoring system."""
-
-    def __init__(self, check_interval: float = 30.0):
-        """
-        Initialize health monitor.
-
-        Args:
-            check_interval: Interval between health checks in seconds
-        """
-        self.check_interval = check_interval
-        self.running = False
-        self.monitor_thread = None
-        self._lock = threading.RLock()
-
-        # Health data storage
-        self.health_checks: Dict[str, HealthCheck] = {}
-        self.metrics: Dict[str, HealthMetrics] = {}
-        self.alert_history: deque = deque(maxlen=1000)
-        self.recovery_actions: deque = deque(maxlen=500)
-
-        # Thresholds (configurable)
-        self.thresholds = {
-            'frame_stale_warning_seconds': 120,   # 2 minutes
-            'frame_stale_critical_seconds': 300,  # 5 minutes
-            'thread_unresponsive_seconds': 60,    # 1 minute
-            'memory_warning_mb': 500,             # 500MB per stream
-            'memory_critical_mb': 1000,           # 1GB per stream
-            'cpu_warning_percent': 80,            # 80% CPU
-            'cpu_critical_percent': 95,           # 95% CPU
-            'error_rate_warning': 0.1,            # 10% error rate
-            'error_rate_critical': 0.3,           # 30% error rate
-            'restart_threshold': 3                # Max restarts per hour
-        }
-
-        # Health check functions
-        self.health_checkers: List[Callable[[], List[HealthCheck]]] = []
-        self.recovery_callbacks: Dict[str, Callable[[str, HealthCheck], bool]] = {}
-
-        # System monitoring
-        self.process = psutil.Process()
-        self.system_start_time = time.time()
-
-    def start(self):
-        """Start health monitoring."""
-        if self.running:
-            logger.warning("Health monitor already running")
-            return
-
-        self.running = True
-        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
-        self.monitor_thread.start()
-        logger.info(f"Health monitor started (check interval: {self.check_interval}s)")
-
-    def stop(self):
-        """Stop health monitoring."""
-        self.running = False
-        if self.monitor_thread:
-            self.monitor_thread.join(timeout=5.0)
-        logger.info("Health monitor stopped")
-
-    def register_health_checker(self, checker: Callable[[], List[HealthCheck]]):
-        """Register a health check function."""
-        self.health_checkers.append(checker)
-        logger.debug(f"Registered health checker: {checker.__name__}")
-
-    def register_recovery_callback(self, component: str, callback: Callable[[str, HealthCheck], bool]):
-        """Register a recovery callback for a component."""
-        self.recovery_callbacks[component] = callback
-        logger.debug(f"Registered recovery callback for {component}")
-
-    def update_metrics(self, component_id: str, **kwargs):
-        """Update metrics for a component."""
-        with self._lock:
-            if component_id not in self.metrics:
-                self.metrics[component_id] = HealthMetrics(
-                    component_id=component_id,
-                    last_update=time.time()
-                )
-
-            metrics = self.metrics[component_id]
-            metrics.last_update = time.time()
-
-            # Update provided metrics
-            for key, value in kwargs.items():
-                if hasattr(metrics, key):
-                    setattr(metrics, key, value)
-
-    def report_frame_received(self, component_id: str):
-        """Report that a frame was received for a component."""
-        current_time = time.time()
-        with self._lock:
-            if component_id not in self.metrics:
-                self.metrics[component_id] = HealthMetrics(
-                    component_id=component_id,
-                    last_update=current_time
-                )
-
-            metrics = self.metrics[component_id]
-
-            # Update frame metrics
-            if metrics.last_frame_time:
-                interval = current_time - metrics.last_frame_time
-                # Moving average of frame intervals
-                if metrics.avg_frame_interval == 0:
-                    metrics.avg_frame_interval = interval
-                else:
-                    metrics.avg_frame_interval = (metrics.avg_frame_interval * 0.9) + (interval * 0.1)
-
-            metrics.last_frame_time = current_time
-            metrics.frame_count += 1
-            metrics.last_update = current_time
-
-    def report_error(self, component_id: str, error_type: str = "general"):
-        """Report an error for a component."""
-        with self._lock:
-            if component_id not in self.metrics:
-                self.metrics[component_id] = HealthMetrics(
-                    component_id=component_id,
-                    last_update=time.time()
-                )
-
-            self.metrics[component_id].error_count += 1
-            self.metrics[component_id].last_update = time.time()
-
-        logger.debug(f"Error reported for {component_id}: {error_type}")
-
-    def report_warning(self, component_id: str, warning_type: str = "general"):
-        """Report a warning for a component."""
-        with self._lock:
-            if component_id not in self.metrics:
-                self.metrics[component_id] = HealthMetrics(
-                    component_id=component_id,
-                    last_update=time.time()
-                )
-
-            self.metrics[component_id].warning_count += 1
-            self.metrics[component_id].last_update = time.time()
-
-        logger.debug(f"Warning reported for {component_id}: {warning_type}")
-
-    def report_restart(self, component_id: str):
-        """Report that a component was restarted."""
-        with self._lock:
-            if component_id not in self.metrics:
-                self.metrics[component_id] = HealthMetrics(
-                    component_id=component_id,
-                    last_update=time.time()
-                )
-
-            self.metrics[component_id].restart_count += 1
-            self.metrics[component_id].last_update = time.time()
-
-        # Log recovery action
-        recovery_action = {
-            'timestamp': time.time(),
-            'component': component_id,
-            'action': 'restart',
-            'reason': 'manual_restart'
-        }
-
-        with self._lock:
-            self.recovery_actions.append(recovery_action)
-
-        logger.info(f"Restart reported for {component_id}")
-
-    def get_health_status(self, component_id: Optional[str] = None) -> Dict[str, Any]:
-        """Get comprehensive health status."""
-        with self._lock:
-            if component_id:
-                # Get health for specific component
-                return self._get_component_health(component_id)
-            else:
-                # Get overall health status
-                return self._get_overall_health()
-
-    def _get_component_health(self, component_id: str) -> Dict[str, Any]:
-        """Get health status for a specific component."""
-        if component_id not in self.metrics:
-            return {
-                'component_id': component_id,
-                'status': HealthStatus.UNKNOWN.value,
-                'message': 'No metrics available',
-                'metrics': {}
-            }
-
-        metrics = self.metrics[component_id]
-        current_time = time.time()
-
-        # Determine health status
-        status = HealthStatus.HEALTHY
-        issues = []
-
-        # Check frame freshness
-        if metrics.last_frame_time:
-            frame_age = current_time - metrics.last_frame_time
-            if frame_age > self.thresholds['frame_stale_critical_seconds']:
-                status = HealthStatus.CRITICAL
-                issues.append(f"Frames stale for {frame_age:.1f}s")
-            elif frame_age > self.thresholds['frame_stale_warning_seconds']:
-                if status == HealthStatus.HEALTHY:
-                    status = HealthStatus.WARNING
-                issues.append(f"Frames aging ({frame_age:.1f}s)")
-
-        # Check error rates
-        if metrics.frame_count > 0:
-            error_rate = metrics.error_count / metrics.frame_count
-            if error_rate > self.thresholds['error_rate_critical']:
-                status = HealthStatus.CRITICAL
-                issues.append(f"High error rate ({error_rate:.1%})")
-            elif error_rate > self.thresholds['error_rate_warning']:
-                if status == HealthStatus.HEALTHY:
-                    status = HealthStatus.WARNING
-                issues.append(f"Elevated error rate ({error_rate:.1%})")
-
-        # Check restart frequency
-        restart_rate = metrics.restart_count / max(1, (current_time - self.system_start_time) / 3600)
-        if restart_rate > self.thresholds['restart_threshold']:
-            status = HealthStatus.CRITICAL
-            issues.append(f"Frequent restarts ({restart_rate:.1f}/hour)")
-
-        # Check thread health
-        if not metrics.thread_alive:
-            status = HealthStatus.CRITICAL
-            issues.append("Thread not alive")
-
-        # Check connection health
-        if not metrics.connection_healthy:
-            if status == HealthStatus.HEALTHY:
-                status = HealthStatus.WARNING
-            issues.append("Connection unhealthy")
-
-        return {
-            'component_id': component_id,
-            'status': status.value,
-            'message': '; '.join(issues) if issues else 'All checks passing',
-            'metrics': {
-                'frame_count': metrics.frame_count,
-                'error_count': metrics.error_count,
-                'warning_count': metrics.warning_count,
-                'restart_count': metrics.restart_count,
-                'avg_frame_interval': metrics.avg_frame_interval,
-                'last_frame_age': current_time - metrics.last_frame_time if metrics.last_frame_time else None,
-                'thread_alive': metrics.thread_alive,
-                'connection_healthy': metrics.connection_healthy,
-                'memory_usage_mb': metrics.memory_usage_mb,
-                'cpu_usage_percent': metrics.cpu_usage_percent,
-                'uptime_seconds': current_time - self.system_start_time
-            },
-            'last_update': metrics.last_update
-        }
-
-    def _get_overall_health(self) -> Dict[str, Any]:
-        """Get overall system health status."""
-        current_time = time.time()
-        components = {}
-        overall_status = HealthStatus.HEALTHY
-
-        # Get health for all components
-        for component_id in self.metrics.keys():
-            component_health = self._get_component_health(component_id)
-            components[component_id] = component_health
-
-            # Determine overall status
-            component_status = HealthStatus(component_health['status'])
-            if component_status == HealthStatus.CRITICAL:
-                overall_status = HealthStatus.CRITICAL
-            elif component_status == HealthStatus.WARNING and overall_status == HealthStatus.HEALTHY:
-                overall_status = HealthStatus.WARNING
-
-        # System metrics
-        try:
-            system_memory = self.process.memory_info()
-            system_cpu = self.process.cpu_percent()
-        except Exception:
-            system_memory = None
-            system_cpu = 0.0
-
-        return {
-            'overall_status': overall_status.value,
-            'timestamp': current_time,
-            'uptime_seconds': current_time - self.system_start_time,
-            'total_components': len(self.metrics),
-            'components': components,
-            'system_metrics': {
-                'memory_mb': system_memory.rss / (1024 * 1024) if system_memory else 0,
-                'cpu_percent': system_cpu,
-                'process_id': self.process.pid
-            },
-            'recent_alerts': list(self.alert_history)[-10:],  # Last 10 alerts
-            'recent_recoveries': list(self.recovery_actions)[-10:]  # Last 10 recovery actions
-        }
-
-    def _monitor_loop(self):
-        """Main health monitoring loop."""
-        logger.info("Health monitor loop started")
-
-        while self.running:
-            try:
-                start_time = time.time()
-
-                # Run all registered health checks
-                all_checks = []
-                for checker in self.health_checkers:
-                    try:
-                        checks = checker()
-                        all_checks.extend(checks)
-                    except Exception as e:
-                        logger.error(f"Error in health checker {checker.__name__}: {e}")
-
-                # Process health checks and trigger recovery if needed
-                for check in all_checks:
-                    self._process_health_check(check)
-
-                # Update system metrics
-                self._update_system_metrics()
-
-                # Sleep until next check
-                elapsed = time.time() - start_time
-                sleep_time = max(0, self.check_interval - elapsed)
-                if sleep_time > 0:
-                    time.sleep(sleep_time)
-
-            except Exception as e:
-                logger.error(f"Error in health monitor loop: {e}")
-                time.sleep(5.0)  # Fallback sleep
-
-        logger.info("Health monitor loop ended")
-
-    def _process_health_check(self, check: HealthCheck):
-        """Process a health check result and trigger recovery if needed."""
-        with self._lock:
-            # Store health check
-            self.health_checks[check.name] = check
-
-            # Log alerts for non-healthy status
-            if check.status != HealthStatus.HEALTHY:
-                alert = {
-                    'timestamp': check.timestamp,
-                    'component': check.name,
-                    'status': check.status.value,
-                    'message': check.message,
-                    'details': check.details
-                }
-                self.alert_history.append(alert)
-
-                logger.warning(f"Health alert [{check.status.value.upper()}] {check.name}: {check.message}")
-
-                # Trigger recovery if critical and recovery action available
-                if check.status == HealthStatus.CRITICAL and check.recovery_action:
-                    self._trigger_recovery(check.name, check)
-
-    def _trigger_recovery(self, component: str, check: HealthCheck):
-        """Trigger recovery action for a component."""
-        if component in self.recovery_callbacks:
-            try:
-                logger.info(f"Triggering recovery for {component}: {check.recovery_action}")
-
-                success = self.recovery_callbacks[component](component, check)
-
-                recovery_action = {
-                    'timestamp': time.time(),
-                    'component': component,
-                    'action': check.recovery_action,
-                    'reason': check.message,
-                    'success': success
-                }
-
-                with self._lock:
-                    self.recovery_actions.append(recovery_action)
-
-                if success:
-                    logger.info(f"Recovery successful for {component}")
-                else:
-                    logger.error(f"Recovery failed for {component}")
-
-            except Exception as e:
-                logger.error(f"Error in recovery callback for {component}: {e}")
-
-    def _update_system_metrics(self):
-        """Update system-level metrics."""
-        try:
-            # Update process metrics for all components
-            current_time = time.time()
-
-            with self._lock:
-                for component_id, metrics in self.metrics.items():
-                    # Update CPU and memory if available
-                    try:
-                        # This is a simplified approach - in practice you'd want
-                        # per-thread or per-component resource tracking
-                        metrics.cpu_usage_percent = self.process.cpu_percent() / len(self.metrics)
-                        memory_info = self.process.memory_info()
-                        metrics.memory_usage_mb = memory_info.rss / (1024 * 1024) / len(self.metrics)
-                    except Exception:
-                        pass
-
-        except Exception as e:
-            logger.error(f"Error updating system metrics: {e}")
-
-
-# Global health monitor instance
-health_monitor = HealthMonitor()
\ No newline at end of file
diff --git a/core/monitoring/recovery.py b/core/monitoring/recovery.py
deleted file mode 100644
index 4ea16dc..0000000
--- a/core/monitoring/recovery.py
+++ /dev/null
@@ -1,385 +0,0 @@
-"""
-Recovery manager for automatic handling of health issues.
-Provides circuit breaker patterns, automatic restarts, and graceful degradation.
-"""
-import time
-import logging
-import threading
-from typing import Dict, List, Optional, Any, Callable
-from dataclasses import dataclass
-from enum import Enum
-from collections import defaultdict, deque
-
-from .health import HealthCheck, HealthStatus, health_monitor
-
-
-logger = logging.getLogger(__name__)
-
-
-class RecoveryAction(Enum):
-    """Types of recovery actions."""
-    RESTART_STREAM = "restart_stream"
-    RESTART_THREAD = "restart_thread"
-    CLEAR_BUFFER = "clear_buffer"
-    RECONNECT = "reconnect"
-    THROTTLE = "throttle"
-    DISABLE = "disable"
-
-
-@dataclass
-class RecoveryAttempt:
-    """Record of a recovery attempt."""
-    timestamp: float
-    component: str
-    action: RecoveryAction
-    reason: str
-    success: bool
-    details: Dict[str, Any] = None
-
-
-@dataclass
-class RecoveryState:
-    """Recovery state for a component - simplified without circuit breaker."""
-    failure_count: int = 0
-    success_count: int = 0
-    last_failure_time: Optional[float] = None
-    last_success_time: Optional[float] = None
-
-
-class RecoveryManager:
-    """Manages automatic recovery actions for health issues."""
-
-    def __init__(self):
-        self.recovery_handlers: Dict[str, Callable[[str, HealthCheck], bool]] = {}
-        self.recovery_states: Dict[str, RecoveryState] = {}
-        self.recovery_history: deque = deque(maxlen=1000)
-        self._lock = threading.RLock()
-
-        # Configuration - simplified without circuit breaker
-        self.recovery_cooldown = 30  # 30 seconds between recovery attempts
-        self.max_attempts_per_hour = 20  # Still limit to prevent spam, but much higher
-
-        # Track recovery attempts per component
-        self.recovery_attempts: Dict[str, deque] = defaultdict(lambda: deque(maxlen=50))
-
-        # Register with health monitor
-        health_monitor.register_recovery_callback("stream", self._handle_stream_recovery)
-        health_monitor.register_recovery_callback("thread", self._handle_thread_recovery)
-        health_monitor.register_recovery_callback("buffer", self._handle_buffer_recovery)
-
-    def register_recovery_handler(self, action: RecoveryAction, handler: Callable[[str, Dict[str, Any]], bool]):
-        """
-        Register a recovery handler for a specific action.
-
-        Args:
-            action: Type of recovery action
-            handler: Function that performs the recovery
-        """
-        self.recovery_handlers[action.value] = handler
-        logger.info(f"Registered recovery handler for {action.value}")
-
-    def can_attempt_recovery(self, component: str) -> bool:
-        """
-        Check if recovery can be attempted for a component.
-
-        Args:
-            component: Component identifier
-
-        Returns:
-            True if recovery can be attempted (always allow with minimal throttling)
-        """
-        with self._lock:
-            current_time = time.time()
-
-            # Check recovery attempt rate limiting (much more permissive)
-            recent_attempts = [
-                attempt for attempt in self.recovery_attempts[component]
-                if current_time - attempt <= 3600  # Last hour
-            ]
-
-            # Only block if truly excessive attempts
-            if len(recent_attempts) >= self.max_attempts_per_hour:
-                logger.warning(f"Recovery rate limit exceeded for {component} "
-                               f"({len(recent_attempts)} attempts in last hour)")
-                return False
-
-            # Check cooldown period (shorter cooldown)
-            if recent_attempts:
-                last_attempt = max(recent_attempts)
-                if current_time - last_attempt < self.recovery_cooldown:
-                    logger.debug(f"Recovery cooldown active for {component} "
-                                 f"(last attempt {current_time - last_attempt:.1f}s ago)")
-                    return False
-
-            return True
-
-    def attempt_recovery(self, component: str, action: RecoveryAction, reason: str,
-                         details: Optional[Dict[str, Any]] = None) -> bool:
-        """
-        Attempt recovery for a component.
-
-        Args:
-            component: Component identifier
-            action: Recovery action to perform
-            reason: Reason for recovery
-            details: Additional details
-
-        Returns:
-            True if recovery was successful
-        """
-        if not self.can_attempt_recovery(component):
-            return False
-
-        current_time = time.time()
-
-        logger.info(f"Attempting recovery for {component}: {action.value} ({reason})")
-
-        try:
-            # Record recovery attempt
-            with self._lock:
-                self.recovery_attempts[component].append(current_time)
-
-            # Perform recovery action
-            success = self._execute_recovery_action(component, action, details or {})
-
-            # Record recovery result
-            attempt = RecoveryAttempt(
-                timestamp=current_time,
-                component=component,
-                action=action,
-                reason=reason,
-                success=success,
-                details=details
-            )
-
-            with self._lock:
-                self.recovery_history.append(attempt)
-
-            # Update recovery state
-            self._update_recovery_state(component, success)
-
-            if success:
-                logger.info(f"Recovery successful for {component}: {action.value}")
-            else:
-                logger.error(f"Recovery failed for {component}: {action.value}")
-
-            return success
-
-        except Exception as e:
-            logger.error(f"Error during recovery for {component}: {e}")
-            self._update_recovery_state(component, False)
-            return False
-
-    def _execute_recovery_action(self, component: str, action: RecoveryAction,
-                                 details: Dict[str, Any]) -> bool:
-        """Execute a specific recovery action."""
-        handler_key = action.value
-
-        if handler_key not in self.recovery_handlers:
-            logger.error(f"No recovery handler registered for action: {handler_key}")
-            return False
-
-        try:
-            handler = self.recovery_handlers[handler_key]
-            return handler(component, details)
-
-        except Exception as e:
-            logger.error(f"Error executing recovery action {handler_key} for {component}: {e}")
-            return False
-
-    def _update_recovery_state(self, component: str, success: bool):
-        """Update recovery state based on recovery result."""
-        current_time = time.time()
-
-        with self._lock:
-            if component not in self.recovery_states:
-                self.recovery_states[component] = RecoveryState()
-
-            state = self.recovery_states[component]
-
-            if success:
-                state.success_count += 1
-                state.last_success_time = current_time
-                # Reset failure count on success
-                state.failure_count = max(0, state.failure_count - 1)
-                logger.debug(f"Recovery success for {component} (total successes: {state.success_count})")
-            else:
-                state.failure_count += 1
-                state.last_failure_time = current_time
-                logger.debug(f"Recovery failure for {component} (total failures: {state.failure_count})")
-
-    def _handle_stream_recovery(self, component: str, health_check: HealthCheck) -> bool:
-        """Handle recovery for stream-related issues."""
-        if "frames" in health_check.name:
-            # Frame-related issue - restart stream
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.RESTART_STREAM,
-                health_check.message,
-                health_check.details
-            )
-        elif "connection" in health_check.name:
-            # Connection issue - reconnect
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.RECONNECT,
-                health_check.message,
-                health_check.details
-            )
-        elif "errors" in health_check.name:
-            # High error rate - throttle or restart
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.THROTTLE,
-                health_check.message,
-                health_check.details
-            )
-        else:
-            # Generic stream issue - restart
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.RESTART_STREAM,
-                health_check.message,
-                health_check.details
-            )
-
-    def _handle_thread_recovery(self, component: str, health_check: HealthCheck) -> bool:
-        """Handle recovery for thread-related issues."""
-        if "deadlock" in health_check.name:
-            # Deadlock detected - restart thread
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.RESTART_THREAD,
-                health_check.message,
-                health_check.details
-            )
-        elif "responsive" in health_check.name:
-            # Thread unresponsive - restart
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.RESTART_THREAD,
-                health_check.message,
-                health_check.details
-            )
-        else:
-            # Generic thread issue - restart
-            return self.attempt_recovery(
-                component,
-                RecoveryAction.RESTART_THREAD,
-                health_check.message,
-                health_check.details
-            )
-
-    def _handle_buffer_recovery(self, component: str, health_check: HealthCheck) -> bool:
-        """Handle recovery for buffer-related issues."""
-        # Buffer issues - clear buffer
-        return self.attempt_recovery(
-            component,
-            RecoveryAction.CLEAR_BUFFER,
-            health_check.message,
-            health_check.details
-        )
-
-    def get_recovery_stats(self) -> Dict[str, Any]:
-        """Get recovery statistics."""
-        current_time = time.time()
-
-        with self._lock:
-            # Calculate stats from history
-            recent_recoveries = [
-                attempt for attempt in self.recovery_history
-                if current_time - attempt.timestamp <= 3600  # Last hour
-            ]
-
-            stats_by_component = defaultdict(lambda: {
-                'attempts': 0,
-                'successes': 0,
-                'failures': 0,
-                'last_attempt': None,
-                'last_success': None
-            })
-
-            for attempt in recent_recoveries:
-                stats = stats_by_component[attempt.component]
-                stats['attempts'] += 1
-
-                if attempt.success:
-                    stats['successes'] += 1
-                    if not stats['last_success'] or attempt.timestamp > stats['last_success']:
-                        stats['last_success'] = attempt.timestamp
-                else:
-                    stats['failures'] += 1
-
-                if not stats['last_attempt'] or attempt.timestamp > stats['last_attempt']:
-                    stats['last_attempt'] = attempt.timestamp
-
-            return {
-                'total_recoveries_last_hour': len(recent_recoveries),
-                'recovery_by_component': dict(stats_by_component),
-                'recovery_states': {
-                    component: {
-                        'failure_count': state.failure_count,
-                        'success_count': state.success_count,
-                        'last_failure_time': state.last_failure_time,
-                        'last_success_time': state.last_success_time
-                    }
-                    for component, state in self.recovery_states.items()
-                },
-                'recent_history': [
-                    {
-                        'timestamp': attempt.timestamp,
-                        'component': attempt.component,
-                        'action': attempt.action.value,
-                        'reason': attempt.reason,
-                        'success': attempt.success
-                    }
-                    for attempt in list(self.recovery_history)[-10:]  # Last 10 attempts
-                ]
-            }
-
-    def force_recovery(self, component: str, action: RecoveryAction, reason: str = "manual") -> bool:
-        """
-        Force recovery for a component, bypassing rate limiting.
-
-        Args:
-            component: Component identifier
-            action: Recovery action to perform
-            reason: Reason for forced recovery
-
-        Returns:
-            True if recovery was successful
-        """
-        logger.info(f"Forcing recovery for {component}: {action.value} ({reason})")
-
-        current_time = time.time()
-
-        try:
-            # Execute recovery action directly
-            success = self._execute_recovery_action(component, action, {})
-
-            # Record forced recovery
-            attempt = RecoveryAttempt(
-                timestamp=current_time,
-                component=component,
-                action=action,
-                reason=f"forced: {reason}",
-                success=success,
-                details={'forced': True}
-            )
-
-            with self._lock:
-                self.recovery_history.append(attempt)
-                self.recovery_attempts[component].append(current_time)
-
-            # Update recovery state
-            self._update_recovery_state(component, success)
-
-            return success
-
-        except Exception as e:
-            logger.error(f"Error during forced recovery for {component}: {e}")
-            return False
-
-
-# Global recovery manager instance
-recovery_manager = RecoveryManager()
\ No newline at end of file
diff --git a/core/monitoring/stream_health.py b/core/monitoring/stream_health.py
deleted file mode 100644
index 770dfe4..0000000
--- a/core/monitoring/stream_health.py
+++ /dev/null
@@ -1,351 +0,0 @@
-"""
-Stream-specific health monitoring for video streams.
-Tracks frame production, connection health, and stream-specific metrics.
-"""
-import time
-import logging
-import threading
-import requests
-from typing import Dict, Optional, List, Any
-from collections import deque
-from dataclasses import dataclass
-
-from .health import HealthCheck, HealthStatus, health_monitor
-
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class StreamMetrics:
-    """Metrics for an individual stream."""
-    camera_id: str
-    stream_type: str  # 'rtsp', 'http_snapshot'
-    start_time: float
-    last_frame_time: Optional[float] = None
-    frame_count: int = 0
-    error_count: int = 0
-    reconnect_count: int = 0
-    bytes_received: int = 0
-    frames_per_second: float = 0.0
-    connection_attempts: int = 0
-    last_connection_test: Optional[float] = None
-    connection_healthy: bool = True
-    last_error: Optional[str] = None
-    last_error_time: Optional[float] = None
-
-
-class StreamHealthTracker:
-    """Tracks health for individual video streams."""
-
-    def __init__(self):
-        self.streams: Dict[str, StreamMetrics] = {}
-        self._lock = threading.RLock()
-
-        # Configuration
-        self.connection_test_interval = 300  # Test connection every 5 minutes
-        self.frame_timeout_warning = 120     # Warn if no frames for 2 minutes
-        self.frame_timeout_critical = 300    # Critical if no frames for 5 minutes
-        self.error_rate_threshold = 0.1      # 10% error rate threshold
-
-        # Register with health monitor
-        health_monitor.register_health_checker(self._perform_health_checks)
-
-    def register_stream(self, camera_id: str, stream_type: str, source_url: Optional[str] = None):
-        """Register a new stream for monitoring."""
-        with self._lock:
-            if camera_id not in self.streams:
-                self.streams[camera_id] = StreamMetrics(
-                    camera_id=camera_id,
-                    stream_type=stream_type,
-                    start_time=time.time()
-                )
-                logger.info(f"Registered stream for monitoring: {camera_id} ({stream_type})")
-
-            # Update health monitor metrics
-            health_monitor.update_metrics(
-                camera_id,
-                thread_alive=True,
-                connection_healthy=True
-            )
-
-    def unregister_stream(self, camera_id: str):
-        """Unregister a stream from monitoring."""
-        with self._lock:
-            if camera_id in self.streams:
-                del self.streams[camera_id]
-                logger.info(f"Unregistered stream from monitoring: {camera_id}")
-
-    def report_frame_received(self, camera_id: str, frame_size_bytes: int = 0):
-        """Report that a frame was received."""
-        current_time = time.time()
-
-        with self._lock:
-            if camera_id not in self.streams:
-                logger.warning(f"Frame received for unregistered stream: {camera_id}")
-                return
-
-            stream = self.streams[camera_id]
-
-            # Update frame metrics
-            if stream.last_frame_time:
-                interval = current_time - stream.last_frame_time
-                # Calculate FPS as moving average
-                if stream.frames_per_second == 0:
-                    stream.frames_per_second = 1.0 / interval if interval > 0 else 0
-                else:
-                    new_fps = 1.0 / interval if interval > 0 else 0
-                    stream.frames_per_second = (stream.frames_per_second * 0.9) + (new_fps * 0.1)
-
-            stream.last_frame_time = current_time
-            stream.frame_count += 1
-            stream.bytes_received += frame_size_bytes
-
-            # Report to health monitor
-            health_monitor.report_frame_received(camera_id)
-            health_monitor.update_metrics(
-                camera_id,
-                frame_count=stream.frame_count,
-                avg_frame_interval=1.0 / stream.frames_per_second if stream.frames_per_second > 0 else 0,
-                last_frame_time=current_time
-            )
-
-    def report_error(self, camera_id: str, error_message: str):
-        """Report an error for a stream."""
-        current_time = time.time()
-
-        with self._lock:
-            if camera_id not in self.streams:
-                logger.warning(f"Error reported for unregistered stream: {camera_id}")
-                return
-
-            stream = self.streams[camera_id]
-            stream.error_count += 1
-            stream.last_error = error_message
-            stream.last_error_time = current_time
-
-            # Report to health monitor
-            health_monitor.report_error(camera_id, "stream_error")
-            health_monitor.update_metrics(
-                camera_id,
-                error_count=stream.error_count
-            )
-
-        logger.debug(f"Error reported for stream {camera_id}: {error_message}")
-
-    def report_reconnect(self, camera_id: str, reason: str = "unknown"):
-        """Report that a stream reconnected."""
-        current_time = time.time()
-
-        with self._lock:
-            if camera_id not in self.streams:
-                logger.warning(f"Reconnect reported for unregistered stream: {camera_id}")
-                return
-
-            stream = self.streams[camera_id]
-            stream.reconnect_count += 1
-
-            # Report to health monitor
-            health_monitor.report_restart(camera_id)
-            health_monitor.update_metrics(
-                camera_id,
-                restart_count=stream.reconnect_count
-            )
-
-        logger.info(f"Reconnect reported for stream {camera_id}: {reason}")
-
-    def report_connection_attempt(self, camera_id: str, success: bool):
-        """Report a connection attempt."""
-        with self._lock:
-            if camera_id not in self.streams:
-                return
-
-            stream = self.streams[camera_id]
-            stream.connection_attempts += 1
-            stream.connection_healthy = success
-
-            # Report to health monitor
-            health_monitor.update_metrics(
-                camera_id,
-                connection_healthy=success
-            )
-
-    def test_http_connection(self, camera_id: str, url: str) -> bool:
-        """Test HTTP connection health for snapshot streams."""
-        try:
-            # Quick HEAD request to test connectivity
-            response = requests.head(url, timeout=5, verify=False)
-            success = response.status_code in [200, 404]  # 404 might be normal for some cameras
-
-            self.report_connection_attempt(camera_id, success)
-
-            if success:
-                logger.debug(f"Connection test passed for {camera_id}")
-            else:
-                logger.warning(f"Connection test failed for {camera_id}: HTTP {response.status_code}")
-
-            return success
-
-        except Exception as e:
-            logger.warning(f"Connection test failed for {camera_id}: {e}")
-            self.report_connection_attempt(camera_id, False)
-            return False
-
-    def get_stream_metrics(self, camera_id: str) -> Optional[Dict[str, Any]]:
-        """Get metrics for a specific stream."""
-        with self._lock:
-            if camera_id not in self.streams:
-                return None
-
-            stream = self.streams[camera_id]
-            current_time = time.time()
-
-            # Calculate derived metrics
-            uptime = current_time - stream.start_time
-            frame_age = current_time - stream.last_frame_time if stream.last_frame_time else None
-            error_rate = stream.error_count / max(1, stream.frame_count)
-
-            return {
-                'camera_id': camera_id,
-                'stream_type': stream.stream_type,
-                'uptime_seconds': uptime,
-                'frame_count': stream.frame_count,
-                'frames_per_second': stream.frames_per_second,
-                'bytes_received': stream.bytes_received,
-                'error_count': stream.error_count,
-                'error_rate': error_rate,
-                'reconnect_count': stream.reconnect_count,
-                'connection_attempts': stream.connection_attempts,
-                'connection_healthy': stream.connection_healthy,
-                'last_frame_age_seconds': frame_age,
-                'last_error': stream.last_error,
-                'last_error_time': stream.last_error_time
-            }
-
-    def get_all_metrics(self) -> Dict[str, Dict[str, Any]]:
-        """Get metrics for all streams."""
-        with self._lock:
-            return {
-                camera_id: self.get_stream_metrics(camera_id)
-                for camera_id in self.streams.keys()
-            }
-
-    def _perform_health_checks(self) -> List[HealthCheck]:
-        """Perform health checks for all streams."""
-        checks = []
-        current_time = time.time()
-
-        with self._lock:
-            for camera_id, stream in self.streams.items():
-                checks.extend(self._check_stream_health(camera_id, stream, current_time))
-
-        return checks
-
-    def _check_stream_health(self, camera_id: str, stream: StreamMetrics, current_time: float) -> List[HealthCheck]:
-        """Perform health checks for a single stream."""
-        checks = []
-
-        # Check frame freshness
-        if stream.last_frame_time:
-            frame_age = current_time - stream.last_frame_time
-
-            if frame_age > self.frame_timeout_critical:
-                checks.append(HealthCheck(
-                    name=f"stream_{camera_id}_frames",
-                    status=HealthStatus.CRITICAL,
-                    message=f"No frames for {frame_age:.1f}s (critical threshold: {self.frame_timeout_critical}s)",
-                    details={
-                        'frame_age': frame_age,
-                        'threshold': self.frame_timeout_critical,
-                        'last_frame_time': stream.last_frame_time
-                    },
-                    recovery_action="restart_stream"
-                ))
-            elif frame_age > self.frame_timeout_warning:
-                checks.append(HealthCheck(
-                    name=f"stream_{camera_id}_frames",
-                    status=HealthStatus.WARNING,
-                    message=f"Frames aging: {frame_age:.1f}s (warning threshold: {self.frame_timeout_warning}s)",
-                    details={
-                        'frame_age': frame_age,
-                        'threshold': self.frame_timeout_warning,
-                        'last_frame_time': stream.last_frame_time
-                    }
-                ))
-        else:
-            # No frames received yet
-            startup_time = current_time - stream.start_time
-            if startup_time > 60:  # Allow 1 minute for initial connection
-                checks.append(HealthCheck(
-                    name=f"stream_{camera_id}_startup",
-                    status=HealthStatus.CRITICAL,
-                    message=f"No frames received since startup {startup_time:.1f}s ago",
-                    details={
-                        'startup_time': startup_time,
-                        'start_time': stream.start_time
-                    },
-                    recovery_action="restart_stream"
-                ))
-
-        # Check error rate
-        if stream.frame_count > 10:  # Need sufficient samples
-            error_rate = stream.error_count / stream.frame_count
-            if error_rate > self.error_rate_threshold:
-                checks.append(HealthCheck(
-                    name=f"stream_{camera_id}_errors",
-                    status=HealthStatus.WARNING,
-                    message=f"High error rate: {error_rate:.1%} ({stream.error_count}/{stream.frame_count})",
-                    details={
-                        'error_rate': error_rate,
-                        'error_count': stream.error_count,
-                        'frame_count': stream.frame_count,
-                        'last_error': stream.last_error
-                    }
-                ))
-
-        # Check connection health
-        if not stream.connection_healthy:
-            checks.append(HealthCheck(
-                name=f"stream_{camera_id}_connection",
-                status=HealthStatus.WARNING,
-                message="Connection unhealthy (last test failed)",
-                details={
-                    'connection_attempts': stream.connection_attempts,
-                    'last_connection_test': stream.last_connection_test
-                }
-            ))
-
-        # Check excessive reconnects
-        uptime_hours = (current_time - stream.start_time) / 3600
-        if uptime_hours > 1 and stream.reconnect_count > 5:  # More than 5 reconnects per hour
-            reconnect_rate = stream.reconnect_count / uptime_hours
-            checks.append(HealthCheck(
-                name=f"stream_{camera_id}_stability",
-                status=HealthStatus.WARNING,
-                message=f"Frequent reconnects: {reconnect_rate:.1f}/hour ({stream.reconnect_count} total)",
-                details={
-                    'reconnect_rate': reconnect_rate,
-                    'reconnect_count': stream.reconnect_count,
-                    'uptime_hours': uptime_hours
-                }
-            ))
-
-        # Check frame rate health
-        if stream.last_frame_time and stream.frames_per_second > 0:
-            expected_fps = 6.0  # Expected FPS for streams
-            if stream.frames_per_second < expected_fps * 0.5:  # Less than 50% of expected
-                checks.append(HealthCheck(
-                    name=f"stream_{camera_id}_framerate",
-                    status=HealthStatus.WARNING,
-                    message=f"Low frame rate: {stream.frames_per_second:.1f} fps (expected: ~{expected_fps} fps)",
-                    details={
-                        'current_fps': stream.frames_per_second,
-                        'expected_fps': expected_fps
-                    }
-                ))
-
-        return checks
-
-
-# Global stream health tracker instance
-stream_health_tracker = StreamHealthTracker()
\ No newline at end of file
diff --git a/core/monitoring/thread_health.py b/core/monitoring/thread_health.py
deleted file mode 100644
index a29625b..0000000
--- a/core/monitoring/thread_health.py
+++ /dev/null
@@ -1,381 +0,0 @@
-"""
-Thread health monitoring for detecting unresponsive and deadlocked threads.
-Provides thread liveness detection and responsiveness testing.
-"""
-import time
-import threading
-import logging
-import signal
-import traceback
-from typing import Dict, List, Optional, Any, Callable
-from dataclasses import dataclass
-from collections import defaultdict
-
-from .health import HealthCheck, HealthStatus, health_monitor
-
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class ThreadInfo:
-    """Information about a monitored thread."""
-    thread_id: int
-    thread_name: str
-    start_time: float
-    last_heartbeat: float
-    heartbeat_count: int = 0
-    is_responsive: bool = True
-    last_activity: Optional[str] = None
-    stack_traces: List[str] = None
-
-
-class ThreadHealthMonitor:
-    """Monitors thread health and responsiveness."""
-
-    def __init__(self):
-        self.monitored_threads: Dict[int, ThreadInfo] = {}
-        self.heartbeat_callbacks: Dict[int, Callable[[], bool]] = {}
-        self._lock = threading.RLock()
-
-        # Configuration
-        self.heartbeat_timeout = 60.0  # 1 minute without heartbeat = unresponsive
-        self.responsiveness_test_interval = 30.0  # Test responsiveness every 30 seconds
-        self.stack_trace_count = 5  # Keep last 5 stack traces for analysis
-
-        # Register with health monitor
-        health_monitor.register_health_checker(self._perform_health_checks)
-
-        # Enable periodic responsiveness testing
-        self.test_thread = threading.Thread(target=self._responsiveness_test_loop, daemon=True)
-        self.test_thread.start()
-
-    def register_thread(self, thread: threading.Thread, heartbeat_callback: Optional[Callable[[], bool]] = None):
-        """
-        Register a thread for monitoring.
-
-        Args:
-            thread: Thread to monitor
-            heartbeat_callback: Optional callback to test thread responsiveness
-        """
-        with self._lock:
-            thread_info = ThreadInfo(
-                thread_id=thread.ident,
-                thread_name=thread.name,
-                start_time=time.time(),
-                last_heartbeat=time.time()
-            )
-
-            self.monitored_threads[thread.ident] = thread_info
-
-            if heartbeat_callback:
-                self.heartbeat_callbacks[thread.ident] = heartbeat_callback
-
-        logger.info(f"Registered thread for monitoring: {thread.name} (ID: {thread.ident})")
-
-    def unregister_thread(self, thread_id: int):
-        """Unregister a thread from monitoring."""
-        with self._lock:
-            if thread_id in self.monitored_threads:
-                thread_name = self.monitored_threads[thread_id].thread_name
-                del self.monitored_threads[thread_id]
-
-                if thread_id in self.heartbeat_callbacks:
-                    del self.heartbeat_callbacks[thread_id]
-
-                logger.info(f"Unregistered thread from monitoring: {thread_name} (ID: {thread_id})")
-
-    def heartbeat(self, thread_id: Optional[int] = None, activity: Optional[str] = None):
-        """
-        Report thread heartbeat.
-
-        Args:
-            thread_id: Thread ID (uses current thread if None)
-            activity: Description of current activity
-        """
-        if thread_id is None:
-            thread_id = threading.current_thread().ident
-
-        current_time = time.time()
-
-        with self._lock:
-            if thread_id in self.monitored_threads:
-                thread_info = self.monitored_threads[thread_id]
-                thread_info.last_heartbeat = current_time
-                thread_info.heartbeat_count += 1
-                thread_info.is_responsive = True
-
-                if activity:
-                    thread_info.last_activity = activity
-
-                # Report to health monitor
-                health_monitor.update_metrics(
-                    f"thread_{thread_info.thread_name}",
-                    thread_alive=True,
-                    last_frame_time=current_time
-                )
-
-    def get_thread_info(self, thread_id: int) -> Optional[Dict[str, Any]]:
-        """Get information about a monitored thread."""
-        with self._lock:
-            if thread_id not in self.monitored_threads:
-                return None
-
-            thread_info = self.monitored_threads[thread_id]
-            current_time = time.time()
-
-            return {
-                'thread_id': thread_id,
-                'thread_name': thread_info.thread_name,
-                'uptime_seconds': current_time - thread_info.start_time,
-                'last_heartbeat_age': current_time - thread_info.last_heartbeat,
-                'heartbeat_count': thread_info.heartbeat_count,
-                'is_responsive': thread_info.is_responsive,
-                'last_activity': thread_info.last_activity,
-                'stack_traces': thread_info.stack_traces or []
-            }
-
-    def get_all_thread_info(self) -> Dict[int, Dict[str, Any]]:
-        """Get information about all monitored threads."""
-        with self._lock:
-            return {
-                thread_id: self.get_thread_info(thread_id)
-                for thread_id in self.monitored_threads.keys()
-            }
-
-    def test_thread_responsiveness(self, thread_id: int) -> bool:
-        """
-        Test if a thread is responsive by calling its heartbeat callback.
-
-        Args:
-            thread_id: ID of thread to test
-
-        Returns:
-            True if thread responds within timeout
-        """
-        if thread_id not in self.heartbeat_callbacks:
-            return True  # Can't test if no callback provided
-
-        try:
-            # Call the heartbeat callback with a timeout
-            callback = self.heartbeat_callbacks[thread_id]
-
-            # This is a simple approach - in practice you might want to use
-            # threading.Timer or asyncio for more sophisticated timeout handling
-            start_time = time.time()
-            result = callback()
-            response_time = time.time() - start_time
-
-            with self._lock:
-                if thread_id in self.monitored_threads:
-                    self.monitored_threads[thread_id].is_responsive = result
-
-            if response_time > 5.0:  # Slow response
-                logger.warning(f"Thread {thread_id} slow response: {response_time:.1f}s")
-
-            return result
-
-        except Exception as e:
-            logger.error(f"Error testing thread {thread_id} responsiveness: {e}")
-            with self._lock:
-                if thread_id in self.monitored_threads:
-                    self.monitored_threads[thread_id].is_responsive = False
-            return False
-
-    def capture_stack_trace(self, thread_id: int) -> Optional[str]:
-        """
-        Capture stack trace for a thread.
- - Args: - thread_id: ID of thread to capture - - Returns: - Stack trace string or None if not available - """ - try: - # Get all frames for all threads - frames = dict(threading._current_frames()) - - if thread_id not in frames: - return None - - # Format stack trace - frame = frames[thread_id] - stack_trace = ''.join(traceback.format_stack(frame)) - - # Store in thread info - with self._lock: - if thread_id in self.monitored_threads: - thread_info = self.monitored_threads[thread_id] - if thread_info.stack_traces is None: - thread_info.stack_traces = [] - - thread_info.stack_traces.append(f"{time.time()}: {stack_trace}") - - # Keep only last N stack traces - if len(thread_info.stack_traces) > self.stack_trace_count: - thread_info.stack_traces = thread_info.stack_traces[-self.stack_trace_count:] - - return stack_trace - - except Exception as e: - logger.error(f"Error capturing stack trace for thread {thread_id}: {e}") - return None - - def detect_deadlocks(self) -> List[Dict[str, Any]]: - """ - Attempt to detect potential deadlocks by analyzing thread states. - - Returns: - List of potential deadlock scenarios - """ - deadlocks = [] - current_time = time.time() - - with self._lock: - # Look for threads that haven't had heartbeats for a long time - # and are supposedly alive - for thread_id, thread_info in self.monitored_threads.items(): - heartbeat_age = current_time - thread_info.last_heartbeat - - if heartbeat_age > self.heartbeat_timeout * 2: # Double the timeout - # Check if thread still exists - thread_exists = any( - t.ident == thread_id and t.is_alive() - for t in threading.enumerate() - ) - - if thread_exists: - # Thread exists but not responding - potential deadlock - stack_trace = self.capture_stack_trace(thread_id) - - deadlock_info = { - 'thread_id': thread_id, - 'thread_name': thread_info.thread_name, - 'heartbeat_age': heartbeat_age, - 'last_activity': thread_info.last_activity, - 'stack_trace': stack_trace, - 'detection_time': current_time - } - - deadlocks.append(deadlock_info) - logger.warning(f"Potential deadlock detected in thread {thread_info.thread_name}") - - return deadlocks - - def _responsiveness_test_loop(self): - """Background loop to test thread responsiveness.""" - logger.info("Thread responsiveness testing started") - - while True: - try: - time.sleep(self.responsiveness_test_interval) - - with self._lock: - thread_ids = list(self.monitored_threads.keys()) - - for thread_id in thread_ids: - try: - self.test_thread_responsiveness(thread_id) - except Exception as e: - logger.error(f"Error testing thread {thread_id}: {e}") - - except Exception as e: - logger.error(f"Error in responsiveness test loop: {e}") - time.sleep(10.0) # Fallback sleep - - def _perform_health_checks(self) -> List[HealthCheck]: - """Perform health checks for all monitored threads.""" - checks = [] - current_time = time.time() - - with self._lock: - for thread_id, thread_info in self.monitored_threads.items(): - checks.extend(self._check_thread_health(thread_id, thread_info, current_time)) - - # Check for deadlocks - deadlocks = self.detect_deadlocks() - for deadlock in deadlocks: - checks.append(HealthCheck( - name=f"deadlock_detection_{deadlock['thread_id']}", - status=HealthStatus.CRITICAL, - message=f"Potential deadlock in thread {deadlock['thread_name']} " - f"(unresponsive for {deadlock['heartbeat_age']:.1f}s)", - details=deadlock, - recovery_action="restart_thread" - )) - - return checks - - def _check_thread_health(self, thread_id: int, thread_info: ThreadInfo, current_time: float) 
-> List[HealthCheck]: - """Perform health checks for a single thread.""" - checks = [] - - # Check if thread still exists - thread_exists = any( - t.ident == thread_id and t.is_alive() - for t in threading.enumerate() - ) - - if not thread_exists: - checks.append(HealthCheck( - name=f"thread_{thread_info.thread_name}_alive", - status=HealthStatus.CRITICAL, - message=f"Thread {thread_info.thread_name} is no longer alive", - details={ - 'thread_id': thread_id, - 'uptime': current_time - thread_info.start_time, - 'last_heartbeat': thread_info.last_heartbeat - }, - recovery_action="restart_thread" - )) - return checks - - # Check heartbeat freshness - heartbeat_age = current_time - thread_info.last_heartbeat - - if heartbeat_age > self.heartbeat_timeout: - checks.append(HealthCheck( - name=f"thread_{thread_info.thread_name}_responsive", - status=HealthStatus.CRITICAL, - message=f"Thread {thread_info.thread_name} unresponsive for {heartbeat_age:.1f}s", - details={ - 'thread_id': thread_id, - 'heartbeat_age': heartbeat_age, - 'heartbeat_count': thread_info.heartbeat_count, - 'last_activity': thread_info.last_activity, - 'is_responsive': thread_info.is_responsive - }, - recovery_action="restart_thread" - )) - elif heartbeat_age > self.heartbeat_timeout * 0.5: # Warning at 50% of timeout - checks.append(HealthCheck( - name=f"thread_{thread_info.thread_name}_responsive", - status=HealthStatus.WARNING, - message=f"Thread {thread_info.thread_name} slow heartbeat: {heartbeat_age:.1f}s", - details={ - 'thread_id': thread_id, - 'heartbeat_age': heartbeat_age, - 'heartbeat_count': thread_info.heartbeat_count, - 'last_activity': thread_info.last_activity, - 'is_responsive': thread_info.is_responsive - } - )) - - # Check responsiveness test results - if not thread_info.is_responsive: - checks.append(HealthCheck( - name=f"thread_{thread_info.thread_name}_callback", - status=HealthStatus.WARNING, - message=f"Thread {thread_info.thread_name} failed responsiveness test", - details={ - 'thread_id': thread_id, - 'last_activity': thread_info.last_activity - } - )) - - return checks - - -# Global thread health monitor instance -thread_health_monitor = ThreadHealthMonitor() \ No newline at end of file diff --git a/core/streaming/__init__.py b/core/streaming/__init__.py index 93005ab..d878aac 100644 --- a/core/streaming/__init__.py +++ b/core/streaming/__init__.py @@ -2,12 +2,13 @@ Streaming system for RTSP and HTTP camera feeds. Provides modular frame readers, buffers, and stream management. """ -from .readers import HTTPSnapshotReader, FFmpegRTSPReader +from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager __all__ = [ # Readers + 'RTSPReader', 'HTTPSnapshotReader', 'FFmpegRTSPReader', diff --git a/core/streaming/manager.py b/core/streaming/manager.py index c4ebd77..0c026e7 100644 --- a/core/streaming/manager.py +++ b/core/streaming/manager.py @@ -5,13 +5,11 @@ Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. 
import logging import threading import time -import queue -import asyncio from typing import Dict, Set, Optional, List, Any from dataclasses import dataclass from collections import defaultdict -from .readers import HTTPSnapshotReader, FFmpegRTSPReader +from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader from .buffers import shared_cache_buffer from ..tracking.integration import TrackingPipelineIntegration @@ -52,65 +50,6 @@ class StreamManager: self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set) # camera_id -> set of subscription_ids self._lock = threading.RLock() - # Fair tracking queue system - per camera queues - self._tracking_queues: Dict[str, queue.Queue] = {} # camera_id -> queue - self._tracking_workers = [] - self._stop_workers = threading.Event() - self._dropped_frame_counts: Dict[str, int] = {} # per-camera drop counts - - # Round-robin scheduling state - self._camera_list = [] # Ordered list of active cameras - self._camera_round_robin_index = 0 - self._round_robin_lock = threading.Lock() - - # Start worker threads for tracking processing - num_workers = min(4, max_streams // 2 + 1) # Scale with streams - for i in range(num_workers): - worker = threading.Thread( - target=self._tracking_worker_loop, - name=f"TrackingWorker-{i}", - daemon=True - ) - worker.start() - self._tracking_workers.append(worker) - - logger.info(f"Started {num_workers} tracking worker threads") - - def _ensure_camera_queue(self, camera_id: str): - """Ensure a tracking queue exists for the camera.""" - if camera_id not in self._tracking_queues: - self._tracking_queues[camera_id] = queue.Queue(maxsize=10) # 10 frames per camera - self._dropped_frame_counts[camera_id] = 0 - - with self._round_robin_lock: - if camera_id not in self._camera_list: - self._camera_list.append(camera_id) - logger.info(f"Created tracking queue for camera {camera_id}") - else: - logger.debug(f"Camera {camera_id} already has tracking queue") - - def _remove_camera_queue(self, camera_id: str): - """Remove tracking queue for a camera that's no longer active.""" - if camera_id in self._tracking_queues: - # Clear any remaining items - while not self._tracking_queues[camera_id].empty(): - try: - self._tracking_queues[camera_id].get_nowait() - except queue.Empty: - break - - del self._tracking_queues[camera_id] - del self._dropped_frame_counts[camera_id] - - with self._round_robin_lock: - if camera_id in self._camera_list: - self._camera_list.remove(camera_id) - # Reset index if needed - if self._camera_round_robin_index >= len(self._camera_list): - self._camera_round_robin_index = 0 - - logger.info(f"Removed tracking queue for camera {camera_id}") - def add_subscription(self, subscription_id: str, stream_config: StreamConfig, crop_coords: Optional[tuple] = None, model_id: Optional[str] = None, @@ -154,10 +93,6 @@ class StreamManager: if not success: self._remove_subscription_internal(subscription_id) return False - else: - # Stream already exists, but ensure queue exists too - logger.info(f"Stream already exists for {camera_id}, ensuring queue exists") - self._ensure_camera_queue(camera_id) logger.info(f"Added subscription {subscription_id} for camera {camera_id} " f"({len(self._camera_subscribers[camera_id])} total subscribers)") @@ -204,7 +139,6 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - self._ensure_camera_queue(camera_id) # Create tracking queue logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m") elif 
stream_config.snapshot_url: @@ -219,7 +153,6 @@ class StreamManager: reader.set_frame_callback(self._frame_callback) reader.start() self._streams[camera_id] = reader - self._ensure_camera_queue(camera_id) # Create tracking queue logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m") else: @@ -238,7 +171,6 @@ class StreamManager: try: self._streams[camera_id].stop() del self._streams[camera_id] - self._remove_camera_queue(camera_id) # Remove tracking queue # DON'T clear frames - they should persist until replaced # shared_cache_buffer.clear_camera(camera_id) # REMOVED - frames should persist logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)") @@ -261,19 +193,8 @@ class StreamManager: available_cameras = shared_cache_buffer.frame_buffer.get_camera_list() logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m") - # Queue for tracking processing (non-blocking) - route to camera-specific queue - if camera_id in self._tracking_queues: - try: - self._tracking_queues[camera_id].put_nowait({ - 'frame': frame, - 'timestamp': time.time() - }) - except queue.Full: - # Drop frame if camera queue is full (maintain real-time) - self._dropped_frame_counts[camera_id] += 1 - - if self._dropped_frame_counts[camera_id] % 50 == 0: - logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue") + # Process tracking for subscriptions with tracking integration + self._process_tracking_for_camera(camera_id, frame) except Exception as e: logger.error(f"Error in frame callback for camera {camera_id}: {e}") @@ -330,134 +251,6 @@ class StreamManager: except Exception as e: logger.error(f"Error processing tracking for camera {camera_id}: {e}") - def _tracking_worker_loop(self): - """Worker thread loop for round-robin processing of camera queues.""" - logger.info(f"Tracking worker {threading.current_thread().name} started") - - consecutive_empty = 0 - max_consecutive_empty = 10 # Sleep if all cameras empty this many times - - while not self._stop_workers.is_set(): - try: - # Get next camera in round-robin fashion - camera_id, item = self._get_next_camera_item() - - if camera_id is None: - # No cameras have items, sleep briefly - consecutive_empty += 1 - if consecutive_empty >= max_consecutive_empty: - time.sleep(0.1) # Sleep 100ms if nothing to process - consecutive_empty = 0 - continue - - consecutive_empty = 0 # Reset counter when we find work - - frame = item['frame'] - timestamp = item['timestamp'] - - # Check if frame is too old (drop if > 1 second old) - age = time.time() - timestamp - if age > 1.0: - logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)") - continue - - # Process tracking for this camera's frame - self._process_tracking_for_camera_sync(camera_id, frame) - - except Exception as e: - logger.error(f"Error in tracking worker: {e}", exc_info=True) - - logger.info(f"Tracking worker {threading.current_thread().name} stopped") - - def _get_next_camera_item(self): - """Get next item from camera queues using round-robin scheduling.""" - with self._round_robin_lock: - # Get current list of cameras from actual tracking queues (central state) - camera_list = list(self._tracking_queues.keys()) - - if not camera_list: - return None, None - - attempts = 0 - max_attempts = len(camera_list) - - while attempts < max_attempts: - # Get current camera using round-robin index - if self._camera_round_robin_index >= len(camera_list): - self._camera_round_robin_index 
= 0 - - camera_id = camera_list[self._camera_round_robin_index] - - # Move to next camera for next call - self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(camera_list) - - # Try to get item from this camera's queue - try: - item = self._tracking_queues[camera_id].get_nowait() - return camera_id, item - except queue.Empty: - pass # Try next camera - - attempts += 1 - - return None, None # All cameras empty - - def _process_tracking_for_camera_sync(self, camera_id: str, frame): - """Synchronous version of tracking processing for worker threads.""" - try: - with self._lock: - subscription_ids = list(self._camera_subscribers.get(camera_id, [])) - - for subscription_id in subscription_ids: - subscription_info = self._subscriptions.get(subscription_id) - - if not subscription_info: - logger.warning(f"No subscription info found for {subscription_id}") - continue - - if not subscription_info.tracking_integration: - logger.debug(f"No tracking integration for {subscription_id} (camera {camera_id}), skipping inference") - continue - - display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id - - try: - # Run async tracking in thread's event loop - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - result = loop.run_until_complete( - subscription_info.tracking_integration.process_frame( - frame, display_id, subscription_id - ) - ) - - # Log tracking results - if result: - tracked_count = len(result.get('tracked_vehicles', [])) - validated_vehicle = result.get('validated_vehicle') - pipeline_result = result.get('pipeline_result') - - if tracked_count > 0: - logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked") - - if validated_vehicle: - logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} " - f"validated as {validated_vehicle['state']} " - f"(confidence: {validated_vehicle['confidence']:.2f})") - - if pipeline_result: - logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - " - f"{pipeline_result.get('message', 'no message')}") - finally: - loop.close() - - except Exception as track_e: - logger.error(f"Error in tracking for {subscription_id}: {track_e}") - - except Exception as e: - logger.error(f"Error processing tracking for camera {camera_id}: {e}") - def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None): """Get the latest frame for a camera with optional cropping.""" return shared_cache_buffer.get_frame(camera_id, crop_coords) @@ -573,30 +366,6 @@ class StreamManager: def stop_all(self): """Stop all streams and clear all subscriptions.""" - # Signal workers to stop - self._stop_workers.set() - - # Clear all camera queues - for camera_id, camera_queue in list(self._tracking_queues.items()): - while not camera_queue.empty(): - try: - camera_queue.get_nowait() - except queue.Empty: - break - - # Wait for workers to finish - for worker in self._tracking_workers: - worker.join(timeout=2.0) - - # Clear queue management structures - self._tracking_queues.clear() - self._dropped_frame_counts.clear() - with self._round_robin_lock: - self._camera_list.clear() - self._camera_round_robin_index = 0 - - logger.info("Stopped all tracking worker threads") - with self._lock: # Stop all streams for camera_id in list(self._streams.keys()): @@ -611,20 +380,13 @@ class StreamManager: def set_session_id(self, display_id: str, session_id: str): """Set session ID for tracking integration.""" - # Ensure session_id is always a string for consistent type 
handling - session_id = str(session_id) if session_id is not None else None with self._lock: for subscription_info in self._subscriptions.values(): # Check if this subscription matches the display_id subscription_display_id = subscription_info.subscription_id.split(';')[0] if subscription_display_id == display_id and subscription_info.tracking_integration: - # Pass the full subscription_id (displayId;cameraId) to the tracking integration - subscription_info.tracking_integration.set_session_id( - display_id, - session_id, - subscription_id=subscription_info.subscription_id - ) - logger.debug(f"Set session {session_id} for display {display_id} with subscription {subscription_info.subscription_id}") + subscription_info.tracking_integration.set_session_id(display_id, session_id) + logger.debug(f"Set session {session_id} for display {display_id}") def clear_session_id(self, session_id: str): """Clear session ID from the specific tracking integration handling this session.""" diff --git a/core/streaming/readers.py b/core/streaming/readers.py new file mode 100644 index 0000000..d5635ba --- /dev/null +++ b/core/streaming/readers.py @@ -0,0 +1,786 @@ +""" +Frame readers for RTSP streams and HTTP snapshots. +Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots. +""" +import cv2 +import logging +import time +import threading +import requests +import numpy as np +import os +import subprocess +# import fcntl # No longer needed with atomic file operations +from typing import Optional, Callable +# Removed watchdog imports - no longer using file watching + +# Suppress FFMPEG/H.264 error messages if needed +# Set this environment variable to reduce noise from decoder errors +os.environ["OPENCV_LOG_LEVEL"] = "ERROR" +os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8" # Suppress FFMPEG warnings + +logger = logging.getLogger(__name__) + +# Color codes for pretty logging +class Colors: + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + BLUE = '\033[94m' + PURPLE = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + END = '\033[0m' + +def log_success(camera_id: str, message: str): + """Log success messages in green""" + logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}") + +def log_warning(camera_id: str, message: str): + """Log warnings in yellow""" + logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}") + +def log_error(camera_id: str, message: str): + """Log errors in red""" + logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}") + +def log_info(camera_id: str, message: str): + """Log info in cyan""" + logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}") + +# Removed watchdog logging configuration - no longer using file watching + + +# Removed FrameFileHandler - no longer using file watching + + +class FFmpegRTSPReader: + """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer.""" + + def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): + self.camera_id = camera_id + self.rtsp_url = rtsp_url + self.max_retries = max_retries + self.process = None + self.stop_event = threading.Event() + self.thread = None + self.frame_callback: Optional[Callable] = None + + # Expected stream specs (for reference, actual dimensions read from PPM header) + self.width = 1280 + self.height = 720 + + def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): + """Set callback function to handle captured frames.""" + self.frame_callback = callback + + def start(self): + """Start the 
FFmpeg subprocess reader."""
+        if self.thread and self.thread.is_alive():
+            logger.warning(f"FFmpeg reader for {self.camera_id} already running")
+            return
+
+        self.stop_event.clear()
+        self.thread = threading.Thread(target=self._read_frames, daemon=True)
+        self.thread.start()
+        log_success(self.camera_id, "Stream started")
+
+    def stop(self):
+        """Stop the FFmpeg subprocess reader."""
+        self.stop_event.set()
+        if self.process:
+            self.process.terminate()
+            try:
+                self.process.wait(timeout=5)
+            except subprocess.TimeoutExpired:
+                self.process.kill()
+        if self.thread:
+            self.thread.join(timeout=5.0)
+        log_info(self.camera_id, "Stream stopped")
+
+    # Removed _probe_stream_info - BMP headers contain dimensions
+
+    def _start_ffmpeg_process(self):
+        """Start FFmpeg subprocess outputting BMP frames to stdout pipe."""
+        cmd = [
+            'ffmpeg',
+            # DO NOT REMOVE
+            # '-hwaccel', 'cuda',
+            # '-hwaccel_device', '0',
+            '-rtsp_transport', 'tcp',
+            '-i', self.rtsp_url,
+            '-f', 'image2pipe',  # Output images to pipe
+            '-vcodec', 'bmp',  # BMP format with header containing dimensions
+            # Use native stream resolution and framerate
+            '-an',  # No audio
+            '-'  # Output to stdout
+        ]
+
+        try:
+            # Start FFmpeg with stdout pipe to read frames directly
+            self.process = subprocess.Popen(
+                cmd,
+                stdout=subprocess.PIPE,  # Capture stdout for frame data
+                stderr=subprocess.DEVNULL,
+                bufsize=0  # Unbuffered for real-time processing
+            )
+            return True
+        except Exception as e:
+            log_error(self.camera_id, f"FFmpeg startup failed: {e}")
+            return False
+
+    def _read_bmp_frame(self, pipe):
+        """Read BMP frame from pipe - BMP header contains dimensions."""
+        try:
+            # Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum)
+            header_data = b''
+            bytes_to_read = 54
+
+            while len(header_data) < bytes_to_read:
+                chunk = pipe.read(bytes_to_read - len(header_data))
+                if not chunk:
+                    return None  # Silent end of stream
+                header_data += chunk
+
+            # Parse BMP header
+            if header_data[:2] != b'BM':
+                return None  # Invalid format, skip frame silently
+
+            # Extract file size from header (bytes 2-5)
+            import struct
+            file_size = struct.unpack('<I', header_data[2:6])[0]
+
+            # Read the remaining BMP payload (file_size includes the 54 header bytes)
+            remaining_size = file_size - 54
+            remaining_data = b''
+
+            while len(remaining_data) < remaining_size:
+                chunk = pipe.read(remaining_size - len(remaining_data))
+                if not chunk:
+                    return None  # Incomplete frame
+                remaining_data += chunk
+
+            # Decode the complete BMP in memory; OpenCV reads dimensions from the header
+            bmp_data = header_data + remaining_data
+            frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
+            frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
+            return frame  # None if decode failed
+
+        except Exception:
+            return None
+
+    def _read_frames(self):
+        """Read frames directly from FFmpeg stdout pipe."""
+        frame_count = 0
+        last_log_time = time.time()
+
+        while not self.stop_event.is_set():
+            try:
+                # Start FFmpeg if not running
+                if not self.process or self.process.poll() is not None:
+                    if self.process and self.process.poll() is not None:
+                        log_warning(self.camera_id, "Stream disconnected, reconnecting...")
+                    if not self._start_ffmpeg_process():
+                        time.sleep(5.0)
+                        continue
+
+                # Read frames directly from FFmpeg stdout
+                try:
+                    if self.process and self.process.stdout:
+                        frame = self._read_bmp_frame(self.process.stdout)
+                        if frame is None:
+                            continue
+
+                        # Call frame callback
+                        if self.frame_callback:
+                            self.frame_callback(self.camera_id, frame)
+
+                        frame_count += 1
+
+                        # Log progress every 60 seconds (quieter)
+                        current_time = time.time()
+                        if current_time - last_log_time >= 60:
+                            log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
+                            last_log_time = current_time
+
+                except Exception:
+                    # Process might have died, let it restart on next iteration
+                    if self.process:
+                        self.process.terminate()
+                        self.process = None
+                    time.sleep(1.0)
+
+            except Exception:
+                time.sleep(1.0)
+
+        # Cleanup
+        if self.process:
+            self.process.terminate()
+
+
+logger = logging.getLogger(__name__)
+
+
+class RTSPReader:
+    """RTSP stream frame reader optimized for 1280x720 @ 6fps streams."""
+
+    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
+        self.camera_id = camera_id
+        self.rtsp_url = rtsp_url
+        self.max_retries = max_retries
+        self.cap = None
+        self.stop_event = threading.Event()
+        self.thread = None
+        self.frame_callback: Optional[Callable] = None
+
+        # Expected stream specifications
+        self.expected_width = 1280
+        self.expected_height = 720
+        self.expected_fps = 6
+
+        # Frame processing parameters
+        self.error_recovery_delay = 5.0  # Increased from 2.0 for stability
+        self.max_consecutive_errors = 30  # Increased from 10 to handle network jitter
+        self.stream_timeout = 30.0
+
+    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
+        """Set callback function to handle captured frames."""
+        self.frame_callback = callback
+
+    def start(self):
+        """Start the RTSP reader thread."""
+        if
self.thread and self.thread.is_alive(): + logger.warning(f"RTSP reader for {self.camera_id} already running") + return + + self.stop_event.clear() + self.thread = threading.Thread(target=self._read_frames, daemon=True) + self.thread.start() + logger.info(f"Started RTSP reader for camera {self.camera_id}") + + def stop(self): + """Stop the RTSP reader thread.""" + self.stop_event.set() + if self.thread: + self.thread.join(timeout=5.0) + if self.cap: + self.cap.release() + logger.info(f"Stopped RTSP reader for camera {self.camera_id}") + + def _read_frames(self): + """Main frame reading loop with H.264 error recovery.""" + consecutive_errors = 0 + frame_count = 0 + last_log_time = time.time() + last_successful_frame_time = time.time() + + while not self.stop_event.is_set(): + try: + # Initialize/reinitialize capture if needed + if not self.cap or not self.cap.isOpened(): + if not self._initialize_capture(): + time.sleep(self.error_recovery_delay) + continue + last_successful_frame_time = time.time() + + # Check for stream timeout + if time.time() - last_successful_frame_time > self.stream_timeout: + logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing") + self._reinitialize_capture() + last_successful_frame_time = time.time() + continue + + # Read frame immediately without rate limiting for minimum latency + try: + ret, frame = self.cap.read() + if ret and frame is None: + # Grab succeeded but retrieve failed - decoder issue + logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed") + except Exception as read_error: + logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}") + ret, frame = False, None + + if not ret or frame is None: + consecutive_errors += 1 + + # Enhanced logging to diagnose the issue + logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}") + + # Try to get more info from the capture + try: + if self.cap and self.cap.isOpened(): + backend = self.cap.getBackendName() + pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES) + logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}") + else: + logger.error(f"Camera {self.camera_id}: Capture is closed or None!") + except Exception as info_error: + logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}") + + if consecutive_errors >= self.max_consecutive_errors: + logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing") + self._reinitialize_capture() + consecutive_errors = 0 + time.sleep(self.error_recovery_delay) + else: + # Skip corrupted frame and continue with exponential backoff + if consecutive_errors <= 5: + logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})") + elif consecutive_errors % 10 == 0: # Log every 10th error after 5 + logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})") + + # Exponential backoff with cap at 1 second + sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0) + time.sleep(sleep_time) + continue + + # Accept any valid frame dimensions - don't force specific resolution + if frame.shape[1] <= 0 or frame.shape[0] <= 0: + consecutive_errors += 1 + continue + + # Check for corrupted frames (all black, all white, excessive noise) + if self._is_frame_corrupted(frame): + logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, 
skipping") + consecutive_errors += 1 + continue + + # Frame is valid + consecutive_errors = 0 + frame_count += 1 + last_successful_frame_time = time.time() + + # Call frame callback + if self.frame_callback: + try: + self.frame_callback(self.camera_id, frame) + except Exception as e: + logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + + # Log progress every 30 seconds + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} frames processed") + last_log_time = current_time + + except Exception as e: + logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}") + consecutive_errors += 1 + if consecutive_errors >= self.max_consecutive_errors: + self._reinitialize_capture() + consecutive_errors = 0 + time.sleep(self.error_recovery_delay) + + # Cleanup + if self.cap: + self.cap.release() + logger.info(f"RTSP reader thread ended for camera {self.camera_id}") + + def _initialize_capture(self) -> bool: + """Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps.""" + try: + # Release previous capture if exists + if self.cap: + self.cap.release() + time.sleep(0.5) + + logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration") + hw_accel_success = False + + # Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support) + if not hw_accel_success: + try: + # Check if OpenCV was built with CUDA codec support + build_info = cv2.getBuildInformation() + if 'cudacodec' in build_info or 'CUVID' in build_info: + logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}") + + # Use OpenCV's CUDA backend + self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [ + cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY + ]) + + if self.cap.isOpened(): + hw_accel_success = True + logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration") + else: + logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support") + except Exception as e: + logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}") + + # Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC) + if not hw_accel_success: + try: + from core.utils.ffmpeg_detector import get_optimal_rtsp_options + import os + + # Get optimal FFmpeg options based on detected capabilities + optimal_options = get_optimal_rtsp_options(self.rtsp_url) + os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options + + logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}") + logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}") + + self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) + + if self.cap.isOpened(): + hw_accel_success = True + # Try to get backend info to confirm hardware acceleration + backend = self.cap.getBackendName() + logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})") + except Exception as e: + logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}") + + # Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060) + if not hw_accel_success: + try: + import os + os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp' + + logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}") + self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) + + if 
self.cap.isOpened(): + hw_accel_success = True + logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration") + except Exception as e: + logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}") + + # Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs) + if not hw_accel_success: + try: + import os + os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp' + + logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}") + self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) + + if self.cap.isOpened(): + hw_accel_success = True + logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration") + except Exception as e: + logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}") + + # Fallback: Standard FFmpeg with software decoding + if not hw_accel_success: + logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding") + import os + os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' + self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) + + if not self.cap.isOpened(): + logger.error(f"Failed to open stream for camera {self.camera_id}") + return False + + # Don't force resolution/fps - let the stream determine its natural specs + # The camera will provide whatever resolution/fps it supports + + + # Set FFMPEG options for better H.264 handling + self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) + + # Verify stream properties + actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = self.cap.get(cv2.CAP_PROP_FPS) + + logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps") + + # Read and discard first few frames to stabilize stream + for _ in range(5): + ret, _ = self.cap.read() + if not ret: + logger.warning(f"Camera {self.camera_id}: Failed to read initial frames") + time.sleep(0.1) + + return True + + except Exception as e: + logger.error(f"Error initializing capture for camera {self.camera_id}: {e}") + return False + + def _reinitialize_capture(self): + """Reinitialize capture after errors with retry logic.""" + logger.info(f"Reinitializing capture for camera {self.camera_id}") + if self.cap: + self.cap.release() + self.cap = None + + # Longer delay before reconnection to avoid rapid reconnect loops + time.sleep(3.0) + + # Retry initialization up to 3 times + for attempt in range(3): + if self._initialize_capture(): + logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}") + break + else: + logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}") + time.sleep(2.0) + + def _is_frame_corrupted(self, frame: np.ndarray) -> bool: + """Check if frame is corrupted (all black, all white, or excessive noise).""" + if frame is None or frame.size == 0: + return True + + # Check mean and standard deviation + mean = np.mean(frame) + std = np.std(frame) + + # All black or all white + if mean < 5 or mean > 250: + return True + + # No variation (stuck frame) + if std < 1: + return True + + # Excessive noise (corrupted H.264 decode) + # Calculate edge density as corruption indicator + edges = cv2.Canny(frame, 50, 150) + edge_density = np.sum(edges > 0) / edges.size + + # Too many edges indicate corruption + if edge_density > 0.5: + return True + + return False + + 
+class HTTPSnapshotReader: + """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" + + def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): + self.camera_id = camera_id + self.snapshot_url = snapshot_url + self.interval_ms = interval_ms + self.max_retries = max_retries + self.stop_event = threading.Event() + self.thread = None + self.frame_callback: Optional[Callable] = None + + # Expected snapshot specifications + self.expected_width = 2560 + self.expected_height = 1440 + self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image + + def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): + """Set callback function to handle captured frames.""" + self.frame_callback = callback + + def start(self): + """Start the snapshot reader thread.""" + if self.thread and self.thread.is_alive(): + logger.warning(f"Snapshot reader for {self.camera_id} already running") + return + + self.stop_event.clear() + self.thread = threading.Thread(target=self._read_snapshots, daemon=True) + self.thread.start() + logger.info(f"Started snapshot reader for camera {self.camera_id}") + + def stop(self): + """Stop the snapshot reader thread.""" + self.stop_event.set() + if self.thread: + self.thread.join(timeout=5.0) + logger.info(f"Stopped snapshot reader for camera {self.camera_id}") + + def _read_snapshots(self): + """Main snapshot reading loop for high quality 2K images.""" + retries = 0 + frame_count = 0 + last_log_time = time.time() + interval_seconds = self.interval_ms / 1000.0 + + logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") + + while not self.stop_event.is_set(): + try: + start_time = time.time() + frame = self._fetch_snapshot() + + if frame is None: + retries += 1 + logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}") + + if self.max_retries != -1 and retries > self.max_retries: + logger.error(f"Max retries reached for snapshot camera {self.camera_id}") + break + + time.sleep(min(2.0, interval_seconds)) + continue + + # Accept any valid image dimensions - don't force specific resolution + if frame.shape[1] <= 0 or frame.shape[0] <= 0: + logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}") + continue + + # Reset retry counter on successful fetch + retries = 0 + frame_count += 1 + + # Call frame callback + if self.frame_callback: + try: + self.frame_callback(self.camera_id, frame) + except Exception as e: + logger.error(f"Camera {self.camera_id}: Frame callback error: {e}") + + # Log progress every 30 seconds + current_time = time.time() + if current_time - last_log_time >= 30: + logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed") + last_log_time = current_time + + # Wait for next interval + elapsed = time.time() - start_time + sleep_time = max(0, interval_seconds - elapsed) + if sleep_time > 0: + self.stop_event.wait(sleep_time) + + except Exception as e: + logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}") + retries += 1 + if self.max_retries != -1 and retries > self.max_retries: + break + time.sleep(min(2.0, interval_seconds)) + + logger.info(f"Snapshot reader thread ended for camera {self.camera_id}") + + def _fetch_snapshot(self) -> Optional[np.ndarray]: + """Fetch a single high quality snapshot from HTTP URL.""" + try: + # Parse URL for authentication + from urllib.parse import urlparse + parsed_url = 
urlparse(self.snapshot_url) + + headers = { + 'User-Agent': 'Python-Detector-Worker/1.0', + 'Accept': 'image/jpeg, image/png, image/*' + } + auth = None + + if parsed_url.username and parsed_url.password: + from requests.auth import HTTPBasicAuth, HTTPDigestAuth + auth = HTTPBasicAuth(parsed_url.username, parsed_url.password) + + # Reconstruct URL without credentials + clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}" + if parsed_url.port: + clean_url += f":{parsed_url.port}" + clean_url += parsed_url.path + if parsed_url.query: + clean_url += f"?{parsed_url.query}" + + # Try Basic Auth first + response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, + stream=True, verify=False) + + # If Basic Auth fails, try Digest Auth + if response.status_code == 401: + auth = HTTPDigestAuth(parsed_url.username, parsed_url.password) + response = requests.get(clean_url, auth=auth, timeout=15, headers=headers, + stream=True, verify=False) + else: + response = requests.get(self.snapshot_url, timeout=15, headers=headers, + stream=True, verify=False) + + if response.status_code == 200: + # Check content size + content_length = int(response.headers.get('content-length', 0)) + if content_length > self.max_file_size: + logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes") + return None + + # Read content + content = response.content + + # Convert to numpy array + image_array = np.frombuffer(content, np.uint8) + + # Decode as high quality image + frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + + if frame is None: + logger.error(f"Failed to decode snapshot for camera {self.camera_id}") + return None + + logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}") + return frame + else: + logger.warning(f"HTTP {response.status_code} from {self.camera_id}") + return None + + except requests.RequestException as e: + logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}") + return None + except Exception as e: + logger.error(f"Error decoding snapshot for {self.camera_id}: {e}") + return None + + def fetch_single_snapshot(self) -> Optional[np.ndarray]: + """ + Fetch a single high-quality snapshot on demand for pipeline processing. + This method is for one-time fetch from HTTP URL, not continuous streaming. 
+ + Returns: + High quality 2K snapshot frame or None if failed + """ + logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}") + + # Try to fetch snapshot with retries + for attempt in range(self.max_retries): + frame = self._fetch_snapshot() + + if frame is not None: + logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}") + return frame + + if attempt < self.max_retries - 1: + logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...") + time.sleep(0.5) + + logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts") + return None + + def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray: + """Resize image while maintaining aspect ratio for high quality.""" + h, w = frame.shape[:2] + aspect = w / h + target_aspect = target_width / target_height + + if aspect > target_aspect: + # Image is wider + new_width = target_width + new_height = int(target_width / aspect) + else: + # Image is taller + new_height = target_height + new_width = int(target_height * aspect) + + # Use INTER_LANCZOS4 for high quality downsampling + resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) + + # Pad to target size if needed + if new_width < target_width or new_height < target_height: + top = (target_height - new_height) // 2 + bottom = target_height - new_height - top + left = (target_width - new_width) // 2 + right = target_width - new_width - left + resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) + + return resized \ No newline at end of file diff --git a/core/streaming/readers/__init__.py b/core/streaming/readers/__init__.py deleted file mode 100644 index 0903d6d..0000000 --- a/core/streaming/readers/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -Stream readers for RTSP and HTTP camera feeds. -""" -from .base import VideoReader -from .ffmpeg_rtsp import FFmpegRTSPReader -from .http_snapshot import HTTPSnapshotReader -from .utils import log_success, log_warning, log_error, log_info, Colors - -__all__ = [ - 'VideoReader', - 'FFmpegRTSPReader', - 'HTTPSnapshotReader', - 'log_success', - 'log_warning', - 'log_error', - 'log_info', - 'Colors' -] \ No newline at end of file diff --git a/core/streaming/readers/base.py b/core/streaming/readers/base.py deleted file mode 100644 index 56c41cb..0000000 --- a/core/streaming/readers/base.py +++ /dev/null @@ -1,65 +0,0 @@ -""" -Abstract base class for video stream readers. -""" -from abc import ABC, abstractmethod -from typing import Optional, Callable -import numpy as np - - -class VideoReader(ABC): - """Abstract base class for video stream readers.""" - - def __init__(self, camera_id: str, source_url: str, max_retries: int = 3): - """ - Initialize the video reader. 
- - Args: - camera_id: Unique identifier for the camera - source_url: URL or path to the video source - max_retries: Maximum number of retry attempts - """ - self.camera_id = camera_id - self.source_url = source_url - self.max_retries = max_retries - self.frame_callback: Optional[Callable[[str, np.ndarray], None]] = None - - @abstractmethod - def start(self) -> None: - """Start the video reader.""" - pass - - @abstractmethod - def stop(self) -> None: - """Stop the video reader.""" - pass - - @abstractmethod - def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]) -> None: - """ - Set callback function to handle captured frames. - - Args: - callback: Function that takes (camera_id, frame) as arguments - """ - pass - - @property - @abstractmethod - def is_running(self) -> bool: - """Check if the reader is currently running.""" - pass - - @property - @abstractmethod - def reader_type(self) -> str: - """Get the type of reader (e.g., 'rtsp', 'http_snapshot').""" - pass - - def __enter__(self): - """Context manager entry.""" - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.stop() \ No newline at end of file diff --git a/core/streaming/readers/ffmpeg_rtsp.py b/core/streaming/readers/ffmpeg_rtsp.py deleted file mode 100644 index 88f45ae..0000000 --- a/core/streaming/readers/ffmpeg_rtsp.py +++ /dev/null @@ -1,436 +0,0 @@ -""" -FFmpeg RTSP stream reader using subprocess piping frames directly to buffer. -Enhanced with comprehensive health monitoring and automatic recovery. -""" -import cv2 -import time -import threading -import numpy as np -import subprocess -import struct -from typing import Optional, Callable, Dict, Any - -from .base import VideoReader -from .utils import log_success, log_warning, log_error, log_info -from ...monitoring.stream_health import stream_health_tracker -from ...monitoring.thread_health import thread_health_monitor -from ...monitoring.recovery import recovery_manager, RecoveryAction - - -class FFmpegRTSPReader(VideoReader): - """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer.""" - - def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3): - super().__init__(camera_id, rtsp_url, max_retries) - self.rtsp_url = rtsp_url - self.process = None - self.stop_event = threading.Event() - self.thread = None - self.stderr_thread = None - - # Expected stream specs (for reference, actual dimensions read from PPM header) - self.width = 1280 - self.height = 720 - - # Watchdog timers for stream reliability - self.process_start_time = None - self.last_frame_time = None - self.is_restart = False # Track if this is a restart (shorter timeout) - self.first_start_timeout = 30.0 # 30s timeout on first start - self.restart_timeout = 15.0 # 15s timeout after restart - - # Health monitoring setup - self.last_heartbeat = time.time() - self.consecutive_errors = 0 - self.ffmpeg_restart_count = 0 - - # Register recovery handlers - recovery_manager.register_recovery_handler( - RecoveryAction.RESTART_STREAM, - self._handle_restart_recovery - ) - recovery_manager.register_recovery_handler( - RecoveryAction.RECONNECT, - self._handle_reconnect_recovery - ) - - @property - def is_running(self) -> bool: - """Check if the reader is currently running.""" - return self.thread is not None and self.thread.is_alive() - - @property - def reader_type(self) -> str: - """Get the type of reader.""" - return "rtsp_ffmpeg" - - def set_frame_callback(self, callback: Callable[[str, 
np.ndarray], None]): - """Set callback function to handle captured frames.""" - self.frame_callback = callback - - def start(self): - """Start the FFmpeg subprocess reader.""" - if self.thread and self.thread.is_alive(): - log_warning(self.camera_id, "FFmpeg reader already running") - return - - self.stop_event.clear() - self.thread = threading.Thread(target=self._read_frames, daemon=True) - self.thread.start() - - # Register with health monitoring - stream_health_tracker.register_stream(self.camera_id, "rtsp_ffmpeg", self.rtsp_url) - thread_health_monitor.register_thread(self.thread, self._heartbeat_callback) - - log_success(self.camera_id, "Stream started with health monitoring") - - def stop(self): - """Stop the FFmpeg subprocess reader.""" - self.stop_event.set() - - # Unregister from health monitoring - if self.thread: - thread_health_monitor.unregister_thread(self.thread.ident) - - if self.process: - self.process.terminate() - try: - self.process.wait(timeout=5) - except subprocess.TimeoutExpired: - self.process.kill() - - if self.thread: - self.thread.join(timeout=5.0) - if self.stderr_thread: - self.stderr_thread.join(timeout=2.0) - - stream_health_tracker.unregister_stream(self.camera_id) - - log_info(self.camera_id, "Stream stopped") - - def _start_ffmpeg_process(self): - """Start FFmpeg subprocess outputting BMP frames to stdout pipe.""" - cmd = [ - 'ffmpeg', - # DO NOT REMOVE - '-hwaccel', 'cuda', - '-hwaccel_device', '0', - # Real-time input flags - '-fflags', 'nobuffer+genpts', - '-flags', 'low_delay', - '-max_delay', '0', # No reordering delay - # RTSP configuration - '-rtsp_transport', 'tcp', - '-i', self.rtsp_url, - # Output configuration (keeping BMP) - '-f', 'image2pipe', # Output images to pipe - '-vcodec', 'bmp', # BMP format with header containing dimensions - '-vsync', 'passthrough', # Pass frames as-is - # Use native stream resolution and framerate - '-an', # No audio - '-' # Output to stdout - ] - - try: - # Start FFmpeg with stdout pipe to read frames directly - self.process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, # Capture stdout for frame data - stderr=subprocess.PIPE, # Capture stderr for error logging - bufsize=0 # Unbuffered for real-time processing - ) - - # Start stderr reading thread - if self.stderr_thread and self.stderr_thread.is_alive(): - # Stop previous stderr thread - try: - self.stderr_thread.join(timeout=1.0) - except: - pass - - self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True) - self.stderr_thread.start() - - # Set process start time for watchdog - self.process_start_time = time.time() - self.last_frame_time = None # Reset frame time - - # After successful restart, next timeout will be back to 30s - if self.is_restart: - log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s") - self.is_restart = False - - return True - except Exception as e: - log_error(self.camera_id, f"FFmpeg startup failed: {e}") - return False - - def _read_bmp_frame(self, pipe): - """Read BMP frame from pipe - BMP header contains dimensions.""" - try: - # Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum) - header_data = b'' - bytes_to_read = 54 - - while len(header_data) < bytes_to_read: - chunk = pipe.read(bytes_to_read - len(header_data)) - if not chunk: - return None # Silent end of stream - header_data += chunk - - # Parse BMP header - if header_data[:2] != b'BM': - return None # Invalid format, skip frame silently - - # Extract file size from header 
(bytes 2-5)
-            file_size = struct.unpack('<I', header_data[2:6])[0]
-
-            # Read the remaining BMP payload (file_size includes the 54 header bytes)
-            remaining_size = file_size - 54
-            remaining_data = b''
-
-            while len(remaining_data) < remaining_size:
-                chunk = pipe.read(remaining_size - len(remaining_data))
-                if not chunk:
-                    return None  # Incomplete frame
-                remaining_data += chunk
-
-            # Decode the complete BMP in memory; OpenCV reads dimensions from the header
-            bmp_data = header_data + remaining_data
-            frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
-            frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
-            return frame
-
-        except Exception:
-            return None
-
-    def _read_stderr(self):
-        """Read FFmpeg stderr output in a background thread."""
-        ...
-
-    def _check_watchdog_timeout(self) -> bool:
-        """Check if watchdog timeout has been exceeded."""
-        if not self.process_start_time:
-            return False
-
-        current_time = time.time()
-        time_since_start = current_time - self.process_start_time
-
-        # Determine timeout based on whether this is a restart
-        timeout = self.restart_timeout if self.is_restart else self.first_start_timeout
-
-        # If no frames received yet, check against process start time
-        if not self.last_frame_time:
-            if time_since_start > timeout:
-                log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_start:.1f}s (limit: {timeout}s)")
-                return True
-        else:
-            # Check time since last frame
-            time_since_frame = current_time - self.last_frame_time
-            if time_since_frame > timeout:
-                log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_frame:.1f}s (limit: {timeout}s)")
-                return True
-
-        return False
-
-    def _restart_ffmpeg_process(self):
-        """Restart FFmpeg process due to watchdog timeout."""
-        log_warning(self.camera_id, "Watchdog triggered FFmpeg restart")
-
-        # Terminate current process
-        if self.process:
-            try:
-                self.process.terminate()
-                self.process.wait(timeout=3)
-            except subprocess.TimeoutExpired:
-                self.process.kill()
-            except Exception:
-                pass
-            self.process = None
-
-        # Mark as restart for shorter timeout
-        self.is_restart = True
-
-        # Small delay before restart
-        time.sleep(1.0)
-
-    def _read_frames(self):
-        """Read frames directly from FFmpeg stdout pipe."""
-        frame_count = 0
-        last_log_time = time.time()
-
-        while not self.stop_event.is_set():
-            try:
-                # Send heartbeat for thread health monitoring
-                self._send_heartbeat("reading_frames")
-
-                # Check watchdog timeout if process is running
-                if self.process and self.process.poll() is None:
-                    if self._check_watchdog_timeout():
-                        self._restart_ffmpeg_process()
-                        continue
-
-                # Start FFmpeg if not running
-                if not self.process or self.process.poll() is not None:
-                    if self.process and self.process.poll() is not None:
-                        log_warning(self.camera_id, "Stream disconnected, reconnecting...")
-                        stream_health_tracker.report_error(
-                            self.camera_id,
-                            "FFmpeg process disconnected"
-                        )
-
-                    if not self._start_ffmpeg_process():
-                        self.consecutive_errors += 1
-                        stream_health_tracker.report_error(
-                            self.camera_id,
-                            "Failed to start FFmpeg process"
-                        )
-                        time.sleep(5.0)
-                        continue
-
-                # Read frames directly from FFmpeg stdout
-                try:
-                    if self.process and self.process.stdout:
-                        # Read BMP frame data
-                        frame = self._read_bmp_frame(self.process.stdout)
-                        if frame is None:
-                            continue
-
-                        # Update watchdog - we got a frame
-                        self.last_frame_time = time.time()
-
-                        # Reset error counter on successful frame
-                        self.consecutive_errors = 0
-
-                        # Report successful frame to health monitoring
-                        frame_size = frame.nbytes
-                        stream_health_tracker.report_frame_received(self.camera_id, frame_size)
-
-                        # Call frame callback
-                        if self.frame_callback:
-                            try:
-                                self.frame_callback(self.camera_id, frame)
-                            except Exception as e:
-                                stream_health_tracker.report_error(
-                                    self.camera_id,
-                                    f"Frame callback error: {e}"
-                                )
-
-                        frame_count += 1
-
-                        # Log progress every 60 seconds (quieter)
-                        current_time = time.time()
-                        if current_time - last_log_time >= 60:
-                            log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
-                            last_log_time = current_time
-
-                except Exception as e:
-                    # Process might have died, let it restart on next iteration
-                    stream_health_tracker.report_error(
-                        self.camera_id,
-                        f"Frame reading error: {e}"
-                    )
-                    if
-
-    def _read_frames(self):
-        """Read frames directly from FFmpeg stdout pipe."""
-        frame_count = 0
-        last_log_time = time.time()
-
-        while not self.stop_event.is_set():
-            try:
-                # Send heartbeat for thread health monitoring
-                self._send_heartbeat("reading_frames")
-
-                # Check watchdog timeout if process is running
-                if self.process and self.process.poll() is None:
-                    if self._check_watchdog_timeout():
-                        self._restart_ffmpeg_process()
-                        continue
-
-                # Start FFmpeg if not running
-                if not self.process or self.process.poll() is not None:
-                    if self.process and self.process.poll() is not None:
-                        log_warning(self.camera_id, "Stream disconnected, reconnecting...")
-                        stream_health_tracker.report_error(
-                            self.camera_id,
-                            "FFmpeg process disconnected"
-                        )
-
-                    if not self._start_ffmpeg_process():
-                        self.consecutive_errors += 1
-                        stream_health_tracker.report_error(
-                            self.camera_id,
-                            "Failed to start FFmpeg process"
-                        )
-                        time.sleep(5.0)
-                        continue
-
-                # Read frames directly from FFmpeg stdout
-                try:
-                    if self.process and self.process.stdout:
-                        # Read BMP frame data
-                        frame = self._read_bmp_frame(self.process.stdout)
-                        if frame is None:
-                            continue
-
-                        # Update watchdog - we got a frame
-                        self.last_frame_time = time.time()
-
-                        # Reset error counter on successful frame
-                        self.consecutive_errors = 0
-
-                        # Report successful frame to health monitoring
-                        frame_size = frame.nbytes
-                        stream_health_tracker.report_frame_received(self.camera_id, frame_size)
-
-                        # Call frame callback
-                        if self.frame_callback:
-                            try:
-                                self.frame_callback(self.camera_id, frame)
-                            except Exception as e:
-                                stream_health_tracker.report_error(
-                                    self.camera_id,
-                                    f"Frame callback error: {e}"
-                                )
-
-                        frame_count += 1
-
-                        # Log progress every 60 seconds (quieter)
-                        current_time = time.time()
-                        if current_time - last_log_time >= 60:
-                            log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
-                            last_log_time = current_time
-
-                except Exception as e:
-                    # Process might have died, let it restart on next iteration
-                    stream_health_tracker.report_error(
-                        self.camera_id,
-                        f"Frame reading error: {e}"
-                    )
-                    if self.process:
-                        self.process.terminate()
-                        self.process = None
-                    time.sleep(1.0)
-
-            except Exception as e:
-                stream_health_tracker.report_error(
-                    self.camera_id,
-                    f"Main loop error: {e}"
-                )
-                time.sleep(1.0)
-
-        # Cleanup
-        if self.process:
-            self.process.terminate()
-
-    # Health monitoring methods
-    def _send_heartbeat(self, activity: str = "running"):
-        """Send heartbeat to thread health monitor."""
-        self.last_heartbeat = time.time()
-        thread_health_monitor.heartbeat(activity=activity)
-
-    def _heartbeat_callback(self) -> bool:
-        """Heartbeat callback for thread responsiveness testing."""
-        try:
-            # Check if thread is responsive by checking recent heartbeat
-            current_time = time.time()
-            age = current_time - self.last_heartbeat
-
-            # Thread is responsive if heartbeat is recent
-            return age < 30.0  # 30 second responsiveness threshold
-
-        except Exception:
-            return False
-
-    def _handle_restart_recovery(self, component: str, details: Dict[str, Any]) -> bool:
-        """Handle restart recovery action."""
-        try:
-            log_info(self.camera_id, "Restarting FFmpeg RTSP reader for health recovery")
-
-            # Stop current instance
-            self.stop()
-
-            # Small delay
-            time.sleep(2.0)
-
-            # Restart
-            self.start()
-
-            # Report successful restart
-            stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_restart")
-            self.ffmpeg_restart_count += 1
-
-            return True
-
-        except Exception as e:
-            log_error(self.camera_id, f"Failed to restart FFmpeg RTSP reader: {e}")
-            return False
-
-    def _handle_reconnect_recovery(self, component: str, details: Dict[str, Any]) -> bool:
-        """Handle reconnect recovery action."""
-        try:
-            log_info(self.camera_id, "Reconnecting FFmpeg RTSP reader for health recovery")
-
-            # Force restart FFmpeg process
-            self._restart_ffmpeg_process()
-
-            # Reset error counters
-            self.consecutive_errors = 0
-            stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_reconnect")
-
-            return True
-
-        except Exception as e:
-            log_error(self.camera_id, f"Failed to reconnect FFmpeg RTSP reader: {e}")
-            return False
\ No newline at end of file
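Reviewer note: both deleted readers hand restart/reconnect callbacks with the shape `(component: str, details: Dict[str, Any]) -> bool` to the recovery manager. A toy dispatcher showing that contract (this is a stand-in, not the real `core.monitoring.recovery` implementation):

```python
# Sketch of the handler contract inferred from the handlers above.
from typing import Any, Callable, Dict, List

Handler = Callable[[str, Dict[str, Any]], bool]

class RecoveryDispatcher:
    def __init__(self):
        self._handlers: Dict[str, List[Handler]] = {}

    def register_recovery_handler(self, action: str, handler: Handler) -> None:
        self._handlers.setdefault(action, []).append(handler)

    def attempt(self, action: str, component: str, details: Dict[str, Any]) -> bool:
        # First handler reporting success wins; False means keep escalating.
        return any(h(component, details) for h in self._handlers.get(action, []))

dispatcher = RecoveryDispatcher()
dispatcher.register_recovery_handler("reconnect", lambda comp, det: True)  # toy handler
print(dispatcher.attempt("reconnect", "stream_cam-001", {}))  # True
```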
-""" -import cv2 -import logging -import time -import threading -import requests -import numpy as np -from typing import Optional, Callable, Dict, Any - -from .base import VideoReader -from .utils import log_success, log_warning, log_error, log_info -from ...monitoring.stream_health import stream_health_tracker -from ...monitoring.thread_health import thread_health_monitor -from ...monitoring.recovery import recovery_manager, RecoveryAction - -logger = logging.getLogger(__name__) - - -class HTTPSnapshotReader(VideoReader): - """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.""" - - def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3): - super().__init__(camera_id, snapshot_url, max_retries) - self.snapshot_url = snapshot_url - self.interval_ms = interval_ms - self.stop_event = threading.Event() - self.thread = None - - # Expected snapshot specifications - self.expected_width = 2560 - self.expected_height = 1440 - self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image - - # Health monitoring setup - self.last_heartbeat = time.time() - self.consecutive_errors = 0 - self.connection_test_interval = 300 # Test connection every 5 minutes - self.last_connection_test = None - - # Register recovery handlers - recovery_manager.register_recovery_handler( - RecoveryAction.RESTART_STREAM, - self._handle_restart_recovery - ) - recovery_manager.register_recovery_handler( - RecoveryAction.RECONNECT, - self._handle_reconnect_recovery - ) - - @property - def is_running(self) -> bool: - """Check if the reader is currently running.""" - return self.thread is not None and self.thread.is_alive() - - @property - def reader_type(self) -> str: - """Get the type of reader.""" - return "http_snapshot" - - def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]): - """Set callback function to handle captured frames.""" - self.frame_callback = callback - - def start(self): - """Start the snapshot reader thread.""" - if self.thread and self.thread.is_alive(): - logger.warning(f"Snapshot reader for {self.camera_id} already running") - return - - self.stop_event.clear() - self.thread = threading.Thread(target=self._read_snapshots, daemon=True) - self.thread.start() - - # Register with health monitoring - stream_health_tracker.register_stream(self.camera_id, "http_snapshot", self.snapshot_url) - thread_health_monitor.register_thread(self.thread, self._heartbeat_callback) - - logger.info(f"Started snapshot reader for camera {self.camera_id} with health monitoring") - - def stop(self): - """Stop the snapshot reader thread.""" - self.stop_event.set() - - # Unregister from health monitoring - if self.thread: - thread_health_monitor.unregister_thread(self.thread.ident) - self.thread.join(timeout=5.0) - - stream_health_tracker.unregister_stream(self.camera_id) - - logger.info(f"Stopped snapshot reader for camera {self.camera_id}") - - def _read_snapshots(self): - """Main snapshot reading loop for high quality 2K images.""" - retries = 0 - frame_count = 0 - last_log_time = time.time() - last_connection_test = time.time() - interval_seconds = self.interval_ms / 1000.0 - - logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s") - - while not self.stop_event.is_set(): - try: - # Send heartbeat for thread health monitoring - self._send_heartbeat("fetching_snapshot") - - start_time = time.time() - frame = self._fetch_snapshot() - - if frame is None: - retries += 1 - self.consecutive_errors += 1 - - # Report error 
-
-    def _read_snapshots(self):
-        """Main snapshot reading loop for high quality 2K images."""
-        retries = 0
-        frame_count = 0
-        last_log_time = time.time()
-        last_connection_test = time.time()
-        interval_seconds = self.interval_ms / 1000.0
-
-        logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")
-
-        while not self.stop_event.is_set():
-            try:
-                # Send heartbeat for thread health monitoring
-                self._send_heartbeat("fetching_snapshot")
-
-                start_time = time.time()
-                frame = self._fetch_snapshot()
-
-                if frame is None:
-                    retries += 1
-                    self.consecutive_errors += 1
-
-                    # Report error to health monitoring
-                    stream_health_tracker.report_error(
-                        self.camera_id,
-                        f"Failed to fetch snapshot (retry {retries}/{self.max_retries})"
-                    )
-
-                    logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")
-
-                    if self.max_retries != -1 and retries > self.max_retries:
-                        logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
-                        break
-
-                    time.sleep(min(2.0, interval_seconds))
-                    continue
-
-                # Accept any valid image dimensions - don't force specific resolution
-                if frame.shape[1] <= 0 or frame.shape[0] <= 0:
-                    logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
-                    stream_health_tracker.report_error(
-                        self.camera_id,
-                        f"Invalid frame dimensions: {frame.shape[1]}x{frame.shape[0]}"
-                    )
-                    continue
-
-                # Reset retry counter on successful fetch
-                retries = 0
-                self.consecutive_errors = 0
-                frame_count += 1
-
-                # Report successful frame to health monitoring
-                frame_size = frame.nbytes
-                stream_health_tracker.report_frame_received(self.camera_id, frame_size)
-
-                # Call frame callback
-                if self.frame_callback:
-                    try:
-                        self.frame_callback(self.camera_id, frame)
-                    except Exception as e:
-                        logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
-                        stream_health_tracker.report_error(self.camera_id, f"Frame callback error: {e}")
-
-                # Periodic connection health test
-                current_time = time.time()
-                if current_time - last_connection_test >= self.connection_test_interval:
-                    self._test_connection_health()
-                    last_connection_test = current_time
-
-                # Log progress every 30 seconds
-                if current_time - last_log_time >= 30:
-                    logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
-                    last_log_time = current_time
-
-                # Wait for next interval
-                elapsed = time.time() - start_time
-                sleep_time = max(0, interval_seconds - elapsed)
-                if sleep_time > 0:
-                    self.stop_event.wait(sleep_time)
-
-            except Exception as e:
-                logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
-                stream_health_tracker.report_error(self.camera_id, f"Snapshot loop error: {e}")
-                retries += 1
-                if self.max_retries != -1 and retries > self.max_retries:
-                    break
-                time.sleep(min(2.0, interval_seconds))
-
-        logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")
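Reviewer note: the loop above paces fetches by subtracting the fetch latency from the interval and sleeping on the stop event rather than `time.sleep`, so shutdown takes effect immediately. A minimal sketch of just that pattern (interval and fetch latency are illustrative):

```python
# Drift-compensated pacing sketch; fetch() is a stand-in for the HTTP round trip.
import threading
import time

stop_event = threading.Event()
interval_seconds = 1.0

def fetch() -> None:
    time.sleep(0.2)  # simulated network latency

for _ in range(3):  # three paced iterations for demonstration
    start = time.time()
    fetch()
    elapsed = time.time() - start
    # Event.wait() returns True (and we exit) as soon as stop is requested,
    # unlike time.sleep(), which would block the full remainder.
    if stop_event.wait(max(0.0, interval_seconds - elapsed)):
        break
```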
-
-    def _fetch_snapshot(self) -> Optional[np.ndarray]:
-        """Fetch a single high quality snapshot from HTTP URL."""
-        try:
-            # Parse URL for authentication
-            from urllib.parse import urlparse
-            parsed_url = urlparse(self.snapshot_url)
-
-            headers = {
-                'User-Agent': 'Python-Detector-Worker/1.0',
-                'Accept': 'image/jpeg, image/png, image/*'
-            }
-            auth = None
-
-            if parsed_url.username and parsed_url.password:
-                from requests.auth import HTTPBasicAuth, HTTPDigestAuth
-                auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)
-
-                # Reconstruct URL without credentials
-                clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}"
-                if parsed_url.port:
-                    clean_url += f":{parsed_url.port}"
-                clean_url += parsed_url.path
-                if parsed_url.query:
-                    clean_url += f"?{parsed_url.query}"
-
-                # Try Basic Auth first
-                response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
-                                        stream=True, verify=False)
-
-                # If Basic Auth fails, try Digest Auth
-                if response.status_code == 401:
-                    auth = HTTPDigestAuth(parsed_url.username, parsed_url.password)
-                    response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
-                                            stream=True, verify=False)
-            else:
-                response = requests.get(self.snapshot_url, timeout=15, headers=headers,
-                                        stream=True, verify=False)
-
-            if response.status_code == 200:
-                # Check content size
-                content_length = int(response.headers.get('content-length', 0))
-                if content_length > self.max_file_size:
-                    logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
-                    return None
-
-                # Read content
-                content = response.content
-
-                # Convert to numpy array
-                image_array = np.frombuffer(content, np.uint8)
-
-                # Decode as high quality image
-                frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
-
-                if frame is None:
-                    logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
-                    return None
-
-                logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
-                return frame
-            else:
-                logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
-                return None
-
-        except requests.RequestException as e:
-            logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
-            return None
-        except Exception as e:
-            logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
-            return None
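Reviewer note: `_fetch_snapshot` strips credentials out of the URL and retries with Digest auth on a 401, since many IP cameras reject Basic. A self-contained sketch of just that fallback (URL and credentials are placeholders):

```python
# Basic-then-Digest fallback sketch using requests' standard auth handlers.
from urllib.parse import urlparse

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth

def fetch_with_auth_fallback(url: str, timeout: float = 15.0) -> requests.Response:
    parsed = urlparse(url)
    if not (parsed.username and parsed.password):
        return requests.get(url, timeout=timeout)

    # Strip credentials from the URL; requests sends them via the auth handler.
    netloc = parsed.hostname + (f":{parsed.port}" if parsed.port else "")
    clean = parsed._replace(netloc=netloc).geturl()

    resp = requests.get(clean, timeout=timeout,
                        auth=HTTPBasicAuth(parsed.username, parsed.password))
    if resp.status_code == 401:
        # Many cameras only accept Digest; retry once with the same credentials.
        resp = requests.get(clean, timeout=timeout,
                            auth=HTTPDigestAuth(parsed.username, parsed.password))
    return resp

# resp = fetch_with_auth_fallback("http://user:pass@camera.example:8080/snapshot.jpg")
```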
-
-    def fetch_single_snapshot(self) -> Optional[np.ndarray]:
-        """
-        Fetch a single high-quality snapshot on demand for pipeline processing.
-        This method is for one-time fetch from HTTP URL, not continuous streaming.
-
-        Returns:
-            High quality 2K snapshot frame or None if failed
-        """
-        logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}")
-
-        # Try to fetch snapshot with retries
-        for attempt in range(self.max_retries):
-            frame = self._fetch_snapshot()
-
-            if frame is not None:
-                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
-                return frame
-
-            if attempt < self.max_retries - 1:
-                logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...")
-                time.sleep(0.5)
-
-        logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts")
-        return None
-
-    def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
-        """Resize image while maintaining aspect ratio for high quality."""
-        h, w = frame.shape[:2]
-        aspect = w / h
-        target_aspect = target_width / target_height
-
-        if aspect > target_aspect:
-            # Image is wider
-            new_width = target_width
-            new_height = int(target_width / aspect)
-        else:
-            # Image is taller
-            new_height = target_height
-            new_width = int(target_height * aspect)
-
-        # Use INTER_LANCZOS4 for high quality downsampling
-        resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
-
-        # Pad to target size if needed
-        if new_width < target_width or new_height < target_height:
-            top = (target_height - new_height) // 2
-            bottom = target_height - new_height - top
-            left = (target_width - new_width) // 2
-            right = target_width - new_width - left
-            resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
-
-        return resized
-
-    # Health monitoring methods
-    def _send_heartbeat(self, activity: str = "running"):
-        """Send heartbeat to thread health monitor."""
-        self.last_heartbeat = time.time()
-        thread_health_monitor.heartbeat(activity=activity)
-
-    def _heartbeat_callback(self) -> bool:
-        """Heartbeat callback for thread responsiveness testing."""
-        try:
-            # Check if thread is responsive by checking recent heartbeat
-            current_time = time.time()
-            age = current_time - self.last_heartbeat
-
-            # Thread is responsive if heartbeat is recent
-            return age < 30.0  # 30 second responsiveness threshold
-
-        except Exception:
-            return False
-
-    def _test_connection_health(self):
-        """Test HTTP connection health."""
-        try:
-            stream_health_tracker.test_http_connection(self.camera_id, self.snapshot_url)
-        except Exception as e:
-            logger.error(f"Error testing connection health for {self.camera_id}: {e}")
-
-    def _handle_restart_recovery(self, component: str, details: Dict[str, Any]) -> bool:
-        """Handle restart recovery action."""
-        try:
-            logger.info(f"Restarting HTTP snapshot reader for {self.camera_id}")
-
-            # Stop current instance
-            self.stop()
-
-            # Small delay
-            time.sleep(2.0)
-
-            # Restart
-            self.start()
-
-            # Report successful restart
-            stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_restart")
-
-            return True
-
-        except Exception as e:
-            logger.error(f"Failed to restart HTTP snapshot reader for {self.camera_id}: {e}")
-            return False
-
-    def _handle_reconnect_recovery(self, component: str, details: Dict[str, Any]) -> bool:
-        """Handle reconnect recovery action."""
-        try:
-            logger.info(f"Reconnecting HTTP snapshot reader for {self.camera_id}")
-
-            # Test connection first
-            success = stream_health_tracker.test_http_connection(self.camera_id, self.snapshot_url)
-
-            if success:
-                # Reset error counters
-                self.consecutive_errors = 0
-                stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_reconnect")
-                return True
-            else:
-                logger.warning(f"Connection test failed during recovery for {self.camera_id}")
-                return False
-
-        except Exception as e:
-            logger.error(f"Failed to reconnect HTTP snapshot reader for {self.camera_id}: {e}")
-            return False
\ No newline at end of file
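Reviewer note: `_resize_maintain_aspect` letterboxes rather than stretches. For example, a 4:3 source of 1920x1440 resized toward a 1280x720 target lands at 960x720 (aspect 1.33 < target 1.78, so the "taller" branch applies) and receives 160 black columns on each side; a 16:9 source maps exactly with no border.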
-""" -import logging -import os - -# Keep OpenCV errors visible but allow FFmpeg stderr logging -os.environ["OPENCV_LOG_LEVEL"] = "ERROR" - -logger = logging.getLogger(__name__) - -# Color codes for pretty logging -class Colors: - GREEN = '\033[92m' - YELLOW = '\033[93m' - RED = '\033[91m' - BLUE = '\033[94m' - PURPLE = '\033[95m' - CYAN = '\033[96m' - WHITE = '\033[97m' - BOLD = '\033[1m' - END = '\033[0m' - -def log_success(camera_id: str, message: str): - """Log success messages in green""" - logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}") - -def log_warning(camera_id: str, message: str): - """Log warnings in yellow""" - logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}") - -def log_error(camera_id: str, message: str): - """Log errors in red""" - logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}") - -def log_info(camera_id: str, message: str): - """Log info in cyan""" - logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}") \ No newline at end of file diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 2fba002..3f1ebe0 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -61,7 +61,6 @@ class TrackingPipelineIntegration: self.cleared_sessions: Dict[str, float] = {} # session_id -> clear_time self.pending_vehicles: Dict[str, int] = {} # display_id -> track_id (waiting for session ID) self.pending_processing_data: Dict[str, Dict] = {} # display_id -> processing data (waiting for session ID) - self.display_to_subscription: Dict[str, str] = {} # display_id -> subscription_id (for fallback) # Additional validators for enhanced flow control self.permanently_processed: Dict[str, float] = {} # "camera_id:track_id" -> process_time (never process again) @@ -72,17 +71,12 @@ class TrackingPipelineIntegration: # Thread pool for pipeline execution self.executor = ThreadPoolExecutor(max_workers=2) - # Min bbox filtering configuration - # TODO: Make this configurable via pipeline.json in the future - self.min_bbox_area_percentage = 3.5 # 3.5% of frame area minimum - # Statistics self.stats = { 'frames_processed': 0, 'vehicles_detected': 0, 'vehicles_validated': 0, - 'pipelines_executed': 0, - 'frontals_filtered_small': 0 # Track filtered detections + 'pipelines_executed': 0 } @@ -208,10 +202,6 @@ class TrackingPipelineIntegration: else: logger.debug(f"No tracking results or detections attribute") - # Filter out small frontal detections (neighboring pumps/distant cars) - if tracking_results and hasattr(tracking_results, 'detections'): - tracking_results = self._filter_small_frontals(tracking_results, frame) - # Process tracking results tracked_vehicles = self.tracker.process_detections( tracking_results, @@ -220,10 +210,8 @@ class TrackingPipelineIntegration: ) # Update last detection time for abandonment detection - # Update when vehicles ARE detected, so when they leave, timestamp ages if tracked_vehicles: self.last_detection_time[display_id] = time.time() - logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles") # Check for car abandonment (vehicle left after getting car_wait_staff stage) await self._check_car_abandonment(display_id, subscription_id) @@ -414,12 +402,27 @@ class TrackingPipelineIntegration: logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}") # Capture high-quality snapshot for pipeline processing - logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}") - frame = 
diff --git a/core/tracking/integration.py b/core/tracking/integration.py
index 2fba002..3f1ebe0 100644
--- a/core/tracking/integration.py
+++ b/core/tracking/integration.py
@@ -61,7 +61,6 @@ class TrackingPipelineIntegration:
         self.cleared_sessions: Dict[str, float] = {}  # session_id -> clear_time
         self.pending_vehicles: Dict[str, int] = {}  # display_id -> track_id (waiting for session ID)
         self.pending_processing_data: Dict[str, Dict] = {}  # display_id -> processing data (waiting for session ID)
-        self.display_to_subscription: Dict[str, str] = {}  # display_id -> subscription_id (for fallback)

         # Additional validators for enhanced flow control
         self.permanently_processed: Dict[str, float] = {}  # "camera_id:track_id" -> process_time (never process again)
@@ -72,17 +71,12 @@ class TrackingPipelineIntegration:
         # Thread pool for pipeline execution
         self.executor = ThreadPoolExecutor(max_workers=2)

-        # Min bbox filtering configuration
-        # TODO: Make this configurable via pipeline.json in the future
-        self.min_bbox_area_percentage = 3.5  # 3.5% of frame area minimum
-
         # Statistics
         self.stats = {
             'frames_processed': 0,
             'vehicles_detected': 0,
             'vehicles_validated': 0,
-            'pipelines_executed': 0,
-            'frontals_filtered_small': 0  # Track filtered detections
+            'pipelines_executed': 0
         }

@@ -208,10 +202,6 @@ class TrackingPipelineIntegration:
             else:
                 logger.debug(f"No tracking results or detections attribute")

-        # Filter out small frontal detections (neighboring pumps/distant cars)
-        if tracking_results and hasattr(tracking_results, 'detections'):
-            tracking_results = self._filter_small_frontals(tracking_results, frame)
-
         # Process tracking results
         tracked_vehicles = self.tracker.process_detections(
             tracking_results,
@@ -220,10 +210,8 @@ class TrackingPipelineIntegration:
         )

         # Update last detection time for abandonment detection
-        # Update when vehicles ARE detected, so when they leave, timestamp ages
         if tracked_vehicles:
             self.last_detection_time[display_id] = time.time()
-            logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles")

         # Check for car abandonment (vehicle left after getting car_wait_staff stage)
         await self._check_car_abandonment(display_id, subscription_id)
@@ -414,12 +402,27 @@ class TrackingPipelineIntegration:
         logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")

         # Capture high-quality snapshot for pipeline processing
-        logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
-        frame = self._fetch_snapshot()
+        frame = None
+        if self.subscription_info and self.subscription_info.stream_config.snapshot_url:
+            from ..streaming.readers import HTTPSnapshotReader

-        if frame is None:
-            logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
-            # Fall back to RTSP frame if snapshot fails
+            logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
+            snapshot_reader = HTTPSnapshotReader(
+                camera_id=self.subscription_info.camera_id,
+                snapshot_url=self.subscription_info.stream_config.snapshot_url,
+                max_retries=3
+            )
+
+            frame = snapshot_reader.fetch_single_snapshot()
+
+            if frame is not None:
+                logger.info(f"[PROCESSING PHASE] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for pipeline")
+            else:
+                logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
+                # Fall back to RTSP frame if snapshot fails
+                frame = processing_data['frame']
+        else:
+            logger.warning(f"[PROCESSING PHASE] No snapshot URL available, using RTSP frame")
             frame = processing_data['frame']

         # Extract detected regions from detection phase result if available
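Reviewer note: this hunk inlines the snapshot fetch that the shared `_fetch_snapshot` helper used to centralize (that helper is deleted further down). The new call pattern, sketched under the assumption that `core.streaming.readers` still exports `HTTPSnapshotReader` after this change (ids and URL are placeholders):

```python
# Hedged usage sketch of the on-demand fetch path the new + lines take.
from core.streaming.readers import HTTPSnapshotReader  # assumed export

reader = HTTPSnapshotReader(
    camera_id="cam-001",                                # hypothetical id
    snapshot_url="http://camera.example/snapshot.jpg",  # hypothetical URL
    max_retries=3,
)
frame = reader.fetch_single_snapshot()  # single fetch with retries, no background thread
print(None if frame is None else frame.shape)
```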
logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}") - def clear_session_id(self, session_id: str): """ Clear session ID (post-fueling). @@ -590,7 +565,6 @@ class TrackingPipelineIntegration: self.cleared_sessions.clear() self.pending_vehicles.clear() self.pending_processing_data.clear() - self.display_to_subscription.clear() self.permanently_processed.clear() self.progression_stages.clear() self.last_detection_time.clear() @@ -634,16 +608,10 @@ class TrackingPipelineIntegration: last_detection = self.last_detection_time.get(session_display, 0) time_since_detection = current_time - last_detection - logger.info(f"[ABANDON CHECK] Session {session_id} (display: {session_display}): " - f"time_since_detection={time_since_detection:.1f}s, " - f"timeout={self.abandonment_timeout}s") - if time_since_detection > self.abandonment_timeout: - logger.warning(f"🚨 Car abandonment detected: session {session_id}, " + logger.info(f"Car abandonment detected: session {session_id}, " f"no detection for {time_since_detection:.1f}s") abandoned_sessions.append(session_id) - else: - logger.debug(f"[ABANDON CHECK] Session {session_id} has no associated display") # Send abandonment detection for each abandoned session for session_id in abandoned_sessions: @@ -651,7 +619,6 @@ class TrackingPipelineIntegration: # Remove from progression stages to avoid repeated detection if session_id in self.progression_stages: del self.progression_stages[session_id] - logger.info(f"[ABANDON] Removed session {session_id} from progression_stages after notification") async def _send_abandonment_detection(self, subscription_id: str, session_id: str): """ @@ -698,153 +665,6 @@ class TrackingPipelineIntegration: if stage == "car_wait_staff": logger.info(f"Started monitoring session {session_id} for car abandonment") - def _fetch_snapshot(self) -> Optional[np.ndarray]: - """ - Fetch high-quality snapshot from camera's snapshot URL. - Reusable method for both processing phase and fallback pipeline. - - Returns: - Snapshot frame or None if unavailable - """ - if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url): - logger.warning("[SNAPSHOT] No subscription info or snapshot URL available") - return None - - try: - from ..streaming.readers import HTTPSnapshotReader - - logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}") - snapshot_reader = HTTPSnapshotReader( - camera_id=self.subscription_info.camera_id, - snapshot_url=self.subscription_info.stream_config.snapshot_url, - max_retries=3 - ) - - frame = snapshot_reader.fetch_single_snapshot() - - if frame is not None: - logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot") - return frame - else: - logger.warning("[SNAPSHOT] Failed to fetch snapshot") - return None - - except Exception as e: - logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True) - return None - - async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str): - """ - Execute fallback pipeline when sessionId is received without prior detection. - This handles POS-initiated sessions where backend starts transaction before car detection. 
@@ -698,153 +665,6 @@ class TrackingPipelineIntegration:
         if stage == "car_wait_staff":
             logger.info(f"Started monitoring session {session_id} for car abandonment")

-    def _fetch_snapshot(self) -> Optional[np.ndarray]:
-        """
-        Fetch high-quality snapshot from camera's snapshot URL.
-        Reusable method for both processing phase and fallback pipeline.
-
-        Returns:
-            Snapshot frame or None if unavailable
-        """
-        if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url):
-            logger.warning("[SNAPSHOT] No subscription info or snapshot URL available")
-            return None
-
-        try:
-            from ..streaming.readers import HTTPSnapshotReader
-
-            logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}")
-            snapshot_reader = HTTPSnapshotReader(
-                camera_id=self.subscription_info.camera_id,
-                snapshot_url=self.subscription_info.stream_config.snapshot_url,
-                max_retries=3
-            )
-
-            frame = snapshot_reader.fetch_single_snapshot()
-
-            if frame is not None:
-                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot")
-                return frame
-            else:
-                logger.warning("[SNAPSHOT] Failed to fetch snapshot")
-                return None
-
-        except Exception as e:
-            logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True)
-            return None
-
-    async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str):
-        """
-        Execute fallback pipeline when sessionId is received without prior detection.
-        This handles POS-initiated sessions where backend starts transaction before car detection.
-
-        Args:
-            display_id: Display identifier
-            session_id: Session ID from backend
-            subscription_id: Subscription identifier for pipeline execution
-        """
-        try:
-            logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}")
-
-            # Fetch fresh snapshot from camera
-            frame = self._fetch_snapshot()
-
-            if frame is None:
-                logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline")
-                return
-
-            logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}")
-
-            # Check if detection pipeline is available
-            if not self.detection_pipeline:
-                logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}")
-                return
-
-            # Execute detection phase to get detected regions
-            detection_result = await self.detection_pipeline.execute_detection_phase(
-                frame=frame,
-                display_id=display_id,
-                subscription_id=subscription_id
-            )
-
-            logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: "
-                        f"status={detection_result.get('status', 'unknown')}, "
-                        f"regions={list(detection_result.get('detected_regions', {}).keys())}")
-
-            # If detection found regions, execute processing phase
-            detected_regions = detection_result.get('detected_regions', {})
-            if detected_regions:
-                processing_result = await self.detection_pipeline.execute_processing_phase(
-                    frame=frame,
-                    display_id=display_id,
-                    session_id=session_id,
-                    subscription_id=subscription_id,
-                    detected_regions=detected_regions
-                )
-
-                logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: "
-                            f"status={processing_result.get('status', 'unknown')}, "
-                            f"branches={len(processing_result.get('branch_results', {}))}, "
-                            f"actions={len(processing_result.get('actions_executed', []))}")
-
-                # Update statistics
-                self.stats['pipelines_executed'] += 1
-
-            else:
-                logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}")
-
-        except Exception as e:
-            logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True)
-
-    def _filter_small_frontals(self, tracking_results, frame):
-        """
-        Filter out frontal detections that are smaller than minimum bbox area percentage.
-        This prevents processing of cars from neighboring pumps that appear in camera view.
-
-        Args:
-            tracking_results: YOLO tracking results with detections
-            frame: Input frame for calculating frame area
-
-        Returns:
-            Modified tracking_results with small frontals removed
-        """
-        if not hasattr(tracking_results, 'detections') or not tracking_results.detections:
-            return tracking_results
-
-        # Calculate frame area and minimum bbox area threshold
-        frame_area = frame.shape[0] * frame.shape[1]  # height * width
-        min_bbox_area = frame_area * (self.min_bbox_area_percentage / 100.0)
-
-        # Filter detections
-        filtered_detections = []
-        filtered_count = 0
-
-        for detection in tracking_results.detections:
-            # Calculate detection bbox area
-            bbox = detection.bbox  # Assuming bbox is [x1, y1, x2, y2]
-            bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
-
-            if bbox_area >= min_bbox_area:
-                # Keep detection - bbox is large enough
-                filtered_detections.append(detection)
-            else:
-                # Filter out small detection
-                filtered_count += 1
-                area_percentage = (bbox_area / frame_area) * 100
-                logger.debug(f"Filtered small frontal: area={bbox_area:.0f}px² ({area_percentage:.1f}% of frame, "
-                             f"min required: {self.min_bbox_area_percentage}%)")
-
-        # Update tracking results with filtered detections
-        tracking_results.detections = filtered_detections
-
-        # Update statistics
-        if filtered_count > 0:
-            self.stats['frontals_filtered_small'] += filtered_count
-            logger.info(f"Filtered {filtered_count} small frontal detections, "
-                        f"{len(filtered_detections)} remaining (total filtered: {self.stats['frontals_filtered_small']})")
-
-        return tracking_results
-
     def cleanup(self):
         """Cleanup resources."""
         self.executor.shutdown(wait=False)
diff --git a/requirements.base.txt b/requirements.base.txt
index b8af923..3511dd4 100644
--- a/requirements.base.txt
+++ b/requirements.base.txt
@@ -7,6 +7,4 @@ filterpy
 psycopg2-binary
 lap>=0.5.12
 pynvml
-PyTurboJPEG
-PyNvVideoCodec
-cupy-cuda12x
\ No newline at end of file
+PyTurboJPEG
\ No newline at end of file
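Reviewer note: for context on the filter removed above, the 3.5% floor is relative to frame area, so the pixel threshold scales with resolution. Worked numbers (frame size is illustrative):

```python
# Worked example of the deleted 3.5% bbox-area floor.
frame_h, frame_w = 1440, 2560             # 2K snapshot
frame_area = frame_h * frame_w            # 3,686,400 px²
min_bbox_area = frame_area * 3.5 / 100.0  # 129,024 px²

bbox = (100, 200, 400, 500)               # x1, y1, x2, y2 -> 300 x 300 px
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])  # 90,000 px²
print(bbox_area >= min_bbox_area)         # False: filtered as a neighboring-pump car
```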