Worker Communication Protocol
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
1. Connection
The worker must run a WebSocket server, preferably on port 8000. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
Upon a successful connection from the backend, you should begin sending stateReport messages as heartbeats.
2. Communication Overview
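Communication is bidirectional and asynchronous. All messages are JSON objects with a type field that indicates the message's purpose. Commands such as setSessionId carry their data in an optional payload field, while other messages (for example stateReport and setSubscriptionList) place their fields at the top level, as the examples in sections 6 and 7 show.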
- Worker -> Backend: You will send messages to the backend to report status, forward detection events, or request changes to session data.
- Backend -> Worker: The backend will send commands to you to manage camera subscriptions.
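As a rough illustration of the message envelope described above, the sketch below parses an incoming text frame and classifies it by its type field. The helper name routeMessage and the Routed result shape are hypothetical; only the type field and the command names are taken from this document.

```typescript
// Hypothetical result of classifying one raw WebSocket text frame.
type Routed =
  | { kind: "command"; type: string; body: Record<string, unknown> }
  | { kind: "unknown"; type: string }
  | { kind: "invalid"; reason: string };

// Command names listed in section 3 of this document.
const BACKEND_COMMANDS = new Set<string>([
  "setSubscriptionList",
  "setSessionId",
  "setProgressionStage",
  "requestState",
  "patchSessionResult",
]);

function routeMessage(raw: string): Routed {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return { kind: "invalid", reason: "not valid JSON" };
  }
  // Every protocol message must be a JSON object, not an array or primitive.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return { kind: "invalid", reason: "message is not a JSON object" };
  }
  const body = parsed as Record<string, unknown>;
  const type = body["type"];
  if (typeof type !== "string") {
    return { kind: "invalid", reason: "missing string 'type' field" };
  }
  return BACKEND_COMMANDS.has(type)
    ? { kind: "command", type, body }
    : { kind: "unknown", type };
}
```

A worker could call routeMessage on every received frame and switch on the returned type, logging and ignoring anything classified as unknown or invalid rather than closing the connection.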
2.1. Multi-Process Cluster Architecture
The backend runs as a multi-process cluster that uses Redis-based coordination to manage worker connections at scale:
Redis Communication Channels:
- worker:commands - Commands sent TO workers (subscribe, unsubscribe, setSessionId, setProgressionStage)
- worker:responses - Detection responses and state reports FROM workers
- worker:events - Worker lifecycle events (connection, disconnection, health status)
Distributed State Management:
- worker:states - Redis hash map storing real-time worker performance metrics and connection status
- worker:assignments - Redis hash map tracking camera-to-worker assignments across the cluster
- worker:owners - Redis key-based worker ownership leases with 30-second TTL for automatic failover
Load Balancing & Failover:
- Assignment Algorithm: Workers are assigned based on subscription count and CPU usage
- Distributed Locking: Assignment operations use Redis locks to prevent race conditions
- Automatic Failover: Orphaned workers are detected via lease expiration and automatically reclaimed
- Horizontal Scaling: New backend processes automatically join the cluster and participate in load balancing
Inter-Process Coordination:
- Each backend process maintains local WebSocket connections with workers
- Commands are routed via Redis pub/sub to the process that owns the target worker connection
- Master election ensures coordinated cluster management and prevents split-brain scenarios
- Process identification uses UUIDs for clean process tracking and ownership management
3. Message Types and Command Structure
All worker communication follows a standardized message structure with the following command types:
Commands from Backend to Worker:
- setSubscriptionList - Set complete list of camera subscriptions for declarative state management
- setSessionId - Associate a session ID with a display for detection linking
- setProgressionStage - Update the progression stage for context-aware processing
- requestState - Request immediate state report from worker
- patchSessionResult - Response to worker's patch session request
Messages from Worker to Backend:
- stateReport - Periodic heartbeat with performance metrics and subscription status
- imageDetection - Real-time detection results with timestamp and data
- patchSession - Request to modify display persistent session data
Command Structure:
interface WorkerCommand {
  type: string;
  subscriptions?: SubscriptionObject[]; // For setSubscriptionList
  payload?: {
    displayIdentifier?: string;
    sessionId?: number | null;
    progressionStage?: string | null;
    // Additional payload fields based on command type
  };
}
interface SubscriptionObject {
  subscriptionIdentifier: string; // Format: "displayId;cameraId"
  rtspUrl: string;
  snapshotUrl?: string;
  snapshotInterval?: number; // milliseconds
  modelUrl: string; // Fresh pre-signed URL (1-hour TTL)
  modelId: number;
  modelName: string;
  cropX1?: number;
  cropY1?: number;
  cropX2?: number;
  cropY2?: number;
}
4. Dynamic Configuration via MPTA File
To enable modularity and dynamic configuration, the backend will send you a URL to a .mpta file in each subscription within the setSubscriptionList command. This file is a renamed .zip archive that contains everything your worker needs to perform its task.
Your worker is responsible for:
- Fetching this file from the provided URL.
- Extracting its contents.
- Interpreting the contents to configure its internal pipeline.
The contents of the .mpta file are entirely up to the user who configures the model in the CMS. This allows for maximum flexibility. For example, the archive could contain:
- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
- Configuration Files: A config.json or pipeline.yaml that defines a sequence of operations, specifies model paths, or sets detection thresholds.
- Scripts: Custom Python scripts for pre-processing or post-processing.
- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.
Essentially, the .mpta file is a self-contained package that tells your worker how to process the video stream for a given subscription.
5. Worker State Recovery and Reconnection
The system provides comprehensive state recovery mechanisms to ensure seamless operation across worker disconnections and backend restarts.
5.1. Automatic Resubscription
Connection Recovery Flow:
- Connection Detection: Backend detects worker reconnection via WebSocket events
- State Restoration: All subscription states are restored from backend memory and Redis
- Fresh Model URLs: New model URLs are generated to handle S3 URL expiration
- Session Recovery: Session IDs and progression stages are automatically restored
- Heartbeat Resumption: Worker immediately begins sending state reports
5.2. State Persistence Architecture
Backend State Storage:
- Local State: Each backend process maintains DetectorWorkerState with active subscriptions
- Redis Coordination: Assignment mappings stored in the worker:assignments Redis hash
- Session Tracking: Display session IDs tracked in display persistent data
- Progression Stages: Current stages maintained in display controllers
Recovery Guarantees:
- Zero Configuration Loss: All subscription parameters are preserved across disconnections
- Session Continuity: Active sessions remain linked after worker reconnection
- Stage Synchronization: Progression stages are immediately synchronized on reconnection
- Model Availability: Fresh model URLs ensure continuous access to detection models
5.3. Heartbeat and Health Monitoring
Health Check Protocol:
- Heartbeat Interval: Workers send stateReport every 2 seconds
- Timeout Detection: Backend marks workers offline after 10-second timeout
- Automatic Recovery: Offline workers are automatically rescheduled when they reconnect
- Performance Tracking: CPU, memory, and GPU usage monitored for load balancing
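To make the timing rules above concrete, here is a minimal sketch of how a monitor could track heartbeats. The HeartbeatMonitor class and its method names are hypothetical and do not describe the backend's actual implementation; only the 2-second interval and 10-second timeout come from this document. Treating a gap of exactly 10 seconds as still online is an assumption.

```typescript
const HEARTBEAT_INTERVAL_MS = 2000; // workers send stateReport this often
const OFFLINE_TIMEOUT_MS = 10000;   // silence longer than this means offline

// Hypothetical tracker keyed by an illustrative worker ID.
class HeartbeatMonitor {
  private lastSeen = new Map<string, number>();

  // Record a stateReport received at nowMs (milliseconds since epoch).
  recordReport(workerId: string, nowMs: number): void {
    this.lastSeen.set(workerId, nowMs);
  }

  // Online only if a report arrived within the timeout window.
  isOnline(workerId: string, nowMs: number): boolean {
    const seen = this.lastSeen.get(workerId);
    return seen !== undefined && nowMs - seen <= OFFLINE_TIMEOUT_MS;
  }

  // Number of whole heartbeat intervals elapsed since the last report.
  missedHeartbeats(workerId: string, nowMs: number): number {
    const seen = this.lastSeen.get(workerId);
    if (seen === undefined) return 0;
    return Math.floor((nowMs - seen) / HEARTBEAT_INTERVAL_MS);
  }
}
```

With these values a worker can miss roughly four consecutive heartbeats before it is treated as offline, so a single delayed report should not trigger reassignment.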
Failure Scenarios:
- Worker Crash: Subscriptions are reassigned to other available workers
- Network Interruption: Automatic reconnection with full state restoration
- Backend Restart: Worker assignments are restored from Redis state
- Redis Failure: Local state provides temporary operation until Redis recovers
5.4. Multi-Process Coordination
Ownership and Leasing:
- Worker Ownership: Each worker is owned by a single backend process via Redis lease
- Lease Renewal: 30-second TTL leases automatically renewed by owning process
- Orphan Detection: Expired leases allow worker reassignment to active processes
- Graceful Handover: Clean ownership transfer during process shutdown
6. Messages from Worker to Backend
These are the messages your worker is expected to send to the backend.
6.1. State Report (Heartbeat)
This message is crucial for the backend to monitor your worker's health and status, including GPU usage.
- Type: stateReport
- When to Send: Periodically (e.g., every 2 seconds) after a connection is established.
Payload:
{
"type": "stateReport",
"cpuUsage": 75.5,
"memoryUsage": 40.2,
"gpuUsage": 60.0,
"gpuMemoryUsage": 25.1,
"cameraConnections": [
{
"subscriptionIdentifier": "display-001;cam-001",
"modelId": 101,
"modelName": "General Object Detection",
"online": true,
"cropX1": 100,
"cropY1": 200,
"cropX2": 300,
"cropY2": 400
}
]
}
Note: cropX1, cropY1, cropX2, cropY2 (optional, integer) should be included in each camera connection to indicate the crop coordinates for that subscription.
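The following sketch shows one way a worker might assemble the stateReport payload from its internal bookkeeping. The Metrics and ActiveSubscription shapes and the buildStateReport function are hypothetical; the output field names follow the payload shown above, and crop fields are emitted only when a crop is configured.

```typescript
interface CameraConnection {
  subscriptionIdentifier: string;
  modelId: number;
  modelName: string;
  online: boolean;
  cropX1?: number;
  cropY1?: number;
  cropX2?: number;
  cropY2?: number;
}

interface StateReport {
  type: "stateReport";
  cpuUsage: number;
  memoryUsage: number;
  gpuUsage: number;
  gpuMemoryUsage: number;
  cameraConnections: CameraConnection[];
}

// Hypothetical internal shapes; how the metrics are sampled is up to the worker.
interface Metrics {
  cpuUsage: number;
  memoryUsage: number;
  gpuUsage: number;
  gpuMemoryUsage: number;
}

interface ActiveSubscription {
  subscriptionIdentifier: string;
  modelId: number;
  modelName: string;
  online: boolean;
  crop?: { x1: number; y1: number; x2: number; y2: number };
}

function buildStateReport(metrics: Metrics, subs: ActiveSubscription[]): StateReport {
  return {
    type: "stateReport",
    cpuUsage: metrics.cpuUsage,
    memoryUsage: metrics.memoryUsage,
    gpuUsage: metrics.gpuUsage,
    gpuMemoryUsage: metrics.gpuMemoryUsage,
    cameraConnections: subs.map((s) => ({
      subscriptionIdentifier: s.subscriptionIdentifier,
      modelId: s.modelId,
      modelName: s.modelName,
      online: s.online,
      // Include the crop fields only when this subscription has a crop.
      ...(s.crop
        ? { cropX1: s.crop.x1, cropY1: s.crop.y1, cropX2: s.crop.x2, cropY2: s.crop.y2 }
        : {}),
    })),
  };
}
```

The result can be passed to JSON.stringify and sent as a text frame, both on the periodic timer and in response to requestState.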
6.2. Image Detection
Sent when the worker detects a relevant object. The detection object should be flat and contain key-value pairs corresponding to the detected attributes.
- Type: imageDetection
Payload Example:
{
"type": "imageDetection",
"subscriptionIdentifier": "display-001;cam-001",
"timestamp": "2025-07-14T12:34:56.789Z",
"data": {
"detection": {
"carModel": "Civic",
"carBrand": "Honda",
"carYear": 2023,
"bodyType": "Sedan",
"licensePlateText": "ABCD1234",
"licensePlateConfidence": 0.95
},
"modelId": 101,
"modelName": "US-LPR-and-Vehicle-ID"
}
}
6.3. Patch Session
Note: Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using imageDetection messages. Use patchSession only to update session data after the fact.
Allows the worker to request a modification to an active session's data. The data payload must be a partial object of the DisplayPersistentData structure.
- Type: patchSession
Payload Example:
{
"type": "patchSession",
"sessionId": 12345,
"data": {
"currentCar": {
"carModel": "Civic",
"carBrand": "Honda",
"licensePlateText": "ABCD1234"
}
}
}
The backend will respond with a patchSessionResult command.
DisplayPersistentData Structure
The data object in the patchSession message is merged with the existing DisplayPersistentData on the backend. Here is its structure:
interface DisplayPersistentData {
  progressionStage:
    | 'welcome'
    | 'car_fueling'
    | 'car_waitpayment'
    | 'car_postpayment'
    | null;
  qrCode: string | null;
  adsPlayback: {
    playlistSlotOrder: number; // The 'order' of the current slot
    adsId: number | null;
    adsUrl: string | null;
  } | null;
  currentCar: {
    carModel?: string;
    carBrand?: string;
    carYear?: number;
    bodyType?: string;
    licensePlateText?: string;
    licensePlateType?: string;
  } | null;
  fuelPump: {
    /* FuelPumpData structure */
  } | null;
  weatherData: {
    /* WeatherResponse structure */
  } | null;
  sessionId: number | null;
}
Patching Behavior
- The patch is a deep merge.
- undefined values are ignored.
- null values will set the corresponding field to null.
- Nested objects are merged recursively.
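The merge rules above can be sketched as a small function. This is an illustration of the described semantics, not the backend's actual code; the name applyPatch is hypothetical, and replacing arrays wholesale is an assumption, since the document does not say how arrays are handled.

```typescript
type PlainObject = Record<string, unknown>;

function isPlainObject(value: unknown): value is PlainObject {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Returns a new object; neither target nor patch is mutated at the top level.
function applyPatch(target: PlainObject, patch: PlainObject): PlainObject {
  const result: PlainObject = { ...target };
  for (const key of Object.keys(patch)) {
    const value = patch[key];
    if (value === undefined) continue; // undefined values are ignored
    const existing = result[key];
    if (isPlainObject(value) && isPlainObject(existing)) {
      // Nested objects are merged recursively.
      result[key] = applyPatch(existing, value);
    } else {
      // null, primitives, and (by assumption) arrays overwrite the field.
      result[key] = value;
    }
  }
  return result;
}
```

For example, patching { currentCar: { carBrand: "Honda" } } with { currentCar: { carModel: "Civic" } } keeps carBrand and adds carModel, while a patch of { currentCar: null } clears the whole object.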
7. Commands from Backend to Worker
These are the commands your worker will receive from the backend. The subscription system uses a fully declarative approach with setSubscriptionList: the backend sends the complete desired subscription list, and workers handle reconciliation internally.
7.1. Set Subscription List (Declarative Subscriptions)
The primary subscription command that replaces individual subscribe/unsubscribe operations.
Instructs the worker to process the complete list of camera streams. The worker must reconcile this list with its current subscriptions, adding new ones, removing obsolete ones, and updating existing ones as needed.
- Type: setSubscriptionList
Payload:
{
"type": "setSubscriptionList",
"subscriptions": [
{
"subscriptionIdentifier": "display-001;cam-001",
"rtspUrl": "rtsp://user:pass@host:port/stream1",
"snapshotUrl": "http://go2rtc/snapshot/1",
"snapshotInterval": 5000,
"modelUrl": "http://storage/models/us-lpr.mpta?token=fresh-token",
"modelName": "US-LPR-and-Vehicle-ID",
"modelId": 102,
"cropX1": 100,
"cropY1": 200,
"cropX2": 300,
"cropY2": 400
},
{
"subscriptionIdentifier": "display-002;cam-001",
"rtspUrl": "rtsp://user:pass@host:port/stream1",
"snapshotUrl": "http://go2rtc/snapshot/1",
"snapshotInterval": 5000,
"modelUrl": "http://storage/models/vehicle-detect.mpta?token=fresh-token",
"modelName": "Vehicle Detection",
"modelId": 201,
"cropX1": 0,
"cropY1": 0,
"cropX2": 1920,
"cropY2": 1080
}
]
}
Declarative Subscription Behavior:
- Complete State Definition: The backend sends the complete desired subscription list for this worker
- Worker-Side Reconciliation: Workers compare the new list with current subscriptions and handle differences
- Fresh Model URLs: Each command includes fresh pre-signed S3 URLs (1-hour TTL) for ML models
- Load Balancing: The backend intelligently distributes subscriptions across available workers
- State Recovery: Complete subscription list is sent on worker reconnection
Worker Reconciliation Responsibility:
When receiving a setSubscriptionList command, your worker must:
- Compare with Current State: Identify new subscriptions, removed subscriptions, and updated subscriptions
- Add New Subscriptions: Start processing new camera streams with the provided configuration
- Remove Obsolete Subscriptions: Stop processing camera streams not in the new list
- Update Existing Subscriptions: Handle configuration changes (model updates, crop coordinates, etc.)
- Maintain Single Streams: Ensure only one RTSP stream per camera, even with multiple display bindings
- Report Final State: Send updated stateReport confirming the actual subscription state
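The reconciliation steps above can be sketched as a pure diffing function that a worker runs before touching any streams. The names planReconciliation, ReconcilePlan, and configFingerprint are hypothetical. Ignoring modelUrl when comparing is an assumption made here because each command carries a freshly signed URL; a real worker may need a different rule for detecting model changes.

```typescript
interface SubscriptionObject {
  subscriptionIdentifier: string;
  rtspUrl: string;
  snapshotUrl?: string;
  snapshotInterval?: number;
  modelUrl: string;
  modelId: number;
  modelName: string;
  cropX1?: number;
  cropY1?: number;
  cropX2?: number;
  cropY2?: number;
}

interface ReconcilePlan {
  toAdd: SubscriptionObject[];
  toUpdate: SubscriptionObject[];
  toRemove: string[]; // subscriptionIdentifier values to stop
}

// Assumption: modelUrl is excluded so that a refreshed token alone
// does not count as a configuration change.
function configFingerprint(s: SubscriptionObject): string {
  return JSON.stringify([
    s.rtspUrl, s.snapshotUrl ?? null, s.snapshotInterval ?? null, s.modelId,
    s.cropX1 ?? null, s.cropY1 ?? null, s.cropX2 ?? null, s.cropY2 ?? null,
  ]);
}

function planReconciliation(
  current: Map<string, SubscriptionObject>,
  desired: SubscriptionObject[],
): ReconcilePlan {
  const plan: ReconcilePlan = { toAdd: [], toUpdate: [], toRemove: [] };
  const desiredIds = new Set<string>();
  for (const sub of desired) {
    desiredIds.add(sub.subscriptionIdentifier);
    const existing = current.get(sub.subscriptionIdentifier);
    if (existing === undefined) {
      plan.toAdd.push(sub); // not running yet
    } else if (configFingerprint(existing) !== configFingerprint(sub)) {
      plan.toUpdate.push(sub); // running, but configuration changed
    }
  }
  // Anything running that is absent from the desired list is obsolete.
  current.forEach((_value, id) => {
    if (!desiredIds.has(id)) plan.toRemove.push(id);
  });
  return plan;
}
```

Keeping the diff separate from the side effects makes it straightforward to apply removals first, then updates and additions, and finally send the stateReport that reflects the resulting state.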
Note:
- cropX1, cropY1, cropX2, cropY2 (optional, integer) specify the crop coordinates for the camera stream
- snapshotUrl and snapshotInterval (optional) enable periodic snapshot capture
- Multiple subscriptions may share the same rtspUrl but have different subscriptionIdentifier values
Camera Stream Optimization: When multiple subscriptions share the same camera (same rtspUrl), your worker must:
- Open the RTSP stream once for that camera
- Capture each frame/snapshot once per cycle
- Process the shared stream for each subscription's requirements (crop coordinates, model)
- Route detection results separately for each subscriptionIdentifier
- Apply display-specific crop coordinates during processing
This optimization reduces bandwidth usage and ensures consistent detection timing across displays.
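One way to approach the shared-stream requirement is to group subscriptions by rtspUrl before opening any connections. The sketch below shows only that grouping step; the helper name groupByStream is hypothetical, and actual stream handling is left out.

```typescript
interface StreamSubscription {
  subscriptionIdentifier: string;
  rtspUrl: string;
}

// Group subscriptions so that each distinct rtspUrl maps to every
// subscription that should receive frames from that one stream.
function groupByStream<T extends StreamSubscription>(subs: T[]): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const sub of subs) {
    const group = groups.get(sub.rtspUrl);
    if (group === undefined) {
      groups.set(sub.rtspUrl, [sub]);
    } else {
      group.push(sub);
    }
  }
  return groups;
}
```

Each key then corresponds to a single RTSP reader, and every captured frame is handed to all subscriptions in that group, which apply their own crop and model and emit results under their own subscriptionIdentifier.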
7.2. Request State
Direct request for the worker's current state. Respond with a stateReport message.
- Type: requestState
Payload:
{
"type": "requestState"
}
7.3. Patch Session Result
Backend's response to a patchSession message.
- Type: patchSessionResult
Payload:
{
"type": "patchSessionResult",
"payload": {
"sessionId": 12345,
"success": true,
"message": "Session updated successfully."
}
}
7.4. Set Session ID
Real-time session association for linking detection events to user sessions.
Allows the backend to instruct the worker to associate a session ID with a display. This enables linking detection events to specific user sessions. The system automatically propagates session changes across all worker processes via Redis pub/sub.
- Type: setSessionId
Payload:
{
"type": "setSessionId",
"payload": {
"displayIdentifier": "display-001",
"sessionId": 12345
}
}
Or to clear the session:
{
"type": "setSessionId",
"payload": {
"displayIdentifier": "display-001",
"sessionId": null
}
}
Session Management Flow:
- Session Creation: When a new session is created (user interaction), the backend immediately sends setSessionId to all relevant workers
- Cross-Process Distribution: The command is distributed across multiple backend processes via the Redis worker:commands channel
- Worker State Synchronization: Workers maintain session IDs for each display and apply them to all matching subscriptions
- Automatic Recovery: Session IDs are restored when workers reconnect, ensuring no session context is lost
- Multi-Subscription Support: A single session ID applies to all camera subscriptions for the given display
Worker Responsibility:
- Store the session ID for the given displayIdentifier
- Apply the session ID to all active subscriptions that start with displayIdentifier; (e.g., display-001;cam-001, display-001;cam-002)
- Include the session ID in subsequent imageDetection and patchSession messages
- Handle session clearing when sessionId is null
- Restore session IDs from backend state after reconnection
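A minimal sketch of the worker-side bookkeeping described above follows. The SessionRegistry class and its method names are hypothetical; it stores one session ID per display and resolves it for any subscription whose identifier begins with that display followed by a semicolon.

```typescript
class SessionRegistry {
  private sessions = new Map<string, number>();

  // Handle a setSessionId payload: store the ID, or clear it when null.
  apply(displayIdentifier: string, sessionId: number | null): void {
    if (sessionId === null) {
      this.sessions.delete(displayIdentifier);
    } else {
      this.sessions.set(displayIdentifier, sessionId);
    }
  }

  // Resolve the session for a "displayId;cameraId" subscription identifier.
  // Matching on the full display part avoids confusing "display-001"
  // with a longer ID such as "display-0012".
  sessionFor(subscriptionIdentifier: string): number | null {
    const sep = subscriptionIdentifier.indexOf(";");
    const display = sep === -1 ? subscriptionIdentifier : subscriptionIdentifier.slice(0, sep);
    return this.sessions.get(display) ?? null;
  }
}
```

When building an imageDetection message, the worker can call sessionFor with the subscription's identifier and attach the result as sessionId, as the example log in section 9 does.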
Multi-Process Coordination:
The session ID command uses the distributed worker communication system:
- Commands are routed via Redis pub/sub to the process managing the target worker
- Automatic failover ensures session updates reach workers even during process changes
- Lease-based worker ownership prevents duplicate session notifications
7.5. Set Progression Stage
Real-time progression stage synchronization for dynamic content adaptation.
Notifies workers about the current progression stage of a display, enabling context-aware content selection and detection behavior. The system automatically tracks stage changes and avoids redundant updates.
- Type: setProgressionStage
Payload:
{
"type": "setProgressionStage",
"payload": {
"displayIdentifier": "display-001",
"progressionStage": "car_fueling"
}
}
Or to clear the progression stage:
{
"type": "setProgressionStage",
"payload": {
"displayIdentifier": "display-001",
"progressionStage": null
}
}
Available Progression Stages:
- "welcome" - Initial state, awaiting user interaction
- "car_fueling" - Vehicle is actively fueling
- "car_waitpayment" - Fueling complete, awaiting payment
- "car_postpayment" - Payment completed, transaction finishing
- null - No active progression stage
Progression Stage Flow:
- Automatic Detection: Display controllers automatically detect progression stage changes based on display persistent data
- Change Filtering: The system compares current stage with last sent stage to avoid redundant updates
- Instant Propagation: Stage changes are immediately sent to all workers associated with the display
- Cross-Process Distribution: Commands are distributed via the Redis worker:commands channel to all backend processes
- State Recovery: Progression stages are restored when workers reconnect
Worker Responsibility:
- Store the progression stage for the given displayIdentifier
- Apply the stage to all active subscriptions for that display
- Use progression stage for context-aware detection and content adaptation
- Handle stage clearing when progressionStage is null
- Restore progression stages from backend state after reconnection
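As an illustration of the responsibilities above, the sketch below validates an incoming progressionStage value and stores it per display. The names isProgressionStage and StageRegistry are hypothetical; the stage values are the ones listed in this section. Returning a changed flag is a design choice made for this sketch so the worker can skip reconfiguring its pipeline when the stage is unchanged.

```typescript
const PROGRESSION_STAGES = ["welcome", "car_fueling", "car_waitpayment", "car_postpayment"] as const;
type ProgressionStage = (typeof PROGRESSION_STAGES)[number];

// Accepts one of the documented stage strings or null (no active stage).
function isProgressionStage(value: unknown): value is ProgressionStage | null {
  if (value === null) return true;
  return typeof value === "string" && (PROGRESSION_STAGES as readonly string[]).indexOf(value) !== -1;
}

class StageRegistry {
  private stages = new Map<string, ProgressionStage>();

  // Store or clear the stage; returns true only when the value changed.
  apply(displayIdentifier: string, stage: ProgressionStage | null): boolean {
    const previous = this.stages.get(displayIdentifier) ?? null;
    if (previous === stage) return false;
    if (stage === null) {
      this.stages.delete(displayIdentifier);
    } else {
      this.stages.set(displayIdentifier, stage);
    }
    return true;
  }

  stageFor(displayIdentifier: string): ProgressionStage | null {
    return this.stages.get(displayIdentifier) ?? null;
  }
}
```

A command handler would first check the payload with isProgressionStage and ignore unrecognized values, then call apply and react only when it returns true.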
Use Cases:
- Fuel Station Displays: Adapt content based on fueling progress (welcome ads vs. payment prompts)
- Dynamic Detection: Adjust detection sensitivity based on interaction stage
- Content Personalization: Select appropriate advertisements for current user journey stage
- Analytics: Track user progression through interaction stages
8. Subscription Identifier Format
The subscriptionIdentifier used in all messages is constructed as:
displayIdentifier;cameraIdentifier
This uniquely identifies a camera subscription for a specific display.
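A small parsing helper for this format might look like the sketch below. The function name parseSubscriptionIdentifier is hypothetical, and splitting on the first semicolon assumes that display identifiers never contain a semicolon themselves, which the document does not state explicitly.

```typescript
interface ParsedSubscriptionId {
  displayIdentifier: string;
  cameraIdentifier: string;
}

// Returns null when either part is missing.
function parseSubscriptionIdentifier(id: string): ParsedSubscriptionId | null {
  const sep = id.indexOf(";");
  if (sep <= 0 || sep === id.length - 1) return null;
  return {
    displayIdentifier: id.slice(0, sep),
    cameraIdentifier: id.slice(sep + 1),
  };
}
```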
Session ID Association
When the backend sends a setSessionId command, it will only provide the displayIdentifier (not the full subscriptionIdentifier).
Worker Responsibility:
- The worker must match the displayIdentifier to all active subscriptions for that display (i.e., all subscriptionIdentifier values that start with displayIdentifier;).
- The worker should set or clear the session ID for all matching subscriptions.
9. Example Communication Log
This section shows a typical sequence of messages between the backend and the worker, including the new declarative subscription model, session ID management, and progression stage synchronization.
Note: Subscriptions are removed during load rebalancing or when displays/cameras are removed from the system; in both cases the backend sends an updated setSubscriptionList that omits them. The system automatically handles worker reconnection with full state recovery.
Step 1: Connection Established & Heartbeat
- Worker -> Backend
{ "type": "stateReport", "cpuUsage": 70.2, "memoryUsage": 38.1, "gpuUsage": 55.0, "gpuMemoryUsage": 20.0, "cameraConnections": [] }
Step 2: Backend Sets Subscription List
- Backend -> Worker
{ "type": "setSubscriptionList", "subscriptions": [ { "subscriptionIdentifier": "display-001;entry-cam-01", "rtspUrl": "rtsp://192.168.1.100/stream1", "modelUrl": "http://storage/models/vehicle-id.mpta?token=fresh-token", "modelName": "Vehicle Identification", "modelId": 201, "snapshotInterval": 5000 } ] }
Step 3: Worker Acknowledges with Reconciled State
- Worker -> Backend
{ "type": "stateReport", "cpuUsage": 72.5, "memoryUsage": 39.0, "gpuUsage": 57.0, "gpuMemoryUsage": 21.0, "cameraConnections": [ { "subscriptionIdentifier": "display-001;entry-cam-01", "modelId": 201, "modelName": "Vehicle Identification", "online": true } ] }
Step 4: Backend Sets Session ID
- Backend -> Worker
{ "type": "setSessionId", "payload": { "displayIdentifier": "display-001", "sessionId": 12345 } }
Step 5: Backend Sets Progression Stage
- Backend -> Worker
{ "type": "setProgressionStage", "payload": { "displayIdentifier": "display-001", "progressionStage": "welcome" } }
Step 6: Worker Detects a Car with Session Context
- Worker -> Backend
{ "type": "imageDetection", "subscriptionIdentifier": "display-001;entry-cam-01", "timestamp": "2025-07-15T10:00:00.000Z", "sessionId": 12345, "data": { "detection": { "carBrand": "Honda", "carModel": "CR-V", "bodyType": "SUV", "licensePlateText": "GEMINI-AI", "licensePlateConfidence": 0.98 }, "modelId": 201, "modelName": "Vehicle Identification" } }
Step 7: Progression Stage Change
- Backend -> Worker
{ "type": "setProgressionStage", "payload": { "displayIdentifier": "display-001", "progressionStage": "car_fueling" } }
Step 8: Worker Reconnection with State Recovery
- Worker Disconnects and Reconnects
- Worker -> Backend (Immediate heartbeat after reconnection)
{ "type": "stateReport", "cpuUsage": 70.0, "memoryUsage": 38.0, "gpuUsage": 55.0, "gpuMemoryUsage": 20.0, "cameraConnections": [] }
- Backend -> Worker (Automatic subscription list restoration with fresh model URLs)
{ "type": "setSubscriptionList", "subscriptions": [ { "subscriptionIdentifier": "display-001;entry-cam-01", "rtspUrl": "rtsp://192.168.1.100/stream1", "modelUrl": "http://storage/models/vehicle-id.mpta?token=fresh-reconnect-token", "modelName": "Vehicle Identification", "modelId": 201, "snapshotInterval": 5000 } ] }
- Backend -> Worker (Session ID recovery)
{ "type": "setSessionId", "payload": { "displayIdentifier": "display-001", "sessionId": 12345 } }
- Backend -> Worker (Progression stage recovery)
{ "type": "setProgressionStage", "payload": { "displayIdentifier": "display-001", "progressionStage": "car_fueling" } }
Step 9: Backend Updates Subscription List (Load balancing or system cleanup)
- Backend -> Worker (Empty list removes all subscriptions)
{ "type": "setSubscriptionList", "subscriptions": [] }
Step 10: Worker Acknowledges Subscription Removal
- Worker -> Backend (Updated heartbeat showing no active connections after reconciliation)
{ "type": "stateReport", "cpuUsage": 68.0, "memoryUsage": 37.0, "gpuUsage": 50.0, "gpuMemoryUsage": 18.0, "cameraConnections": [] }
Key Improvements in Communication Flow:
- Fully Declarative Subscriptions: Complete subscription list sent in single command, worker handles reconciliation
- Worker-Side Reconciliation: Workers compare desired vs. current state and make necessary changes internally
- Session Context: All detection events include session IDs for proper user linking
- Progression Stages: Real-time stage updates enable context-aware content selection
- State Recovery: Complete automatic recovery of subscription lists, session IDs, and progression stages
- Fresh Model URLs: S3 URL expiration is handled transparently with 1-hour TTL tokens
- Load Balancing: Backend intelligently distributes complete subscription lists across available workers
10. HTTP API: Image Retrieval
In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
Endpoint
GET /camera/{camera_id}/image
- camera_id: The full subscriptionIdentifier (e.g., display-001;cam-001).
Response
- Success (200): Returns the latest JPEG image from the camera stream.
  - Content-Type: image/jpeg
  - Binary JPEG data.
- Error (404): Returned if the camera is not found or no frame is available.
  - JSON error response.
- Error (500): Internal server error.
Example Request
GET /camera/display-001;cam-001/image
Example Response
- Headers:
Content-Type: image/jpeg
- Body: Binary JPEG image.
Notes
- The endpoint returns the most recent frame available for the specified camera subscription.
- If multiple displays share the same camera, each subscription has its own buffer; the endpoint uses the buffer for the given camera_id.
- This API is useful for debugging, monitoring, or integrating with external systems that require direct image access.
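The routing logic behind this endpoint can be sketched independently of any HTTP framework. The helpers parseCameraImagePath and respondWithLatestFrame are hypothetical, as is the shape of the JSON error body; only the path pattern, status codes, and Content-Type come from this section. Decoding percent-escapes is included on the assumption that some clients send the semicolon as %3B.

```typescript
// Extract camera_id from "/camera/{camera_id}/image", or null if the path does not match.
function parseCameraImagePath(path: string): string | null {
  const match = /^\/camera\/([^/]+)\/image$/.exec(path);
  if (match === null) return null;
  const raw = match[1];
  if (raw === undefined) return null;
  try {
    return decodeURIComponent(raw);
  } catch {
    return null; // malformed percent-encoding
  }
}

interface ImageResponse {
  status: 200 | 404;
  contentType: string;
  body: Uint8Array | string;
}

// frames maps each subscriptionIdentifier to its most recent JPEG bytes.
function respondWithLatestFrame(frames: Map<string, Uint8Array>, cameraId: string): ImageResponse {
  const frame = frames.get(cameraId);
  if (frame === undefined) {
    return {
      status: 404,
      contentType: "application/json",
      // Assumed error shape; the document only says "JSON error response".
      body: JSON.stringify({ error: "camera not found or no frame available" }),
    };
  }
  return { status: 200, contentType: "image/jpeg", body: frame };
}
```

A request handler would call parseCameraImagePath on the request path, answer 404 when it returns null, and otherwise write the result of respondWithLatestFrame, leaving 500 for unexpected exceptions.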