Revert worker.md

ziesorx 2025-08-10 22:47:16 +07:00
parent cfc7503a14
commit 416db7a33a

worker.md (110 changed lines)

@@ -2,12 +2,6 @@
 This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
-The current Python Detector Worker implementation supports advanced computer vision pipelines with:
-- Multi-class YOLO detection with parallel processing
-- PostgreSQL database integration with automatic schema management
-- Redis integration for image storage and pub/sub messaging
-- Hierarchical pipeline execution with detection → classification branching
 ## 1. Connection
 The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
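For orientation, here is a minimal sketch of such a server using the Python `websockets` library. The library choice, the handler structure, and the two-second heartbeat interval are illustrative assumptions, not requirements of the protocol.

```python
# Minimal sketch of a worker-side WebSocket server (assumptions: the `websockets`
# library, a 2-second heartbeat interval, placeholder subscribe/unsubscribe handling).
import asyncio
import json

import websockets


async def handle_backend(websocket, path=None):
    """Serve one backend connection: emit periodic stateReports, react to commands."""

    async def heartbeat():
        while True:
            await websocket.send(json.dumps({
                "type": "stateReport",
                "cameraConnections": [],  # populated once subscriptions exist
            }))
            await asyncio.sleep(2)  # assumed interval, not defined by the protocol

    heartbeat_task = asyncio.create_task(heartbeat())
    try:
        async for raw in websocket:
            message = json.loads(raw)
            if message.get("type") == "subscribe":
                pass  # start consuming the stream described in the payload
            elif message.get("type") == "unsubscribe":
                pass  # tear the corresponding stream down
    finally:
        heartbeat_task.cancel()


async def main():
    # Listen on all interfaces so the orchestrating backend can reach the worker.
    async with websockets.serve(handle_backend, "0.0.0.0", 8000):
        await asyncio.Future()  # run until cancelled


if __name__ == "__main__":
    asyncio.run(main())
```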
@@ -31,34 +25,14 @@ To enable modularity and dynamic configuration, the backend will send you a URL
 2. Extracting its contents.
 3. Interpreting the contents to configure its internal pipeline.
-**The current implementation supports comprehensive pipeline configurations including:**
-- **AI/ML Models**: YOLO models (.pt files) for detection and classification
-- **Pipeline Configuration**: `pipeline.json` defining hierarchical detection→classification workflows
-- **Multi-class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
-- **Parallel Processing**: Concurrent execution of classification branches with ThreadPoolExecutor
-- **Database Integration**: PostgreSQL configuration for automatic table creation and updates
-- **Redis Actions**: Image storage with region cropping and pub/sub messaging
-- **Dynamic Field Mapping**: Template-based field resolution for database operations
-**Enhanced MPTA Structure:**
-```
-pipeline.mpta/
-├── pipeline.json            # Main configuration with redis/postgresql settings
-├── car_detection.pt         # Primary YOLO detection model
-├── brand_classifier.pt      # Classification model for car brands
-├── bodytype_classifier.pt   # Classification model for body types
-└── ...
-```
-The `pipeline.json` now supports advanced features like:
-- Multi-class detection with `expectedClasses` validation
-- Parallel branch processing with `parallel: true`
-- Database actions with `postgresql_update_combined`
-- Redis actions with region-specific image cropping
-- Branch synchronization with `waitForBranches`
-Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription, including complex multi-stage AI pipelines with database persistence.
+**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:
+- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
+- Configuration Files: A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
+- Scripts: Custom Python scripts for pre-processing or post-processing.
+- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.
+Essentially, the `.mpta` file is a self-contained package that tells your worker _how_ to process the video stream for a given subscription.
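As a rough illustration of the fetch/extract/interpret steps, a worker might unpack the archive like this. The sketch assumes the `.mpta` file is a plain ZIP container and that a `pipeline.json` may sit at its root; both are assumptions about one possible layout, not guarantees of the format.

```python
# Sketch of downloading and unpacking an .mpta archive (assumed to be a ZIP
# container) and reading an optional pipeline.json from its root.
import io
import json
import zipfile
from pathlib import Path
from urllib.request import urlopen


def load_mpta(model_url: str, dest: Path) -> dict | None:
    """Fetch the archive, extract it, and return the parsed pipeline config if present."""
    with urlopen(model_url) as response:
        archive = zipfile.ZipFile(io.BytesIO(response.read()))

    dest.mkdir(parents=True, exist_ok=True)
    archive.extractall(dest)

    config_path = dest / "pipeline.json"  # hypothetical file name and location
    if config_path.exists():
        return json.loads(config_path.read_text())
    return None
```

Whatever the archive actually contains, the worker is responsible for wiring those files into its own processing pipeline.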
 ## 4. Messages from Worker to Backend
@@ -105,15 +79,6 @@ Sent when the worker detects a relevant object. The `detection` object should be
 - **Type:** `imageDetection`
-**Enhanced Detection Capabilities:**
-The current implementation supports multi-class detection with parallel classification processing. When a vehicle is detected, the system:
-1. **Multi-Class Detection**: Simultaneously detects "Car" and "Frontal" classes
-2. **Parallel Processing**: Runs brand and body type classification concurrently
-3. **Database Integration**: Automatically creates and updates PostgreSQL records
-4. **Redis Storage**: Saves cropped frontal images with expiration
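The concurrent classification step described above could be sketched with a `ThreadPoolExecutor`, as the notes suggest. The classifier functions below are stand-ins for real model calls; only the branch names and example results are taken from the document.

```python
# Sketch of running two classification branches concurrently.
# classify_brand / classify_body_type are placeholders for real model inference.
from concurrent.futures import ThreadPoolExecutor


def classify_brand(crop):
    return {"class": "Honda", "confidence": 0.89}  # stand-in result


def classify_body_type(crop):
    return {"class": "Sedan", "confidence": 0.85}  # stand-in result


def run_branches(frontal_crop):
    """Run both classifiers on the cropped region and collect their results."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        brand_future = pool.submit(classify_brand, frontal_crop)
        body_future = pool.submit(classify_body_type, frontal_crop)
        return {
            "car_brand_cls_v1": brand_future.result(),
            "car_bodytype_cls_v1": body_future.result(),
        }
```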
 **Payload Example:**
 ```json
@@ -123,38 +88,19 @@ The current implementation supports multi-class detection with parallel classifi
   "timestamp": "2025-07-14T12:34:56.789Z",
   "data": {
     "detection": {
-      "class": "Car",
-      "confidence": 0.92,
-      "carBrand": "Honda",
       "carModel": "Civic",
-      "carYear": 2023,
+      "carBrand": "Honda",
       "bodyType": "Sedan",
-      "branch_results": {
-        "car_brand_cls_v1": {
-          "class": "Honda",
-          "confidence": 0.89,
-          "brand": "Honda"
-        },
-        "car_bodytype_cls_v1": {
-          "class": "Sedan",
-          "confidence": 0.85,
-          "body_type": "Sedan"
-        }
-      }
+      "licensePlateText": "ABCD1234",
+      "licensePlateConfidence": 0.95
     },
     "modelId": 101,
-    "modelName": "Car Frontal Detection V1"
+    "modelName": "US-LPR-and-Vehicle-ID"
   }
 }
 ```
-**Database Integration:**
-Each detection automatically:
-- Creates a record in `gas_station_1.car_frontal_info` table
-- Generates a unique `session_id` for tracking
-- Updates the record with classification results after parallel processing completes
-- Stores cropped frontal images in Redis with the session_id as key
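Assembling the payload on the worker side might look like the following sketch. The `type`, `timestamp`, `data.detection`, `modelId`, and `modelName` fields mirror the example above; the `subscriptionIdentifier` field and the helper name are assumptions for illustration, since the top of the payload is not shown in this excerpt.

```python
# Sketch of assembling and sending an imageDetection message once a detection
# (and any classification results) is ready. Field values are illustrative;
# "subscriptionIdentifier" is an assumed routing field.
import json
from datetime import datetime, timezone


async def send_image_detection(websocket, subscription_id: str, detection: dict) -> None:
    message = {
        "type": "imageDetection",
        "subscriptionIdentifier": subscription_id,  # assumed field name
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": {
            "detection": detection,  # e.g. carBrand, carModel, bodyType, ...
            "modelId": 101,
            "modelName": "US-LPR-and-Vehicle-ID",
        },
    }
    await websocket.send(json.dumps(message))
```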
 ### 4.3. Patch Session
 > **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
@ -187,7 +133,12 @@ The `data` object in the `patchSession` message is merged with the existing `Dis
```typescript ```typescript
interface DisplayPersistentData { interface DisplayPersistentData {
progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null; progressionStage:
| 'welcome'
| 'car_fueling'
| 'car_waitpayment'
| 'car_postpayment'
| null;
qrCode: string | null; qrCode: string | null;
adsPlayback: { adsPlayback: {
playlistSlotOrder: number; // The 'order' of the current slot playlistSlotOrder: number; // The 'order' of the current slot
@@ -202,8 +153,12 @@ interface DisplayPersistentData {
     licensePlateText?: string;
     licensePlateType?: string;
   } | null;
-  fuelPump: { /* FuelPumpData structure */ } | null;
-  weatherData: { /* WeatherResponse structure */ } | null;
+  fuelPump: {
+    /* FuelPumpData structure */
+  } | null;
+  weatherData: {
+    /* WeatherResponse structure */
+  } | null;
   sessionId: number | null;
 }
 ```
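A `patchSession` update that retroactively fills in some of the fields above could be sketched like this. Only keys that appear in `DisplayPersistentData` are used, and the envelope is assumed to carry nothing beyond `type` and `data`; any additional routing fields the protocol requires are omitted here.

```python
# Sketch of a patchSession message. The data object is a partial
# DisplayPersistentData; only the keys being updated are included.
import json


async def send_patch_session(websocket, session_id: int) -> None:
    patch = {
        "type": "patchSession",
        "data": {
            "progressionStage": "car_waitpayment",
            "sessionId": session_id,
        },
    }
    await websocket.send(json.dumps(patch))
```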
@@ -369,7 +324,7 @@ This section shows a typical sequence of messages between the backend and the wo
 > **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
 1. **Connection Established** & **Heartbeat**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
    ```json
    {
      "type": "stateReport",
@@ -381,7 +336,7 @@ This section shows a typical sequence of messages between the backend and the wo
    }
    ```
 2. **Backend Subscribes Camera**
-   * **Backend -> Worker**
+   - **Backend -> Worker**
    ```json
    {
      "type": "subscribe",
@@ -395,7 +350,7 @@ This section shows a typical sequence of messages between the backend and the wo
    }
    ```
 3. **Worker Acknowledges in Heartbeat**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
    ```json
    {
      "type": "stateReport",
@@ -414,7 +369,7 @@ This section shows a typical sequence of messages between the backend and the wo
    }
    ```
 4. **Worker Detects a Car**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
    ```json
    {
      "type": "imageDetection",
@@ -433,7 +388,7 @@ This section shows a typical sequence of messages between the backend and the wo
      }
    }
    ```
-   * **Worker -> Backend**
+   - **Worker -> Backend**
    ```json
    {
      "type": "imageDetection",
@@ -452,7 +407,7 @@ This section shows a typical sequence of messages between the backend and the wo
      }
    }
    ```
-   * **Worker -> Backend**
+   - **Worker -> Backend**
    ```json
    {
      "type": "imageDetection",
@@ -472,7 +427,7 @@ This section shows a typical sequence of messages between the backend and the wo
    }
    ```
 5. **Backend Unsubscribes Camera**
-   * **Backend -> Worker**
+   - **Backend -> Worker**
    ```json
    {
      "type": "unsubscribe",
@@ -482,7 +437,7 @@ This section shows a typical sequence of messages between the backend and the wo
    }
    ```
 6. **Worker Acknowledges Unsubscription**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
    ```json
    {
      "type": "stateReport",
@@ -493,6 +448,7 @@ This section shows a typical sequence of messages between the backend and the wo
      "cameraConnections": []
    }
    ```
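Behind this sequence, the worker mostly has to keep a registry of active subscriptions and echo it back in each `stateReport`. A small sketch of that bookkeeping follows; the key used to index subscriptions and the shape of a `cameraConnections` entry are simplifications, since the full subscribe payload is not shown in this excerpt.

```python
# Sketch of the subscription bookkeeping implied by the sequence above.
# "subscriptionIdentifier" as the lookup key is an assumption; the stored payload
# is simply echoed back as a cameraConnections entry for illustration.
import json


class SubscriptionRegistry:
    def __init__(self) -> None:
        self._subscriptions: dict[str, dict] = {}

    def subscribe(self, payload: dict) -> None:
        self._subscriptions[payload["subscriptionIdentifier"]] = payload

    def unsubscribe(self, payload: dict) -> None:
        self._subscriptions.pop(payload["subscriptionIdentifier"], None)

    def state_report(self) -> str:
        """Build the stateReport JSON reflecting the current subscriptions."""
        return json.dumps({
            "type": "stateReport",
            "cameraConnections": list(self._subscriptions.values()),
        })
```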
 ## 7. HTTP API: Image Retrieval
 In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
@@ -508,10 +464,12 @@ GET /camera/{camera_id}/image
 ### Response
 - **Success (200):** Returns the latest JPEG image from the camera stream.
   - `Content-Type: image/jpeg`
   - Binary JPEG data.
 - **Error (404):** If the camera is not found or no frame is available.
   - JSON error response.
 - **Error (500):** Internal server error.
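On the consuming side, fetching a frame is a plain HTTP GET. A sketch using only the standard library, with placeholder host and port:

```python
# Sketch of a client fetching the latest frame from the worker's HTTP endpoint.
# Host and port are placeholders; the path follows GET /camera/{camera_id}/image.
from urllib.error import HTTPError
from urllib.request import urlopen


def fetch_latest_frame(camera_id: str, host: str = "localhost", port: int = 8000) -> bytes | None:
    url = f"http://{host}:{port}/camera/{camera_id}/image"
    try:
        with urlopen(url) as response:
            return response.read()  # raw JPEG bytes on a 200 response
    except HTTPError as err:
        # 404: camera not found or no frame available; 500: internal error (JSON body).
        print(f"image request for {camera_id} failed with HTTP {err.code}")
        return None
```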