python-detector-worker/CLAUDE.md
2025-07-16 03:24:40 +07:00


Python Detector Worker - CLAUDE.md

Project Overview

This is a FastAPI-based computer vision detection worker that ingests frames from RTSP streams and HTTP snapshot URLs and runs YOLO-based machine learning pipelines for object detection and classification. The system is designed to run as one component of a larger CMS (Content Management System) architecture.

Architecture & Technology Stack

  • Framework: FastAPI with WebSocket support
  • ML/CV: PyTorch, Ultralytics YOLO, OpenCV
  • Containerization: Docker (Python 3.13-bookworm base)
  • Data Storage: Redis integration for action handling
  • Communication: WebSocket-based real-time protocol

Core Components

Main Application (app.py)

  • FastAPI WebSocket server for real-time communication
  • Multi-camera stream management with shared stream optimization
  • HTTP REST endpoint for image retrieval (/camera/{camera_id}/image)
  • Threading-based frame readers for RTSP streams and HTTP snapshots
  • Model loading and inference using MPTA (Machine Learning Pipeline Archive) format
  • Session management with display identifier mapping
  • Resource monitoring: CPU and memory usage via psutil, plus GPU usage (psutil itself does not expose GPU metrics)

Pipeline System (siwatsystem/pympta.py)

  • MPTA file handling - ZIP archives containing model configurations
  • Hierarchical pipeline execution with detection → classification branching
  • Redis action system for image saving and message publishing
  • Dynamic model loading with GPU optimization
  • Configurable trigger classes and confidence thresholds

Testing & Debugging

  • Protocol test script (test_protocol.py) for WebSocket communication validation
  • Pipeline webcam utility (pipeline_webcam.py) for local testing with visual output
  • RTSP streaming debug tool (debug/rtsp_webcam.py) using GStreamer

Code Conventions & Patterns

Logging

  • Structured logging using Python's logging module
  • Output goes to both the console and the file detector_worker.log
  • Detailed troubleshooting output is emitted at DEBUG level, separate from routine messages
  • Context-aware messages with camera IDs and model information
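Below is a minimal sketch of the file-plus-console setup described above, using only the standard library. The logger name, the format string, and the setup_logging helper are hypothetical. app.py may configure logging differently.

```python
import logging
import sys


def setup_logging(log_path: str = "detector_worker.log", debug: bool = False) -> logging.Logger:
    """Hypothetical helper: log to both the console and a file."""
    logger = logging.getLogger("detector_worker")  # logger name is an assumption
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    logger.handlers.clear()  # avoid duplicate handlers if called twice

    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")

    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(fmt)
    logger.addHandler(console)

    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(fmt)
    logger.addHandler(file_handler)
    return logger
```

Typical use is to call log = setup_logging() once at startup. After that, write messages such as log.info("Camera %s: model %s loaded", camera_id, model_id) so every line carries its camera and model context.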

Error Handling

  • Graceful failure handling with retry mechanisms (configurable max_retries)
  • Thread-safe operations using locks for streams and models
  • WebSocket disconnect handling with proper cleanup
  • Model loading validation with detailed error reporting
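The retry behaviour can be sketched as a small wrapper. The sketch makes two assumptions: max_retries counts retries after the first attempt, and -1 means retry forever. with_retries is a hypothetical helper, not a function from the codebase.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(fn: Callable[[], T], max_retries: int, delay_sec: float = 0.0) -> T:
    """Call fn until it succeeds; max_retries == -1 means retry forever (assumed semantics)."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            attempt += 1
            # Total attempts allowed = 1 initial call + max_retries retries.
            if max_retries != -1 and attempt > max_retries:
                raise RuntimeError(f"giving up after {attempt} failed attempts") from exc
            time.sleep(delay_sec)  # fixed delay between attempts
```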

Configuration

  • JSON configuration (config.json) for runtime parameters:
    • poll_interval_ms: Frame processing interval
    • max_streams: Concurrent stream limit
    • target_fps: Target frame rate
    • reconnect_interval_sec: Stream reconnection delay
    • max_retries: Maximum retry attempts (-1 for unlimited)
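The sketch below loads these keys into a typed object, assuming config.json is a flat JSON object. Several parts are illustrative assumptions: the default values, the WorkerConfig and load_config names, and deriving the frame interval as 1 / target_fps.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class WorkerConfig:
    # Hypothetical defaults, not the project's actual values.
    poll_interval_ms: int = 100
    max_streams: int = 5
    target_fps: int = 10
    reconnect_interval_sec: int = 5
    max_retries: int = 3  # -1 means unlimited

    @property
    def frame_interval_sec(self) -> float:
        """Assumed relation: seconds between processed frames = 1 / target_fps."""
        return 1.0 / self.target_fps


def load_config(text: str) -> WorkerConfig:
    """Parse config.json contents, keeping defaults for missing keys."""
    raw = json.loads(text)
    known = {f.name for f in fields(WorkerConfig)}
    return WorkerConfig(**{k: v for k, v in raw.items() if k in known})
```

In practice the text would come from Path("config.json").read_text(). Unknown keys are ignored rather than rejected.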

Threading Model

  • Frame reader threads for each camera stream (RTSP/HTTP)
  • Shared stream optimization - multiple subscriptions can reuse the same camera stream
  • Async WebSocket handling with concurrent task management
  • Thread-safe data structures with proper locking mechanisms

WebSocket Protocol

Message Types

  • subscribe: Start camera stream with model pipeline
  • unsubscribe: Stop camera stream processing
  • requestState: Request current worker status
  • setSessionId: Associate display with session identifier
  • patchSession: Update session data
  • stateReport: Periodic heartbeat with system metrics
  • imageDetection: Detection results with timestamp and model info

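Subscription Format

A subscription supplies either rtspUrl (continuous RTSP stream) or snapshotUrl together with snapshotInterval (periodic HTTP snapshots); both are shown below for reference.

{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-001",
    "rtspUrl": "rtsp://...",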
    "snapshotUrl": "http://...",
    "snapshotInterval": 5000,
    "modelUrl": "http://...model.mpta",
    "modelId": 101,
    "modelName": "Vehicle Detection",
    "cropX1": 100, "cropY1": 200,
    "cropX2": 300, "cropY2": 400
  }
}
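The payload can be validated into a typed object along these lines. The sketch makes three assumptions: subscriptionIdentifier has the form <displayId>;<cameraId> as in the example, rtspUrl wins if both URLs are present, and the crop is optional. Subscription and parse_subscription are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass
class Subscription:
    display_id: str
    camera_id: str
    source_url: str
    is_snapshot: bool
    snapshot_interval: Optional[int]
    crop: Optional[Tuple[int, ...]]  # (x1, y1, x2, y2) when present


def parse_subscription(payload: Dict[str, Any]) -> Subscription:
    # Assumed identifier layout: "<displayId>;<cameraId>".
    display_id, camera_id = payload["subscriptionIdentifier"].split(";", 1)

    # Assumption: prefer the RTSP stream, else fall back to HTTP snapshots.
    if payload.get("rtspUrl"):
        source, is_snapshot, interval = payload["rtspUrl"], False, None
    elif payload.get("snapshotUrl"):
        source, is_snapshot = payload["snapshotUrl"], True
        interval = payload.get("snapshotInterval")
    else:
        raise ValueError("subscription needs rtspUrl or snapshotUrl")

    keys = ("cropX1", "cropY1", "cropX2", "cropY2")
    crop = tuple(int(payload[k]) for k in keys) if all(k in payload for k in keys) else None
    return Subscription(display_id, camera_id, source, is_snapshot, interval, crop)
```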

Model Pipeline (MPTA) Format

Structure

  • ZIP archive containing models and configuration
  • pipeline.json - Main configuration file
  • Model files - YOLO .pt files for detection/classification
  • Redis configuration - Optional for action execution
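Reading pipeline.json out of an archive needs only the standard zipfile module. This document does not say where pipeline.json sits inside the archive. The hypothetical read_pipeline_config helper below therefore accepts it at any depth and prefers the shortest path.

```python
import io
import json
import zipfile
from typing import Any, Dict, Union


def read_pipeline_config(mpta: Union[str, bytes]) -> Dict[str, Any]:
    """Return the parsed pipeline.json from an MPTA archive (file path or raw bytes)."""
    source = io.BytesIO(mpta) if isinstance(mpta, bytes) else mpta
    with zipfile.ZipFile(source) as zf:
        # Location inside the archive is an assumption: match any member
        # named pipeline.json and take the shallowest one.
        candidates = [n for n in zf.namelist() if n.rsplit("/", 1)[-1] == "pipeline.json"]
        if not candidates:
            raise FileNotFoundError("pipeline.json not found in MPTA archive")
        with zf.open(min(candidates, key=len)) as f:
            return json.load(f)  # json accepts the bytes read from the member
```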

Pipeline Flow

  1. Detection stage - YOLO object detection with bounding boxes
  2. Trigger evaluation - Check if detected class matches trigger conditions
  3. Classification stage - Crop detected region and run classification model
  4. Action execution - Redis operations (image saving, message publishing)
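The four steps above can be sketched with the models replaced by plain callables. This is not the pympta.py implementation. Frames here are lists of pixel rows rather than NumPy arrays; with NumPy the crop would be frame[y1:y2, x1:x2]. Detection, run_pipeline, and the act callback are hypothetical stand-ins for the YOLO models and Redis actions.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence, Tuple

Frame = List[List[int]]           # stand-in for an image (rows of pixels)
BBox = Tuple[int, int, int, int]  # (x1, y1, x2, y2) in pixel coordinates


@dataclass
class Detection:
    """Hypothetical stand-in for one YOLO detection result."""
    cls: str
    confidence: float
    bbox: BBox


def run_pipeline(
    frame: Frame,
    detect: Callable[[Frame], Sequence[Detection]],
    classify: Callable[[Frame], str],
    trigger_classes: Sequence[str],
    min_confidence: float,
    act: Callable[[Detection, str], None],
) -> int:
    """Run detection -> trigger check -> crop + classify -> action; return actions run."""
    actions = 0
    for det in detect(frame):  # 1. detection stage
        # 2. trigger evaluation: class must match and confidence must pass
        if det.cls not in trigger_classes or det.confidence < min_confidence:
            continue
        # 3. crop the detected region and classify it
        x1, y1, x2, y2 = det.bbox
        crop = [row[x1:x2] for row in frame[y1:y2]]
        label = classify(crop)
        # 4. action execution (Redis save/publish in the real system)
        act(det, label)
        actions += 1
    return actions
```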

Branch Configuration

{
  "modelId": "detector-v1",
  "modelFile": "detector.pt",
  "triggerClasses": ["car", "truck"],
  "minConfidence": 0.5,
  "branches": [{
    "modelId": "classifier-v1", 
    "modelFile": "classifier.pt",
    "crop": true,
    "triggerClasses": ["car"],
    "minConfidence": 0.3,
    "actions": [...]
  }]
}
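Two helpers show how such a nested configuration might be consumed. The first collects every model file for preloading; the second decides whether a branch fires for a parent detection. Both are hypothetical. The rule that a branch's triggerClasses and minConfidence are matched against the parent's detection is an assumption read off the example, not documented behaviour.

```python
from typing import Any, Dict, List


def collect_models(node: Dict[str, Any]) -> List[Dict[str, str]]:
    """Depth-first list of {modelId, modelFile} for a node and all nested branches."""
    models = [{"modelId": node["modelId"], "modelFile": node["modelFile"]}]
    for branch in node.get("branches", []):
        models.extend(collect_models(branch))
    return models


def branch_should_run(branch: Dict[str, Any], parent_class: str, confidence: float) -> bool:
    """Assumed rule: a branch fires when the parent's detection matches its triggers."""
    return (
        parent_class in branch.get("triggerClasses", [])
        and confidence >= branch.get("minConfidence", 0.0)  # 0.0 default is assumed
    )
```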

Stream Management

Shared Streams

  • Multiple subscriptions can share the same camera URL
  • Reference counting prevents premature stream termination
  • Automatic cleanup when last subscription ends
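A thread-safe registry captures the reference-counting idea. Instead of a bare integer it keeps the set of subscription IDs per URL. The set size is the reference count, and a duplicate subscribe is harmless. SharedStreams and its start/stop callbacks are hypothetical.

```python
import threading
from typing import Callable, Dict, Set


class SharedStreams:
    """Hypothetical registry: one reader per camera URL, many subscribers."""

    def __init__(self, start: Callable[[str], None], stop: Callable[[str], None]) -> None:
        self._start, self._stop = start, stop
        self._subs: Dict[str, Set[str]] = {}
        self._lock = threading.Lock()

    def subscribe(self, url: str, subscription_id: str) -> None:
        with self._lock:
            subs = self._subs.setdefault(url, set())
            if not subs:
                self._start(url)  # first subscriber opens the stream
            subs.add(subscription_id)

    def unsubscribe(self, url: str, subscription_id: str) -> None:
        with self._lock:
            subs = self._subs.get(url)
            if subs is None:
                return
            subs.discard(subscription_id)
            if not subs:
                del self._subs[url]
                self._stop(url)  # last subscriber gone: release the stream

    def count(self, url: str) -> int:
        with self._lock:
            return len(self._subs.get(url, ()))
```

The callbacks run while the lock is held. This keeps start and stop strictly ordered, but it means they should return quickly.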

Frame Processing

  • Queue-based buffering with single frame capacity (latest frame only)
  • Configurable polling interval based on target FPS
  • Automatic reconnection after a reconnect_interval_sec delay, retried up to max_retries times (-1 for unlimited)
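The latest-frame-only buffering can be sketched with a queue.Queue(maxsize=1). The reader drops any unconsumed frame before putting a new one, so a slow consumer always sees the freshest image. put_latest and reader_loop are hypothetical names. Reconnection is omitted, and the sketch assumes a single reader thread per queue.

```python
import queue
import threading
from typing import Any, Callable, Optional


def put_latest(q: "queue.Queue[Any]", frame: Any) -> None:
    """Keep only the newest frame in a maxsize=1 queue, dropping a stale one."""
    try:
        q.get_nowait()  # discard the unconsumed older frame, if any
    except queue.Empty:
        pass
    q.put_nowait(frame)  # cannot be Full with a single producer


def reader_loop(read_frame: Callable[[], Optional[Any]], q: "queue.Queue[Any]", stop: threading.Event) -> None:
    """Frame-reader thread body: read until stopped or the source is exhausted."""
    while not stop.is_set():
        frame = read_frame()
        if frame is None:  # source ended or failed; reconnection is not shown here
            break
        put_latest(q, frame)
```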

Development & Testing

Local Development

# Install dependencies
pip install -r requirements.txt

# Run the worker
python app.py

# Test protocol compliance
python test_protocol.py

# Test pipeline with webcam
python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0

Docker Deployment

# Build container
docker build -t detector-worker .

# Run with volume mounts for models
docker run -p 8000:8000 -v "$(pwd)/models:/app/models" detector-worker

Testing Commands

  • Protocol testing: python test_protocol.py
  • Pipeline validation: python pipeline_webcam.py --mpta-file <path> --video 0
  • RTSP debugging: python debug/rtsp_webcam.py

Dependencies

  • fastapi[standard]: Web framework with WebSocket support
  • uvicorn: ASGI server
  • torch, torchvision: PyTorch for ML inference
  • ultralytics: YOLO implementation
  • opencv-python: Computer vision operations
  • websockets: WebSocket client/server
  • redis: Redis client for action execution

Security Considerations

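  • Model files (including MPTA archives fetched from modelUrl) should be loaded from trusted sources only: .pt weights are pickle-based, and unpickling an untrusted file can execute arbitrary code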
  • Redis connections use authentication when configured
  • WebSocket connections handle disconnects gracefully
  • Resource usage is monitored and max_streams caps concurrent streams, which helps guard against resource exhaustion

Performance Optimizations

  • GPU acceleration when CUDA is available
  • Shared camera streams reduce resource usage
  • Frame queue optimization (single latest frame)
  • Model caching across subscriptions
  • Trigger class filtering, so branch (classification) models run only when a trigger class is detected