python-detector-worker/CLAUDE.md
ziesorx cfc7503a14
Update markdown
2025-08-10 20:51:16 +07:00


Python Detector Worker - CLAUDE.md

Project Overview

This is a FastAPI-based computer vision worker. It reads video from RTSP streams or HTTP snapshot URLs and runs YOLO-based pipelines for multi-class object detection followed by parallel classification. Results are written to PostgreSQL, Redis is used for image storage and pub/sub messaging, and pipelines execute hierarchically (detection first, then classification branches). The worker is designed to run as one component of a larger CMS (Content Management System) architecture.

Key Features

  • Multi-Class Detection: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
  • Parallel Processing: Concurrent execution of classification branches using ThreadPoolExecutor
  • Database Integration: Automatic PostgreSQL schema management and record updates
  • Redis Actions: Image storage with region cropping and pub/sub messaging
  • Pipeline Synchronization: Branch coordination with waitForBranches functionality
  • Dynamic Field Mapping: Template-based field resolution for database operations

Architecture & Technology Stack

  • Framework: FastAPI with WebSocket support
  • ML/CV: PyTorch, Ultralytics YOLO, OpenCV
  • Containerization: Docker (Python 3.13-bookworm base)
  • Data Storage: Redis integration for action handling + PostgreSQL for persistent storage
  • Database: Automatic schema and table management in the gas_station_1 PostgreSQL schema
  • Parallel Processing: ThreadPoolExecutor for concurrent classification
  • Communication: WebSocket-based real-time protocol

Core Components

Main Application (app.py)

  • FastAPI WebSocket server for real-time communication
  • Multi-camera stream management with shared stream optimization
  • HTTP REST endpoint for image retrieval (/camera/{camera_id}/image)
  • Threading-based frame readers for RTSP streams and HTTP snapshots
  • Model loading and inference using MPTA (Machine Learning Pipeline Archive) format
  • Session management with display identifier mapping
  • Resource monitoring of CPU and memory (via psutil) and GPU usage

Pipeline System (siwatsystem/pympta.py)

  • MPTA file handling - ZIP archives containing model configurations
  • Hierarchical pipeline execution with detection → classification branching
  • Multi-class detection - Simultaneous detection of multiple classes (Car + Frontal)
  • Parallel processing - Concurrent classification branches with ThreadPoolExecutor
  • Redis action system - Image saving with region cropping and message publishing
  • PostgreSQL integration - Automatic table creation and combined updates
  • Dynamic model loading with GPU optimization
  • Configurable trigger classes and confidence thresholds
  • Branch synchronization - waitForBranches coordination for database updates
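Trigger-class filtering, confidence thresholds, and the multi-class expected-class check can be sketched in a few lines. This is a minimal sketch, assuming each detection is a dict with hypothetical "class" and "confidence" keys (pympta.py may represent detections differently). The check here is strict (every expected class must be present), whereas the pipeline's own validation is described as flexible matching.

```python
from typing import Iterable

# Hypothetical detection format: {"class": str, "confidence": float}


def filter_detections(
    detections: Iterable[dict],
    trigger_classes: list[str],
    min_confidence: float,
) -> list[dict]:
    """Keep detections whose class is a trigger class and whose confidence meets the threshold."""
    allowed = set(trigger_classes)
    return [
        d for d in detections
        if d["class"] in allowed and d["confidence"] >= min_confidence
    ]


def has_expected_classes(detections: list[dict], expected_classes: list[str]) -> bool:
    """Strict check: True only if every expected class appears at least once."""
    found = {d["class"] for d in detections}
    return set(expected_classes).issubset(found)


raw = [
    {"class": "Car", "confidence": 0.93},
    {"class": "Frontal", "confidence": 0.88},
    {"class": "Person", "confidence": 0.97},   # not a trigger class
    {"class": "Frontal", "confidence": 0.41},  # below minConfidence
]
kept = filter_detections(raw, ["Car", "Frontal"], 0.8)
print([d["class"] for d in kept])                     # ['Car', 'Frontal']
print(has_expected_classes(kept, ["Car", "Frontal"]))  # True
```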

Database System (siwatsystem/database.py)

  • DatabaseManager class for PostgreSQL operations
  • Automatic creation of the gas_station_1.car_frontal_info table
  • Combined update operations with field mapping from branch results
  • Session management with UUID generation
  • Error handling and connection management
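A combined update with UUID session identifiers can be illustrated by building one parameterized UPDATE from the mapped fields. This is a sketch under stated assumptions: new_session_id and build_combined_update are hypothetical names, and bumping updated_at on each update is an assumption rather than confirmed DatabaseManager behavior.

```python
import uuid


def new_session_id() -> str:
    """Generate a session identifier as a random UUID4 string."""
    return str(uuid.uuid4())


def build_combined_update(
    table: str, key_field: str, key_value: str, fields: dict[str, str]
) -> tuple[str, list[str]]:
    """Build one parameterized UPDATE covering every mapped field.

    Values are bound as %s placeholders (psycopg2 style). Table and column
    names cannot be bound, so they must come from trusted pipeline configuration.
    """
    columns = sorted(fields)  # deterministic column order
    assignments = ", ".join(f"{col} = %s" for col in columns)
    sql = (
        f"UPDATE {table} SET {assignments}, updated_at = NOW() "  # assumed timestamp bump
        f"WHERE {key_field} = %s"
    )
    params = [fields[col] for col in columns] + [key_value]
    return sql, params


session_id = new_session_id()
sql, params = build_combined_update(
    "gas_station_1.car_frontal_info",
    "session_id",
    session_id,
    {"car_brand": "Honda", "car_body_type": "Sedan"},
)
print(sql)
# UPDATE gas_station_1.car_frontal_info SET car_body_type = %s, car_brand = %s, updated_at = NOW() WHERE session_id = %s
```

With psycopg2, the resulting pair would be passed as cursor.execute(sql, params), so the classification values never get interpolated into the SQL string.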

Testing & Debugging

  • Protocol test script (test_protocol.py) for WebSocket communication validation
  • Pipeline webcam utility (pipeline_webcam.py) for local testing with visual output
  • RTSP streaming debug tool (debug/rtsp_webcam.py) using GStreamer

Code Conventions & Patterns

Logging

  • Structured logging using Python's logging module
  • Output to both the console and the detector_worker.log file
  • DEBUG level reserved for detailed troubleshooting output
  • Context-aware messages with camera IDs and model information
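File-plus-console logging with the standard logging module might be configured as follows. This is a minimal sketch: the logger name "detector_worker" and the split (DEBUG to the file, INFO and above to the console) are assumptions, not the project's confirmed settings.

```python
import logging
import sys


def setup_logging(log_path: str = "detector_worker.log") -> logging.Logger:
    """Send log records to both a file and the console."""
    logger = logging.getLogger("detector_worker")  # hypothetical logger name
    logger.setLevel(logging.DEBUG)
    for handler in list(logger.handlers):  # make repeated calls idempotent
        logger.removeHandler(handler)
        handler.close()

    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")

    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(logging.DEBUG)  # full detail for troubleshooting
    file_handler.setFormatter(fmt)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)  # assumed: keep DEBUG noise off the console
    console_handler.setFormatter(fmt)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger


logger = setup_logging()
camera_id = "display-001;cam-001"
logger.info("Starting stream for camera %s with model %s", camera_id, 101)
logger.debug("Queue size for camera %s: %d", camera_id, 1)  # written to the file only
```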

Error Handling

  • Graceful failure handling with retry mechanisms (configurable max_retries)
  • Thread-safe operations using locks for streams and models
  • WebSocket disconnect handling with proper cleanup
  • Model loading validation with detailed error reporting
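A retry helper honoring the max_retries convention (-1 for unlimited) might look like this. It is a sketch that assumes max_retries counts retries after the first attempt and that the delay between attempts is fixed; with_retries and flaky_connect are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    operation: Callable[[], T],
    max_retries: int,
    delay_sec: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds.

    max_retries == -1 retries forever; otherwise at most 1 + max_retries
    attempts are made before the last exception is re-raised.
    """
    attempt = 0
    while True:
        try:
            return operation()
        except Exception:
            attempt += 1
            if max_retries != -1 and attempt > max_retries:
                raise
            sleep(delay_sec)


calls = {"n": 0}


def flaky_connect() -> str:
    """Hypothetical connection that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("stream not ready")
    return "connected"


print(with_retries(flaky_connect, max_retries=5, delay_sec=0.0))  # connected
print(calls["n"])  # 3
```

Injecting sleep keeps the helper testable without real delays.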

Configuration

  • JSON configuration (config.json) for runtime parameters:
    • poll_interval_ms: Frame processing interval
    • max_streams: Concurrent stream limit
    • target_fps: Target frame rate
    • reconnect_interval_sec: Stream reconnection delay
    • max_retries: Maximum retry attempts (-1 for unlimited)
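Loading config.json into a typed object makes missing keys and the -1 convention explicit. This is a minimal sketch: WorkerConfig and load_config are hypothetical, and the default values are illustrative assumptions, not the project's defaults.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class WorkerConfig:
    # Defaults are illustrative assumptions, not values taken from the project.
    poll_interval_ms: int = 100
    max_streams: int = 5
    target_fps: int = 10
    reconnect_interval_sec: int = 5
    max_retries: int = 3

    @property
    def unlimited_retries(self) -> bool:
        return self.max_retries == -1


def load_config(text: str) -> WorkerConfig:
    """Parse config.json content; unknown keys are ignored, missing keys keep defaults."""
    raw = json.loads(text)
    known = {f.name for f in fields(WorkerConfig)}
    return WorkerConfig(**{k: v for k, v in raw.items() if k in known})


cfg = load_config('{"poll_interval_ms": 100, "target_fps": 10, "max_retries": -1}')
print(cfg.max_streams, cfg.unlimited_retries)  # 5 True
```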

Threading Model

  • Frame reader threads for each camera stream (RTSP/HTTP)
  • Shared stream optimization - multiple subscriptions can reuse the same camera stream
  • Async WebSocket handling with concurrent task management
  • Thread-safe data structures with proper locking mechanisms

WebSocket Protocol

Message Types

  • subscribe: Start a camera stream with a model pipeline
  • unsubscribe: Stop processing a camera stream
  • requestState: Request the current worker status
  • setSessionId: Associate a display with a session identifier
  • patchSession: Update session data
  • stateReport: Periodic heartbeat with system metrics, sent by the worker
  • imageDetection: Detection results with timestamp and model info, sent by the worker
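Routing incoming messages by their "type" field can be sketched with a small handler registry. This is an illustration under assumptions: HANDLERS, handles, and dispatch are hypothetical names, the handler signatures may differ from app.py, and the stateReport reply omits the real metric fields.

```python
import json
from typing import Callable, Optional

Handler = Callable[[dict], Optional[dict]]
HANDLERS: dict[str, Handler] = {}


def handles(msg_type: str) -> Callable[[Handler], Handler]:
    """Register the decorated function as the handler for one message type."""
    def register(fn: Handler) -> Handler:
        HANDLERS[msg_type] = fn
        return fn
    return register


@handles("requestState")
def on_request_state(payload: dict) -> dict:
    # Reply with a stateReport; the real metric fields are omitted here.
    return {"type": "stateReport"}


@handles("unsubscribe")
def on_unsubscribe(payload: dict) -> None:
    print("stopping", payload["subscriptionIdentifier"])
    return None


def dispatch(raw_message: str) -> Optional[dict]:
    """Decode one WebSocket text frame and route it by its "type" field."""
    msg = json.loads(raw_message)
    handler = HANDLERS.get(msg.get("type"))
    if handler is None:
        raise ValueError(f"unknown message type: {msg.get('type')!r}")
    return handler(msg.get("payload", {}))


print(dispatch('{"type": "requestState"}'))  # {'type': 'stateReport'}
dispatch('{"type": "unsubscribe", "payload": {"subscriptionIdentifier": "display-001;cam-001"}}')
```

In an async WebSocket handler, dispatch would be called once per received text frame, and any returned dict would be serialized and sent back.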

Subscription Format

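The example lists both rtspUrl and snapshotUrl for reference; a subscription supplies one of them, and snapshotInterval applies to snapshotUrl polling. The cropX1/cropY1/cropX2/cropY2 fields define a crop region within the frame.

{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-001",
    "rtspUrl": "rtsp://...",
    "snapshotUrl": "http://...",
    "snapshotInterval": 5000,
    "modelUrl": "http://...model.mpta",
    "modelId": 101,
    "modelName": "Vehicle Detection",
    "cropX1": 100, "cropY1": 200,
    "cropX2": 300, "cropY2": 400
  }
}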
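A subscribe payload can be validated and split into display and camera identifiers before any stream is opened. This sketch assumes the identifier always has the form "displayId;cameraId", as in the example above, and that the crop is applied only when all four coordinates are present. Subscription and parse_subscribe are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Subscription:
    display_id: str
    camera_id: str
    rtsp_url: Optional[str]
    snapshot_url: Optional[str]
    crop: Optional[tuple[int, int, int, int]]


def parse_subscribe(payload: dict) -> Subscription:
    # Assumed identifier format: "<displayId>;<cameraId>".
    display_id, sep, camera_id = payload["subscriptionIdentifier"].partition(";")
    if not sep or not display_id or not camera_id:
        raise ValueError("subscriptionIdentifier must look like 'display;camera'")

    rtsp_url = payload.get("rtspUrl")
    snapshot_url = payload.get("snapshotUrl")
    if not rtsp_url and not snapshot_url:
        raise ValueError("either rtspUrl or snapshotUrl is required")

    # Assumed: crop only when all four coordinates are supplied.
    keys = ("cropX1", "cropY1", "cropX2", "cropY2")
    crop = tuple(payload[k] for k in keys) if all(k in payload for k in keys) else None
    return Subscription(display_id, camera_id, rtsp_url, snapshot_url, crop)


sub = parse_subscribe({
    "subscriptionIdentifier": "display-001;cam-001",
    "rtspUrl": "rtsp://...",
    "cropX1": 100, "cropY1": 200, "cropX2": 300, "cropY2": 400,
})
print(sub.display_id, sub.camera_id, sub.crop)  # display-001 cam-001 (100, 200, 300, 400)
```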

Model Pipeline (MPTA) Format

Enhanced Structure

  • ZIP archive containing models and configuration
  • pipeline.json - Main configuration file with Redis + PostgreSQL settings
  • Model files - YOLO .pt files for detection/classification
  • Multi-model support - Detection + multiple classification models
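Because an MPTA file is a ZIP archive, the standard zipfile module is enough to read pipeline.json and list the model files it references. This sketch assumes pipeline.json sits either at the archive root or in one top-level folder, and that the detection tree lives under a top-level "pipeline" key; both layout details are assumptions. load_pipeline_config and model_files are hypothetical names.

```python
import io
import json
import zipfile


def load_pipeline_config(mpta_bytes: bytes) -> dict:
    """Read pipeline.json from an in-memory MPTA (ZIP) archive."""
    with zipfile.ZipFile(io.BytesIO(mpta_bytes)) as archive:
        candidates = [n for n in archive.namelist() if n.rsplit("/", 1)[-1] == "pipeline.json"]
        if not candidates:
            raise FileNotFoundError("pipeline.json not found in MPTA archive")
        path = min(candidates, key=lambda n: n.count("/"))  # prefer the shallowest match
        with archive.open(path) as fh:
            return json.load(fh)


def model_files(node: dict) -> list[str]:
    """Collect modelFile from a pipeline node and, recursively, from its branches."""
    files = [node["modelFile"]] if "modelFile" in node else []
    for branch in node.get("branches", []):
        files.extend(model_files(branch))
    return files


# Build a tiny archive in memory to demonstrate (assumed layout).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("car_pipeline/pipeline.json", json.dumps({
        "pipeline": {
            "modelId": "car_frontal_detection_v1",
            "modelFile": "car_frontal_detection_v1.pt",
            "branches": [{"modelId": "car_brand_cls_v1", "modelFile": "car_brand_cls_v1.pt"}],
        }
    }))

cfg = load_pipeline_config(buf.getvalue())
print(model_files(cfg["pipeline"]))  # ['car_frontal_detection_v1.pt', 'car_brand_cls_v1.pt']
```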

Advanced Pipeline Flow

  1. Multi-class detection stage - YOLO detection of Car + Frontal simultaneously
  2. Validation stage - Check for expected classes (flexible matching)
  3. Database initialization - Create initial record with session_id
  4. Redis actions - Save cropped frontal images with expiration
  5. Parallel classification - Concurrent brand and body type classification
  6. Branch synchronization - Wait for all classification branches to complete
  7. Database update - Combined update with all classification results
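Steps 5 and 6 (parallel classification followed by branch synchronization) can be sketched with concurrent.futures. The classifier functions below are hypothetical stubs standing in for YOLO classification models, and run_parallel_branches is an illustrative name, not the pympta.py API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def classify_brand(crop: bytes) -> dict:
    """Hypothetical stub for the brand classifier."""
    return {"brand": "Honda"}


def classify_body_type(crop: bytes) -> dict:
    """Hypothetical stub for the body type classifier."""
    return {"body_type": "Sedan"}


def run_parallel_branches(
    crop: bytes,
    branches: dict[str, Callable[[bytes], dict]],
    wait_for: list[str],
    max_workers: int = 4,
) -> dict[str, dict]:
    """Run branches concurrently; return results for the branches named in wait_for."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {model_id: pool.submit(fn, crop) for model_id, fn in branches.items()}
        # result() blocks until that branch finishes and re-raises its exception, if any.
        return {model_id: futures[model_id].result() for model_id in wait_for}


results = run_parallel_branches(
    b"<frontal crop bytes>",
    {"car_brand_cls_v1": classify_brand, "car_bodytype_cls_v1": classify_body_type},
    wait_for=["car_brand_cls_v1", "car_bodytype_cls_v1"],
)
print(results)
# {'car_brand_cls_v1': {'brand': 'Honda'}, 'car_bodytype_cls_v1': {'body_type': 'Sedan'}}
```

Only after every awaited branch has returned would the combined database update in step 7 run.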

Enhanced Branch Configuration

{
  "modelId": "car_frontal_detection_v1",
  "modelFile": "car_frontal_detection_v1.pt",
  "multiClass": true,
  "expectedClasses": ["Car", "Frontal"],
  "triggerClasses": ["Car", "Frontal"],
  "minConfidence": 0.8,
  "actions": [
    {
      "type": "redis_save_image",
      "region": "Frontal",
      "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
      "expire_seconds": 600
    }
  ],
  "branches": [
    {
      "modelId": "car_brand_cls_v1",
      "modelFile": "car_brand_cls_v1.pt",
      "parallel": true,
      "crop": true,
      "cropClass": "Frontal",
      "triggerClasses": ["Frontal"],
      "minConfidence": 0.85
    },
    {
      "modelId": "car_bodytype_cls_v1",
      "modelFile": "car_bodytype_cls_v1.pt",
      "parallel": true,
      "crop": true,
      "cropClass": "Car",
      "triggerClasses": ["Car"],
      "minConfidence": 0.85
    }
  ],
  "parallelActions": [
    {
      "type": "postgresql_update_combined",
      "table": "car_frontal_info",
      "key_field": "session_id",
      "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
      "fields": {
        "car_brand": "{car_brand_cls_v1.brand}",
        "car_body_type": "{car_bodytype_cls_v1.body_type}"
      }
    }
  ]
}
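A waitForBranches entry that names a branch which is not defined under the node can never be satisfied, so it is worth checking configurations when they are loaded. This is a hypothetical validation helper, not part of the documented pipeline; it only reads the modelId, branches, parallelActions, and waitForBranches keys shown in the configuration above.

```python
import json


def collect_model_ids(node: dict) -> set[str]:
    """Return the modelId of this node and of all nested branches."""
    ids = {node["modelId"]}
    for branch in node.get("branches", []):
        ids |= collect_model_ids(branch)
    return ids


def missing_wait_targets(node: dict) -> list[str]:
    """List waitForBranches entries that do not match any branch under this node."""
    defined = collect_model_ids(node)
    missing = []
    for action in node.get("parallelActions", []):
        for model_id in action.get("waitForBranches", []):
            if model_id not in defined:
                missing.append(model_id)
    return missing


config = json.loads("""
{
  "modelId": "car_frontal_detection_v1",
  "branches": [{"modelId": "car_brand_cls_v1"}],
  "parallelActions": [
    {"type": "postgresql_update_combined",
     "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"]}
  ]
}
""")
print(missing_wait_targets(config))  # ['car_bodytype_cls_v1']
```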

Stream Management

Shared Streams

  • Multiple subscriptions can share the same camera URL
  • Reference counting prevents premature stream termination
  • Automatic cleanup when last subscription ends
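The shared-stream rules (one reader per URL, cleanup when the last subscription leaves) map onto a small lock-protected registry. This is a sketch under assumptions: SharedStreamRegistry and the start/stop callbacks are hypothetical, and tracking a set of subscription IDs instead of a bare counter is a design choice that makes duplicate subscribe calls harmless.

```python
import threading
from typing import Callable


class SharedStreamRegistry:
    """One reader per camera URL, shared by all subscriptions to that URL."""

    def __init__(self, start_reader: Callable[[str], None], stop_reader: Callable[[str], None]):
        self._lock = threading.Lock()
        self._subscribers: dict[str, set[str]] = {}
        self._start_reader = start_reader  # called when the first subscriber arrives
        self._stop_reader = stop_reader    # called when the last subscriber leaves

    def subscribe(self, url: str, subscription_id: str) -> None:
        with self._lock:
            subs = self._subscribers.setdefault(url, set())
            if not subs:
                self._start_reader(url)
            subs.add(subscription_id)

    def unsubscribe(self, url: str, subscription_id: str) -> None:
        with self._lock:
            subs = self._subscribers.get(url)
            if subs is None:
                return
            subs.discard(subscription_id)
            if not subs:
                del self._subscribers[url]
                self._stop_reader(url)

    def count(self, url: str) -> int:
        with self._lock:
            return len(self._subscribers.get(url, ()))


events = []
registry = SharedStreamRegistry(
    lambda u: events.append(("start", u)),
    lambda u: events.append(("stop", u)),
)
url = "rtsp://camera-1/stream"  # hypothetical URL
registry.subscribe(url, "display-001;cam-001")
registry.subscribe(url, "display-002;cam-001")  # reuses the running reader
registry.unsubscribe(url, "display-001;cam-001")
print(events)  # [('start', 'rtsp://camera-1/stream')]
registry.unsubscribe(url, "display-002;cam-001")
print(events)  # [('start', 'rtsp://camera-1/stream'), ('stop', 'rtsp://camera-1/stream')]
```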

Frame Processing

  • Queue-based buffering with single frame capacity (latest frame only)
  • Configurable polling interval based on target FPS
  • Automatic reconnection with exponential backoff
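The single-frame buffer can be built on queue.Queue(maxsize=1): the reader drops any stale frame before putting the new one, so consumers always see the latest frame. This is a minimal sketch; put_latest and poll_interval_sec are hypothetical helpers, and deriving the interval as 1 / target_fps is an assumption about how "based on target FPS" is computed.

```python
import queue


def put_latest(frame_queue: queue.Queue, frame) -> None:
    """Replace whatever is buffered with the newest frame (maxsize=1 queue)."""
    try:
        frame_queue.get_nowait()  # drop the stale frame, if any
    except queue.Empty:
        pass
    try:
        frame_queue.put_nowait(frame)
    except queue.Full:
        pass  # another producer refilled the slot first; its frame is just as fresh


def poll_interval_sec(target_fps: float) -> float:
    """Seconds between frames for a given target FPS (assumed 1 / fps)."""
    return 1.0 / target_fps if target_fps > 0 else 0.0


frames = queue.Queue(maxsize=1)
for i in range(5):  # the producer outruns the consumer
    put_latest(frames, f"frame-{i}")
print(frames.get_nowait())    # frame-4
print(poll_interval_sec(10))  # 0.1
```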

Development & Testing

Local Development

# Install dependencies
pip install -r requirements.txt

# Run the worker
python app.py

# Test protocol compliance
python test_protocol.py

# Test pipeline with webcam
python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0

Docker Deployment

# Build container
docker build -t detector-worker .

# Run with a volume mount for models (absolute host path, which older Docker versions require for -v)
docker run -p 8000:8000 -v "$(pwd)/models:/app/models" detector-worker

Testing Commands

  • Protocol testing: python test_protocol.py
  • Pipeline validation: python pipeline_webcam.py --mpta-file <path> --video 0
  • RTSP debugging: python debug/rtsp_webcam.py

Dependencies

  • fastapi[standard]: Web framework with WebSocket support
  • uvicorn: ASGI server
  • torch, torchvision: PyTorch for ML inference
  • ultralytics: YOLO implementation
  • opencv-python: Computer vision operations
  • websockets: WebSocket client/server
  • redis: Redis client for action execution
  • psycopg2-binary: PostgreSQL database adapter
  • scipy: Scientific computing for advanced algorithms
  • filterpy: Kalman filtering and state estimation

Security Considerations

  • Model files are loaded from trusted sources only
  • Redis connections use authentication when configured
  • WebSocket connections handle disconnects gracefully
  • Resource usage is monitored, and max_streams caps concurrent streams to limit resource exhaustion

Database Integration

Schema Management

The system automatically creates the schema and table if they do not already exist:

CREATE SCHEMA IF NOT EXISTS gas_station_1;

CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
    display_id VARCHAR(255),
    captured_timestamp VARCHAR(255),
    session_id VARCHAR(255) PRIMARY KEY,
    license_character VARCHAR(255) DEFAULT NULL,
    license_type VARCHAR(255) DEFAULT 'No model available',
    car_brand VARCHAR(255) DEFAULT NULL,
    car_model VARCHAR(255) DEFAULT NULL,
    car_body_type VARCHAR(255) DEFAULT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

Workflow

  1. Detection: When both "Car" and "Frontal" are detected, create initial database record with UUID session_id
  2. Redis Storage: Save cropped frontal image to Redis with session_id in key
  3. Parallel Processing: Run brand and body type classification concurrently
  4. Synchronization: Wait for all branches to complete using waitForBranches
  5. Database Update: Update record with combined classification results using field mapping

Field Mapping

Templates like {car_brand_cls_v1.brand} are resolved to actual classification results:

  • car_brand_cls_v1.brand → "Honda"
  • car_bodytype_cls_v1.body_type → "Sedan"
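Template resolution can be sketched with a regular expression over the {modelId.field} syntax. This is an illustration, not the pympta.py implementation. It assumes each template is a whole-string placeholder (not embedded in other text) and that an unresolved placeholder becomes None so the column keeps its NULL default. resolve_fields is a hypothetical name.

```python
import re

# Matches a whole "{modelId.field}" template.
TEMPLATE = re.compile(r"\{([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)\}")


def resolve_fields(fields: dict[str, str], branch_results: dict[str, dict]) -> dict[str, object]:
    """Replace {modelId.field} templates with values from branch results."""
    resolved: dict[str, object] = {}
    for column, template in fields.items():
        match = TEMPLATE.fullmatch(template)
        if match is None:
            resolved[column] = template  # literal value, no placeholder
            continue
        model_id, key = match.groups()
        # Assumed: missing branch or key resolves to None (NULL in the database).
        resolved[column] = branch_results.get(model_id, {}).get(key)
    return resolved


fields = {
    "car_brand": "{car_brand_cls_v1.brand}",
    "car_body_type": "{car_bodytype_cls_v1.body_type}",
}
results = {
    "car_brand_cls_v1": {"brand": "Honda"},
    "car_bodytype_cls_v1": {"body_type": "Sedan"},
}
print(resolve_fields(fields, results))
# {'car_brand': 'Honda', 'car_body_type': 'Sedan'}
```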

Performance Optimizations

  • GPU acceleration when CUDA is available
  • Shared camera streams reduce resource usage
  • Frame queue optimization (single latest frame)
  • Model caching across subscriptions
  • Trigger class filtering for faster inference
  • Parallel processing with ThreadPoolExecutor for classification branches
  • Multi-class detection reduces inference passes
  • Region-based cropping minimizes processing overhead
  • Database connection pooling and prepared statements
  • Redis image storage with automatic expiration