Python Detector Worker - CLAUDE.md
Project Overview
This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs advanced YOLO-based machine learning pipelines for multi-class object detection and parallel classification. The system features comprehensive database integration, Redis support, and hierarchical pipeline execution designed to work within a larger CMS (Content Management System) architecture.
Key Features
- Multi-Class Detection: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
- Parallel Processing: Concurrent execution of classification branches using ThreadPoolExecutor
- Database Integration: Automatic PostgreSQL schema management and record updates
- Redis Actions: Image storage with region cropping and pub/sub messaging
- Pipeline Synchronization: Branch coordination with waitForBranches functionality
- Dynamic Field Mapping: Template-based field resolution for database operations
Architecture & Technology Stack
- Framework: FastAPI with WebSocket support
- ML/CV: PyTorch, Ultralytics YOLO, OpenCV
- Containerization: Docker (Python 3.13-bookworm base)
- Data Storage: Redis integration for action handling + PostgreSQL for persistent storage
- Database: Automatic schema management with gas_station_1 database
- Parallel Processing: ThreadPoolExecutor for concurrent classification
- Communication: WebSocket-based real-time protocol
Core Components
Main Application (app.py)
- FastAPI WebSocket server for real-time communication
- Multi-camera stream management with shared stream optimization
- HTTP REST endpoint for image retrieval (/camera/{camera_id}/image)
- Threading-based frame readers for RTSP streams and HTTP snapshots
- Model loading and inference using MPTA (Machine Learning Pipeline Archive) format
- Session management with display identifier mapping
- Resource monitoring (CPU, memory, GPU usage via psutil)
Pipeline System (siwatsystem/pympta.py)
- MPTA file handling - ZIP archives containing model configurations
- Hierarchical pipeline execution with detection → classification branching
- Multi-class detection - Simultaneous detection of multiple classes (Car + Frontal)
- Parallel processing - Concurrent classification branches with ThreadPoolExecutor
- Redis action system - Image saving with region cropping and message publishing
- PostgreSQL integration - Automatic table creation and combined updates
- Dynamic model loading with GPU optimization
- Configurable trigger classes and confidence thresholds
- Branch synchronization - waitForBranches coordination for database updates
Database System (siwatsystem/database.py)
- DatabaseManager class for PostgreSQL operations
- Automatic table creation with gas_station_1.car_frontal_info schema
- Combined update operations with field mapping from branch results
- Session management with UUID generation
- Error handling and connection management
Testing & Debugging
- Protocol test script (test_protocol.py) for WebSocket communication validation
- Pipeline webcam utility (pipeline_webcam.py) for local testing with visual output
- RTSP streaming debug tool (debug/rtsp_webcam.py) using GStreamer
Code Conventions & Patterns
Logging
- Structured logging using Python's logging module
- Console output plus file output to detector_worker.log
- Debug level separation for detailed troubleshooting
- Context-aware messages with camera IDs and model information
Error Handling
- Graceful failure handling with retry mechanisms (configurable max_retries)
- Thread-safe operations using locks for streams and models
- WebSocket disconnect handling with proper cleanup
- Model loading validation with detailed error reporting
Configuration
- JSON configuration (config.json) for runtime parameters:
  - poll_interval_ms: Frame processing interval
  - max_streams: Concurrent stream limit
  - target_fps: Target frame rate
  - reconnect_interval_sec: Stream reconnection delay
  - max_retries: Maximum retry attempts (-1 for unlimited)
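A minimal sketch of how these parameters might be loaded, assuming the keys above sit at the top level of config.json. The default values and the `WorkerConfig`, `load_config`, and `retries_exhausted` names are hypothetical and are not taken from app.py.

```python
import json
from dataclasses import dataclass, fields
from pathlib import Path


@dataclass
class WorkerConfig:
    # Default values are illustrative assumptions, not the project's real defaults.
    poll_interval_ms: int = 100
    max_streams: int = 5
    target_fps: int = 10
    reconnect_interval_sec: int = 5
    max_retries: int = -1  # -1 means retry forever


def load_config(path: str = "config.json") -> WorkerConfig:
    """Read config.json if present, ignoring keys this sketch does not know."""
    file = Path(path)
    raw = json.loads(file.read_text()) if file.exists() else {}
    known = {f.name for f in fields(WorkerConfig)}
    return WorkerConfig(**{k: v for k, v in raw.items() if k in known})


def retries_exhausted(attempts: int, max_retries: int) -> bool:
    """Return True once the retry budget is used up; -1 never exhausts."""
    return max_retries != -1 and attempts >= max_retries
```

Treating -1 as a sentinel keeps the retry check to one comparison, so a reconnect loop can call `retries_exhausted` on every failed attempt.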
Threading Model
- Frame reader threads for each camera stream (RTSP/HTTP)
- Shared stream optimization - multiple subscriptions can reuse the same camera stream
- Async WebSocket handling with concurrent task management
- Thread-safe data structures with proper locking mechanisms
WebSocket Protocol
Message Types
- subscribe: Start camera stream with model pipeline
- unsubscribe: Stop camera stream processing
- requestState: Request current worker status
- setSessionId: Associate display with session identifier
- patchSession: Update session data
- stateReport: Periodic heartbeat with system metrics
- imageDetection: Detection results with timestamp and model info
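One way to route incoming messages by their type is a handler table. This is a sketch only: the handler names, their return values, and the assumption that unsubscribe carries a subscriptionIdentifier are illustrative and not confirmed by the worker's code.

```python
import json
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], str]


def handle_subscribe(payload: Dict[str, Any]) -> str:
    # Placeholder: a real handler would start the stream and load the model.
    return f"subscribed:{payload.get('subscriptionIdentifier')}"


def handle_unsubscribe(payload: Dict[str, Any]) -> str:
    # Placeholder: a real handler would release the stream.
    return f"unsubscribed:{payload.get('subscriptionIdentifier')}"


# Only two handlers are shown; other message types would register the same way.
HANDLERS: Dict[str, Handler] = {
    "subscribe": handle_subscribe,
    "unsubscribe": handle_unsubscribe,
}


def dispatch(raw_message: str) -> str:
    """Parse one JSON text frame and route it by its "type" field."""
    message = json.loads(raw_message)
    handler = HANDLERS.get(message.get("type"))
    if handler is None:
        return f"ignored:{message.get('type')}"
    return handler(message.get("payload", {}))
```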
Subscription Format
{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-001",
    "rtspUrl": "rtsp://...", // OR snapshotUrl
    "snapshotUrl": "http://...",
    "snapshotInterval": 5000,
    "modelUrl": "http://...model.mpta",
    "modelId": 101,
    "modelName": "Vehicle Detection",
    "cropX1": 100, "cropY1": 200,
    "cropX2": 300, "cropY2": 400
  }
}
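The payload above could be unpacked along these lines. This is a sketch under assumptions: the example identifier suggests a "display;camera" layout, and preferring rtspUrl when both URLs are present is a guess. All three helper names are hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def split_subscription_identifier(identifier: str) -> Tuple[str, str]:
    """Split "display-001;cam-001" into (display_id, camera_id)."""
    display_id, sep, camera_id = identifier.partition(";")
    if not sep or not display_id or not camera_id:
        raise ValueError(f"expected '<display>;<camera>', got {identifier!r}")
    return display_id, camera_id


def select_source(payload: Dict[str, Any]) -> Tuple[str, str]:
    """Return (kind, url); rtspUrl wins when both are given (an assumption)."""
    if payload.get("rtspUrl"):
        return "rtsp", payload["rtspUrl"]
    if payload.get("snapshotUrl"):
        return "snapshot", payload["snapshotUrl"]
    raise ValueError("payload needs either rtspUrl or snapshotUrl")


def crop_box(payload: Dict[str, Any]) -> Optional[Tuple[int, int, int, int]]:
    """Return (x1, y1, x2, y2) if all four crop fields are present, else None."""
    keys = ("cropX1", "cropY1", "cropX2", "cropY2")
    if all(k in payload for k in keys):
        return tuple(payload[k] for k in keys)
    return None
```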
Model Pipeline (MPTA) Format
Enhanced Structure
- ZIP archive containing models and configuration
- pipeline.json - Main configuration file with Redis + PostgreSQL settings
- Model files - YOLO .pt files for detection/classification
- Multi-model support - Detection + multiple classification models
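Since an MPTA file is a ZIP archive, its contents can be inspected with the standard library. The sketch below assumes pipeline.json sits either at the archive root or inside a single folder; `read_mpta` is a hypothetical helper, not the loader in siwatsystem/pympta.py.

```python
import json
import zipfile
from typing import Any, Dict, List, Tuple


def read_mpta(path_or_file) -> Tuple[Dict[str, Any], List[str]]:
    """Return the parsed pipeline.json and the names of bundled .pt files."""
    with zipfile.ZipFile(path_or_file) as archive:
        names = archive.namelist()
        # Assumption: pipeline.json may sit at the root or inside one folder.
        config_name = next(
            (n for n in names if n.rsplit("/", 1)[-1] == "pipeline.json"), None
        )
        if config_name is None:
            raise FileNotFoundError("pipeline.json not found in archive")
        config = json.loads(archive.read(config_name))
        models = [n for n in names if n.endswith(".pt")]
    return config, models
```

Calling `read_mpta("path/to/model.mpta")` returns the configuration dictionary and the list of model files without extracting anything to disk.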
Advanced Pipeline Flow
- Multi-class detection stage - YOLO detection of Car + Frontal simultaneously
- Validation stage - Check for expected classes (flexible matching)
- Database initialization - Create initial record with session_id
- Redis actions - Save cropped frontal images with expiration
- Parallel classification - Concurrent brand and body type classification
- Branch synchronization - Wait for all classification branches to complete
- Database update - Combined update with all classification results
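The parallel classification and branch synchronization steps can be sketched with ThreadPoolExecutor. The stand-in classifiers and the `run_branches` helper are hypothetical; real branches would run YOLO classification models on the cropped image.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

BranchFn = Callable[[bytes], Dict[str, str]]


def run_branches(
    crop: bytes, branches: Dict[str, BranchFn], wait_for: List[str]
) -> Dict[str, Dict[str, str]]:
    """Run every branch concurrently and block until all `wait_for` ids finish."""
    missing = [b for b in wait_for if b not in branches]
    if missing:
        raise KeyError(f"waitForBranches names unknown branches: {missing}")
    with ThreadPoolExecutor(max_workers=len(branches) or 1) as pool:
        futures = {bid: pool.submit(fn, crop) for bid, fn in branches.items()}
        # result() blocks, so this line is the synchronization point.
        return {bid: futures[bid].result() for bid in wait_for}


# Stand-in classifiers used only for illustration.
def fake_brand(crop: bytes) -> Dict[str, str]:
    return {"brand": "Honda"}


def fake_body_type(crop: bytes) -> Dict[str, str]:
    return {"body_type": "Sedan"}


results = run_branches(
    b"frontal-crop",
    {"car_brand_cls_v1": fake_brand, "car_bodytype_cls_v1": fake_body_type},
    wait_for=["car_brand_cls_v1", "car_bodytype_cls_v1"],
)
```

Collecting results only after every awaited future resolves is what lets the final database update write all classification fields in one statement.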
Enhanced Branch Configuration
{
  "modelId": "car_frontal_detection_v1",
  "modelFile": "car_frontal_detection_v1.pt",
  "multiClass": true,
  "expectedClasses": ["Car", "Frontal"],
  "triggerClasses": ["Car", "Frontal"],
  "minConfidence": 0.8,
  "actions": [
    {
      "type": "redis_save_image",
      "region": "Frontal",
      "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
      "expire_seconds": 600
    }
  ],
  "branches": [
    {
      "modelId": "car_brand_cls_v1",
      "modelFile": "car_brand_cls_v1.pt",
      "parallel": true,
      "crop": true,
      "cropClass": "Frontal",
      "triggerClasses": ["Frontal"],
      "minConfidence": 0.85
    }
  ],
  "parallelActions": [
    {
      "type": "postgresql_update_combined",
      "table": "car_frontal_info",
      "key_field": "session_id",
      "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
      "fields": {
        "car_brand": "{car_brand_cls_v1.brand}",
        "car_body_type": "{car_bodytype_cls_v1.body_type}"
      }
    }
  ]
}
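To illustrate how triggerClasses, minConfidence, and the Redis key template in the configuration above might be applied, here is a small sketch. The detection dictionary shape (`class`, `confidence`) and both helper names are assumptions made for this example.

```python
from typing import Any, Dict, List


def filter_detections(
    detections: List[Dict[str, Any]], node: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Keep detections whose class is a trigger class and meets minConfidence."""
    triggers = set(node.get("triggerClasses", []))
    min_conf = node.get("minConfidence", 0.0)
    return [
        d for d in detections
        if d["class"] in triggers and d["confidence"] >= min_conf
    ]


def build_redis_key(template: str, **context: str) -> str:
    """Fill the {placeholders} of an action key with per-detection context."""
    return template.format(**context)
```

For example, `build_redis_key("inference:{display_id}:{timestamp}:{session_id}:{filename}", display_id="display-001", timestamp="t", session_id="s", filename="f.jpg")` yields `inference:display-001:t:s:f.jpg`.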
Stream Management
Shared Streams
- Multiple subscriptions can share the same camera URL
- Reference counting prevents premature stream termination
- Automatic cleanup when last subscription ends
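Reference counting for shared streams can be sketched as follows. `SharedStreamRegistry` is a hypothetical class for illustration; the worker's actual bookkeeping in app.py may differ.

```python
import threading
from typing import Dict


class SharedStreamRegistry:
    """Tracks how many subscriptions use each camera URL."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._ref_counts: Dict[str, int] = {}

    def acquire(self, url: str) -> bool:
        """Register a subscriber; return True if the stream must be started."""
        with self._lock:
            count = self._ref_counts.get(url, 0)
            self._ref_counts[url] = count + 1
            return count == 0

    def release(self, url: str) -> bool:
        """Drop a subscriber; return True if the stream should be stopped."""
        with self._lock:
            count = self._ref_counts.get(url, 0)
            if count <= 1:
                self._ref_counts.pop(url, None)
                return count == 1
            self._ref_counts[url] = count - 1
            return False
```

The first `acquire` for a URL signals that a reader thread should start, and only the final `release` signals that it can be torn down.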
Frame Processing
- Queue-based buffering with single frame capacity (latest frame only)
- Configurable polling interval based on target FPS
- Automatic reconnection with exponential backoff
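A single-slot queue that always holds the newest frame, together with an exponential backoff delay, might look like this. The function names and the base and cap values are assumptions chosen for the sketch.

```python
import queue
from typing import Any


def put_latest(frame_queue: "queue.Queue[Any]", frame: Any) -> None:
    """Store `frame`, discarding any older frame still waiting in the queue."""
    try:
        frame_queue.get_nowait()  # drop the stale frame, if there is one
    except queue.Empty:
        pass
    try:
        frame_queue.put_nowait(frame)
    except queue.Full:
        pass  # another producer won the race; its frame is equally fresh


def backoff_delay(attempt: int, base_sec: float = 1.0, cap_sec: float = 30.0) -> float:
    """Exponential backoff: base * 2**attempt, capped at cap_sec."""
    return min(cap_sec, base_sec * (2 ** attempt))


frames: "queue.Queue[str]" = queue.Queue(maxsize=1)
for name in ("frame-1", "frame-2", "frame-3"):
    put_latest(frames, name)
```

After the loop the queue contains only "frame-3", so a slow consumer never processes a backlog of outdated frames.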
Development & Testing
Local Development
# Install dependencies
pip install -r requirements.txt
# Run the worker
python app.py
# Test protocol compliance
python test_protocol.py
# Test pipeline with webcam
python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0
Docker Deployment
# Build container
docker build -t detector-worker .
# Run with volume mounts for models
docker run -p 8000:8000 -v ./models:/app/models detector-worker
Testing Commands
- Protocol testing: python test_protocol.py
- Pipeline validation: python pipeline_webcam.py --mpta-file <path> --video 0
- RTSP debugging: python debug/rtsp_webcam.py
Dependencies
- fastapi[standard]: Web framework with WebSocket support
- uvicorn: ASGI server
- torch, torchvision: PyTorch for ML inference
- ultralytics: YOLO implementation
- opencv-python: Computer vision operations
- websockets: WebSocket client/server
- redis: Redis client for action execution
- psycopg2-binary: PostgreSQL database adapter
- scipy: Scientific computing for advanced algorithms
- filterpy: Kalman filtering and state estimation
Security Considerations
- Model files are loaded from trusted sources only
- Redis connections use authentication when configured
- WebSocket connections handle disconnects gracefully
- Resource usage is monitored to prevent DoS
Database Integration
Schema Management
The system automatically creates and manages PostgreSQL tables:
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
    display_id VARCHAR(255),
    captured_timestamp VARCHAR(255),
    session_id VARCHAR(255) PRIMARY KEY,
    license_character VARCHAR(255) DEFAULT NULL,
    license_type VARCHAR(255) DEFAULT 'No model available',
    car_brand VARCHAR(255) DEFAULT NULL,
    car_model VARCHAR(255) DEFAULT NULL,
    car_body_type VARCHAR(255) DEFAULT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
Workflow
- Detection: When both "Car" and "Frontal" are detected, create initial database record with UUID session_id
- Redis Storage: Save cropped frontal image to Redis with session_id in key
- Parallel Processing: Run brand and body type classification concurrently
- Synchronization: Wait for all branches to complete using waitForBranches
- Database Update: Update record with combined classification results using field mapping
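The session id generation and the combined update could be expressed as a parameterized statement like the one below. This is a sketch, not the DatabaseManager implementation: the helper names are hypothetical, the `%s` placeholders follow the psycopg2 parameter style, and refreshing updated_at on each update is an assumption.

```python
import uuid
from typing import Any, Dict, List, Tuple


def new_session_id() -> str:
    """Generate the UUID used as session_id for the initial record."""
    return str(uuid.uuid4())


def build_combined_update(
    table: str, key_field: str, key_value: str, fields: Dict[str, Any]
) -> Tuple[str, List[Any]]:
    """Build a parameterized UPDATE covering every resolved field at once."""
    if not fields:
        raise ValueError("nothing to update")
    # Identifiers come from the trusted pipeline config; values go in as parameters.
    assignments = ", ".join(f"{column} = %s" for column in fields)
    sql = (
        f"UPDATE gas_station_1.{table} "
        f"SET {assignments}, updated_at = NOW() "
        f"WHERE {key_field} = %s"
    )
    return sql, [*fields.values(), key_value]
```

The returned pair can be passed to a cursor's `execute(sql, params)`, which keeps classification values out of the SQL text itself.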
Field Mapping
Templates like {car_brand_cls_v1.brand} are resolved to actual classification results:
- car_brand_cls_v1.brand → "Honda"
- car_bodytype_cls_v1.body_type → "Sedan"
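Template resolution of this kind can be sketched with a regular expression. `resolve_template` is a hypothetical helper, and returning None when a branch or field is missing is an assumption about the desired behavior.

```python
import re
from typing import Any, Dict, Optional

_PLACEHOLDER = re.compile(r"\{([^{}.]+)\.([^{}.]+)\}")


def resolve_template(
    template: str, branch_results: Dict[str, Dict[str, Any]]
) -> Optional[str]:
    """Replace each {branch_id.field} with that branch's value.

    Returns None if any referenced branch or field is missing (an assumption).
    """
    missing = False

    def substitute(match: "re.Match[str]") -> str:
        nonlocal missing
        branch_id, field = match.group(1), match.group(2)
        value = branch_results.get(branch_id, {}).get(field)
        if value is None:
            missing = True
            return ""
        return str(value)

    resolved = _PLACEHOLDER.sub(substitute, template)
    return None if missing else resolved
```

With branch results of `{"car_brand_cls_v1": {"brand": "Honda"}}`, the template `{car_brand_cls_v1.brand}` resolves to `Honda`.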
Performance Optimizations
- GPU acceleration when CUDA is available
- Shared camera streams reduce resource usage
- Frame queue optimization (single latest frame)
- Model caching across subscriptions
- Trigger class filtering for faster inference
- Parallel processing with ThreadPoolExecutor for classification branches
- Multi-class detection reduces inference passes
- Region-based cropping minimizes processing overhead
- Database connection pooling and prepared statements
- Redis image storage with automatic expiration