From aacc5145d4790f47c2c75ef178a6afabe1a5f844 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Fri, 12 Sep 2025 20:18:14 +0700 Subject: [PATCH] Chore: update arch md --- ARCHITECTURE.md | 1286 +++++++++++++++++++++++++++++++++++++++++++++++ CLAUDE.md | 443 ++++++++++++---- 2 files changed, 1626 insertions(+), 103 deletions(-) create mode 100644 ARCHITECTURE.md diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md new file mode 100644 index 0000000..72533a9 --- /dev/null +++ b/ARCHITECTURE.md @@ -0,0 +1,1286 @@ +# Detector Worker - Architecture & Workflow Documentation + +## Table of Contents + +1. [Architecture Overview](#architecture-overview) +2. [Module Structure](#module-structure) +3. [System Startup Flow](#system-startup-flow) +4. [WebSocket Communication Flow](#websocket-communication-flow) +5. [Detection Pipeline Flow](#detection-pipeline-flow) +6. [Data Storage Flow](#data-storage-flow) +7. [Error Handling & Recovery](#error-handling--recovery) +8. [Testing Architecture](#testing-architecture) +9. [Development Workflow](#development-workflow) + +## Architecture Overview + +The Detector Worker has been refactored from a monolithic 4,115-line application into a modular, maintainable system with clear separation of concerns. The new architecture follows modern software engineering principles including dependency injection, thread safety, and comprehensive testing. + +### High-Level System Diagram + +```mermaid +graph TB + Client[WebSocket Client] --> WS[WebSocket Handler] + WS --> MP[Message Processor] + MP --> SM[Stream Manager] + MP --> MM[Model Manager] + MP --> PE[Pipeline Executor] + + SM --> FR[Frame Reader] + FR --> YD[YOLO Detector] + YD --> TM[Tracking Manager] + TM --> SV[Stability Validator] + + PE --> AE[Action Executor] + PE --> FM[Field Mapper] + AE --> DM[Database Manager] + AE --> RC[Redis Client] + AE --> SC[Session Cache] + + subgraph "Singleton Managers" + MSM[Model State Manager] + SSM[Stream State Manager] + SeSM[Session State Manager] + CSM[Cache State Manager] + CaSM[Camera State Manager] + PSM[Pipeline State Manager] + end + + subgraph "Storage Layer" + PostgreSQL[(PostgreSQL)] + Redis[(Redis)] + LocalCache[Local Cache] + end + + DM --> PostgreSQL + RC --> Redis + SC --> LocalCache +``` + +### Key Architectural Principles + +1. **Modular Design**: Each module has a single responsibility +2. **Dependency Injection**: IoC container manages object dependencies +3. **Thread Safety**: Singleton managers with proper locking +4. **Async/Await**: Non-blocking I/O operations throughout +5. **Type Safety**: Comprehensive type hints and validation +6. **Error Resilience**: Proper exception handling and recovery + +## Module Structure + +### Core Modules (`detector_worker/core/`) + +#### `config.py` - Configuration Management +```python +# Central configuration with multi-source loading +Configuration() + ├── load_from_file(path) # JSON/YAML config files + ├── load_from_env() # Environment variables + └── validate_config() # Configuration validation + +ConfigurationProvider(ABC) # Abstract base for config sources + ├── JSONConfigProvider + ├── YAMLConfigProvider + └── EnvironmentConfigProvider +``` + +#### `dependency_injection.py` - IoC Container +```python +ServiceContainer() + ├── register_singleton() # Single instance per container + ├── register_transient() # New instance per request + ├── register_scoped() # Instance per scope + └── resolve() # Dependency resolution with auto-injection +``` + +#### `singleton_managers.py` - Thread-Safe State Management +```python +# Six singleton managers replacing global dictionaries +ModelStateManager() # Model loading states +StreamStateManager() # Active stream connections +SessionStateManager() # Client session tracking +CacheStateManager() # Cache state management +CameraStateManager() # Camera connection states +PipelineStateManager() # Pipeline execution states +``` + +#### `exceptions.py` - Exception Hierarchy +```python +DetectorWorkerError # Base exception +├── ConfigurationError # Configuration issues +├── StreamError # Stream-related errors +├── ModelError # Model loading/inference errors +├── PipelineError # Pipeline execution errors +├── DatabaseError # Database operation errors +├── RedisError # Redis operation errors +└── MessageProcessingError # WebSocket message errors +``` + +### Communication Layer (`detector_worker/communication/`) + +#### `websocket_handler.py` - WebSocket Management +```python +WebSocketHandler() + ├── handle_websocket() # Main WebSocket connection handler + ├── _heartbeat_loop() # Keep-alive mechanism + └── _cleanup_connections() # Connection cleanup + +ConnectionManager() + ├── add_connection() # Register new client + ├── remove_connection() # Cleanup disconnected client + ├── broadcast() # Send to all clients + └── broadcast_to_subscription() # Send to specific subscription + +WebSocketConnection() # Per-client connection wrapper + ├── accept() # Accept WebSocket connection + ├── send_message() # Send JSON/text message + ├── receive_message() # Receive and parse message + └── ping() # Send keep-alive ping +``` + +#### `message_processor.py` - Message Processing Pipeline +```python +MessageProcessor() + ├── process_message() # Main message dispatcher + ├── _handle_subscribe() # Process subscription requests + ├── _handle_unsubscribe() # Process unsubscription + ├── _handle_state_request() # System state requests + └── _handle_session_ops() # Session management operations + +MessageType(Enum) # Supported message types + ├── SUBSCRIBE + ├── UNSUBSCRIBE + ├── REQUEST_STATE + ├── SET_SESSION_ID + └── PATCH_SESSION +``` + +### Stream Management (`detector_worker/streams/`) + +#### `stream_manager.py` - Stream Lifecycle Management +```python +StreamManager() + ├── create_stream() # Create RTSP/HTTP stream + ├── remove_stream() # Stop and cleanup stream + ├── get_latest_frame() # Get current frame + ├── reconnect_stream() # Handle connection failures + └── stop_all_streams() # Cleanup all streams + +StreamConfig() # Stream configuration + ├── stream_url # RTSP/HTTP URL + ├── stream_type # "rtsp" or "http_snapshot" + ├── target_fps # Target frame rate + └── reconnect_interval # Reconnection delay + +StreamReader() # Individual stream handler + ├── start() # Start frame capture + ├── stop() # Stop and cleanup + ├── get_latest_frame() # Get most recent frame + └── _reader_loop() # Main capture loop +``` + +#### `frame_reader.py` - Frame Capture Implementation +```python +RTSPReader() # RTSP stream handler + ├── connect() # Establish RTSP connection + ├── read_frame() # Capture single frame + └── handle_reconnection() # Connection recovery + +HTTPSnapshotReader() # HTTP snapshot handler + ├── fetch_snapshot() # HTTP GET request + ├── decode_image() # Image decoding + └── schedule_next() # Schedule next capture +``` + +### Detection System (`detector_worker/detection/`) + +#### `yolo_detector.py` - Object Detection +```python +YOLODetector() + ├── load_model() # Load YOLO model + ├── detect() # Run inference + ├── _preprocess_frame() # Input preprocessing + ├── _postprocess_results() # Output processing + └── _filter_detections() # Confidence filtering + +DetectionResult() # Detection output structure + ├── class_name # Detected class + ├── confidence # Detection confidence + ├── bounding_box # Spatial coordinates + ├── track_id # Tracking identifier + └── timestamp # Detection timestamp +``` + +#### `tracking_manager.py` - Multi-Object Tracking +```python +TrackingManager() + ├── update_tracks() # Update tracker with detections + ├── _associate_detections() # Data association + ├── _create_new_tracks() # Initialize new tracks + ├── _update_existing_tracks() # Update track states + └── _cleanup_lost_tracks() # Remove stale tracks + +Track() # Individual object track + ├── update() # Update with new detection + ├── predict() # Predict next state + ├── is_confirmed() # Track confirmation status + └── time_since_update() # Track age +``` + +#### `stability_validator.py` - Detection Validation +```python +StabilityValidator() + ├── add_detection() # Add detection to history + ├── is_detection_stable() # Check stability criteria + ├── _calculate_stability() # Stability metrics + └── _cleanup_old_detections() # History management +``` + +### Pipeline System (`detector_worker/pipeline/`) + +#### `pipeline_executor.py` - ML Pipeline Orchestration +```python +PipelineExecutor() + ├── execute_pipeline() # Main pipeline execution + ├── _run_detection_stage() # Object detection phase + ├── _run_classification_branches() # Parallel classification + ├── _execute_actions() # Post-processing actions + └── _wait_for_branches() # Synchronization + +PipelineContext() # Execution context + ├── camera_id # Camera identifier + ├── session_id # Session identifier + ├── frame # Input frame + ├── timestamp # Processing timestamp + └── intermediate_results # Shared results +``` + +#### `action_executor.py` - Action Processing +```python +ActionExecutor() + ├── execute_action() # Execute single action + ├── _redis_save_image() # Redis image storage + ├── _postgresql_create() # Database record creation + ├── _postgresql_update() # Database record update + └── _publish_message() # Message publishing + +ActionType(Enum) # Supported action types + ├── REDIS_SAVE_IMAGE + ├── POSTGRESQL_CREATE + ├── POSTGRESQL_UPDATE + └── PUBLISH_MESSAGE +``` + +#### `field_mapper.py` - Dynamic Field Resolution +```python +FieldMapper() + ├── resolve_fields() # Resolve template fields + ├── _substitute_variables() # Variable substitution + ├── _resolve_branch_results() # Branch result mapping + └── _validate_mapping() # Mapping validation +``` + +### Storage Layer (`detector_worker/storage/`) + +#### `database_manager.py` - PostgreSQL Operations +```python +DatabaseManager() + ├── connect() # Database connection + ├── create_record() # INSERT operations + ├── update_record() # UPDATE operations + ├── get_record() # SELECT operations + ├── execute_query() # Raw SQL execution + └── _handle_connection_error() # Error recovery + +DatabaseConfig() # Database configuration + ├── host, port, database # Connection params + ├── user, password # Authentication + └── connection_pool_size # Pool configuration +``` + +#### `redis_client.py` - Redis Operations & Image Storage +```python +RedisClient() + ├── connect() # Redis connection + ├── set/get/delete() # Basic operations + ├── pipeline() # Batch operations + └── scan_keys() # Key scanning + +RedisImageStorage() # Image-specific operations + ├── store_image() # Store with compression + ├── retrieve_image() # Retrieve and decode + ├── delete_image() # Delete image + └── cleanup_expired() # Cleanup expired images + +RedisPublisher/Subscriber() # Pub/Sub messaging + ├── publish() # Publish message + ├── subscribe() # Subscribe to channel + └── listen() # Message listening +``` + +#### `session_cache.py` - High-Performance Caching +```python +SessionCacheManager() # Singleton cache manager + ├── cache_detection() # Cache detection results + ├── cache_pipeline_result() # Cache pipeline outputs + ├── create_session() # Create session entry + ├── update_session() # Update session data + └── cleanup_expired() # Cache maintenance + +SessionCache() # LRU cache implementation + ├── put/get/remove() # Basic cache operations + ├── _evict_lru() # LRU eviction + ├── _check_memory_limit() # Memory management + └── get_stats() # Cache statistics +``` + +### Model Management (`detector_worker/models/`) + +#### `model_manager.py` - Model Loading & Caching +```python +ModelManager() + ├── load_model() # Load and cache model + ├── get_model() # Retrieve cached model + ├── unload_model() # Remove from cache + ├── cleanup_unused() # Cache maintenance + └── get_memory_usage() # Memory tracking + +ModelCache() # Model cache implementation + ├── put/get/remove() # Cache operations + ├── _estimate_memory() # Memory estimation + ├── _evict_unused() # Memory-based eviction + └── get_cache_stats() # Cache metrics +``` + +#### `pipeline_loader.py` - MPTA Pipeline Loading +```python +PipelineLoader() + ├── load_pipeline() # Load MPTA file + ├── _extract_archive() # ZIP extraction + ├── _parse_config() # Configuration parsing + ├── _validate_pipeline() # Pipeline validation + └── _load_models() # Load pipeline models +``` + +## System Startup Flow + +### Application Initialization Sequence + +```mermaid +sequenceDiagram + participant Main as app.py + participant Config as Configuration + participant Container as ServiceContainer + participant Managers as Singleton Managers + participant FastAPI as FastAPI App + + Main->>Config: load_configuration() + Config->>Config: load_from_file() + Config->>Config: load_from_env() + Config->>Config: validate_config() + + Main->>Container: ServiceContainer() + Main->>Container: register_services() + Container->>Managers: initialize_singletons() + + Main->>FastAPI: create_app() + FastAPI->>FastAPI: setup_lifespan() + FastAPI->>FastAPI: add_websocket_routes() + FastAPI->>FastAPI: add_http_routes() + + Note over Main,FastAPI: Application Ready + FastAPI->>Main: uvicorn.run() +``` + +### Detailed Startup Process + +1. **Configuration Loading** (`app.py:15-25`) + ```python + # Load configuration from multiple sources + config = Configuration() + config.load_from_file("config.json") # Primary config + config.load_from_env() # Environment overrides + config.validate_config() # Validation + ``` + +2. **Dependency Injection Setup** (`app.py:27-45`) + ```python + # Create and configure IoC container + container = ServiceContainer() + + # Register core services + container.register_singleton(Configuration, lambda: config) + container.register_singleton(StreamManager, StreamManager) + container.register_singleton(ModelManager, ModelManager) + container.register_singleton(PipelineExecutor, PipelineExecutor) + ``` + +3. **Singleton Manager Initialization** (`app.py:47-55`) + ```python + # Initialize thread-safe singleton managers + model_state = ModelStateManager() + stream_state = StreamStateManager() + session_state = SessionStateManager() + # ... other managers + ``` + +4. **FastAPI Application Creation** (`app.py:57-75`) + ```python + # Create FastAPI app with lifespan management + @asynccontextmanager + async def lifespan(app: FastAPI): + # Startup logic + await initialize_services() + yield + # Shutdown logic + await cleanup_services() + + app = FastAPI(lifespan=lifespan) + ``` + +5. **Route Registration** (`app.py:77-85`) + ```python + # WebSocket endpoint + @app.websocket("/") + async def websocket_endpoint(websocket: WebSocket): + ws_handler = container.resolve(WebSocketHandler) + await ws_handler.handle_connection(websocket) + + # HTTP endpoints + @app.get("/camera/{camera_id}/image") + async def get_camera_image(camera_id: str): + return stream_manager.get_latest_frame(camera_id) + ``` + +## WebSocket Communication Flow + +### Client Connection Lifecycle + +```mermaid +sequenceDiagram + participant Client as WebSocket Client + participant WS as WebSocketHandler + participant CM as ConnectionManager + participant MP as MessageProcessor + participant SM as StreamManager + + Client->>WS: WebSocket Connection + WS->>WS: handle_websocket() + WS->>CM: add_connection() + CM->>CM: create WebSocketConnection + WS->>Client: Connection Accepted + + loop Message Processing + Client->>WS: JSON Message + WS->>MP: process_message() + + alt Subscribe Message + MP->>SM: create_stream() + SM->>SM: initialize StreamReader + MP->>Client: subscribeAck + else Unsubscribe Message + MP->>SM: remove_stream() + SM->>SM: cleanup StreamReader + MP->>Client: unsubscribeAck + else State Request + MP->>MP: collect_system_state() + MP->>Client: stateReport + end + end + + Client->>WS: Disconnect + WS->>CM: remove_connection() + WS->>SM: cleanup_client_streams() +``` + +### Message Processing Detail + +#### 1. Subscribe Message Flow (`message_processor.py:125-185`) + +```python +async def _handle_subscribe(self, payload: Dict, client_id: str) -> Dict: + """Process subscription request""" + + # 1. Extract subscription parameters + subscription_id = payload["subscriptionIdentifier"] + stream_url = payload.get("rtspUrl") or payload.get("snapshotUrl") + model_url = payload["modelUrl"] + + # 2. Create stream configuration + stream_config = StreamConfig( + stream_url=stream_url, + stream_type="rtsp" if "rtsp" in stream_url else "http_snapshot", + crop_region=[payload.get("cropX1"), payload.get("cropY1"), + payload.get("cropX2"), payload.get("cropY2")] + ) + + # 3. Load ML pipeline + pipeline_config = await pipeline_loader.load_from_url(model_url) + + # 4. Create stream (with sharing if same URL) + stream_info = await stream_manager.create_stream( + camera_id=subscription_id.split(';')[1], + config=stream_config, + subscription_id=subscription_id + ) + + # 5. Register client subscription + connection_manager.add_subscription(client_id, subscription_id) + + return {"type": "subscribeAck", "status": "success", + "subscriptionId": subscription_id} +``` + +#### 2. Detection Result Broadcasting (`websocket_handler.py:245-265`) + +```python +async def broadcast_detection_result(self, subscription_id: str, + detection_result: Dict): + """Broadcast detection to subscribed clients""" + + message = { + "type": "imageDetection", + "payload": { + "subscriptionId": subscription_id, + "detections": detection_result["detections"], + "timestamp": detection_result["timestamp"], + "modelInfo": detection_result["model_info"] + } + } + + await self.connection_manager.broadcast_to_subscription( + subscription_id, message + ) +``` + +## Detection Pipeline Flow + +### Complete Detection Workflow + +```mermaid +flowchart TD + A[Frame Captured] --> B[YOLO Detection] + B --> C{Expected Classes Found?} + C -->|No| D[Skip Processing] + C -->|Yes| E[Multi-Object Tracking] + E --> F[Stability Validation] + F --> G{Stable Detection?} + G -->|No| H[Continue Tracking] + G -->|Yes| I[Create Database Record] + I --> J[Execute Redis Actions] + J --> K[Start Classification Branches] + + K --> L[Brand Classification] + K --> M[Body Type Classification] + K --> N[License Plate Recognition] + + L --> O[Wait for All Branches] + M --> O + N --> O + + O --> P[Field Mapping & Resolution] + P --> Q[Database Update Combined] + Q --> R[Broadcast Results] + R --> S[Update Session Cache] +``` + +### Detailed Pipeline Execution (`pipeline_executor.py:85-250`) + +#### 1. Detection Stage +```python +async def _run_detection_stage(self, pipeline_config: Dict, + context: PipelineContext) -> List[DetectionResult]: + """Execute object detection stage""" + + # 1. Load detection model + model = await self.model_manager.load_model( + ModelConfig.from_dict(pipeline_config) + ) + + # 2. Run YOLO inference + detector = YOLODetector() + raw_detections = detector.detect( + frame=context.frame, + confidence_threshold=pipeline_config["minConfidence"] + ) + + # 3. Filter expected classes + expected_classes = pipeline_config["expectedClasses"] + filtered_detections = [ + det for det in raw_detections + if det.class_name in expected_classes + ] + + # 4. Update tracking + if len(filtered_detections) > 0: + tracking_manager = TrackingManager() + tracked_detections = tracking_manager.update_tracks( + filtered_detections, context.frame_id + ) + + # 5. Validate stability + stability_validator = StabilityValidator() + stable_detections = [] + for det in tracked_detections: + if stability_validator.is_detection_stable(det): + stable_detections.append(det) + + return stable_detections + + return [] +``` + +#### 2. Action Execution Stage +```python +async def _execute_actions(self, actions: List[Dict], + context: PipelineContext) -> Dict: + """Execute pipeline actions""" + + action_results = {} + action_executor = ActionExecutor() + + for action in actions: + action_type = action["type"] + + if action_type == "redis_save_image": + # Save cropped image to Redis + result = await action_executor.redis_save_image( + frame=context.frame, + region=action["region"], # e.g., "Frontal" + key_template=action["key"], + context=context, + expire_seconds=action.get("expire_seconds", 3600) + ) + + elif action_type == "postgresql_create_record": + # Create initial database record + result = await action_executor.postgresql_create( + table=action["table"], + fields=action["fields"], + context=context + ) + + action_results[action_type] = result + + return action_results +``` + +#### 3. Classification Branch Execution +```python +async def _run_classification_branches(self, branches: List[Dict], + context: PipelineContext) -> Dict: + """Execute parallel classification branches""" + + if not branches: + return {} + + # 1. Create parallel tasks for each branch + branch_tasks = [] + for branch in branches: + if branch.get("parallel", False): + task = asyncio.create_task( + self._execute_branch(branch, context) + ) + branch_tasks.append((branch["modelId"], task)) + + # 2. Wait for all branches to complete + branch_results = {} + for model_id, task in branch_tasks: + try: + result = await task + branch_results[model_id] = result + except Exception as e: + logging.error(f"Branch {model_id} failed: {e}") + branch_results[model_id] = {"error": str(e)} + + return branch_results + +async def _execute_branch(self, branch_config: Dict, + context: PipelineContext) -> Dict: + """Execute single classification branch""" + + # 1. Load classification model + model = await self.model_manager.load_model( + ModelConfig.from_dict(branch_config) + ) + + # 2. Prepare input (crop if specified) + input_frame = context.frame + if branch_config.get("crop", False): + crop_class = branch_config["cropClass"] + # Find detection of specified class and crop + for detection in context.detections: + if detection.class_name == crop_class: + bbox = detection.bounding_box + input_frame = context.frame[bbox.y1:bbox.y2, bbox.x1:bbox.x2] + break + + # 3. Run classification + classifier = YOLODetector() # Can handle classification too + results = classifier.classify( + frame=input_frame, + confidence_threshold=branch_config["minConfidence"] + ) + + # 4. Format results based on model type + if "brand" in branch_config["modelId"]: + return {"brand": results.top_class, "confidence": results.confidence} + elif "bodytype" in branch_config["modelId"]: + return {"body_type": results.top_class, "confidence": results.confidence} + else: + return {"class": results.top_class, "confidence": results.confidence} +``` + +#### 4. Field Mapping & Database Update +```python +async def _execute_parallel_actions(self, actions: List[Dict], + context: PipelineContext, + branch_results: Dict) -> Dict: + """Execute actions that depend on branch results""" + + for action in actions: + if action["type"] == "postgresql_update_combined": + # 1. Wait for specified branches to complete + wait_for_branches = action.get("waitForBranches", []) + for branch_id in wait_for_branches: + if branch_id not in branch_results: + logging.warning(f"Branch {branch_id} not completed") + + # 2. Resolve field mappings + field_mapper = FieldMapper() + resolved_fields = field_mapper.resolve_fields( + field_templates=action["fields"], + context=context, + branch_results=branch_results + ) + + # Example field resolution: + # "{car_brand_cls_v1.brand}" -> "Toyota" + # "{car_bodytype_cls_v1.body_type}" -> "Sedan" + + # 3. Execute database update + database_manager = DatabaseManager() + await database_manager.update_record( + table=action["table"], + key_value=context.session_id, + key_field=action["key_field"], + update_data=resolved_fields + ) + + return {"status": "success", "fields_updated": resolved_fields} + + return {} +``` + +## Data Storage Flow + +### Database Operations Flow + +```mermaid +sequenceDiagram + participant PE as PipelineExecutor + participant AE as ActionExecutor + participant DM as DatabaseManager + participant FM as FieldMapper + participant DB as PostgreSQL + + PE->>AE: postgresql_create_record + AE->>DM: create_record() + DM->>DB: INSERT INTO car_frontal_info + DB->>DM: session_id (UUID) + DM->>AE: Record created + + Note over PE: Classification branches execute... + + PE->>FM: resolve_fields(templates, branch_results) + FM->>FM: substitute variables + FM->>PE: resolved_fields + + PE->>AE: postgresql_update_combined + AE->>DM: update_record() + DM->>DB: UPDATE car_frontal_info SET car_brand=?, car_body_type=? + DB->>DM: Update successful + DM->>AE: Record updated +``` + +### Redis Storage Operations + +```mermaid +sequenceDiagram + participant AE as ActionExecutor + participant RC as RedisClient + participant RIS as RedisImageStorage + participant Redis as Redis Server + + AE->>RC: redis_save_image + RC->>RIS: store_image() + RIS->>RIS: crop_region_from_frame() + RIS->>RIS: compress_image() + RIS->>Redis: SET key encoded_image + RIS->>Redis: EXPIRE key 600 + Redis->>RIS: OK + RIS->>RC: Storage successful + RC->>AE: Image saved +``` + +### Session Cache Operations + +```mermaid +flowchart LR + A[Detection Event] --> B[Cache Detection Result] + B --> C[Create Session Entry] + C --> D[Pipeline Processing] + D --> E[Update Session with Branch Results] + E --> F[Cache Pipeline Result] + F --> G[Broadcast to Clients] + + subgraph "Cache Types" + H[Detection Cache
Latest detection per camera] + I[Pipeline Cache
Pipeline execution results] + J[Session Cache
Session tracking data] + end + + B --> H + F --> I + C --> J + E --> J +``` + +## Error Handling & Recovery + +### Exception Hierarchy & Handling + +```mermaid +classDiagram + DetectorWorkerError <|-- ConfigurationError + DetectorWorkerError <|-- StreamError + DetectorWorkerError <|-- ModelError + DetectorWorkerError <|-- PipelineError + DetectorWorkerError <|-- DatabaseError + DetectorWorkerError <|-- RedisError + DetectorWorkerError <|-- MessageProcessingError + + StreamError <|-- ConnectionError + StreamError <|-- StreamTimeoutError + + ModelError <|-- ModelLoadError + ModelError <|-- ModelCacheError + + PipelineError <|-- ActionExecutionError + PipelineError <|-- BranchExecutionError +``` + +### Error Recovery Strategies + +#### 1. Stream Connection Recovery (`stream_manager.py:245-285`) +```python +async def _handle_stream_error(self, stream_id: str, error: Exception): + """Handle stream errors with exponential backoff retry""" + + stream_info = self.get_stream_info(stream_id) + if not stream_info: + return + + # Increment error count + stream_info.error_count += 1 + stream_info.update_status("error", error_message=str(error)) + + # Exponential backoff retry + max_retries = stream_info.config.max_retries + if max_retries == -1 or stream_info.error_count <= max_retries: + + # Calculate backoff delay + base_delay = stream_info.config.reconnect_interval + backoff_delay = base_delay * (2 ** min(stream_info.error_count - 1, 6)) + + logging.warning(f"Stream {stream_id} error: {error}. " + f"Retrying in {backoff_delay} seconds...") + + await asyncio.sleep(backoff_delay) + + try: + await self.reconnect_stream(stream_id) + stream_info.error_count = 0 # Reset on success + stream_info.update_status("active") + + except Exception as retry_error: + logging.error(f"Stream {stream_id} retry failed: {retry_error}") + await self._handle_stream_error(stream_id, retry_error) + else: + # Max retries exceeded + logging.error(f"Stream {stream_id} exceeded max retries. Marking as failed.") + stream_info.update_status("failed") + await self._notify_stream_failure(stream_id) +``` + +#### 2. Database Connection Recovery (`database_manager.py:185-220`) +```python +async def _execute_with_retry(self, operation: Callable, *args, **kwargs): + """Execute database operation with connection retry""" + + max_retries = 3 + retry_delay = 1.0 + + for attempt in range(max_retries + 1): + try: + return await operation(*args, **kwargs) + + except psycopg2.OperationalError as e: + if attempt == max_retries: + raise DatabaseError(f"Database operation failed after {max_retries} retries: {e}") + + logging.warning(f"Database operation failed (attempt {attempt + 1}): {e}") + + # Try to reconnect + try: + await self.disconnect() + await asyncio.sleep(retry_delay) + await self.connect() + retry_delay *= 2 # Exponential backoff + + except Exception as reconnect_error: + logging.error(f"Database reconnection failed: {reconnect_error}") + + except Exception as e: + # Non-recoverable error + raise DatabaseError(f"Database operation failed: {e}") +``` + +#### 3. Pipeline Error Isolation (`pipeline_executor.py:325-365`) +```python +async def _execute_branch_with_isolation(self, branch_config: Dict, + context: PipelineContext) -> Dict: + """Execute branch with error isolation""" + + branch_id = branch_config["modelId"] + + try: + # Set timeout for branch execution + timeout = branch_config.get("timeout_seconds", 30) + + result = await asyncio.wait_for( + self._execute_branch(branch_config, context), + timeout=timeout + ) + + return result + + except asyncio.TimeoutError: + error_msg = f"Branch {branch_id} timed out after {timeout} seconds" + logging.error(error_msg) + return {"error": error_msg, "type": "timeout"} + + except ModelError as e: + error_msg = f"Branch {branch_id} model error: {e}" + logging.error(error_msg) + return {"error": error_msg, "type": "model_error"} + + except Exception as e: + error_msg = f"Branch {branch_id} unexpected error: {e}" + logging.error(error_msg, exc_info=True) + return {"error": error_msg, "type": "unexpected_error"} + +async def _handle_partial_branch_failure(self, branch_results: Dict, + required_branches: List[str]) -> bool: + """Determine if pipeline can continue with partial branch failures""" + + successful_branches = [ + branch_id for branch_id, result in branch_results.items() + if not isinstance(result, dict) or "error" not in result + ] + + failed_branches = [ + branch_id for branch_id, result in branch_results.items() + if isinstance(result, dict) and "error" in result + ] + + if failed_branches: + logging.warning(f"Failed branches: {failed_branches}") + logging.info(f"Successful branches: {successful_branches}") + + # Continue if at least one required branch succeeded + required_successful = any( + branch_id in successful_branches + for branch_id in required_branches + ) + + return required_successful +``` + +## Testing Architecture + +### Test Structure Overview + +``` +tests/ +├── unit/ # Fast, isolated unit tests +│ ├── core/ # Core module tests (config, DI, singletons) +│ ├── detection/ # Detection system tests +│ ├── pipeline/ # Pipeline execution tests +│ ├── streams/ # Stream management tests +│ ├── communication/ # WebSocket & messaging tests +│ ├── storage/ # Storage layer tests +│ └── models/ # Model management tests +├── integration/ # Multi-component integration tests +│ ├── test_complete_detection_workflow.py +│ ├── test_websocket_protocol.py +│ └── test_pipeline_integration.py +├── performance/ # Performance benchmarks +│ ├── test_detection_performance.py +│ ├── test_websocket_performance.py +│ └── test_storage_performance.py +└── conftest.py # Shared fixtures and configuration +``` + +### Test Execution Flows + +#### Unit Test Example (`tests/unit/detection/test_yolo_detector.py`) +```python +class TestYOLODetector: + """Test YOLO detector functionality""" + + def test_detection_basic_functionality(self, mock_frame): + """Test basic detection pipeline""" + detector = YOLODetector() + + with patch('torch.load') as mock_torch_load: + # Setup mock model + mock_model = Mock() + mock_result = self._create_mock_detection_result() + mock_model.return_value = mock_result + mock_torch_load.return_value = mock_model + + # Execute detection + detections = detector.detect(mock_frame, confidence_threshold=0.5) + + # Verify results + assert len(detections) == 2 + assert detections[0].class_name == "car" + assert detections[0].confidence >= 0.5 + assert isinstance(detections[0].bounding_box, BoundingBox) +``` + +#### Integration Test Example (`tests/integration/test_complete_detection_workflow.py`) +```python +@pytest.mark.asyncio +async def test_complete_rtsp_detection_workflow(self, temp_config_file, + sample_mpta_file, mock_frame): + """Test complete workflow: RTSP stream -> detection -> classification -> database""" + + # 1. Initialize all components + config = Configuration() + config.load_from_file(temp_config_file) + + # 2. Mock external dependencies (Redis, DB, models) + with patch('cv2.VideoCapture') as mock_video_cap, \ + patch('torch.load') as mock_torch_load, \ + patch('psycopg2.connect') as mock_db_connect: + + # Setup mocks... + + # 3. Execute complete workflow + stream_manager = StreamManager() + pipeline_executor = PipelineExecutor() + + # Create stream + stream_info = await stream_manager.create_stream(camera_id, config, sub_id) + + # Run pipeline + result = await pipeline_executor.execute_pipeline(pipeline_config, context) + + # 4. Verify end-to-end results + assert result["status"] == "completed" + assert "detections" in result + assert "classification_results" in result + + # Verify database operations occurred + assert mock_db_cursor.execute.called + + # Verify Redis operations occurred + assert mock_redis.set.called +``` + +#### Performance Test Example (`tests/performance/test_detection_performance.py`) +```python +def test_yolo_detection_speed(self, sample_frame, performance_config): + """Benchmark YOLO detection speed""" + detector = YOLODetector() + + # Warm up + for _ in range(5): + detector.detect(sample_frame, confidence_threshold=0.5) + + # Benchmark + detection_times = [] + for _ in range(100): + start_time = time.perf_counter() + detections = detector.detect(sample_frame, confidence_threshold=0.5) + end_time = time.perf_counter() + + detection_times.append((end_time - start_time) * 1000) + + # Analyze performance + avg_time = statistics.mean(detection_times) + theoretical_fps = 1000 / avg_time + + # Performance assertions + assert avg_time < performance_config["max_detection_time_ms"] + assert theoretical_fps >= performance_config["target_fps"] + + print(f"Average detection time: {avg_time:.2f} ms") + print(f"Theoretical FPS: {theoretical_fps:.1f}") +``` + +## Development Workflow + +### Development Commands + +```bash +# Setup +make install-dev # Install development dependencies +make format # Format code with black & isort +make lint # Run code linting + +# Testing +make test # Run all tests with coverage +make test-unit # Run unit tests only +make test-integration # Run integration tests +make test-performance # Run performance benchmarks +make test-fast # Run fast tests only +make test-coverage # Generate detailed coverage report + +# Development +make run # Run the application +make clean # Clean build artifacts +``` + +### Using the Test Runner + +```bash +# Basic test execution +python scripts/run_tests.py --all # All tests with coverage +python scripts/run_tests.py --unit --verbose # Unit tests with verbose output +python scripts/run_tests.py --integration # Integration tests only +python scripts/run_tests.py --performance # Performance benchmarks + +# Advanced options +python scripts/run_tests.py --fast # Fast tests only (no slow markers) +python scripts/run_tests.py --failed # Rerun only failed tests +python scripts/run_tests.py --specific "config" # Run tests matching pattern +python scripts/run_tests.py --coverage --open-browser # Generate and open coverage report + +# Quality checks +python scripts/run_tests.py --quality # Run linting and formatting checks +``` + +### CI/CD Pipeline + +The GitHub Actions workflow (`.github/workflows/ci.yml`) runs: + +1. **Code Quality Checks**: flake8, mypy, black, isort +2. **Unit Tests**: Fast, isolated tests with coverage +3. **Integration Tests**: With Redis and PostgreSQL services +4. **Performance Tests**: On main branch pushes +5. **Security Scans**: safety and bandit +6. **Docker Build**: Verify containerization works + +### Adding New Features + +1. **Create Feature Branch** + ```bash + git checkout -b feature/new-detection-algorithm + ``` + +2. **Implement with TDD** + ```bash + # 1. Write failing test + # tests/unit/detection/test_new_algorithm.py + + # 2. Implement minimal code to pass + # detector_worker/detection/new_algorithm.py + + # 3. Refactor and improve + # 4. Add integration tests if needed + ``` + +3. **Run Quality Checks** + ```bash + make format # Auto-format code + make lint # Check code quality + make test # Run all tests + ``` + +4. **Create Pull Request** + - CI/CD pipeline runs automatically + - Coverage report posted as comment + - All checks must pass before merge + +### Debugging & Monitoring + +#### Logging Configuration +```python +# detector_worker/utils/logging.py +import logging + +def setup_logging(level=logging.INFO): + """Configure structured logging""" + + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' + ) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + + # File handler + file_handler = logging.FileHandler('detector_worker.log') + file_handler.setFormatter(formatter) + + # Root logger + logger = logging.getLogger() + logger.setLevel(level) + logger.addHandler(console_handler) + logger.addHandler(file_handler) + + return logger +``` + +#### Performance Monitoring +```python +# detector_worker/utils/monitoring.py +import psutil +import time +from typing import Dict + +class SystemMonitor: + """System resource monitoring""" + + def get_system_metrics(self) -> Dict: + """Get current system metrics""" + + return { + "cpu_percent": psutil.cpu_percent(), + "memory_percent": psutil.virtual_memory().percent, + "disk_percent": psutil.disk_usage('/').percent, + "network_io": psutil.net_io_counters()._asdict(), + "timestamp": time.time() + } + + def get_process_metrics(self) -> Dict: + """Get current process metrics""" + + process = psutil.Process() + + return { + "pid": process.pid, + "cpu_percent": process.cpu_percent(), + "memory_mb": process.memory_info().rss / 1024 / 1024, + "threads": process.num_threads(), + "open_files": len(process.open_files()), + "connections": len(process.connections()), + "timestamp": time.time() + } +``` + +This comprehensive architecture documentation provides a complete technical overview of the refactored system, enabling any engineer to quickly understand and contribute to the codebase. The modular design, clear separation of concerns, and extensive testing infrastructure ensure the system is maintainable, scalable, and reliable. \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index 06f7b97..d05925c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -3,6 +3,8 @@ ## Project Overview This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs advanced YOLO-based machine learning pipelines for multi-class object detection and parallel classification. The system features comprehensive database integration, Redis support, and hierarchical pipeline execution designed to work within a larger CMS (Content Management System) architecture. +**IMPORTANT**: This project has been completely refactored from a monolithic 4,000+ line codebase into a modular, maintainable architecture with dependency injection, comprehensive testing, and clear separation of concerns. + ### Key Features - **Multi-Class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal) - **Parallel Processing**: Concurrent execution of classification branches using ThreadPoolExecutor @@ -10,77 +12,241 @@ This is a FastAPI-based computer vision detection worker that processes video st - **Redis Actions**: Image storage with region cropping and pub/sub messaging - **Pipeline Synchronization**: Branch coordination with `waitForBranches` functionality - **Dynamic Field Mapping**: Template-based field resolution for database operations +- **Modular Architecture**: Clean separation of concerns with dependency injection +- **Thread-Safe Operations**: Singleton managers for global state management +- **Comprehensive Testing**: Unit, integration, and performance test suites ## Architecture & Technology Stack -- **Framework**: FastAPI with WebSocket support -- **ML/CV**: PyTorch, Ultralytics YOLO, OpenCV +- **Framework**: FastAPI with WebSocket support and modern async patterns +- **ML/CV**: PyTorch, Ultralytics YOLO, OpenCV with BoT-SORT tracking - **Containerization**: Docker (Python 3.13-bookworm base) - **Data Storage**: Redis integration for action handling + PostgreSQL for persistent storage - **Database**: Automatic schema management with gas_station_1 database - **Parallel Processing**: ThreadPoolExecutor for concurrent classification - **Communication**: WebSocket-based real-time protocol +- **Design Patterns**: Dependency Injection, Singleton, Repository patterns +- **Testing**: Pytest with asyncio, mocking, and benchmarking support -## Core Components +## New Modular Architecture + +### Core Directory Structure +``` +detector_worker/ +├── core/ # Core infrastructure components +│ ├── config.py # Multi-source configuration management +│ ├── constants.py # Application constants +│ ├── exceptions.py # Custom exception hierarchy +│ ├── singleton_managers.py # Thread-safe singleton managers +│ └── dependency_injection.py # IoC container implementation +├── detection/ # Computer vision and ML components +│ ├── yolo_detector.py # YOLO model inference +│ ├── tracking_manager.py # BoT-SORT multi-object tracking +│ └── stability_validator.py # Detection stability validation +├── pipeline/ # ML pipeline execution +│ ├── pipeline_executor.py # Main pipeline orchestration +│ ├── action_executor.py # Redis/DB action execution +│ └── field_mapper.py # Template-based field mapping +├── streams/ # Video stream management +│ ├── stream_manager.py # Stream lifecycle management +│ ├── frame_reader.py # RTSP/HTTP frame reading +│ └── camera_monitor.py # Connection state monitoring +├── communication/ # WebSocket and messaging +│ ├── websocket_handler.py # WebSocket connection management +│ ├── message_processor.py # Message routing and processing +│ └── response_formatter.py # Response formatting +├── storage/ # Data persistence +│ ├── database_manager.py # PostgreSQL operations +│ ├── redis_client.py # Redis operations +│ └── session_cache.py # Session and cache management +├── models/ # Model and pipeline management +│ ├── model_manager.py # YOLO model loading/caching +│ └── pipeline_loader.py # MPTA pipeline loading +└── utils/ # Utility functions + ├── logging_utils.py # Structured logging setup + ├── image_utils.py # Image processing utilities + └── system_utils.py # System monitoring utilities +``` ### Main Application (`app.py`) -- **FastAPI WebSocket server** for real-time communication -- **Multi-camera stream management** with shared stream optimization +- **FastAPI application** with modern lifespan management (replacing deprecated @app.on_event) +- **Dependency injection container** initialization and service registration +- **WebSocket endpoint** delegation to communication layer - **HTTP REST endpoint** for image retrieval (`/camera/{camera_id}/image`) -- **Threading-based frame readers** for RTSP streams and HTTP snapshots -- **Model loading and inference** using MPTA (Machine Learning Pipeline Archive) format -- **Session management** with display identifier mapping -- **Resource monitoring** (CPU, memory, GPU usage via psutil) +- **Clean separation** between framework code and business logic +- **Resource monitoring** integrated with singleton managers -### Pipeline System (`siwatsystem/pympta.py`) -- **MPTA file handling** - ZIP archives containing model configurations -- **Hierarchical pipeline execution** with detection → classification branching -- **Multi-class detection** - Simultaneous detection of multiple classes (Car + Frontal) -- **Parallel processing** - Concurrent classification branches with ThreadPoolExecutor -- **Redis action system** - Image saving with region cropping and message publishing -- **PostgreSQL integration** - Automatic table creation and combined updates -- **Dynamic model loading** with GPU optimization -- **Configurable trigger classes and confidence thresholds** -- **Branch synchronization** - waitForBranches coordination for database updates +### Core Infrastructure (`core/`) -### Database System (`siwatsystem/database.py`) -- **DatabaseManager class** for PostgreSQL operations -- **Automatic table creation** with gas_station_1.car_frontal_info schema -- **Combined update operations** with field mapping from branch results -- **Session management** with UUID generation -- **Error handling** and connection management +#### Configuration Management (`core/config.py`) +- **Multi-source configuration** with environment variables, JSON files, and defaults +- **Type-safe configuration** with proper validation and error handling +- **Development/production** environment support +- **Configuration providers** pattern for extensible config sources -### Testing & Debugging -- **Protocol test script** (`test_protocol.py`) for WebSocket communication validation -- **Pipeline webcam utility** (`pipeline_webcam.py`) for local testing with visual output -- **RTSP streaming debug tool** (`debug/rtsp_webcam.py`) using GStreamer +#### Singleton Managers (`core/singleton_managers.py`) +- **Thread-safe singleton implementation** using metaclass pattern +- **Six specialized managers**: + - `SubscriptionManager`: WebSocket subscription lifecycle + - `StreamManager`: RTSP/HTTP stream management with reference counting + - `ModelManager`: YOLO model loading and caching + - `PipelineManager`: MPTA pipeline loading and management + - `SessionManager`: Session tracking and management + - `SessionCacheManager`: Redis-backed session caching +- **Replaces global dictionaries** for thread-safe state management -## Code Conventions & Patterns +#### Dependency Injection (`core/dependency_injection.py`) +- **Full IoC container** with automatic constructor injection +- **Three service lifetimes**: Singleton, Transient, Scoped +- **Interface-based registration** supporting both concrete types and factories +- **Circular dependency detection** and resolution +- **Scope management** for request-scoped services -### Logging -- **Structured logging** using Python's logging module -- **File + console output** to `detector_worker.log` -- **Debug level separation** for detailed troubleshooting -- **Context-aware messages** with camera IDs and model information +### Detection System (`detection/`) -### Error Handling -- **Graceful failure handling** with retry mechanisms (configurable max_retries) -- **Thread-safe operations** using locks for streams and models -- **WebSocket disconnect handling** with proper cleanup -- **Model loading validation** with detailed error reporting +#### YOLO Detector (`detection/yolo_detector.py`) +- **YOLODetector class** for inference management +- **Multi-class detection** support (Car + Frontal simultaneously) +- **GPU/CPU optimization** with automatic device selection +- **Confidence filtering** and NMS post-processing +- **Integration with tracking system** -### Configuration -- **JSON configuration** (`config.json`) for runtime parameters: - - `poll_interval_ms`: Frame processing interval - - `max_streams`: Concurrent stream limit - - `target_fps`: Target frame rate - - `reconnect_interval_sec`: Stream reconnection delay - - `max_retries`: Maximum retry attempts (-1 for unlimited) +#### Tracking Manager (`detection/tracking_manager.py`) +- **BoT-SORT implementation** for multi-object tracking +- **Track lifecycle management** with unique ID assignment +- **Stability validation** integration for reliable detections +- **Performance optimization** with configurable parameters -### Threading Model -- **Frame reader threads** for each camera stream (RTSP/HTTP) -- **Shared stream optimization** - multiple subscriptions can reuse the same camera stream -- **Async WebSocket handling** with concurrent task management -- **Thread-safe data structures** with proper locking mechanisms +#### Stability Validator (`detection/stability_validator.py`) +- **Detection stability analysis** over time windows +- **Confidence trend analysis** for reliable object detection +- **Configurable thresholds** for stability determination +- **Integration with tracking and pipeline systems** + +### Pipeline System (`pipeline/`) + +#### Pipeline Executor (`pipeline/pipeline_executor.py`) +- **PipelineExecutor class** for orchestrating MPTA pipeline execution +- **Hierarchical execution** with detection → classification branching +- **Parallel processing** using ThreadPoolExecutor for classification branches +- **Branch synchronization** with `waitForBranches` coordination +- **Error handling** and retry mechanisms + +#### Action Executor (`pipeline/action_executor.py`) +- **ActionExecutor class** for Redis and database actions +- **Redis image storage** with region cropping and expiration +- **PostgreSQL operations** with automatic schema management +- **Pub/sub messaging** for real-time notifications +- **Template-based configuration** for dynamic action parameters + +#### Field Mapper (`pipeline/field_mapper.py`) +- **FieldMapper class** for template-based field resolution +- **Dynamic field mapping** from classification results to database fields +- **Template syntax**: `{model_id.field_name}` → actual values +- **Type-safe field resolution** with error handling + +### Stream Management (`streams/`) + +#### Stream Manager (`streams/stream_manager.py`) +- **StreamManager class** for RTSP/HTTP stream lifecycle management +- **Reference counting** for shared stream optimization +- **Automatic reconnection** with exponential backoff +- **Thread-safe operations** with proper locking mechanisms + +#### Frame Reader (`streams/frame_reader.py`) +- **FrameReader class** for RTSP stream processing +- **SnapshotReader class** for HTTP snapshot capture +- **Queue-based buffering** with latest frame optimization +- **Error handling** and connection recovery + +#### Camera Monitor (`streams/camera_monitor.py`) +- **CameraMonitor class** for connection state tracking +- **Health monitoring** and status reporting +- **Integration with stream management** for failure detection + +### Communication Layer (`communication/`) + +#### WebSocket Handler (`communication/websocket_handler.py`) +- **WebSocketHandler class** for connection lifecycle management +- **Message routing** to appropriate processors +- **Error handling** and graceful disconnection +- **Integration with subscription management** + +#### Message Processor (`communication/message_processor.py`) +- **MessageProcessor class** for message routing and handling +- **Type-safe message processing** with validation +- **Integration with detection pipeline** and state management +- **Real-time response formatting** + +#### Response Formatter (`communication/response_formatter.py`) +- **ResponseFormatter class** for consistent message formatting +- **Detection result formatting** with timestamp and metadata +- **State report generation** with system metrics +- **JSON serialization** with type safety + +### Storage Layer (`storage/`) + +#### Database Manager (`storage/database_manager.py`) +- **DatabaseManager class** with Repository pattern implementation +- **Automatic schema management** for gas_station_1.car_frontal_info +- **Connection pooling** and transaction management +- **Combined update operations** with field mapping +- **Error handling** and retry mechanisms + +#### Redis Client (`storage/redis_client.py`) +- **RedisClient class** for Redis operations +- **Image storage** with automatic expiration +- **Pub/sub messaging** for real-time notifications +- **Connection management** with retry logic +- **Type-safe operations** with proper error handling + +#### Session Cache (`storage/session_cache.py`) +- **SessionCacheManager class** for session and cache management +- **Redis-backed caching** with configurable expiration +- **Session tracking** with UUID generation +- **Thread-safe operations** with singleton pattern + +### Model Management (`models/`) + +#### Model Manager (`models/model_manager.py`) +- **ModelManager class** for YOLO model lifecycle management +- **Model caching** with memory optimization +- **GPU/CPU device management** with automatic selection +- **Thread-safe model loading** and inference preparation + +#### Pipeline Loader (`models/pipeline_loader.py`) +- **PipelineLoader class** for MPTA archive handling +- **ZIP archive extraction** and validation +- **Configuration parsing** with schema validation +- **Model file management** and loading coordination + +## System Workflow + +### Application Startup (app.py) +1. **Configuration Loading** - Multi-source configuration initialization +2. **Dependency Container Setup** - Service registration and IoC container configuration +3. **Singleton Manager Initialization** - Thread-safe global state managers +4. **FastAPI Application Creation** - Modern lifespan management setup +5. **Service Registration** - All components registered with dependency injection +6. **Server Launch** - Uvicorn server startup with WebSocket support + +### WebSocket Connection Flow +1. **Client Connection** (`communication/websocket_handler.py:35`) +2. **Message Reception** (`communication/message_processor.py:42`) +3. **Subscription Processing** (`communication/message_processor.py:78`) +4. **Stream Initialization** (`streams/stream_manager.py:156`) +5. **Model Loading** (`models/model_manager.py:89`) +6. **Pipeline Setup** (`pipeline/pipeline_executor.py:67`) + +### Detection Pipeline Flow +1. **Frame Capture** (`streams/frame_reader.py:123`) +2. **YOLO Detection** (`detection/yolo_detector.py:145`) +3. **Multi-Object Tracking** (`detection/tracking_manager.py:178`) +4. **Stability Validation** (`detection/stability_validator.py:92`) +5. **Pipeline Execution** (`pipeline/pipeline_executor.py:234`) +6. **Action Execution** (`pipeline/action_executor.py:189`) +7. **Field Mapping** (`pipeline/field_mapper.py:67`) +8. **Database Update** (`storage/database_manager.py:234`) +9. **Response Formatting** (`communication/response_formatter.py:89`) ## WebSocket Protocol @@ -171,66 +337,111 @@ This is a FastAPI-based computer vision detection worker that processes video st } ``` -## Stream Management +## Testing Infrastructure -### Shared Streams -- Multiple subscriptions can share the same camera URL -- Reference counting prevents premature stream termination -- Automatic cleanup when last subscription ends +### Test Structure +``` +tests/ +├── unit/ # Unit tests for individual components +│ ├── core/ # Core infrastructure tests +│ ├── detection/ # Computer vision component tests +│ ├── pipeline/ # Pipeline execution tests +│ ├── streams/ # Stream management tests +│ ├── communication/ # WebSocket and messaging tests +│ ├── storage/ # Database and Redis tests +│ └── models/ # Model management tests +├── integration/ # End-to-end workflow tests +├── performance/ # Performance benchmarks and load tests +└── fixtures/ # Test data and mock objects +``` -### Frame Processing -- **Queue-based buffering** with single frame capacity (latest frame only) -- **Configurable polling interval** based on target FPS -- **Automatic reconnection** with exponential backoff +### Test Commands +```bash +# Run all tests +make test -## Development & Testing +# Run specific test categories +make test-unit +make test-integration +make test-performance -### Local Development +# Run with coverage reporting +make test-coverage + +# Run code quality checks +make lint +make format +``` + +## Development & Deployment + +### Local Development Setup ```bash # Install dependencies -pip install -r requirements.txt +make install-dev -# Run the worker -python app.py +# Run the application +make run -# Test protocol compliance -python test_protocol.py +# Run with debug mode +make run-debug -# Test pipeline with webcam -python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0 +# Run tests +make test + +# Format code +make format + +# Run quality checks +make quality ``` ### Docker Deployment ```bash # Build container -docker build -t detector-worker . +make docker-build -# Run with volume mounts for models -docker run -p 8000:8000 -v ./models:/app/models detector-worker +# Run container +make docker-run + +# Development container with volume mounts +make docker-dev ``` -### Testing Commands -- **Protocol testing**: `python test_protocol.py` -- **Pipeline validation**: `python pipeline_webcam.py --mpta-file --video 0` -- **RTSP debugging**: `python debug/rtsp_webcam.py` +### Configuration Management +- **Environment Variables**: Override default configuration +- **config.json**: Development and production settings +- **DETECTOR_WORKER_ENV**: Environment selection (dev/test/prod) -## Dependencies -- **fastapi[standard]**: Web framework with WebSocket support -- **uvicorn**: ASGI server -- **torch, torchvision**: PyTorch for ML inference -- **ultralytics**: YOLO implementation -- **opencv-python**: Computer vision operations -- **websockets**: WebSocket client/server -- **redis**: Redis client for action execution -- **psycopg2-binary**: PostgreSQL database adapter -- **scipy**: Scientific computing for advanced algorithms -- **filterpy**: Kalman filtering and state estimation +## Code Conventions & Patterns + +### Design Patterns Used +- **Dependency Injection**: IoC container with automatic constructor injection +- **Singleton Pattern**: Thread-safe implementation using metaclass +- **Repository Pattern**: Database operations with interface abstraction +- **Factory Pattern**: Model and pipeline loading with configurable factories +- **Observer Pattern**: WebSocket communication and event handling + +### Code Quality Standards +- **Type Hints**: Comprehensive type annotations throughout codebase +- **Error Handling**: Structured exception hierarchy with proper propagation +- **Logging**: Structured logging with context-aware messages +- **Testing**: Unit, integration, and performance tests with high coverage +- **Documentation**: Comprehensive docstrings and inline documentation + +### Performance Optimizations +- **GPU Acceleration**: CUDA support with automatic fallback to CPU +- **Memory Management**: Model caching with automatic cleanup +- **Thread Safety**: Proper locking mechanisms for concurrent operations +- **Resource Monitoring**: CPU, memory, and GPU usage tracking +- **Connection Pooling**: Database and Redis connection optimization ## Security Considerations -- Model files are loaded from trusted sources only -- Redis connections use authentication when configured -- WebSocket connections handle disconnects gracefully -- Resource usage is monitored to prevent DoS +- **Input Validation**: All WebSocket messages and configuration validated +- **Resource Limits**: Configurable limits for streams, models, and connections +- **Error Handling**: Graceful failure handling without information leakage +- **Authentication**: Redis and PostgreSQL connection security +- **Model Security**: MPTA file validation and secure model loading ## Database Integration @@ -264,14 +475,40 @@ Templates like `{car_brand_cls_v1.brand}` are resolved to actual classification - `car_brand_cls_v1.brand` → "Honda" - `car_bodytype_cls_v1.body_type` → "Sedan" -## Performance Optimizations -- GPU acceleration when CUDA is available -- Shared camera streams reduce resource usage -- Frame queue optimization (single latest frame) -- Model caching across subscriptions -- Trigger class filtering for faster inference -- Parallel processing with ThreadPoolExecutor for classification branches -- Multi-class detection reduces inference passes -- Region-based cropping minimizes processing overhead -- Database connection pooling and prepared statements -- Redis image storage with automatic expiration \ No newline at end of file +## Migration from Legacy Code + +### Key Changes +- **Monolithic to Modular**: 4,000+ lines split into 30+ focused modules +- **Global State Elimination**: Replaced with thread-safe singleton managers +- **Dependency Injection**: Automatic constructor injection with IoC container +- **Modern FastAPI**: Deprecated patterns replaced with current best practices +- **Comprehensive Testing**: Full test suite with unit, integration, and performance tests + +### Breaking Changes +- **Import Paths**: All imports now use `detector_worker.` prefix +- **Configuration**: New multi-source configuration system +- **Dependencies**: New dependency injection requirements +- **Testing**: New test infrastructure and commands + +## Dependencies +- **fastapi[standard]**: Web framework with WebSocket support +- **uvicorn**: ASGI server +- **torch, torchvision**: PyTorch for ML inference +- **ultralytics**: YOLO implementation +- **opencv-python**: Computer vision operations +- **websockets**: WebSocket client/server +- **redis**: Redis client for action execution +- **psycopg2-binary**: PostgreSQL database adapter +- **scipy**: Scientific computing for advanced algorithms +- **filterpy**: Kalman filtering and state estimation +- **pytest**: Testing framework with asyncio support +- **black, isort**: Code formatting tools +- **mypy**: Static type checking +- **flake8**: Code quality analysis + +## References +- **ARCHITECTURE.md**: Comprehensive technical documentation with flow diagrams +- **test_protocol.py**: WebSocket communication validation +- **pipeline_webcam.py**: Local testing with visual output +- **Makefile**: Development workflow commands +- **tox.ini**: Multi-environment testing configuration \ No newline at end of file