python-detector-worker/worker.md
2025-07-13 23:58:01 +07:00

Worker Communication Protocol

This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.

1. Connection

The worker must run a WebSocket server, preferably on port 8000. The backend system, which is managed by a container orchestration service, will automatically discover your worker and establish a WebSocket connection to it.

Upon a successful connection from the backend, you should begin sending stateReport messages as heartbeats.

2. Communication Overview

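Communication is bidirectional and asynchronous. All messages are JSON objects with a type field that indicates the message's purpose. Commands from the backend carry their data in a payload field, which is omitted when a command has no data (for example, requestState). In the examples in this document, messages from the worker place their fields at the top level of the object instead, with imageDetection and patchSession nesting their content under data.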

  • Worker -> Backend: You will send messages to the backend to report status, forward detection events, or request changes to session data.
  • Backend -> Worker: The backend will send commands to you to manage camera subscriptions.
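
As an illustration of routing on the type field, here is a minimal sketch of a dispatcher for incoming backend commands. The names dispatch, HANDLERS, and on_request_state are hypothetical, and silently dropping malformed or unknown messages is an assumption, not something this protocol specifies.

```python
import json
from typing import Any, Callable, Dict, Optional

# A handler receives the command's payload and may return a reply message.
Handler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]


def dispatch(raw: str, handlers: Dict[str, Handler]) -> Optional[str]:
    """Decode one incoming frame and route it by its "type" field.

    Returns the JSON text of a reply, or None when there is nothing to send.
    """
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None  # assumption: malformed frames are dropped
    if not isinstance(message, dict):
        return None
    msg_type = message.get("type")
    if not isinstance(msg_type, str) or msg_type not in handlers:
        return None  # assumption: unknown types are ignored
    # Commands without data (e.g. requestState) get an empty payload.
    reply = handlers[msg_type](message.get("payload") or {})
    return json.dumps(reply) if reply is not None else None


def on_request_state(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical handler: answer requestState with a placeholder report."""
    return {
        "type": "stateReport",
        "cpuUsage": 0.0,
        "memoryUsage": 0.0,
        "gpuUsage": 0.0,
        "gpuMemoryUsage": 0.0,
        "cameraConnections": [],
    }


HANDLERS: Dict[str, Handler] = {"requestState": on_request_state}
```

A real worker would register handlers for subscribe, unsubscribe, and patchSessionResult in the same table and pass each reply to its WebSocket send call.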

3. Dynamic Configuration via MPTA File

To enable modularity and dynamic configuration, the backend will send you a URL to a .mpta file when it issues a subscribe command. This file is a renamed .zip archive that contains everything your worker needs to perform its task.

Your worker is responsible for:

  1. Fetching this file from the provided URL.
  2. Extracting its contents.
  3. Interpreting the contents to configure its internal pipeline.

The contents of the .mpta file are entirely up to the user who configures the model in the CMS. This allows for maximum flexibility. For example, the archive could contain:

  • AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
  • Configuration Files: A config.json or pipeline.yaml that defines a sequence of operations, specifies model paths, or sets detection thresholds.
  • Scripts: Custom Python scripts for pre-processing or post-processing.
  • API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.

Essentially, the .mpta file is a self-contained package that tells your worker how to process the video stream for a given subscription.
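
The fetch and extract steps can be sketched with the Python standard library alone. The function names fetch_mpta and extract_mpta are hypothetical, the 30-second timeout is an arbitrary choice, and the path check is an added precaution against archive members that point outside the target directory. Interpreting the extracted files is left out because the archive layout is user-defined.

```python
import io
import urllib.request
import zipfile
from pathlib import Path
from typing import List


def fetch_mpta(model_url: str, timeout: float = 30.0) -> bytes:
    """Download the archive named by the subscribe command's modelUrl."""
    with urllib.request.urlopen(model_url, timeout=timeout) as response:
        return response.read()


def extract_mpta(archive: bytes, dest_dir: Path) -> List[str]:
    """Unpack a .mpta (zip) archive into dest_dir and return its member names."""
    dest_root = dest_dir.resolve()
    with zipfile.ZipFile(io.BytesIO(archive)) as zf:
        names = zf.namelist()
        for name in names:
            target = (dest_root / name).resolve()
            # Refuse members that would land outside dest_dir (e.g. "../x").
            if target != dest_root and dest_root not in target.parents:
                raise ValueError(f"unsafe path in archive: {name}")
        zf.extractall(dest_root)
    return names
```

A worker might call extract_mpta(fetch_mpta(payload["modelUrl"]), some_directory) once per model and cache the result, since several subscriptions can refer to the same model.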

4. Messages from Worker to Backend

These are the messages your worker is expected to send to the backend.

4.1. State Report (Heartbeat)

This message is crucial for the backend to monitor your worker's health and status, including GPU usage.

  • Type: stateReport
  • When to Send: Periodically (e.g., every 2 seconds) after a connection is established.

Payload:

{
  "type": "stateReport",
  "cpuUsage": 75.5,
  "memoryUsage": 40.2,
  "gpuUsage": 60.0,
  "gpuMemoryUsage": 25.1,
  "cameraConnections": [
    {
      "subscriptionIdentifier": "display-001;cam-001",
      "modelId": 101,
      "modelName": "General Object Detection",
      "online": true
    }
  ]
}
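
One way to produce this message on a schedule is sketched below. The names build_state_report and heartbeat are hypothetical, and so are the send, read_metrics, and list_connections callables, which stand in for the worker's WebSocket send function and its own metric and subscription bookkeeping. How the usage figures are measured is outside the scope of this sketch.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List


def build_state_report(metrics: Dict[str, float],
                       connections: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assemble a stateReport message with the fields shown above."""
    return {
        "type": "stateReport",
        "cpuUsage": metrics["cpuUsage"],
        "memoryUsage": metrics["memoryUsage"],
        "gpuUsage": metrics["gpuUsage"],
        "gpuMemoryUsage": metrics["gpuMemoryUsage"],
        "cameraConnections": connections,
    }


async def heartbeat(send: Callable[[str], Awaitable[None]],
                    read_metrics: Callable[[], Dict[str, float]],
                    list_connections: Callable[[], List[Dict[str, Any]]],
                    interval: float = 2.0) -> None:
    """Send a stateReport every `interval` seconds until the task is cancelled."""
    while True:
        report = build_state_report(read_metrics(), list_connections())
        await send(json.dumps(report))
        await asyncio.sleep(interval)
```

Running heartbeat as a task per connection and cancelling it when the socket closes keeps the loop from outliving the connection.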

4.2. Image Detection

Sent when the worker detects a relevant object. The detection object should be flat and contain key-value pairs corresponding to the detected attributes.

  • Type: imageDetection

Payload Example:

{
  "type": "imageDetection",
  "subscriptionIdentifier": "display-001;cam-001",
  "timestamp": "2025-07-14T12:34:56.789Z",
  "data": {
    "detection": {
      "carModel": "Civic",
      "carBrand": "Honda",
      "carYear": 2023,
      "bodyType": "Sedan",
      "licensePlateText": "ABCD1234",
      "licensePlateConfidence": 0.95
    },
    "modelId": 101,
    "modelName": "US-LPR-and-Vehicle-ID"
  }
}
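
A small helper can enforce the flat detection object and produce the timestamp format used in the example. This is a sketch: build_image_detection is a hypothetical name, and rejecting nested values with an exception is one possible way to apply the "flat" requirement.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_image_detection(subscription_identifier: str,
                          detection: Dict[str, Any],
                          model_id: int,
                          model_name: str,
                          when: Optional[datetime] = None) -> Dict[str, Any]:
    """Assemble an imageDetection message; `when` should be timezone-aware."""
    for key, value in detection.items():
        # The detection object must be flat: no nested objects or arrays.
        if isinstance(value, (dict, list)):
            raise ValueError(f"detection must be flat, nested value at {key!r}")
    moment = (when or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # UTC with millisecond precision and a "Z" suffix, as in the example above.
    timestamp = moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")
    return {
        "type": "imageDetection",
        "subscriptionIdentifier": subscription_identifier,
        "timestamp": timestamp,
        "data": {
            "detection": dict(detection),
            "modelId": model_id,
            "modelName": model_name,
        },
    }
```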

4.3. Patch Session

Note: Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using imageDetection messages. Use patchSession only to update session data after the fact.

A patchSession message lets the worker request a modification to an active session's data. The data payload must be a partial object of the DisplayPersistentData structure.

  • Type: patchSession

Payload Example:

{
  "type": "patchSession",
  "sessionId": 12345,
  "data": {
    "currentCar": {
        "carModel": "Civic",
        "carBrand": "Honda",
        "licensePlateText": "ABCD1234"
    }
  }
}

The backend will respond with a patchSessionResult command.

DisplayPersistentData Structure

The data object in the patchSession message is merged with the existing DisplayPersistentData on the backend. Here is its structure:

interface DisplayPersistentData {
    progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
    qrCode: string | null;
    adsPlayback: {
        playlistSlotOrder: number; // The 'order' of the current slot
        adsId: number | null;
        adsUrl: string | null;
    } | null;
    currentCar: {
        carModel?: string;
        carBrand?: string;
        carYear?: number;
        bodyType?: string;
        licensePlateText?: string;
        licensePlateType?: string;
    } | null;
    fuelPump: { /* FuelPumpData structure */ } | null;
    weatherData: { /* WeatherResponse structure */ } | null;
    sessionId: number | null;
}

Patching Behavior

  • The patch is a deep merge.
  • undefined values are ignored.
  • null values will set the corresponding field to null.
  • Nested objects are merged recursively.
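
To preview the effect of a patch locally, the rules above can be approximated in Python. This is a sketch of the described behavior, not the backend's implementation. Python has no undefined, so a key that is simply absent from the patch plays that role, and None plays the role of null. Replacing a non-object value (including null) with an object from the patch is an assumption.

```python
import copy
from typing import Any, Dict


def deep_merge(current: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `patch` merged recursively into `current`."""
    merged = copy.deepcopy(current)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are objects: merge them field by field.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars, None (null), and objects replacing non-objects overwrite.
            merged[key] = copy.deepcopy(value)
    return merged
```

For example, patching {"currentCar": {"carModel": "Civic"}} into a session whose currentCar already holds carBrand keeps carBrand and adds carModel, while {"qrCode": None} clears qrCode.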

5. Commands from Backend to Worker

These are the commands your worker will receive from the backend.

5.1. Subscribe to Camera

Instructs the worker to process a camera's RTSP stream using the configuration from the specified .mpta file.

  • Type: subscribe

Payload:

{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-002",
    "rtspUrl": "rtsp://user:pass@host:port/stream",
    "snapshotUrl": "http://go2rtc/snapshot/1",
    "snapshotInterval": 5000,
    "modelUrl": "http://storage/models/us-lpr.mpta",
    "modelName": "US-LPR-and-Vehicle-ID",
    "modelId": 102,
    "cropX": 100,
    "cropY": 200
  }
}

Note:

  • cropX and cropY (optional, integer) specify the initial crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame.

Important: If multiple displays are bound to the same camera, your worker must open only one stream per camera. When you receive multiple subscriptions for the same camera (with different subscriptionIdentifier values), you should:

  • Open the RTSP stream once for that camera if using RTSP.
  • Capture each frame or snapshot only once per cycle.
  • Reuse that captured image for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.

This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
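
The sharing rule can be implemented with a small registry that tracks which subscriptions use each camera. The sketch below assumes that subscriptionIdentifier has the form "<display>;<camera>", which matches the examples in this document but is not stated as a rule. The names camera_key and StreamRegistry are hypothetical. The registry also covers the unsubscribe command described in section 5.2.

```python
from typing import Dict, Set


def camera_key(subscription_identifier: str) -> str:
    """Extract the camera part, assuming the form "<display>;<camera>"."""
    _display, _sep, camera = subscription_identifier.partition(";")
    if not camera:
        raise ValueError(f"unexpected identifier: {subscription_identifier!r}")
    return camera


class StreamRegistry:
    """Tracks subscriptions per camera so each stream is opened only once."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Set[str]] = {}

    def subscribe(self, subscription_identifier: str) -> bool:
        """Register a subscription; True means the camera stream must be opened."""
        subscribers = self._subscribers.setdefault(
            camera_key(subscription_identifier), set())
        is_first = not subscribers
        subscribers.add(subscription_identifier)
        return is_first

    def unsubscribe(self, subscription_identifier: str) -> bool:
        """Drop a subscription; True means the camera stream can be closed."""
        camera = camera_key(subscription_identifier)
        subscribers = self._subscribers.get(camera)
        if subscribers is None:
            return False
        subscribers.discard(subscription_identifier)
        if subscribers:
            return False  # other displays still use this camera
        del self._subscribers[camera]
        return True

    def subscribers_of(self, camera: str) -> Set[str]:
        """Subscriptions that should receive results for one captured frame."""
        return set(self._subscribers.get(camera, ()))
```

After each capture, the worker would iterate over subscribers_of(camera) and send one imageDetection per subscription from the same frame.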

5.2. Unsubscribe from Camera

Instructs the worker to stop processing a camera's stream.

  • Type: unsubscribe

Payload:

{
  "type": "unsubscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-002"
  }
}

5.3. Request State

Requests the worker's current state on demand, outside the regular heartbeat schedule. Respond with a stateReport message.

  • Type: requestState

Payload:

{
  "type": "requestState"
}

5.4. Patch Session Result

Backend's response to a patchSession message.

  • Type: patchSessionResult

Payload:

{
  "type": "patchSessionResult",
  "payload": {
    "sessionId": 12345,
    "success": true,
    "message": "Session updated successfully."
  }
}
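
Because the result arrives asynchronously, a worker needs some way to pair it with the patchSession that caused it. The sketch below matches on sessionId and assumes at most one patch in flight per session. The class name PatchTracker, the send callable, and the 5-second timeout are hypothetical choices, not part of the protocol.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict


class PatchTracker:
    """Pairs each patchSession with the patchSessionResult for its sessionId."""

    def __init__(self, send: Callable[[str], Awaitable[None]]) -> None:
        self._send = send
        self._pending: Dict[int, "asyncio.Future[Dict[str, Any]]"] = {}

    async def patch(self, session_id: int, data: Dict[str, Any],
                    timeout: float = 5.0) -> Dict[str, Any]:
        """Send a patchSession and wait for the matching result payload."""
        future = asyncio.get_running_loop().create_future()
        self._pending[session_id] = future
        try:
            await self._send(json.dumps(
                {"type": "patchSession", "sessionId": session_id, "data": data}))
            # Raises asyncio.TimeoutError if no result arrives in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(session_id, None)

    def on_result(self, payload: Dict[str, Any]) -> None:
        """Call with the payload of each incoming patchSessionResult."""
        future = self._pending.get(payload.get("sessionId"))
        if future is not None and not future.done():
            future.set_result(payload)
```

The caller can then inspect the success and message fields of the returned payload to decide whether to retry.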

6. Example Communication Log

This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.

Note: Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.

  1. Connection Established & Heartbeat
    • Worker -> Backend
    {
      "type": "stateReport",
      "cpuUsage": 70.2,
      "memoryUsage": 38.1,
      "gpuUsage": 55.0,
      "gpuMemoryUsage": 20.0,
      "cameraConnections": []
    }
    
  2. Backend Subscribes Camera
    • Backend -> Worker
    {
      "type": "subscribe",
      "payload": {
        "subscriptionIdentifier": "display-001;entry-cam-01",
        "rtspUrl": "rtsp://192.168.1.100/stream1",
        "modelUrl": "http://storage/models/vehicle-id.mpta",
        "modelName": "Vehicle Identification",
        "modelId": 201
      }
    }
    
  3. Worker Acknowledges in Heartbeat
    • Worker -> Backend
    {
      "type": "stateReport",
      "cpuUsage": 72.5,
      "memoryUsage": 39.0,
      "gpuUsage": 57.0,
      "gpuMemoryUsage": 21.0,
      "cameraConnections": [
        {
          "subscriptionIdentifier": "display-001;entry-cam-01",
          "modelId": 201,
          "modelName": "Vehicle Identification",
          "online": true
        }
      ]
    }
    
  4. Worker Detects a Car
    • Worker -> Backend
    {
      "type": "imageDetection",
      "subscriptionIdentifier": "display-001;entry-cam-01",
      "timestamp": "2025-07-15T10:00:00.000Z",
      "data": {
        "detection": {
          "carBrand": "Honda",
          "carModel": "CR-V",
          "bodyType": "SUV",
          "licensePlateText": "GEMINI-AI",
          "licensePlateConfidence": 0.98
        },
        "modelId": 201,
        "modelName": "Vehicle Identification"
      }
    }
    
    • Worker -> Backend
    {
      "type": "imageDetection",
      "subscriptionIdentifier": "display-001;entry-cam-01",
      "timestamp": "2025-07-15T10:00:01.000Z",
      "data": {
        "detection": {
          "carBrand": "Toyota",
          "carModel": "Corolla",
          "bodyType": "Sedan",
          "licensePlateText": "CMS-1234",
          "licensePlateConfidence": 0.97
        },
        "modelId": 201,
        "modelName": "Vehicle Identification"
      }
    }
    
    • Worker -> Backend
    {
      "type": "imageDetection",
      "subscriptionIdentifier": "display-001;entry-cam-01",
      "timestamp": "2025-07-15T10:00:02.000Z",
      "data": {
        "detection": {
          "carBrand": "Ford",
          "carModel": "Focus",
          "bodyType": "Hatchback",
          "licensePlateText": "CMS-5678",
          "licensePlateConfidence": 0.96
        },
        "modelId": 201,
        "modelName": "Vehicle Identification"
      }
    }
    
  5. Backend Unsubscribes Camera
    • Backend -> Worker
    {
      "type": "unsubscribe",
      "payload": {
        "subscriptionIdentifier": "display-001;entry-cam-01"
      }
    }
    
  6. Worker Acknowledges Unsubscription
    • Worker -> Backend
    {
      "type": "stateReport",
      "cpuUsage": 68.0,
      "memoryUsage": 37.0,
      "gpuUsage": 50.0,
      "gpuMemoryUsage": 18.0,
      "cameraConnections": []
    }