python-detector-worker/worker.md

14 KiB

Worker Communication Protocol

This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.

1. Connection

The worker must run a WebSocket server, preferably on port 8000. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.

Upon a successful connection from the backend, you should begin sending stateReport messages as heartbeats.

2. Communication Overview

Communication is bidirectional and asynchronous. All messages are JSON objects with a type field that indicates the message's purpose, and an optional payload field containing the data.

  • Worker -> Backend: You will send messages to the backend to report status, forward detection events, or request changes to session data.
  • Backend -> Worker: The backend will send commands to you to manage camera subscriptions.

3. Dynamic Configuration via MPTA File

To enable modularity and dynamic configuration, the backend will send you a URL to a .mpta file when it issues a subscribe command. This file is a renamed .zip archive that contains everything your worker needs to perform its task.

Your worker is responsible for:

  1. Fetching this file from the provided URL.
  2. Extracting its contents.
  3. Interpreting the contents to configure its internal pipeline.

The contents of the .mpta file are entirely up to the user who configures the model in the CMS. This allows for maximum flexibility. For example, the archive could contain:

  • AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
  • Configuration Files: A config.json or pipeline.yaml that defines a sequence of operations, specifies model paths, or sets detection thresholds.
  • Scripts: Custom Python scripts for pre-processing or post-processing.
  • API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.

Essentially, the .mpta file is a self-contained package that tells your worker how to process the video stream for a given subscription.

4. Messages from Worker to Backend

These are the messages your worker is expected to send to the backend.

4.1. State Report (Heartbeat)

This message is crucial for the backend to monitor your worker's health and status, including GPU usage.

  • Type: stateReport
  • When to Send: Periodically (e.g., every 2 seconds) after a connection is established.

Payload:

{
  "type": "stateReport",
  "cpuUsage": 75.5,
  "memoryUsage": 40.2,
  "gpuUsage": 60.0,
  "gpuMemoryUsage": 25.1,
  "cameraConnections": [
    {
      "subscriptionIdentifier": "display-001;cam-001",
      "modelId": 101,
      "modelName": "General Object Detection",
      "online": true,
      "cropX1": 100,
      "cropY1": 200,
      "cropX2": 300,
      "cropY2": 400
    }
  ]
}

Note:

  • cropX1, cropY1, cropX2, cropY2 (optional, integer) should be included in each camera connection to indicate the crop coordinates for that subscription.

4.2. Image Detection

Sent when the worker detects a relevant object. The detection object should be flat and contain key-value pairs corresponding to the detected attributes.

  • Type: imageDetection

Payload Example:

{
  "type": "imageDetection",
  "subscriptionIdentifier": "display-001;cam-001",
  "timestamp": "2025-07-14T12:34:56.789Z",
  "data": {
    "detection": {
      "carModel": "Civic",
      "carBrand": "Honda",
      "carYear": 2023,
      "bodyType": "Sedan",
      "licensePlateText": "ABCD1234",
      "licensePlateConfidence": 0.95
    },
    "modelId": 101,
    "modelName": "US-LPR-and-Vehicle-ID"
  }
}

4.3. Patch Session

Note: Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using imageDetection messages. Use patchSession only to update session data after the fact.

Allows the worker to request a modification to an active session's data. The data payload must be a partial object of the DisplayPersistentData structure.

  • Type: patchSession

Payload Example:

{
  "type": "patchSession",
  "sessionId": 12345,
  "data": {
    "currentCar": {
        "carModel": "Civic",
        "carBrand": "Honda",
        "licensePlateText": "ABCD1234"
    }
  }
}

The backend will respond with a patchSessionResult command.

DisplayPersistentData Structure

The data object in the patchSession message is merged with the existing DisplayPersistentData on the backend. Here is its structure:

interface DisplayPersistentData {
    progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
    qrCode: string | null;
    adsPlayback: {
        playlistSlotOrder: number; // The 'order' of the current slot
        adsId: number | null;
        adsUrl: string | null;
    } | null;
    currentCar: {
        carModel?: string;
        carBrand?: string;
        carYear?: number;
        bodyType?: string;
        licensePlateText?: string;
        licensePlateType?: string;
    } | null;
    fuelPump: { /* FuelPumpData structure */ } | null;
    weatherData: { /* WeatherResponse structure */ } | null;
    sessionId: number | null;
}

Patching Behavior

  • The patch is a deep merge.
  • undefined values are ignored.
  • null values will set the corresponding field to null.
  • Nested objects are merged recursively.

5. Commands from Backend to Worker

These are the commands your worker will receive from the backend.

5.1. Subscribe to Camera

Instructs the worker to process a camera's RTSP stream using the configuration from the specified .mpta file.

  • Type: subscribe

Payload:

{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-002",
    "rtspUrl": "rtsp://user:pass@host:port/stream",
    "snapshotUrl": "http://go2rtc/snapshot/1",
    "snapshotInterval": 5000,
    "modelUrl": "http://storage/models/us-lpr.mpta",
    "modelName": "US-LPR-and-Vehicle-ID",
    "modelId": 102,
    "cropX1": 100,
    "cropY1": 200,
    "cropX2": 300,
    "cropY2": 400
  }
}

Note:

  • cropX1, cropY1, cropX2, cropY2 (optional, integer) specify the crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame.

Important: If multiple displays are bound to the same camera, your worker must ensure that only one stream is opened per camera. When you receive multiple subscriptions for the same camera (with different subscriptionIdentifier values), you should:

  • Open the RTSP stream once for that camera if using RTSP.
  • Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera.
  • Capture each frame/image only once per cycle.
  • Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed. This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.

5.2. Unsubscribe from Camera

Instructs the worker to stop processing a camera's stream.

  • Type: unsubscribe

Payload:

{
  "type": "unsubscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-002"
  }
}

5.3. Request State

Direct request for the worker's current state. Respond with a stateReport message.

  • Type: requestState

Payload:

{
  "type": "requestState"
}

5.4. Patch Session Result

Backend's response to a patchSession message.

  • Type: patchSessionResult

Payload:

{
  "type": "patchSessionResult",
  "payload": {
    "sessionId": 12345,
    "success": true,
    "message": "Session updated successfully."
  }
}

5.5. Set Session ID

Allows the backend to instruct the worker to associate a session ID with a subscription. This is useful for linking detection events to a specific session. The session ID can be null to indicate no active session.

  • Type: setSessionId

Payload:

{
  "type": "setSessionId",
  "payload": {
    "displayIdentifier": "display-001",
    "sessionId": 12345
  }
}

Or to clear the session:

{
  "type": "setSessionId",
  "payload": {
    "displayIdentifier": "display-001",
    "sessionId": null
  }
}

Note:

  • The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. If sessionId is null, the worker should treat the subscription as having no active session.

Subscription Identifier Format

The subscriptionIdentifier used in all messages is constructed as:

displayIdentifier;cameraIdentifier

This uniquely identifies a camera subscription for a specific display.

Session ID Association

When the backend sends a setSessionId command, it will only provide the displayIdentifier (not the full subscriptionIdentifier).

Worker Responsibility:

  • The worker must match the displayIdentifier to all active subscriptions for that display (i.e., all subscriptionIdentifier values that start with displayIdentifier;).
  • The worker should set or clear the session ID for all matching subscriptions.

6. Example Communication Log

This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.

Note: Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.

  1. Connection Established & Heartbeat
    • Worker -> Backend
    {
      "type": "stateReport",
      "cpuUsage": 70.2,
      "memoryUsage": 38.1,
      "gpuUsage": 55.0,
      "gpuMemoryUsage": 20.0,
      "cameraConnections": []
    }
    
  2. Backend Subscribes Camera
    • Backend -> Worker
    {
      "type": "subscribe",
      "payload": {
        "subscriptionIdentifier": "display-001;entry-cam-01",
        "rtspUrl": "rtsp://192.168.1.100/stream1",
        "modelUrl": "http://storage/models/vehicle-id.mpta",
        "modelName": "Vehicle Identification",
        "modelId": 201
      }
    }
    
  3. Worker Acknowledges in Heartbeat
    • Worker -> Backend
    {
      "type": "stateReport",
      "cpuUsage": 72.5,
      "memoryUsage": 39.0,
      "gpuUsage": 57.0,
      "gpuMemoryUsage": 21.0,
      "cameraConnections": [
        {
          "subscriptionIdentifier": "display-001;entry-cam-01",
          "modelId": 201,
          "modelName": "Vehicle Identification",
          "online": true
        }
      ]
    }
    
  4. Worker Detects a Car
    • Worker -> Backend
    {
      "type": "imageDetection",
      "subscriptionIdentifier": "display-001;entry-cam-01",
      "timestamp": "2025-07-15T10:00:00.000Z",
      "data": {
        "detection": {
          "carBrand": "Honda",
          "carModel": "CR-V",
          "bodyType": "SUV",
          "licensePlateText": "GEMINI-AI",
          "licensePlateConfidence": 0.98
        },
        "modelId": 201,
        "modelName": "Vehicle Identification"
      }
    }
    
    • Worker -> Backend
    {
      "type": "imageDetection",
      "subscriptionIdentifier": "display-001;entry-cam-01",
      "timestamp": "2025-07-15T10:00:01.000Z",
      "data": {
        "detection": {
          "carBrand": "Toyota",
          "carModel": "Corolla",
          "bodyType": "Sedan",
          "licensePlateText": "CMS-1234",
          "licensePlateConfidence": 0.97
        },
        "modelId": 201,
        "modelName": "Vehicle Identification"
      }
    }
    
    • Worker -> Backend
    {
      "type": "imageDetection",
      "subscriptionIdentifier": "display-001;entry-cam-01",
      "timestamp": "2025-07-15T10:00:02.000Z",
      "data": {
        "detection": {
          "carBrand": "Ford",
          "carModel": "Focus",
          "bodyType": "Hatchback",
          "licensePlateText": "CMS-5678",
          "licensePlateConfidence": 0.96
        },
        "modelId": 201,
        "modelName": "Vehicle Identification"
      }
    }
    
  5. Backend Unsubscribes Camera
    • Backend -> Worker
    {
      "type": "unsubscribe",
      "payload": {
        "subscriptionIdentifier": "display-001;entry-cam-01"
      }
    }
    
  6. Worker Acknowledges Unsubscription
    • Worker -> Backend
    {
      "type": "stateReport",
      "cpuUsage": 68.0,
      "memoryUsage": 37.0,
      "gpuUsage": 50.0,
      "gpuMemoryUsage": 18.0,
      "cameraConnections": []
    }
    

7. HTTP API: Image Retrieval

In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.

Endpoint

GET /camera/{camera_id}/image
  • camera_id: The full subscriptionIdentifier (e.g., display-001;cam-001).

Response

  • Success (200): Returns the latest JPEG image from the camera stream.

    • Content-Type: image/jpeg
    • Binary JPEG data.
  • Error (404): If the camera is not found or no frame is available.

    • JSON error response.
  • Error (500): Internal server error.

Example Request

GET /camera/display-001;cam-001/image

Example Response

  • Headers:
    Content-Type: image/jpeg
    
  • Body: Binary JPEG image.

Notes

  • The endpoint returns the most recent frame available for the specified camera subscription.
  • If multiple displays share the same camera, each subscription has its own buffer; the endpoint uses the buffer for the given camera_id.
  • This API is useful for debugging, monitoring, or integrating with external systems that require direct image access.