Worker Communication Protocol
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
1. Connection
The worker must run a WebSocket server, preferably on port 8000. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
Upon a successful connection from the backend, you should begin sending stateReport messages as heartbeats.
2. Communication Overview
Communication is bidirectional and asynchronous. All messages are JSON objects with a type field that indicates the message's purpose, and an optional payload field containing the data.
- Worker -> Backend: You will send messages to the backend to report status, forward detection events, or request changes to session data.
- Backend -> Worker: The backend will send commands to you to manage camera subscriptions.
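The routing described above can be sketched as a small dispatcher keyed on the type field. This is only an illustration: the handler functions, the HANDLERS table, and the dispatch function are hypothetical names, and silently ignoring unknown types is an assumption, since the protocol does not say how they should be treated.

```python
import json


def handle_subscribe(payload):
    # Hypothetical handler: a real worker would start processing the stream here.
    return f"subscribed {payload['cameraIdentifier']}"


def handle_unsubscribe(payload):
    # Hypothetical handler: a real worker would stop processing the stream here.
    return f"unsubscribed {payload['cameraIdentifier']}"


# Maps the protocol's `type` values to handler functions.
HANDLERS = {
    "subscribe": handle_subscribe,
    "unsubscribe": handle_unsubscribe,
}


def dispatch(raw_message):
    """Decode one JSON message and route it by its `type` field."""
    message = json.loads(raw_message)
    if not isinstance(message, dict):
        return None  # not a JSON object, so not a protocol message
    handler = HANDLERS.get(message.get("type"))
    if handler is None:
        return None  # assumption: unknown types are ignored in this sketch
    # `payload` is optional, so fall back to an empty object.
    return handler(message.get("payload") or {})
```

Keeping the handlers in a table means a new command type only needs one more entry rather than another branch.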
3. Dynamic Configuration via MPTA File
To enable modularity and dynamic configuration, the backend will send you a URL to a .mpta file when it issues a subscribe command. This file is a renamed .zip archive that contains everything your worker needs to perform its task.
Your worker is responsible for:
- Fetching this file from the provided URL.
- Extracting its contents.
- Interpreting the contents to configure its internal pipeline.
The contents of the .mpta file are entirely up to the user who configures the model in the CMS. This allows for maximum flexibility. For example, the archive could contain:
- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
- Configuration Files: A config.json or pipeline.yaml that defines a sequence of operations, specifies model paths, or sets detection thresholds.
- Scripts: Custom Python scripts for pre-processing or post-processing.
- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.
Essentially, the .mpta file is a self-contained package that tells your worker how to process the video stream for a given subscription.
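The fetch and extract steps could look roughly like the sketch below, which uses only the Python standard library. The function name fetch_and_extract_mpta and the choice of destination directory are assumptions for illustration; how the extracted files are interpreted depends entirely on what the archive contains.

```python
import shutil
import tempfile
import urllib.request
import zipfile
from pathlib import Path


def fetch_and_extract_mpta(model_url, dest_dir):
    """Download the .mpta archive at model_url and unpack it into dest_dir.

    Returns the sorted list of member names found in the archive.
    """
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with tempfile.TemporaryFile() as buffer:
        # Stream the download to a temporary file instead of holding it in memory.
        with urllib.request.urlopen(model_url) as response:
            shutil.copyfileobj(response, buffer)
        buffer.seek(0)
        # An .mpta file is a renamed .zip, so zipfile can read it directly.
        with zipfile.ZipFile(buffer) as archive:
            archive.extractall(dest)
            return sorted(archive.namelist())
```

Because the archive contents are user-supplied, a production worker would likely also want to inspect member names and sizes before extracting, and to cache archives by URL so that repeated subscriptions do not download the same file again.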
4. Messages from Worker to Backend
These are the messages your worker is expected to send to the backend.
4.1. State Report (Heartbeat)
This message is crucial for the backend to monitor your worker's health and status, including GPU usage.
- Type: stateReport
- When to Send: Periodically (e.g., every 2 seconds) after a connection is established.
Payload:
{
  "type": "stateReport",
  "cpuUsage": 75.5,
  "memoryUsage": 40.2,
  "gpuUsage": 60.0,
  "gpuMemoryUsage": 25.1,
  "cameraConnections": [
    {
      "cameraIdentifier": "cam-001",
      "modelId": 101,
      "modelName": "General Object Detection",
      "online": true
    }
  ]
}
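As a rough sketch, the heartbeat can be split into a function that serializes the report and a loop that sends it on a fixed interval. Everything named here is an assumption for illustration: send stands for whatever coroutine your WebSocket library provides for sending text, make_report is a callable you supply, and the usage figures are expected to be measured elsewhere in the worker.

```python
import asyncio
import json


def build_state_report(cpu_usage, memory_usage, gpu_usage, gpu_memory_usage,
                       camera_connections):
    """Serialize one stateReport message from usage figures measured elsewhere."""
    return json.dumps({
        "type": "stateReport",
        "cpuUsage": cpu_usage,
        "memoryUsage": memory_usage,
        "gpuUsage": gpu_usage,
        "gpuMemoryUsage": gpu_memory_usage,
        "cameraConnections": list(camera_connections),
    })


async def heartbeat(send, make_report, interval_seconds=2.0, max_reports=None):
    """Send a fresh report every interval_seconds.

    `send` is a hypothetical coroutine that writes text to the WebSocket.
    `max_reports` exists only so the loop can be stopped in tests; leave it
    as None to run for the lifetime of the connection.
    """
    sent = 0
    while max_reports is None or sent < max_reports:
        await send(make_report())
        sent += 1
        await asyncio.sleep(interval_seconds)
```

Building the report inside the loop, rather than once up front, keeps each heartbeat in step with the current camera list.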
4.2. Image Detection
Sent when the worker detects a relevant object. The detection object should be flat and contain key-value pairs corresponding to the detected attributes.
- Type: imageDetection
Payload Example:
{
  "type": "imageDetection",
  "cameraIdentifier": "cam-001",
  "timestamp": "2025-07-14T12:34:56.789Z",
  "data": {
    "detection": {
      "carModel": "Civic",
      "carBrand": "Honda",
      "carYear": 2023,
      "bodyType": "Sedan",
      "licensePlateText": "ABCD1234"
    },
    "modelId": 101,
    "modelName": "US-LPR-and-Vehicle-ID"
  }
}
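One way to assemble this message is sketched below. The function name build_image_detection is hypothetical, and rejecting nested values is just one reading of the requirement that the detection object be flat. The timestamp is formatted to match the example above (UTC, millisecond precision, Z suffix).

```python
import json
from datetime import datetime, timezone


def build_image_detection(camera_identifier, detection, model_id, model_name,
                          when=None):
    """Serialize one imageDetection message for a flat detection dict."""
    # Enforce the "flat" requirement: no nested objects or arrays.
    for key, value in detection.items():
        if isinstance(value, (dict, list)):
            raise ValueError(f"detection field {key!r} must not be nested")
    if when is None:
        when = datetime.now(timezone.utc)
    # Produces e.g. 2025-07-14T12:34:56.789Z, the format used in the example.
    timestamp = (
        when.astimezone(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )
    return json.dumps({
        "type": "imageDetection",
        "cameraIdentifier": camera_identifier,
        "timestamp": timestamp,
        "data": {
            "detection": detection,
            "modelId": model_id,
            "modelName": model_name,
        },
    })
```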
4.3. Patch Session
Note: Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using imageDetection messages. Use patchSession only to update session data after the fact.
Allows the worker to request a modification to an active session's data. The data payload must be a partial object of the DisplayPersistentData structure.
- Type: patchSession
Payload Example:
{
  "type": "patchSession",
  "sessionId": 12345,
  "data": {
    "currentCar": {
      "carModel": "Civic",
      "carBrand": "Honda",
      "licensePlateText": "ABCD1234"
    }
  }
}
The backend will respond with a patchSessionResult command.
DisplayPersistentData Structure
The data object in the patchSession message is merged with the existing DisplayPersistentData on the backend. Here is its structure:
interface DisplayPersistentData {
  progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
  qrCode: string | null;
  adsPlayback: {
    playlistSlotOrder: number; // The 'order' of the current slot
    adsId: number | null;
    adsUrl: string | null;
  } | null;
  currentCar: {
    carModel?: string;
    carBrand?: string;
    carYear?: number;
    bodyType?: string;
    licensePlateText?: string;
    licensePlateType?: string;
  } | null;
  fuelPump: { /* FuelPumpData structure */ } | null;
  weatherData: { /* WeatherResponse structure */ } | null;
  sessionId: number | null;
}
Patching Behavior
- The patch is a deep merge.
- undefined values are ignored.
- null values will set the corresponding field to null.
- Nested objects are merged recursively.
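To make these rules concrete, here is a small model of the merge semantics. It is not the backend's implementation, only a sketch of the behavior described above, and the name deep_merge is hypothetical. Python has no undefined value, so a key that is simply absent from the patch plays that role here, and None stands in for JSON null.

```python
def deep_merge(current, patch):
    """Return a new dict with `patch` merged recursively into `current`.

    Keys absent from `patch` are left untouched (the "undefined" case).
    A None value in `patch` overwrites the field with None (the "null" case).
    """
    merged = dict(current)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are objects: merge field by field.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars, None, or an object replacing a non-object value.
            merged[key] = value
    return merged


session = {
    "currentCar": {"carBrand": "Honda", "carYear": 2023},
    "qrCode": "abc",
    "sessionId": 12345,
}
patch = {"currentCar": {"carModel": "Civic"}, "qrCode": None}
patched = deep_merge(session, patch)
```

In this example, patched keeps carBrand and carYear, gains carModel, has qrCode set to None, and leaves sessionId unchanged.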
5. Commands from Backend to Worker
These are the commands your worker will receive from the backend.
5.1. Subscribe to Camera
Instructs the worker to process a camera's RTSP stream using the configuration from the specified .mpta file.
- Type: subscribe
Payload:
{
  "type": "subscribe",
  "payload": {
    "rtspUrl": "rtsp://user:pass@host:port/stream",
    "cameraIdentifier": "cam-002",
    "snapshotUrl": "http://go2rtc/snapshot/1",
    "snapshotInterval": 5000,
    "modelUrl": "http://storage/models/us-lpr.mpta",
    "modelName": "US-LPR-and-Vehicle-ID",
    "modelId": 102
  }
}
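A worker might parse this command into a typed record before acting on it, as in the sketch below. The Subscription class and parse_subscribe function are hypothetical names. Treating snapshotUrl and snapshotInterval as optional is an assumption, based only on the fact that the subscribe command in the example log in section 6 omits them.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Subscription:
    camera_identifier: str
    rtsp_url: str
    model_url: str
    model_name: str
    model_id: int
    # Assumed optional: the section 6 example omits both fields.
    snapshot_url: Optional[str] = None
    snapshot_interval: Optional[int] = None


def parse_subscribe(message):
    """Convert a decoded subscribe command (a dict) into a Subscription."""
    if message.get("type") != "subscribe":
        raise ValueError("not a subscribe command")
    payload = message["payload"]
    return Subscription(
        camera_identifier=payload["cameraIdentifier"],
        rtsp_url=payload["rtspUrl"],
        model_url=payload["modelUrl"],
        model_name=payload["modelName"],
        model_id=payload["modelId"],
        snapshot_url=payload.get("snapshotUrl"),
        snapshot_interval=payload.get("snapshotInterval"),
    )
```

A missing required field raises KeyError, which surfaces a malformed command early instead of partway through pipeline setup.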
5.2. Unsubscribe from Camera
Instructs the worker to stop processing a camera's stream.
- Type: unsubscribe
Payload:
{
  "type": "unsubscribe",
  "payload": {
    "cameraIdentifier": "cam-002"
  }
}
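Since the worker reports its active cameras in the cameraConnections field of each stateReport, one simple approach is to keep subscriptions in a registry keyed by cameraIdentifier. The sketch below is an illustration under assumptions: SubscriptionRegistry and its methods are hypothetical, and reading online as "the stream is currently being read successfully" is a guess, because the protocol does not define the field.

```python
class SubscriptionRegistry:
    """Tracks active camera subscriptions by cameraIdentifier."""

    def __init__(self):
        self._cameras = {}

    def subscribe(self, payload):
        # A repeated subscribe for the same camera replaces the old entry.
        self._cameras[payload["cameraIdentifier"]] = {
            "cameraIdentifier": payload["cameraIdentifier"],
            "modelId": payload["modelId"],
            "modelName": payload["modelName"],
            "online": False,  # assumption: flipped once the stream is readable
        }

    def mark_online(self, camera_identifier, online=True):
        self._cameras[camera_identifier]["online"] = online

    def unsubscribe(self, payload):
        """Remove the camera; returns False if it was not subscribed."""
        return self._cameras.pop(payload["cameraIdentifier"], None) is not None

    def camera_connections(self):
        """Return copies suitable for the cameraConnections field."""
        return [dict(entry) for entry in self._cameras.values()]
```

Returning a boolean from unsubscribe lets the caller log an unsubscribe for an unknown camera without treating it as an error.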
5.3. Request State
Direct request for the worker's current state. Respond with a stateReport message.
- Type: requestState
Payload:
{
"type": "requestState"
}
5.4. Patch Session Result
Backend's response to a patchSession message.
- Type: patchSessionResult
Payload:
{
  "type": "patchSessionResult",
  "payload": {
    "sessionId": 12345,
    "success": true,
    "message": "Session updated successfully."
  }
}
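Because the result arrives asynchronously, a worker may want to remember which sessions have a patch in flight. The sketch below is one possible shape for that bookkeeping. PatchTracker and its methods are hypothetical, and matching results to requests by sessionId alone is an assumption, since the messages carry no separate request identifier.

```python
import json


class PatchTracker:
    """Remembers sessions with an outstanding patchSession request."""

    def __init__(self):
        self.pending = set()

    def build_patch(self, session_id, data):
        """Serialize a patchSession message and mark the session as pending."""
        self.pending.add(session_id)
        return json.dumps({
            "type": "patchSession",
            "sessionId": session_id,
            "data": data,
        })

    def on_result(self, message):
        """Handle a decoded patchSessionResult; returns its success flag."""
        payload = message["payload"]
        self.pending.discard(payload["sessionId"])
        return payload["success"]
```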
6. Example Communication Log
This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.
Note: Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
- Connection Established & Heartbeat
- Worker -> Backend
{ "type": "stateReport", "cpuUsage": 70.2, "memoryUsage": 38.1, "gpuUsage": 55.0, "gpuMemoryUsage": 20.0, "cameraConnections": [] }
- Backend Subscribes Camera
- Backend -> Worker
{ "type": "subscribe", "payload": { "rtspUrl": "rtsp://192.168.1.100/stream1", "cameraIdentifier": "entry-cam-01", "modelUrl": "http://storage/models/vehicle-id.mpta", "modelName": "Vehicle Identification", "modelId": 201 } }
- Worker Acknowledges in Heartbeat
- Worker -> Backend
{ "type": "stateReport", "cpuUsage": 72.5, "memoryUsage": 39.0, "gpuUsage": 57.0, "gpuMemoryUsage": 21.0, "cameraConnections": [ { "cameraIdentifier": "entry-cam-01", "modelId": 201, "modelName": "Vehicle Identification", "online": true } ] }
- Worker Detects a Car
- Worker -> Backend
{ "type": "imageDetection", "cameraIdentifier": "entry-cam-01", "timestamp": "2025-07-15T10:00:00.000Z", "data": { "detection": { "carBrand": "Honda", "carModel": "CR-V", "bodyType": "SUV", "licensePlateText": "ABCD1234" }, "modelId": 201, "modelName": "Vehicle Identification" } }
- Worker -> Backend
{ "type": "imageDetection", "cameraIdentifier": "entry-cam-01", "timestamp": "2025-07-15T10:00:01.000Z", "data": { "detection": { "carBrand": "Toyota", "carModel": "Corolla", "bodyType": "Sedan", "licensePlateText": "CMS-1234" }, "modelId": 201, "modelName": "Vehicle Identification" } }
- Worker -> Backend
{ "type": "imageDetection", "cameraIdentifier": "entry-cam-01", "timestamp": "2025-07-15T10:00:02.000Z", "data": { "detection": { "carBrand": "Ford", "carModel": "Focus", "bodyType": "Hatchback", "licensePlateText": "CMS-5678" }, "modelId": 201, "modelName": "Vehicle Identification" } }
- Backend Unsubscribes Camera
- Backend -> Worker
{ "type": "unsubscribe", "payload": { "cameraIdentifier": "entry-cam-01" } }
- Worker Acknowledges Unsubscription
- Worker -> Backend
{ "type": "stateReport", "cpuUsage": 68.0, "memoryUsage": 37.0, "gpuUsage": 50.0, "gpuMemoryUsage": 18.0, "cameraConnections": [] }