21 KiB
Python Detector Worker - CLAUDE.md
Project Overview
This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs advanced YOLO-based machine learning pipelines for multi-class object detection and parallel classification. The system features comprehensive database integration, Redis support, and hierarchical pipeline execution designed to work within a larger CMS (Content Management System) architecture.
IMPORTANT: This project has been completely refactored from a monolithic 4,000+ line codebase into a modular, maintainable architecture with dependency injection, comprehensive testing, and clear separation of concerns.
Key Features
- Multi-Class Detection: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
- Parallel Processing: Concurrent execution of classification branches using ThreadPoolExecutor
- Database Integration: Automatic PostgreSQL schema management and record updates
- Redis Actions: Image storage with region cropping and pub/sub messaging
- Pipeline Synchronization: Branch coordination with
waitForBranches
functionality - Dynamic Field Mapping: Template-based field resolution for database operations
- Modular Architecture: Clean separation of concerns with dependency injection
- Thread-Safe Operations: Singleton managers for global state management
- Comprehensive Testing: Unit, integration, and performance test suites
Architecture & Technology Stack
- Framework: FastAPI with WebSocket support and modern async patterns
- ML/CV: PyTorch, Ultralytics YOLO, OpenCV with BoT-SORT tracking
- Containerization: Docker (Python 3.13-bookworm base)
- Data Storage: Redis integration for action handling + PostgreSQL for persistent storage
- Database: Automatic schema management with gas_station_1 database
- Parallel Processing: ThreadPoolExecutor for concurrent classification
- Communication: WebSocket-based real-time protocol
- Design Patterns: Dependency Injection, Singleton, Repository patterns
- Testing: Pytest with asyncio, mocking, and benchmarking support
New Modular Architecture
Core Directory Structure
detector_worker/
├── core/ # Core infrastructure components
│ ├── config.py # Multi-source configuration management
│ ├── constants.py # Application constants
│ ├── exceptions.py # Custom exception hierarchy
│ ├── singleton_managers.py # Thread-safe singleton managers
│ └── dependency_injection.py # IoC container implementation
├── detection/ # Computer vision and ML components
│ ├── yolo_detector.py # YOLO model inference
│ ├── tracking_manager.py # BoT-SORT multi-object tracking
│ └── stability_validator.py # Detection stability validation
├── pipeline/ # ML pipeline execution
│ ├── pipeline_executor.py # Main pipeline orchestration
│ ├── action_executor.py # Redis/DB action execution
│ └── field_mapper.py # Template-based field mapping
├── streams/ # Video stream management
│ ├── stream_manager.py # Stream lifecycle management
│ ├── frame_reader.py # RTSP/HTTP frame reading
│ └── camera_monitor.py # Connection state monitoring
├── communication/ # WebSocket and messaging
│ ├── websocket_handler.py # WebSocket connection management
│ ├── message_processor.py # Message routing and processing
│ └── response_formatter.py # Response formatting
├── storage/ # Data persistence
│ ├── database_manager.py # PostgreSQL operations
│ ├── redis_client.py # Redis operations
│ └── session_cache.py # Session and cache management
├── models/ # Model and pipeline management
│ ├── model_manager.py # YOLO model loading/caching
│ └── pipeline_loader.py # MPTA pipeline loading
└── utils/ # Utility functions
├── logging_utils.py # Structured logging setup
├── image_utils.py # Image processing utilities
└── system_utils.py # System monitoring utilities
Main Application (app.py
)
- FastAPI application with modern lifespan management (replacing deprecated @app.on_event)
- Dependency injection container initialization and service registration
- WebSocket endpoint delegation to communication layer
- HTTP REST endpoint for image retrieval (
/camera/{camera_id}/image
) - Clean separation between framework code and business logic
- Resource monitoring integrated with singleton managers
Core Infrastructure (core/
)
Configuration Management (core/config.py
)
- Multi-source configuration with environment variables, JSON files, and defaults
- Type-safe configuration with proper validation and error handling
- Development/production environment support
- Configuration providers pattern for extensible config sources
Singleton Managers (core/singleton_managers.py
)
- Thread-safe singleton implementation using metaclass pattern
- Six specialized managers:
SubscriptionManager
: WebSocket subscription lifecycleStreamManager
: RTSP/HTTP stream management with reference countingModelManager
: YOLO model loading and cachingPipelineManager
: MPTA pipeline loading and managementSessionManager
: Session tracking and managementSessionCacheManager
: Redis-backed session caching
- Replaces global dictionaries for thread-safe state management
Dependency Injection (core/dependency_injection.py
)
- Full IoC container with automatic constructor injection
- Three service lifetimes: Singleton, Transient, Scoped
- Interface-based registration supporting both concrete types and factories
- Circular dependency detection and resolution
- Scope management for request-scoped services
Detection System (detection/
)
YOLO Detector (detection/yolo_detector.py
)
- YOLODetector class for inference management
- Multi-class detection support (Car + Frontal simultaneously)
- GPU/CPU optimization with automatic device selection
- Confidence filtering and NMS post-processing
- Integration with tracking system
Tracking Manager (detection/tracking_manager.py
)
- BoT-SORT implementation for multi-object tracking
- Track lifecycle management with unique ID assignment
- Stability validation integration for reliable detections
- Performance optimization with configurable parameters
Stability Validator (detection/stability_validator.py
)
- Detection stability analysis over time windows
- Confidence trend analysis for reliable object detection
- Configurable thresholds for stability determination
- Integration with tracking and pipeline systems
Pipeline System (pipeline/
)
Pipeline Executor (pipeline/pipeline_executor.py
)
- PipelineExecutor class for orchestrating MPTA pipeline execution
- Hierarchical execution with detection → classification branching
- Parallel processing using ThreadPoolExecutor for classification branches
- Branch synchronization with
waitForBranches
coordination - Error handling and retry mechanisms
Action Executor (pipeline/action_executor.py
)
- ActionExecutor class for Redis and database actions
- Redis image storage with region cropping and expiration
- PostgreSQL operations with automatic schema management
- Pub/sub messaging for real-time notifications
- Template-based configuration for dynamic action parameters
Field Mapper (pipeline/field_mapper.py
)
- FieldMapper class for template-based field resolution
- Dynamic field mapping from classification results to database fields
- Template syntax:
{model_id.field_name}
→ actual values - Type-safe field resolution with error handling
Stream Management (streams/
)
Stream Manager (streams/stream_manager.py
)
- StreamManager class for RTSP/HTTP stream lifecycle management
- Reference counting for shared stream optimization
- Automatic reconnection with exponential backoff
- Thread-safe operations with proper locking mechanisms
Frame Reader (streams/frame_reader.py
)
- FrameReader class for RTSP stream processing
- SnapshotReader class for HTTP snapshot capture
- Queue-based buffering with latest frame optimization
- Error handling and connection recovery
Camera Monitor (streams/camera_monitor.py
)
- CameraMonitor class for connection state tracking
- Health monitoring and status reporting
- Integration with stream management for failure detection
Communication Layer (communication/
)
WebSocket Handler (communication/websocket_handler.py
)
- WebSocketHandler class for connection lifecycle management
- Message routing to appropriate processors
- Error handling and graceful disconnection
- Integration with subscription management
Message Processor (communication/message_processor.py
)
- MessageProcessor class for message routing and handling
- Type-safe message processing with validation
- Integration with detection pipeline and state management
- Real-time response formatting
Response Formatter (communication/response_formatter.py
)
- ResponseFormatter class for consistent message formatting
- Detection result formatting with timestamp and metadata
- State report generation with system metrics
- JSON serialization with type safety
Storage Layer (storage/
)
Database Manager (storage/database_manager.py
)
- DatabaseManager class with Repository pattern implementation
- Automatic schema management for gas_station_1.car_frontal_info
- Connection pooling and transaction management
- Combined update operations with field mapping
- Error handling and retry mechanisms
Redis Client (storage/redis_client.py
)
- RedisClient class for Redis operations
- Image storage with automatic expiration
- Pub/sub messaging for real-time notifications
- Connection management with retry logic
- Type-safe operations with proper error handling
Session Cache (storage/session_cache.py
)
- SessionCacheManager class for session and cache management
- Redis-backed caching with configurable expiration
- Session tracking with UUID generation
- Thread-safe operations with singleton pattern
Model Management (models/
)
Model Manager (models/model_manager.py
)
- ModelManager class for YOLO model lifecycle management
- Model caching with memory optimization
- GPU/CPU device management with automatic selection
- Thread-safe model loading and inference preparation
Pipeline Loader (models/pipeline_loader.py
)
- PipelineLoader class for MPTA archive handling
- ZIP archive extraction and validation
- Configuration parsing with schema validation
- Model file management and loading coordination
System Workflow
Application Startup (app.py)
- Configuration Loading - Multi-source configuration initialization
- Dependency Container Setup - Service registration and IoC container configuration
- Singleton Manager Initialization - Thread-safe global state managers
- FastAPI Application Creation - Modern lifespan management setup
- Service Registration - All components registered with dependency injection
- Server Launch - Uvicorn server startup with WebSocket support
WebSocket Connection Flow
- Client Connection (
communication/websocket_handler.py:35
) - Message Reception (
communication/message_processor.py:42
) - Subscription Processing (
communication/message_processor.py:78
) - Stream Initialization (
streams/stream_manager.py:156
) - Model Loading (
models/model_manager.py:89
) - Pipeline Setup (
pipeline/pipeline_executor.py:67
)
Detection Pipeline Flow
- Frame Capture (
streams/frame_reader.py:123
) - YOLO Detection (
detection/yolo_detector.py:145
) - Multi-Object Tracking (
detection/tracking_manager.py:178
) - Stability Validation (
detection/stability_validator.py:92
) - Pipeline Execution (
pipeline/pipeline_executor.py:234
) - Action Execution (
pipeline/action_executor.py:189
) - Field Mapping (
pipeline/field_mapper.py:67
) - Database Update (
storage/database_manager.py:234
) - Response Formatting (
communication/response_formatter.py:89
)
WebSocket Protocol
Message Types
- subscribe: Start camera stream with model pipeline
- unsubscribe: Stop camera stream processing
- requestState: Request current worker status
- setSessionId: Associate display with session identifier
- patchSession: Update session data
- stateReport: Periodic heartbeat with system metrics
- imageDetection: Detection results with timestamp and model info
Subscription Format
{
"type": "subscribe",
"payload": {
"subscriptionIdentifier": "display-001;cam-001",
"rtspUrl": "rtsp://...", // OR snapshotUrl
"snapshotUrl": "http://...",
"snapshotInterval": 5000,
"modelUrl": "http://...model.mpta",
"modelId": 101,
"modelName": "Vehicle Detection",
"cropX1": 100, "cropY1": 200,
"cropX2": 300, "cropY2": 400
}
}
Model Pipeline (MPTA) Format
Enhanced Structure
- ZIP archive containing models and configuration
- pipeline.json - Main configuration file with Redis + PostgreSQL settings
- Model files - YOLO .pt files for detection/classification
- Multi-model support - Detection + multiple classification models
Advanced Pipeline Flow
- Multi-class detection stage - YOLO detection of Car + Frontal simultaneously
- Validation stage - Check for expected classes (flexible matching)
- Database initialization - Create initial record with session_id
- Redis actions - Save cropped frontal images with expiration
- Parallel classification - Concurrent brand and body type classification
- Branch synchronization - Wait for all classification branches to complete
- Database update - Combined update with all classification results
Enhanced Branch Configuration
{
"modelId": "car_frontal_detection_v1",
"modelFile": "car_frontal_detection_v1.pt",
"multiClass": true,
"expectedClasses": ["Car", "Frontal"],
"triggerClasses": ["Car", "Frontal"],
"minConfidence": 0.8,
"actions": [
{
"type": "redis_save_image",
"region": "Frontal",
"key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
"expire_seconds": 600
}
],
"branches": [
{
"modelId": "car_brand_cls_v1",
"modelFile": "car_brand_cls_v1.pt",
"parallel": true,
"crop": true,
"cropClass": "Frontal",
"triggerClasses": ["Frontal"],
"minConfidence": 0.85
}
],
"parallelActions": [
{
"type": "postgresql_update_combined",
"table": "car_frontal_info",
"key_field": "session_id",
"waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
"fields": {
"car_brand": "{car_brand_cls_v1.brand}",
"car_body_type": "{car_bodytype_cls_v1.body_type}"
}
}
]
}
Testing Infrastructure
Test Structure
tests/
├── unit/ # Unit tests for individual components
│ ├── core/ # Core infrastructure tests
│ ├── detection/ # Computer vision component tests
│ ├── pipeline/ # Pipeline execution tests
│ ├── streams/ # Stream management tests
│ ├── communication/ # WebSocket and messaging tests
│ ├── storage/ # Database and Redis tests
│ └── models/ # Model management tests
├── integration/ # End-to-end workflow tests
├── performance/ # Performance benchmarks and load tests
└── fixtures/ # Test data and mock objects
Test Commands
# Run all tests
make test
# Run specific test categories
make test-unit
make test-integration
make test-performance
# Run with coverage reporting
make test-coverage
# Run code quality checks
make lint
make format
Development & Deployment
Local Development Setup
# Install dependencies
make install-dev
# Run the application
make run
# Run with debug mode
make run-debug
# Run tests
make test
# Format code
make format
# Run quality checks
make quality
Docker Deployment
# Build container
make docker-build
# Run container
make docker-run
# Development container with volume mounts
make docker-dev
Configuration Management
- Environment Variables: Override default configuration
- config.json: Development and production settings
- DETECTOR_WORKER_ENV: Environment selection (dev/test/prod)
Code Conventions & Patterns
Design Patterns Used
- Dependency Injection: IoC container with automatic constructor injection
- Singleton Pattern: Thread-safe implementation using metaclass
- Repository Pattern: Database operations with interface abstraction
- Factory Pattern: Model and pipeline loading with configurable factories
- Observer Pattern: WebSocket communication and event handling
Code Quality Standards
- Type Hints: Comprehensive type annotations throughout codebase
- Error Handling: Structured exception hierarchy with proper propagation
- Logging: Structured logging with context-aware messages
- Testing: Unit, integration, and performance tests with high coverage
- Documentation: Comprehensive docstrings and inline documentation
Performance Optimizations
- GPU Acceleration: CUDA support with automatic fallback to CPU
- Memory Management: Model caching with automatic cleanup
- Thread Safety: Proper locking mechanisms for concurrent operations
- Resource Monitoring: CPU, memory, and GPU usage tracking
- Connection Pooling: Database and Redis connection optimization
Security Considerations
- Input Validation: All WebSocket messages and configuration validated
- Resource Limits: Configurable limits for streams, models, and connections
- Error Handling: Graceful failure handling without information leakage
- Authentication: Redis and PostgreSQL connection security
- Model Security: MPTA file validation and secure model loading
Database Integration
Schema Management
The system automatically creates and manages PostgreSQL tables:
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
display_id VARCHAR(255),
captured_timestamp VARCHAR(255),
session_id VARCHAR(255) PRIMARY KEY,
license_character VARCHAR(255) DEFAULT NULL,
license_type VARCHAR(255) DEFAULT 'No model available',
car_brand VARCHAR(255) DEFAULT NULL,
car_model VARCHAR(255) DEFAULT NULL,
car_body_type VARCHAR(255) DEFAULT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Workflow
- Detection: When both "Car" and "Frontal" are detected, create initial database record with UUID session_id
- Redis Storage: Save cropped frontal image to Redis with session_id in key
- Parallel Processing: Run brand and body type classification concurrently
- Synchronization: Wait for all branches to complete using
waitForBranches
- Database Update: Update record with combined classification results using field mapping
Field Mapping
Templates like {car_brand_cls_v1.brand}
are resolved to actual classification results:
car_brand_cls_v1.brand
→ "Honda"car_bodytype_cls_v1.body_type
→ "Sedan"
Migration from Legacy Code
Key Changes
- Monolithic to Modular: 4,000+ lines split into 30+ focused modules
- Global State Elimination: Replaced with thread-safe singleton managers
- Dependency Injection: Automatic constructor injection with IoC container
- Modern FastAPI: Deprecated patterns replaced with current best practices
- Comprehensive Testing: Full test suite with unit, integration, and performance tests
Breaking Changes
- Import Paths: All imports now use
detector_worker.
prefix - Configuration: New multi-source configuration system
- Dependencies: New dependency injection requirements
- Testing: New test infrastructure and commands
Dependencies
- fastapi[standard]: Web framework with WebSocket support
- uvicorn: ASGI server
- torch, torchvision: PyTorch for ML inference
- ultralytics: YOLO implementation
- opencv-python: Computer vision operations
- websockets: WebSocket client/server
- redis: Redis client for action execution
- psycopg2-binary: PostgreSQL database adapter
- scipy: Scientific computing for advanced algorithms
- filterpy: Kalman filtering and state estimation
- pytest: Testing framework with asyncio support
- black, isort: Code formatting tools
- mypy: Static type checking
- flake8: Code quality analysis
References
- ARCHITECTURE.md: Comprehensive technical documentation with flow diagrams
- test_protocol.py: WebSocket communication validation
- pipeline_webcam.py: Local testing with visual output
- Makefile: Development workflow commands
- tox.ini: Multi-environment testing configuration