# Worker Communication Protocol

This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.

The current Python Detector Worker implementation supports advanced computer vision pipelines with:

- Multi-class YOLO detection with parallel processing
- PostgreSQL database integration with automatic schema management
- Redis integration for image storage and pub/sub messaging
- Hierarchical pipeline execution with detection → classification branching

## 1. Connection

The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.

Upon a successful connection from the backend, you should begin sending `stateReport` messages as heartbeats.

## 2. Communication Overview

Communication is bidirectional and asynchronous. All messages are JSON objects with a `type` field that indicates the message's purpose, and an optional `payload` field containing the data.

- **Worker -> Backend:** You will send messages to the backend to report status, forward detection events, or request changes to session data.
- **Backend -> Worker:** The backend will send commands to you to manage camera subscriptions.

## 3. Dynamic Configuration via MPTA File

To enable modularity and dynamic configuration, the backend will send you a URL to a `.mpta` file when it issues a `subscribe` command. This file is a renamed `.zip` archive that contains everything your worker needs to perform its task.

**Your worker is responsible for:**

1. Fetching this file from the provided URL.
2. Extracting its contents.
3. Interpreting the contents to configure its internal pipeline.
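The three steps above can be sketched as follows. This is a minimal illustration rather than part of the protocol: the helper names (`fetch_mpta`, `extract_mpta`) are ours, and a production worker would add error handling, retries, and archive validation.

```python
import io
import json
import urllib.request
import zipfile


def fetch_mpta(url: str) -> bytes:
    """Download the .mpta archive from the URL given in the subscribe payload."""
    with urllib.request.urlopen(url) as resp:
        return resp.read()


def extract_mpta(archive: bytes, dest_dir: str) -> dict:
    """Unpack the archive (a renamed .zip) into dest_dir and return the parsed
    pipeline configuration so the worker can configure its internal pipeline."""
    with zipfile.ZipFile(io.BytesIO(archive)) as zf:
        zf.extractall(dest_dir)
        # The configuration file is assumed to be named pipeline.json,
        # matching the MPTA structure described in this section.
        with zf.open("pipeline.json") as f:
            return json.load(f)
```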
**The current implementation supports comprehensive pipeline configurations, including:**

- **AI/ML Models**: YOLO models (`.pt` files) for detection and classification
- **Pipeline Configuration**: `pipeline.json` defining hierarchical detection → classification workflows
- **Multi-class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
- **Parallel Processing**: Concurrent execution of classification branches with ThreadPoolExecutor
- **Database Integration**: PostgreSQL configuration for automatic table creation and updates
- **Redis Actions**: Image storage with region cropping and pub/sub messaging
- **Dynamic Field Mapping**: Template-based field resolution for database operations

**Enhanced MPTA Structure:**

```
pipeline.mpta/
├── pipeline.json            # Main configuration with redis/postgresql settings
├── car_detection.pt         # Primary YOLO detection model
├── brand_classifier.pt      # Classification model for car brands
├── bodytype_classifier.pt   # Classification model for body types
└── ...
```

The `pipeline.json` supports advanced features such as:

- Multi-class detection with `expectedClasses` validation
- Parallel branch processing with `parallel: true`
- Database actions with `postgresql_update_combined`
- Redis actions with region-specific image cropping
- Branch synchronization with `waitForBranches`

Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription, including complex multi-stage AI pipelines with database persistence.

## 4. Messages from Worker to Backend

These are the messages your worker is expected to send to the backend.

### 4.1. State Report (Heartbeat)

This message is crucial for the backend to monitor your worker's health and status, including GPU usage.

- **Type:** `stateReport`
- **When to Send:** Periodically (e.g., every 2 seconds) after a connection is established.
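As a sketch, the heartbeat can be assembled by a small builder and sent on a timer. The `websockets`-style async `send()` and the `get_metrics` callback are assumptions, not part of the protocol; a real worker would sample CPU/GPU usage from the host (e.g., via `psutil` and the GPU driver).

```python
import asyncio
import json


def build_state_report(cpu, mem, gpu, gpu_mem, camera_connections):
    """Assemble a stateReport heartbeat message as a plain dict."""
    return {
        "type": "stateReport",
        "cpuUsage": cpu,
        "memoryUsage": mem,
        "gpuUsage": gpu,
        "gpuMemoryUsage": gpu_mem,
        "cameraConnections": camera_connections,  # one entry per active subscription
    }


async def heartbeat_loop(websocket, get_metrics, interval=2.0):
    """Send a stateReport every `interval` seconds.

    `websocket` is assumed to expose an async send() (e.g., the `websockets`
    library); `get_metrics` is a caller-supplied function returning the
    current (cpu, mem, gpu, gpu_mem, camera_connections) tuple.
    """
    while True:
        await websocket.send(json.dumps(build_state_report(*get_metrics())))
        await asyncio.sleep(interval)
```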
**Payload:**

```json
{
  "type": "stateReport",
  "cpuUsage": 75.5,
  "memoryUsage": 40.2,
  "gpuUsage": 60.0,
  "gpuMemoryUsage": 25.1,
  "cameraConnections": [
    {
      "subscriptionIdentifier": "display-001;cam-001",
      "modelId": 101,
      "modelName": "General Object Detection",
      "online": true,
      "cropX1": 100,
      "cropY1": 200,
      "cropX2": 300,
      "cropY2": 400
    }
  ]
}
```

> **Note:**
>
> - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) should be included in each camera connection to indicate the crop coordinates for that subscription.

### 4.2. Image Detection

Sent when the worker detects a relevant object. The `detection` object should be flat and contain key-value pairs corresponding to the detected attributes.

- **Type:** `imageDetection`

**Enhanced Detection Capabilities:**

The current implementation supports multi-class detection with parallel classification processing. When a vehicle is detected, the system:

1. **Multi-Class Detection**: Simultaneously detects "Car" and "Frontal" classes
2. **Parallel Processing**: Runs brand and body type classification concurrently
3. **Database Integration**: Automatically creates and updates PostgreSQL records
4. **Redis Storage**: Saves cropped frontal images with expiration

**Payload Example:**

```json
{
  "type": "imageDetection",
  "subscriptionIdentifier": "display-001;cam-001",
  "timestamp": "2025-07-14T12:34:56.789Z",
  "data": {
    "detection": {
      "class": "Car",
      "confidence": 0.92,
      "carBrand": "Honda",
      "carModel": "Civic",
      "bodyType": "Sedan",
      "branch_results": {
        "car_brand_cls_v1": {
          "class": "Honda",
          "confidence": 0.89,
          "brand": "Honda"
        },
        "car_bodytype_cls_v1": {
          "class": "Sedan",
          "confidence": 0.85,
          "body_type": "Sedan"
        }
      }
    },
    "modelId": 101,
    "modelName": "Car Frontal Detection V1"
  }
}
```

**Database Integration:**

Each detection automatically:

- Creates a record in the `gas_station_1.car_frontal_info` table
- Generates a unique `session_id` for tracking
- Updates the record with classification results after parallel processing completes
- Stores cropped frontal images in Redis with the `session_id` as key

### 4.3. Patch Session

> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.

Allows the worker to request a modification to an active session's data. The `data` payload must be a partial object of the `DisplayPersistentData` structure.

- **Type:** `patchSession`

**Payload Example:**

```json
{
  "type": "patchSession",
  "sessionId": 12345,
  "data": {
    "currentCar": {
      "carModel": "Civic",
      "carBrand": "Honda",
      "licensePlateText": "ABCD1234"
    }
  }
}
```

The backend will respond with a `patchSessionResult` command.

#### `DisplayPersistentData` Structure

The `data` object in the `patchSession` message is merged with the existing `DisplayPersistentData` on the backend.
Here is its structure:

```typescript
interface DisplayPersistentData {
  progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
  qrCode: string | null;
  adsPlayback: {
    playlistSlotOrder: number; // The 'order' of the current slot
    adsId: number | null;
    adsUrl: string | null;
  } | null;
  currentCar: {
    carModel?: string;
    carBrand?: string;
    carYear?: number;
    bodyType?: string;
    licensePlateText?: string;
    licensePlateType?: string;
  } | null;
  fuelPump: { /* FuelPumpData structure */ } | null;
  weatherData: { /* WeatherResponse structure */ } | null;
  sessionId: number | null;
}
```

#### Patching Behavior

- The patch is a **deep merge**.
- **`undefined`** values are ignored.
- **`null`** values will set the corresponding field to `null`.
- Nested objects are merged recursively.

## 5. Commands from Backend to Worker

These are the commands your worker will receive from the backend.

### 5.1. Subscribe to Camera

Instructs the worker to process a camera's RTSP stream using the configuration from the specified `.mpta` file.

- **Type:** `subscribe`

**Payload:**

```json
{
  "type": "subscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-002",
    "rtspUrl": "rtsp://user:pass@host:port/stream",
    "snapshotUrl": "http://go2rtc/snapshot/1",
    "snapshotInterval": 5000,
    "modelUrl": "http://storage/models/us-lpr.mpta",
    "modelName": "US-LPR-and-Vehicle-ID",
    "modelId": 102,
    "cropX1": 100,
    "cropY1": 200,
    "cropX2": 300,
    "cropY2": 400
  }
}
```

> **Note:**
>
> - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) specify the crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame.
>
> **Important:**
> If multiple displays are bound to the same camera, your worker must ensure that only **one stream** is opened per camera. When you receive multiple subscriptions for the same camera (with different `subscriptionIdentifier` values), you should:
>
> - Open the RTSP stream **once** for that camera if using RTSP.
> - Capture each frame or snapshot only once per cycle.
> - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.
>
> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.

### 5.2. Unsubscribe from Camera

Instructs the worker to stop processing a camera's stream.

- **Type:** `unsubscribe`

**Payload:**

```json
{
  "type": "unsubscribe",
  "payload": {
    "subscriptionIdentifier": "display-001;cam-002"
  }
}
```

### 5.3. Request State

Direct request for the worker's current state. Respond with a `stateReport` message.

- **Type:** `requestState`

**Payload:**

```json
{
  "type": "requestState"
}
```

### 5.4. Patch Session Result

Backend's response to a `patchSession` message.

- **Type:** `patchSessionResult`

**Payload:**

```json
{
  "type": "patchSessionResult",
  "payload": {
    "sessionId": 12345,
    "success": true,
    "message": "Session updated successfully."
  }
}
```

### 5.5. Set Session ID

Allows the backend to instruct the worker to associate a session ID with a subscription. This is useful for linking detection events to a specific session. The session ID can be `null` to indicate no active session.
- **Type:** `setSessionId`

**Payload:**

```json
{
  "type": "setSessionId",
  "payload": {
    "displayIdentifier": "display-001",
    "sessionId": 12345
  }
}
```

Or to clear the session:

```json
{
  "type": "setSessionId",
  "payload": {
    "displayIdentifier": "display-001",
    "sessionId": null
  }
}
```

> **Note:**
>
> - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. If `sessionId` is `null`, the worker should treat the subscription as having no active session.

## Subscription Identifier Format

The `subscriptionIdentifier` used in all messages is constructed as:

```
displayIdentifier;cameraIdentifier
```

This uniquely identifies a camera subscription for a specific display.

### Session ID Association

When the backend sends a `setSessionId` command, it will only provide the `displayIdentifier` (not the full `subscriptionIdentifier`).

**Worker Responsibility:**

- The worker must match the `displayIdentifier` to all active subscriptions for that display (i.e., all `subscriptionIdentifier` values that start with `displayIdentifier;`).
- The worker should set or clear the session ID for all matching subscriptions.

## 6. Example Communication Log

This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.

> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.

1. **Connection Established** & **Heartbeat**

   * **Worker -> Backend**

   ```json
   {
     "type": "stateReport",
     "cpuUsage": 70.2,
     "memoryUsage": 38.1,
     "gpuUsage": 55.0,
     "gpuMemoryUsage": 20.0,
     "cameraConnections": []
   }
   ```

2. **Backend Subscribes Camera**

   * **Backend -> Worker**

   ```json
   {
     "type": "subscribe",
     "payload": {
       "subscriptionIdentifier": "display-001;entry-cam-01",
       "rtspUrl": "rtsp://192.168.1.100/stream1",
       "modelUrl": "http://storage/models/vehicle-id.mpta",
       "modelName": "Vehicle Identification",
       "modelId": 201
     }
   }
   ```

3. **Worker Acknowledges in Heartbeat**

   * **Worker -> Backend**

   ```json
   {
     "type": "stateReport",
     "cpuUsage": 72.5,
     "memoryUsage": 39.0,
     "gpuUsage": 57.0,
     "gpuMemoryUsage": 21.0,
     "cameraConnections": [
       {
         "subscriptionIdentifier": "display-001;entry-cam-01",
         "modelId": 201,
         "modelName": "Vehicle Identification",
         "online": true
       }
     ]
   }
   ```

4. **Worker Detects a Car**

   * **Worker -> Backend**

   ```json
   {
     "type": "imageDetection",
     "subscriptionIdentifier": "display-001;entry-cam-01",
     "timestamp": "2025-07-15T10:00:00.000Z",
     "data": {
       "detection": {
         "carBrand": "Honda",
         "carModel": "CR-V",
         "bodyType": "SUV",
         "licensePlateText": "GEMINI-AI",
         "licensePlateConfidence": 0.98
       },
       "modelId": 201,
       "modelName": "Vehicle Identification"
     }
   }
   ```

   * **Worker -> Backend**

   ```json
   {
     "type": "imageDetection",
     "subscriptionIdentifier": "display-001;entry-cam-01",
     "timestamp": "2025-07-15T10:00:01.000Z",
     "data": {
       "detection": {
         "carBrand": "Toyota",
         "carModel": "Corolla",
         "bodyType": "Sedan",
         "licensePlateText": "CMS-1234",
         "licensePlateConfidence": 0.97
       },
       "modelId": 201,
       "modelName": "Vehicle Identification"
     }
   }
   ```

   * **Worker -> Backend**

   ```json
   {
     "type": "imageDetection",
     "subscriptionIdentifier": "display-001;entry-cam-01",
     "timestamp": "2025-07-15T10:00:02.000Z",
     "data": {
       "detection": {
         "carBrand": "Ford",
         "carModel": "Focus",
         "bodyType": "Hatchback",
         "licensePlateText": "CMS-5678",
         "licensePlateConfidence": 0.96
       },
       "modelId": 201,
       "modelName": "Vehicle Identification"
     }
   }
   ```

5. **Backend Unsubscribes Camera**

   * **Backend -> Worker**

   ```json
   {
     "type": "unsubscribe",
     "payload": {
       "subscriptionIdentifier": "display-001;entry-cam-01"
     }
   }
   ```

6. **Worker Acknowledges Unsubscription**

   * **Worker -> Backend**

   ```json
   {
     "type": "stateReport",
     "cpuUsage": 68.0,
     "memoryUsage": 37.0,
     "gpuUsage": 50.0,
     "gpuMemoryUsage": 18.0,
     "cameraConnections": []
   }
   ```

## 7. HTTP API: Image Retrieval

In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.

### Endpoint

```
GET /camera/{camera_id}/image
```

- **`camera_id`**: The full `subscriptionIdentifier` (e.g., `display-001;cam-001`).

### Response

- **Success (200):** Returns the latest JPEG image from the camera stream.
  - `Content-Type: image/jpeg`
  - Binary JPEG data.
- **Error (404):** If the camera is not found or no frame is available.
  - JSON error response.
- **Error (500):** Internal server error.

### Example Request

```
GET /camera/display-001;cam-001/image
```

### Example Response

- **Headers:**

  ```
  Content-Type: image/jpeg
  ```

- **Body:** Binary JPEG image.

### Notes

- The endpoint returns the most recent frame available for the specified camera subscription.
- If multiple displays share the same camera, each subscription has its own buffer; the endpoint uses the buffer for the given `camera_id`.
- This API is useful for debugging, monitoring, or integrating with external systems that require direct image access.
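For example, a debugging script could pull the latest frame with the standard library alone. The worker host and port below are assumptions; the URL shape follows the example request above, with the subscription identifier kept in its literal `display;camera` form.

```python
import urllib.request


def image_url(base_url: str, subscription_id: str) -> str:
    """Build the image-retrieval URL for a camera subscription."""
    return f"{base_url}/camera/{subscription_id}/image"


def fetch_latest_frame(base_url: str, subscription_id: str) -> bytes:
    """Fetch the most recent JPEG frame for a subscription as raw bytes."""
    with urllib.request.urlopen(image_url(base_url, subscription_id)) as resp:
        return resp.read()
```

A call such as `fetch_latest_frame("http://worker-host:8000", "display-001;cam-001")` would return raw JPEG bytes on success, or raise `urllib.error.HTTPError` for the 404/500 cases described above.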