Add HTTP endpoint to retrieve saved session images by session ID. Images are saved during the car_fueling progression stage. - Add GET /session-image/{session_id} endpoint - Search the images directory for files matching the session ID pattern - Return the most recent image if multiple exist - Proper error handling (404 for not found, 500 for errors) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
576 lines
No EOL
21 KiB
Python
576 lines
No EOL
21 KiB
Python
"""
|
|
Detector Worker - Main FastAPI Application
|
|
Refactored modular architecture for computer vision pipeline processing.
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import cv2
|
|
from contextlib import asynccontextmanager
|
|
from typing import Dict, Any
|
|
from fastapi import FastAPI, WebSocket, HTTPException
|
|
from fastapi.responses import Response
|
|
|
|
# Import new modular communication system
|
|
from core.communication.websocket import websocket_endpoint
|
|
from core.communication.state import worker_state
|
|
|
|
# Configure root logging to write to detector_worker.log and to the console.
# (logging.basicConfig is a no-op if the root logger already has handlers.)
_log_handlers = [
    logging.FileHandler("detector_worker.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=_log_handlers,
)

# Application logger, explicitly set to DEBUG regardless of the root level.
logger = logging.getLogger("detector_worker")
logger.setLevel(logging.DEBUG)
|
|
|
|
# Frames are now stored in the shared cache buffer from core.streaming.buffers
|
|
# latest_frames = {} # Deprecated - using shared_cache_buffer instead
|
|
|
|
|
|
# Health monitoring recovery handlers
|
|
def _handle_stream_restart_recovery(component: str, details: Dict[str, Any]) -> bool:
    """Recover a stream by removing and re-adding its subscription.

    The camera id is ``component`` with a leading ``"stream_"`` removed
    (e.g. ``"stream_cam-001"`` -> ``"cam-001"``). The first subscription whose
    ``camera_id`` matches is removed. After a one-second pause it is re-added
    with the same subscription id, stream config, crop coordinates, model id,
    model URL and tracking integration.

    Args:
        component: Health-monitor component name, with or without the
            ``"stream_"`` prefix.
        details: Extra recovery details supplied by the caller; unused here.

    Returns:
        True if the subscription was re-added successfully. False if re-adding
        failed, no subscription matched, or any exception was raised.
    """
    try:
        from core.streaming.manager import shared_stream_manager

        prefix = "stream_"
        camera_id = component[len(prefix):] if component.startswith(prefix) else component
        logger.info(f"Attempting stream restart recovery for {camera_id}")

        # Only the first matching subscription is restarted.
        target = next(
            (
                sub
                for sub in shared_stream_manager.get_all_subscriptions()
                if sub.camera_id == camera_id
            ),
            None,
        )
        if target is None:
            logger.warning(f"No subscription found for camera {camera_id} during recovery")
            return False

        # Tear down, pause briefly, then re-create with identical settings.
        shared_stream_manager.remove_subscription(target.subscription_id)
        time.sleep(1.0)
        restarted = shared_stream_manager.add_subscription(
            target.subscription_id,
            target.stream_config,
            target.crop_coords,
            target.model_id,
            target.model_url,
            target.tracking_integration,
        )

        if not restarted:
            logger.error(f"Stream restart recovery failed for {camera_id}")
            return False

        logger.info(f"Stream restart recovery successful for {camera_id}")
        return True

    except Exception as e:
        logger.error(f"Error in stream restart recovery for {component}: {e}")
        return False
|
|
|
|
|
|
def _handle_stream_reconnect_recovery(component: str, details: Dict[str, Any]) -> bool:
    """Check that a stream is still active so its reader can reconnect on its own.

    This handler does not call any reconnect API. Stream readers handle their
    own reconnection, so it only verifies that the camera id is still listed
    by ``shared_stream_manager.get_active_cameras()``. The camera id is
    ``component`` with any leading ``"stream_"`` removed.

    Args:
        component: Health-monitor component name, with or without the
            ``"stream_"`` prefix.
        details: Extra recovery details supplied by the caller; unused here.

    Returns:
        True if the camera is active. False if it is not, or if any exception
        was raised.
    """
    try:
        from core.streaming.manager import shared_stream_manager

        camera_id = component[7:] if component.startswith("stream_") else component
        logger.info(f"Attempting stream reconnect recovery for {camera_id}")

        is_active = camera_id in shared_stream_manager.get_active_cameras()
        if not is_active:
            logger.warning(f"Camera {camera_id} not found in active cameras during reconnect recovery")
            return False

        logger.info(f"Stream reconnect recovery triggered for {camera_id}")
        return True

    except Exception as e:
        logger.error(f"Error in stream reconnect recovery for {component}: {e}")
        return False
|
|
|
|
# Lifespan event handler (modern FastAPI approach)
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start health monitoring on startup, clean up on shutdown.

    Startup:
        Starts ``health_monitor`` and registers the ``"restart_stream"`` and
        ``"reconnect"`` recovery handlers with ``recovery_manager``. A failure
        here is logged (with traceback) and does not stop the application from
        starting. The available endpoints are then logged.

    Shutdown:
        Stops ``health_monitor`` (errors are logged) and clears the
        subscriptions, session ids and progression stages held in
        ``worker_state``.
    """
    # Startup
    logger.info("Detector Worker started successfully")

    # Initialize health monitoring system
    try:
        from core.monitoring.health import health_monitor
        # NOTE(review): the next two imports (and shared_stream_manager below) are
        # not used in this function; presumably they are kept so those modules are
        # initialized at startup. Confirm before removing.
        from core.monitoring.stream_health import stream_health_tracker  # noqa: F401
        from core.monitoring.thread_health import thread_health_monitor  # noqa: F401
        from core.monitoring.recovery import recovery_manager

        health_monitor.start()
        logger.info("Health monitoring system started")

        # Register recovery handlers for stream management
        from core.streaming.manager import shared_stream_manager  # noqa: F401
        recovery_manager.register_recovery_handler(
            "restart_stream",
            _handle_stream_restart_recovery
        )
        recovery_manager.register_recovery_handler(
            "reconnect",
            _handle_stream_reconnect_recovery
        )
        logger.info("Recovery handlers registered")

    except Exception as e:
        logger.error(f"Failed to initialize health monitoring: {e}", exc_info=True)

    # Advertise every HTTP/WebSocket entry point, including the session image endpoint.
    logger.info("WebSocket endpoint available at: ws://0.0.0.0:8001/")
    logger.info("HTTP camera endpoint available at: http://0.0.0.0:8001/camera/{camera_id}/image")
    logger.info("Session image endpoint available at: http://0.0.0.0:8001/session-image/{session_id}")
    logger.info("Health check available at: http://0.0.0.0:8001/health")
    logger.info("Detailed health monitoring available at: http://0.0.0.0:8001/health/detailed")
    logger.info("Ready and waiting for backend WebSocket connections")

    yield

    # Shutdown
    logger.info("Detector Worker shutting down...")

    # Stop health monitoring
    try:
        from core.monitoring.health import health_monitor
        health_monitor.stop()
        logger.info("Health monitoring system stopped")
    except Exception as e:
        logger.error(f"Error stopping health monitoring: {e}", exc_info=True)

    # Clear all worker state (frames live in shared_cache_buffer, not here).
    worker_state.set_subscriptions([])
    worker_state.session_ids.clear()
    worker_state.progression_stages.clear()
    logger.info("Detector Worker shutdown complete")
|
|
|
|
# FastAPI application instance; `lifespan` runs the startup/shutdown logic.
app = FastAPI(
    title="Detector Worker",
    version="2.0.0",
    lifespan=lifespan,
)
|
|
|
|
# Add middleware to log all requests
@app.middleware("http")
async def log_requests(request, call_next):
    """Log method, URL, status code and handling time of each HTTP request at DEBUG level.

    Exceptions raised downstream propagate unchanged and are not logged here.
    """
    # perf_counter is monotonic and high-resolution, so the measured duration
    # cannot be skewed by wall-clock adjustments (unlike time.time()).
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    logger.debug(f"HTTP {request.method} {request.url} - {response.status_code} ({process_time:.3f}s)")
    return response
|
|
|
|
# Load configuration. Values from config.json override these defaults, so a
# partial config file still provides every expected key.
config_path = "config.json"
_default_config = {
    "poll_interval_ms": 100,
    "reconnect_interval_sec": 5,
    "target_fps": 10,
    "max_streams": 20,
    "max_retries": 3,
}

if os.path.exists(config_path):
    with open(config_path, "r", encoding="utf-8") as f:
        config = {**_default_config, **json.load(f)}
    logger.info(f"Loaded configuration from {config_path}")
else:
    # Copy so later mutation of `config` cannot alter the defaults.
    config = dict(_default_config)
    logger.warning(f"Configuration file {config_path} not found, using defaults")

# Ensure models directory exists
os.makedirs("models", exist_ok=True)
logger.info("Ensured models directory exists")

# The stream manager is already initialized at module level with max_streams=20.
# Do NOT call core.streaming.initialize_stream_manager() here: it creates a NEW
# instance and breaks existing references to the shared manager.
logger.info("Using stream manager with max_streams=20 (module-level initialization)")

# Frames are stored in core.streaming.buffers.shared_cache_buffer; there is no
# local frame cache in this module.

logger.info("Starting detector worker application (refactored)")
# Fallbacks mirror _default_config so the log never disagrees with the real defaults.
logger.info(f"Configuration: Target FPS: {config.get('target_fps', 10)}, "
            f"Max streams: {config.get('max_streams', 20)}, "
            f"Max retries: {config.get('max_retries', 3)}")
|
|
|
|
|
|
@app.websocket("/")
async def websocket_handler(websocket: WebSocket):
    """
    Root WebSocket route used by the backend.

    Logs the incoming connection, then hands the whole session to
    ``websocket_endpoint`` (protocol per worker.md). Any exception it raises is
    logged with a traceback and is not re-raised.
    """
    peer = websocket.client
    client_info = "unknown" if not peer else f"{peer.host}:{peer.port}"
    logger.info(f"[RX ← Backend] New WebSocket connection request from {client_info}")

    try:
        await websocket_endpoint(websocket)
    except Exception as e:
        logger.error(f"WebSocket handler error for {client_info}: {e}", exc_info=True)
|
|
|
|
|
|
@app.get("/camera/{camera_id}/image")
async def get_camera_image(camera_id: str):
    """
    HTTP endpoint to retrieve the latest frame from a camera as JPEG image.

    This endpoint is preserved for backward compatibility with existing systems.

    Args:
        camera_id: The subscription identifier (e.g., "display-001;cam-001").
            It is URL-decoded with ``urllib.parse.unquote`` before lookup.

    Returns:
        JPEG image (quality 85) as binary response

    Raises:
        HTTPException: 404 if camera not found or no frame available
        HTTPException: 500 if encoding fails or any other error occurs
    """
    try:
        from urllib.parse import unquote

        # URL decode the camera_id to handle encoded characters
        original_camera_id = camera_id
        camera_id = unquote(camera_id)
        logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'")

        # Check if camera is in active subscriptions
        subscription = worker_state.get_subscription(camera_id)
        if not subscription:
            logger.warning(f"Camera ID '{camera_id}' not found in active subscriptions")
            available_cameras = list(worker_state.subscriptions.keys())
            logger.debug(f"Available cameras: {available_cameras}")
            raise HTTPException(
                status_code=404,
                detail=f"Camera {camera_id} not found or not active"
            )

        # Frames are keyed by the camera part of "displayId;cameraId".
        # split()[-1] also returns the whole id when there is no ';'.
        actual_camera_id = camera_id.split(';')[-1]

        # Get frame from the shared cache buffer
        from core.streaming.buffers import shared_cache_buffer

        frame = shared_cache_buffer.get_frame(actual_camera_id)
        if frame is None:
            # Build the camera list only on a miss; it is just diagnostic output.
            available_cameras = shared_cache_buffer.frame_buffer.get_camera_list()
            logger.warning(f"\033[93m[API] No frame for '{actual_camera_id}' - Available: {available_cameras}\033[0m")
            raise HTTPException(
                status_code=404,
                detail=f"No frame available for camera {actual_camera_id}"
            )

        # Encode frame as JPEG
        success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not success:
            raise HTTPException(status_code=500, detail="Failed to encode image as JPEG")

        # Return image as binary response
        return Response(content=buffer_img.tobytes(), media_type="image/jpeg")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
|
|
|
|
|
@app.get("/session-image/{session_id}")
async def get_session_image(session_id: int):
    """
    HTTP endpoint to retrieve the saved session image by session ID.

    Searches the ``images`` directory (relative to the working directory) for
    files named ``{session_id}_*.jpg`` and returns the one with the newest
    modification time.

    Args:
        session_id: The session ID to retrieve the image for

    Returns:
        JPEG image as binary response

    Raises:
        HTTPException: 404 if the images directory is missing or no image
            exists for the session
        HTTPException: 500 if reading image fails
    """
    try:
        from pathlib import Path

        images_dir = Path("images")

        if not images_dir.is_dir():
            logger.warning("Images directory does not exist")
            raise HTTPException(
                status_code=404,
                detail="No images directory found"
            )

        # session_id is an int, so it cannot inject glob metacharacters.
        # The "_" after the id stops session 1 from matching session 12's files.
        candidates = []
        for path in images_dir.glob(f"{session_id}_*.jpg"):
            try:
                candidates.append((path.stat().st_mtime, path))
            except FileNotFoundError:
                # Removed between listing and stat (e.g. concurrent cleanup); skip it.
                continue

        if not candidates:
            logger.warning(f"No image found for session {session_id}")
            raise HTTPException(
                status_code=404,
                detail=f"No image found for session {session_id}"
            )

        # Get the most recent file if multiple exist
        _, most_recent_file = max(candidates, key=lambda item: item[0])
        logger.info(f"Found session image for session {session_id}: {most_recent_file}")

        # read_bytes() closes the file deterministically (a bare open().read() leaks the handle).
        try:
            image_data = most_recent_file.read_bytes()
        except FileNotFoundError:
            # Deleted after it was selected: report as missing, not as a server error.
            logger.warning(f"No image found for session {session_id}")
            raise HTTPException(
                status_code=404,
                detail=f"No image found for session {session_id}"
            )

        # Return image as binary response
        return Response(content=image_data, media_type="image/jpeg")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving session image for session {session_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: fixed status/version plus current subscription and session counts."""
    subscription_count = len(worker_state.subscriptions)
    session_count = len(worker_state.session_ids)
    return dict(
        status="healthy",
        version="2.0.0",
        active_subscriptions=subscription_count,
        active_sessions=session_count,
    )
|
|
|
|
|
|
@app.get("/health/detailed")
async def detailed_health_check():
    """Comprehensive health status with detailed monitoring data.

    Combines the overall health status, per-stream metrics, per-thread info
    and recovery statistics from the ``core.monitoring`` modules with basic
    ``worker_state`` counts.

    Raises:
        HTTPException: 500 if collecting any of the monitoring data fails.
    """
    try:
        from core.monitoring.health import health_monitor
        from core.monitoring.stream_health import stream_health_tracker
        from core.monitoring.thread_health import thread_health_monitor
        from core.monitoring.recovery import recovery_manager

        # Get comprehensive health status
        overall_health = health_monitor.get_health_status()
        stream_metrics = stream_health_tracker.get_all_metrics()
        thread_info = thread_health_monitor.get_all_thread_info()
        recovery_stats = recovery_manager.get_recovery_stats()

        return {
            "timestamp": time.time(),
            "overall_health": overall_health,
            "stream_metrics": stream_metrics,
            "thread_health": thread_info,
            "recovery_stats": recovery_stats,
            "system_info": {
                "active_subscriptions": len(worker_state.subscriptions),
                "active_sessions": len(worker_state.session_ids),
                "version": "2.0.0"
            }
        }

    except Exception as e:
        # Keep the traceback: the message alone rarely identifies which component failed.
        logger.error(f"Error generating detailed health report: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Health monitoring error: {str(e)}") from e
|
|
|
|
|
|
@app.get("/health/streams")
async def stream_health_status():
    """Stream-specific health monitoring.

    Returns the stream tracker's metrics, the shared frame buffer's stats, and
    per-camera frame ages. ``age_seconds`` is None when the camera's metrics
    have no truthy ``last_frame_time``.

    Raises:
        HTTPException: 500 if collecting the metrics fails.
    """
    try:
        from core.monitoring.stream_health import stream_health_tracker
        from core.streaming.buffers import shared_cache_buffer

        stream_metrics = stream_health_tracker.get_all_metrics()
        buffer_stats = shared_cache_buffer.get_stats()

        # One clock read, so the report timestamp and every frame age share a reference point.
        now = time.time()

        frame_ages = {}
        for camera_id, info in stream_metrics.items():
            last_frame_time = info.get("last_frame_time") if info else None
            frame_ages[camera_id] = {
                "age_seconds": now - last_frame_time if last_frame_time else None,
                "total_frames": info.get("frame_count", 0) if info else 0
            }

        return {
            "timestamp": now,
            "stream_count": len(stream_metrics),
            "stream_metrics": stream_metrics,
            "buffer_stats": buffer_stats,
            "frame_ages": frame_ages
        }

    except Exception as e:
        logger.error(f"Error generating stream health report: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Stream health error: {str(e)}") from e
|
|
|
|
|
|
@app.get("/health/threads")
async def thread_health_status():
    """Thread-specific health monitoring.

    Returns per-thread info, potential deadlocks and a summary. A thread counts
    as responsive when ``is_responsive`` is truthy and as unresponsive when it
    is present and falsy. Threads without the key are counted in neither total.

    Raises:
        HTTPException: 500 if collecting thread info fails.
    """
    try:
        from core.monitoring.thread_health import thread_health_monitor

        thread_info = thread_health_monitor.get_all_thread_info()
        deadlocks = thread_health_monitor.detect_deadlocks()

        return {
            "timestamp": time.time(),
            "thread_count": len(thread_info),
            "thread_info": thread_info,
            "potential_deadlocks": deadlocks,
            "summary": {
                "responsive_threads": sum(1 for info in thread_info.values() if info.get("is_responsive", False)),
                "unresponsive_threads": sum(1 for info in thread_info.values() if not info.get("is_responsive", True)),
                "deadlock_count": len(deadlocks)
            }
        }

    except Exception as e:
        logger.error(f"Error generating thread health report: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Thread health error: {str(e)}") from e
|
|
|
|
|
|
@app.get("/health/recovery")
async def recovery_status():
    """Recovery system status and history.

    Returns the recovery manager's raw stats plus a summary. The summary totals
    ``failure_count`` and ``success_count`` across all entries of
    ``recovery_states``.

    Raises:
        HTTPException: 500 if collecting recovery stats fails.
    """
    try:
        from core.monitoring.recovery import recovery_manager

        recovery_stats = recovery_manager.get_recovery_stats()
        # Looked up once and reused by every summary field below.
        recovery_states = recovery_stats.get("recovery_states", {})

        return {
            "timestamp": time.time(),
            "recovery_stats": recovery_stats,
            "summary": {
                "total_recoveries_last_hour": recovery_stats.get("total_recoveries_last_hour", 0),
                "components_with_recovery_state": len(recovery_states),
                "total_recovery_failures": sum(
                    state.get("failure_count", 0) for state in recovery_states.values()
                ),
                "total_recovery_successes": sum(
                    state.get("success_count", 0) for state in recovery_states.values()
                )
            }
        }

    except Exception as e:
        logger.error(f"Error generating recovery status report: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Recovery status error: {str(e)}") from e
|
|
|
|
|
|
@app.post("/health/recovery/force/{component}")
async def force_recovery(component: str, action: str = "restart_stream"):
    """Force recovery action for a specific component.

    Args:
        component: Component name passed to ``recovery_manager.force_recovery``.
        action: Value of a ``RecoveryAction`` member (default ``"restart_stream"``).

    Returns:
        Timestamped result containing the component, action and success flag.

    Raises:
        HTTPException: 400 for an unknown action, 500 for any other error.
    """
    try:
        from core.monitoring.recovery import recovery_manager, RecoveryAction

        # Validate action
        try:
            recovery_action = RecoveryAction(action)
        except ValueError:
            # The ValueError adds nothing beyond this message, so suppress it from the chain.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid recovery action: {action}. Valid actions: {[a.value for a in RecoveryAction]}"
            ) from None

        # Force recovery
        success = recovery_manager.force_recovery(component, recovery_action, "manual_api_request")

        return {
            "timestamp": time.time(),
            "component": component,
            "action": action,
            "success": success,
            "message": f"Recovery {'successful' if success else 'failed'} for component {component}"
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error forcing recovery for {component}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Recovery error: {str(e)}") from e
|
|
|
|
|
|
@app.get("/health/metrics")
async def health_metrics():
    """Performance and health metrics in a format suitable for monitoring systems.

    Returns a JSON ``metrics`` mapping whose keys mimic Prometheus metric names.
    Per-stream keys embed the camera id as a ``camera="..."`` label, with ``-``
    and ``.`` replaced by ``_``. ``detector_worker_health_status`` encodes the
    overall status as healthy=1, warning=2, critical=3 and anything else=4.

    Raises:
        HTTPException: 500 if collecting any metric fails.
    """
    try:
        from core.monitoring.health import health_monitor
        from core.monitoring.stream_health import stream_health_tracker
        from core.streaming.buffers import shared_cache_buffer

        # Get basic metrics
        overall_health = health_monitor.get_health_status()
        stream_metrics = stream_health_tracker.get_all_metrics()
        buffer_stats = shared_cache_buffer.get_stats()

        # Format for monitoring systems (Prometheus-style)
        metrics = {
            "detector_worker_up": 1,
            "detector_worker_streams_total": len(stream_metrics),
            "detector_worker_subscriptions_total": len(worker_state.subscriptions),
            "detector_worker_sessions_total": len(worker_state.session_ids),
            "detector_worker_memory_mb": buffer_stats.get("total_memory_mb", 0),
            "detector_worker_health_status": {
                "healthy": 1,
                "warning": 2,
                "critical": 3,
                "unknown": 4
            }.get(overall_health.get("overall_status", "unknown"), 4)
        }

        # Add per-stream metrics
        for camera_id, stream_info in stream_metrics.items():
            safe_camera_id = camera_id.replace("-", "_").replace(".", "_")
            metrics.update({
                f"detector_worker_stream_frames_total{{camera=\"{safe_camera_id}\"}}": stream_info.get("frame_count", 0),
                f"detector_worker_stream_errors_total{{camera=\"{safe_camera_id}\"}}": stream_info.get("error_count", 0),
                f"detector_worker_stream_fps{{camera=\"{safe_camera_id}\"}}": stream_info.get("frames_per_second", 0),
                f"detector_worker_stream_frame_age_seconds{{camera=\"{safe_camera_id}\"}}": stream_info.get("last_frame_age_seconds") or 0
            })

        return {
            "timestamp": time.time(),
            "metrics": metrics
        }

    except Exception as e:
        logger.error(f"Error generating health metrics: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Metrics error: {str(e)}") from e
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Serve on all interfaces, port 8001: the address the startup log lines advertise.
    listen_host, listen_port = "0.0.0.0", 8001
    uvicorn.run(app, host=listen_host, port=listen_port)