From 6c7c4c5d9ccdde81eacbd33585832053b9f24b46 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Fri, 12 Sep 2025 15:39:19 +0700 Subject: [PATCH] Refactor: Phase 5: Granular Refactoring --- detector_worker/core/orchestrator.py | 521 ++++++++++++++++++ detector_worker/detection/yolo_detector.py | 137 ++++- detector_worker/pipeline/pipeline_executor.py | 167 +++++- detector_worker/utils/error_handler.py | 406 ++++++++++++++ 4 files changed, 1216 insertions(+), 15 deletions(-) create mode 100644 detector_worker/core/orchestrator.py create mode 100644 detector_worker/utils/error_handler.py diff --git a/detector_worker/core/orchestrator.py b/detector_worker/core/orchestrator.py new file mode 100644 index 0000000..9aea779 --- /dev/null +++ b/detector_worker/core/orchestrator.py @@ -0,0 +1,521 @@ +""" +Detection orchestrator module. + +This module provides the main orchestration classes that coordinate between +all the different components for the detection workflow. +""" +import asyncio +import json +import logging +import time +import traceback +from typing import Dict, Any, Optional, List, Callable + +from fastapi import WebSocket +from websockets.exceptions import ConnectionClosedError, WebSocketDisconnect + +from .config import config +from .constants import HEARTBEAT_INTERVAL +from .exceptions import DetectionError, OrchestrationError +from ..communication.websocket_handler import WebSocketHandler +from ..communication.message_processor import MessageProcessor, MessageType +from ..communication.response_formatter import ResponseFormatter +from ..streams.stream_manager import StreamManager +from ..models.model_manager import ModelManager +from ..pipeline.pipeline_executor import PipelineExecutor +from ..storage.session_cache import SessionCache +from ..storage.redis_client import RedisClientManager +from ..storage.database_manager import DatabaseManager +from ..utils.system_monitor import get_system_metrics + +# Setup logging +logger = logging.getLogger("detector_worker.orchestrator") + + +class DetectionOrchestrator: + """ + Main orchestrator for the detection workflow. + + This class coordinates all components and provides the main + entry point for detection processing. 
+ """ + + def __init__(self): + """Initialize the detection orchestrator.""" + # Core components + self.stream_manager = StreamManager() + self.model_manager = ModelManager() + self.pipeline_executor = PipelineExecutor() + self.session_cache = SessionCache() + + # Communication components + self.message_processor = MessageProcessor() + self.response_formatter = ResponseFormatter() + + # Storage components (optional) + self.redis_client: Optional[RedisClientManager] = None + self.database_manager: Optional[DatabaseManager] = None + + # WebSocket handler + self.websocket_handler: Optional[WebSocketHandler] = None + + # Initialize components + self._initialize_components() + + def _initialize_components(self) -> None: + """Initialize and wire up components.""" + try: + # Initialize Redis if configured + redis_config = config.get("redis", {}) + if redis_config.get("enabled", False): + self.redis_client = RedisClientManager(redis_config) + + # Initialize database if configured + db_config = config.get("database", {}) + if db_config.get("enabled", False): + self.database_manager = DatabaseManager(db_config) + + # Wire up pipeline executor dependencies + self.pipeline_executor.set_dependencies( + redis_client=self.redis_client, + database_manager=self.database_manager, + session_cache=self.session_cache + ) + + # Set up model manager with pipeline loader + from ..models.pipeline_loader import get_pipeline_loader + pipeline_loader = get_pipeline_loader() + self.model_manager.set_pipeline_loader(pipeline_loader) + + except Exception as e: + logger.error(f"Failed to initialize components: {e}") + raise OrchestrationError(f"Component initialization failed: {e}") + + async def handle_websocket_connection(self, websocket: WebSocket) -> None: + """ + Handle a WebSocket connection (refactored detect() function). + + This replaces the massive 586-line detect() function with a clean + orchestration approach. + + Args: + websocket: WebSocket connection to handle + """ + try: + # Create WebSocket handler + self.websocket_handler = WebSocketHandler( + stream_manager=self.stream_manager, + model_manager=self.model_manager, + pipeline_executor=self.pipeline_executor, + session_cache=self.session_cache, + redis_client=self.redis_client + ) + + # Handle the connection + await self.websocket_handler.handle_connection(websocket) + + except Exception as e: + logger.error(f"Error in WebSocket orchestration: {e}") + traceback.print_exc() + raise DetectionError(f"WebSocket handling failed: {e}") + + def get_status_report(self) -> Dict[str, Any]: + """ + Get comprehensive status report. 
+ + Returns: + Dictionary with system status + """ + try: + # Get system metrics + system_metrics = get_system_metrics() + + # Get component status + active_streams = self.stream_manager.get_active_streams() + loaded_models = self.model_manager.get_loaded_models() + model_stats = self.model_manager.get_model_stats() + session_stats = self.session_cache.get_cache_stats() + + status = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "system": system_metrics, + "streams": { + "active_count": len(active_streams), + "cameras": list(active_streams.keys()) + }, + "models": { + "loaded_count": len(loaded_models), + "total_instances": model_stats.get("total_instances", 0), + "cache_size_mb": model_stats.get("cache_size_mb", 0) + }, + "sessions": { + "active_count": session_stats.get("active_sessions", 0), + "cached_detections": session_stats.get("cached_detections", 0), + "cache_size_mb": session_stats.get("cache_size_mb", 0) + } + } + + # Add Redis status if available + if self.redis_client: + redis_stats = self.redis_client.get_connection_stats() + status["redis"] = redis_stats + + # Add database status if available + if self.database_manager: + db_stats = self.database_manager.get_connection_stats() + status["database"] = db_stats + + return status + + except Exception as e: + logger.error(f"Error getting status report: {e}") + return { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "error": f"Failed to get status: {e}" + } + + async def shutdown(self) -> None: + """Shutdown the orchestrator and all components.""" + logger.info("Shutting down detection orchestrator") + + try: + # Shutdown WebSocket handler + if self.websocket_handler: + await self.websocket_handler._cleanup() + + # Cleanup components + await self.stream_manager.cleanup_all_streams() + self.model_manager.cleanup_all_models() + self.session_cache.clear_all_sessions() + + # Cleanup storage components + if self.redis_client: + await self.redis_client.cleanup() + if self.database_manager: + await self.database_manager.cleanup() + + except Exception as e: + logger.error(f"Error during shutdown: {e}") + + logger.info("Detection orchestrator shutdown completed") + + +class SessionOrchestrator: + """ + Orchestrates session-based detection workflows. + + This class handles the complex session state management and + mode transitions that were embedded in the original detect() function. + """ + + def __init__( + self, + session_cache: SessionCache, + pipeline_executor: PipelineExecutor + ): + """ + Initialize session orchestrator. + + Args: + session_cache: Session cache manager + pipeline_executor: Pipeline executor + """ + self.session_cache = session_cache + self.pipeline_executor = pipeline_executor + + async def process_camera_detection( + self, + camera_id: str, + stream_config: Dict[str, Any], + frame: Any, + model_tree: Any, + backend_session_id: Optional[str] = None + ) -> Optional[Dict[str, Any]]: + """ + Process detection for a camera with session state management. + + This method encapsulates the complex session mode logic from + the original detect() function. 
+ + Args: + camera_id: Camera identifier + stream_config: Stream configuration + frame: Video frame to process + model_tree: Model pipeline tree + backend_session_id: Backend session ID if available + + Returns: + Detection result or None + """ + try: + # Get display identifier for pipeline context + subscription_parts = stream_config["subscriptionIdentifier"].split(';') + display_identifier = subscription_parts[0] if subscription_parts else None + + # Get or initialize session pipeline state + pipeline_state = self.session_cache.get_or_init_session_pipeline_state(camera_id) + current_mode = pipeline_state["mode"] + + logger.debug(f"🔍 Camera {camera_id}: mode='{current_mode}', session_id={repr(backend_session_id)}") + + # Handle session ID-based processing mode + updated_mode = self._handle_session_mode_transition( + camera_id, current_mode, backend_session_id, pipeline_state + ) + + # Apply frame cropping if configured + cropped_frame = self._apply_frame_crop(frame, stream_config, camera_id) + + # Process based on current mode + detection_result = await self._process_by_mode( + camera_id, updated_mode, cropped_frame, stream_config, + model_tree, pipeline_state, backend_session_id + ) + + return detection_result + + except Exception as e: + logger.error(f"Error processing camera detection for {camera_id}: {e}") + traceback.print_exc() + return None + + def _handle_session_mode_transition( + self, + camera_id: str, + current_mode: str, + backend_session_id: Optional[str], + pipeline_state: Dict[str, Any] + ) -> str: + """Handle session mode transitions based on session ID availability.""" + if not backend_session_id: + # No session ID - handle different modes appropriately + if current_mode == "lightweight": + # Check if we're in car_waitpayment stage + current_progression = pipeline_state.get("progression_stage") + if current_progression == "car_waitpayment": + # Stay in lightweight mode for dual reset condition + logger.debug(f"🔍 Camera {camera_id}: No session ID but in car_waitpayment - staying in lightweight mode") + return current_mode + else: + # Reset to validation_detecting (situation 1) + self.session_cache.update_session_pipeline_mode(camera_id, "validation_detecting") + logger.debug(f"🔍 Camera {camera_id}: No session ID - reset to validation_detecting") + return "validation_detecting" + elif current_mode not in ["validation_detecting", "send_detections", "waiting_for_session_id"]: + # Other modes - reset to validation_detecting + self.session_cache.update_session_pipeline_mode(camera_id, "validation_detecting") + logger.debug(f"🔍 Camera {camera_id}: No session ID - reset from {current_mode} to validation_detecting") + return "validation_detecting" + else: + logger.debug(f"🔍 Camera {camera_id}: No session ID - staying in {current_mode}") + return current_mode + else: + # Session ID available - switch to full pipeline mode + if current_mode in ["send_detections", "waiting_for_session_id"]: + # Session ID just arrived - switch to full pipeline mode + self.session_cache.update_session_pipeline_mode( + camera_id, "full_pipeline", backend_session_id + ) + logger.info(f"đŸ”Ĩ Camera {camera_id}: Session ID received ({backend_session_id}) - switching to FULL PIPELINE mode") + return "full_pipeline" + else: + logger.debug(f"🔍 Camera {camera_id}: Session ID available - staying in {current_mode}") + return current_mode + + def _apply_frame_crop( + self, + frame: Any, + stream_config: Dict[str, Any], + camera_id: str + ) -> Any: + """Apply frame cropping if configured.""" + crop_coords = [ + 
stream_config.get("cropX1"), + stream_config.get("cropY1"), + stream_config.get("cropX2"), + stream_config.get("cropY2") + ] + + if all(coord is not None for coord in crop_coords): + x1, y1, x2, y2 = crop_coords + cropped_frame = frame[y1:y2, x1:x2] + logger.debug(f"Applied crop ({x1}, {y1}, {x2}, {y2}) to frame for camera {camera_id}") + return cropped_frame + + return frame + + async def _process_by_mode( + self, + camera_id: str, + mode: str, + frame: Any, + stream_config: Dict[str, Any], + model_tree: Any, + pipeline_state: Dict[str, Any], + backend_session_id: Optional[str] + ) -> Optional[Dict[str, Any]]: + """Process detection based on current mode.""" + + if mode == "validation_detecting": + return await self._process_validation_mode( + camera_id, frame, stream_config, model_tree, pipeline_state + ) + elif mode == "send_detections": + return await self._process_send_detections_mode( + camera_id, frame, stream_config, model_tree, pipeline_state + ) + elif mode == "waiting_for_session_id": + return await self._process_waiting_mode( + camera_id, frame, stream_config, model_tree, pipeline_state + ) + elif mode == "full_pipeline": + return await self._process_full_pipeline_mode( + camera_id, frame, stream_config, model_tree, + pipeline_state, backend_session_id + ) + elif mode == "lightweight": + return await self._process_lightweight_mode( + camera_id, frame, stream_config, model_tree, pipeline_state + ) + else: + logger.warning(f"Unknown mode '{mode}' for camera {camera_id}") + return None + + async def _process_validation_mode( + self, + camera_id: str, + frame: Any, + stream_config: Dict[str, Any], + model_tree: Any, + pipeline_state: Dict[str, Any] + ) -> Optional[Dict[str, Any]]: + """Process detection in validation mode.""" + logger.debug(f"🔍 Camera {camera_id}: Processing in validation_detecting mode") + + # Run detection only (no full pipeline) + detection_result = await self.pipeline_executor.run_detection_only( + camera_id, frame, model_tree, pipeline_state + ) + + if detection_result and detection_result.has_expected_classes(): + # Validation successful - transition to send_detections mode + self.session_cache.update_session_pipeline_mode(camera_id, "send_detections") + logger.info(f"đŸŽ¯ Camera {camera_id}: Validation successful - switching to send_detections mode") + + return detection_result + + async def _process_send_detections_mode( + self, + camera_id: str, + frame: Any, + stream_config: Dict[str, Any], + model_tree: Any, + pipeline_state: Dict[str, Any] + ) -> Optional[Dict[str, Any]]: + """Process detection in send_detections mode.""" + logger.debug(f"🔍 Camera {camera_id}: Processing in send_detections mode") + + # Continue sending detections, waiting for session ID + detection_result = await self.pipeline_executor.run_detection_only( + camera_id, frame, model_tree, pipeline_state + ) + + # Check if we should transition to waiting mode + if detection_result and detection_result.is_stable(): + self.session_cache.update_session_pipeline_mode(camera_id, "waiting_for_session_id") + logger.info(f"âŗ Camera {camera_id}: Stable detection - switching to waiting_for_session_id mode") + + return detection_result + + async def _process_waiting_mode( + self, + camera_id: str, + frame: Any, + stream_config: Dict[str, Any], + model_tree: Any, + pipeline_state: Dict[str, Any] + ) -> Optional[Dict[str, Any]]: + """Process detection in waiting_for_session_id mode.""" + logger.debug(f"🔍 Camera {camera_id}: Processing in waiting_for_session_id mode") + + # Continue detection but wait 
for session ID to arrive + return await self.pipeline_executor.run_detection_only( + camera_id, frame, model_tree, pipeline_state + ) + + async def _process_full_pipeline_mode( + self, + camera_id: str, + frame: Any, + stream_config: Dict[str, Any], + model_tree: Any, + pipeline_state: Dict[str, Any], + backend_session_id: str + ) -> Optional[Dict[str, Any]]: + """Process detection with full pipeline.""" + logger.debug(f"🔍 Camera {camera_id}: Processing in full_pipeline mode") + + # Run full pipeline with all actions + result = await self.pipeline_executor.execute_pipeline( + camera_id, stream_config, frame, model_tree, + {}, pipeline_state, backend_session_id + ) + + # Check for mode transitions after pipeline + if result and result.get("should_switch_to_lightweight"): + self.session_cache.update_session_pipeline_mode(camera_id, "lightweight") + logger.info(f"💡 Camera {camera_id}: Switching to lightweight mode after full pipeline") + + return result + + async def _process_lightweight_mode( + self, + camera_id: str, + frame: Any, + stream_config: Dict[str, Any], + model_tree: Any, + pipeline_state: Dict[str, Any] + ) -> Optional[Dict[str, Any]]: + """Process detection in lightweight mode.""" + logger.debug(f"🔍 Camera {camera_id}: Processing in lightweight mode") + + # Check if YOLO inference is enabled for this stage + yolo_enabled = pipeline_state.get("yolo_inference_enabled", True) + + if yolo_enabled: + # Run detection for absence monitoring + return await self.pipeline_executor.run_detection_only( + camera_id, frame, model_tree, pipeline_state + ) + else: + # Return cached detection without inference + cached_result = self.session_cache.get_cached_detection(camera_id) + logger.debug(f"🔍 Camera {camera_id}: YOLO inference disabled - returning cached detection") + return cached_result + + +# Global orchestrator instance +_detection_orchestrator: Optional[DetectionOrchestrator] = None + + +def get_detection_orchestrator() -> DetectionOrchestrator: + """Get or create the global detection orchestrator.""" + global _detection_orchestrator + if _detection_orchestrator is None: + _detection_orchestrator = DetectionOrchestrator() + return _detection_orchestrator + + +# Main entry point for the refactored detect() function +async def handle_websocket_detection(websocket: WebSocket) -> None: + """ + Main entry point for WebSocket detection handling. + + This replaces the original 586-line detect() function with a clean, + orchestrated approach using the DetectionOrchestrator. + + Args: + websocket: WebSocket connection to handle + """ + orchestrator = get_detection_orchestrator() + await orchestrator.handle_websocket_connection(websocket) \ No newline at end of file diff --git a/detector_worker/detection/yolo_detector.py b/detector_worker/detection/yolo_detector.py index 6355c60..8cee58f 100644 --- a/detector_worker/detection/yolo_detector.py +++ b/detector_worker/detection/yolo_detector.py @@ -493,6 +493,9 @@ class YOLODetector: """ Run YOLO detection with BoT-SORT tracking and stability validation. + This method has been refactored from 226 lines into smaller, focused helper methods + for better maintainability and testability. 
+ Args: frame: Input frame/image node: Pipeline node configuration with model and settings @@ -504,6 +507,32 @@ class YOLODetector: - regions_dict: Dict mapping class names to highest confidence detections - track_validation_result: Dict with validation status and stable tracks """ + try: + # Extract context information + camera_id, model_id, current_mode = self._extract_detection_context(node, context) + + # Initialize detection configuration + tracking_config = self._extract_tracking_config(node) + min_confidence = self._determine_confidence_threshold(node) + + # Prepare tracker state + self._prepare_tracker_state(node, camera_id, model_id) + + # Run core detection pipeline + detection_result = self._execute_detection_pipeline( + frame, node, camera_id, model_id, current_mode, + tracking_config, min_confidence + ) + + # Store validation context + if context is not None: + context["track_validation_result"] = detection_result[2] + + return detection_result + + except Exception as e: + logger.error(f"Detection error for camera {context.get('camera_id', 'unknown') if context else 'unknown'}: {e}") + return [], {}, {} try: camera_id = context.get("camera_id", "unknown") if context else "unknown" model_id = node.get("modelId", "unknown") @@ -566,18 +595,116 @@ class YOLODetector: return [], {}, track_validation_result.to_dict() logger.info(f"✅ Camera {camera_id}: DETECTION COMPLETE - tracking single car: track_id={track_id}, conf={best_detection['confidence']:.3f}") - logger.debug(f"📊 Camera {camera_id}: Detection summary: {len(res.boxes)} raw → {len(candidate_detections)} candidates → 1 selected") # Store validation state in context for pipeline decisions if context is not None: context["track_validation_result"] = track_validation_result.to_dict() return all_detections, regions_dict, track_validation_result.to_dict() - + except Exception as e: - camera_id = context.get("camera_id", "unknown") if context else "unknown" - model_id = node.get("modelId", "unknown") - raise create_detection_error(camera_id, model_id, "detection_with_tracking", e) + logger.error(f"Detection error for camera {camera_id}: {e}") + return [], {}, {} + + def _extract_detection_context(self, node: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Tuple[str, str, str]: + """Extract camera ID, model ID, and current mode from context.""" + camera_id = context.get("camera_id", "unknown") if context else "unknown" + model_id = node.get("modelId", "unknown") + current_mode = context.get("current_mode", "unknown") if context else "unknown" + return camera_id, model_id, current_mode + + def _prepare_tracker_state(self, node: Dict[str, Any], camera_id: str, model_id: str) -> None: + """Prepare tracker state and reset if needed.""" + # Check if we need to reset tracker after cooldown + self._reset_yolo_tracker_if_needed(node, camera_id, model_id) + + def _execute_detection_pipeline( + self, + frame, + node: Dict[str, Any], + camera_id: str, + model_id: str, + current_mode: str, + tracking_config: TrackingConfig, + min_confidence: float + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any], Dict[str, Any]]: + """Execute the core detection pipeline.""" + # Run YOLO inference + res = self._run_yolo_inference(frame, node, tracking_config) + + # Process detection results + candidate_detections = self._process_detections(res, node, camera_id, min_confidence) + + # Select best detection + best_detection = self._select_best_detection(candidate_detections, camera_id) + + # Update track stability validation + track_validation_result = 
self._update_track_stability( + best_detection, node, camera_id, model_id, current_mode + ) + + # Handle no detection case + if best_detection is None: + return [], {}, track_validation_result.to_dict() + + # Process successful detection + return self._process_successful_detection( + best_detection, node, camera_id, track_validation_result + ) + + def _update_track_stability( + self, + detection: Optional[Dict[str, Any]], + node: Dict[str, Any], + camera_id: str, + model_id: str, + current_mode: str + ) -> Any: + """Update track stability validation.""" + tracking_config = self._extract_tracking_config(node) + is_branch_node = node.get("cropClass") is not None or node.get("parallel") is True + + return self.stability_tracker.update_single_track_stability( + detection=detection, + camera_id=camera_id, + model_id=model_id, + stability_threshold=tracking_config.stability_threshold, + current_mode=current_mode, + is_branch_node=is_branch_node + ) + + def _process_successful_detection( + self, + detection: Dict[str, Any], + node: Dict[str, Any], + camera_id: str, + track_validation_result: Any + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any], Dict[str, Any]]: + """Process a successful detection result.""" + # Apply class mapping + detection = self._apply_class_mapping(detection, node) + + # Create regions dictionary + mapped_class = detection["class"] + track_id = detection["id"] + + all_detections = [detection] + regions_dict = { + mapped_class: { + "bbox": detection["bbox"], + "confidence": detection["confidence"], + "detection": detection, + "track_id": track_id + } + } + + # Multi-class validation + if not self._validate_multi_class(regions_dict, node): + return [], {}, track_validation_result.to_dict() + + logger.info(f"✅ Camera {camera_id}: DETECTION COMPLETE - tracking single car: track_id={track_id}, conf={detection['confidence']:.3f}") + + return all_detections, regions_dict, track_validation_result.to_dict() # Global instances for backward compatibility diff --git a/detector_worker/pipeline/pipeline_executor.py b/detector_worker/pipeline/pipeline_executor.py index 7b88073..1e4013d 100644 --- a/detector_worker/pipeline/pipeline_executor.py +++ b/detector_worker/pipeline/pipeline_executor.py @@ -657,20 +657,167 @@ class PipelineExecutor: self._execute_node_actions(node, frame, detection_result, regions_dict) # ─── Return detection result ──────────────────────────────── - primary_detection = max(all_detections, key=lambda x: x["confidence"]) - primary_bbox = primary_detection["bbox"] - - # Add branch results and session_id to primary detection for compatibility - if "branch_results" in detection_result: - primary_detection["branch_results"] = detection_result["branch_results"] - if "session_id" in detection_result: - primary_detection["session_id"] = detection_result["session_id"] - - return (primary_detection, primary_bbox) if return_bbox else primary_detection + return self._finalize_pipeline_result(detection_result, return_bbox) except Exception as e: pipeline_id = node.get("modelId", "unknown") raise create_pipeline_error(pipeline_id, "pipeline_execution", e) + + def _initialize_pipeline_execution(self, node: Dict[str, Any], context: Optional[Dict[str, Any]]) -> PipelineContext: + """Initialize pipeline execution context.""" + pipeline_context = self._extract_context(context) + model_id = node.get("modelId", "unknown") + + if pipeline_context.backend_session_id: + logger.info(f"🔑 PIPELINE USING BACKEND SESSION_ID: {pipeline_context.backend_session_id} for camera 
{pipeline_context.camera_id}") + + return pipeline_context + + def _handle_classification_pipeline( + self, + frame: np.ndarray, + node: Dict[str, Any], + pipeline_context: PipelineContext, + return_bbox: bool + ) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]]: + """Handle classification pipeline tasks.""" + task = getattr(node["model"], "task", None) + if task == "classify": + return self._handle_classification_task(frame, node, pipeline_context, return_bbox) + return None + + def _check_session_state( + self, + pipeline_context: PipelineContext, + node: Dict[str, Any], + return_bbox: bool + ) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]]: + """Check session and camera activity state.""" + model_id = node.get("modelId", "unknown") + return self._check_camera_active(pipeline_context.camera_id, model_id, return_bbox) + + def _execute_detection_stage( + self, + frame: np.ndarray, + node: Dict[str, Any], + pipeline_context: PipelineContext, + validated_detection: Optional[Dict[str, Any]] + ) -> Optional[Tuple[List[Dict[str, Any]], Dict[str, Any], Dict[str, Any]]]: + """Execute the detection stage.""" + all_detections, regions_dict, track_validation_result = self._run_detection_stage( + frame, node, pipeline_context, validated_detection + ) + + if not all_detections: + logger.debug("No detections from structured detection function") + return None + + return all_detections, regions_dict, track_validation_result + + def _create_none_detection(self, return_bbox: bool) -> Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]: + """Create a 'none' detection result.""" + none_detection = { + "class": "none", + "confidence": 1.0, + "bbox": [0, 0, 0, 0], + "branch_results": {} + } + return (none_detection, [0, 0, 0, 0]) if return_bbox else none_detection + + def _validate_pipeline_execution( + self, + node: Dict[str, Any], + track_validation_result: Dict[str, Any], + regions_dict: Dict[str, Any], + pipeline_context: PipelineContext, + return_bbox: bool + ) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]], Tuple[None, None]]]: + """Validate pipeline execution requirements.""" + # Track-based validation + tracking_validation_result = self._validate_tracking_requirements( + node, track_validation_result, pipeline_context, return_bbox + ) + if tracking_validation_result is not None: + return tracking_validation_result + + # Pipeline execution validation + pipeline_valid, missing_branches = validate_pipeline_execution(node, regions_dict) + if not pipeline_valid: + logger.error(f"Pipeline execution validation FAILED - required branches {missing_branches} cannot execute") + logger.error("Aborting pipeline: no Redis actions or database records will be created") + return (None, None) if return_bbox else None + + return None + + def _execute_main_pipeline( + self, + frame: np.ndarray, + node: Dict[str, Any], + all_detections: List[Dict[str, Any]], + regions_dict: Dict[str, Any], + pipeline_context: PipelineContext + ) -> Dict[str, Any]: + """Execute the main pipeline with actions and branch processing.""" + model_id = node.get("modelId", "unknown") + + # Create detection result + detection_result = { + "detections": all_detections, + "regions": regions_dict, + **pipeline_context.to_dict() + } + + # Handle database operations + self._handle_database_operations(node, detection_result, regions_dict, pipeline_context) + + # Execute root node actions if no branches or specific model + if not node.get("branches") or node.get("modelId") == "yolo11n": + 
self._execute_node_actions(node, frame, detection_result, regions_dict) + + # Process branches + branch_results = self._process_branches(frame, node, detection_result, regions_dict, pipeline_context) + detection_result["branch_results"] = branch_results + + # Execute parallel actions + if node.get("parallelActions") and "branch_results" in detection_result: + self._execute_parallel_actions(node, frame, detection_result, regions_dict) + + # Auto-enable occupancy mode after successful completion + occupancy_detector(pipeline_context.camera_id, model_id, enable=True) + + logger.info(f"✅ Camera {pipeline_context.camera_id}: Pipeline completed, detection data will be sent to backend") + logger.info(f"🛑 Camera {pipeline_context.camera_id}: Model will stop inference for future frames") + logger.info(f"📡 Backend sessionId will be handled when received via WebSocket") + + # Execute branch node actions + if node.get("actions") and regions_dict and node.get("modelId") != "yolo11n": + logger.debug(f"Executing post-detection actions for branch node {node.get('modelId')}") + self._execute_node_actions(node, frame, detection_result, regions_dict) + + return detection_result + + def _finalize_pipeline_result( + self, + detection_result: Dict[str, Any], + return_bbox: bool + ) -> Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]: + """Finalize and return the pipeline result.""" + all_detections = detection_result.get("detections", []) + + if not all_detections: + return self._create_none_detection(return_bbox) + + # Get primary detection (highest confidence) + primary_detection = max(all_detections, key=lambda x: x["confidence"]) + primary_bbox = primary_detection["bbox"] + + # Add branch results and session_id to primary detection for compatibility + if "branch_results" in detection_result: + primary_detection["branch_results"] = detection_result["branch_results"] + if "session_id" in detection_result: + primary_detection["session_id"] = detection_result["session_id"] + + return (primary_detection, primary_bbox) if return_bbox else primary_detection # Global pipeline executor instance diff --git a/detector_worker/utils/error_handler.py b/detector_worker/utils/error_handler.py new file mode 100644 index 0000000..24a92d3 --- /dev/null +++ b/detector_worker/utils/error_handler.py @@ -0,0 +1,406 @@ +""" +Common error handling and logging patterns. + +This module provides standardized error handling, logging utilities, and +common patterns used throughout the detection worker system. 
+""" +import logging +import traceback +import functools +import time +from typing import Dict, Any, Optional, Callable, Union +from contextlib import contextmanager +from dataclasses import dataclass +from enum import Enum + +from ..core.exceptions import DetectionError, PipelineError, StreamError, ModelLoadError + +# Setup logging +logger = logging.getLogger("detector_worker.error_handler") + + +class ErrorSeverity(Enum): + """Error severity levels.""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +@dataclass +class ErrorContext: + """Context information for error reporting.""" + component: str = "unknown" + operation: str = "unknown" + camera_id: Optional[str] = None + model_id: Optional[str] = None + session_id: Optional[str] = None + additional_data: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "component": self.component, + "operation": self.operation, + "camera_id": self.camera_id, + "model_id": self.model_id, + "session_id": self.session_id, + "additional_data": self.additional_data or {} + } + + +class ErrorHandler: + """ + Centralized error handling and logging utility. + + Provides consistent error handling patterns, logging formats, + and error recovery strategies across the detection worker. + """ + + def __init__(self, component_name: str = "detector_worker"): + """ + Initialize error handler. + + Args: + component_name: Name of the component using this handler + """ + self.component_name = component_name + self.error_counts: Dict[str, int] = {} + + def handle_error( + self, + error: Exception, + context: ErrorContext, + severity: ErrorSeverity = ErrorSeverity.MEDIUM, + reraise: bool = True, + log_traceback: bool = True + ) -> None: + """ + Handle an error with consistent logging and optional re-raising. 
+ + Args: + error: Exception that occurred + context: Error context information + severity: Error severity level + reraise: Whether to re-raise the exception + log_traceback: Whether to log the full traceback + """ + # Generate error key for counting + error_key = f"{context.component}:{context.operation}:{type(error).__name__}" + self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1 + + # Create error message + error_msg = self._format_error_message(error, context, severity) + + # Log based on severity + if severity == ErrorSeverity.CRITICAL: + logger.critical(error_msg) + elif severity == ErrorSeverity.HIGH: + logger.error(error_msg) + elif severity == ErrorSeverity.MEDIUM: + logger.warning(error_msg) + else: + logger.info(error_msg) + + # Log traceback if requested and severity is medium or higher + if log_traceback and severity in [ErrorSeverity.MEDIUM, ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]: + logger.error(f"Traceback for {context.component}:{context.operation}:") + logger.error(traceback.format_exc()) + + # Re-raise if requested + if reraise: + # Convert to appropriate custom exception + custom_error = self._convert_to_custom_exception(error, context) + raise custom_error + + def _format_error_message( + self, + error: Exception, + context: ErrorContext, + severity: ErrorSeverity + ) -> str: + """Format error message with context.""" + severity_emoji = { + ErrorSeverity.LOW: "â„šī¸", + ErrorSeverity.MEDIUM: "âš ī¸", + ErrorSeverity.HIGH: "❌", + ErrorSeverity.CRITICAL: "🚨" + } + + emoji = severity_emoji.get(severity, "❓") + + parts = [ + f"{emoji} {severity.value.upper()} ERROR in {context.component}", + f"Operation: {context.operation}", + f"Error: {type(error).__name__}: {error}" + ] + + if context.camera_id: + parts.append(f"Camera: {context.camera_id}") + if context.model_id: + parts.append(f"Model: {context.model_id}") + if context.session_id: + parts.append(f"Session: {context.session_id}") + + error_key = f"{context.component}:{context.operation}:{type(error).__name__}" + count = self.error_counts.get(error_key, 1) + if count > 1: + parts.append(f"Count: {count}") + + return " | ".join(parts) + + def _convert_to_custom_exception( + self, + error: Exception, + context: ErrorContext + ) -> Exception: + """Convert generic exception to appropriate custom exception.""" + error_msg = f"{context.operation} failed: {error}" + + if context.component in ["yolo_detector", "tracking_manager", "stability_validator"]: + return DetectionError(error_msg, details=context.to_dict()) + elif context.component in ["pipeline_executor", "action_executor", "field_mapper"]: + return PipelineError(error_msg, details=context.to_dict()) + elif context.component in ["stream_manager", "frame_reader", "camera_monitor"]: + return StreamError(error_msg, details=context.to_dict()) + elif context.component in ["model_manager", "pipeline_loader"]: + return ModelLoadError(error_msg, details=context.to_dict()) + else: + return error + + def get_error_stats(self) -> Dict[str, Any]: + """Get error statistics.""" + total_errors = sum(self.error_counts.values()) + return { + "total_errors": total_errors, + "error_breakdown": dict(self.error_counts), + "unique_error_types": len(self.error_counts) + } + + def reset_error_counts(self) -> None: + """Reset error counts.""" + self.error_counts.clear() + + +def with_error_handling( + component: str, + operation: str, + severity: ErrorSeverity = ErrorSeverity.MEDIUM, + reraise: bool = True, + default_return: Any = None +): + """ + Decorator to add 
consistent error handling to functions. + + Args: + component: Component name + operation: Operation name + severity: Error severity level + reraise: Whether to re-raise exceptions + default_return: Default return value if error occurs and not re-raising + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs): + error_handler = ErrorHandler(component) + context = ErrorContext( + component=component, + operation=operation, + additional_data={"args": len(args), "kwargs": list(kwargs.keys())} + ) + + try: + return func(*args, **kwargs) + except Exception as e: + error_handler.handle_error( + e, context, severity=severity, reraise=reraise + ) + return default_return + + return wrapper + return decorator + + +@contextmanager +def error_context( + component: str, + operation: str, + camera_id: Optional[str] = None, + model_id: Optional[str] = None, + severity: ErrorSeverity = ErrorSeverity.MEDIUM +): + """ + Context manager for error handling. + + Args: + component: Component name + operation: Operation name + camera_id: Optional camera ID + model_id: Optional model ID + severity: Error severity level + """ + error_handler = ErrorHandler(component) + context = ErrorContext( + component=component, + operation=operation, + camera_id=camera_id, + model_id=model_id + ) + + try: + yield context + except Exception as e: + error_handler.handle_error(e, context, severity=severity, reraise=True) + + +class PerformanceTimer: + """ + Performance timing utility with automatic logging. + """ + + def __init__( + self, + operation: str, + component: str = "detector_worker", + log_threshold: float = 1.0, + auto_log: bool = True + ): + """ + Initialize performance timer. + + Args: + operation: Operation being timed + component: Component name + log_threshold: Log if operation takes longer than this (seconds) + auto_log: Whether to automatically log timing + """ + self.operation = operation + self.component = component + self.log_threshold = log_threshold + self.auto_log = auto_log + self.start_time: Optional[float] = None + self.end_time: Optional[float] = None + + def __enter__(self) -> 'PerformanceTimer': + """Start timing.""" + self.start_time = time.time() + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """End timing and optionally log.""" + self.end_time = time.time() + + if self.auto_log: + self.log_timing() + + def get_duration(self) -> float: + """Get duration in seconds.""" + if self.start_time is None: + return 0.0 + end = self.end_time or time.time() + return end - self.start_time + + def log_timing(self) -> None: + """Log timing information.""" + duration = self.get_duration() + + if duration >= self.log_threshold: + level = logging.WARNING if duration >= (self.log_threshold * 2) else logging.INFO + logger.log( + level, + f"âąī¸ {self.component}:{self.operation} took {duration:.3f}s" + ) + else: + logger.debug(f"âąī¸ {self.component}:{self.operation} took {duration:.3f}s") + + +def log_function_entry_exit(component: str = "detector_worker"): + """ + Decorator to log function entry and exit. 
+ + Args: + component: Component name + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_name = f"{func.__module__}.{func.__qualname__}" + logger.debug(f"📍 ENTER {component}:{func_name}") + + try: + result = func(*args, **kwargs) + logger.debug(f"📍 EXIT {component}:{func_name} -> {type(result).__name__}") + return result + except Exception as e: + logger.debug(f"📍 ERROR {component}:{func_name} -> {type(e).__name__}: {e}") + raise + + return wrapper + return decorator + + +def create_logger( + name: str, + level: int = logging.INFO, + format_string: Optional[str] = None +) -> logging.Logger: + """ + Create a standardized logger. + + Args: + name: Logger name + level: Logging level + format_string: Custom format string + + Returns: + Configured logger + """ + if format_string is None: + format_string = "%(asctime)s | %(levelname)s | %(name)s | %(message)s" + + logger = logging.getLogger(name) + logger.setLevel(level) + + # Avoid adding handlers multiple times + if not logger.handlers: + handler = logging.StreamHandler() + handler.setLevel(level) + formatter = logging.Formatter(format_string) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger + + +# Global error handler instance +_global_error_handler = ErrorHandler("detector_worker") + + +# Convenience functions +def handle_error( + error: Exception, + component: str, + operation: str, + camera_id: Optional[str] = None, + model_id: Optional[str] = None, + severity: ErrorSeverity = ErrorSeverity.MEDIUM, + reraise: bool = True +) -> None: + """Global error handling function.""" + context = ErrorContext( + component=component, + operation=operation, + camera_id=camera_id, + model_id=model_id + ) + _global_error_handler.handle_error(error, context, severity, reraise) + + +def get_global_error_stats() -> Dict[str, Any]: + """Get global error statistics.""" + return _global_error_handler.get_error_stats() + + +def reset_global_error_stats() -> None: + """Reset global error statistics.""" + _global_error_handler.reset_error_counts() \ No newline at end of file
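
Reviewer note (not part of the patch): a minimal wiring sketch showing how the new handle_websocket_detection() entry point and status/shutdown hooks from detector_worker/core/orchestrator.py could replace the old 586-line detect() route. The FastAPI app instance and the "/detect" and "/status" route paths are assumptions for illustration only; they are not taken from this diff.

from fastapi import FastAPI, WebSocket

from detector_worker.core.orchestrator import (
    get_detection_orchestrator,
    handle_websocket_detection,
)

app = FastAPI()


@app.websocket("/detect")
async def detect(websocket: WebSocket) -> None:
    # Delegate the full detection workflow to the DetectionOrchestrator.
    await handle_websocket_detection(websocket)


@app.get("/status")
async def status() -> dict:
    # Surface the orchestrator's status report (system, streams, models, sessions).
    return get_detection_orchestrator().get_status_report()


@app.on_event("shutdown")
async def shutdown_event() -> None:
    # Tear down streams, models, sessions and the optional Redis/DB clients.
    await get_detection_orchestrator().shutdown()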
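
Likewise, a small usage sketch for the utilities introduced in detector_worker/utils/error_handler.py (with_error_handling, error_context, PerformanceTimer, get_global_error_stats). The preprocess_frame/run_detection helpers, camera_id/model_id values, and the sleep stand-in are hypothetical; only the error-handler API itself comes from the diff above.

import time

from detector_worker.utils.error_handler import (
    ErrorSeverity,
    PerformanceTimer,
    error_context,
    get_global_error_stats,
    with_error_handling,
)


@with_error_handling(
    component="yolo_detector",
    operation="preprocess_frame",
    severity=ErrorSeverity.LOW,
    reraise=False,          # swallow the error and return default_return instead
    default_return=None,
)
def preprocess_frame(frame):
    """Hypothetical helper; any exception here is logged and None is returned."""
    return frame  # stand-in for real preprocessing


def run_detection(frame, camera_id="cam-01", model_id="yolo11n"):
    # error_context re-raises failures as DetectionError, because "yolo_detector"
    # is one of the components mapped to DetectionError by the handler.
    with error_context("yolo_detector", "run_detection",
                       camera_id=camera_id, model_id=model_id,
                       severity=ErrorSeverity.HIGH):
        # PerformanceTimer logs the duration at INFO/WARNING once it exceeds
        # log_threshold seconds, otherwise at DEBUG.
        with PerformanceTimer("inference", component="yolo_detector",
                              log_threshold=0.5):
            time.sleep(0.01)  # stand-in for model inference
            return preprocess_frame(frame)


if __name__ == "__main__":
    run_detection(frame=None)
    print(get_global_error_stats())  # {'total_errors': 0, ...} on the happy path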