From 5cf1bf08ccc016a988e4059bf1d97749b3912928 Mon Sep 17 00:00:00 2001
From: Siwat Sirichai
Date: Sun, 13 Jul 2025 19:39:17 +0700
Subject: [PATCH] add WebSocket communication protocol documentation for detector worker; outline connection, message types, and dynamic configuration

---
 worker.md | 362 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 362 insertions(+)
 create mode 100644 worker.md

diff --git a/worker.md b/worker.md
new file mode 100644
index 0000000..6101f2b
--- /dev/null
+++ b/worker.md
@@ -0,0 +1,362 @@
+# Worker Communication Protocol
+
+This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
+
+## 1. Connection
+
+The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
+
+Upon a successful connection from the backend, you should begin sending `stateReport` messages as heartbeats.
+
+## 2. Communication Overview
+
+Communication is bidirectional and asynchronous. All messages are JSON objects with a `type` field that indicates the message's purpose, and an optional `payload` field containing the data.
+
+- **Worker -> Backend:** You will send messages to the backend to report status, forward detection events, or request changes to session data.
+- **Backend -> Worker:** The backend will send commands to you to manage camera subscriptions.
+
+## 3. Dynamic Configuration via MPTA File
+
+To enable modularity and dynamic configuration, the backend will send you a URL to a `.mpta` file when it issues a `subscribe` command. This file is a renamed `.zip` archive that contains everything your worker needs to perform its task.
+
+**Your worker is responsible for:**
+1. Fetching this file from the provided URL.
+2. Extracting its contents.
+3. Interpreting the contents to configure its internal pipeline.
+
+**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:
+
+* **AI/ML Models:** Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
+* **Configuration Files:** A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
+* **Scripts:** Custom Python scripts for pre-processing or post-processing.
+* **API Integration Details:** A JSON file with endpoint information and credentials for interacting with third-party detection services.
+
+Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription.
+
+## 4. Messages from Worker to Backend
+
+These are the messages your worker is expected to send to the backend.
+
+### 4.1. State Report (Heartbeat)
+
+This message is crucial for the backend to monitor your worker's health and status, including GPU usage.
+
+- **Type:** `stateReport`
+- **When to Send:** Periodically (e.g., every 2 seconds) after a connection is established.
+
+**Payload:**
+
+```json
+{
+  "type": "stateReport",
+  "cpuUsage": 75.5,
+  "memoryUsage": 40.2,
+  "gpuUsage": 60.0,
+  "gpuMemoryUsage": 25.1,
+  "cameraConnections": [
+    {
+      "cameraIdentifier": "cam-001",
+      "modelId": 101,
+      "modelName": "General Object Detection",
+      "online": true
+    }
+  ]
+}
+```
+
+### 4.2. Image Detection
+
+Sent when the worker detects a relevant object. The `detection` object should be flat and contain key-value pairs corresponding to the detected attributes.
+
+- **Type:** `imageDetection`
+
+**Payload Example:**
+
+```json
+{
+  "type": "imageDetection",
+  "cameraIdentifier": "cam-001",
+  "timestamp": "2025-07-14T12:34:56.789Z",
+  "data": {
+    "detection": {
+      "carModel": "Civic",
+      "carBrand": "Honda",
+      "carYear": 2023,
+      "bodyType": "Sedan",
+      "licensePlateText": "ABCD1234",
+      "licensePlateConfidence": 0.95
+    },
+    "modelId": 101,
+    "modelName": "US-LPR-and-Vehicle-ID"
+  }
+}
+```
+
+### 4.3. Patch Session
+
+> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
+
+Allows the worker to request a modification to an active session's data. The `data` payload must be a partial object of the `DisplayPersistentData` structure.
+
+- **Type:** `patchSession`
+
+**Payload Example:**
+
+```json
+{
+  "type": "patchSession",
+  "sessionId": 12345,
+  "data": {
+    "currentCar": {
+      "carModel": "Civic",
+      "carBrand": "Honda",
+      "licensePlateText": "ABCD1234"
+    }
+  }
+}
+```
+
+The backend will respond with a `patchSessionResult` command.
+
+#### `DisplayPersistentData` Structure
+
+The `data` object in the `patchSession` message is merged with the existing `DisplayPersistentData` on the backend. Here is its structure:
+
+```typescript
+interface DisplayPersistentData {
+  progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
+  qrCode: string | null;
+  adsPlayback: {
+    playlistSlotOrder: number; // The 'order' of the current slot
+    adsId: number | null;
+    adsUrl: string | null;
+  } | null;
+  currentCar: {
+    carModel?: string;
+    carBrand?: string;
+    carYear?: number;
+    bodyType?: string;
+    licensePlateText?: string;
+    licensePlateType?: string;
+  } | null;
+  fuelPump: { /* FuelPumpData structure */ } | null;
+  weatherData: { /* WeatherResponse structure */ } | null;
+  sessionId: number | null;
+}
+```
+
+#### Patching Behavior
+
+- The patch is a **deep merge**.
+- **`undefined`** values are ignored.
+- **`null`** values will set the corresponding field to `null`.
+- Nested objects are merged recursively.
+
+## 5. Commands from Backend to Worker
+
+These are the commands your worker will receive from the backend.
+
+### 5.1. Subscribe to Camera
+
+Instructs the worker to process a camera's RTSP stream using the configuration from the specified `.mpta` file.
+
+- **Type:** `subscribe`
+
+**Payload:**
+
+```json
+{
+  "type": "subscribe",
+  "payload": {
+    "rtspUrl": "rtsp://user:pass@host:port/stream",
+    "cameraIdentifier": "cam-002",
+    "snapshotUrl": "http://go2rtc/snapshot/1",
+    "snapshotInterval": 5000,
+    "modelUrl": "http://storage/models/us-lpr.mpta",
+    "modelName": "US-LPR-and-Vehicle-ID",
+    "modelId": 102
+  }
+}
+```
+
+### 5.2. Unsubscribe from Camera
+
+Instructs the worker to stop processing a camera's stream.
+
+- **Type:** `unsubscribe`
+
+**Payload:**
+
+```json
+{
+  "type": "unsubscribe",
+  "payload": {
+    "cameraIdentifier": "cam-002"
+  }
+}
+```
+
+### 5.3. Request State
+
+Direct request for the worker's current state. Respond with a `stateReport` message.
+
+- **Type:** `requestState`
+
+**Payload:**
+
+```json
+{
+  "type": "requestState"
+}
+```
+
+### 5.4. Patch Session Result
+
+Backend's response to a `patchSession` message.
+
+- **Type:** `patchSessionResult`
+
+**Payload:**
+
+```json
+{
+  "type": "patchSessionResult",
+  "payload": {
+    "sessionId": 12345,
+    "success": true,
+    "message": "Session updated successfully."
+  }
+}
+```
+
+## 6. Example Communication Log
+
+This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.
+
+> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
+
+1. **Connection Established** & **Heartbeat**
+    * **Worker -> Backend**
+    ```json
+    {
+      "type": "stateReport",
+      "cpuUsage": 70.2,
+      "memoryUsage": 38.1,
+      "gpuUsage": 55.0,
+      "gpuMemoryUsage": 20.0,
+      "cameraConnections": []
+    }
+    ```
+2. **Backend Subscribes Camera**
+    * **Backend -> Worker**
+    ```json
+    {
+      "type": "subscribe",
+      "payload": {
+        "rtspUrl": "rtsp://192.168.1.100/stream1",
+        "cameraIdentifier": "entry-cam-01",
+        "modelUrl": "http://storage/models/vehicle-id.mpta",
+        "modelName": "Vehicle Identification",
+        "modelId": 201
+      }
+    }
+    ```
+3. **Worker Acknowledges in Heartbeat**
+    * **Worker -> Backend**
+    ```json
+    {
+      "type": "stateReport",
+      "cpuUsage": 72.5,
+      "memoryUsage": 39.0,
+      "gpuUsage": 57.0,
+      "gpuMemoryUsage": 21.0,
+      "cameraConnections": [
+        {
+          "cameraIdentifier": "entry-cam-01",
+          "modelId": 201,
+          "modelName": "Vehicle Identification",
+          "online": true
+        }
+      ]
+    }
+    ```
+4. **Worker Detects a Car**
+    * **Worker -> Backend**
+    ```json
+    {
+      "type": "imageDetection",
+      "cameraIdentifier": "entry-cam-01",
+      "timestamp": "2025-07-15T10:00:00.000Z",
+      "data": {
+        "detection": {
+          "carBrand": "Honda",
+          "carModel": "CR-V",
+          "bodyType": "SUV",
+          "licensePlateText": "GEMINI-AI",
+          "licensePlateConfidence": 0.98
+        },
+        "modelId": 201,
+        "modelName": "Vehicle Identification"
+      }
+    }
+    ```
+    * **Worker -> Backend**
+    ```json
+    {
+      "type": "imageDetection",
+      "cameraIdentifier": "entry-cam-01",
+      "timestamp": "2025-07-15T10:00:01.000Z",
+      "data": {
+        "detection": {
+          "carBrand": "Toyota",
+          "carModel": "Corolla",
+          "bodyType": "Sedan",
+          "licensePlateText": "CMS-1234",
+          "licensePlateConfidence": 0.97
+        },
+        "modelId": 201,
+        "modelName": "Vehicle Identification"
+      }
+    }
+    ```
+    * **Worker -> Backend**
+    ```json
+    {
+      "type": "imageDetection",
+      "cameraIdentifier": "entry-cam-01",
+      "timestamp": "2025-07-15T10:00:02.000Z",
+      "data": {
+        "detection": {
+          "carBrand": "Ford",
+          "carModel": "Focus",
+          "bodyType": "Hatchback",
+          "licensePlateText": "CMS-5678",
+          "licensePlateConfidence": 0.96
+        },
+        "modelId": 201,
+        "modelName": "Vehicle Identification"
+      }
+    }
+    ```
+5. **Backend Unsubscribes Camera**
+    * **Backend -> Worker**
+    ```json
+    {
+      "type": "unsubscribe",
+      "payload": {
+        "cameraIdentifier": "entry-cam-01"
+      }
+    }
+    ```
+6. **Worker Acknowledges Unsubscription**
+    * **Worker -> Backend**
+    ```json
+    {
+      "type": "stateReport",
+      "cpuUsage": 68.0,
+      "memoryUsage": 37.0,
+      "gpuUsage": 50.0,
+      "gpuMemoryUsage": 18.0,
+      "cameraConnections": []
+    }
+    ```
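
The rules listed under "Patching Behavior" in worker.md can be made concrete with a small sketch. This is not the backend's actual merge code, which the patch does not show; `applyPatch`, `isPlainObject`, and `PlainObject` are hypothetical names. The document does not say how arrays or an object patched over a `null` field are treated, so this sketch assumes both are replaced wholesale.

```typescript
// Hypothetical helper type: any JSON-like object keyed by strings.
type PlainObject = { [key: string]: unknown };

// True for non-null, non-array objects, i.e. values that can be merged key by key.
function isPlainObject(value: unknown): value is PlainObject {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Deep-merges `patch` into a copy of `current`; neither argument is mutated.
function applyPatch(current: PlainObject, patch: PlainObject): PlainObject {
  const result: PlainObject = { ...current };
  for (const key of Object.keys(patch)) {
    const incoming = patch[key];
    if (incoming === undefined) {
      continue; // undefined values are ignored: the existing value stays
    }
    const existing = result[key];
    if (isPlainObject(incoming) && isPlainObject(existing)) {
      // Nested objects are merged recursively.
      result[key] = applyPatch(existing, incoming);
    } else {
      // null clears the field; primitives replace it.
      // Assumption: arrays and objects patched over null/primitives also replace.
      result[key] = incoming;
    }
  }
  return result;
}
```

For instance, applying `{ currentCar: { licensePlateText: "ABCD1234" }, qrCode: null }` to a session that already has `currentCar.carBrand` set keeps the brand, adds the plate text, and clears `qrCode`. Note that `undefined` does not survive JSON serialization, so over the wire the "ignored" case amounts to simply omitting the key.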
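
As a rough illustration of the command handling described in sections 4.1 and 5 of worker.md, the sketch below tracks subscriptions in memory and produces `stateReport` messages. It deliberately leaves out the WebSocket server, the `.mpta` download, and the RTSP pipeline. The names `WorkerState`, `Usage`, `BackendCommand`, and `handle` are hypothetical, and the usage figures default to zero because sampling CPU or GPU load is outside what the document specifies.

```typescript
// Shapes follow the JSON examples in worker.md; the type names are made up for this sketch.
interface CameraConnection {
  cameraIdentifier: string;
  modelId: number;
  modelName: string;
  online: boolean;
}

interface Usage {
  cpuUsage: number;
  memoryUsage: number;
  gpuUsage: number;
  gpuMemoryUsage: number;
}

interface StateReport extends Usage {
  type: "stateReport";
  cameraConnections: CameraConnection[];
}

type BackendCommand =
  | { type: "subscribe"; payload: { rtspUrl: string; cameraIdentifier: string; modelUrl: string; modelName: string; modelId: number } }
  | { type: "unsubscribe"; payload: { cameraIdentifier: string } }
  | { type: "requestState" }
  | { type: "patchSessionResult"; payload: { sessionId: number; success: boolean; message: string } };

// Hypothetical in-memory registry of active camera subscriptions.
class WorkerState {
  private cameras: { [id: string]: CameraConnection } = {};

  // Builds a heartbeat message from the current subscriptions.
  stateReport(usage: Usage = { cpuUsage: 0, memoryUsage: 0, gpuUsage: 0, gpuMemoryUsage: 0 }): StateReport {
    const cameras = this.cameras;
    return {
      type: "stateReport",
      ...usage,
      cameraConnections: Object.keys(cameras).map((id) => cameras[id]),
    };
  }

  // Applies one raw backend message; returns a reply to send immediately, or null.
  handle(raw: string): StateReport | null {
    const cmd = JSON.parse(raw) as BackendCommand;
    switch (cmd.type) {
      case "subscribe": {
        const { cameraIdentifier, modelId, modelName } = cmd.payload;
        // A real worker would fetch modelUrl and open rtspUrl before reporting online.
        this.cameras[cameraIdentifier] = { cameraIdentifier, modelId, modelName, online: true };
        return null; // acknowledged by the next heartbeat, as in the example log
      }
      case "unsubscribe":
        delete this.cameras[cmd.payload.cameraIdentifier];
        return null;
      case "requestState":
        return this.stateReport();
      default:
        return null; // patchSessionResult and unknown types need no reply in this sketch
    }
  }
}
```

A server built on any WebSocket library could pass each incoming text frame to `handle`, send back whatever it returns, and call `stateReport` on a timer (for example every 2 seconds) for the periodic heartbeat.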