python-detector-worker/pympta.md
Siwat Sirichai f50585f26d
All checks were successful
Build Backend Application and Docker Image / build-docker (push) Successful in 9m3s
feat: enhance Redis action handling; add dynamic context for actions and support for expiration time
2025-07-15 00:35:22 +07:00

204 lines
No EOL
10 KiB
Markdown

# pympta: Modular Pipeline Task Executor
`pympta` is a Python module designed to load and execute modular, multi-stage AI pipelines defined in a special package format (`.mpta`). It is primarily used within the detector worker to run complex computer vision tasks where the output of one model can trigger a subsequent model on a specific region of interest.
## Core Concepts
### 1. MPTA Package (`.mpta`)
An `.mpta` file is a standard `.zip` archive with a different extension. It bundles all the necessary components for a pipeline to run.
A typical `.mpta` file has the following structure:
```
my_pipeline.mpta/
├── pipeline.json
├── model1.pt
├── model2.pt
└── ...
```
- **`pipeline.json`**: (Required) The manifest file that defines the structure of the pipeline, the models to use, and the logic connecting them.
- **Model Files (`.pt`, etc.)**: The actual pre-trained model files (e.g., PyTorch, ONNX). The pipeline currently uses `ultralytics.YOLO` models.
### 2. Pipeline Structure
A pipeline is a tree-like structure of "nodes," defined in `pipeline.json`.
- **Root Node**: The entry point of the pipeline. It processes the initial, full-frame image.
- **Branch Nodes**: Child nodes that are triggered by specific detection results from their parent. For example, a root node might detect a "vehicle," which then triggers a branch node to detect a "license plate" within the vehicle's bounding box.
This modular structure allows for creating complex and efficient inference logic, avoiding the need to run every model on every frame.
## `pipeline.json` Specification
This file defines the entire pipeline logic. The root object contains a `pipeline` key for the pipeline definition and an optional `redis` key for Redis configuration.
### Top-Level Object Structure
| Key | Type | Required | Description |
| ---------- | ------ | -------- | ------------------------------------------------------- |
| `pipeline` | Object | Yes | The root node object of the pipeline. |
| `redis` | Object | No | Configuration for connecting to a Redis server. |
### Redis Configuration (`redis`)
| Key | Type | Required | Description |
| ---------- | ------ | -------- | ------------------------------------------------------- |
| `host` | String | Yes | The hostname or IP address of the Redis server. |
| `port` | Number | Yes | The port number of the Redis server. |
| `password` | String | No | The password for Redis authentication. |
| `db` | Number | No | The Redis database number to use. Defaults to `0`. |
### Node Object Structure
| Key | Type | Required | Description |
| ------------------- | ------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| `modelId` | String | Yes | A unique identifier for this model node (e.g., "vehicle-detector"). |
| `modelFile` | String | Yes | The path to the model file within the `.mpta` archive (e.g., "yolov8n.pt"). |
| `minConfidence` | Float | Yes | The minimum confidence score (0.0 to 1.0) required for a detection to be considered valid and potentially trigger a branch. |
| `triggerClasses` | Array<String> | Yes | A list of class names that, when detected by the parent, can trigger this node. For the root node, this lists all classes of interest. |
| `crop` | Boolean | No | If `true`, the image is cropped to the parent's detection bounding box before being passed to this node's model. Defaults to `false`. |
| `branches` | Array<Node> | No | A list of child node objects that can be triggered by this node's detections. |
| `actions` | Array<Action> | No | A list of actions to execute upon a successful detection in this node. |
### Action Object Structure
Actions allow the pipeline to interact with Redis. They are executed sequentially for a given detection.
#### Action Context & Dynamic Keys
All actions have access to a dynamic context for formatting keys and messages. The context is created for each detection event and includes:
- All key-value pairs from the detection result (e.g., `class`, `confidence`, `id`).
- `{timestamp_ms}`: The current Unix timestamp in milliseconds.
- `{uuid}`: A unique identifier (UUID4) for the detection event.
- `{image_key}`: If a `redis_save_image` action has already been executed for this event, this placeholder will be replaced with the key where the image was stored.
#### `redis_save_image`
Saves the current image frame (or cropped sub-image) to a Redis key.
| Key | Type | Required | Description |
| ---------------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
| `type` | String | Yes | Must be `"redis_save_image"`. |
| `key` | String | Yes | The Redis key to save the image to. Can contain any of the dynamic placeholders. |
| `expire_seconds` | Number | No | If provided, sets an expiration time (in seconds) for the Redis key. |
#### `redis_publish`
Publishes a message to a Redis channel.
| Key | Type | Required | Description |
| --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
| `type` | String | Yes | Must be `"redis_publish"`. |
| `channel` | String | Yes | The Redis channel to publish the message to. |
| `message` | String | Yes | The message to publish. Can contain any of the dynamic placeholders, including `{image_key}`. |
### Example `pipeline.json` with Redis
This example demonstrates a pipeline that detects vehicles, saves a uniquely named image of each detection that expires in one hour, and then publishes a notification with the image key.
```json
{
"redis": {
"host": "redis.local",
"port": 6379,
"password": "your-super-secret-password"
},
"pipeline": {
"modelId": "vehicle-detector",
"modelFile": "vehicle_model.pt",
"minConfidence": 0.6,
"triggerClasses": ["car", "truck"],
"actions": [
{
"type": "redis_save_image",
"key": "detections:{class}:{timestamp_ms}:{uuid}",
"expire_seconds": 3600
},
{
"type": "redis_publish",
"channel": "vehicle_events",
"message": "{\"event\":\"new_detection\",\"class\":\"{class}\",\"confidence\":{confidence},\"image_key\":\"{image_key}\"}"
}
],
"branches": []
}
}
```
## API Reference
The `pympta` module exposes two main functions.
### `load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict`
Loads, extracts, and parses an `.mpta` file to build a pipeline tree in memory. It also establishes a Redis connection if configured in `pipeline.json`.
- **Parameters:**
- `zip_source` (str): The file path to the local `.mpta` zip archive.
- `target_dir` (str): A directory path where the archive's contents will be extracted.
- **Returns:**
- A dictionary representing the root node of the pipeline, ready to be used with `run_pipeline`. Returns `None` if loading fails.
### `run_pipeline(frame, node: dict, return_bbox: bool = False)`
Executes the inference pipeline on a single image frame.
- **Parameters:**
- `frame`: The input image frame (e.g., a NumPy array from OpenCV).
- `node` (dict): The pipeline node to execute (typically the root node returned by `load_pipeline_from_zip`).
- `return_bbox` (bool): If `True`, the function returns a tuple `(detection, bounding_box)`. Otherwise, it returns only the `detection`.
- **Returns:**
- The final detection result from the last executed node in the chain. A detection is a dictionary like `{'class': 'car', 'confidence': 0.95, 'id': 1}`. If no detection meets the criteria, it returns `None` (or `(None, None)` if `return_bbox` is `True`).
## Usage Example
This snippet, inspired by `pipeline_webcam.py`, shows how to use `pympta` to load a pipeline and process an image from a webcam.
```python
import cv2
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline
# 1. Define paths
MPTA_FILE = "path/to/your/pipeline.mpta"
CACHE_DIR = ".mptacache"
# 2. Load the pipeline from the .mpta file
# This reads pipeline.json and loads the YOLO models into memory.
model_tree = load_pipeline_from_zip(MPTA_FILE, CACHE_DIR)
if not model_tree:
print("Failed to load pipeline.")
exit()
# 3. Open a video source
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
# 4. Run the pipeline on the current frame
# The function will handle the entire logic tree (e.g., find a car, then find its license plate).
detection_result, bounding_box = run_pipeline(frame, model_tree, return_bbox=True)
# 5. Display the results
if detection_result:
print(f"Detected: {detection_result['class']} with confidence {detection_result['confidence']:.2f}")
if bounding_box:
x1, y1, x2, y2 = bounding_box
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, detection_result['class'], (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
cv2.imshow("Pipeline Output", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
```