Compare commits

20 commits: fix/siwat-...dev

| SHA1 |
|---|
| b2e7bc499d |
| 402f7732a8 |
| 9e5b5a32ad |
| 3ed7a2cd53 |
| 793beb1571 |
| 354ed9ce3c |
| e92efdbe11 |
| 8d2a71fcd7 |
| fed71046a9 |
| 31bc91d57b |
| fa0f865319 |
| ee484b4655 |
| 52ba1ff316 |
| eb57de02c3 |
| b08ce27de2 |
| 8c08c815ce |
| 2b382210eb |
| d8d1b33cd8 |
| 33d738b31b |
| 2808316e94 |
19 changed files with 3429 additions and 843 deletions
@@ -2,7 +2,8 @@
   "permissions": {
     "allow": [
       "Bash(dir:*)",
-      "WebSearch"
+      "WebSearch",
+      "Bash(mkdir:*)"
     ],
     "deny": [],
     "ask": []

380  app.py

@@ -8,6 +8,7 @@ import os
 import time
 import cv2
 from contextlib import asynccontextmanager
+from typing import Dict, Any
 from fastapi import FastAPI, WebSocket, HTTPException
 from fastapi.responses import Response

@@ -31,21 +32,135 @@ logger.setLevel(logging.DEBUG)
 # Frames are now stored in the shared cache buffer from core.streaming.buffers
 # latest_frames = {} # Deprecated - using shared_cache_buffer instead

+
+# Health monitoring recovery handlers
+def _handle_stream_restart_recovery(component: str, details: Dict[str, Any]) -> bool:
+    """Handle stream restart recovery at the application level."""
+    try:
+        from core.streaming.manager import shared_stream_manager
+
+        # Extract camera ID from component name (e.g., "stream_cam-001" -> "cam-001")
+        if component.startswith("stream_"):
+            camera_id = component[7:]  # Remove "stream_" prefix
+        else:
+            camera_id = component
+
+        logger.info(f"Attempting stream restart recovery for {camera_id}")
+
+        # Find and restart the subscription
+        subscriptions = shared_stream_manager.get_all_subscriptions()
+        for sub_info in subscriptions:
+            if sub_info.camera_id == camera_id:
+                # Remove and re-add the subscription
+                shared_stream_manager.remove_subscription(sub_info.subscription_id)
+                time.sleep(1.0)  # Brief delay
+
+                # Re-add subscription
+                success = shared_stream_manager.add_subscription(
+                    sub_info.subscription_id,
+                    sub_info.stream_config,
+                    sub_info.crop_coords,
+                    sub_info.model_id,
+                    sub_info.model_url,
+                    sub_info.tracking_integration
+                )
+
+                if success:
+                    logger.info(f"Stream restart recovery successful for {camera_id}")
+                    return True
+                else:
+                    logger.error(f"Stream restart recovery failed for {camera_id}")
+                    return False
+
+        logger.warning(f"No subscription found for camera {camera_id} during recovery")
+        return False
+
+    except Exception as e:
+        logger.error(f"Error in stream restart recovery for {component}: {e}")
+        return False
+
+
+def _handle_stream_reconnect_recovery(component: str, details: Dict[str, Any]) -> bool:
+    """Handle stream reconnect recovery at the application level."""
+    try:
+        from core.streaming.manager import shared_stream_manager
+
+        # Extract camera ID from component name
+        if component.startswith("stream_"):
+            camera_id = component[7:]
+        else:
+            camera_id = component
+
+        logger.info(f"Attempting stream reconnect recovery for {camera_id}")
+
+        # For reconnect, we just need to trigger the stream's internal reconnect
+        # The stream readers handle their own reconnection logic
+        active_cameras = shared_stream_manager.get_active_cameras()
+
+        if camera_id in active_cameras:
+            logger.info(f"Stream reconnect recovery triggered for {camera_id}")
+            return True
+        else:
+            logger.warning(f"Camera {camera_id} not found in active cameras during reconnect recovery")
+            return False
+
+    except Exception as e:
+        logger.error(f"Error in stream reconnect recovery for {component}: {e}")
+        return False
+
+
 # Lifespan event handler (modern FastAPI approach)
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Application lifespan management."""
     # Startup
     logger.info("Detector Worker started successfully")
+
+    # Initialize health monitoring system
+    try:
+        from core.monitoring.health import health_monitor
+        from core.monitoring.stream_health import stream_health_tracker
+        from core.monitoring.thread_health import thread_health_monitor
+        from core.monitoring.recovery import recovery_manager
+
+        # Start health monitoring
+        health_monitor.start()
+        logger.info("Health monitoring system started")
+
+        # Register recovery handlers for stream management
+        from core.streaming.manager import shared_stream_manager
+        recovery_manager.register_recovery_handler(
+            "restart_stream",
+            _handle_stream_restart_recovery
+        )
+        recovery_manager.register_recovery_handler(
+            "reconnect",
+            _handle_stream_reconnect_recovery
+        )
+
+        logger.info("Recovery handlers registered")
+
+    except Exception as e:
+        logger.error(f"Failed to initialize health monitoring: {e}")
+
     logger.info("WebSocket endpoint available at: ws://0.0.0.0:8001/")
     logger.info("HTTP camera endpoint available at: http://0.0.0.0:8001/camera/{camera_id}/image")
     logger.info("Health check available at: http://0.0.0.0:8001/health")
+    logger.info("Detailed health monitoring available at: http://0.0.0.0:8001/health/detailed")
     logger.info("Ready and waiting for backend WebSocket connections")

     yield

     # Shutdown
     logger.info("Detector Worker shutting down...")
+
+    # Stop health monitoring
+    try:
+        from core.monitoring.health import health_monitor
+        health_monitor.stop()
+        logger.info("Health monitoring system stopped")
+    except Exception as e:
+        logger.error(f"Error stopping health monitoring: {e}")
+
     # Clear all state
     worker_state.set_subscriptions([])
     worker_state.session_ids.clear()

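As a rough illustration of how these handlers are wired up, the sketch below mimics what the recovery manager is expected to do when a component is flagged for recovery: look up the handler registered for the action name and call it as `handler(component, details) -> bool`. The `stream_<camera_id>` naming comes from the handlers above; the `FakeRecoveryManager` class is purely hypothetical and only demonstrates the calling convention, not the real `core.monitoring.recovery` implementation.

```python
# Minimal sketch (hypothetical FakeRecoveryManager): shows the calling convention
# the handlers above are written for -- handler(component, details) -> bool.
from typing import Any, Callable, Dict


class FakeRecoveryManager:
    """Hypothetical stand-in for the real recovery manager, for illustration only."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[str, Dict[str, Any]], bool]] = {}

    def register_recovery_handler(self, action: str, handler: Callable[[str, Dict[str, Any]], bool]) -> None:
        self._handlers[action] = handler

    def trigger(self, action: str, component: str, details: Dict[str, Any]) -> bool:
        # Look up the handler registered for this action and invoke it.
        handler = self._handlers.get(action)
        return handler(component, details) if handler else False


if __name__ == "__main__":
    manager = FakeRecoveryManager()
    # In app.py the real handlers are _handle_stream_restart_recovery / _handle_stream_reconnect_recovery.
    manager.register_recovery_handler("restart_stream", lambda component, details: component.startswith("stream_"))
    # Component names follow the "stream_<camera_id>" convention used above.
    print(manager.trigger("restart_stream", "stream_cam-001", {"reason": "frames stale"}))  # True
```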
@@ -86,10 +201,11 @@ else:
     os.makedirs("models", exist_ok=True)
     logger.info("Ensured models directory exists")

-# Initialize stream manager with config value
-from core.streaming import initialize_stream_manager
-initialize_stream_manager(max_streams=config.get('max_streams', 10))
-logger.info(f"Initialized stream manager with max_streams={config.get('max_streams', 10)}")
+# Stream manager already initialized at module level with max_streams=20
+# Calling initialize_stream_manager() creates a NEW instance, breaking references
+# from core.streaming import initialize_stream_manager
+# initialize_stream_manager(max_streams=config.get('max_streams', 10))
+logger.info(f"Using stream manager with max_streams=20 (module-level initialization)")

 # Frames are now stored in the shared cache buffer from core.streaming.buffers
 # latest_frames = {} # Deprecated - using shared_cache_buffer instead

@@ -186,6 +302,63 @@ async def get_camera_image(camera_id: str):
         raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


+@app.get("/session-image/{session_id}")
+async def get_session_image(session_id: int):
+    """
+    HTTP endpoint to retrieve the saved session image by session ID.
+
+    Args:
+        session_id: The session ID to retrieve the image for
+
+    Returns:
+        JPEG image as binary response
+
+    Raises:
+        HTTPException: 404 if no image found for the session
+        HTTPException: 500 if reading image fails
+    """
+    try:
+        from pathlib import Path
+        import glob
+
+        # Images directory
+        images_dir = Path("images")
+
+        if not images_dir.exists():
+            logger.warning(f"Images directory does not exist")
+            raise HTTPException(
+                status_code=404,
+                detail=f"No images directory found"
+            )
+
+        # Search for files matching session ID pattern: {session_id}_*
+        pattern = str(images_dir / f"{session_id}_*.jpg")
+        matching_files = glob.glob(pattern)
+
+        if not matching_files:
+            logger.warning(f"No image found for session {session_id}")
+            raise HTTPException(
+                status_code=404,
+                detail=f"No image found for session {session_id}"
+            )
+
+        # Get the most recent file if multiple exist
+        most_recent_file = max(matching_files, key=os.path.getmtime)
+        logger.info(f"Found session image for session {session_id}: {most_recent_file}")
+
+        # Read the image file
+        image_data = open(most_recent_file, 'rb').read()
+
+        # Return image as binary response
+        return Response(content=image_data, media_type="image/jpeg")
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error retrieving session image for session {session_id}: {str(e)}", exc_info=True)
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+
+
 @app.get("/health")
 async def health_check():
     """Health check endpoint for monitoring."""

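A quick client-side sketch of how the new endpoint might be exercised, using only the standard library. The host and port (localhost:8001) are taken from the startup log messages above; the session id used here is just an example.

```python
# Sketch: fetch a saved session image from the new /session-image endpoint and write it to disk.
# Assumes the worker is reachable at localhost:8001; session id 123 is only an example value.
import urllib.error
import urllib.request

SESSION_ID = 123  # hypothetical session id

try:
    with urllib.request.urlopen(f"http://localhost:8001/session-image/{SESSION_ID}", timeout=5) as resp:
        with open(f"session_{SESSION_ID}.jpg", "wb") as fh:
            fh.write(resp.read())  # the endpoint returns raw image/jpeg bytes
except urllib.error.HTTPError as exc:
    # 404 means no image was saved for that session; 500 means the read failed server-side.
    print(f"Request failed: {exc.code} {exc.reason}")
```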
@@ -197,6 +370,205 @@ async def health_check():
     }


+@app.get("/health/detailed")
+async def detailed_health_check():
+    """Comprehensive health status with detailed monitoring data."""
+    try:
+        from core.monitoring.health import health_monitor
+        from core.monitoring.stream_health import stream_health_tracker
+        from core.monitoring.thread_health import thread_health_monitor
+        from core.monitoring.recovery import recovery_manager
+
+        # Get comprehensive health status
+        overall_health = health_monitor.get_health_status()
+        stream_metrics = stream_health_tracker.get_all_metrics()
+        thread_info = thread_health_monitor.get_all_thread_info()
+        recovery_stats = recovery_manager.get_recovery_stats()
+
+        return {
+            "timestamp": time.time(),
+            "overall_health": overall_health,
+            "stream_metrics": stream_metrics,
+            "thread_health": thread_info,
+            "recovery_stats": recovery_stats,
+            "system_info": {
+                "active_subscriptions": len(worker_state.subscriptions),
+                "active_sessions": len(worker_state.session_ids),
+                "version": "2.0.0"
+            }
+        }
+
+    except Exception as e:
+        logger.error(f"Error generating detailed health report: {e}")
+        raise HTTPException(status_code=500, detail=f"Health monitoring error: {str(e)}")
+
+
+@app.get("/health/streams")
+async def stream_health_status():
+    """Stream-specific health monitoring."""
+    try:
+        from core.monitoring.stream_health import stream_health_tracker
+        from core.streaming.buffers import shared_cache_buffer
+
+        stream_metrics = stream_health_tracker.get_all_metrics()
+        buffer_stats = shared_cache_buffer.get_stats()
+
+        return {
+            "timestamp": time.time(),
+            "stream_count": len(stream_metrics),
+            "stream_metrics": stream_metrics,
+            "buffer_stats": buffer_stats,
+            "frame_ages": {
+                camera_id: {
+                    "age_seconds": time.time() - info["last_frame_time"] if info and info.get("last_frame_time") else None,
+                    "total_frames": info.get("frame_count", 0) if info else 0
+                }
+                for camera_id, info in stream_metrics.items()
+            }
+        }
+
+    except Exception as e:
+        logger.error(f"Error generating stream health report: {e}")
+        raise HTTPException(status_code=500, detail=f"Stream health error: {str(e)}")
+
+
+@app.get("/health/threads")
+async def thread_health_status():
+    """Thread-specific health monitoring."""
+    try:
+        from core.monitoring.thread_health import thread_health_monitor
+
+        thread_info = thread_health_monitor.get_all_thread_info()
+        deadlocks = thread_health_monitor.detect_deadlocks()
+
+        return {
+            "timestamp": time.time(),
+            "thread_count": len(thread_info),
+            "thread_info": thread_info,
+            "potential_deadlocks": deadlocks,
+            "summary": {
+                "responsive_threads": sum(1 for info in thread_info.values() if info.get("is_responsive", False)),
+                "unresponsive_threads": sum(1 for info in thread_info.values() if not info.get("is_responsive", True)),
+                "deadlock_count": len(deadlocks)
+            }
+        }
+
+    except Exception as e:
+        logger.error(f"Error generating thread health report: {e}")
+        raise HTTPException(status_code=500, detail=f"Thread health error: {str(e)}")
+
+
+@app.get("/health/recovery")
+async def recovery_status():
+    """Recovery system status and history."""
+    try:
+        from core.monitoring.recovery import recovery_manager
+
+        recovery_stats = recovery_manager.get_recovery_stats()
+
+        return {
+            "timestamp": time.time(),
+            "recovery_stats": recovery_stats,
+            "summary": {
+                "total_recoveries_last_hour": recovery_stats.get("total_recoveries_last_hour", 0),
+                "components_with_recovery_state": len(recovery_stats.get("recovery_states", {})),
+                "total_recovery_failures": sum(
+                    state.get("failure_count", 0)
+                    for state in recovery_stats.get("recovery_states", {}).values()
+                ),
+                "total_recovery_successes": sum(
+                    state.get("success_count", 0)
+                    for state in recovery_stats.get("recovery_states", {}).values()
+                )
+            }
+        }
+
+    except Exception as e:
+        logger.error(f"Error generating recovery status report: {e}")
+        raise HTTPException(status_code=500, detail=f"Recovery status error: {str(e)}")
+
+
+@app.post("/health/recovery/force/{component}")
+async def force_recovery(component: str, action: str = "restart_stream"):
+    """Force recovery action for a specific component."""
+    try:
+        from core.monitoring.recovery import recovery_manager, RecoveryAction
+
+        # Validate action
+        try:
+            recovery_action = RecoveryAction(action)
+        except ValueError:
+            raise HTTPException(
+                status_code=400,
+                detail=f"Invalid recovery action: {action}. Valid actions: {[a.value for a in RecoveryAction]}"
+            )
+
+        # Force recovery
+        success = recovery_manager.force_recovery(component, recovery_action, "manual_api_request")
+
+        return {
+            "timestamp": time.time(),
+            "component": component,
+            "action": action,
+            "success": success,
+            "message": f"Recovery {'successful' if success else 'failed'} for component {component}"
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error forcing recovery for {component}: {e}")
+        raise HTTPException(status_code=500, detail=f"Recovery error: {str(e)}")
+
+
+@app.get("/health/metrics")
+async def health_metrics():
+    """Performance and health metrics in a format suitable for monitoring systems."""
+    try:
+        from core.monitoring.health import health_monitor
+        from core.monitoring.stream_health import stream_health_tracker
+        from core.streaming.buffers import shared_cache_buffer
+
+        # Get basic metrics
+        overall_health = health_monitor.get_health_status()
+        stream_metrics = stream_health_tracker.get_all_metrics()
+        buffer_stats = shared_cache_buffer.get_stats()
+
+        # Format for monitoring systems (Prometheus-style)
+        metrics = {
+            "detector_worker_up": 1,
+            "detector_worker_streams_total": len(stream_metrics),
+            "detector_worker_subscriptions_total": len(worker_state.subscriptions),
+            "detector_worker_sessions_total": len(worker_state.session_ids),
+            "detector_worker_memory_mb": buffer_stats.get("total_memory_mb", 0),
+            "detector_worker_health_status": {
+                "healthy": 1,
+                "warning": 2,
+                "critical": 3,
+                "unknown": 4
+            }.get(overall_health.get("overall_status", "unknown"), 4)
+        }
+
+        # Add per-stream metrics
+        for camera_id, stream_info in stream_metrics.items():
+            safe_camera_id = camera_id.replace("-", "_").replace(".", "_")
+            metrics.update({
+                f"detector_worker_stream_frames_total{{camera=\"{safe_camera_id}\"}}": stream_info.get("frame_count", 0),
+                f"detector_worker_stream_errors_total{{camera=\"{safe_camera_id}\"}}": stream_info.get("error_count", 0),
+                f"detector_worker_stream_fps{{camera=\"{safe_camera_id}\"}}": stream_info.get("frames_per_second", 0),
+                f"detector_worker_stream_frame_age_seconds{{camera=\"{safe_camera_id}\"}}": stream_info.get("last_frame_age_seconds") or 0
+            })
+
+        return {
+            "timestamp": time.time(),
+            "metrics": metrics
+        }
+
+    except Exception as e:
+        logger.error(f"Error generating health metrics: {e}")
+        raise HTTPException(status_code=500, detail=f"Metrics error: {str(e)}")
+
+
 if __name__ == "__main__":

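The metrics endpoint returns a JSON object whose keys already look like Prometheus sample names (including the `{camera="..."}` labels built above), so a scraper-side shim only has to flatten the dict into text-exposition lines. The snippet below is a hedged sketch of such a shim; the endpoint path comes from the code above, while the host and the decision to treat every value as a gauge are assumptions.

```python
# Sketch: convert the /health/metrics JSON payload into Prometheus text-exposition lines.
# The endpoint path comes from the diff above; treating every value as a gauge is an assumption.
import json
import urllib.request


def scrape_metrics(base_url: str = "http://localhost:8001") -> str:
    with urllib.request.urlopen(f"{base_url}/health/metrics", timeout=5) as resp:
        payload = json.load(resp)

    lines = []
    for name, value in payload["metrics"].items():
        # Keys such as 'detector_worker_stream_fps{camera="cam_001"}' already carry their labels.
        lines.append(f"{name} {value}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(scrape_metrics())
```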
@@ -539,7 +539,7 @@ class WebSocketHandler:
     async def _handle_set_session_id(self, message: SetSessionIdMessage) -> None:
         """Handle setSessionId message."""
         display_identifier = message.payload.displayIdentifier
-        session_id = message.payload.sessionId
+        session_id = str(message.payload.sessionId) if message.payload.sessionId is not None else None

         logger.info(f"[RX Processing] setSessionId for display {display_identifier}: {session_id}")

@@ -64,6 +64,10 @@ class DetectionPipeline:
         # SessionId to processing results mapping (for combining with license plate results)
         self.session_processing_results = {}

+        # Field mappings from parallelActions (e.g., {"car_brand": "{car_brand_cls_v3.brand}"})
+        self.field_mappings = {}
+        self._parse_field_mappings()
+
         # Statistics
         self.stats = {
             'detections_processed': 0,

@@ -74,6 +78,25 @@ class DetectionPipeline:

         logger.info("DetectionPipeline initialized")

+    def _parse_field_mappings(self):
+        """
+        Parse field mappings from parallelActions.postgresql_update_combined.fields.
+        Extracts mappings like {"car_brand": "{car_brand_cls_v3.brand}"} for dynamic field resolution.
+        """
+        try:
+            if not self.pipeline_config or not hasattr(self.pipeline_config, 'parallel_actions'):
+                return
+
+            for action in self.pipeline_config.parallel_actions:
+                if action.type.value == 'postgresql_update_combined':
+                    fields = action.params.get('fields', {})
+                    self.field_mappings = fields
+                    logger.info(f"[FIELD MAPPINGS] Parsed from pipeline config: {self.field_mappings}")
+                    break
+
+        except Exception as e:
+            logger.error(f"Error parsing field mappings: {e}", exc_info=True)
+
     async def initialize(self) -> bool:
         """
         Initialize all pipeline components including models, Redis, and database.

@@ -165,6 +188,44 @@ class DetectionPipeline:
             logger.error(f"Error initializing detection model: {e}", exc_info=True)
             return False

+    def _extract_fields_from_branches(self, branch_results: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Extract fields dynamically from branch results using field mappings.
+
+        Args:
+            branch_results: Dictionary of branch execution results
+
+        Returns:
+            Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"})
+        """
+        extracted = {}
+
+        try:
+            for db_field_name, template in self.field_mappings.items():
+                # Parse template like "{car_brand_cls_v3.brand}" -> branch_id="car_brand_cls_v3", field="brand"
+                if template.startswith('{') and template.endswith('}'):
+                    var_name = template[1:-1]
+                    if '.' in var_name:
+                        branch_id, field_name = var_name.split('.', 1)
+
+                        # Look up value in branch_results
+                        if branch_id in branch_results:
+                            branch_data = branch_results[branch_id]
+                            if isinstance(branch_data, dict) and 'result' in branch_data:
+                                result_data = branch_data['result']
+                                if isinstance(result_data, dict) and field_name in result_data:
+                                    extracted[field_name] = result_data[field_name]
+                                    logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}")
+                                else:
+                                    logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}")
+                        else:
+                            logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results")
+
+        except Exception as e:
+            logger.error(f"Error extracting fields from branches: {e}", exc_info=True)
+
+        return extracted
+
     async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]):
         """
         Callback for handling license plate results from LPR service.

@@ -272,12 +333,12 @@ class DetectionPipeline:
                     branch_results = self.session_processing_results[session_id_for_lookup]
                     logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}")

-                    if 'car_brand_cls_v2' in branch_results:
-                        brand_result = branch_results['car_brand_cls_v2'].get('result', {})
-                        car_brand = brand_result.get('brand')
-                    if 'car_bodytype_cls_v1' in branch_results:
-                        bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
-                        body_type = bodytype_result.get('body_type')
+                    # Extract fields dynamically using field mappings from pipeline config
+                    extracted_fields = self._extract_fields_from_branches(branch_results)
+                    car_brand = extracted_fields.get('brand')
+                    body_type = extracted_fields.get('body_type')
+
+                    logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}")

                     # Clean up stored results after use
                     del self.session_processing_results[session_id_for_lookup]

@@ -1003,7 +1064,7 @@ class DetectionPipeline:
         Resolve field template using branch results and context.

         Args:
-            template: Template string like "{car_brand_cls_v2.brand}"
+            template: Template string like "{car_brand_cls_v3.brand}"
             branch_results: Dictionary of branch execution results
             context: Detection context

@@ -1015,7 +1076,7 @@ class DetectionPipeline:
             if template.startswith('{') and template.endswith('}'):
                 var_name = template[1:-1]

-                # Check for branch result reference (e.g., "car_brand_cls_v2.brand")
+                # Check for branch result reference (e.g., "car_brand_cls_v3.brand")
                 if '.' in var_name:
                     branch_id, field_name = var_name.split('.', 1)
                     if branch_id in branch_results:

@@ -1061,17 +1122,10 @@ class DetectionPipeline:
                 logger.warning("No session_id in context for processing results")
                 return

-            # Extract car brand from car_brand_cls_v2 results
-            car_brand = None
-            if 'car_brand_cls_v2' in branch_results:
-                brand_result = branch_results['car_brand_cls_v2'].get('result', {})
-                car_brand = brand_result.get('brand')
-
-            # Extract body type from car_bodytype_cls_v1 results
-            body_type = None
-            if 'car_bodytype_cls_v1' in branch_results:
-                bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
-                body_type = bodytype_result.get('body_type')
+            # Extract fields dynamically using field mappings from pipeline config
+            extracted_fields = self._extract_fields_from_branches(branch_results)
+            car_brand = extracted_fields.get('brand')
+            body_type = extracted_fields.get('body_type')

             logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: "
                         f"brand={car_brand}, bodyType={body_type}")

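For context, `_parse_field_mappings()` expects a `postgresql_update_combined` entry in the pipeline's parallel actions whose `fields` parameter maps database columns to branch-result templates. A hypothetical fragment of such a configuration is sketched below as a plain Python structure; the branch ids follow the `car_brand_cls_v3` / `car_bodytype_cls_v1` names that appear in the diff, while the column names and the exact shape of the surrounding config object are assumptions for illustration only.

```python
# Hypothetical pipeline-config fragment consumed by _parse_field_mappings().
# Only the inner 'fields' mapping ends up in self.field_mappings.
parallel_actions = [
    {
        "type": "postgresql_update_combined",
        "params": {
            "fields": {
                # "{<branch_id>.<field>}" templates are resolved by _extract_fields_from_branches()
                "car_brand": "{car_brand_cls_v3.brand}",          # column name assumed
                "car_body_type": "{car_bodytype_cls_v1.body_type}",  # column name assumed
            }
        },
    }
]
```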
18  core/monitoring/__init__.py  (new file)

@@ -0,0 +1,18 @@
"""
Comprehensive health monitoring system for detector worker.
Tracks stream health, thread responsiveness, and system performance.
"""

from .health import HealthMonitor, HealthStatus, HealthCheck
from .stream_health import StreamHealthTracker
from .thread_health import ThreadHealthMonitor
from .recovery import RecoveryManager

__all__ = [
    'HealthMonitor',
    'HealthStatus',
    'HealthCheck',
    'StreamHealthTracker',
    'ThreadHealthMonitor',
    'RecoveryManager'
]

456  core/monitoring/health.py  (new file)

@@ -0,0 +1,456 @@
"""
Core health monitoring system for comprehensive stream and system health tracking.
Provides centralized health status, alerting, and recovery coordination.
"""
import time
import threading
import logging
import psutil
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict, deque


logger = logging.getLogger(__name__)


class HealthStatus(Enum):
    """Health status levels."""
    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Individual health check result."""
    name: str
    status: HealthStatus
    message: str
    timestamp: float = field(default_factory=time.time)
    details: Dict[str, Any] = field(default_factory=dict)
    recovery_action: Optional[str] = None


@dataclass
class HealthMetrics:
    """Health metrics for a component."""
    component_id: str
    last_update: float
    frame_count: int = 0
    error_count: int = 0
    warning_count: int = 0
    restart_count: int = 0
    avg_frame_interval: float = 0.0
    last_frame_time: Optional[float] = None
    thread_alive: bool = True
    connection_healthy: bool = True
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0


class HealthMonitor:
    """Comprehensive health monitoring system."""

    def __init__(self, check_interval: float = 30.0):
        """
        Initialize health monitor.

        Args:
            check_interval: Interval between health checks in seconds
        """
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        self._lock = threading.RLock()

        # Health data storage
        self.health_checks: Dict[str, HealthCheck] = {}
        self.metrics: Dict[str, HealthMetrics] = {}
        self.alert_history: deque = deque(maxlen=1000)
        self.recovery_actions: deque = deque(maxlen=500)

        # Thresholds (configurable)
        self.thresholds = {
            'frame_stale_warning_seconds': 120,   # 2 minutes
            'frame_stale_critical_seconds': 300,  # 5 minutes
            'thread_unresponsive_seconds': 60,    # 1 minute
            'memory_warning_mb': 500,             # 500MB per stream
            'memory_critical_mb': 1000,           # 1GB per stream
            'cpu_warning_percent': 80,            # 80% CPU
            'cpu_critical_percent': 95,           # 95% CPU
            'error_rate_warning': 0.1,            # 10% error rate
            'error_rate_critical': 0.3,           # 30% error rate
            'restart_threshold': 3                # Max restarts per hour
        }

        # Health check functions
        self.health_checkers: List[Callable[[], List[HealthCheck]]] = []
        self.recovery_callbacks: Dict[str, Callable[[str, HealthCheck], bool]] = {}

        # System monitoring
        self.process = psutil.Process()
        self.system_start_time = time.time()

    def start(self):
        """Start health monitoring."""
        if self.running:
            logger.warning("Health monitor already running")
            return

        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info(f"Health monitor started (check interval: {self.check_interval}s)")

    def stop(self):
        """Stop health monitoring."""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5.0)
        logger.info("Health monitor stopped")

    def register_health_checker(self, checker: Callable[[], List[HealthCheck]]):
        """Register a health check function."""
        self.health_checkers.append(checker)
        logger.debug(f"Registered health checker: {checker.__name__}")

    def register_recovery_callback(self, component: str, callback: Callable[[str, HealthCheck], bool]):
        """Register a recovery callback for a component."""
        self.recovery_callbacks[component] = callback
        logger.debug(f"Registered recovery callback for {component}")

    def update_metrics(self, component_id: str, **kwargs):
        """Update metrics for a component."""
        with self._lock:
            if component_id not in self.metrics:
                self.metrics[component_id] = HealthMetrics(
                    component_id=component_id,
                    last_update=time.time()
                )

            metrics = self.metrics[component_id]
            metrics.last_update = time.time()

            # Update provided metrics
            for key, value in kwargs.items():
                if hasattr(metrics, key):
                    setattr(metrics, key, value)

    def report_frame_received(self, component_id: str):
        """Report that a frame was received for a component."""
        current_time = time.time()
        with self._lock:
            if component_id not in self.metrics:
                self.metrics[component_id] = HealthMetrics(
                    component_id=component_id,
                    last_update=current_time
                )

            metrics = self.metrics[component_id]

            # Update frame metrics
            if metrics.last_frame_time:
                interval = current_time - metrics.last_frame_time
                # Moving average of frame intervals
                if metrics.avg_frame_interval == 0:
                    metrics.avg_frame_interval = interval
                else:
                    metrics.avg_frame_interval = (metrics.avg_frame_interval * 0.9) + (interval * 0.1)

            metrics.last_frame_time = current_time
            metrics.frame_count += 1
            metrics.last_update = current_time

    def report_error(self, component_id: str, error_type: str = "general"):
        """Report an error for a component."""
        with self._lock:
            if component_id not in self.metrics:
                self.metrics[component_id] = HealthMetrics(
                    component_id=component_id,
                    last_update=time.time()
                )

            self.metrics[component_id].error_count += 1
            self.metrics[component_id].last_update = time.time()

        logger.debug(f"Error reported for {component_id}: {error_type}")

    def report_warning(self, component_id: str, warning_type: str = "general"):
        """Report a warning for a component."""
        with self._lock:
            if component_id not in self.metrics:
                self.metrics[component_id] = HealthMetrics(
                    component_id=component_id,
                    last_update=time.time()
                )

            self.metrics[component_id].warning_count += 1
            self.metrics[component_id].last_update = time.time()

        logger.debug(f"Warning reported for {component_id}: {warning_type}")

    def report_restart(self, component_id: str):
        """Report that a component was restarted."""
        with self._lock:
            if component_id not in self.metrics:
                self.metrics[component_id] = HealthMetrics(
                    component_id=component_id,
                    last_update=time.time()
                )

            self.metrics[component_id].restart_count += 1
            self.metrics[component_id].last_update = time.time()

        # Log recovery action
        recovery_action = {
            'timestamp': time.time(),
            'component': component_id,
            'action': 'restart',
            'reason': 'manual_restart'
        }

        with self._lock:
            self.recovery_actions.append(recovery_action)

        logger.info(f"Restart reported for {component_id}")

    def get_health_status(self, component_id: Optional[str] = None) -> Dict[str, Any]:
        """Get comprehensive health status."""
        with self._lock:
            if component_id:
                # Get health for specific component
                return self._get_component_health(component_id)
            else:
                # Get overall health status
                return self._get_overall_health()

    def _get_component_health(self, component_id: str) -> Dict[str, Any]:
        """Get health status for a specific component."""
        if component_id not in self.metrics:
            return {
                'component_id': component_id,
                'status': HealthStatus.UNKNOWN.value,
                'message': 'No metrics available',
                'metrics': {}
            }

        metrics = self.metrics[component_id]
        current_time = time.time()

        # Determine health status
        status = HealthStatus.HEALTHY
        issues = []

        # Check frame freshness
        if metrics.last_frame_time:
            frame_age = current_time - metrics.last_frame_time
            if frame_age > self.thresholds['frame_stale_critical_seconds']:
                status = HealthStatus.CRITICAL
                issues.append(f"Frames stale for {frame_age:.1f}s")
            elif frame_age > self.thresholds['frame_stale_warning_seconds']:
                if status == HealthStatus.HEALTHY:
                    status = HealthStatus.WARNING
                issues.append(f"Frames aging ({frame_age:.1f}s)")

        # Check error rates
        if metrics.frame_count > 0:
            error_rate = metrics.error_count / metrics.frame_count
            if error_rate > self.thresholds['error_rate_critical']:
                status = HealthStatus.CRITICAL
                issues.append(f"High error rate ({error_rate:.1%})")
            elif error_rate > self.thresholds['error_rate_warning']:
                if status == HealthStatus.HEALTHY:
                    status = HealthStatus.WARNING
                issues.append(f"Elevated error rate ({error_rate:.1%})")

        # Check restart frequency
        restart_rate = metrics.restart_count / max(1, (current_time - self.system_start_time) / 3600)
        if restart_rate > self.thresholds['restart_threshold']:
            status = HealthStatus.CRITICAL
            issues.append(f"Frequent restarts ({restart_rate:.1f}/hour)")

        # Check thread health
        if not metrics.thread_alive:
            status = HealthStatus.CRITICAL
            issues.append("Thread not alive")

        # Check connection health
        if not metrics.connection_healthy:
            if status == HealthStatus.HEALTHY:
                status = HealthStatus.WARNING
            issues.append("Connection unhealthy")

        return {
            'component_id': component_id,
            'status': status.value,
            'message': '; '.join(issues) if issues else 'All checks passing',
            'metrics': {
                'frame_count': metrics.frame_count,
                'error_count': metrics.error_count,
                'warning_count': metrics.warning_count,
                'restart_count': metrics.restart_count,
                'avg_frame_interval': metrics.avg_frame_interval,
                'last_frame_age': current_time - metrics.last_frame_time if metrics.last_frame_time else None,
                'thread_alive': metrics.thread_alive,
                'connection_healthy': metrics.connection_healthy,
                'memory_usage_mb': metrics.memory_usage_mb,
                'cpu_usage_percent': metrics.cpu_usage_percent,
                'uptime_seconds': current_time - self.system_start_time
            },
            'last_update': metrics.last_update
        }

    def _get_overall_health(self) -> Dict[str, Any]:
        """Get overall system health status."""
        current_time = time.time()
        components = {}
        overall_status = HealthStatus.HEALTHY

        # Get health for all components
        for component_id in self.metrics.keys():
            component_health = self._get_component_health(component_id)
            components[component_id] = component_health

            # Determine overall status
            component_status = HealthStatus(component_health['status'])
            if component_status == HealthStatus.CRITICAL:
                overall_status = HealthStatus.CRITICAL
            elif component_status == HealthStatus.WARNING and overall_status == HealthStatus.HEALTHY:
                overall_status = HealthStatus.WARNING

        # System metrics
        try:
            system_memory = self.process.memory_info()
            system_cpu = self.process.cpu_percent()
        except Exception:
            system_memory = None
            system_cpu = 0.0

        return {
            'overall_status': overall_status.value,
            'timestamp': current_time,
            'uptime_seconds': current_time - self.system_start_time,
            'total_components': len(self.metrics),
            'components': components,
            'system_metrics': {
                'memory_mb': system_memory.rss / (1024 * 1024) if system_memory else 0,
                'cpu_percent': system_cpu,
                'process_id': self.process.pid
            },
            'recent_alerts': list(self.alert_history)[-10:],        # Last 10 alerts
            'recent_recoveries': list(self.recovery_actions)[-10:]  # Last 10 recovery actions
        }

    def _monitor_loop(self):
        """Main health monitoring loop."""
        logger.info("Health monitor loop started")

        while self.running:
            try:
                start_time = time.time()

                # Run all registered health checks
                all_checks = []
                for checker in self.health_checkers:
                    try:
                        checks = checker()
                        all_checks.extend(checks)
                    except Exception as e:
                        logger.error(f"Error in health checker {checker.__name__}: {e}")

                # Process health checks and trigger recovery if needed
                for check in all_checks:
                    self._process_health_check(check)

                # Update system metrics
                self._update_system_metrics()

                # Sleep until next check
                elapsed = time.time() - start_time
                sleep_time = max(0, self.check_interval - elapsed)
                if sleep_time > 0:
                    time.sleep(sleep_time)

            except Exception as e:
                logger.error(f"Error in health monitor loop: {e}")
                time.sleep(5.0)  # Fallback sleep

        logger.info("Health monitor loop ended")

    def _process_health_check(self, check: HealthCheck):
        """Process a health check result and trigger recovery if needed."""
        with self._lock:
            # Store health check
            self.health_checks[check.name] = check

            # Log alerts for non-healthy status
            if check.status != HealthStatus.HEALTHY:
                alert = {
                    'timestamp': check.timestamp,
                    'component': check.name,
                    'status': check.status.value,
                    'message': check.message,
                    'details': check.details
                }
                self.alert_history.append(alert)

                logger.warning(f"Health alert [{check.status.value.upper()}] {check.name}: {check.message}")

            # Trigger recovery if critical and recovery action available
            if check.status == HealthStatus.CRITICAL and check.recovery_action:
                self._trigger_recovery(check.name, check)

    def _trigger_recovery(self, component: str, check: HealthCheck):
        """Trigger recovery action for a component."""
        if component in self.recovery_callbacks:
            try:
                logger.info(f"Triggering recovery for {component}: {check.recovery_action}")

                success = self.recovery_callbacks[component](component, check)

                recovery_action = {
                    'timestamp': time.time(),
                    'component': component,
                    'action': check.recovery_action,
                    'reason': check.message,
                    'success': success
                }

                with self._lock:
                    self.recovery_actions.append(recovery_action)

                if success:
                    logger.info(f"Recovery successful for {component}")
                else:
                    logger.error(f"Recovery failed for {component}")

            except Exception as e:
                logger.error(f"Error in recovery callback for {component}: {e}")

    def _update_system_metrics(self):
        """Update system-level metrics."""
        try:
            # Update process metrics for all components
            current_time = time.time()

            with self._lock:
                for component_id, metrics in self.metrics.items():
                    # Update CPU and memory if available
                    try:
                        # This is a simplified approach - in practice you'd want
                        # per-thread or per-component resource tracking
                        metrics.cpu_usage_percent = self.process.cpu_percent() / len(self.metrics)
                        memory_info = self.process.memory_info()
                        metrics.memory_usage_mb = memory_info.rss / (1024 * 1024) / len(self.metrics)
                    except Exception:
                        pass

        except Exception as e:
            logger.error(f"Error updating system metrics: {e}")


# Global health monitor instance
health_monitor = HealthMonitor()

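A minimal usage sketch of the `HealthMonitor` API added above, showing a custom checker plus the per-frame and per-error reporting hooks. The component id "stream_cam-001" is illustrative only; the import path and method signatures come from the new module.

```python
# Minimal sketch: wiring a custom checker into the global health_monitor instance.
# "stream_cam-001" is an illustrative component id, not something defined by the module.
import time

from core.monitoring.health import HealthCheck, HealthStatus, health_monitor


def my_checker() -> list[HealthCheck]:
    # Return one HealthCheck per thing the monitor loop should evaluate.
    return [HealthCheck(name="stream_cam-001_frames", status=HealthStatus.HEALTHY, message="frames flowing")]


health_monitor.register_health_checker(my_checker)
health_monitor.start()

# Feed per-frame / per-error signals from the stream readers.
health_monitor.report_frame_received("stream_cam-001")
health_monitor.report_error("stream_cam-001", error_type="decode")

time.sleep(1)
print(health_monitor.get_health_status("stream_cam-001")["status"])
health_monitor.stop()
```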
385  core/monitoring/recovery.py  (new file)

@@ -0,0 +1,385 @@
"""
Recovery manager for automatic handling of health issues.
Provides circuit breaker patterns, automatic restarts, and graceful degradation.
"""
import time
import logging
import threading
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass
from enum import Enum
from collections import defaultdict, deque

from .health import HealthCheck, HealthStatus, health_monitor


logger = logging.getLogger(__name__)


class RecoveryAction(Enum):
    """Types of recovery actions."""
    RESTART_STREAM = "restart_stream"
    RESTART_THREAD = "restart_thread"
    CLEAR_BUFFER = "clear_buffer"
    RECONNECT = "reconnect"
    THROTTLE = "throttle"
    DISABLE = "disable"


@dataclass
class RecoveryAttempt:
    """Record of a recovery attempt."""
    timestamp: float
    component: str
    action: RecoveryAction
    reason: str
    success: bool
    details: Dict[str, Any] = None


@dataclass
class RecoveryState:
    """Recovery state for a component - simplified without circuit breaker."""
    failure_count: int = 0
    success_count: int = 0
    last_failure_time: Optional[float] = None
    last_success_time: Optional[float] = None


class RecoveryManager:
    """Manages automatic recovery actions for health issues."""

    def __init__(self):
        self.recovery_handlers: Dict[str, Callable[[str, HealthCheck], bool]] = {}
        self.recovery_states: Dict[str, RecoveryState] = {}
        self.recovery_history: deque = deque(maxlen=1000)
        self._lock = threading.RLock()

        # Configuration - simplified without circuit breaker
        self.recovery_cooldown = 30  # 30 seconds between recovery attempts
        self.max_attempts_per_hour = 20  # Still limit to prevent spam, but much higher

        # Track recovery attempts per component
        self.recovery_attempts: Dict[str, deque] = defaultdict(lambda: deque(maxlen=50))

        # Register with health monitor
        health_monitor.register_recovery_callback("stream", self._handle_stream_recovery)
        health_monitor.register_recovery_callback("thread", self._handle_thread_recovery)
        health_monitor.register_recovery_callback("buffer", self._handle_buffer_recovery)

    def register_recovery_handler(self, action: RecoveryAction, handler: Callable[[str, Dict[str, Any]], bool]):
        """
        Register a recovery handler for a specific action.

        Args:
            action: Type of recovery action
            handler: Function that performs the recovery
        """
        self.recovery_handlers[action.value] = handler
        logger.info(f"Registered recovery handler for {action.value}")

    def can_attempt_recovery(self, component: str) -> bool:
        """
        Check if recovery can be attempted for a component.

        Args:
            component: Component identifier

        Returns:
            True if recovery can be attempted (always allow with minimal throttling)
        """
        with self._lock:
            current_time = time.time()

            # Check recovery attempt rate limiting (much more permissive)
            recent_attempts = [
                attempt for attempt in self.recovery_attempts[component]
                if current_time - attempt <= 3600  # Last hour
            ]

            # Only block if truly excessive attempts
            if len(recent_attempts) >= self.max_attempts_per_hour:
                logger.warning(f"Recovery rate limit exceeded for {component} "
                               f"({len(recent_attempts)} attempts in last hour)")
                return False

            # Check cooldown period (shorter cooldown)
            if recent_attempts:
                last_attempt = max(recent_attempts)
                if current_time - last_attempt < self.recovery_cooldown:
                    logger.debug(f"Recovery cooldown active for {component} "
                                 f"(last attempt {current_time - last_attempt:.1f}s ago)")
                    return False

            return True

    def attempt_recovery(self, component: str, action: RecoveryAction, reason: str,
                         details: Optional[Dict[str, Any]] = None) -> bool:
        """
        Attempt recovery for a component.

        Args:
            component: Component identifier
            action: Recovery action to perform
            reason: Reason for recovery
            details: Additional details

        Returns:
            True if recovery was successful
        """
        if not self.can_attempt_recovery(component):
            return False

        current_time = time.time()

        logger.info(f"Attempting recovery for {component}: {action.value} ({reason})")

        try:
            # Record recovery attempt
            with self._lock:
                self.recovery_attempts[component].append(current_time)

            # Perform recovery action
            success = self._execute_recovery_action(component, action, details or {})

            # Record recovery result
            attempt = RecoveryAttempt(
                timestamp=current_time,
                component=component,
                action=action,
                reason=reason,
                success=success,
                details=details
            )

            with self._lock:
                self.recovery_history.append(attempt)

            # Update recovery state
            self._update_recovery_state(component, success)

            if success:
                logger.info(f"Recovery successful for {component}: {action.value}")
            else:
                logger.error(f"Recovery failed for {component}: {action.value}")

            return success

        except Exception as e:
            logger.error(f"Error during recovery for {component}: {e}")
            self._update_recovery_state(component, False)
            return False

    def _execute_recovery_action(self, component: str, action: RecoveryAction,
                                 details: Dict[str, Any]) -> bool:
        """Execute a specific recovery action."""
        handler_key = action.value

        if handler_key not in self.recovery_handlers:
            logger.error(f"No recovery handler registered for action: {handler_key}")
            return False

        try:
            handler = self.recovery_handlers[handler_key]
            return handler(component, details)

        except Exception as e:
            logger.error(f"Error executing recovery action {handler_key} for {component}: {e}")
            return False

    def _update_recovery_state(self, component: str, success: bool):
        """Update recovery state based on recovery result."""
        current_time = time.time()

        with self._lock:
            if component not in self.recovery_states:
                self.recovery_states[component] = RecoveryState()

            state = self.recovery_states[component]

            if success:
                state.success_count += 1
                state.last_success_time = current_time
                # Reset failure count on success
                state.failure_count = max(0, state.failure_count - 1)
                logger.debug(f"Recovery success for {component} (total successes: {state.success_count})")
            else:
                state.failure_count += 1
                state.last_failure_time = current_time
                logger.debug(f"Recovery failure for {component} (total failures: {state.failure_count})")

    def _handle_stream_recovery(self, component: str, health_check: HealthCheck) -> bool:
        """Handle recovery for stream-related issues."""
        if "frames" in health_check.name:
            # Frame-related issue - restart stream
            return self.attempt_recovery(
                component,
                RecoveryAction.RESTART_STREAM,
                health_check.message,
                health_check.details
            )
        elif "connection" in health_check.name:
            # Connection issue - reconnect
            return self.attempt_recovery(
                component,
                RecoveryAction.RECONNECT,
                health_check.message,
                health_check.details
            )
        elif "errors" in health_check.name:
            # High error rate - throttle or restart
            return self.attempt_recovery(
                component,
                RecoveryAction.THROTTLE,
                health_check.message,
                health_check.details
            )
        else:
            # Generic stream issue - restart
            return self.attempt_recovery(
                component,
                RecoveryAction.RESTART_STREAM,
                health_check.message,
                health_check.details
            )

    def _handle_thread_recovery(self, component: str, health_check: HealthCheck) -> bool:
        """Handle recovery for thread-related issues."""
        if "deadlock" in health_check.name:
            # Deadlock detected - restart thread
            return self.attempt_recovery(
                component,
                RecoveryAction.RESTART_THREAD,
                health_check.message,
                health_check.details
            )
        elif "responsive" in health_check.name:
            # Thread unresponsive - restart
            return self.attempt_recovery(
                component,
                RecoveryAction.RESTART_THREAD,
                health_check.message,
                health_check.details
            )
        else:
            # Generic thread issue - restart
            return self.attempt_recovery(
                component,
                RecoveryAction.RESTART_THREAD,
                health_check.message,
                health_check.details
            )

    def _handle_buffer_recovery(self, component: str, health_check: HealthCheck) -> bool:
        """Handle recovery for buffer-related issues."""
        # Buffer issues - clear buffer
        return self.attempt_recovery(
            component,
            RecoveryAction.CLEAR_BUFFER,
            health_check.message,
            health_check.details
        )

    def get_recovery_stats(self) -> Dict[str, Any]:
        """Get recovery statistics."""
        current_time = time.time()

        with self._lock:
            # Calculate stats from history
            recent_recoveries = [
                attempt for attempt in self.recovery_history
                if current_time - attempt.timestamp <= 3600  # Last hour
            ]

            stats_by_component = defaultdict(lambda: {
                'attempts': 0,
                'successes': 0,
                'failures': 0,
                'last_attempt': None,
                'last_success': None
            })

            for attempt in recent_recoveries:
                stats = stats_by_component[attempt.component]
                stats['attempts'] += 1

                if attempt.success:
                    stats['successes'] += 1
                    if not stats['last_success'] or attempt.timestamp > stats['last_success']:
                        stats['last_success'] = attempt.timestamp
|
||||||
|
else:
|
||||||
|
stats['failures'] += 1
|
||||||
|
|
||||||
|
if not stats['last_attempt'] or attempt.timestamp > stats['last_attempt']:
|
||||||
|
stats['last_attempt'] = attempt.timestamp
|
||||||
|
|
||||||
|
return {
|
||||||
|
'total_recoveries_last_hour': len(recent_recoveries),
|
||||||
|
'recovery_by_component': dict(stats_by_component),
|
||||||
|
'recovery_states': {
|
||||||
|
component: {
|
||||||
|
'failure_count': state.failure_count,
|
||||||
|
'success_count': state.success_count,
|
||||||
|
'last_failure_time': state.last_failure_time,
|
||||||
|
'last_success_time': state.last_success_time
|
||||||
|
}
|
||||||
|
for component, state in self.recovery_states.items()
|
||||||
|
},
|
||||||
|
'recent_history': [
|
||||||
|
{
|
||||||
|
'timestamp': attempt.timestamp,
|
||||||
|
'component': attempt.component,
|
||||||
|
'action': attempt.action.value,
|
||||||
|
'reason': attempt.reason,
|
||||||
|
'success': attempt.success
|
||||||
|
}
|
||||||
|
for attempt in list(self.recovery_history)[-10:] # Last 10 attempts
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
def force_recovery(self, component: str, action: RecoveryAction, reason: str = "manual") -> bool:
|
||||||
|
"""
|
||||||
|
Force recovery for a component, bypassing rate limiting.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
component: Component identifier
|
||||||
|
action: Recovery action to perform
|
||||||
|
reason: Reason for forced recovery
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if recovery was successful
|
||||||
|
"""
|
||||||
|
logger.info(f"Forcing recovery for {component}: {action.value} ({reason})")
|
||||||
|
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Execute recovery action directly
|
||||||
|
success = self._execute_recovery_action(component, action, {})
|
||||||
|
|
||||||
|
# Record forced recovery
|
||||||
|
attempt = RecoveryAttempt(
|
||||||
|
timestamp=current_time,
|
||||||
|
component=component,
|
||||||
|
action=action,
|
||||||
|
reason=f"forced: {reason}",
|
||||||
|
success=success,
|
||||||
|
details={'forced': True}
|
||||||
|
)
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self.recovery_history.append(attempt)
|
||||||
|
self.recovery_attempts[component].append(current_time)
|
||||||
|
|
||||||
|
# Update recovery state
|
||||||
|
self._update_recovery_state(component, success)
|
||||||
|
|
||||||
|
return success
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during forced recovery for {component}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# Global recovery manager instance
|
||||||
|
recovery_manager = RecoveryManager()
|
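A minimal sketch of how this API might be exercised from application code, assuming the file above lives at core/monitoring/recovery.py and exposes the RecoveryAction members referenced in the handlers (e.g. RESTART_STREAM); the component name is hypothetical:

    # Illustrative sketch only - not part of the module above.
    from core.monitoring.recovery import recovery_manager, RecoveryAction

    # Bypass rate limiting and force a stream restart for one component.
    ok = recovery_manager.force_recovery(
        "stream_cam-001",               # hypothetical component identifier
        RecoveryAction.RESTART_STREAM,  # dispatched through recovery_handlers by action.value
        reason="operator request"
    )

    # Inspect the last hour of recovery activity.
    stats = recovery_manager.get_recovery_stats()
    print(ok, stats['total_recoveries_last_hour'])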
351 core/monitoring/stream_health.py Normal file

@@ -0,0 +1,351 @@
"""
|
||||||
|
Stream-specific health monitoring for video streams.
|
||||||
|
Tracks frame production, connection health, and stream-specific metrics.
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
import requests
|
||||||
|
from typing import Dict, Optional, List, Any
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from .health import HealthCheck, HealthStatus, health_monitor
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StreamMetrics:
|
||||||
|
"""Metrics for an individual stream."""
|
||||||
|
camera_id: str
|
||||||
|
stream_type: str # 'rtsp', 'http_snapshot'
|
||||||
|
start_time: float
|
||||||
|
last_frame_time: Optional[float] = None
|
||||||
|
frame_count: int = 0
|
||||||
|
error_count: int = 0
|
||||||
|
reconnect_count: int = 0
|
||||||
|
bytes_received: int = 0
|
||||||
|
frames_per_second: float = 0.0
|
||||||
|
connection_attempts: int = 0
|
||||||
|
last_connection_test: Optional[float] = None
|
||||||
|
connection_healthy: bool = True
|
||||||
|
last_error: Optional[str] = None
|
||||||
|
last_error_time: Optional[float] = None
|
||||||
|
|
||||||
|
|
||||||
|
class StreamHealthTracker:
|
||||||
|
"""Tracks health for individual video streams."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.streams: Dict[str, StreamMetrics] = {}
|
||||||
|
self._lock = threading.RLock()
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
self.connection_test_interval = 300 # Test connection every 5 minutes
|
||||||
|
self.frame_timeout_warning = 120 # Warn if no frames for 2 minutes
|
||||||
|
self.frame_timeout_critical = 300 # Critical if no frames for 5 minutes
|
||||||
|
self.error_rate_threshold = 0.1 # 10% error rate threshold
|
||||||
|
|
||||||
|
# Register with health monitor
|
||||||
|
health_monitor.register_health_checker(self._perform_health_checks)
|
||||||
|
|
||||||
|
def register_stream(self, camera_id: str, stream_type: str, source_url: Optional[str] = None):
|
||||||
|
"""Register a new stream for monitoring."""
|
||||||
|
with self._lock:
|
||||||
|
if camera_id not in self.streams:
|
||||||
|
self.streams[camera_id] = StreamMetrics(
|
||||||
|
camera_id=camera_id,
|
||||||
|
stream_type=stream_type,
|
||||||
|
start_time=time.time()
|
||||||
|
)
|
||||||
|
logger.info(f"Registered stream for monitoring: {camera_id} ({stream_type})")
|
||||||
|
|
||||||
|
# Update health monitor metrics
|
||||||
|
health_monitor.update_metrics(
|
||||||
|
camera_id,
|
||||||
|
thread_alive=True,
|
||||||
|
connection_healthy=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def unregister_stream(self, camera_id: str):
|
||||||
|
"""Unregister a stream from monitoring."""
|
||||||
|
with self._lock:
|
||||||
|
if camera_id in self.streams:
|
||||||
|
del self.streams[camera_id]
|
||||||
|
logger.info(f"Unregistered stream from monitoring: {camera_id}")
|
||||||
|
|
||||||
|
def report_frame_received(self, camera_id: str, frame_size_bytes: int = 0):
|
||||||
|
"""Report that a frame was received."""
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if camera_id not in self.streams:
|
||||||
|
logger.warning(f"Frame received for unregistered stream: {camera_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
stream = self.streams[camera_id]
|
||||||
|
|
||||||
|
# Update frame metrics
|
||||||
|
if stream.last_frame_time:
|
||||||
|
interval = current_time - stream.last_frame_time
|
||||||
|
# Calculate FPS as moving average
|
||||||
|
if stream.frames_per_second == 0:
|
||||||
|
stream.frames_per_second = 1.0 / interval if interval > 0 else 0
|
||||||
|
else:
|
||||||
|
new_fps = 1.0 / interval if interval > 0 else 0
|
||||||
|
stream.frames_per_second = (stream.frames_per_second * 0.9) + (new_fps * 0.1)
|
||||||
|
|
||||||
|
stream.last_frame_time = current_time
|
||||||
|
stream.frame_count += 1
|
||||||
|
stream.bytes_received += frame_size_bytes
|
||||||
|
|
||||||
|
# Report to health monitor
|
||||||
|
health_monitor.report_frame_received(camera_id)
|
||||||
|
health_monitor.update_metrics(
|
||||||
|
camera_id,
|
||||||
|
frame_count=stream.frame_count,
|
||||||
|
avg_frame_interval=1.0 / stream.frames_per_second if stream.frames_per_second > 0 else 0,
|
||||||
|
last_frame_time=current_time
|
||||||
|
)
|
||||||
|
|
||||||
|
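As an editorial aside (not part of the file): the FPS figure above is an exponential moving average with weights 0.9/0.1, so a single slow frame only nudges the estimate rather than replacing it. A quick numeric check:

    # Illustrative sketch only - not part of stream_health.py.
    fps = 6.0             # current smoothed estimate
    interval = 0.5        # one slow frame arrives after 0.5 s -> instantaneous 2 fps
    new_fps = 1.0 / interval
    fps = fps * 0.9 + new_fps * 0.1
    print(round(fps, 2))  # 5.6 - the estimate drops only slightly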
    def report_error(self, camera_id: str, error_message: str):
        """Report an error for a stream."""
        current_time = time.time()

        with self._lock:
            if camera_id not in self.streams:
                logger.warning(f"Error reported for unregistered stream: {camera_id}")
                return

            stream = self.streams[camera_id]
            stream.error_count += 1
            stream.last_error = error_message
            stream.last_error_time = current_time

            # Report to health monitor
            health_monitor.report_error(camera_id, "stream_error")
            health_monitor.update_metrics(
                camera_id,
                error_count=stream.error_count
            )

            logger.debug(f"Error reported for stream {camera_id}: {error_message}")

    def report_reconnect(self, camera_id: str, reason: str = "unknown"):
        """Report that a stream reconnected."""
        current_time = time.time()

        with self._lock:
            if camera_id not in self.streams:
                logger.warning(f"Reconnect reported for unregistered stream: {camera_id}")
                return

            stream = self.streams[camera_id]
            stream.reconnect_count += 1

            # Report to health monitor
            health_monitor.report_restart(camera_id)
            health_monitor.update_metrics(
                camera_id,
                restart_count=stream.reconnect_count
            )

            logger.info(f"Reconnect reported for stream {camera_id}: {reason}")

    def report_connection_attempt(self, camera_id: str, success: bool):
        """Report a connection attempt."""
        with self._lock:
            if camera_id not in self.streams:
                return

            stream = self.streams[camera_id]
            stream.connection_attempts += 1
            stream.connection_healthy = success

            # Report to health monitor
            health_monitor.update_metrics(
                camera_id,
                connection_healthy=success
            )

    def test_http_connection(self, camera_id: str, url: str) -> bool:
        """Test HTTP connection health for snapshot streams."""
        try:
            # Quick HEAD request to test connectivity
            response = requests.head(url, timeout=5, verify=False)
            success = response.status_code in [200, 404]  # 404 might be normal for some cameras

            self.report_connection_attempt(camera_id, success)

            if success:
                logger.debug(f"Connection test passed for {camera_id}")
            else:
                logger.warning(f"Connection test failed for {camera_id}: HTTP {response.status_code}")

            return success

        except Exception as e:
            logger.warning(f"Connection test failed for {camera_id}: {e}")
            self.report_connection_attempt(camera_id, False)
            return False

    def get_stream_metrics(self, camera_id: str) -> Optional[Dict[str, Any]]:
        """Get metrics for a specific stream."""
        with self._lock:
            if camera_id not in self.streams:
                return None

            stream = self.streams[camera_id]
            current_time = time.time()

            # Calculate derived metrics
            uptime = current_time - stream.start_time
            frame_age = current_time - stream.last_frame_time if stream.last_frame_time else None
            error_rate = stream.error_count / max(1, stream.frame_count)

            return {
                'camera_id': camera_id,
                'stream_type': stream.stream_type,
                'uptime_seconds': uptime,
                'frame_count': stream.frame_count,
                'frames_per_second': stream.frames_per_second,
                'bytes_received': stream.bytes_received,
                'error_count': stream.error_count,
                'error_rate': error_rate,
                'reconnect_count': stream.reconnect_count,
                'connection_attempts': stream.connection_attempts,
                'connection_healthy': stream.connection_healthy,
                'last_frame_age_seconds': frame_age,
                'last_error': stream.last_error,
                'last_error_time': stream.last_error_time
            }

    def get_all_metrics(self) -> Dict[str, Dict[str, Any]]:
        """Get metrics for all streams."""
        with self._lock:
            return {
                camera_id: self.get_stream_metrics(camera_id)
                for camera_id in self.streams.keys()
            }

    def _perform_health_checks(self) -> List[HealthCheck]:
        """Perform health checks for all streams."""
        checks = []
        current_time = time.time()

        with self._lock:
            for camera_id, stream in self.streams.items():
                checks.extend(self._check_stream_health(camera_id, stream, current_time))

        return checks

    def _check_stream_health(self, camera_id: str, stream: StreamMetrics, current_time: float) -> List[HealthCheck]:
        """Perform health checks for a single stream."""
        checks = []

        # Check frame freshness
        if stream.last_frame_time:
            frame_age = current_time - stream.last_frame_time

            if frame_age > self.frame_timeout_critical:
                checks.append(HealthCheck(
                    name=f"stream_{camera_id}_frames",
                    status=HealthStatus.CRITICAL,
                    message=f"No frames for {frame_age:.1f}s (critical threshold: {self.frame_timeout_critical}s)",
                    details={
                        'frame_age': frame_age,
                        'threshold': self.frame_timeout_critical,
                        'last_frame_time': stream.last_frame_time
                    },
                    recovery_action="restart_stream"
                ))
            elif frame_age > self.frame_timeout_warning:
                checks.append(HealthCheck(
                    name=f"stream_{camera_id}_frames",
                    status=HealthStatus.WARNING,
                    message=f"Frames aging: {frame_age:.1f}s (warning threshold: {self.frame_timeout_warning}s)",
                    details={
                        'frame_age': frame_age,
                        'threshold': self.frame_timeout_warning,
                        'last_frame_time': stream.last_frame_time
                    }
                ))
        else:
            # No frames received yet
            startup_time = current_time - stream.start_time
            if startup_time > 60:  # Allow 1 minute for initial connection
                checks.append(HealthCheck(
                    name=f"stream_{camera_id}_startup",
                    status=HealthStatus.CRITICAL,
                    message=f"No frames received since startup {startup_time:.1f}s ago",
                    details={
                        'startup_time': startup_time,
                        'start_time': stream.start_time
                    },
                    recovery_action="restart_stream"
                ))

        # Check error rate
        if stream.frame_count > 10:  # Need sufficient samples
            error_rate = stream.error_count / stream.frame_count
            if error_rate > self.error_rate_threshold:
                checks.append(HealthCheck(
                    name=f"stream_{camera_id}_errors",
                    status=HealthStatus.WARNING,
                    message=f"High error rate: {error_rate:.1%} ({stream.error_count}/{stream.frame_count})",
                    details={
                        'error_rate': error_rate,
                        'error_count': stream.error_count,
                        'frame_count': stream.frame_count,
                        'last_error': stream.last_error
                    }
                ))

        # Check connection health
        if not stream.connection_healthy:
            checks.append(HealthCheck(
                name=f"stream_{camera_id}_connection",
                status=HealthStatus.WARNING,
                message="Connection unhealthy (last test failed)",
                details={
                    'connection_attempts': stream.connection_attempts,
                    'last_connection_test': stream.last_connection_test
                }
            ))

        # Check excessive reconnects
        uptime_hours = (current_time - stream.start_time) / 3600
        if uptime_hours > 1 and stream.reconnect_count > 5:  # More than 5 reconnects per hour
            reconnect_rate = stream.reconnect_count / uptime_hours
            checks.append(HealthCheck(
                name=f"stream_{camera_id}_stability",
                status=HealthStatus.WARNING,
                message=f"Frequent reconnects: {reconnect_rate:.1f}/hour ({stream.reconnect_count} total)",
                details={
                    'reconnect_rate': reconnect_rate,
                    'reconnect_count': stream.reconnect_count,
                    'uptime_hours': uptime_hours
                }
            ))

        # Check frame rate health
        if stream.last_frame_time and stream.frames_per_second > 0:
            expected_fps = 6.0  # Expected FPS for streams
            if stream.frames_per_second < expected_fps * 0.5:  # Less than 50% of expected
                checks.append(HealthCheck(
                    name=f"stream_{camera_id}_framerate",
                    status=HealthStatus.WARNING,
                    message=f"Low frame rate: {stream.frames_per_second:.1f} fps (expected: ~{expected_fps} fps)",
                    details={
                        'current_fps': stream.frames_per_second,
                        'expected_fps': expected_fps
                    }
                ))

        return checks


# Global stream health tracker instance
stream_health_tracker = StreamHealthTracker()
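A minimal sketch of the expected reporting flow for one camera, using only the methods defined above (the camera id and values are hypothetical):

    # Illustrative sketch only - not part of stream_health.py.
    from core.monitoring.stream_health import stream_health_tracker

    stream_health_tracker.register_stream("cam-001", "rtsp")

    # Called from the frame pipeline whenever a frame arrives or something fails.
    stream_health_tracker.report_frame_received("cam-001", frame_size_bytes=65536)
    stream_health_tracker.report_error("cam-001", "decode error")
    stream_health_tracker.report_reconnect("cam-001", reason="socket closed")

    print(stream_health_tracker.get_stream_metrics("cam-001")['error_rate'])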
381 core/monitoring/thread_health.py Normal file

@@ -0,0 +1,381 @@
"""
|
||||||
|
Thread health monitoring for detecting unresponsive and deadlocked threads.
|
||||||
|
Provides thread liveness detection and responsiveness testing.
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
import traceback
|
||||||
|
from typing import Dict, List, Optional, Any, Callable
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from .health import HealthCheck, HealthStatus, health_monitor
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ThreadInfo:
|
||||||
|
"""Information about a monitored thread."""
|
||||||
|
thread_id: int
|
||||||
|
thread_name: str
|
||||||
|
start_time: float
|
||||||
|
last_heartbeat: float
|
||||||
|
heartbeat_count: int = 0
|
||||||
|
is_responsive: bool = True
|
||||||
|
last_activity: Optional[str] = None
|
||||||
|
stack_traces: List[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadHealthMonitor:
|
||||||
|
"""Monitors thread health and responsiveness."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.monitored_threads: Dict[int, ThreadInfo] = {}
|
||||||
|
self.heartbeat_callbacks: Dict[int, Callable[[], bool]] = {}
|
||||||
|
self._lock = threading.RLock()
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
self.heartbeat_timeout = 60.0 # 1 minute without heartbeat = unresponsive
|
||||||
|
self.responsiveness_test_interval = 30.0 # Test responsiveness every 30 seconds
|
||||||
|
self.stack_trace_count = 5 # Keep last 5 stack traces for analysis
|
||||||
|
|
||||||
|
# Register with health monitor
|
||||||
|
health_monitor.register_health_checker(self._perform_health_checks)
|
||||||
|
|
||||||
|
# Enable periodic responsiveness testing
|
||||||
|
self.test_thread = threading.Thread(target=self._responsiveness_test_loop, daemon=True)
|
||||||
|
self.test_thread.start()
|
||||||
|
|
||||||
|
def register_thread(self, thread: threading.Thread, heartbeat_callback: Optional[Callable[[], bool]] = None):
|
||||||
|
"""
|
||||||
|
Register a thread for monitoring.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
thread: Thread to monitor
|
||||||
|
heartbeat_callback: Optional callback to test thread responsiveness
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
thread_info = ThreadInfo(
|
||||||
|
thread_id=thread.ident,
|
||||||
|
thread_name=thread.name,
|
||||||
|
start_time=time.time(),
|
||||||
|
last_heartbeat=time.time()
|
||||||
|
)
|
||||||
|
|
||||||
|
self.monitored_threads[thread.ident] = thread_info
|
||||||
|
|
||||||
|
if heartbeat_callback:
|
||||||
|
self.heartbeat_callbacks[thread.ident] = heartbeat_callback
|
||||||
|
|
||||||
|
logger.info(f"Registered thread for monitoring: {thread.name} (ID: {thread.ident})")
|
||||||
|
|
||||||
|
def unregister_thread(self, thread_id: int):
|
||||||
|
"""Unregister a thread from monitoring."""
|
||||||
|
with self._lock:
|
||||||
|
if thread_id in self.monitored_threads:
|
||||||
|
thread_name = self.monitored_threads[thread_id].thread_name
|
||||||
|
del self.monitored_threads[thread_id]
|
||||||
|
|
||||||
|
if thread_id in self.heartbeat_callbacks:
|
||||||
|
del self.heartbeat_callbacks[thread_id]
|
||||||
|
|
||||||
|
logger.info(f"Unregistered thread from monitoring: {thread_name} (ID: {thread_id})")
|
||||||
|
|
||||||
|
def heartbeat(self, thread_id: Optional[int] = None, activity: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Report thread heartbeat.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
thread_id: Thread ID (uses current thread if None)
|
||||||
|
activity: Description of current activity
|
||||||
|
"""
|
||||||
|
if thread_id is None:
|
||||||
|
thread_id = threading.current_thread().ident
|
||||||
|
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if thread_id in self.monitored_threads:
|
||||||
|
thread_info = self.monitored_threads[thread_id]
|
||||||
|
thread_info.last_heartbeat = current_time
|
||||||
|
thread_info.heartbeat_count += 1
|
||||||
|
thread_info.is_responsive = True
|
||||||
|
|
||||||
|
if activity:
|
||||||
|
thread_info.last_activity = activity
|
||||||
|
|
||||||
|
# Report to health monitor
|
||||||
|
health_monitor.update_metrics(
|
||||||
|
f"thread_{thread_info.thread_name}",
|
||||||
|
thread_alive=True,
|
||||||
|
last_frame_time=current_time
|
||||||
|
)
|
||||||
|
|
||||||
|
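As an illustrative aside (not part of the file above): a worker thread is expected to call heartbeat() from inside its own loop, and is registered after it starts so that thread.ident is populated. The loop body here is hypothetical; the module-level thread_health_monitor instance is defined at the bottom of this file.

    # Illustrative sketch only - not part of thread_health.py.
    import threading
    import time
    from core.monitoring.thread_health import thread_health_monitor

    def worker_loop():
        while True:
            # ... do one unit of work ...
            thread_health_monitor.heartbeat(activity="processing frame")
            time.sleep(1.0)

    t = threading.Thread(target=worker_loop, name="demo-worker", daemon=True)
    t.start()
    # Register with a callback the monitor can use for responsiveness tests.
    thread_health_monitor.register_thread(t, heartbeat_callback=lambda: t.is_alive())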
    def get_thread_info(self, thread_id: int) -> Optional[Dict[str, Any]]:
        """Get information about a monitored thread."""
        with self._lock:
            if thread_id not in self.monitored_threads:
                return None

            thread_info = self.monitored_threads[thread_id]
            current_time = time.time()

            return {
                'thread_id': thread_id,
                'thread_name': thread_info.thread_name,
                'uptime_seconds': current_time - thread_info.start_time,
                'last_heartbeat_age': current_time - thread_info.last_heartbeat,
                'heartbeat_count': thread_info.heartbeat_count,
                'is_responsive': thread_info.is_responsive,
                'last_activity': thread_info.last_activity,
                'stack_traces': thread_info.stack_traces or []
            }

    def get_all_thread_info(self) -> Dict[int, Dict[str, Any]]:
        """Get information about all monitored threads."""
        with self._lock:
            return {
                thread_id: self.get_thread_info(thread_id)
                for thread_id in self.monitored_threads.keys()
            }

    def test_thread_responsiveness(self, thread_id: int) -> bool:
        """
        Test if a thread is responsive by calling its heartbeat callback.

        Args:
            thread_id: ID of thread to test

        Returns:
            True if thread responds within timeout
        """
        if thread_id not in self.heartbeat_callbacks:
            return True  # Can't test if no callback provided

        try:
            # Call the heartbeat callback with a timeout
            callback = self.heartbeat_callbacks[thread_id]

            # This is a simple approach - in practice you might want to use
            # threading.Timer or asyncio for more sophisticated timeout handling
            start_time = time.time()
            result = callback()
            response_time = time.time() - start_time

            with self._lock:
                if thread_id in self.monitored_threads:
                    self.monitored_threads[thread_id].is_responsive = result

            if response_time > 5.0:  # Slow response
                logger.warning(f"Thread {thread_id} slow response: {response_time:.1f}s")

            return result

        except Exception as e:
            logger.error(f"Error testing thread {thread_id} responsiveness: {e}")
            with self._lock:
                if thread_id in self.monitored_threads:
                    self.monitored_threads[thread_id].is_responsive = False
            return False

    def capture_stack_trace(self, thread_id: int) -> Optional[str]:
        """
        Capture stack trace for a thread.

        Args:
            thread_id: ID of thread to capture

        Returns:
            Stack trace string or None if not available
        """
        try:
            # Get all frames for all threads
            frames = dict(threading._current_frames())

            if thread_id not in frames:
                return None

            # Format stack trace
            frame = frames[thread_id]
            stack_trace = ''.join(traceback.format_stack(frame))

            # Store in thread info
            with self._lock:
                if thread_id in self.monitored_threads:
                    thread_info = self.monitored_threads[thread_id]
                    if thread_info.stack_traces is None:
                        thread_info.stack_traces = []

                    thread_info.stack_traces.append(f"{time.time()}: {stack_trace}")

                    # Keep only last N stack traces
                    if len(thread_info.stack_traces) > self.stack_trace_count:
                        thread_info.stack_traces = thread_info.stack_traces[-self.stack_trace_count:]

            return stack_trace

        except Exception as e:
            logger.error(f"Error capturing stack trace for thread {thread_id}: {e}")
            return None

    def detect_deadlocks(self) -> List[Dict[str, Any]]:
        """
        Attempt to detect potential deadlocks by analyzing thread states.

        Returns:
            List of potential deadlock scenarios
        """
        deadlocks = []
        current_time = time.time()

        with self._lock:
            # Look for threads that haven't had heartbeats for a long time
            # and are supposedly alive
            for thread_id, thread_info in self.monitored_threads.items():
                heartbeat_age = current_time - thread_info.last_heartbeat

                if heartbeat_age > self.heartbeat_timeout * 2:  # Double the timeout
                    # Check if thread still exists
                    thread_exists = any(
                        t.ident == thread_id and t.is_alive()
                        for t in threading.enumerate()
                    )

                    if thread_exists:
                        # Thread exists but not responding - potential deadlock
                        stack_trace = self.capture_stack_trace(thread_id)

                        deadlock_info = {
                            'thread_id': thread_id,
                            'thread_name': thread_info.thread_name,
                            'heartbeat_age': heartbeat_age,
                            'last_activity': thread_info.last_activity,
                            'stack_trace': stack_trace,
                            'detection_time': current_time
                        }

                        deadlocks.append(deadlock_info)
                        logger.warning(f"Potential deadlock detected in thread {thread_info.thread_name}")

        return deadlocks

    def _responsiveness_test_loop(self):
        """Background loop to test thread responsiveness."""
        logger.info("Thread responsiveness testing started")

        while True:
            try:
                time.sleep(self.responsiveness_test_interval)

                with self._lock:
                    thread_ids = list(self.monitored_threads.keys())

                for thread_id in thread_ids:
                    try:
                        self.test_thread_responsiveness(thread_id)
                    except Exception as e:
                        logger.error(f"Error testing thread {thread_id}: {e}")

            except Exception as e:
                logger.error(f"Error in responsiveness test loop: {e}")
                time.sleep(10.0)  # Fallback sleep

    def _perform_health_checks(self) -> List[HealthCheck]:
        """Perform health checks for all monitored threads."""
        checks = []
        current_time = time.time()

        with self._lock:
            for thread_id, thread_info in self.monitored_threads.items():
                checks.extend(self._check_thread_health(thread_id, thread_info, current_time))

        # Check for deadlocks
        deadlocks = self.detect_deadlocks()
        for deadlock in deadlocks:
            checks.append(HealthCheck(
                name=f"deadlock_detection_{deadlock['thread_id']}",
                status=HealthStatus.CRITICAL,
                message=f"Potential deadlock in thread {deadlock['thread_name']} "
                        f"(unresponsive for {deadlock['heartbeat_age']:.1f}s)",
                details=deadlock,
                recovery_action="restart_thread"
            ))

        return checks

    def _check_thread_health(self, thread_id: int, thread_info: ThreadInfo, current_time: float) -> List[HealthCheck]:
        """Perform health checks for a single thread."""
        checks = []

        # Check if thread still exists
        thread_exists = any(
            t.ident == thread_id and t.is_alive()
            for t in threading.enumerate()
        )

        if not thread_exists:
            checks.append(HealthCheck(
                name=f"thread_{thread_info.thread_name}_alive",
                status=HealthStatus.CRITICAL,
                message=f"Thread {thread_info.thread_name} is no longer alive",
                details={
                    'thread_id': thread_id,
                    'uptime': current_time - thread_info.start_time,
                    'last_heartbeat': thread_info.last_heartbeat
                },
                recovery_action="restart_thread"
            ))
            return checks

        # Check heartbeat freshness
        heartbeat_age = current_time - thread_info.last_heartbeat

        if heartbeat_age > self.heartbeat_timeout:
            checks.append(HealthCheck(
                name=f"thread_{thread_info.thread_name}_responsive",
                status=HealthStatus.CRITICAL,
                message=f"Thread {thread_info.thread_name} unresponsive for {heartbeat_age:.1f}s",
                details={
                    'thread_id': thread_id,
                    'heartbeat_age': heartbeat_age,
                    'heartbeat_count': thread_info.heartbeat_count,
                    'last_activity': thread_info.last_activity,
                    'is_responsive': thread_info.is_responsive
                },
                recovery_action="restart_thread"
            ))
        elif heartbeat_age > self.heartbeat_timeout * 0.5:  # Warning at 50% of timeout
            checks.append(HealthCheck(
                name=f"thread_{thread_info.thread_name}_responsive",
                status=HealthStatus.WARNING,
                message=f"Thread {thread_info.thread_name} slow heartbeat: {heartbeat_age:.1f}s",
                details={
                    'thread_id': thread_id,
                    'heartbeat_age': heartbeat_age,
                    'heartbeat_count': thread_info.heartbeat_count,
                    'last_activity': thread_info.last_activity,
                    'is_responsive': thread_info.is_responsive
                }
            ))

        # Check responsiveness test results
        if not thread_info.is_responsive:
            checks.append(HealthCheck(
                name=f"thread_{thread_info.thread_name}_callback",
                status=HealthStatus.WARNING,
                message=f"Thread {thread_info.thread_name} failed responsiveness test",
                details={
                    'thread_id': thread_id,
                    'last_activity': thread_info.last_activity
                }
            ))

        return checks


# Global thread health monitor instance
thread_health_monitor = ThreadHealthMonitor()
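The deadlock detection above hinges on threading._current_frames(); the same mapping is also exposed as the documented sys._current_frames(). A self-contained illustration of the mechanism capture_stack_trace() relies on:

    # Illustrative sketch only - shows the frame-snapshot mechanism, not part of the module.
    import sys
    import threading
    import traceback
    from typing import Optional

    def dump_thread_stack(thread_id: int) -> Optional[str]:
        frames = sys._current_frames()      # mapping: thread id -> topmost stack frame
        frame = frames.get(thread_id)
        if frame is None:
            return None
        return "".join(traceback.format_stack(frame))

    print(dump_thread_stack(threading.get_ident())[:120])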
@@ -2,13 +2,12 @@
 Streaming system for RTSP and HTTP camera feeds.
 Provides modular frame readers, buffers, and stream management.
 """
-from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader
+from .readers import HTTPSnapshotReader, FFmpegRTSPReader
 from .buffers import FrameBuffer, CacheBuffer, shared_frame_buffer, shared_cache_buffer
 from .manager import StreamManager, StreamConfig, SubscriptionInfo, shared_stream_manager, initialize_stream_manager

 __all__ = [
     # Readers
-    'RTSPReader',
     'HTTPSnapshotReader',
     'FFmpegRTSPReader',
@@ -5,11 +5,13 @@ Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
 import logging
 import threading
 import time
+import queue
+import asyncio
 from typing import Dict, Set, Optional, List, Any
 from dataclasses import dataclass
 from collections import defaultdict

-from .readers import RTSPReader, HTTPSnapshotReader, FFmpegRTSPReader
+from .readers import HTTPSnapshotReader, FFmpegRTSPReader
 from .buffers import shared_cache_buffer
 from ..tracking.integration import TrackingPipelineIntegration
@@ -50,6 +52,65 @@ class StreamManager:
         self._camera_subscribers: Dict[str, Set[str]] = defaultdict(set)  # camera_id -> set of subscription_ids
         self._lock = threading.RLock()

+        # Fair tracking queue system - per camera queues
+        self._tracking_queues: Dict[str, queue.Queue] = {}  # camera_id -> queue
+        self._tracking_workers = []
+        self._stop_workers = threading.Event()
+        self._dropped_frame_counts: Dict[str, int] = {}  # per-camera drop counts
+
+        # Round-robin scheduling state
+        self._camera_list = []  # Ordered list of active cameras
+        self._camera_round_robin_index = 0
+        self._round_robin_lock = threading.Lock()
+
+        # Start worker threads for tracking processing
+        num_workers = min(4, max_streams // 2 + 1)  # Scale with streams
+        for i in range(num_workers):
+            worker = threading.Thread(
+                target=self._tracking_worker_loop,
+                name=f"TrackingWorker-{i}",
+                daemon=True
+            )
+            worker.start()
+            self._tracking_workers.append(worker)
+
+        logger.info(f"Started {num_workers} tracking worker threads")
+
+    def _ensure_camera_queue(self, camera_id: str):
+        """Ensure a tracking queue exists for the camera."""
+        if camera_id not in self._tracking_queues:
+            self._tracking_queues[camera_id] = queue.Queue(maxsize=10)  # 10 frames per camera
+            self._dropped_frame_counts[camera_id] = 0
+
+            with self._round_robin_lock:
+                if camera_id not in self._camera_list:
+                    self._camera_list.append(camera_id)
+            logger.info(f"Created tracking queue for camera {camera_id}")
+        else:
+            logger.debug(f"Camera {camera_id} already has tracking queue")
+
+    def _remove_camera_queue(self, camera_id: str):
+        """Remove tracking queue for a camera that's no longer active."""
+        if camera_id in self._tracking_queues:
+            # Clear any remaining items
+            while not self._tracking_queues[camera_id].empty():
+                try:
+                    self._tracking_queues[camera_id].get_nowait()
+                except queue.Empty:
+                    break
+
+            del self._tracking_queues[camera_id]
+            del self._dropped_frame_counts[camera_id]
+
+            with self._round_robin_lock:
+                if camera_id in self._camera_list:
+                    self._camera_list.remove(camera_id)
+                    # Reset index if needed
+                    if self._camera_round_robin_index >= len(self._camera_list):
+                        self._camera_round_robin_index = 0
+
+            logger.info(f"Removed tracking queue for camera {camera_id}")
+
     def add_subscription(self, subscription_id: str, stream_config: StreamConfig,
                          crop_coords: Optional[tuple] = None,
                          model_id: Optional[str] = None,
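The per-camera queues created above are bounded (maxsize=10), so a stalled consumer can never accumulate more than ten frames per camera; the producer side drops instead of blocking. A standalone sketch of that behaviour, independent of the diff:

    # Illustrative sketch only - not part of manager.py.
    import queue

    q = queue.Queue(maxsize=10)
    dropped = 0
    for i in range(15):                 # pretend 15 frames arrive while no worker drains
        try:
            q.put_nowait({"frame": i, "timestamp": 0.0})
        except queue.Full:
            dropped += 1                # the real code counts these per camera and logs every 50th

    print(q.qsize(), dropped)           # 10 5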
@@ -93,6 +154,10 @@ class StreamManager:
                 if not success:
                     self._remove_subscription_internal(subscription_id)
                     return False
+            else:
+                # Stream already exists, but ensure queue exists too
+                logger.info(f"Stream already exists for {camera_id}, ensuring queue exists")
+                self._ensure_camera_queue(camera_id)

             logger.info(f"Added subscription {subscription_id} for camera {camera_id} "
                         f"({len(self._camera_subscribers[camera_id])} total subscribers)")
@@ -139,6 +204,7 @@ class StreamManager:
                 reader.set_frame_callback(self._frame_callback)
                 reader.start()
                 self._streams[camera_id] = reader
+                self._ensure_camera_queue(camera_id)  # Create tracking queue
                 logger.info(f"\033[92m[RTSP] {camera_id} connected\033[0m")

             elif stream_config.snapshot_url:
@@ -153,6 +219,7 @@ class StreamManager:
                 reader.set_frame_callback(self._frame_callback)
                 reader.start()
                 self._streams[camera_id] = reader
+                self._ensure_camera_queue(camera_id)  # Create tracking queue
                 logger.info(f"\033[92m[HTTP] {camera_id} connected\033[0m")

             else:
@@ -171,6 +238,7 @@ class StreamManager:
             try:
                 self._streams[camera_id].stop()
                 del self._streams[camera_id]
+                self._remove_camera_queue(camera_id)  # Remove tracking queue
                 # DON'T clear frames - they should persist until replaced
                 # shared_cache_buffer.clear_camera(camera_id)  # REMOVED - frames should persist
                 logger.info(f"Stopped stream for camera {camera_id} (frames preserved in buffer)")
@@ -193,8 +261,19 @@ class StreamManager:
                 available_cameras = shared_cache_buffer.frame_buffer.get_camera_list()
                 logger.info(f"\033[96m[BUFFER] {len(available_cameras)} active cameras: {', '.join(available_cameras)}\033[0m")

-            # Process tracking for subscriptions with tracking integration
-            self._process_tracking_for_camera(camera_id, frame)
+            # Queue for tracking processing (non-blocking) - route to camera-specific queue
+            if camera_id in self._tracking_queues:
+                try:
+                    self._tracking_queues[camera_id].put_nowait({
+                        'frame': frame,
+                        'timestamp': time.time()
+                    })
+                except queue.Full:
+                    # Drop frame if camera queue is full (maintain real-time)
+                    self._dropped_frame_counts[camera_id] += 1
+
+                    if self._dropped_frame_counts[camera_id] % 50 == 0:
+                        logger.warning(f"Dropped {self._dropped_frame_counts[camera_id]} frames for camera {camera_id} due to full queue")

         except Exception as e:
             logger.error(f"Error in frame callback for camera {camera_id}: {e}")
@@ -251,6 +330,134 @@ class StreamManager:
         except Exception as e:
             logger.error(f"Error processing tracking for camera {camera_id}: {e}")

+    def _tracking_worker_loop(self):
+        """Worker thread loop for round-robin processing of camera queues."""
+        logger.info(f"Tracking worker {threading.current_thread().name} started")
+
+        consecutive_empty = 0
+        max_consecutive_empty = 10  # Sleep if all cameras empty this many times
+
+        while not self._stop_workers.is_set():
+            try:
+                # Get next camera in round-robin fashion
+                camera_id, item = self._get_next_camera_item()
+
+                if camera_id is None:
+                    # No cameras have items, sleep briefly
+                    consecutive_empty += 1
+                    if consecutive_empty >= max_consecutive_empty:
+                        time.sleep(0.1)  # Sleep 100ms if nothing to process
+                        consecutive_empty = 0
+                    continue
+
+                consecutive_empty = 0  # Reset counter when we find work
+
+                frame = item['frame']
+                timestamp = item['timestamp']
+
+                # Check if frame is too old (drop if > 1 second old)
+                age = time.time() - timestamp
+                if age > 1.0:
+                    logger.debug(f"Dropping old frame for {camera_id} (age: {age:.2f}s)")
+                    continue
+
+                # Process tracking for this camera's frame
+                self._process_tracking_for_camera_sync(camera_id, frame)
+
+            except Exception as e:
+                logger.error(f"Error in tracking worker: {e}", exc_info=True)
+
+        logger.info(f"Tracking worker {threading.current_thread().name} stopped")
+
+    def _get_next_camera_item(self):
+        """Get next item from camera queues using round-robin scheduling."""
+        with self._round_robin_lock:
+            # Get current list of cameras from actual tracking queues (central state)
+            camera_list = list(self._tracking_queues.keys())
+
+            if not camera_list:
+                return None, None
+
+            attempts = 0
+            max_attempts = len(camera_list)
+
+            while attempts < max_attempts:
+                # Get current camera using round-robin index
+                if self._camera_round_robin_index >= len(camera_list):
+                    self._camera_round_robin_index = 0
+
+                camera_id = camera_list[self._camera_round_robin_index]
+
+                # Move to next camera for next call
+                self._camera_round_robin_index = (self._camera_round_robin_index + 1) % len(camera_list)
+
+                # Try to get item from this camera's queue
+                try:
+                    item = self._tracking_queues[camera_id].get_nowait()
+                    return camera_id, item
+                except queue.Empty:
+                    pass  # Try next camera
+
+                attempts += 1
+
+            return None, None  # All cameras empty
+
+    def _process_tracking_for_camera_sync(self, camera_id: str, frame):
+        """Synchronous version of tracking processing for worker threads."""
+        try:
+            with self._lock:
+                subscription_ids = list(self._camera_subscribers.get(camera_id, []))
+
+            for subscription_id in subscription_ids:
+                subscription_info = self._subscriptions.get(subscription_id)
+
+                if not subscription_info:
+                    logger.warning(f"No subscription info found for {subscription_id}")
+                    continue
+
+                if not subscription_info.tracking_integration:
+                    logger.debug(f"No tracking integration for {subscription_id} (camera {camera_id}), skipping inference")
+                    continue
+
+                display_id = subscription_id.split(';')[0] if ';' in subscription_id else subscription_id
+
+                try:
+                    # Run async tracking in thread's event loop
+                    loop = asyncio.new_event_loop()
+                    asyncio.set_event_loop(loop)
+                    try:
+                        result = loop.run_until_complete(
+                            subscription_info.tracking_integration.process_frame(
+                                frame, display_id, subscription_id
+                            )
+                        )
+
+                        # Log tracking results
+                        if result:
+                            tracked_count = len(result.get('tracked_vehicles', []))
+                            validated_vehicle = result.get('validated_vehicle')
+                            pipeline_result = result.get('pipeline_result')
+
+                            if tracked_count > 0:
+                                logger.info(f"[Tracking] {camera_id}: {tracked_count} vehicles tracked")
+
+                            if validated_vehicle:
+                                logger.info(f"[Tracking] {camera_id}: Vehicle {validated_vehicle['track_id']} "
+                                            f"validated as {validated_vehicle['state']} "
+                                            f"(confidence: {validated_vehicle['confidence']:.2f})")
+
+                            if pipeline_result:
+                                logger.info(f"[Pipeline] {camera_id}: {pipeline_result.get('status', 'unknown')} - "
+                                            f"{pipeline_result.get('message', 'no message')}")
+                    finally:
+                        loop.close()
+
+                except Exception as track_e:
+                    logger.error(f"Error in tracking for {subscription_id}: {track_e}")
+
+        except Exception as e:
+            logger.error(f"Error processing tracking for camera {camera_id}: {e}")
+
     def get_frame(self, camera_id: str, crop_coords: Optional[tuple] = None):
         """Get the latest frame for a camera with optional cropping."""
         return shared_cache_buffer.get_frame(camera_id, crop_coords)
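A compact sketch of the round-robin selection the worker loop above performs, stripped of the locking and drop-age checks (camera ids are hypothetical):

    # Illustrative sketch only - not part of manager.py.
    import queue

    queues = {"cam-001": queue.Queue(), "cam-002": queue.Queue(), "cam-003": queue.Queue()}
    queues["cam-002"].put("frame-a")
    queues["cam-003"].put("frame-b")

    rr_index = 0

    def next_item():
        global rr_index
        cameras = list(queues.keys())
        for _ in range(len(cameras)):
            if rr_index >= len(cameras):
                rr_index = 0
            camera_id = cameras[rr_index]
            rr_index = (rr_index + 1) % len(cameras)
            try:
                return camera_id, queues[camera_id].get_nowait()
            except queue.Empty:
                continue
        return None, None

    print(next_item())  # ('cam-002', 'frame-a') - cam-001 is empty, so it is skipped
    print(next_item())  # ('cam-003', 'frame-b')
    print(next_item())  # (None, None)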
@@ -366,6 +573,30 @@ class StreamManager:

     def stop_all(self):
         """Stop all streams and clear all subscriptions."""
+        # Signal workers to stop
+        self._stop_workers.set()
+
+        # Clear all camera queues
+        for camera_id, camera_queue in list(self._tracking_queues.items()):
+            while not camera_queue.empty():
+                try:
+                    camera_queue.get_nowait()
+                except queue.Empty:
+                    break
+
+        # Wait for workers to finish
+        for worker in self._tracking_workers:
+            worker.join(timeout=2.0)
+
+        # Clear queue management structures
+        self._tracking_queues.clear()
+        self._dropped_frame_counts.clear()
+        with self._round_robin_lock:
+            self._camera_list.clear()
+            self._camera_round_robin_index = 0
+
+        logger.info("Stopped all tracking worker threads")
+
         with self._lock:
             # Stop all streams
             for camera_id in list(self._streams.keys()):
@@ -380,13 +611,20 @@ class StreamManager:

     def set_session_id(self, display_id: str, session_id: str):
         """Set session ID for tracking integration."""
+        # Ensure session_id is always a string for consistent type handling
+        session_id = str(session_id) if session_id is not None else None
         with self._lock:
             for subscription_info in self._subscriptions.values():
                 # Check if this subscription matches the display_id
                 subscription_display_id = subscription_info.subscription_id.split(';')[0]
                 if subscription_display_id == display_id and subscription_info.tracking_integration:
-                    subscription_info.tracking_integration.set_session_id(display_id, session_id)
-                    logger.debug(f"Set session {session_id} for display {display_id}")
+                    # Pass the full subscription_id (displayId;cameraId) to the tracking integration
+                    subscription_info.tracking_integration.set_session_id(
+                        display_id,
+                        session_id,
+                        subscription_id=subscription_info.subscription_id
+                    )
+                    logger.debug(f"Set session {session_id} for display {display_id} with subscription {subscription_info.subscription_id}")

     def clear_session_id(self, session_id: str):
         """Clear session ID from the specific tracking integration handling this session."""
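Stepping back to the worker-loop hunk above: _process_tracking_for_camera_sync() drives an async pipeline from a plain thread by creating a fresh event loop per call. A minimal, standalone sketch of that pattern (the coroutine body is a stand-in, not the real pipeline):

    # Illustrative sketch only - mirrors the loop-per-call pattern used in the worker.
    import asyncio

    async def process_frame(frame):
        await asyncio.sleep(0)          # stand-in for the real async pipeline call
        return {"tracked_vehicles": []}

    def run_in_worker(frame):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(process_frame(frame))
        finally:
            loop.close()

    print(run_in_worker("frame-bytes"))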
@ -1,786 +0,0 @@
"""
Frame readers for RTSP streams and HTTP snapshots.
Optimized for 1280x720@6fps RTSP and 2560x1440 HTTP snapshots.
"""
import cv2
import logging
import time
import threading
import requests
import numpy as np
import os
import subprocess
# import fcntl  # No longer needed with atomic file operations
from typing import Optional, Callable
# Removed watchdog imports - no longer using file watching

# Suppress FFMPEG/H.264 error messages if needed
# Set this environment variable to reduce noise from decoder errors
os.environ["OPENCV_LOG_LEVEL"] = "ERROR"
os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "-8"  # Suppress FFMPEG warnings

logger = logging.getLogger(__name__)

# Color codes for pretty logging
class Colors:
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    BOLD = '\033[1m'
    END = '\033[0m'

def log_success(camera_id: str, message: str):
    """Log success messages in green"""
    logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}")

def log_warning(camera_id: str, message: str):
    """Log warnings in yellow"""
    logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}")

def log_error(camera_id: str, message: str):
    """Log errors in red"""
    logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}")

def log_info(camera_id: str, message: str):
    """Log info in cyan"""
    logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}")

# Removed watchdog logging configuration - no longer using file watching

# Removed FrameFileHandler - no longer using file watching


class FFmpegRTSPReader:
    """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer."""

    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.max_retries = max_retries
        self.process = None
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

        # Expected stream specs (for reference, actual dimensions read from PPM header)
        self.width = 1280
        self.height = 720

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames."""
        self.frame_callback = callback

    def start(self):
        """Start the FFmpeg subprocess reader."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"FFmpeg reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()
        log_success(self.camera_id, "Stream started")

    def stop(self):
        """Stop the FFmpeg subprocess reader."""
        self.stop_event.set()
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()
        if self.thread:
            self.thread.join(timeout=5.0)
        log_info(self.camera_id, "Stream stopped")

    # Removed _probe_stream_info - BMP headers contain dimensions

    def _start_ffmpeg_process(self):
        """Start FFmpeg subprocess outputting BMP frames to stdout pipe."""
        cmd = [
            'ffmpeg',
            # DO NOT REMOVE
            '-hwaccel', 'cuda',
            '-hwaccel_device', '0',
            '-rtsp_transport', 'tcp',
            '-i', self.rtsp_url,
            '-f', 'image2pipe',  # Output images to pipe
            '-vcodec', 'bmp',  # BMP format with header containing dimensions
            # Use native stream resolution and framerate
            '-an',  # No audio
            '-'  # Output to stdout
        ]

        try:
            # Start FFmpeg with stdout pipe to read frames directly
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,  # Capture stdout for frame data
                stderr=subprocess.DEVNULL,
                bufsize=0  # Unbuffered for real-time processing
            )
            return True
        except Exception as e:
            log_error(self.camera_id, f"FFmpeg startup failed: {e}")
            return False

    def _read_bmp_frame(self, pipe):
        """Read BMP frame from pipe - BMP header contains dimensions."""
        try:
            # Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum)
            header_data = b''
            bytes_to_read = 54

            while len(header_data) < bytes_to_read:
                chunk = pipe.read(bytes_to_read - len(header_data))
                if not chunk:
                    return None  # Silent end of stream
                header_data += chunk

            # Parse BMP header
            if header_data[:2] != b'BM':
                return None  # Invalid format, skip frame silently

            # Extract file size from header (bytes 2-5)
            import struct
            file_size = struct.unpack('<L', header_data[2:6])[0]

            # Extract width and height from info header (bytes 18-21 and 22-25)
            width = struct.unpack('<L', header_data[18:22])[0]
            height = struct.unpack('<L', header_data[22:26])[0]

            # Read remaining file data
            remaining_size = file_size - 54
            remaining_data = b''

            while len(remaining_data) < remaining_size:
                chunk = pipe.read(remaining_size - len(remaining_data))
                if not chunk:
                    return None  # Stream ended silently
                remaining_data += chunk

            # Complete BMP data
            bmp_data = header_data + remaining_data

            # Use OpenCV to decode BMP directly from memory
            frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
            frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)

            if frame is None:
                return None  # Decode failed silently

            return frame

        except Exception:
            return None  # Error reading frame silently

    def _read_frames(self):
        """Read frames directly from FFmpeg stdout pipe."""
        frame_count = 0
        last_log_time = time.time()

        while not self.stop_event.is_set():
            try:
                # Start FFmpeg if not running
                if not self.process or self.process.poll() is not None:
                    if self.process and self.process.poll() is not None:
                        log_warning(self.camera_id, "Stream disconnected, reconnecting...")

                    if not self._start_ffmpeg_process():
                        time.sleep(5.0)
                        continue

                # Read frames directly from FFmpeg stdout
                try:
                    if self.process and self.process.stdout:
                        # Read BMP frame data
                        frame = self._read_bmp_frame(self.process.stdout)
                        if frame is None:
                            continue

                        # Call frame callback
                        if self.frame_callback:
                            self.frame_callback(self.camera_id, frame)

                        frame_count += 1

                        # Log progress every 60 seconds (quieter)
                        current_time = time.time()
                        if current_time - last_log_time >= 60:
                            log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
                            last_log_time = current_time

                except Exception:
                    # Process might have died, let it restart on next iteration
                    if self.process:
                        self.process.terminate()
                        self.process = None
                    time.sleep(1.0)

            except Exception:
                time.sleep(1.0)

        # Cleanup
        if self.process:
            self.process.terminate()


logger = logging.getLogger(__name__)


class RTSPReader:
    """RTSP stream frame reader optimized for 1280x720 @ 6fps streams."""

    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.max_retries = max_retries
        self.cap = None
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

        # Expected stream specifications
        self.expected_width = 1280
        self.expected_height = 720
        self.expected_fps = 6

        # Frame processing parameters
        self.error_recovery_delay = 5.0  # Increased from 2.0 for stability
        self.max_consecutive_errors = 30  # Increased from 10 to handle network jitter
        self.stream_timeout = 30.0

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames."""
        self.frame_callback = callback

    def start(self):
        """Start the RTSP reader thread."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"RTSP reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()
        logger.info(f"Started RTSP reader for camera {self.camera_id}")

    def stop(self):
        """Stop the RTSP reader thread."""
        self.stop_event.set()
        if self.thread:
            self.thread.join(timeout=5.0)
        if self.cap:
            self.cap.release()
        logger.info(f"Stopped RTSP reader for camera {self.camera_id}")

    def _read_frames(self):
        """Main frame reading loop with H.264 error recovery."""
        consecutive_errors = 0
        frame_count = 0
        last_log_time = time.time()
        last_successful_frame_time = time.time()

        while not self.stop_event.is_set():
            try:
                # Initialize/reinitialize capture if needed
                if not self.cap or not self.cap.isOpened():
                    if not self._initialize_capture():
                        time.sleep(self.error_recovery_delay)
                        continue
                    last_successful_frame_time = time.time()

                # Check for stream timeout
                if time.time() - last_successful_frame_time > self.stream_timeout:
                    logger.warning(f"Camera {self.camera_id}: Stream timeout, reinitializing")
                    self._reinitialize_capture()
                    last_successful_frame_time = time.time()
                    continue

                # Read frame immediately without rate limiting for minimum latency
                try:
                    ret, frame = self.cap.read()
                    if ret and frame is None:
                        # Grab succeeded but retrieve failed - decoder issue
                        logger.error(f"Camera {self.camera_id}: Frame grab OK but decode failed")
                except Exception as read_error:
                    logger.error(f"Camera {self.camera_id}: cap.read() threw exception: {type(read_error).__name__}: {read_error}")
                    ret, frame = False, None

                if not ret or frame is None:
                    consecutive_errors += 1

                    # Enhanced logging to diagnose the issue
                    logger.error(f"Camera {self.camera_id}: cap.read() failed - ret={ret}, frame={frame is not None}")

                    # Try to get more info from the capture
                    try:
                        if self.cap and self.cap.isOpened():
                            backend = self.cap.getBackendName()
                            pos_frames = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
                            logger.error(f"Camera {self.camera_id}: Capture open, backend: {backend}, pos_frames: {pos_frames}")
                        else:
                            logger.error(f"Camera {self.camera_id}: Capture is closed or None!")
                    except Exception as info_error:
                        logger.error(f"Camera {self.camera_id}: Error getting capture info: {type(info_error).__name__}: {info_error}")

                    if consecutive_errors >= self.max_consecutive_errors:
                        logger.error(f"Camera {self.camera_id}: Too many consecutive errors ({consecutive_errors}), reinitializing")
                        self._reinitialize_capture()
                        consecutive_errors = 0
                        time.sleep(self.error_recovery_delay)
                    else:
                        # Skip corrupted frame and continue with exponential backoff
                        if consecutive_errors <= 5:
                            logger.debug(f"Camera {self.camera_id}: Frame read failed (error {consecutive_errors})")
                        elif consecutive_errors % 10 == 0:  # Log every 10th error after 5
                            logger.warning(f"Camera {self.camera_id}: Continuing frame read failures (error {consecutive_errors})")

                        # Exponential backoff with cap at 1 second
                        sleep_time = min(0.1 * (1.5 ** min(consecutive_errors, 10)), 1.0)
                        time.sleep(sleep_time)
                    continue

                # Accept any valid frame dimensions - don't force specific resolution
                if frame.shape[1] <= 0 or frame.shape[0] <= 0:
                    consecutive_errors += 1
                    continue

                # Check for corrupted frames (all black, all white, excessive noise)
                if self._is_frame_corrupted(frame):
                    logger.debug(f"Camera {self.camera_id}: Corrupted frame detected, skipping")
                    consecutive_errors += 1
                    continue

                # Frame is valid
                consecutive_errors = 0
                frame_count += 1
                last_successful_frame_time = time.time()

                # Call frame callback
                if self.frame_callback:
                    try:
                        self.frame_callback(self.camera_id, frame)
                    except Exception as e:
                        logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")

                # Log progress every 30 seconds
                current_time = time.time()
                if current_time - last_log_time >= 30:
                    logger.info(f"Camera {self.camera_id}: {frame_count} frames processed")
                    last_log_time = current_time

            except Exception as e:
                logger.error(f"Camera {self.camera_id}: Error in frame reading loop: {e}")
                consecutive_errors += 1
                if consecutive_errors >= self.max_consecutive_errors:
                    self._reinitialize_capture()
                    consecutive_errors = 0
                time.sleep(self.error_recovery_delay)

        # Cleanup
        if self.cap:
            self.cap.release()
        logger.info(f"RTSP reader thread ended for camera {self.camera_id}")

    def _initialize_capture(self) -> bool:
        """Initialize video capture with FFmpeg hardware acceleration (CUVID/NVDEC) for 1280x720@6fps."""
        try:
            # Release previous capture if exists
            if self.cap:
                self.cap.release()
                time.sleep(0.5)

            logger.info(f"Initializing capture for camera {self.camera_id} with FFmpeg hardware acceleration")
            hw_accel_success = False

            # Method 1: Try OpenCV CUDA VideoReader (if built with CUVID support)
            if not hw_accel_success:
                try:
                    # Check if OpenCV was built with CUDA codec support
                    build_info = cv2.getBuildInformation()
                    if 'cudacodec' in build_info or 'CUVID' in build_info:
                        logger.info(f"Attempting OpenCV CUDA VideoReader for camera {self.camera_id}")

                        # Use OpenCV's CUDA backend
                        self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG, [
                            cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY
                        ])

                        if self.cap.isOpened():
                            hw_accel_success = True
                            logger.info(f"Camera {self.camera_id}: Using OpenCV CUDA hardware acceleration")
                    else:
                        logger.debug(f"Camera {self.camera_id}: OpenCV not built with CUDA codec support")
                except Exception as e:
                    logger.debug(f"Camera {self.camera_id}: OpenCV CUDA not available: {e}")

            # Method 2: Try FFmpeg with optimal hardware acceleration (CUVID/NVDEC)
            if not hw_accel_success:
                try:
                    from core.utils.ffmpeg_detector import get_optimal_rtsp_options
                    import os

                    # Get optimal FFmpeg options based on detected capabilities
                    optimal_options = get_optimal_rtsp_options(self.rtsp_url)
                    os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = optimal_options

                    logger.info(f"Attempting FFmpeg with detected hardware acceleration for camera {self.camera_id}")
                    logger.debug(f"Camera {self.camera_id}: Using FFmpeg options: {optimal_options}")

                    self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)

                    if self.cap.isOpened():
                        hw_accel_success = True
                        # Try to get backend info to confirm hardware acceleration
                        backend = self.cap.getBackendName()
                        logger.info(f"Camera {self.camera_id}: Using FFmpeg hardware acceleration (backend: {backend})")
                except Exception as e:
                    logger.debug(f"Camera {self.camera_id}: FFmpeg optimal hardware acceleration not available: {e}")

            # Method 3: Try FFmpeg with NVIDIA NVDEC (better for RTX 3060)
            if not hw_accel_success:
                try:
                    import os
                    os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;cuda|hwaccel_device;0|rtsp_transport;tcp'

                    logger.info(f"Attempting FFmpeg with NVDEC hardware acceleration for camera {self.camera_id}")
                    self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)

                    if self.cap.isOpened():
                        hw_accel_success = True
                        logger.info(f"Camera {self.camera_id}: Using FFmpeg NVDEC hardware acceleration")
                except Exception as e:
                    logger.debug(f"Camera {self.camera_id}: FFmpeg NVDEC not available: {e}")

            # Method 4: Try FFmpeg with VAAPI (Intel/AMD GPUs)
            if not hw_accel_success:
                try:
                    import os
                    os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'hwaccel;vaapi|hwaccel_device;/dev/dri/renderD128|video_codec;h264|rtsp_transport;tcp'

                    logger.info(f"Attempting FFmpeg with VAAPI for camera {self.camera_id}")
                    self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)

                    if self.cap.isOpened():
                        hw_accel_success = True
                        logger.info(f"Camera {self.camera_id}: Using FFmpeg VAAPI hardware acceleration")
                except Exception as e:
                    logger.debug(f"Camera {self.camera_id}: FFmpeg VAAPI not available: {e}")

            # Fallback: Standard FFmpeg with software decoding
            if not hw_accel_success:
                logger.warning(f"Camera {self.camera_id}: Hardware acceleration not available, falling back to software decoding")
                import os
                os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp'
                self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)

            if not self.cap.isOpened():
                logger.error(f"Failed to open stream for camera {self.camera_id}")
                return False

            # Don't force resolution/fps - let the stream determine its natural specs
            # The camera will provide whatever resolution/fps it supports

            # Set FFMPEG options for better H.264 handling
            self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))

            # Verify stream properties
            actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            actual_fps = self.cap.get(cv2.CAP_PROP_FPS)

            logger.info(f"Camera {self.camera_id} initialized: {actual_width}x{actual_height} @ {actual_fps}fps")

            # Read and discard first few frames to stabilize stream
            for _ in range(5):
                ret, _ = self.cap.read()
                if not ret:
                    logger.warning(f"Camera {self.camera_id}: Failed to read initial frames")
                time.sleep(0.1)

            return True

        except Exception as e:
            logger.error(f"Error initializing capture for camera {self.camera_id}: {e}")
            return False

    def _reinitialize_capture(self):
        """Reinitialize capture after errors with retry logic."""
        logger.info(f"Reinitializing capture for camera {self.camera_id}")
        if self.cap:
            self.cap.release()
            self.cap = None

        # Longer delay before reconnection to avoid rapid reconnect loops
        time.sleep(3.0)

        # Retry initialization up to 3 times
        for attempt in range(3):
            if self._initialize_capture():
                logger.info(f"Successfully reinitialized camera {self.camera_id} on attempt {attempt + 1}")
                break
            else:
                logger.warning(f"Failed to reinitialize camera {self.camera_id} on attempt {attempt + 1}")
                time.sleep(2.0)

    def _is_frame_corrupted(self, frame: np.ndarray) -> bool:
        """Check if frame is corrupted (all black, all white, or excessive noise)."""
        if frame is None or frame.size == 0:
            return True

        # Check mean and standard deviation
        mean = np.mean(frame)
        std = np.std(frame)

        # All black or all white
        if mean < 5 or mean > 250:
            return True

        # No variation (stuck frame)
        if std < 1:
            return True

        # Excessive noise (corrupted H.264 decode)
        # Calculate edge density as corruption indicator
        edges = cv2.Canny(frame, 50, 150)
        edge_density = np.sum(edges > 0) / edges.size

        # Too many edges indicate corruption
        if edge_density > 0.5:
            return True

        return False


class HTTPSnapshotReader:
    """HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""

    def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
        self.camera_id = camera_id
        self.snapshot_url = snapshot_url
        self.interval_ms = interval_ms
        self.max_retries = max_retries
        self.stop_event = threading.Event()
        self.thread = None
        self.frame_callback: Optional[Callable] = None

        # Expected snapshot specifications
        self.expected_width = 2560
        self.expected_height = 1440
        self.max_file_size = 10 * 1024 * 1024  # 10MB max for 2K image

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames."""
        self.frame_callback = callback

    def start(self):
        """Start the snapshot reader thread."""
        if self.thread and self.thread.is_alive():
            logger.warning(f"Snapshot reader for {self.camera_id} already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
        self.thread.start()
        logger.info(f"Started snapshot reader for camera {self.camera_id}")

    def stop(self):
        """Stop the snapshot reader thread."""
        self.stop_event.set()
        if self.thread:
            self.thread.join(timeout=5.0)
        logger.info(f"Stopped snapshot reader for camera {self.camera_id}")

    def _read_snapshots(self):
        """Main snapshot reading loop for high quality 2K images."""
        retries = 0
        frame_count = 0
        last_log_time = time.time()
        interval_seconds = self.interval_ms / 1000.0

        logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")

        while not self.stop_event.is_set():
            try:
                start_time = time.time()
                frame = self._fetch_snapshot()

                if frame is None:
                    retries += 1
                    logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")

                    if self.max_retries != -1 and retries > self.max_retries:
                        logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
                        break

                    time.sleep(min(2.0, interval_seconds))
                    continue

                # Accept any valid image dimensions - don't force specific resolution
                if frame.shape[1] <= 0 or frame.shape[0] <= 0:
                    logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
                    continue

                # Reset retry counter on successful fetch
                retries = 0
                frame_count += 1

                # Call frame callback
                if self.frame_callback:
                    try:
                        self.frame_callback(self.camera_id, frame)
                    except Exception as e:
                        logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")

                # Log progress every 30 seconds
                current_time = time.time()
                if current_time - last_log_time >= 30:
                    logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
                    last_log_time = current_time

                # Wait for next interval
                elapsed = time.time() - start_time
                sleep_time = max(0, interval_seconds - elapsed)
                if sleep_time > 0:
                    self.stop_event.wait(sleep_time)

            except Exception as e:
                logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
                retries += 1
                if self.max_retries != -1 and retries > self.max_retries:
                    break
                time.sleep(min(2.0, interval_seconds))

        logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")

    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """Fetch a single high quality snapshot from HTTP URL."""
        try:
            # Parse URL for authentication
            from urllib.parse import urlparse
            parsed_url = urlparse(self.snapshot_url)

            headers = {
                'User-Agent': 'Python-Detector-Worker/1.0',
                'Accept': 'image/jpeg, image/png, image/*'
            }
            auth = None

            if parsed_url.username and parsed_url.password:
                from requests.auth import HTTPBasicAuth, HTTPDigestAuth
                auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)

                # Reconstruct URL without credentials
                clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}"
                if parsed_url.port:
                    clean_url += f":{parsed_url.port}"
                clean_url += parsed_url.path
                if parsed_url.query:
                    clean_url += f"?{parsed_url.query}"

                # Try Basic Auth first
                response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
                                        stream=True, verify=False)

                # If Basic Auth fails, try Digest Auth
                if response.status_code == 401:
                    auth = HTTPDigestAuth(parsed_url.username, parsed_url.password)
                    response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
                                            stream=True, verify=False)
            else:
                response = requests.get(self.snapshot_url, timeout=15, headers=headers,
                                        stream=True, verify=False)

            if response.status_code == 200:
                # Check content size
                content_length = int(response.headers.get('content-length', 0))
                if content_length > self.max_file_size:
                    logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
                    return None

                # Read content
                content = response.content

                # Convert to numpy array
                image_array = np.frombuffer(content, np.uint8)

                # Decode as high quality image
                frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)

                if frame is None:
                    logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
                    return None

                logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
                return frame
            else:
                logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
                return None

        except requests.RequestException as e:
            logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
            return None

    def fetch_single_snapshot(self) -> Optional[np.ndarray]:
        """
        Fetch a single high-quality snapshot on demand for pipeline processing.
        This method is for one-time fetch from HTTP URL, not continuous streaming.

        Returns:
            High quality 2K snapshot frame or None if failed
        """
        logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}")

        # Try to fetch snapshot with retries
        for attempt in range(self.max_retries):
            frame = self._fetch_snapshot()

            if frame is not None:
                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
                return frame

            if attempt < self.max_retries - 1:
                logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...")
                time.sleep(0.5)

        logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts")
        return None

    def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
        """Resize image while maintaining aspect ratio for high quality."""
        h, w = frame.shape[:2]
        aspect = w / h
        target_aspect = target_width / target_height

        if aspect > target_aspect:
            # Image is wider
            new_width = target_width
            new_height = int(target_width / aspect)
        else:
            # Image is taller
            new_height = target_height
            new_width = int(target_height * aspect)

        # Use INTER_LANCZOS4 for high quality downsampling
        resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)

        # Pad to target size if needed
        if new_width < target_width or new_height < target_height:
            top = (target_height - new_height) // 2
            bottom = target_height - new_height - top
            left = (target_width - new_width) // 2
            right = target_width - new_width - left
            resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

        return resized
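For reference, the 54-byte header parsing that both the deleted reader above and the new FFmpeg reader rely on can be exercised on its own. A minimal sketch, assuming the header bytes have already been read from the pipe:

    import struct

    def parse_bmp_header(header: bytes):
        """Return (file_size, width, height) from a 54-byte BMP header, or None if invalid."""
        if len(header) < 54 or header[:2] != b'BM':
            return None
        file_size = struct.unpack('<L', header[2:6])[0]    # bytes 2-5: total file size
        width = struct.unpack('<L', header[18:22])[0]      # bytes 18-21: pixel width
        height = struct.unpack('<L', header[22:26])[0]     # bytes 22-25: pixel height
        return file_size, width, height

The reader then pulls file_size - 54 more bytes from the pipe and hands the whole buffer to cv2.imdecode, so no resolution has to be configured up front.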
18 core/streaming/readers/__init__.py Normal file

@ -0,0 +1,18 @@
"""
Stream readers for RTSP and HTTP camera feeds.
"""
from .base import VideoReader
from .ffmpeg_rtsp import FFmpegRTSPReader
from .http_snapshot import HTTPSnapshotReader
from .utils import log_success, log_warning, log_error, log_info, Colors

__all__ = [
    'VideoReader',
    'FFmpegRTSPReader',
    'HTTPSnapshotReader',
    'log_success',
    'log_warning',
    'log_error',
    'log_info',
    'Colors'
]
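With these re-exports, callers can stay at the package level. A minimal sketch, where the camera ID and RTSP URL are placeholder values and the callback shape follows the readers defined below:

    from core.streaming.readers import FFmpegRTSPReader, log_info

    def on_frame(camera_id, frame):
        log_info(camera_id, f"got frame {frame.shape[1]}x{frame.shape[0]}")

    reader = FFmpegRTSPReader("cam-001", "rtsp://example.invalid/stream1")  # placeholder URL
    reader.set_frame_callback(on_frame)
    reader.start()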
65 core/streaming/readers/base.py Normal file

@ -0,0 +1,65 @@
"""
Abstract base class for video stream readers.
"""
from abc import ABC, abstractmethod
from typing import Optional, Callable
import numpy as np


class VideoReader(ABC):
    """Abstract base class for video stream readers."""

    def __init__(self, camera_id: str, source_url: str, max_retries: int = 3):
        """
        Initialize the video reader.

        Args:
            camera_id: Unique identifier for the camera
            source_url: URL or path to the video source
            max_retries: Maximum number of retry attempts
        """
        self.camera_id = camera_id
        self.source_url = source_url
        self.max_retries = max_retries
        self.frame_callback: Optional[Callable[[str, np.ndarray], None]] = None

    @abstractmethod
    def start(self) -> None:
        """Start the video reader."""
        pass

    @abstractmethod
    def stop(self) -> None:
        """Stop the video reader."""
        pass

    @abstractmethod
    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]) -> None:
        """
        Set callback function to handle captured frames.

        Args:
            callback: Function that takes (camera_id, frame) as arguments
        """
        pass

    @property
    @abstractmethod
    def is_running(self) -> bool:
        """Check if the reader is currently running."""
        pass

    @property
    @abstractmethod
    def reader_type(self) -> str:
        """Get the type of reader (e.g., 'rtsp', 'http_snapshot')."""
        pass

    def __enter__(self):
        """Context manager entry."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.stop()
436 core/streaming/readers/ffmpeg_rtsp.py Normal file

@ -0,0 +1,436 @@
"""
FFmpeg RTSP stream reader using subprocess piping frames directly to buffer.
Enhanced with comprehensive health monitoring and automatic recovery.
"""
import cv2
import time
import threading
import numpy as np
import subprocess
import struct
from typing import Optional, Callable, Dict, Any

from .base import VideoReader
from .utils import log_success, log_warning, log_error, log_info
from ...monitoring.stream_health import stream_health_tracker
from ...monitoring.thread_health import thread_health_monitor
from ...monitoring.recovery import recovery_manager, RecoveryAction


class FFmpegRTSPReader(VideoReader):
    """RTSP stream reader using subprocess FFmpeg piping frames directly to buffer."""

    def __init__(self, camera_id: str, rtsp_url: str, max_retries: int = 3):
        super().__init__(camera_id, rtsp_url, max_retries)
        self.rtsp_url = rtsp_url
        self.process = None
        self.stop_event = threading.Event()
        self.thread = None
        self.stderr_thread = None

        # Expected stream specs (for reference, actual dimensions read from PPM header)
        self.width = 1280
        self.height = 720

        # Watchdog timers for stream reliability
        self.process_start_time = None
        self.last_frame_time = None
        self.is_restart = False  # Track if this is a restart (shorter timeout)
        self.first_start_timeout = 30.0  # 30s timeout on first start
        self.restart_timeout = 15.0  # 15s timeout after restart

        # Health monitoring setup
        self.last_heartbeat = time.time()
        self.consecutive_errors = 0
        self.ffmpeg_restart_count = 0

        # Register recovery handlers
        recovery_manager.register_recovery_handler(
            RecoveryAction.RESTART_STREAM,
            self._handle_restart_recovery
        )
        recovery_manager.register_recovery_handler(
            RecoveryAction.RECONNECT,
            self._handle_reconnect_recovery
        )

    @property
    def is_running(self) -> bool:
        """Check if the reader is currently running."""
        return self.thread is not None and self.thread.is_alive()

    @property
    def reader_type(self) -> str:
        """Get the type of reader."""
        return "rtsp_ffmpeg"

    def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
        """Set callback function to handle captured frames."""
        self.frame_callback = callback

    def start(self):
        """Start the FFmpeg subprocess reader."""
        if self.thread and self.thread.is_alive():
            log_warning(self.camera_id, "FFmpeg reader already running")
            return

        self.stop_event.clear()
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()

        # Register with health monitoring
        stream_health_tracker.register_stream(self.camera_id, "rtsp_ffmpeg", self.rtsp_url)
        thread_health_monitor.register_thread(self.thread, self._heartbeat_callback)

        log_success(self.camera_id, "Stream started with health monitoring")

    def stop(self):
        """Stop the FFmpeg subprocess reader."""
        self.stop_event.set()

        # Unregister from health monitoring
        if self.thread:
            thread_health_monitor.unregister_thread(self.thread.ident)

        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()

        if self.thread:
            self.thread.join(timeout=5.0)
        if self.stderr_thread:
            self.stderr_thread.join(timeout=2.0)

        stream_health_tracker.unregister_stream(self.camera_id)

        log_info(self.camera_id, "Stream stopped")

    def _start_ffmpeg_process(self):
        """Start FFmpeg subprocess outputting BMP frames to stdout pipe."""
        cmd = [
            'ffmpeg',
            # DO NOT REMOVE
            '-hwaccel', 'cuda',
            '-hwaccel_device', '0',
            # Real-time input flags
            '-fflags', 'nobuffer+genpts',
            '-flags', 'low_delay',
            '-max_delay', '0',  # No reordering delay
            # RTSP configuration
            '-rtsp_transport', 'tcp',
            '-i', self.rtsp_url,
            # Output configuration (keeping BMP)
            '-f', 'image2pipe',  # Output images to pipe
            '-vcodec', 'bmp',  # BMP format with header containing dimensions
            '-vsync', 'passthrough',  # Pass frames as-is
            # Use native stream resolution and framerate
            '-an',  # No audio
            '-'  # Output to stdout
        ]

        try:
            # Start FFmpeg with stdout pipe to read frames directly
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,  # Capture stdout for frame data
                stderr=subprocess.PIPE,  # Capture stderr for error logging
                bufsize=0  # Unbuffered for real-time processing
            )

            # Start stderr reading thread
            if self.stderr_thread and self.stderr_thread.is_alive():
                # Stop previous stderr thread
                try:
                    self.stderr_thread.join(timeout=1.0)
                except:
                    pass

            self.stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
            self.stderr_thread.start()

            # Set process start time for watchdog
            self.process_start_time = time.time()
            self.last_frame_time = None  # Reset frame time

            # After successful restart, next timeout will be back to 30s
            if self.is_restart:
                log_info(self.camera_id, f"FFmpeg restarted successfully, next timeout: {self.first_start_timeout}s")
                self.is_restart = False

            return True
        except Exception as e:
            log_error(self.camera_id, f"FFmpeg startup failed: {e}")
            return False

    def _read_bmp_frame(self, pipe):
        """Read BMP frame from pipe - BMP header contains dimensions."""
        try:
            # Read BMP header (14 bytes file header + 40 bytes info header = 54 bytes minimum)
            header_data = b''
            bytes_to_read = 54

            while len(header_data) < bytes_to_read:
                chunk = pipe.read(bytes_to_read - len(header_data))
                if not chunk:
                    return None  # Silent end of stream
                header_data += chunk

            # Parse BMP header
            if header_data[:2] != b'BM':
                return None  # Invalid format, skip frame silently

            # Extract file size from header (bytes 2-5)
            file_size = struct.unpack('<L', header_data[2:6])[0]

            # Extract width and height from info header (bytes 18-21 and 22-25)
            width = struct.unpack('<L', header_data[18:22])[0]
            height = struct.unpack('<L', header_data[22:26])[0]

            # Read remaining file data
            remaining_size = file_size - 54
            remaining_data = b''

            while len(remaining_data) < remaining_size:
                chunk = pipe.read(remaining_size - len(remaining_data))
                if not chunk:
                    return None  # Stream ended silently
                remaining_data += chunk

            # Complete BMP data
            bmp_data = header_data + remaining_data

            # Use OpenCV to decode BMP directly from memory
            frame_array = np.frombuffer(bmp_data, dtype=np.uint8)
            frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)

            if frame is None:
                return None  # Decode failed silently

            return frame

        except Exception:
            return None  # Error reading frame silently

    def _read_stderr(self):
        """Read and log FFmpeg stderr output in background thread."""
        if not self.process or not self.process.stderr:
            return

        try:
            while self.process and self.process.poll() is None:
                try:
                    line = self.process.stderr.readline()
                    if line:
                        error_msg = line.decode('utf-8', errors='ignore').strip()
                        if error_msg and not self.stop_event.is_set():
                            # Filter out common noise but log actual errors
                            if any(keyword in error_msg.lower() for keyword in ['error', 'failed', 'cannot', 'invalid']):
                                log_error(self.camera_id, f"FFmpeg: {error_msg}")
                            elif 'warning' in error_msg.lower():
                                log_warning(self.camera_id, f"FFmpeg: {error_msg}")
                except Exception:
                    break
        except Exception:
            pass

    def _check_watchdog_timeout(self) -> bool:
        """Check if watchdog timeout has been exceeded."""
        if not self.process_start_time:
            return False

        current_time = time.time()
        time_since_start = current_time - self.process_start_time

        # Determine timeout based on whether this is a restart
        timeout = self.restart_timeout if self.is_restart else self.first_start_timeout

        # If no frames received yet, check against process start time
        if not self.last_frame_time:
            if time_since_start > timeout:
                log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_start:.1f}s (limit: {timeout}s)")
                return True
        else:
            # Check time since last frame
            time_since_frame = current_time - self.last_frame_time
            if time_since_frame > timeout:
                log_warning(self.camera_id, f"Watchdog timeout: No frames for {time_since_frame:.1f}s (limit: {timeout}s)")
                return True

        return False

    def _restart_ffmpeg_process(self):
        """Restart FFmpeg process due to watchdog timeout."""
        log_warning(self.camera_id, "Watchdog triggered FFmpeg restart")

        # Terminate current process
        if self.process:
            try:
                self.process.terminate()
                self.process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self.process.kill()
            except Exception:
                pass
            self.process = None

        # Mark as restart for shorter timeout
        self.is_restart = True

        # Small delay before restart
        time.sleep(1.0)

    def _read_frames(self):
        """Read frames directly from FFmpeg stdout pipe."""
        frame_count = 0
        last_log_time = time.time()

        while not self.stop_event.is_set():
            try:
                # Send heartbeat for thread health monitoring
                self._send_heartbeat("reading_frames")

                # Check watchdog timeout if process is running
                if self.process and self.process.poll() is None:
                    if self._check_watchdog_timeout():
                        self._restart_ffmpeg_process()
                        continue

                # Start FFmpeg if not running
                if not self.process or self.process.poll() is not None:
                    if self.process and self.process.poll() is not None:
                        log_warning(self.camera_id, "Stream disconnected, reconnecting...")
                        stream_health_tracker.report_error(
                            self.camera_id,
                            "FFmpeg process disconnected"
                        )

                    if not self._start_ffmpeg_process():
                        self.consecutive_errors += 1
                        stream_health_tracker.report_error(
                            self.camera_id,
                            "Failed to start FFmpeg process"
                        )
                        time.sleep(5.0)
                        continue

                # Read frames directly from FFmpeg stdout
                try:
                    if self.process and self.process.stdout:
                        # Read BMP frame data
                        frame = self._read_bmp_frame(self.process.stdout)
                        if frame is None:
                            continue

                        # Update watchdog - we got a frame
                        self.last_frame_time = time.time()

                        # Reset error counter on successful frame
                        self.consecutive_errors = 0

                        # Report successful frame to health monitoring
                        frame_size = frame.nbytes
                        stream_health_tracker.report_frame_received(self.camera_id, frame_size)

                        # Call frame callback
                        if self.frame_callback:
                            try:
                                self.frame_callback(self.camera_id, frame)
                            except Exception as e:
                                stream_health_tracker.report_error(
                                    self.camera_id,
                                    f"Frame callback error: {e}"
                                )

                        frame_count += 1

                        # Log progress every 60 seconds (quieter)
                        current_time = time.time()
                        if current_time - last_log_time >= 60:
                            log_success(self.camera_id, f"{frame_count} frames captured ({frame.shape[1]}x{frame.shape[0]})")
                            last_log_time = current_time

                except Exception as e:
                    # Process might have died, let it restart on next iteration
                    stream_health_tracker.report_error(
                        self.camera_id,
                        f"Frame reading error: {e}"
                    )
                    if self.process:
                        self.process.terminate()
                        self.process = None
                    time.sleep(1.0)

            except Exception as e:
                stream_health_tracker.report_error(
                    self.camera_id,
                    f"Main loop error: {e}"
                )
                time.sleep(1.0)

        # Cleanup
        if self.process:
            self.process.terminate()

    # Health monitoring methods
    def _send_heartbeat(self, activity: str = "running"):
        """Send heartbeat to thread health monitor."""
        self.last_heartbeat = time.time()
        thread_health_monitor.heartbeat(activity=activity)

    def _heartbeat_callback(self) -> bool:
        """Heartbeat callback for thread responsiveness testing."""
        try:
            # Check if thread is responsive by checking recent heartbeat
            current_time = time.time()
            age = current_time - self.last_heartbeat

            # Thread is responsive if heartbeat is recent
            return age < 30.0  # 30 second responsiveness threshold

        except Exception:
            return False

    def _handle_restart_recovery(self, component: str, details: Dict[str, Any]) -> bool:
        """Handle restart recovery action."""
        try:
            log_info(self.camera_id, "Restarting FFmpeg RTSP reader for health recovery")

            # Stop current instance
            self.stop()

            # Small delay
            time.sleep(2.0)

            # Restart
            self.start()

            # Report successful restart
            stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_restart")
            self.ffmpeg_restart_count += 1

            return True

        except Exception as e:
            log_error(self.camera_id, f"Failed to restart FFmpeg RTSP reader: {e}")
            return False

    def _handle_reconnect_recovery(self, component: str, details: Dict[str, Any]) -> bool:
        """Handle reconnect recovery action."""
        try:
            log_info(self.camera_id, "Reconnecting FFmpeg RTSP reader for health recovery")

            # Force restart FFmpeg process
            self._restart_ffmpeg_process()

            # Reset error counters
            self.consecutive_errors = 0
            stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_reconnect")

            return True

        except Exception as e:
            log_error(self.camera_id, f"Failed to reconnect FFmpeg RTSP reader: {e}")
            return False
378
core/streaming/readers/http_snapshot.py
Normal file
378
core/streaming/readers/http_snapshot.py
Normal file
|
@ -0,0 +1,378 @@
|
||||||
|
"""
|
||||||
|
HTTP snapshot reader optimized for 2560x1440 (2K) high quality images.
|
||||||
|
Enhanced with comprehensive health monitoring and automatic recovery.
|
||||||
|
"""
|
||||||
|
import cv2
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import requests
|
||||||
|
import numpy as np
|
||||||
|
from typing import Optional, Callable, Dict, Any
|
||||||
|
|
||||||
|
from .base import VideoReader
|
||||||
|
from .utils import log_success, log_warning, log_error, log_info
|
||||||
|
from ...monitoring.stream_health import stream_health_tracker
|
||||||
|
from ...monitoring.thread_health import thread_health_monitor
|
||||||
|
from ...monitoring.recovery import recovery_manager, RecoveryAction
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class HTTPSnapshotReader(VideoReader):
|
||||||
|
"""HTTP snapshot reader optimized for 2560x1440 (2K) high quality images."""
|
||||||
|
|
||||||
|
def __init__(self, camera_id: str, snapshot_url: str, interval_ms: int = 5000, max_retries: int = 3):
|
||||||
|
super().__init__(camera_id, snapshot_url, max_retries)
|
||||||
|
self.snapshot_url = snapshot_url
|
||||||
|
self.interval_ms = interval_ms
|
||||||
|
self.stop_event = threading.Event()
|
||||||
|
self.thread = None
|
||||||
|
|
||||||
|
# Expected snapshot specifications
|
||||||
|
self.expected_width = 2560
|
||||||
|
self.expected_height = 1440
|
||||||
|
self.max_file_size = 10 * 1024 * 1024 # 10MB max for 2K image
|
||||||
|
|
||||||
|
# Health monitoring setup
|
||||||
|
self.last_heartbeat = time.time()
|
||||||
|
self.consecutive_errors = 0
|
||||||
|
self.connection_test_interval = 300 # Test connection every 5 minutes
|
||||||
|
self.last_connection_test = None
|
||||||
|
|
||||||
|
# Register recovery handlers
|
||||||
|
recovery_manager.register_recovery_handler(
|
||||||
|
RecoveryAction.RESTART_STREAM,
|
||||||
|
self._handle_restart_recovery
|
||||||
|
)
|
||||||
|
recovery_manager.register_recovery_handler(
|
||||||
|
RecoveryAction.RECONNECT,
|
||||||
|
self._handle_reconnect_recovery
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
"""Check if the reader is currently running."""
|
||||||
|
return self.thread is not None and self.thread.is_alive()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def reader_type(self) -> str:
|
||||||
|
"""Get the type of reader."""
|
||||||
|
return "http_snapshot"
|
||||||
|
|
||||||
|
def set_frame_callback(self, callback: Callable[[str, np.ndarray], None]):
|
||||||
|
"""Set callback function to handle captured frames."""
|
||||||
|
self.frame_callback = callback
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Start the snapshot reader thread."""
|
||||||
|
if self.thread and self.thread.is_alive():
|
||||||
|
logger.warning(f"Snapshot reader for {self.camera_id} already running")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.stop_event.clear()
|
||||||
|
self.thread = threading.Thread(target=self._read_snapshots, daemon=True)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
# Register with health monitoring
|
||||||
|
        stream_health_tracker.register_stream(self.camera_id, "http_snapshot", self.snapshot_url)
        thread_health_monitor.register_thread(self.thread, self._heartbeat_callback)

        logger.info(f"Started snapshot reader for camera {self.camera_id} with health monitoring")

    def stop(self):
        """Stop the snapshot reader thread."""
        self.stop_event.set()

        # Unregister from health monitoring
        if self.thread:
            thread_health_monitor.unregister_thread(self.thread.ident)
            self.thread.join(timeout=5.0)

        stream_health_tracker.unregister_stream(self.camera_id)

        logger.info(f"Stopped snapshot reader for camera {self.camera_id}")

    def _read_snapshots(self):
        """Main snapshot reading loop for high quality 2K images."""
        retries = 0
        frame_count = 0
        last_log_time = time.time()
        last_connection_test = time.time()
        interval_seconds = self.interval_ms / 1000.0

        logger.info(f"Snapshot interval for camera {self.camera_id}: {interval_seconds}s")

        while not self.stop_event.is_set():
            try:
                # Send heartbeat for thread health monitoring
                self._send_heartbeat("fetching_snapshot")

                start_time = time.time()
                frame = self._fetch_snapshot()

                if frame is None:
                    retries += 1
                    self.consecutive_errors += 1

                    # Report error to health monitoring
                    stream_health_tracker.report_error(
                        self.camera_id,
                        f"Failed to fetch snapshot (retry {retries}/{self.max_retries})"
                    )

                    logger.warning(f"Failed to fetch snapshot for camera {self.camera_id}, retry {retries}/{self.max_retries}")

                    if self.max_retries != -1 and retries > self.max_retries:
                        logger.error(f"Max retries reached for snapshot camera {self.camera_id}")
                        break

                    time.sleep(min(2.0, interval_seconds))
                    continue

                # Accept any valid image dimensions - don't force specific resolution
                if frame.shape[1] <= 0 or frame.shape[0] <= 0:
                    logger.warning(f"Camera {self.camera_id}: Invalid frame dimensions {frame.shape[1]}x{frame.shape[0]}")
                    stream_health_tracker.report_error(
                        self.camera_id,
                        f"Invalid frame dimensions: {frame.shape[1]}x{frame.shape[0]}"
                    )
                    continue

                # Reset retry counter on successful fetch
                retries = 0
                self.consecutive_errors = 0
                frame_count += 1

                # Report successful frame to health monitoring
                frame_size = frame.nbytes
                stream_health_tracker.report_frame_received(self.camera_id, frame_size)

                # Call frame callback
                if self.frame_callback:
                    try:
                        self.frame_callback(self.camera_id, frame)
                    except Exception as e:
                        logger.error(f"Camera {self.camera_id}: Frame callback error: {e}")
                        stream_health_tracker.report_error(self.camera_id, f"Frame callback error: {e}")

                # Periodic connection health test
                current_time = time.time()
                if current_time - last_connection_test >= self.connection_test_interval:
                    self._test_connection_health()
                    last_connection_test = current_time

                # Log progress every 30 seconds
                if current_time - last_log_time >= 30:
                    logger.info(f"Camera {self.camera_id}: {frame_count} snapshots processed")
                    last_log_time = current_time

                # Wait for next interval
                elapsed = time.time() - start_time
                sleep_time = max(0, interval_seconds - elapsed)
                if sleep_time > 0:
                    self.stop_event.wait(sleep_time)

            except Exception as e:
                logger.error(f"Error in snapshot loop for camera {self.camera_id}: {e}")
                stream_health_tracker.report_error(self.camera_id, f"Snapshot loop error: {e}")
                retries += 1
                if self.max_retries != -1 and retries > self.max_retries:
                    break
                time.sleep(min(2.0, interval_seconds))

        logger.info(f"Snapshot reader thread ended for camera {self.camera_id}")
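The loop above paces fetches by subtracting the time spent on the HTTP request from the configured interval. A minimal standalone sketch of that pacing arithmetic; the 2000 ms interval and 450 ms fetch time are made-up values, not the worker's real configuration:

# Sketch of the interval pacing in _read_snapshots(); numbers are illustrative only.
interval_ms = 2000                      # hypothetical snapshot interval
interval_seconds = interval_ms / 1000.0

elapsed = 0.45                          # pretend the fetch + callback took 450 ms
sleep_time = max(0, interval_seconds - elapsed)
print(sleep_time)                       # 1.55 -> wait out the rest of the 2 s slot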
    def _fetch_snapshot(self) -> Optional[np.ndarray]:
        """Fetch a single high quality snapshot from HTTP URL."""
        try:
            # Parse URL for authentication
            from urllib.parse import urlparse
            parsed_url = urlparse(self.snapshot_url)

            headers = {
                'User-Agent': 'Python-Detector-Worker/1.0',
                'Accept': 'image/jpeg, image/png, image/*'
            }
            auth = None

            if parsed_url.username and parsed_url.password:
                from requests.auth import HTTPBasicAuth, HTTPDigestAuth
                auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)

                # Reconstruct URL without credentials
                clean_url = f"{parsed_url.scheme}://{parsed_url.hostname}"
                if parsed_url.port:
                    clean_url += f":{parsed_url.port}"
                clean_url += parsed_url.path
                if parsed_url.query:
                    clean_url += f"?{parsed_url.query}"

                # Try Basic Auth first
                response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
                                        stream=True, verify=False)

                # If Basic Auth fails, try Digest Auth
                if response.status_code == 401:
                    auth = HTTPDigestAuth(parsed_url.username, parsed_url.password)
                    response = requests.get(clean_url, auth=auth, timeout=15, headers=headers,
                                            stream=True, verify=False)
            else:
                response = requests.get(self.snapshot_url, timeout=15, headers=headers,
                                        stream=True, verify=False)

            if response.status_code == 200:
                # Check content size
                content_length = int(response.headers.get('content-length', 0))
                if content_length > self.max_file_size:
                    logger.warning(f"Snapshot too large for camera {self.camera_id}: {content_length} bytes")
                    return None

                # Read content
                content = response.content

                # Convert to numpy array
                image_array = np.frombuffer(content, np.uint8)

                # Decode as high quality image
                frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)

                if frame is None:
                    logger.error(f"Failed to decode snapshot for camera {self.camera_id}")
                    return None

                logger.debug(f"Fetched snapshot for camera {self.camera_id}: {frame.shape[1]}x{frame.shape[0]}")
                return frame
            else:
                logger.warning(f"HTTP {response.status_code} from {self.camera_id}")
                return None

        except requests.RequestException as e:
            logger.error(f"Request error fetching snapshot for {self.camera_id}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error decoding snapshot for {self.camera_id}: {e}")
            return None
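For reference, the credential handling above (strip embedded credentials from the URL, try Basic auth, fall back to Digest auth on a 401) can be exercised on its own. A minimal sketch assuming a reachable snapshot endpoint; the helper name and placeholder URL are illustrative, not part of this codebase:

# Standalone sketch of the Basic -> Digest fallback used by _fetch_snapshot() above.
from typing import Optional
from urllib.parse import urlparse

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth


def fetch_with_auth_fallback(snapshot_url: str, timeout: float = 15.0) -> Optional[bytes]:
    parsed = urlparse(snapshot_url)
    if not (parsed.username and parsed.password):
        resp = requests.get(snapshot_url, timeout=timeout, verify=False)
        return resp.content if resp.status_code == 200 else None

    # Rebuild the URL without credentials and pass them as an auth handler instead
    clean_url = f"{parsed.scheme}://{parsed.hostname}"
    if parsed.port:
        clean_url += f":{parsed.port}"
    clean_url += parsed.path
    if parsed.query:
        clean_url += f"?{parsed.query}"

    resp = requests.get(clean_url, auth=HTTPBasicAuth(parsed.username, parsed.password),
                        timeout=timeout, verify=False)
    if resp.status_code == 401:
        # Camera rejected Basic auth; retry the same request with Digest auth
        resp = requests.get(clean_url, auth=HTTPDigestAuth(parsed.username, parsed.password),
                            timeout=timeout, verify=False)
    return resp.content if resp.status_code == 200 else None


# Hypothetical usage: frame_bytes = fetch_with_auth_fallback("http://user:pass@192.0.2.10/snapshot.jpg")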
    def fetch_single_snapshot(self) -> Optional[np.ndarray]:
        """
        Fetch a single high-quality snapshot on demand for pipeline processing.
        This method is for one-time fetch from HTTP URL, not continuous streaming.

        Returns:
            High quality 2K snapshot frame or None if failed
        """
        logger.info(f"[SNAPSHOT] Fetching snapshot for {self.camera_id} from {self.snapshot_url}")

        # Try to fetch snapshot with retries
        for attempt in range(self.max_retries):
            frame = self._fetch_snapshot()

            if frame is not None:
                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for {self.camera_id}")
                return frame

            if attempt < self.max_retries - 1:
                logger.warning(f"[SNAPSHOT] Attempt {attempt + 1}/{self.max_retries} failed for {self.camera_id}, retrying...")
                time.sleep(0.5)

        logger.error(f"[SNAPSHOT] Failed to fetch snapshot for {self.camera_id} after {self.max_retries} attempts")
        return None

    def _resize_maintain_aspect(self, frame: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
        """Resize image while maintaining aspect ratio for high quality."""
        h, w = frame.shape[:2]
        aspect = w / h
        target_aspect = target_width / target_height

        if aspect > target_aspect:
            # Image is wider
            new_width = target_width
            new_height = int(target_width / aspect)
        else:
            # Image is taller
            new_height = target_height
            new_width = int(target_height * aspect)

        # Use INTER_LANCZOS4 for high quality downsampling
        resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)

        # Pad to target size if needed
        if new_width < target_width or new_height < target_height:
            top = (target_height - new_height) // 2
            bottom = target_height - new_height - top
            left = (target_width - new_width) // 2
            right = target_width - new_width - left
            resized = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

        return resized
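A worked example of the letterboxing arithmetic above, assuming a hypothetical 2560x1440 source frame and a 1280x1280 target (both values are illustrative):

# Worked example of the arithmetic in _resize_maintain_aspect(); sizes are hypothetical.
w, h = 2560, 1440
target_w, target_h = 1280, 1280

aspect = w / h                          # ~1.78
target_aspect = target_w / target_h     # 1.0

if aspect > target_aspect:              # source is wider -> fit width, pad height
    new_w, new_h = target_w, int(target_w / aspect)      # 1280 x 720
else:                                   # source is taller -> fit height, pad width
    new_h, new_w = target_h, int(target_h * aspect)

pad_top = (target_h - new_h) // 2       # 280 px of black border above...
pad_bottom = target_h - new_h - pad_top # ...and 280 px below
print(new_w, new_h, pad_top, pad_bottom)  # 1280 720 280 280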
    # Health monitoring methods
    def _send_heartbeat(self, activity: str = "running"):
        """Send heartbeat to thread health monitor."""
        self.last_heartbeat = time.time()
        thread_health_monitor.heartbeat(activity=activity)

    def _heartbeat_callback(self) -> bool:
        """Heartbeat callback for thread responsiveness testing."""
        try:
            # Check if thread is responsive by checking recent heartbeat
            current_time = time.time()
            age = current_time - self.last_heartbeat

            # Thread is responsive if heartbeat is recent
            return age < 30.0  # 30 second responsiveness threshold

        except Exception:
            return False

    def _test_connection_health(self):
        """Test HTTP connection health."""
        try:
            stream_health_tracker.test_http_connection(self.camera_id, self.snapshot_url)
        except Exception as e:
            logger.error(f"Error testing connection health for {self.camera_id}: {e}")

    def _handle_restart_recovery(self, component: str, details: Dict[str, Any]) -> bool:
        """Handle restart recovery action."""
        try:
            logger.info(f"Restarting HTTP snapshot reader for {self.camera_id}")

            # Stop current instance
            self.stop()

            # Small delay
            time.sleep(2.0)

            # Restart
            self.start()

            # Report successful restart
            stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_restart")

            return True

        except Exception as e:
            logger.error(f"Failed to restart HTTP snapshot reader for {self.camera_id}: {e}")
            return False

    def _handle_reconnect_recovery(self, component: str, details: Dict[str, Any]) -> bool:
        """Handle reconnect recovery action."""
        try:
            logger.info(f"Reconnecting HTTP snapshot reader for {self.camera_id}")

            # Test connection first
            success = stream_health_tracker.test_http_connection(self.camera_id, self.snapshot_url)

            if success:
                # Reset error counters
                self.consecutive_errors = 0
                stream_health_tracker.report_reconnect(self.camera_id, "health_recovery_reconnect")
                return True
            else:
                logger.warning(f"Connection test failed during recovery for {self.camera_id}")
                return False

        except Exception as e:
            logger.error(f"Failed to reconnect HTTP snapshot reader for {self.camera_id}: {e}")
            return False
38
core/streaming/readers/utils.py
Normal file

@ -0,0 +1,38 @@
"""
Utility functions for stream readers.
"""
import logging
import os

# Keep OpenCV errors visible but allow FFmpeg stderr logging
os.environ["OPENCV_LOG_LEVEL"] = "ERROR"

logger = logging.getLogger(__name__)


# Color codes for pretty logging
class Colors:
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    BOLD = '\033[1m'
    END = '\033[0m'


def log_success(camera_id: str, message: str):
    """Log success messages in green"""
    logger.info(f"{Colors.GREEN}[{camera_id}] {message}{Colors.END}")


def log_warning(camera_id: str, message: str):
    """Log warnings in yellow"""
    logger.warning(f"{Colors.YELLOW}[{camera_id}] {message}{Colors.END}")


def log_error(camera_id: str, message: str):
    """Log errors in red"""
    logger.error(f"{Colors.RED}[{camera_id}] {message}{Colors.END}")


def log_info(camera_id: str, message: str):
    """Log info in cyan"""
    logger.info(f"{Colors.CYAN}[{camera_id}] {message}{Colors.END}")
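A possible way the helpers above could be used from a reader; the camera id and messages here are made up:

# Hypothetical usage of the colored log helpers in core/streaming/readers/utils.py.
import logging

from core.streaming.readers.utils import log_success, log_warning, log_error, log_info

logging.basicConfig(level=logging.INFO)

log_info("cam-001", "connecting to snapshot endpoint")
log_success("cam-001", "fetched 2560x1440 snapshot")
log_warning("cam-001", "HTTP 401, retrying with digest auth")
log_error("cam-001", "max retries reached")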
@ -61,6 +61,7 @@ class TrackingPipelineIntegration:
        self.cleared_sessions: Dict[str, float] = {}  # session_id -> clear_time
        self.pending_vehicles: Dict[str, int] = {}  # display_id -> track_id (waiting for session ID)
        self.pending_processing_data: Dict[str, Dict] = {}  # display_id -> processing data (waiting for session ID)
+        self.display_to_subscription: Dict[str, str] = {}  # display_id -> subscription_id (for fallback)

        # Additional validators for enhanced flow control
        self.permanently_processed: Dict[str, float] = {}  # "camera_id:track_id" -> process_time (never process again)
@ -71,12 +72,17 @@ class TrackingPipelineIntegration:
        # Thread pool for pipeline execution
        self.executor = ThreadPoolExecutor(max_workers=2)
+
+        # Min bbox filtering configuration
+        # TODO: Make this configurable via pipeline.json in the future
+        self.min_bbox_area_percentage = 3.5  # 3.5% of frame area minimum

        # Statistics
        self.stats = {
            'frames_processed': 0,
            'vehicles_detected': 0,
            'vehicles_validated': 0,
-            'pipelines_executed': 0
+            'pipelines_executed': 0,
+            'frontals_filtered_small': 0  # Track filtered detections
        }
@ -202,6 +208,10 @@ class TrackingPipelineIntegration:
            else:
                logger.debug(f"No tracking results or detections attribute")
+
+            # Filter out small frontal detections (neighboring pumps/distant cars)
+            if tracking_results and hasattr(tracking_results, 'detections'):
+                tracking_results = self._filter_small_frontals(tracking_results, frame)

            # Process tracking results
            tracked_vehicles = self.tracker.process_detections(
                tracking_results,
@ -210,8 +220,10 @@ class TrackingPipelineIntegration:
            )

            # Update last detection time for abandonment detection
+            # Update when vehicles ARE detected, so when they leave, timestamp ages
            if tracked_vehicles:
                self.last_detection_time[display_id] = time.time()
+                logger.debug(f"Updated last_detection_time for {display_id}: {len(tracked_vehicles)} vehicles")

            # Check for car abandonment (vehicle left after getting car_wait_staff stage)
            await self._check_car_abandonment(display_id, subscription_id)
@ -402,28 +414,13 @@ class TrackingPipelineIntegration:
        logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")

        # Capture high-quality snapshot for pipeline processing
-        frame = None
-        if self.subscription_info and self.subscription_info.stream_config.snapshot_url:
-            from ..streaming.readers import HTTPSnapshotReader
-
-            logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
-            snapshot_reader = HTTPSnapshotReader(
-                camera_id=self.subscription_info.camera_id,
-                snapshot_url=self.subscription_info.stream_config.snapshot_url,
-                max_retries=3
-            )
-
-            frame = snapshot_reader.fetch_single_snapshot()
-
-            if frame is not None:
-                logger.info(f"[PROCESSING PHASE] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot for pipeline")
-            else:
-                logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
-                # Fall back to RTSP frame if snapshot fails
-                frame = processing_data['frame']
-        else:
-            logger.warning(f"[PROCESSING PHASE] No snapshot URL available, using RTSP frame")
-            frame = processing_data['frame']
+        logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
+        frame = self._fetch_snapshot()
+
+        if frame is None:
+            logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
+            # Fall back to RTSP frame if snapshot fails
+            frame = processing_data['frame']

        # Extract detected regions from detection phase result if available
        detected_regions = detection_result.get('detected_regions', {})
@ -465,7 +462,7 @@ class TrackingPipelineIntegration:
        self.subscription_info = subscription_info
        logger.debug(f"Set subscription info with snapshot_url: {subscription_info.stream_config.snapshot_url if subscription_info else None}")

-    def set_session_id(self, display_id: str, session_id: str):
+    def set_session_id(self, display_id: str, session_id: str, subscription_id: str = None):
        """
        Set session ID for a display (from backend).
        This is called when backend sends setSessionId after receiving imageDetection.
@ -473,8 +470,17 @@ class TrackingPipelineIntegration:
        Args:
            display_id: Display identifier
            session_id: Session identifier
+            subscription_id: Subscription identifier (displayId;cameraId) - needed for fallback
        """
+        # Ensure session_id is always a string for consistent type handling
+        session_id = str(session_id) if session_id is not None else None
        self.active_sessions[display_id] = session_id
+
+        # Store subscription_id for fallback usage
+        if subscription_id:
+            self.display_to_subscription[display_id] = subscription_id
+            logger.info(f"Set session {session_id} for display {display_id} with subscription {subscription_id}")
+        else:
            logger.info(f"Set session {session_id} for display {display_id}")

        # Check if we have a pending vehicle for this display
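With the widened signature above, the backend message handler would now pass the subscription identifier along with the session id. A hypothetical call site (the `integration` instance and all identifiers are made up, not taken from the codebase):

# Hypothetical caller of the new signature; `integration` is a TrackingPipelineIntegration.
integration.set_session_id(
    display_id="display-001",
    session_id="session-12345",
    subscription_id="display-001;cam-001",   # displayId;cameraId, stored for the fallback pipeline
)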
@ -516,6 +522,25 @@ class TrackingPipelineIntegration:
        else:
            logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")

+        # FALLBACK: Execute pipeline for POS-initiated sessions
+        # Skip if session_id is None (no car present or car has left)
+        if session_id is not None:
+            # Use stored subscription_id instead of creating fake one
+            stored_subscription_id = self.display_to_subscription.get(display_id)
+            if stored_subscription_id:
+                logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}")
+
+                # Trigger the fallback pipeline asynchronously with real subscription_id
+                asyncio.create_task(self._execute_fallback_pipeline(
+                    display_id=display_id,
+                    session_id=session_id,
+                    subscription_id=stored_subscription_id
+                ))
+            else:
+                logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline")
+        else:
+            logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}")
+
    def clear_session_id(self, session_id: str):
        """
        Clear session ID (post-fueling).
@ -565,6 +590,7 @@ class TrackingPipelineIntegration:
        self.cleared_sessions.clear()
        self.pending_vehicles.clear()
        self.pending_processing_data.clear()
+        self.display_to_subscription.clear()
        self.permanently_processed.clear()
        self.progression_stages.clear()
        self.last_detection_time.clear()
@ -608,10 +634,16 @@ class TrackingPipelineIntegration:
                last_detection = self.last_detection_time.get(session_display, 0)
                time_since_detection = current_time - last_detection
+
+                logger.info(f"[ABANDON CHECK] Session {session_id} (display: {session_display}): "
+                           f"time_since_detection={time_since_detection:.1f}s, "
+                           f"timeout={self.abandonment_timeout}s")

                if time_since_detection > self.abandonment_timeout:
-                    logger.info(f"Car abandonment detected: session {session_id}, "
+                    logger.warning(f"🚨 Car abandonment detected: session {session_id}, "
                               f"no detection for {time_since_detection:.1f}s")
                    abandoned_sessions.append(session_id)
+            else:
+                logger.debug(f"[ABANDON CHECK] Session {session_id} has no associated display")

        # Send abandonment detection for each abandoned session
        for session_id in abandoned_sessions:
@ -619,6 +651,7 @@ class TrackingPipelineIntegration:
            # Remove from progression stages to avoid repeated detection
            if session_id in self.progression_stages:
                del self.progression_stages[session_id]
+                logger.info(f"[ABANDON] Removed session {session_id} from progression_stages after notification")

    async def _send_abandonment_detection(self, subscription_id: str, session_id: str):
        """
@ -665,6 +698,153 @@ class TrackingPipelineIntegration:
        if stage == "car_wait_staff":
            logger.info(f"Started monitoring session {session_id} for car abandonment")

+    def _fetch_snapshot(self) -> Optional[np.ndarray]:
+        """
+        Fetch high-quality snapshot from camera's snapshot URL.
+        Reusable method for both processing phase and fallback pipeline.
+
+        Returns:
+            Snapshot frame or None if unavailable
+        """
+        if not (self.subscription_info and self.subscription_info.stream_config.snapshot_url):
+            logger.warning("[SNAPSHOT] No subscription info or snapshot URL available")
+            return None
+
+        try:
+            from ..streaming.readers import HTTPSnapshotReader
+
+            logger.info(f"[SNAPSHOT] Fetching snapshot for {self.subscription_info.camera_id}")
+            snapshot_reader = HTTPSnapshotReader(
+                camera_id=self.subscription_info.camera_id,
+                snapshot_url=self.subscription_info.stream_config.snapshot_url,
+                max_retries=3
+            )
+
+            frame = snapshot_reader.fetch_single_snapshot()
+
+            if frame is not None:
+                logger.info(f"[SNAPSHOT] Successfully fetched {frame.shape[1]}x{frame.shape[0]} snapshot")
+                return frame
+            else:
+                logger.warning("[SNAPSHOT] Failed to fetch snapshot")
+                return None
+
+        except Exception as e:
+            logger.error(f"[SNAPSHOT] Error fetching snapshot: {e}", exc_info=True)
+            return None
+
+    async def _execute_fallback_pipeline(self, display_id: str, session_id: str, subscription_id: str):
+        """
+        Execute fallback pipeline when sessionId is received without prior detection.
+        This handles POS-initiated sessions where backend starts transaction before car detection.
+
+        Args:
+            display_id: Display identifier
+            session_id: Session ID from backend
+            subscription_id: Subscription identifier for pipeline execution
+        """
+        try:
+            logger.info(f"[FALLBACK PIPELINE] Executing for session {session_id}, display {display_id}")
+
+            # Fetch fresh snapshot from camera
+            frame = self._fetch_snapshot()
+
+            if frame is None:
+                logger.error(f"[FALLBACK] Failed to fetch snapshot for session {session_id}, cannot execute pipeline")
+                return
+
+            logger.info(f"[FALLBACK] Using snapshot frame {frame.shape[1]}x{frame.shape[0]} for session {session_id}")
+
+            # Check if detection pipeline is available
+            if not self.detection_pipeline:
+                logger.error(f"[FALLBACK] Detection pipeline not available for session {session_id}")
+                return
+
+            # Execute detection phase to get detected regions
+            detection_result = await self.detection_pipeline.execute_detection_phase(
+                frame=frame,
+                display_id=display_id,
+                subscription_id=subscription_id
+            )
+
+            logger.info(f"[FALLBACK] Detection phase completed for session {session_id}: "
+                       f"status={detection_result.get('status', 'unknown')}, "
+                       f"regions={list(detection_result.get('detected_regions', {}).keys())}")
+
+            # If detection found regions, execute processing phase
+            detected_regions = detection_result.get('detected_regions', {})
+            if detected_regions:
+                processing_result = await self.detection_pipeline.execute_processing_phase(
+                    frame=frame,
+                    display_id=display_id,
+                    session_id=session_id,
+                    subscription_id=subscription_id,
+                    detected_regions=detected_regions
+                )
+
+                logger.info(f"[FALLBACK] Processing phase completed for session {session_id}: "
+                           f"status={processing_result.get('status', 'unknown')}, "
+                           f"branches={len(processing_result.get('branch_results', {}))}, "
+                           f"actions={len(processing_result.get('actions_executed', []))}")
+
+                # Update statistics
+                self.stats['pipelines_executed'] += 1
+
+            else:
+                logger.warning(f"[FALLBACK] No detections found in snapshot for session {session_id}")
+
+        except Exception as e:
+            logger.error(f"[FALLBACK] Error executing fallback pipeline for session {session_id}: {e}", exc_info=True)
+
+    def _filter_small_frontals(self, tracking_results, frame):
+        """
+        Filter out frontal detections that are smaller than minimum bbox area percentage.
+        This prevents processing of cars from neighboring pumps that appear in camera view.
+
+        Args:
+            tracking_results: YOLO tracking results with detections
+            frame: Input frame for calculating frame area
+
+        Returns:
+            Modified tracking_results with small frontals removed
+        """
+        if not hasattr(tracking_results, 'detections') or not tracking_results.detections:
+            return tracking_results
+
+        # Calculate frame area and minimum bbox area threshold
+        frame_area = frame.shape[0] * frame.shape[1]  # height * width
+        min_bbox_area = frame_area * (self.min_bbox_area_percentage / 100.0)
+
+        # Filter detections
+        filtered_detections = []
+        filtered_count = 0
+
+        for detection in tracking_results.detections:
+            # Calculate detection bbox area
+            bbox = detection.bbox  # Assuming bbox is [x1, y1, x2, y2]
+            bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
+
+            if bbox_area >= min_bbox_area:
+                # Keep detection - bbox is large enough
+                filtered_detections.append(detection)
+            else:
+                # Filter out small detection
+                filtered_count += 1
+                area_percentage = (bbox_area / frame_area) * 100
+                logger.debug(f"Filtered small frontal: area={bbox_area:.0f}px² ({area_percentage:.1f}% of frame, "
+                            f"min required: {self.min_bbox_area_percentage}%)")
+
+        # Update tracking results with filtered detections
+        tracking_results.detections = filtered_detections
+
+        # Update statistics
+        if filtered_count > 0:
+            self.stats['frontals_filtered_small'] += filtered_count
+            logger.info(f"Filtered {filtered_count} small frontal detections, "
+                       f"{len(filtered_detections)} remaining (total filtered: {self.stats['frontals_filtered_small']})")
+
+        return tracking_results
+
    def cleanup(self):
        """Cleanup resources."""
        self.executor.shutdown(wait=False)
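To make the 3.5% threshold added above concrete, here is a worked example of the area check performed by `_filter_small_frontals`; the 2560x1440 frame and the two bounding boxes are hypothetical values chosen for illustration:

# Worked example of the min-bbox-area filter; frame size and boxes are made up.
frame_w, frame_h = 2560, 1440
frame_area = frame_w * frame_h                                   # 3,686,400 px²
min_bbox_area_percentage = 3.5
min_bbox_area = frame_area * (min_bbox_area_percentage / 100.0)  # 129,024 px²

near_pump = (400, 300, 1200, 1000)         # x1, y1, x2, y2 -> 800x700 = 560,000 px² (kept)
neighboring_pump = (2000, 500, 2300, 800)  # 300x300 = 90,000 px² (filtered out)

for x1, y1, x2, y2 in (near_pump, neighboring_pump):
    area = (x2 - x1) * (y2 - y1)
    print(area, "kept" if area >= min_bbox_area else "filtered")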
@ -8,3 +8,5 @@ psycopg2-binary
lap>=0.5.12
pynvml
PyTurboJPEG
+PyNvVideoCodec
+cupy-cuda12x