Python Detector Worker - CLAUDE.md
Project Overview
This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs advanced YOLO-based machine learning pipelines for multi-class object detection and parallel classification. The system features comprehensive database integration, Redis support, and hierarchical pipeline execution designed to work within a larger CMS (Content Management System) architecture.
Key Features
- Multi-Class Detection: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
 - Parallel Processing: Concurrent execution of classification branches using ThreadPoolExecutor
 - Database Integration: Automatic PostgreSQL schema management and record updates
 - Redis Actions: Image storage with region cropping and pub/sub messaging
 - Pipeline Synchronization: Branch coordination with waitForBranches functionality
 - Dynamic Field Mapping: Template-based field resolution for database operations
 
Architecture & Technology Stack
- Framework: FastAPI with WebSocket support
 - ML/CV: PyTorch, Ultralytics YOLO, OpenCV
 - Containerization: Docker (Python 3.13-bookworm base)
 - Data Storage: Redis integration for action handling + PostgreSQL for persistent storage
 - Database: Automatic schema management with gas_station_1 database
 - Parallel Processing: ThreadPoolExecutor for concurrent classification
 - Communication: WebSocket-based real-time protocol
 
Core Components
Main Application (app.py)
- FastAPI WebSocket server for real-time communication
 - Multi-camera stream management with shared stream optimization
 - HTTP REST endpoint for image retrieval (/camera/{camera_id}/image)
 - Threading-based frame readers for RTSP streams and HTTP snapshots
 - Model loading and inference using MPTA (Machine Learning Pipeline Archive) format
 - Session management with display identifier mapping
 - Resource monitoring (CPU, memory, GPU usage via psutil)
 
Pipeline System (siwatsystem/pympta.py)
- MPTA file handling - ZIP archives containing model configurations
 - Hierarchical pipeline execution with detection → classification branching
 - Multi-class detection - Simultaneous detection of multiple classes (Car + Frontal)
 - Parallel processing - Concurrent classification branches with ThreadPoolExecutor
 - Redis action system - Image saving with region cropping and message publishing
 - PostgreSQL integration - Automatic table creation and combined updates
 - Dynamic model loading with GPU optimization
 - Configurable trigger classes and confidence thresholds
 - Branch synchronization - waitForBranches coordination for database updates
 
Database System (siwatsystem/database.py)
- DatabaseManager class for PostgreSQL operations
 - Automatic table creation with gas_station_1.car_frontal_info schema
 - Combined update operations with field mapping from branch results
 - Session management with UUID generation
 - Error handling and connection management
 
Testing & Debugging
- Protocol test script (test_protocol.py) for WebSocket communication validation
 - Pipeline webcam utility (pipeline_webcam.py) for local testing with visual output
 - RTSP streaming debug tool (debug/rtsp_webcam.py) using GStreamer
Code Conventions & Patterns
Logging
- Structured logging using Python's logging module
 - File + console output to detector_worker.log
 - Debug level separation for detailed troubleshooting
 - Context-aware messages with camera IDs and model information
 
Error Handling
- Graceful failure handling with retry mechanisms (configurable max_retries)
 - Thread-safe operations using locks for streams and models
 - WebSocket disconnect handling with proper cleanup
 - Model loading validation with detailed error reporting
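The retry behaviour listed above could look roughly like the sketch below. `run_with_retries` and its parameters are hypothetical names chosen for illustration, not the worker's actual API; the only detail taken from this document is that `max_retries = -1` means unlimited retries.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    operation: Callable[[], T],
    max_retries: int = 3,
    delay_sec: float = 0.0,
) -> T:
    """Call `operation` until it succeeds or the retry budget is spent.

    max_retries counts retries after the first attempt; -1 retries forever.
    """
    failures = 0
    while True:
        try:
            return operation()
        except Exception:
            failures += 1
            # Re-raise once the failures exceed the allowed number of retries.
            if max_retries != -1 and failures > max_retries:
                raise
            time.sleep(delay_sec)
```

With `max_retries=2` the operation is attempted at most three times before the last exception propagates to the caller.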
 
Configuration
- JSON configuration (config.json) for runtime parameters:
  - poll_interval_ms: Frame processing interval
  - max_streams: Concurrent stream limit
  - target_fps: Target frame rate
  - reconnect_interval_sec: Stream reconnection delay
  - max_retries: Maximum retry attempts (-1 for unlimited)
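A minimal sketch of loading these parameters, assuming that values in config.json override built-in defaults. The default values and the `load_config` helper are assumptions for illustration; this document does not state the project's real defaults.

```python
import json
from pathlib import Path

# Assumed fallback values, for illustration only.
DEFAULT_CONFIG = {
    "poll_interval_ms": 100,
    "max_streams": 5,
    "target_fps": 10,
    "reconnect_interval_sec": 5,
    "max_retries": 3,
}


def load_config(path: str = "config.json") -> dict:
    """Return the defaults overlaid with whatever the JSON file provides."""
    config = dict(DEFAULT_CONFIG)
    config_file = Path(path)
    if config_file.exists():
        with config_file.open(encoding="utf-8") as fh:
            config.update(json.load(fh))
    return config
```

A missing file simply yields the defaults, so the worker can start without any configuration present.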
 
Threading Model
- Frame reader threads for each camera stream (RTSP/HTTP)
 - Shared stream optimization - multiple subscriptions can reuse the same camera stream
 - Async WebSocket handling with concurrent task management
 - Thread-safe data structures with proper locking mechanisms
 
WebSocket Protocol
Message Types
- subscribe: Start camera stream with model pipeline
 - unsubscribe: Stop camera stream processing
 - requestState: Request current worker status
 - setSessionId: Associate display with session identifier
 - patchSession: Update session data
 - stateReport: Periodic heartbeat with system metrics
 - imageDetection: Detection results with timestamp and model info
 
Subscription Format
{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-001",
    "rtspUrl": "rtsp://...",  // OR snapshotUrl
    "snapshotUrl": "http://...",
    "snapshotInterval": 5000,
    "modelUrl": "http://...model.mpta",
    "modelId": 101,
    "modelName": "Vehicle Detection",
    "cropX1": 100, "cropY1": 200,
    "cropX2": 300, "cropY2": 400
  }
}
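As a rough illustration of how a worker might consume this message, the sketch below validates the payload and flattens it into a dataclass. The `Subscription` class and `parse_subscribe` function are hypothetical, and splitting `subscriptionIdentifier` on `;` into a display ID and a camera ID is an assumption inferred from the example value above.

```python
import json
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Subscription:
    display_id: str
    camera_id: str
    model_id: int
    model_url: str
    rtsp_url: Optional[str] = None
    snapshot_url: Optional[str] = None
    crop: Optional[Tuple[int, int, int, int]] = None


def parse_subscribe(raw: str) -> Subscription:
    """Validate a subscribe message and flatten its payload."""
    message = json.loads(raw)
    if message.get("type") != "subscribe":
        raise ValueError("expected a message of type 'subscribe'")
    payload = message["payload"]

    # Assumption: the identifier has the form "<display>;<camera>".
    display_id, _, camera_id = payload["subscriptionIdentifier"].partition(";")
    if not display_id or not camera_id:
        raise ValueError("subscriptionIdentifier must look like 'display;camera'")

    rtsp_url = payload.get("rtspUrl")
    snapshot_url = payload.get("snapshotUrl")
    if not (rtsp_url or snapshot_url):
        raise ValueError("either rtspUrl or snapshotUrl is required")

    # The crop rectangle is only used when all four corners are present.
    crop_keys = ("cropX1", "cropY1", "cropX2", "cropY2")
    crop = None
    if all(key in payload for key in crop_keys):
        crop = tuple(int(payload[key]) for key in crop_keys)

    return Subscription(
        display_id=display_id,
        camera_id=camera_id,
        model_id=int(payload["modelId"]),
        model_url=payload["modelUrl"],
        rtsp_url=rtsp_url,
        snapshot_url=snapshot_url,
        crop=crop,
    )
```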
Model Pipeline (MPTA) Format
Enhanced Structure
- ZIP archive containing models and configuration
 - pipeline.json - Main configuration file with Redis + PostgreSQL settings
 - Model files - YOLO .pt files for detection/classification
 - Multi-model support - Detection + multiple classification models
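Because an MPTA file is a ZIP archive, its configuration can be inspected with the standard library alone. The sketch below is an illustration, not the code in siwatsystem/pympta.py; `read_mpta` is a hypothetical helper, and the assumption that pipeline.json may sit either at the archive root or inside a folder is mine.

```python
import json
import zipfile
from typing import List, Tuple


def read_mpta(mpta_path: str) -> Tuple[dict, List[str]]:
    """Return the parsed pipeline.json and the names of the .pt model files."""
    with zipfile.ZipFile(mpta_path) as archive:
        names = archive.namelist()
        # Accept pipeline.json at the root or nested inside a directory.
        config_names = [n for n in names if n.rsplit("/", 1)[-1] == "pipeline.json"]
        if not config_names:
            raise FileNotFoundError("pipeline.json not found in archive")
        with archive.open(config_names[0]) as fh:
            config = json.load(fh)
        model_files = sorted(n for n in names if n.endswith(".pt"))
    return config, model_files
```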
 
Advanced Pipeline Flow
- Multi-class detection stage - YOLO detection of Car + Frontal simultaneously
 - Validation stage - Check for expected classes (flexible matching)
 - Database initialization - Create initial record with session_id
 - Redis actions - Save cropped frontal images with expiration
 - Parallel classification - Concurrent brand and body type classification
 - Branch synchronization - Wait for all classification branches to complete
 - Database update - Combined update with all classification results
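The parallel classification and branch synchronization steps above can be sketched with ThreadPoolExecutor as follows. The classifier callables stand in for the real YOLO classification models, and `run_parallel_branches` is a hypothetical name; only the idea of waiting for a named list of branches comes from this document.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

# A branch is any callable that takes the cropped image and returns a result dict.
Classifier = Callable[[Any], Dict[str, Any]]


def run_parallel_branches(
    branches: Dict[str, Classifier],
    crop: Any,
    wait_for: List[str],
) -> Dict[str, Dict[str, Any]]:
    """Run every branch concurrently and block until the listed ones finish."""
    missing = [branch_id for branch_id in wait_for if branch_id not in branches]
    if missing:
        raise KeyError(f"unknown branch ids: {missing}")

    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
        futures = {
            branch_id: pool.submit(classify, crop)
            for branch_id, classify in branches.items()
        }
        # Future.result() blocks, so this line is the synchronization point.
        return {branch_id: futures[branch_id].result() for branch_id in wait_for}
```

The returned dictionary is keyed by branch ID, which is the shape the combined database update needs for field mapping.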
 
Enhanced Branch Configuration
{
  "modelId": "car_frontal_detection_v1",
  "modelFile": "car_frontal_detection_v1.pt",
  "multiClass": true,
  "expectedClasses": ["Car", "Frontal"],
  "triggerClasses": ["Car", "Frontal"],
  "minConfidence": 0.8,
  "actions": [
    {
      "type": "redis_save_image",
      "region": "Frontal",
      "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
      "expire_seconds": 600
    }
  ],
  "branches": [
    {
      "modelId": "car_brand_cls_v1",
      "modelFile": "car_brand_cls_v1.pt",
      "parallel": true,
      "crop": true,
      "cropClass": "Frontal",
      "triggerClasses": ["Frontal"],
      "minConfidence": 0.85
    }
  ],
  "parallelActions": [
    {
      "type": "postgresql_update_combined",
      "table": "car_frontal_info",
      "key_field": "session_id",
      "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
      "fields": {
        "car_brand": "{car_brand_cls_v1.brand}",
        "car_body_type": "{car_bodytype_cls_v1.body_type}"
      }
    }
  ]
}
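To make the gating fields in this configuration concrete, here is a small sketch of how `expectedClasses`, `triggerClasses`, `minConfidence`, and the Redis `key` template might be applied. The detection dictionaries with `class` and `confidence` keys are an assumed shape, the helper names are hypothetical, and the strict set comparison is a simplification of the flexible matching mentioned earlier.

```python
from typing import Any, Dict, Iterable, List

Detection = Dict[str, Any]  # assumed shape: {"class": str, "confidence": float}


def has_expected_classes(detections: List[Detection], expected: Iterable[str]) -> bool:
    """True when every expected class appears at least once."""
    found = {det["class"] for det in detections}
    return set(expected) <= found


def select_triggered(
    detections: List[Detection],
    trigger_classes: Iterable[str],
    min_confidence: float,
) -> List[Detection]:
    """Keep detections of a trigger class that meet the confidence threshold."""
    allowed = set(trigger_classes)
    return [
        det for det in detections
        if det["class"] in allowed and det["confidence"] >= min_confidence
    ]


def build_redis_key(template: str, **context: str) -> str:
    """Fill a key template such as 'inference:{display_id}:...' from context values."""
    return template.format(**context)
```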
Stream Management
Shared Streams
- Multiple subscriptions can share the same camera URL
 - Reference counting prevents premature stream termination
 - Automatic cleanup when last subscription ends
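The reference counting described above can be sketched as a small thread-safe registry. `SharedStreamRegistry` is a hypothetical class for illustration; the real worker may track streams differently.

```python
import threading
from typing import Dict


class SharedStreamRegistry:
    """Count subscribers per camera URL so a stream is opened and closed once."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._ref_counts: Dict[str, int] = {}

    def acquire(self, url: str) -> bool:
        """Register a subscriber; True means the caller must start the reader."""
        with self._lock:
            count = self._ref_counts.get(url, 0)
            self._ref_counts[url] = count + 1
            return count == 0

    def release(self, url: str) -> bool:
        """Drop a subscriber; True means the last one left and the stream can stop."""
        with self._lock:
            count = self._ref_counts.get(url, 0)
            if count <= 1:
                self._ref_counts.pop(url, None)
                return count == 1
            self._ref_counts[url] = count - 1
            return False
```

Only the first `acquire` and the last `release` for a URL return True, so the caller knows exactly when to start and stop the underlying frame reader.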
 
Frame Processing
- Queue-based buffering with single frame capacity (latest frame only)
 - Configurable polling interval based on target FPS
 - Automatic reconnection with exponential backoff
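A minimal sketch of the two mechanisms above: a single-slot queue that always holds the newest frame, and an exponential backoff delay for reconnection. Both helpers are hypothetical, the base and cap values are assumptions, and `put_latest` assumes one producer thread per queue.

```python
import queue
from typing import Any


def put_latest(frame_queue: queue.Queue, frame: Any) -> None:
    """Replace whatever is in a maxsize=1 queue with the newest frame.

    Assumes a single producer thread per queue.
    """
    try:
        frame_queue.put_nowait(frame)
    except queue.Full:
        try:
            frame_queue.get_nowait()  # drop the stale frame
        except queue.Empty:
            pass  # a consumer took it in the meantime
        frame_queue.put_nowait(frame)


def backoff_delay(attempt: int, base_sec: float = 1.0, cap_sec: float = 30.0) -> float:
    """Delay before reconnect attempt N: base * 2**N, limited to cap_sec."""
    return min(cap_sec, base_sec * (2 ** attempt))
```

Dropping stale frames keeps inference latency bounded by one frame, at the cost of skipping frames whenever the model is slower than the camera.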
 
Development & Testing
Local Development
# Install dependencies
pip install -r requirements.txt
# Run the worker
python app.py
# Test protocol compliance
python test_protocol.py
# Test pipeline with webcam
python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0
Docker Deployment
# Build container
docker build -t detector-worker .
# Run with volume mounts for models
docker run -p 8000:8000 -v ./models:/app/models detector-worker
Testing Commands
- Protocol testing: python test_protocol.py
 - Pipeline validation: python pipeline_webcam.py --mpta-file <path> --video 0
 - RTSP debugging: python debug/rtsp_webcam.py
Dependencies
- fastapi[standard]: Web framework with WebSocket support
 - uvicorn: ASGI server
 - torch, torchvision: PyTorch for ML inference
 - ultralytics: YOLO implementation
 - opencv-python: Computer vision operations
 - websockets: WebSocket client/server
 - redis: Redis client for action execution
 - psycopg2-binary: PostgreSQL database adapter
 - scipy: Scientific computing for advanced algorithms
 - filterpy: Kalman filtering and state estimation
 
Security Considerations
- Model files are loaded from trusted sources only
 - Redis connections use authentication when configured
 - WebSocket connections handle disconnects gracefully
 - Resource usage (CPU, memory, GPU) is monitored so that overload, including DoS attempts, can be detected
 
Database Integration
Schema Management
The system automatically creates and manages PostgreSQL tables:
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
    display_id VARCHAR(255),
    captured_timestamp VARCHAR(255),
    session_id VARCHAR(255) PRIMARY KEY,
    license_character VARCHAR(255) DEFAULT NULL,
    license_type VARCHAR(255) DEFAULT 'No model available',
    car_brand VARCHAR(255) DEFAULT NULL,
    car_model VARCHAR(255) DEFAULT NULL,
    car_body_type VARCHAR(255) DEFAULT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
Workflow
- Detection: When both "Car" and "Frontal" are detected, create initial database record with UUID session_id
 - Redis Storage: Save cropped frontal image to Redis with session_id in key
 - Parallel Processing: Run brand and body type classification concurrently
 - Synchronization: Wait for all branches to complete using waitForBranches
 - Database Update: Update record with combined classification results using field mapping
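The final database update in this workflow could be assembled roughly as shown below. This is a sketch, not the DatabaseManager implementation: `build_combined_update` is a hypothetical helper that returns a statement with `%s` placeholders (the parameter style psycopg2 uses) plus its parameter list, and it validates identifiers because table and column names cannot be passed as query parameters.

```python
import re
from typing import Any, Dict, List, Tuple

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_combined_update(
    schema: str,
    table: str,
    key_field: str,
    key_value: Any,
    values: Dict[str, Any],
) -> Tuple[str, List[Any]]:
    """Build a parameterized UPDATE that sets all resolved fields at once."""
    for name in (schema, table, key_field, *values):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe SQL identifier: {name!r}")

    # Values travel as parameters; only validated identifiers are interpolated.
    assignments = [f"{column} = %s" for column in values]
    assignments.append("updated_at = NOW()")
    sql = (
        f"UPDATE {schema}.{table} SET {', '.join(assignments)} "
        f"WHERE {key_field} = %s"
    )
    params = list(values.values()) + [key_value]
    return sql, params
```

The returned pair can be passed to a database cursor's `execute(sql, params)` call.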
 
Field Mapping
Templates like {car_brand_cls_v1.brand} are resolved to actual classification results:
- car_brand_cls_v1.brand → "Honda"
 - car_bodytype_cls_v1.body_type → "Sedan"
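One way to implement this kind of template resolution is a regular-expression substitution over `{model_id.field}` placeholders. The sketch below is illustrative; `resolve_fields` is a hypothetical name, and raising KeyError for a missing branch or field is an assumed policy.

```python
import re
from typing import Any, Dict

# Matches placeholders of the form {model_id.field_name}.
_PLACEHOLDER = re.compile(r"\{(\w+)\.(\w+)\}")


def resolve_fields(
    fields: Dict[str, str],
    branch_results: Dict[str, Dict[str, Any]],
) -> Dict[str, str]:
    """Replace each placeholder with the matching value from the branch results."""

    def lookup(match: "re.Match[str]") -> str:
        model_id, field_name = match.group(1), match.group(2)
        # A missing branch or field raises KeyError rather than writing a blank.
        return str(branch_results[model_id][field_name])

    return {
        column: _PLACEHOLDER.sub(lookup, template)
        for column, template in fields.items()
    }
```

Applied to the `fields` mapping from the branch configuration and the results of both classification branches, this yields a plain column-to-value dictionary.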
Performance Optimizations
- GPU acceleration when CUDA is available
 - Shared camera streams reduce resource usage
 - Frame queue optimization (single latest frame)
 - Model caching across subscriptions
 - Trigger class filtering for faster inference
 - Parallel processing with ThreadPoolExecutor for classification branches
 - Multi-class detection reduces inference passes
 - Region-based cropping minimizes processing overhead
 - Database connection pooling and prepared statements
 - Redis image storage with automatic expiration