# Python Detector Worker - CLAUDE.md

## Project Overview

This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs advanced YOLO-based machine learning pipelines for multi-class object detection and parallel classification. The system features comprehensive database integration, Redis support, and hierarchical pipeline execution designed to work within a larger CMS (Content Management System) architecture.

**IMPORTANT**: This project has been completely refactored from a monolithic 4,000+ line codebase into a modular, maintainable architecture with dependency injection, comprehensive testing, and clear separation of concerns.

### Key Features

- **Multi-Class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
- **Parallel Processing**: Concurrent execution of classification branches using ThreadPoolExecutor
- **Database Integration**: Automatic PostgreSQL schema management and record updates
- **Redis Actions**: Image storage with region cropping and pub/sub messaging
- **Pipeline Synchronization**: Branch coordination with `waitForBranches` functionality
- **Dynamic Field Mapping**: Template-based field resolution for database operations
- **Modular Architecture**: Clean separation of concerns with dependency injection
- **Thread-Safe Operations**: Singleton managers for global state management
- **Comprehensive Testing**: Unit, integration, and performance test suites

## Architecture & Technology Stack

- **Framework**: FastAPI with WebSocket support and modern async patterns
- **ML/CV**: PyTorch, Ultralytics YOLO, OpenCV with BoT-SORT tracking
- **Containerization**: Docker (Python 3.13-bookworm base)
- **Data Storage**: Redis integration for action handling + PostgreSQL for persistent storage
- **Database**: Automatic schema management with gas_station_1 database
- **Parallel Processing**: ThreadPoolExecutor for concurrent classification
- **Communication**: WebSocket-based real-time protocol
- **Design Patterns**: Dependency Injection, Singleton, Repository patterns
- **Testing**: Pytest with asyncio, mocking, and benchmarking support

## New Modular Architecture

### Core Directory Structure

```
detector_worker/
├── core/                         # Core infrastructure components
│   ├── config.py                 # Multi-source configuration management
│   ├── constants.py              # Application constants
│   ├── exceptions.py             # Custom exception hierarchy
│   ├── singleton_managers.py     # Thread-safe singleton managers
│   └── dependency_injection.py   # IoC container implementation
├── detection/                    # Computer vision and ML components
│   ├── yolo_detector.py          # YOLO model inference
│   ├── tracking_manager.py       # BoT-SORT multi-object tracking
│   └── stability_validator.py    # Detection stability validation
├── pipeline/                     # ML pipeline execution
│   ├── pipeline_executor.py      # Main pipeline orchestration
│   ├── action_executor.py        # Redis/DB action execution
│   └── field_mapper.py           # Template-based field mapping
├── streams/                      # Video stream management
│   ├── stream_manager.py         # Stream lifecycle management
│   ├── frame_reader.py           # RTSP/HTTP frame reading
│   └── camera_monitor.py         # Connection state monitoring
├── communication/                # WebSocket and messaging
│   ├── websocket_handler.py      # WebSocket connection management
│   ├── message_processor.py      # Message routing and processing
│   └── response_formatter.py     # Response formatting
├── storage/                      # Data persistence
│   ├── database_manager.py       # PostgreSQL operations
│   ├── redis_client.py           # Redis operations
│   └── session_cache.py          # Session and cache management
├── models/                       # Model and pipeline management
│   ├── model_manager.py          # YOLO model loading/caching
│   └── pipeline_loader.py        # MPTA pipeline loading
└── utils/                        # Utility functions
    ├── logging_utils.py          # Structured logging setup
    ├── image_utils.py            # Image processing utilities
    └── system_utils.py           # System monitoring utilities
```

### Main Application (`app.py`)

- **FastAPI application** with modern lifespan management (replacing deprecated `@app.on_event`)
- **Dependency injection container** initialization and service registration
- **WebSocket endpoint** delegation to communication layer
- **HTTP REST endpoint** for image retrieval (`/camera/{camera_id}/image`)
- **Clean separation** between framework code and business logic
- **Resource monitoring** integrated with singleton managers

### Core Infrastructure (`core/`)

#### Configuration Management (`core/config.py`)

- **Multi-source configuration** with environment variables, JSON files, and defaults
- **Type-safe configuration** with proper validation and error handling
- **Development/production** environment support
- **Configuration providers** pattern for extensible config sources

#### Singleton Managers (`core/singleton_managers.py`)

- **Thread-safe singleton implementation** using metaclass pattern
- **Six specialized managers**:
  - `SubscriptionManager`: WebSocket subscription lifecycle
  - `StreamManager`: RTSP/HTTP stream management with reference counting
  - `ModelManager`: YOLO model loading and caching
  - `PipelineManager`: MPTA pipeline loading and management
  - `SessionManager`: Session tracking and management
  - `SessionCacheManager`: Redis-backed session caching
- **Replaces global dictionaries** for thread-safe state management

#### Dependency Injection (`core/dependency_injection.py`)

- **Full IoC container** with automatic constructor injection
- **Three service lifetimes**: Singleton, Transient, Scoped
- **Interface-based registration** supporting both concrete types and factories
- **Circular dependency detection** and resolution
- **Scope management** for request-scoped services

### Detection System (`detection/`)

#### YOLO Detector (`detection/yolo_detector.py`)

- **YOLODetector class** for inference management
- **Multi-class detection** support (Car + Frontal simultaneously)
- **GPU/CPU optimization** with automatic device selection
- **Confidence filtering** and NMS post-processing
- **Integration with
tracking system**

#### Tracking Manager (`detection/tracking_manager.py`)

- **BoT-SORT implementation** for multi-object tracking
- **Track lifecycle management** with unique ID assignment
- **Stability validation** integration for reliable detections
- **Performance optimization** with configurable parameters

#### Stability Validator (`detection/stability_validator.py`)

- **Detection stability analysis** over time windows
- **Confidence trend analysis** for reliable object detection
- **Configurable thresholds** for stability determination
- **Integration with tracking and pipeline systems**

### Pipeline System (`pipeline/`)

#### Pipeline Executor (`pipeline/pipeline_executor.py`)

- **PipelineExecutor class** for orchestrating MPTA pipeline execution
- **Hierarchical execution** with detection → classification branching
- **Parallel processing** using ThreadPoolExecutor for classification branches
- **Branch synchronization** with `waitForBranches` coordination
- **Error handling** and retry mechanisms

#### Action Executor (`pipeline/action_executor.py`)

- **ActionExecutor class** for Redis and database actions
- **Redis image storage** with region cropping and expiration
- **PostgreSQL operations** with automatic schema management
- **Pub/sub messaging** for real-time notifications
- **Template-based configuration** for dynamic action parameters

#### Field Mapper (`pipeline/field_mapper.py`)

- **FieldMapper class** for template-based field resolution
- **Dynamic field mapping** from classification results to database fields
- **Template syntax**: `{model_id.field_name}` → actual values
- **Type-safe field resolution** with error handling

### Stream Management (`streams/`)

#### Stream Manager (`streams/stream_manager.py`)

- **StreamManager class** for RTSP/HTTP stream lifecycle management
- **Reference counting** for shared stream optimization
- **Automatic reconnection** with exponential backoff
- **Thread-safe operations** with proper locking mechanisms

#### Frame Reader (`streams/frame_reader.py`)

- **FrameReader class** for RTSP stream processing
- **SnapshotReader class** for HTTP snapshot capture
- **Queue-based buffering** with latest frame optimization
- **Error handling** and connection recovery

#### Camera Monitor (`streams/camera_monitor.py`)

- **CameraMonitor class** for connection state tracking
- **Health monitoring** and status reporting
- **Integration with stream management** for failure detection

### Communication Layer (`communication/`)

#### WebSocket Handler (`communication/websocket_handler.py`)

- **WebSocketHandler class** for connection lifecycle management
- **Message routing** to appropriate processors
- **Error handling** and graceful disconnection
- **Integration with subscription management**

#### Message Processor (`communication/message_processor.py`)

- **MessageProcessor class** for message routing and handling
- **Type-safe message processing** with validation
- **Integration with detection pipeline** and state management
- **Real-time response formatting**

#### Response Formatter (`communication/response_formatter.py`)

- **ResponseFormatter class** for consistent message formatting
- **Detection result formatting** with timestamp and metadata
- **State report generation** with system metrics
- **JSON serialization** with type safety

### Storage Layer (`storage/`)

#### Database Manager (`storage/database_manager.py`)

- **DatabaseManager class** with Repository pattern implementation
- **Automatic schema management** for gas_station_1.car_frontal_info
- **Connection pooling** and transaction management
- **Combined update operations** with field mapping
- **Error handling** and retry mechanisms

#### Redis Client (`storage/redis_client.py`)

- **RedisClient class** for Redis operations
- **Image storage** with automatic expiration
- **Pub/sub messaging** for real-time notifications
- **Connection management** with retry logic
- **Type-safe operations** with proper error handling

#### Session Cache (`storage/session_cache.py`)

- **SessionCacheManager class** for session and cache management
- **Redis-backed caching** with configurable expiration
- **Session tracking** with UUID generation
- **Thread-safe operations** with singleton pattern

### Model Management (`models/`)

#### Model Manager (`models/model_manager.py`)

- **ModelManager class** for YOLO model lifecycle management
- **Model caching** with memory optimization
- **GPU/CPU device management** with automatic selection
- **Thread-safe model loading** and inference preparation

#### Pipeline Loader (`models/pipeline_loader.py`)

- **PipelineLoader class** for MPTA archive handling
- **ZIP archive extraction** and validation
- **Configuration parsing** with schema validation
- **Model file management** and loading coordination

## System Workflow

### Application Startup (app.py)

1. **Configuration Loading** - Multi-source configuration initialization
2. **Dependency Container Setup** - Service registration and IoC container configuration
3. **Singleton Manager Initialization** - Thread-safe global state managers
4. **FastAPI Application Creation** - Modern lifespan management setup
5. **Service Registration** - All components registered with dependency injection
6. **Server Launch** - Uvicorn server startup with WebSocket support

### WebSocket Connection Flow

1. **Client Connection** (`communication/websocket_handler.py:35`)
2. **Message Reception** (`communication/message_processor.py:42`)
3. **Subscription Processing** (`communication/message_processor.py:78`)
4. **Stream Initialization** (`streams/stream_manager.py:156`)
5. **Model Loading** (`models/model_manager.py:89`)
6. **Pipeline Setup** (`pipeline/pipeline_executor.py:67`)

### Detection Pipeline Flow

1. **Frame Capture** (`streams/frame_reader.py:123`)
2. **YOLO Detection** (`detection/yolo_detector.py:145`)
3. **Multi-Object Tracking** (`detection/tracking_manager.py:178`)
4. **Stability Validation** (`detection/stability_validator.py:92`)
5. **Pipeline Execution** (`pipeline/pipeline_executor.py:234`)
6. **Action Execution** (`pipeline/action_executor.py:189`)
7. **Field Mapping** (`pipeline/field_mapper.py:67`)
8. **Database Update** (`storage/database_manager.py:234`)
9. **Response Formatting** (`communication/response_formatter.py:89`)

## WebSocket Protocol

### Message Types

- **subscribe**: Start camera stream with model pipeline
- **unsubscribe**: Stop camera stream processing
- **requestState**: Request current worker status
- **setSessionId**: Associate display with session identifier
- **patchSession**: Update session data
- **stateReport**: Periodic heartbeat with system metrics
- **imageDetection**: Detection results with timestamp and model info

### Subscription Format

```json
{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-001",
    "rtspUrl": "rtsp://...",  // OR snapshotUrl
    "snapshotUrl": "http://...",
    "snapshotInterval": 5000,
    "modelUrl": "http://...model.mpta",
    "modelId": 101,
    "modelName": "Vehicle Detection",
    "cropX1": 100,
    "cropY1": 200,
    "cropX2": 300,
    "cropY2": 400
  }
}
```

## Model Pipeline (MPTA) Format

### Enhanced Structure

- **ZIP archive** containing models and configuration
- **pipeline.json** - Main configuration file with Redis + PostgreSQL settings
- **Model files** - YOLO .pt files for detection/classification
- **Multi-model support** - Detection + multiple classification models

### Advanced Pipeline Flow

1. **Multi-class detection stage** - YOLO detection of Car + Frontal simultaneously
2. **Validation stage** - Check for expected classes (flexible matching)
3. **Database initialization** - Create initial record with session_id
4. **Redis actions** - Save cropped frontal images with expiration
5. **Parallel classification** - Concurrent brand and body type classification
6. **Branch synchronization** - Wait for all classification branches to complete
7. **Database update** - Combined update with all classification results

### Enhanced Branch Configuration

```json
{
  "modelId": "car_frontal_detection_v1",
  "modelFile": "car_frontal_detection_v1.pt",
  "multiClass": true,
  "expectedClasses": ["Car", "Frontal"],
  "triggerClasses": ["Car", "Frontal"],
  "minConfidence": 0.8,
  "actions": [
    {
      "type": "redis_save_image",
      "region": "Frontal",
      "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
      "expire_seconds": 600
    }
  ],
  "branches": [
    {
      "modelId": "car_brand_cls_v1",
      "modelFile": "car_brand_cls_v1.pt",
      "parallel": true,
      "crop": true,
      "cropClass": "Frontal",
      "triggerClasses": ["Frontal"],
      "minConfidence": 0.85
    }
  ],
  "parallelActions": [
    {
      "type": "postgresql_update_combined",
      "table": "car_frontal_info",
      "key_field": "session_id",
      "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
      "fields": {
        "car_brand": "{car_brand_cls_v1.brand}",
        "car_body_type": "{car_bodytype_cls_v1.body_type}"
      }
    }
  ]
}
```

## Testing Infrastructure

### Test Structure

```
tests/
├── unit/                # Unit tests for individual components
│   ├── core/            # Core infrastructure tests
│   ├── detection/       # Computer vision component tests
│   ├── pipeline/        # Pipeline execution tests
│   ├── streams/         # Stream management tests
│   ├── communication/   # WebSocket and messaging tests
│   ├── storage/         # Database and Redis tests
│   └── models/          # Model management tests
├── integration/         # End-to-end workflow tests
├── performance/         # Performance benchmarks and load tests
└── fixtures/            # Test data and mock objects
```

### Test Commands

```bash
# Run all tests
make test

# Run specific test categories
make test-unit
make test-integration
make test-performance

# Run with coverage reporting
make test-coverage

# Run code quality checks
make lint
make format
```

## Development & Deployment

### Local Development Setup

```bash
# Install dependencies
make install-dev

# Run the application
make run

# Run with debug mode
make run-debug

# Run tests
make test

# Format code
make format

# Run quality checks
make quality
```

### Docker Deployment

```bash
# Build container
make docker-build

# Run container
make docker-run

# Development container with volume mounts
make docker-dev
```

### Configuration Management

- **Environment Variables**: Override default configuration
- **config.json**: Development and production settings
- **DETECTOR_WORKER_ENV**: Environment selection (dev/test/prod)

## Code Conventions & Patterns

### Design Patterns Used

- **Dependency Injection**: IoC container with automatic constructor injection
- **Singleton Pattern**: Thread-safe implementation using metaclass
- **Repository Pattern**: Database operations with interface abstraction
- **Factory Pattern**: Model and pipeline loading with configurable factories
- **Observer Pattern**: WebSocket communication and event handling

### Code Quality Standards

- **Type Hints**: Comprehensive type annotations throughout codebase
- **Error Handling**: Structured exception hierarchy with proper propagation
- **Logging**: Structured logging with context-aware messages
- **Testing**: Unit, integration, and performance tests with high coverage
- **Documentation**: Comprehensive docstrings and inline documentation

### Performance Optimizations

- **GPU Acceleration**: CUDA support with automatic fallback to CPU
- **Memory Management**: Model caching with automatic cleanup
- **Thread Safety**: Proper locking mechanisms for concurrent operations
- **Resource Monitoring**: CPU, memory, and GPU usage tracking
- **Connection Pooling**: Database and Redis connection optimization

## Security Considerations

- **Input Validation**: All WebSocket messages and configuration validated
- **Resource Limits**: Configurable limits for streams, models, and connections
- **Error Handling**: Graceful failure handling without information leakage
- **Authentication**: Redis and PostgreSQL connection security
- **Model Security**: MPTA file validation and secure model loading

## Database Integration

### Schema Management

The system automatically creates and manages PostgreSQL tables:

```sql
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
    display_id VARCHAR(255),
    captured_timestamp VARCHAR(255),
    session_id VARCHAR(255) PRIMARY KEY,
    license_character VARCHAR(255) DEFAULT NULL,
    license_type VARCHAR(255) DEFAULT 'No model available',
    car_brand VARCHAR(255) DEFAULT NULL,
    car_model VARCHAR(255) DEFAULT NULL,
    car_body_type VARCHAR(255) DEFAULT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
```

### Workflow

1. **Detection**: When both "Car" and "Frontal" are detected, create initial database record with UUID session_id
2. **Redis Storage**: Save cropped frontal image to Redis with session_id in key
3. **Parallel Processing**: Run brand and body type classification concurrently
4. **Synchronization**: Wait for all branches to complete using `waitForBranches`
5. **Database Update**: Update record with combined classification results using field mapping

### Field Mapping

Templates like `{car_brand_cls_v1.brand}` are resolved to actual classification results:

- `car_brand_cls_v1.brand` → "Honda"
- `car_bodytype_cls_v1.body_type` → "Sedan"

## Migration from Legacy Code

### Key Changes

- **Monolithic to Modular**: 4,000+ lines split into 30+ focused modules
- **Global State Elimination**: Replaced with thread-safe singleton managers
- **Dependency Injection**: Automatic constructor injection with IoC container
- **Modern FastAPI**: Deprecated patterns replaced with current best practices
- **Comprehensive Testing**: Full test suite with unit, integration, and performance tests

### Breaking Changes

- **Import Paths**: All imports now use `detector_worker.` prefix
- **Configuration**: New multi-source configuration system
- **Dependencies**: New dependency injection requirements
- **Testing**: New test infrastructure and commands

## Dependencies

- **fastapi[standard]**: Web framework with WebSocket support
- **uvicorn**: ASGI server
- **torch, torchvision**: PyTorch for ML inference
- **ultralytics**: YOLO implementation
- **opencv-python**: Computer vision operations
- **websockets**: WebSocket client/server
- **redis**: Redis client for action execution
- **psycopg2-binary**: PostgreSQL database adapter
- **scipy**: Scientific computing for advanced algorithms
- **filterpy**: Kalman filtering and state estimation
- **pytest**: Testing framework with asyncio support
- **black, isort**: Code formatting tools
- **mypy**: Static type checking
- **flake8**: Code quality analysis

## References

- **ARCHITECTURE.md**: Comprehensive technical documentation with flow diagrams
- **test_protocol.py**: WebSocket communication validation
- **pipeline_webcam.py**: Local testing with visual output
- **Makefile**: Development workflow commands
- **tox.ini**: Multi-environment testing configuration
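
As a rough illustration of the parallel classification, `waitForBranches` synchronization, and field mapping steps described under Database Integration, the following minimal sketch may help. It is not the project's implementation: `classify_brand`, `classify_body_type`, `run_branches`, and `resolve_fields` are hypothetical names, and the classifiers are stubbed with fixed results rather than running YOLO models.

```python
import re
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for classification branches. A real branch would
# run a YOLO classification model on the cropped "Frontal" region.
def classify_brand(crop):
    return {"brand": "Honda"}

def classify_body_type(crop):
    return {"body_type": "Sedan"}

# Assumed registry mapping branch model IDs to callables.
BRANCHES = {
    "car_brand_cls_v1": classify_brand,
    "car_bodytype_cls_v1": classify_body_type,
}

# Matches placeholders of the form {model_id.field_name}.
TEMPLATE = re.compile(r"\{(\w+)\.(\w+)\}")

def run_branches(crop, wait_for):
    """Run the named branches concurrently and block until all have finished."""
    with ThreadPoolExecutor(max_workers=len(wait_for)) as pool:
        futures = {name: pool.submit(BRANCHES[name], crop) for name in wait_for}
        # result() blocks, so the returned dict is complete for every branch.
        return {name: future.result() for name, future in futures.items()}

def resolve_fields(fields, results):
    """Replace each {model_id.field_name} placeholder with its branch result."""
    def lookup(match):
        model_id, field_name = match.group(1), match.group(2)
        return str(results[model_id][field_name])
    return {column: TEMPLATE.sub(lookup, tpl) for column, tpl in fields.items()}

# Mirrors the "parallelActions" entry from the branch configuration example.
action = {
    "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
    "fields": {
        "car_brand": "{car_brand_cls_v1.brand}",
        "car_body_type": "{car_bodytype_cls_v1.body_type}",
    },
}

results = run_branches(crop=None, wait_for=action["waitForBranches"])
resolved = resolve_fields(action["fields"], results)
print(resolved)  # {'car_brand': 'Honda', 'car_body_type': 'Sedan'}
```

In this sketch a missing branch or field raises `KeyError`; a production mapper would presumably handle that case explicitly before issuing the combined database update.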