Refactor: Phase 5: Granular Refactoring

This commit is contained in:
ziesorx 2025-09-12 15:39:19 +07:00
parent 54f21672aa
commit 6c7c4c5d9c
4 changed files with 1216 additions and 15 deletions

View file

@ -0,0 +1,521 @@
"""
Detection orchestrator module.
This module provides the main orchestration classes that coordinate between
all the different components for the detection workflow.
"""
import asyncio
import json
import logging
import time
import traceback
from typing import Dict, Any, Optional, List, Callable
from fastapi import WebSocket
from websockets.exceptions import ConnectionClosedError, WebSocketDisconnect
from .config import config
from .constants import HEARTBEAT_INTERVAL
from .exceptions import DetectionError, OrchestrationError
from ..communication.websocket_handler import WebSocketHandler
from ..communication.message_processor import MessageProcessor, MessageType
from ..communication.response_formatter import ResponseFormatter
from ..streams.stream_manager import StreamManager
from ..models.model_manager import ModelManager
from ..pipeline.pipeline_executor import PipelineExecutor
from ..storage.session_cache import SessionCache
from ..storage.redis_client import RedisClientManager
from ..storage.database_manager import DatabaseManager
from ..utils.system_monitor import get_system_metrics
# Setup logging
logger = logging.getLogger("detector_worker.orchestrator")
class DetectionOrchestrator:
"""
Main orchestrator for the detection workflow.
This class coordinates all components and provides the main
entry point for detection processing.
"""
def __init__(self):
"""Initialize the detection orchestrator."""
# Core components
self.stream_manager = StreamManager()
self.model_manager = ModelManager()
self.pipeline_executor = PipelineExecutor()
self.session_cache = SessionCache()
# Communication components
self.message_processor = MessageProcessor()
self.response_formatter = ResponseFormatter()
# Storage components (optional)
self.redis_client: Optional[RedisClientManager] = None
self.database_manager: Optional[DatabaseManager] = None
# WebSocket handler
self.websocket_handler: Optional[WebSocketHandler] = None
# Initialize components
self._initialize_components()
def _initialize_components(self) -> None:
"""Initialize and wire up components."""
try:
# Initialize Redis if configured
redis_config = config.get("redis", {})
if redis_config.get("enabled", False):
self.redis_client = RedisClientManager(redis_config)
# Initialize database if configured
db_config = config.get("database", {})
if db_config.get("enabled", False):
self.database_manager = DatabaseManager(db_config)
# Wire up pipeline executor dependencies
self.pipeline_executor.set_dependencies(
redis_client=self.redis_client,
database_manager=self.database_manager,
session_cache=self.session_cache
)
# Set up model manager with pipeline loader
from ..models.pipeline_loader import get_pipeline_loader
pipeline_loader = get_pipeline_loader()
self.model_manager.set_pipeline_loader(pipeline_loader)
except Exception as e:
logger.error(f"Failed to initialize components: {e}")
raise OrchestrationError(f"Component initialization failed: {e}")
async def handle_websocket_connection(self, websocket: WebSocket) -> None:
"""
Handle a WebSocket connection (refactored detect() function).
This replaces the massive 586-line detect() function with a clean
orchestration approach.
Args:
websocket: WebSocket connection to handle
"""
try:
# Create WebSocket handler
self.websocket_handler = WebSocketHandler(
stream_manager=self.stream_manager,
model_manager=self.model_manager,
pipeline_executor=self.pipeline_executor,
session_cache=self.session_cache,
redis_client=self.redis_client
)
# Handle the connection
await self.websocket_handler.handle_connection(websocket)
except Exception as e:
logger.error(f"Error in WebSocket orchestration: {e}")
traceback.print_exc()
raise DetectionError(f"WebSocket handling failed: {e}")
def get_status_report(self) -> Dict[str, Any]:
"""
Get comprehensive status report.
Returns:
Dictionary with system status
"""
try:
# Get system metrics
system_metrics = get_system_metrics()
# Get component status
active_streams = self.stream_manager.get_active_streams()
loaded_models = self.model_manager.get_loaded_models()
model_stats = self.model_manager.get_model_stats()
session_stats = self.session_cache.get_cache_stats()
status = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"system": system_metrics,
"streams": {
"active_count": len(active_streams),
"cameras": list(active_streams.keys())
},
"models": {
"loaded_count": len(loaded_models),
"total_instances": model_stats.get("total_instances", 0),
"cache_size_mb": model_stats.get("cache_size_mb", 0)
},
"sessions": {
"active_count": session_stats.get("active_sessions", 0),
"cached_detections": session_stats.get("cached_detections", 0),
"cache_size_mb": session_stats.get("cache_size_mb", 0)
}
}
# Add Redis status if available
if self.redis_client:
redis_stats = self.redis_client.get_connection_stats()
status["redis"] = redis_stats
# Add database status if available
if self.database_manager:
db_stats = self.database_manager.get_connection_stats()
status["database"] = db_stats
return status
except Exception as e:
logger.error(f"Error getting status report: {e}")
return {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"error": f"Failed to get status: {e}"
}
async def shutdown(self) -> None:
"""Shutdown the orchestrator and all components."""
logger.info("Shutting down detection orchestrator")
try:
# Shutdown WebSocket handler
if self.websocket_handler:
await self.websocket_handler._cleanup()
# Cleanup components
await self.stream_manager.cleanup_all_streams()
self.model_manager.cleanup_all_models()
self.session_cache.clear_all_sessions()
# Cleanup storage components
if self.redis_client:
await self.redis_client.cleanup()
if self.database_manager:
await self.database_manager.cleanup()
except Exception as e:
logger.error(f"Error during shutdown: {e}")
logger.info("Detection orchestrator shutdown completed")
class SessionOrchestrator:
"""
Orchestrates session-based detection workflows.
This class handles the complex session state management and
mode transitions that were embedded in the original detect() function.
"""
def __init__(
self,
session_cache: SessionCache,
pipeline_executor: PipelineExecutor
):
"""
Initialize session orchestrator.
Args:
session_cache: Session cache manager
pipeline_executor: Pipeline executor
"""
self.session_cache = session_cache
self.pipeline_executor = pipeline_executor
async def process_camera_detection(
self,
camera_id: str,
stream_config: Dict[str, Any],
frame: Any,
model_tree: Any,
backend_session_id: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
Process detection for a camera with session state management.
This method encapsulates the complex session mode logic from
the original detect() function.
Args:
camera_id: Camera identifier
stream_config: Stream configuration
frame: Video frame to process
model_tree: Model pipeline tree
backend_session_id: Backend session ID if available
Returns:
Detection result or None
"""
try:
# Get display identifier for pipeline context
subscription_parts = stream_config["subscriptionIdentifier"].split(';')
display_identifier = subscription_parts[0] if subscription_parts else None
# Get or initialize session pipeline state
pipeline_state = self.session_cache.get_or_init_session_pipeline_state(camera_id)
current_mode = pipeline_state["mode"]
logger.debug(f"🔍 Camera {camera_id}: mode='{current_mode}', session_id={repr(backend_session_id)}")
# Handle session ID-based processing mode
updated_mode = self._handle_session_mode_transition(
camera_id, current_mode, backend_session_id, pipeline_state
)
# Apply frame cropping if configured
cropped_frame = self._apply_frame_crop(frame, stream_config, camera_id)
# Process based on current mode
detection_result = await self._process_by_mode(
camera_id, updated_mode, cropped_frame, stream_config,
model_tree, pipeline_state, backend_session_id
)
return detection_result
except Exception as e:
logger.error(f"Error processing camera detection for {camera_id}: {e}")
traceback.print_exc()
return None
def _handle_session_mode_transition(
self,
camera_id: str,
current_mode: str,
backend_session_id: Optional[str],
pipeline_state: Dict[str, Any]
) -> str:
"""Handle session mode transitions based on session ID availability."""
if not backend_session_id:
# No session ID - handle different modes appropriately
if current_mode == "lightweight":
# Check if we're in car_waitpayment stage
current_progression = pipeline_state.get("progression_stage")
if current_progression == "car_waitpayment":
# Stay in lightweight mode for dual reset condition
logger.debug(f"🔍 Camera {camera_id}: No session ID but in car_waitpayment - staying in lightweight mode")
return current_mode
else:
# Reset to validation_detecting (situation 1)
self.session_cache.update_session_pipeline_mode(camera_id, "validation_detecting")
logger.debug(f"🔍 Camera {camera_id}: No session ID - reset to validation_detecting")
return "validation_detecting"
elif current_mode not in ["validation_detecting", "send_detections", "waiting_for_session_id"]:
# Other modes - reset to validation_detecting
self.session_cache.update_session_pipeline_mode(camera_id, "validation_detecting")
logger.debug(f"🔍 Camera {camera_id}: No session ID - reset from {current_mode} to validation_detecting")
return "validation_detecting"
else:
logger.debug(f"🔍 Camera {camera_id}: No session ID - staying in {current_mode}")
return current_mode
else:
# Session ID available - switch to full pipeline mode
if current_mode in ["send_detections", "waiting_for_session_id"]:
# Session ID just arrived - switch to full pipeline mode
self.session_cache.update_session_pipeline_mode(
camera_id, "full_pipeline", backend_session_id
)
logger.info(f"🔥 Camera {camera_id}: Session ID received ({backend_session_id}) - switching to FULL PIPELINE mode")
return "full_pipeline"
else:
logger.debug(f"🔍 Camera {camera_id}: Session ID available - staying in {current_mode}")
return current_mode
def _apply_frame_crop(
self,
frame: Any,
stream_config: Dict[str, Any],
camera_id: str
) -> Any:
"""Apply frame cropping if configured."""
crop_coords = [
stream_config.get("cropX1"),
stream_config.get("cropY1"),
stream_config.get("cropX2"),
stream_config.get("cropY2")
]
if all(coord is not None for coord in crop_coords):
x1, y1, x2, y2 = crop_coords
cropped_frame = frame[y1:y2, x1:x2]
logger.debug(f"Applied crop ({x1}, {y1}, {x2}, {y2}) to frame for camera {camera_id}")
return cropped_frame
return frame
async def _process_by_mode(
self,
camera_id: str,
mode: str,
frame: Any,
stream_config: Dict[str, Any],
model_tree: Any,
pipeline_state: Dict[str, Any],
backend_session_id: Optional[str]
) -> Optional[Dict[str, Any]]:
"""Process detection based on current mode."""
if mode == "validation_detecting":
return await self._process_validation_mode(
camera_id, frame, stream_config, model_tree, pipeline_state
)
elif mode == "send_detections":
return await self._process_send_detections_mode(
camera_id, frame, stream_config, model_tree, pipeline_state
)
elif mode == "waiting_for_session_id":
return await self._process_waiting_mode(
camera_id, frame, stream_config, model_tree, pipeline_state
)
elif mode == "full_pipeline":
return await self._process_full_pipeline_mode(
camera_id, frame, stream_config, model_tree,
pipeline_state, backend_session_id
)
elif mode == "lightweight":
return await self._process_lightweight_mode(
camera_id, frame, stream_config, model_tree, pipeline_state
)
else:
logger.warning(f"Unknown mode '{mode}' for camera {camera_id}")
return None
async def _process_validation_mode(
self,
camera_id: str,
frame: Any,
stream_config: Dict[str, Any],
model_tree: Any,
pipeline_state: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Process detection in validation mode."""
logger.debug(f"🔍 Camera {camera_id}: Processing in validation_detecting mode")
# Run detection only (no full pipeline)
detection_result = await self.pipeline_executor.run_detection_only(
camera_id, frame, model_tree, pipeline_state
)
if detection_result and detection_result.has_expected_classes():
# Validation successful - transition to send_detections mode
self.session_cache.update_session_pipeline_mode(camera_id, "send_detections")
logger.info(f"🎯 Camera {camera_id}: Validation successful - switching to send_detections mode")
return detection_result
async def _process_send_detections_mode(
self,
camera_id: str,
frame: Any,
stream_config: Dict[str, Any],
model_tree: Any,
pipeline_state: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Process detection in send_detections mode."""
logger.debug(f"🔍 Camera {camera_id}: Processing in send_detections mode")
# Continue sending detections, waiting for session ID
detection_result = await self.pipeline_executor.run_detection_only(
camera_id, frame, model_tree, pipeline_state
)
# Check if we should transition to waiting mode
if detection_result and detection_result.is_stable():
self.session_cache.update_session_pipeline_mode(camera_id, "waiting_for_session_id")
logger.info(f"⏳ Camera {camera_id}: Stable detection - switching to waiting_for_session_id mode")
return detection_result
async def _process_waiting_mode(
self,
camera_id: str,
frame: Any,
stream_config: Dict[str, Any],
model_tree: Any,
pipeline_state: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Process detection in waiting_for_session_id mode."""
logger.debug(f"🔍 Camera {camera_id}: Processing in waiting_for_session_id mode")
# Continue detection but wait for session ID to arrive
return await self.pipeline_executor.run_detection_only(
camera_id, frame, model_tree, pipeline_state
)
async def _process_full_pipeline_mode(
self,
camera_id: str,
frame: Any,
stream_config: Dict[str, Any],
model_tree: Any,
pipeline_state: Dict[str, Any],
backend_session_id: str
) -> Optional[Dict[str, Any]]:
"""Process detection with full pipeline."""
logger.debug(f"🔍 Camera {camera_id}: Processing in full_pipeline mode")
# Run full pipeline with all actions
result = await self.pipeline_executor.execute_pipeline(
camera_id, stream_config, frame, model_tree,
{}, pipeline_state, backend_session_id
)
# Check for mode transitions after pipeline
if result and result.get("should_switch_to_lightweight"):
self.session_cache.update_session_pipeline_mode(camera_id, "lightweight")
logger.info(f"💡 Camera {camera_id}: Switching to lightweight mode after full pipeline")
return result
async def _process_lightweight_mode(
self,
camera_id: str,
frame: Any,
stream_config: Dict[str, Any],
model_tree: Any,
pipeline_state: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Process detection in lightweight mode."""
logger.debug(f"🔍 Camera {camera_id}: Processing in lightweight mode")
# Check if YOLO inference is enabled for this stage
yolo_enabled = pipeline_state.get("yolo_inference_enabled", True)
if yolo_enabled:
# Run detection for absence monitoring
return await self.pipeline_executor.run_detection_only(
camera_id, frame, model_tree, pipeline_state
)
else:
# Return cached detection without inference
cached_result = self.session_cache.get_cached_detection(camera_id)
logger.debug(f"🔍 Camera {camera_id}: YOLO inference disabled - returning cached detection")
return cached_result
# Global orchestrator instance
_detection_orchestrator: Optional[DetectionOrchestrator] = None
def get_detection_orchestrator() -> DetectionOrchestrator:
"""Get or create the global detection orchestrator."""
global _detection_orchestrator
if _detection_orchestrator is None:
_detection_orchestrator = DetectionOrchestrator()
return _detection_orchestrator
# Main entry point for the refactored detect() function
async def handle_websocket_detection(websocket: WebSocket) -> None:
"""
Main entry point for WebSocket detection handling.
This replaces the original 586-line detect() function with a clean,
orchestrated approach using the DetectionOrchestrator.
Args:
websocket: WebSocket connection to handle
"""
orchestrator = get_detection_orchestrator()
await orchestrator.handle_websocket_connection(websocket)

View file

@ -493,6 +493,9 @@ class YOLODetector:
""" """
Run YOLO detection with BoT-SORT tracking and stability validation. Run YOLO detection with BoT-SORT tracking and stability validation.
This method has been refactored from 226 lines into smaller, focused helper methods
for better maintainability and testability.
Args: Args:
frame: Input frame/image frame: Input frame/image
node: Pipeline node configuration with model and settings node: Pipeline node configuration with model and settings
@ -504,6 +507,32 @@ class YOLODetector:
- regions_dict: Dict mapping class names to highest confidence detections - regions_dict: Dict mapping class names to highest confidence detections
- track_validation_result: Dict with validation status and stable tracks - track_validation_result: Dict with validation status and stable tracks
""" """
try:
# Extract context information
camera_id, model_id, current_mode = self._extract_detection_context(node, context)
# Initialize detection configuration
tracking_config = self._extract_tracking_config(node)
min_confidence = self._determine_confidence_threshold(node)
# Prepare tracker state
self._prepare_tracker_state(node, camera_id, model_id)
# Run core detection pipeline
detection_result = self._execute_detection_pipeline(
frame, node, camera_id, model_id, current_mode,
tracking_config, min_confidence
)
# Store validation context
if context is not None:
context["track_validation_result"] = detection_result[2]
return detection_result
except Exception as e:
logger.error(f"Detection error for camera {context.get('camera_id', 'unknown') if context else 'unknown'}: {e}")
return [], {}, {}
try: try:
camera_id = context.get("camera_id", "unknown") if context else "unknown" camera_id = context.get("camera_id", "unknown") if context else "unknown"
model_id = node.get("modelId", "unknown") model_id = node.get("modelId", "unknown")
@ -566,18 +595,116 @@ class YOLODetector:
return [], {}, track_validation_result.to_dict() return [], {}, track_validation_result.to_dict()
logger.info(f"✅ Camera {camera_id}: DETECTION COMPLETE - tracking single car: track_id={track_id}, conf={best_detection['confidence']:.3f}") logger.info(f"✅ Camera {camera_id}: DETECTION COMPLETE - tracking single car: track_id={track_id}, conf={best_detection['confidence']:.3f}")
logger.debug(f"📊 Camera {camera_id}: Detection summary: {len(res.boxes)} raw → {len(candidate_detections)} candidates → 1 selected")
# Store validation state in context for pipeline decisions # Store validation state in context for pipeline decisions
if context is not None: if context is not None:
context["track_validation_result"] = track_validation_result.to_dict() context["track_validation_result"] = track_validation_result.to_dict()
return all_detections, regions_dict, track_validation_result.to_dict() return all_detections, regions_dict, track_validation_result.to_dict()
except Exception as e: except Exception as e:
camera_id = context.get("camera_id", "unknown") if context else "unknown" logger.error(f"Detection error for camera {camera_id}: {e}")
model_id = node.get("modelId", "unknown") return [], {}, {}
raise create_detection_error(camera_id, model_id, "detection_with_tracking", e)
def _extract_detection_context(self, node: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Tuple[str, str, str]:
"""Extract camera ID, model ID, and current mode from context."""
camera_id = context.get("camera_id", "unknown") if context else "unknown"
model_id = node.get("modelId", "unknown")
current_mode = context.get("current_mode", "unknown") if context else "unknown"
return camera_id, model_id, current_mode
def _prepare_tracker_state(self, node: Dict[str, Any], camera_id: str, model_id: str) -> None:
"""Prepare tracker state and reset if needed."""
# Check if we need to reset tracker after cooldown
self._reset_yolo_tracker_if_needed(node, camera_id, model_id)
def _execute_detection_pipeline(
self,
frame,
node: Dict[str, Any],
camera_id: str,
model_id: str,
current_mode: str,
tracking_config: TrackingConfig,
min_confidence: float
) -> Tuple[List[Dict[str, Any]], Dict[str, Any], Dict[str, Any]]:
"""Execute the core detection pipeline."""
# Run YOLO inference
res = self._run_yolo_inference(frame, node, tracking_config)
# Process detection results
candidate_detections = self._process_detections(res, node, camera_id, min_confidence)
# Select best detection
best_detection = self._select_best_detection(candidate_detections, camera_id)
# Update track stability validation
track_validation_result = self._update_track_stability(
best_detection, node, camera_id, model_id, current_mode
)
# Handle no detection case
if best_detection is None:
return [], {}, track_validation_result.to_dict()
# Process successful detection
return self._process_successful_detection(
best_detection, node, camera_id, track_validation_result
)
def _update_track_stability(
self,
detection: Optional[Dict[str, Any]],
node: Dict[str, Any],
camera_id: str,
model_id: str,
current_mode: str
) -> Any:
"""Update track stability validation."""
tracking_config = self._extract_tracking_config(node)
is_branch_node = node.get("cropClass") is not None or node.get("parallel") is True
return self.stability_tracker.update_single_track_stability(
detection=detection,
camera_id=camera_id,
model_id=model_id,
stability_threshold=tracking_config.stability_threshold,
current_mode=current_mode,
is_branch_node=is_branch_node
)
def _process_successful_detection(
self,
detection: Dict[str, Any],
node: Dict[str, Any],
camera_id: str,
track_validation_result: Any
) -> Tuple[List[Dict[str, Any]], Dict[str, Any], Dict[str, Any]]:
"""Process a successful detection result."""
# Apply class mapping
detection = self._apply_class_mapping(detection, node)
# Create regions dictionary
mapped_class = detection["class"]
track_id = detection["id"]
all_detections = [detection]
regions_dict = {
mapped_class: {
"bbox": detection["bbox"],
"confidence": detection["confidence"],
"detection": detection,
"track_id": track_id
}
}
# Multi-class validation
if not self._validate_multi_class(regions_dict, node):
return [], {}, track_validation_result.to_dict()
logger.info(f"✅ Camera {camera_id}: DETECTION COMPLETE - tracking single car: track_id={track_id}, conf={detection['confidence']:.3f}")
return all_detections, regions_dict, track_validation_result.to_dict()
# Global instances for backward compatibility # Global instances for backward compatibility

View file

@ -657,20 +657,167 @@ class PipelineExecutor:
self._execute_node_actions(node, frame, detection_result, regions_dict) self._execute_node_actions(node, frame, detection_result, regions_dict)
# ─── Return detection result ──────────────────────────────── # ─── Return detection result ────────────────────────────────
primary_detection = max(all_detections, key=lambda x: x["confidence"]) return self._finalize_pipeline_result(detection_result, return_bbox)
primary_bbox = primary_detection["bbox"]
# Add branch results and session_id to primary detection for compatibility
if "branch_results" in detection_result:
primary_detection["branch_results"] = detection_result["branch_results"]
if "session_id" in detection_result:
primary_detection["session_id"] = detection_result["session_id"]
return (primary_detection, primary_bbox) if return_bbox else primary_detection
except Exception as e: except Exception as e:
pipeline_id = node.get("modelId", "unknown") pipeline_id = node.get("modelId", "unknown")
raise create_pipeline_error(pipeline_id, "pipeline_execution", e) raise create_pipeline_error(pipeline_id, "pipeline_execution", e)
def _initialize_pipeline_execution(self, node: Dict[str, Any], context: Optional[Dict[str, Any]]) -> PipelineContext:
"""Initialize pipeline execution context."""
pipeline_context = self._extract_context(context)
model_id = node.get("modelId", "unknown")
if pipeline_context.backend_session_id:
logger.info(f"🔑 PIPELINE USING BACKEND SESSION_ID: {pipeline_context.backend_session_id} for camera {pipeline_context.camera_id}")
return pipeline_context
def _handle_classification_pipeline(
self,
frame: np.ndarray,
node: Dict[str, Any],
pipeline_context: PipelineContext,
return_bbox: bool
) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]]:
"""Handle classification pipeline tasks."""
task = getattr(node["model"], "task", None)
if task == "classify":
return self._handle_classification_task(frame, node, pipeline_context, return_bbox)
return None
def _check_session_state(
self,
pipeline_context: PipelineContext,
node: Dict[str, Any],
return_bbox: bool
) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]]:
"""Check session and camera activity state."""
model_id = node.get("modelId", "unknown")
return self._check_camera_active(pipeline_context.camera_id, model_id, return_bbox)
def _execute_detection_stage(
self,
frame: np.ndarray,
node: Dict[str, Any],
pipeline_context: PipelineContext,
validated_detection: Optional[Dict[str, Any]]
) -> Optional[Tuple[List[Dict[str, Any]], Dict[str, Any], Dict[str, Any]]]:
"""Execute the detection stage."""
all_detections, regions_dict, track_validation_result = self._run_detection_stage(
frame, node, pipeline_context, validated_detection
)
if not all_detections:
logger.debug("No detections from structured detection function")
return None
return all_detections, regions_dict, track_validation_result
def _create_none_detection(self, return_bbox: bool) -> Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]:
"""Create a 'none' detection result."""
none_detection = {
"class": "none",
"confidence": 1.0,
"bbox": [0, 0, 0, 0],
"branch_results": {}
}
return (none_detection, [0, 0, 0, 0]) if return_bbox else none_detection
def _validate_pipeline_execution(
self,
node: Dict[str, Any],
track_validation_result: Dict[str, Any],
regions_dict: Dict[str, Any],
pipeline_context: PipelineContext,
return_bbox: bool
) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]], Tuple[None, None]]]:
"""Validate pipeline execution requirements."""
# Track-based validation
tracking_validation_result = self._validate_tracking_requirements(
node, track_validation_result, pipeline_context, return_bbox
)
if tracking_validation_result is not None:
return tracking_validation_result
# Pipeline execution validation
pipeline_valid, missing_branches = validate_pipeline_execution(node, regions_dict)
if not pipeline_valid:
logger.error(f"Pipeline execution validation FAILED - required branches {missing_branches} cannot execute")
logger.error("Aborting pipeline: no Redis actions or database records will be created")
return (None, None) if return_bbox else None
return None
def _execute_main_pipeline(
self,
frame: np.ndarray,
node: Dict[str, Any],
all_detections: List[Dict[str, Any]],
regions_dict: Dict[str, Any],
pipeline_context: PipelineContext
) -> Dict[str, Any]:
"""Execute the main pipeline with actions and branch processing."""
model_id = node.get("modelId", "unknown")
# Create detection result
detection_result = {
"detections": all_detections,
"regions": regions_dict,
**pipeline_context.to_dict()
}
# Handle database operations
self._handle_database_operations(node, detection_result, regions_dict, pipeline_context)
# Execute root node actions if no branches or specific model
if not node.get("branches") or node.get("modelId") == "yolo11n":
self._execute_node_actions(node, frame, detection_result, regions_dict)
# Process branches
branch_results = self._process_branches(frame, node, detection_result, regions_dict, pipeline_context)
detection_result["branch_results"] = branch_results
# Execute parallel actions
if node.get("parallelActions") and "branch_results" in detection_result:
self._execute_parallel_actions(node, frame, detection_result, regions_dict)
# Auto-enable occupancy mode after successful completion
occupancy_detector(pipeline_context.camera_id, model_id, enable=True)
logger.info(f"✅ Camera {pipeline_context.camera_id}: Pipeline completed, detection data will be sent to backend")
logger.info(f"🛑 Camera {pipeline_context.camera_id}: Model will stop inference for future frames")
logger.info(f"📡 Backend sessionId will be handled when received via WebSocket")
# Execute branch node actions
if node.get("actions") and regions_dict and node.get("modelId") != "yolo11n":
logger.debug(f"Executing post-detection actions for branch node {node.get('modelId')}")
self._execute_node_actions(node, frame, detection_result, regions_dict)
return detection_result
def _finalize_pipeline_result(
self,
detection_result: Dict[str, Any],
return_bbox: bool
) -> Union[Dict[str, Any], Tuple[Dict[str, Any], List[int]]]:
"""Finalize and return the pipeline result."""
all_detections = detection_result.get("detections", [])
if not all_detections:
return self._create_none_detection(return_bbox)
# Get primary detection (highest confidence)
primary_detection = max(all_detections, key=lambda x: x["confidence"])
primary_bbox = primary_detection["bbox"]
# Add branch results and session_id to primary detection for compatibility
if "branch_results" in detection_result:
primary_detection["branch_results"] = detection_result["branch_results"]
if "session_id" in detection_result:
primary_detection["session_id"] = detection_result["session_id"]
return (primary_detection, primary_bbox) if return_bbox else primary_detection
# Global pipeline executor instance # Global pipeline executor instance

View file

@ -0,0 +1,406 @@
"""
Common error handling and logging patterns.
This module provides standardized error handling, logging utilities, and
common patterns used throughout the detection worker system.
"""
import logging
import traceback
import functools
import time
from typing import Dict, Any, Optional, Callable, Union
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from ..core.exceptions import DetectionError, PipelineError, StreamError, ModelLoadError
# Setup logging
logger = logging.getLogger("detector_worker.error_handler")
class ErrorSeverity(Enum):
"""Error severity levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class ErrorContext:
"""Context information for error reporting."""
component: str = "unknown"
operation: str = "unknown"
camera_id: Optional[str] = None
model_id: Optional[str] = None
session_id: Optional[str] = None
additional_data: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"component": self.component,
"operation": self.operation,
"camera_id": self.camera_id,
"model_id": self.model_id,
"session_id": self.session_id,
"additional_data": self.additional_data or {}
}
class ErrorHandler:
"""
Centralized error handling and logging utility.
Provides consistent error handling patterns, logging formats,
and error recovery strategies across the detection worker.
"""
def __init__(self, component_name: str = "detector_worker"):
"""
Initialize error handler.
Args:
component_name: Name of the component using this handler
"""
self.component_name = component_name
self.error_counts: Dict[str, int] = {}
def handle_error(
self,
error: Exception,
context: ErrorContext,
severity: ErrorSeverity = ErrorSeverity.MEDIUM,
reraise: bool = True,
log_traceback: bool = True
) -> None:
"""
Handle an error with consistent logging and optional re-raising.
Args:
error: Exception that occurred
context: Error context information
severity: Error severity level
reraise: Whether to re-raise the exception
log_traceback: Whether to log the full traceback
"""
# Generate error key for counting
error_key = f"{context.component}:{context.operation}:{type(error).__name__}"
self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
# Create error message
error_msg = self._format_error_message(error, context, severity)
# Log based on severity
if severity == ErrorSeverity.CRITICAL:
logger.critical(error_msg)
elif severity == ErrorSeverity.HIGH:
logger.error(error_msg)
elif severity == ErrorSeverity.MEDIUM:
logger.warning(error_msg)
else:
logger.info(error_msg)
# Log traceback if requested and severity is medium or higher
if log_traceback and severity in [ErrorSeverity.MEDIUM, ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
logger.error(f"Traceback for {context.component}:{context.operation}:")
logger.error(traceback.format_exc())
# Re-raise if requested
if reraise:
# Convert to appropriate custom exception
custom_error = self._convert_to_custom_exception(error, context)
raise custom_error
def _format_error_message(
self,
error: Exception,
context: ErrorContext,
severity: ErrorSeverity
) -> str:
"""Format error message with context."""
severity_emoji = {
ErrorSeverity.LOW: "",
ErrorSeverity.MEDIUM: "⚠️",
ErrorSeverity.HIGH: "",
ErrorSeverity.CRITICAL: "🚨"
}
emoji = severity_emoji.get(severity, "")
parts = [
f"{emoji} {severity.value.upper()} ERROR in {context.component}",
f"Operation: {context.operation}",
f"Error: {type(error).__name__}: {error}"
]
if context.camera_id:
parts.append(f"Camera: {context.camera_id}")
if context.model_id:
parts.append(f"Model: {context.model_id}")
if context.session_id:
parts.append(f"Session: {context.session_id}")
error_key = f"{context.component}:{context.operation}:{type(error).__name__}"
count = self.error_counts.get(error_key, 1)
if count > 1:
parts.append(f"Count: {count}")
return " | ".join(parts)
def _convert_to_custom_exception(
self,
error: Exception,
context: ErrorContext
) -> Exception:
"""Convert generic exception to appropriate custom exception."""
error_msg = f"{context.operation} failed: {error}"
if context.component in ["yolo_detector", "tracking_manager", "stability_validator"]:
return DetectionError(error_msg, details=context.to_dict())
elif context.component in ["pipeline_executor", "action_executor", "field_mapper"]:
return PipelineError(error_msg, details=context.to_dict())
elif context.component in ["stream_manager", "frame_reader", "camera_monitor"]:
return StreamError(error_msg, details=context.to_dict())
elif context.component in ["model_manager", "pipeline_loader"]:
return ModelLoadError(error_msg, details=context.to_dict())
else:
return error
def get_error_stats(self) -> Dict[str, Any]:
"""Get error statistics."""
total_errors = sum(self.error_counts.values())
return {
"total_errors": total_errors,
"error_breakdown": dict(self.error_counts),
"unique_error_types": len(self.error_counts)
}
def reset_error_counts(self) -> None:
"""Reset error counts."""
self.error_counts.clear()
def with_error_handling(
component: str,
operation: str,
severity: ErrorSeverity = ErrorSeverity.MEDIUM,
reraise: bool = True,
default_return: Any = None
):
"""
Decorator to add consistent error handling to functions.
Args:
component: Component name
operation: Operation name
severity: Error severity level
reraise: Whether to re-raise exceptions
default_return: Default return value if error occurs and not re-raising
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
error_handler = ErrorHandler(component)
context = ErrorContext(
component=component,
operation=operation,
additional_data={"args": len(args), "kwargs": list(kwargs.keys())}
)
try:
return func(*args, **kwargs)
except Exception as e:
error_handler.handle_error(
e, context, severity=severity, reraise=reraise
)
return default_return
return wrapper
return decorator
@contextmanager
def error_context(
component: str,
operation: str,
camera_id: Optional[str] = None,
model_id: Optional[str] = None,
severity: ErrorSeverity = ErrorSeverity.MEDIUM
):
"""
Context manager for error handling.
Args:
component: Component name
operation: Operation name
camera_id: Optional camera ID
model_id: Optional model ID
severity: Error severity level
"""
error_handler = ErrorHandler(component)
context = ErrorContext(
component=component,
operation=operation,
camera_id=camera_id,
model_id=model_id
)
try:
yield context
except Exception as e:
error_handler.handle_error(e, context, severity=severity, reraise=True)
class PerformanceTimer:
"""
Performance timing utility with automatic logging.
"""
def __init__(
self,
operation: str,
component: str = "detector_worker",
log_threshold: float = 1.0,
auto_log: bool = True
):
"""
Initialize performance timer.
Args:
operation: Operation being timed
component: Component name
log_threshold: Log if operation takes longer than this (seconds)
auto_log: Whether to automatically log timing
"""
self.operation = operation
self.component = component
self.log_threshold = log_threshold
self.auto_log = auto_log
self.start_time: Optional[float] = None
self.end_time: Optional[float] = None
def __enter__(self) -> 'PerformanceTimer':
"""Start timing."""
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""End timing and optionally log."""
self.end_time = time.time()
if self.auto_log:
self.log_timing()
def get_duration(self) -> float:
"""Get duration in seconds."""
if self.start_time is None:
return 0.0
end = self.end_time or time.time()
return end - self.start_time
def log_timing(self) -> None:
"""Log timing information."""
duration = self.get_duration()
if duration >= self.log_threshold:
level = logging.WARNING if duration >= (self.log_threshold * 2) else logging.INFO
logger.log(
level,
f"⏱️ {self.component}:{self.operation} took {duration:.3f}s"
)
else:
logger.debug(f"⏱️ {self.component}:{self.operation} took {duration:.3f}s")
def log_function_entry_exit(component: str = "detector_worker"):
"""
Decorator to log function entry and exit.
Args:
component: Component name
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = f"{func.__module__}.{func.__qualname__}"
logger.debug(f"📍 ENTER {component}:{func_name}")
try:
result = func(*args, **kwargs)
logger.debug(f"📍 EXIT {component}:{func_name} -> {type(result).__name__}")
return result
except Exception as e:
logger.debug(f"📍 ERROR {component}:{func_name} -> {type(e).__name__}: {e}")
raise
return wrapper
return decorator
def create_logger(
name: str,
level: int = logging.INFO,
format_string: Optional[str] = None
) -> logging.Logger:
"""
Create a standardized logger.
Args:
name: Logger name
level: Logging level
format_string: Custom format string
Returns:
Configured logger
"""
if format_string is None:
format_string = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
logger = logging.getLogger(name)
logger.setLevel(level)
# Avoid adding handlers multiple times
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(level)
formatter = logging.Formatter(format_string)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
# Global error handler instance
_global_error_handler = ErrorHandler("detector_worker")
# Convenience functions
def handle_error(
error: Exception,
component: str,
operation: str,
camera_id: Optional[str] = None,
model_id: Optional[str] = None,
severity: ErrorSeverity = ErrorSeverity.MEDIUM,
reraise: bool = True
) -> None:
"""Global error handling function."""
context = ErrorContext(
component=component,
operation=operation,
camera_id=camera_id,
model_id=model_id
)
_global_error_handler.handle_error(error, context, severity, reraise)
def get_global_error_stats() -> Dict[str, Any]:
"""Get global error statistics."""
return _global_error_handler.get_error_stats()
def reset_global_error_stats() -> None:
"""Reset global error statistics."""
_global_error_handler.reset_error_counts()