diff --git a/worker.md b/worker.md
index 302c8ce..c485db5 100644
--- a/worker.md
+++ b/worker.md
@@ -2,12 +2,6 @@
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.

-The current Python Detector Worker implementation supports advanced computer vision pipelines with:
-- Multi-class YOLO detection with parallel processing
-- PostgreSQL database integration with automatic schema management
-- Redis integration for image storage and pub/sub messaging
-- Hierarchical pipeline execution with detection → classification branching
-
## 1. Connection

The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.

@@ -31,34 +25,14 @@ To enable modularity and dynamic configuration, the backend will send you a URL
2. Extracting its contents.
3. Interpreting the contents to configure its internal pipeline.

-**The current implementation supports comprehensive pipeline configurations including:**
+**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:

-- **AI/ML Models**: YOLO models (.pt files) for detection and classification
-- **Pipeline Configuration**: `pipeline.json` defining hierarchical detection→classification workflows
-- **Multi-class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
-- **Parallel Processing**: Concurrent execution of classification branches with ThreadPoolExecutor
-- **Database Integration**: PostgreSQL configuration for automatic table creation and updates
-- **Redis Actions**: Image storage with region cropping and pub/sub messaging
-- **Dynamic Field Mapping**: Template-based field resolution for database operations
+- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
+- Configuration Files: A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
+- Scripts: Custom Python scripts for pre-processing or post-processing.
+- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.

-**Enhanced MPTA Structure:**
-```
-pipeline.mpta/
-├── pipeline.json # Main configuration with redis/postgresql settings
-├── car_detection.pt # Primary YOLO detection model
-├── brand_classifier.pt # Classification model for car brands
-├── bodytype_classifier.pt # Classification model for body types
-└── ...
-```
-
-The `pipeline.json` now supports advanced features like:
-- Multi-class detection with `expectedClasses` validation
-- Parallel branch processing with `parallel: true`
-- Database actions with `postgresql_update_combined`
-- Redis actions with region-specific image cropping
-- Branch synchronization with `waitForBranches`
-
-Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription, including complex multi-stage AI pipelines with database persistence.
+Essentially, the `.mpta` file is a self-contained package that tells your worker _how_ to process the video stream for a given subscription.

## 4. Messages from Worker to Backend

@@ -105,15 +79,6 @@ Sent when the worker detects a relevant object. The `detection` object should be

- **Type:** `imageDetection`

-**Enhanced Detection Capabilities:**
-
-The current implementation supports multi-class detection with parallel classification processing. When a vehicle is detected, the system:
-
-1. **Multi-Class Detection**: Simultaneously detects "Car" and "Frontal" classes
-2. **Parallel Processing**: Runs brand and body type classification concurrently
-3. **Database Integration**: Automatically creates and updates PostgreSQL records
-4. **Redis Storage**: Saves cropped frontal images with expiration
-
**Payload Example:**

```json
@@ -123,38 +88,19 @@ The current implementation supports multi-class detection with parallel classifi
  "timestamp": "2025-07-14T12:34:56.789Z",
  "data": {
    "detection": {
-      "class": "Car",
-      "confidence": 0.92,
-      "carBrand": "Honda",
      "carModel": "Civic",
+      "carBrand": "Honda",
+      "carYear": 2023,
      "bodyType": "Sedan",
-      "branch_results": {
-        "car_brand_cls_v1": {
-          "class": "Honda",
-          "confidence": 0.89,
-          "brand": "Honda"
-        },
-        "car_bodytype_cls_v1": {
-          "class": "Sedan",
-          "confidence": 0.85,
-          "body_type": "Sedan"
-        }
-      }
+      "licensePlateText": "ABCD1234",
+      "licensePlateConfidence": 0.95
    },
    "modelId": 101,
-    "modelName": "Car Frontal Detection V1"
+    "modelName": "US-LPR-and-Vehicle-ID"
  }
}
```

-**Database Integration:**
-
-Each detection automatically:
-- Creates a record in `gas_station_1.car_frontal_info` table
-- Generates a unique `session_id` for tracking
-- Updates the record with classification results after parallel processing completes
-- Stores cropped frontal images in Redis with the session_id as key
-
### 4.3. Patch Session

> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
@@ -171,9 +117,9 @@ Allows the worker to request a modification to an active session's data. The `da
  "sessionId": 12345,
  "data": {
    "currentCar": {
-        "carModel": "Civic",
-        "carBrand": "Honda",
-        "licensePlateText": "ABCD1234"
+      "carModel": "Civic",
+      "carBrand": "Honda",
+      "licensePlateText": "ABCD1234"
    }
  }
}
@@ -187,24 +133,33 @@ The `data` object in the `patchSession` message is merged with the existing `Dis

```typescript
interface DisplayPersistentData {
-    progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
-    qrCode: string | null;
-    adsPlayback: {
-        playlistSlotOrder: number; // The 'order' of the current slot
-        adsId: number | null;
-        adsUrl: string | null;
-    } | null;
-    currentCar: {
-        carModel?: string;
-        carBrand?: string;
-        carYear?: number;
-        bodyType?: string;
-        licensePlateText?: string;
-        licensePlateType?: string;
-    } | null;
-    fuelPump: { /* FuelPumpData structure */ } | null;
-    weatherData: { /* WeatherResponse structure */ } | null;
-    sessionId: number | null;
+  progressionStage:
+    | 'welcome'
+    | 'car_fueling'
+    | 'car_waitpayment'
+    | 'car_postpayment'
+    | null;
+  qrCode: string | null;
+  adsPlayback: {
+    playlistSlotOrder: number; // The 'order' of the current slot
+    adsId: number | null;
+    adsUrl: string | null;
+  } | null;
+  currentCar: {
+    carModel?: string;
+    carBrand?: string;
+    carYear?: number;
+    bodyType?: string;
+    licensePlateText?: string;
+    licensePlateType?: string;
+  } | null;
+  fuelPump: {
+    /* FuelPumpData structure */
+  } | null;
+  weatherData: {
+    /* WeatherResponse structure */
+  } | null;
+  sessionId: number | null;
}
```

@@ -257,7 +212,7 @@ Instructs the worker to process a camera's RTSP stream using the configuration f
> - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera.
> - Capture each frame/image only once per cycle.
> - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.
-> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
+> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.

### 5.2. Unsubscribe from Camera

@@ -369,7 +324,7 @@ This section shows a typical sequence of messages between the backend and the wo
> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.

1. **Connection Established** & **Heartbeat**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
     ```json
     {
       "type": "stateReport",
@@ -381,7 +336,7 @@ This section shows a typical sequence of messages between the backend and the wo
     }
     ```
2. **Backend Subscribes Camera**
-   * **Backend -> Worker**
+   - **Backend -> Worker**
     ```json
     {
       "type": "subscribe",
@@ -395,7 +350,7 @@ This section shows a typical sequence of messages between the backend and the wo
     }
     ```
3. **Worker Acknowledges in Heartbeat**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
     ```json
     {
       "type": "stateReport",
@@ -414,7 +369,7 @@ This section shows a typical sequence of messages between the backend and the wo
     }
     ```
4. **Worker Detects a Car**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
     ```json
     {
       "type": "imageDetection",
@@ -433,7 +388,7 @@ This section shows a typical sequence of messages between the backend and the wo
       }
     }
     ```
-   * **Worker -> Backend**
+   - **Worker -> Backend**
     ```json
     {
       "type": "imageDetection",
@@ -452,7 +407,7 @@ This section shows a typical sequence of messages between the backend and the wo
       }
     }
     ```
-   * **Worker -> Backend**
+   - **Worker -> Backend**
     ```json
     {
       "type": "imageDetection",
@@ -472,7 +427,7 @@ This section shows a typical sequence of messages between the backend and the wo
     }
     ```
5. **Backend Unsubscribes Camera**
-   * **Backend -> Worker**
+   - **Backend -> Worker**
     ```json
     {
       "type": "unsubscribe",
@@ -482,7 +437,7 @@ This section shows a typical sequence of messages between the backend and the wo
     }
     ```
6. **Worker Acknowledges Unsubscription**
-   * **Worker -> Backend**
+   - **Worker -> Backend**
     ```json
     {
       "type": "stateReport",
@@ -493,6 +448,7 @@ This section shows a typical sequence of messages between the backend and the wo
       "cameraConnections": []
     }
     ```
+
## 7. HTTP API: Image Retrieval

In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
@@ -508,11 +464,13 @@ GET /camera/{camera_id}/image
### Response

- **Success (200):** Returns the latest JPEG image from the camera stream.
-  - `Content-Type: image/jpeg`
-  - Binary JPEG data.
+
+  - `Content-Type: image/jpeg`
+  - Binary JPEG data.

- **Error (404):** If the camera is not found or no frame is available.
-  - JSON error response.
+
+  - JSON error response.

- **Error (500):** Internal server error.

@@ -525,9 +483,9 @@ GET /camera/display-001;cam-001/image
### Example Response

- **Headers:**
-    ```
-    Content-Type: image/jpeg
-    ```
+  ```
+  Content-Type: image/jpeg
+  ```
- **Body:** Binary JPEG image.

### Notes
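As an illustration only (not part of the protocol), the endpoint can be exercised with a few lines of TypeScript. This is a minimal sketch: the host, port, and camera ID are assumptions for the example (a worker reachable at `http://localhost:8000`, using the sample ID from the request above), and it relies on the standard `fetch` API (Node 18+ or a browser).

```typescript
// Illustrative client sketch: fetch the latest frame from the worker's HTTP API.
// The base URL and camera ID are assumptions, not values mandated by this document.
async function fetchLatestFrame(cameraId: string): Promise<ArrayBuffer | null> {
  const response = await fetch(`http://localhost:8000/camera/${cameraId}/image`);

  if (!response.ok) {
    // 404: camera not found or no frame available yet (JSON error response); 500: internal error.
    console.error(`No frame for ${cameraId}: HTTP ${response.status}`);
    return null;
  }

  // 200: Content-Type is image/jpeg and the body is binary JPEG data.
  return response.arrayBuffer();
}

fetchLatestFrame("display-001;cam-001").then((jpeg) => {
  if (jpeg) {
    console.log(`Received ${jpeg.byteLength} bytes of JPEG data`);
  }
});
```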