add WebSocket communication protocol documentation for detector worker; outline connection, message types, and dynamic configuration
Some checks are pending
Build Backend Application and Docker Image / build-docker (push) Waiting to run
Some checks are pending
Build Backend Application and Docker Image / build-docker (push) Waiting to run
This commit is contained in:
parent
22370e2040
commit
5cf1bf08cc
1 changed files with 362 additions and 0 deletions
362
worker.md
Normal file
362
worker.md
Normal file
|
@ -0,0 +1,362 @@
|
|||
# Worker Communication Protocol
|
||||
|
||||
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
|
||||
|
||||
## 1. Connection
|
||||
|
||||
The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
|
||||
|
||||
Upon a successful connection from the backend, you should begin sending `stateReport` messages as heartbeats.
|
||||
|
||||
## 2. Communication Overview
|
||||
|
||||
Communication is bidirectional and asynchronous. All messages are JSON objects with a `type` field that indicates the message's purpose, and an optional `payload` field containing the data.
|
||||
|
||||
- **Worker -> Backend:** You will send messages to the backend to report status, forward detection events, or request changes to session data.
|
||||
- **Backend -> Worker:** The backend will send commands to you to manage camera subscriptions.
|
||||
|
||||
## 3. Dynamic Configuration via MPTA File
|
||||
|
||||
To enable modularity and dynamic configuration, the backend will send you a URL to a `.mpta` file when it issues a `subscribe` command. This file is a renamed `.zip` archive that contains everything your worker needs to perform its task.
|
||||
|
||||
**Your worker is responsible for:**
|
||||
1. Fetching this file from the provided URL.
|
||||
2. Extracting its contents.
|
||||
3. Interpreting the contents to configure its internal pipeline.
|
||||
|
||||
**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:
|
||||
|
||||
* **AI/ML Models:** Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
|
||||
* **Configuration Files:** A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
|
||||
* **Scripts:** Custom Python scripts for pre-processing or post-processing.
|
||||
* **API Integration Details:** A JSON file with endpoint information and credentials for interacting with third-party detection services.
|
||||
|
||||
Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription.
|
||||
|
||||
## 4. Messages from Worker to Backend
|
||||
|
||||
These are the messages your worker is expected to send to the backend.
|
||||
|
||||
### 4.1. State Report (Heartbeat)
|
||||
|
||||
This message is crucial for the backend to monitor your worker's health and status, including GPU usage.
|
||||
|
||||
- **Type:** `stateReport`
|
||||
- **When to Send:** Periodically (e.g., every 2 seconds) after a connection is established.
|
||||
|
||||
**Payload:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "stateReport",
|
||||
"cpuUsage": 75.5,
|
||||
"memoryUsage": 40.2,
|
||||
"gpuUsage": 60.0,
|
||||
"gpuMemoryUsage": 25.1,
|
||||
"cameraConnections": [
|
||||
{
|
||||
"cameraIdentifier": "cam-001",
|
||||
"modelId": 101,
|
||||
"modelName": "General Object Detection",
|
||||
"online": true
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 4.2. Image Detection
|
||||
|
||||
Sent when the worker detects a relevant object. The `detection` object should be flat and contain key-value pairs corresponding to the detected attributes.
|
||||
|
||||
- **Type:** `imageDetection`
|
||||
|
||||
**Payload Example:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "imageDetection",
|
||||
"cameraIdentifier": "cam-001",
|
||||
"timestamp": "2025-07-14T12:34:56.789Z",
|
||||
"data": {
|
||||
"detection": {
|
||||
"carModel": "Civic",
|
||||
"carBrand": "Honda",
|
||||
"carYear": 2023,
|
||||
"bodyType": "Sedan",
|
||||
"licensePlateText": "ABCD1234",
|
||||
"licensePlateConfidence": 0.95
|
||||
},
|
||||
"modelId": 101,
|
||||
"modelName": "US-LPR-and-Vehicle-ID"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4.3. Patch Session
|
||||
|
||||
> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
|
||||
|
||||
Allows the worker to request a modification to an active session's data. The `data` payload must be a partial object of the `DisplayPersistentData` structure.
|
||||
|
||||
- **Type:** `patchSession`
|
||||
|
||||
**Payload Example:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "patchSession",
|
||||
"sessionId": 12345,
|
||||
"data": {
|
||||
"currentCar": {
|
||||
"carModel": "Civic",
|
||||
"carBrand": "Honda",
|
||||
"licensePlateText": "ABCD1234"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The backend will respond with a `patchSessionResult` command.
|
||||
|
||||
#### `DisplayPersistentData` Structure
|
||||
|
||||
The `data` object in the `patchSession` message is merged with the existing `DisplayPersistentData` on the backend. Here is its structure:
|
||||
|
||||
```typescript
|
||||
interface DisplayPersistentData {
|
||||
progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
|
||||
qrCode: string | null;
|
||||
adsPlayback: {
|
||||
playlistSlotOrder: number; // The 'order' of the current slot
|
||||
adsId: number | null;
|
||||
adsUrl: string | null;
|
||||
} | null;
|
||||
currentCar: {
|
||||
carModel?: string;
|
||||
carBrand?: string;
|
||||
carYear?: number;
|
||||
bodyType?: string;
|
||||
licensePlateText?: string;
|
||||
licensePlateType?: string;
|
||||
} | null;
|
||||
fuelPump: { /* FuelPumpData structure */ } | null;
|
||||
weatherData: { /* WeatherResponse structure */ } | null;
|
||||
sessionId: number | null;
|
||||
}
|
||||
```
|
||||
|
||||
#### Patching Behavior
|
||||
|
||||
- The patch is a **deep merge**.
|
||||
- **`undefined`** values are ignored.
|
||||
- **`null`** values will set the corresponding field to `null`.
|
||||
- Nested objects are merged recursively.
|
||||
|
||||
## 5. Commands from Backend to Worker
|
||||
|
||||
These are the commands your worker will receive from the backend.
|
||||
|
||||
### 5.1. Subscribe to Camera
|
||||
|
||||
Instructs the worker to process a camera's RTSP stream using the configuration from the specified `.mpta` file.
|
||||
|
||||
- **Type:** `subscribe`
|
||||
|
||||
**Payload:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "subscribe",
|
||||
"payload": {
|
||||
"rtspUrl": "rtsp://user:pass@host:port/stream",
|
||||
"cameraIdentifier": "cam-002",
|
||||
"snapshotUrl": "http://go2rtc/snapshot/1",
|
||||
"snapshotInterval": 5000,
|
||||
"modelUrl": "http://storage/models/us-lpr.mpta",
|
||||
"modelName": "US-LPR-and-Vehicle-ID",
|
||||
"modelId": 102
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 5.2. Unsubscribe from Camera
|
||||
|
||||
Instructs the worker to stop processing a camera's stream.
|
||||
|
||||
- **Type:** `unsubscribe`
|
||||
|
||||
**Payload:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "unsubscribe",
|
||||
"payload": {
|
||||
"cameraIdentifier": "cam-002"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 5.3. Request State
|
||||
|
||||
Direct request for the worker's current state. Respond with a `stateReport` message.
|
||||
|
||||
- **Type:** `requestState`
|
||||
|
||||
**Payload:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "requestState"
|
||||
}
|
||||
```
|
||||
|
||||
### 5.4. Patch Session Result
|
||||
|
||||
Backend's response to a `patchSession` message.
|
||||
|
||||
- **Type:** `patchSessionResult`
|
||||
|
||||
**Payload:**
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "patchSessionResult",
|
||||
"payload": {
|
||||
"sessionId": 12345,
|
||||
"success": true,
|
||||
"message": "Session updated successfully."
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 6. Example Communication Log
|
||||
|
||||
This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.
|
||||
|
||||
> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
|
||||
|
||||
1. **Connection Established** & **Heartbeat**
|
||||
* **Worker -> Backend**
|
||||
```json
|
||||
{
|
||||
"type": "stateReport",
|
||||
"cpuUsage": 70.2,
|
||||
"memoryUsage": 38.1,
|
||||
"gpuUsage": 55.0,
|
||||
"gpuMemoryUsage": 20.0,
|
||||
"cameraConnections": []
|
||||
}
|
||||
```
|
||||
2. **Backend Subscribes Camera**
|
||||
* **Backend -> Worker**
|
||||
```json
|
||||
{
|
||||
"type": "subscribe",
|
||||
"payload": {
|
||||
"rtspUrl": "rtsp://192.168.1.100/stream1",
|
||||
"cameraIdentifier": "entry-cam-01",
|
||||
"modelUrl": "http://storage/models/vehicle-id.mpta",
|
||||
"modelName": "Vehicle Identification",
|
||||
"modelId": 201
|
||||
}
|
||||
}
|
||||
```
|
||||
3. **Worker Acknowledges in Heartbeat**
|
||||
* **Worker -> Backend**
|
||||
```json
|
||||
{
|
||||
"type": "stateReport",
|
||||
"cpuUsage": 72.5,
|
||||
"memoryUsage": 39.0,
|
||||
"gpuUsage": 57.0,
|
||||
"gpuMemoryUsage": 21.0,
|
||||
"cameraConnections": [
|
||||
{
|
||||
"cameraIdentifier": "entry-cam-01",
|
||||
"modelId": 201,
|
||||
"modelName": "Vehicle Identification",
|
||||
"online": true
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
4. **Worker Detects a Car**
|
||||
* **Worker -> Backend**
|
||||
```json
|
||||
{
|
||||
"type": "imageDetection",
|
||||
"cameraIdentifier": "entry-cam-01",
|
||||
"timestamp": "2025-07-15T10:00:00.000Z",
|
||||
"data": {
|
||||
"detection": {
|
||||
"carBrand": "Honda",
|
||||
"carModel": "CR-V",
|
||||
"bodyType": "SUV",
|
||||
"licensePlateText": "GEMINI-AI",
|
||||
"licensePlateConfidence": 0.98
|
||||
},
|
||||
"modelId": 201,
|
||||
"modelName": "Vehicle Identification"
|
||||
}
|
||||
}
|
||||
```
|
||||
* **Worker -> Backend**
|
||||
```json
|
||||
{
|
||||
"type": "imageDetection",
|
||||
"cameraIdentifier": "entry-cam-01",
|
||||
"timestamp": "2025-07-15T10:00:01.000Z",
|
||||
"data": {
|
||||
"detection": {
|
||||
"carBrand": "Toyota",
|
||||
"carModel": "Corolla",
|
||||
"bodyType": "Sedan",
|
||||
"licensePlateText": "CMS-1234",
|
||||
"licensePlateConfidence": 0.97
|
||||
},
|
||||
"modelId": 201,
|
||||
"modelName": "Vehicle Identification"
|
||||
}
|
||||
}
|
||||
```
|
||||
* **Worker -> Backend**
|
||||
```json
|
||||
{
|
||||
"type": "imageDetection",
|
||||
"cameraIdentifier": "entry-cam-01",
|
||||
"timestamp": "2025-07-15T10:00:02.000Z",
|
||||
"data": {
|
||||
"detection": {
|
||||
"carBrand": "Ford",
|
||||
"carModel": "Focus",
|
||||
"bodyType": "Hatchback",
|
||||
"licensePlateText": "CMS-5678",
|
||||
"licensePlateConfidence": 0.96
|
||||
},
|
||||
"modelId": 201,
|
||||
"modelName": "Vehicle Identification"
|
||||
}
|
||||
}
|
||||
```
|
||||
5. **Backend Unsubscribes Camera**
|
||||
* **Backend -> Worker**
|
||||
```json
|
||||
{
|
||||
"type": "unsubscribe",
|
||||
"payload": {
|
||||
"cameraIdentifier": "entry-cam-01"
|
||||
}
|
||||
}
|
||||
```
|
||||
6. **Worker Acknowledges Unsubscription**
|
||||
* **Worker -> Backend**
|
||||
```json
|
||||
{
|
||||
"type": "stateReport",
|
||||
"cpuUsage": 68.0,
|
||||
"memoryUsage": 37.0,
|
||||
"gpuUsage": 50.0,
|
||||
"gpuMemoryUsage": 18.0,
|
||||
"cameraConnections": []
|
||||
}
|
||||
```
|
Loading…
Add table
Add a link
Reference in a new issue