python-detector-worker/app.py
2025-09-13 01:00:49 +07:00

284 lines
No EOL
9.1 KiB
Python

"""
Refactored FastAPI application using the new modular architecture.
This replaces the monolithic app.py with a clean, maintainable structure
using dependency injection and singleton managers.
"""
import logging
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.responses import Response
from detector_worker.core.config import get_config_manager, validate_config
from detector_worker.core.dependency_injection import get_container
from detector_worker.core.singleton_managers import (
ModelStateManager, StreamStateManager, SessionStateManager,
CacheStateManager, CameraStateManager, PipelineStateManager
)
from detector_worker.communication.websocket_handler import WebSocketHandler
from detector_worker.utils.system_monitor import get_system_metrics
from detector_worker.utils.error_handler import ErrorHandler, create_logger
from detector_worker.utils.websocket_debug import (
setup_websocket_logging, enable_websocket_debug, disable_websocket_debug,
get_websocket_debug_status
)
# Setup logging
# Module-level logger for this entry point, created at INFO level through the
# project's create_logger helper.
logger = create_logger("detector_worker.main", logging.INFO)
# Global state managers (singleton instances)
# These module-level instances are read by the /health and camera-image
# endpoints and are cleared by the lifespan handler on shutdown.
model_manager = ModelStateManager()
stream_manager = StreamStateManager()
session_manager = SessionStateManager()
cache_manager = CacheStateManager()
camera_manager = CameraStateManager()
pipeline_manager = PipelineStateManager()
# Dependency injection container
# NOTE(review): `container` is already the return value of get_container(),
# yet other code in this file calls `container.get_container()` on it as well
# as `container.resolve(...)`. Confirm the returned object exposes both.
container = get_container()
# Error handler
# Its get_error_stats() output is included in the /health response.
error_handler = ErrorHandler("main_app")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup validates the configuration, sets up WebSocket logging, and logs
    a few configuration values plus dependency-container statistics.
    Shutdown clears every state manager, removes the on-disk model cache
    directory, and clears the container's singletons.

    Each shutdown step runs in its own ``try`` block so that a failure in one
    step (for example a manager whose ``clear_all`` raises) does not prevent
    the remaining cleanup from running.

    Args:
        app: The FastAPI application this lifespan is attached to. It is not
            used inside the handler.

    Raises:
        RuntimeError: If ``validate_config()`` returns any errors.
        Exception: Any other startup failure is logged at CRITICAL level and
            re-raised.
    """
    # Startup
    try:
        logger.info("Starting Detector Worker v2.0.0...")

        # Validate configuration
        config_manager = get_config_manager()
        errors = validate_config()
        if errors:
            logger.error(f"Configuration validation failed: {errors}")
            raise RuntimeError(f"Invalid configuration: {', '.join(errors)}")
        logger.info("Configuration validation passed")

        # Setup WebSocket logging based on configuration
        setup_websocket_logging()

        # Log startup information
        config = config_manager.get_all()
        logger.info(f"Max streams: {config.get('max_streams', 5)}")
        logger.info(f"Target FPS: {config.get('target_fps', 10)}")

        # NOTE(review): `container` is already the result of get_container();
        # confirm that the object also exposes a get_container() method.
        container_stats = container.get_container().get_stats()
        logger.info(f"Dependency container initialized: {container_stats}")

        logger.info("Detector Worker startup complete")
    except Exception as e:
        logger.critical(f"Startup failed: {e}")
        raise

    # Yield control to the application
    yield

    # Shutdown
    logger.info("Shutting down Detector Worker...")
    failures = 0

    # Clear all state managers; keep going even if one of them fails.
    for manager in (
        model_manager,
        stream_manager,
        session_manager,
        cache_manager,
        camera_manager,
        pipeline_manager,
    ):
        try:
            manager.clear_all()
        except Exception as e:
            failures += 1
            logger.error(f"Error during shutdown: {e}")

    # Clear model cache files on disk
    try:
        import shutil
        from detector_worker.core.config import MODELS_DIR

        if os.path.exists(MODELS_DIR):
            logger.info(f"Clearing model cache directory: {MODELS_DIR}")
            shutil.rmtree(MODELS_DIR)
            logger.info("Model cache directory cleared")
    except Exception as e:
        failures += 1
        logger.error(f"Error during shutdown: {e}")

    # Clear dependency container singletons
    try:
        container.get_container().clear_singletons()
    except Exception as e:
        failures += 1
        logger.error(f"Error during shutdown: {e}")

    if failures:
        logger.warning(
            f"Detector Worker shutdown finished with {failures} failed cleanup step(s)"
        )
    else:
        logger.info("Detector Worker shutdown complete")
# Build the FastAPI application object; startup and shutdown work is
# delegated to the `lifespan` context manager.
app = FastAPI(
    title="Detector Worker",
    version="2.0.0",
    description="Refactored computer vision detection worker with modular architecture",
    lifespan=lifespan,
)
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
    """Main WebSocket endpoint for real-time communication.

    Resolves a ``WebSocketHandler`` from the dependency container and hands
    the connection over to it. If the handler raises, the error is logged and
    the socket is closed unless either side has already disconnected.

    Args:
        websocket: The incoming WebSocket connection supplied by FastAPI.
    """
    # Local import: FastAPI re-exports Starlette's WebSocketState, and the
    # file-level import block does not bring it into scope.
    from fastapi.websockets import WebSocketState

    try:
        # Create WebSocket handler using dependency injection
        ws_handler = container.resolve(WebSocketHandler)
        await ws_handler.handle_connection(websocket)
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        # Compare the state against the enum member. The earlier form,
        # `not websocket.client_state.DISCONNECTED`, looked the member up as
        # an attribute of the current state, which is never falsy, so the
        # socket was never closed here.
        client_open = websocket.client_state != WebSocketState.DISCONNECTED
        # Starlette refuses to send once the application side has already
        # sent a close message, so skip close() in that case as well.
        server_open = websocket.application_state != WebSocketState.DISCONNECTED
        if client_open and server_open:
            await websocket.close()
@app.get("/camera/{camera_id}/image")
async def get_camera_image(camera_id: str):
    """Serve the most recent cached frame for a camera as a JPEG response.

    Args:
        camera_id: Identifier taken from the URL path.

    Returns:
        A ``Response`` with media type ``image/jpeg`` and caching disabled.

    Raises:
        HTTPException: 404 when the cache manager returns ``None`` for the
            camera; 500 when any other error occurs during the lookup.
    """
    try:
        latest = cache_manager.get_latest_frame(camera_id)
        if latest is not None:
            # A frame exists: send it back and tell clients not to cache it.
            no_cache_headers = {"Cache-Control": "no-cache, no-store, must-revalidate"}
            return Response(
                content=latest,
                media_type="image/jpeg",
                headers=no_cache_headers,
            )
        raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
    except HTTPException:
        # Let the deliberate 404 pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error getting camera image: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/health")
async def health_check():
    """Report service health together with system and manager statistics.

    Returns:
        On success, a dict with ``status`` set to ``"healthy"``, the version,
        system metrics, per-manager stats, container stats and error stats.
        If gathering any of these raises, a dict with ``status`` set to
        ``"unhealthy"``, the error text and the version is returned instead.
    """
    try:
        # Collect every section first, in a fixed order.
        system_stats = get_system_metrics()
        manager_stats = {
            "models": model_manager.get_stats(),
            "streams": stream_manager.get_stats(),
            "sessions": session_manager.get_stats(),
            "cache": cache_manager.get_stats(),
            "cameras": camera_manager.get_stats(),
            "pipeline": pipeline_manager.get_stats(),
        }
        container_stats = container.get_container().get_stats()
        error_stats = error_handler.get_error_stats()
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            "status": "unhealthy",
            "error": str(e),
            "version": "2.0.0",
        }
    else:
        return {
            "status": "healthy",
            "version": "2.0.0",
            "system": system_stats,
            "managers": manager_stats,
            "container": container_stats,
            "errors": error_stats,
        }
@app.get("/config")
async def get_configuration():
    """Return the current configuration along with its validation result.

    Returns:
        A dict with ``config`` (the output of the config manager's
        ``get_all()``) and ``validation_errors`` (the output of
        ``validate_config()``).

    Raises:
        HTTPException: 500 if reading or validating the configuration fails.
    """
    try:
        manager = get_config_manager()
        current_settings = manager.get_all()
        problems = validate_config()
        return {"config": current_settings, "validation_errors": problems}
    except Exception as e:
        logger.error(f"Error getting configuration: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/config/reload")
async def reload_configuration():
    """Reload configuration from all sources.

    Returns:
        A dict with ``success`` set to ``True``, the ``validation_errors``
        reported by ``validate_config()`` after the reload, and the reloaded
        ``config``.

    Raises:
        HTTPException: 500 with detail ``"Failed to reload configuration"``
            when the config manager's ``reload()`` returns a falsy value;
            500 with detail ``"Internal server error"`` on any other error.
    """
    try:
        config_manager = get_config_manager()
        success = config_manager.reload()
        if not success:
            raise HTTPException(status_code=500, detail="Failed to reload configuration")
        errors = validate_config()
        return {
            "success": True,
            "validation_errors": errors,
            "config": config_manager.get_all(),
        }
    except HTTPException:
        # Re-raise the deliberate failure as-is; without this clause the
        # generic handler below would log it as an unexpected error and
        # replace its detail message.
        raise
    except Exception as e:
        logger.error(f"Error reloading configuration: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/websocket")
async def get_websocket_debug_status():
    """Get current WebSocket debugging status.

    Returns:
        Whatever the ``get_websocket_debug_status`` helper in
        ``detector_worker.utils.websocket_debug`` returns.

    Raises:
        HTTPException: 500 if the helper raises.
    """
    # This route handler has the same name as the helper imported at the top
    # of the file, so the module-level name now refers to this coroutine
    # function. Calling it by that name would create an un-awaited coroutine
    # of this handler instead of reading the status. Import the helper under
    # a distinct local alias to reach the real implementation.
    from detector_worker.utils.websocket_debug import (
        get_websocket_debug_status as _read_debug_status,
    )

    try:
        return _read_debug_status()
    except Exception as e:
        logger.error(f"Error getting WebSocket debug status: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/debug/websocket/enable")
async def enable_websocket_debugging():
    """Enable WebSocket debugging.

    Returns:
        A dict with ``success`` set to ``True``, a confirmation ``message``
        and the current debug ``status`` as reported by the helper in
        ``detector_worker.utils.websocket_debug``.

    Raises:
        HTTPException: 500 if enabling debugging or reading the status fails.
    """
    # The module-level name `get_websocket_debug_status` is rebound by the
    # route handler of the same name in this file, so calling it here would
    # put an un-awaited coroutine into the response. Import the real helper
    # under a local alias instead.
    from detector_worker.utils.websocket_debug import (
        get_websocket_debug_status as _read_debug_status,
    )

    try:
        enable_websocket_debug()
        logger.info("WebSocket debugging enabled via API")
        return {
            "success": True,
            "message": "WebSocket debugging enabled",
            "status": _read_debug_status(),
        }
    except Exception as e:
        logger.error(f"Error enabling WebSocket debugging: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/debug/websocket/disable")
async def disable_websocket_debugging():
    """Disable WebSocket debugging.

    Returns:
        A dict with ``success`` set to ``True``, a confirmation ``message``
        and the current debug ``status`` as reported by the helper in
        ``detector_worker.utils.websocket_debug``.

    Raises:
        HTTPException: 500 if disabling debugging or reading the status fails.
    """
    # The module-level name `get_websocket_debug_status` is rebound by the
    # route handler of the same name in this file, so calling it here would
    # put an un-awaited coroutine into the response. Import the real helper
    # under a local alias instead.
    from detector_worker.utils.websocket_debug import (
        get_websocket_debug_status as _read_debug_status,
    )

    try:
        disable_websocket_debug()
        logger.info("WebSocket debugging disabled via API")
        return {
            "success": True,
            "message": "WebSocket debugging disabled",
            "status": _read_debug_status(),
        }
    except Exception as e:
        logger.error(f"Error disabling WebSocket debugging: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
if __name__ == "__main__":
    import uvicorn

    # Obtain the configuration manager before serving.
    # NOTE(review): the result is not used below; host and port are
    # hard-coded rather than read from configuration. Confirm the intent.
    config_manager = get_config_manager()

    # Serve the app on all interfaces, port 8000.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)