# pympta: Modular Pipeline Task Executor

`pympta` is a Python module designed to load and execute modular, multi-stage AI pipelines defined in a special package format (`.mpta`). It is primarily used within the detector worker to run complex computer vision tasks where the output of one model can trigger a subsequent model on a specific region of interest.

## Core Concepts

### 1. MPTA Package (`.mpta`)

An `.mpta` file is a standard `.zip` archive with a different extension. It bundles all the components a pipeline needs to run.

A typical `.mpta` file has the following structure:

```
my_pipeline.mpta/
├── pipeline.json
├── model1.pt
├── model2.pt
└── ...
```

- **`pipeline.json`**: (Required) The manifest file that defines the structure of the pipeline, the models to use, and the logic connecting them.
- **Model Files (`.pt`, etc.)**: The actual pre-trained model files (e.g., PyTorch, ONNX). The pipeline currently uses `ultralytics.YOLO` models.

### 2. Pipeline Structure

A pipeline is a tree-like structure of "nodes," defined in `pipeline.json`.

- **Root Node**: The entry point of the pipeline. It processes the initial, full-frame image.
- **Branch Nodes**: Child nodes that are triggered by specific detection results from their parent. For example, a root node might detect a "vehicle," which then triggers a branch node to detect a "license plate" within the vehicle's bounding box.

This modular structure allows for creating complex and efficient inference logic, avoiding the need to run every model on every frame.

## `pipeline.json` Specification

This file defines the entire pipeline logic. The root object contains a `pipeline` key for the pipeline definition, an optional `redis` key for Redis configuration, and an optional `postgresql` key for database integration.

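Because an `.mpta` file is an ordinary zip archive, its manifest can be inspected with the Python standard library alone. The sketch below is independent of `pympta`'s own loader. It assumes `pipeline.json` sits at the archive root, as the structure listing above suggests, and the helper names `read_manifest` and `describe_top_level` are hypothetical. It builds a throwaway archive so that it runs without real model files.

```python
import json
import os
import tempfile
import zipfile

MANIFEST_NAME = "pipeline.json"  # assumption: the manifest sits at the archive root


def read_manifest(mpta_path):
    """Parse pipeline.json straight out of an .mpta (zip) archive."""
    with zipfile.ZipFile(mpta_path) as archive:
        with archive.open(MANIFEST_NAME) as manifest_file:
            return json.load(manifest_file)


def describe_top_level(manifest):
    """Report which of the documented top-level keys are present."""
    if not isinstance(manifest.get("pipeline"), dict):
        raise ValueError("pipeline.json must contain a 'pipeline' object")
    return {
        "root_model": manifest["pipeline"].get("modelId"),
        "uses_redis": "redis" in manifest,
        "uses_postgresql": "postgresql" in manifest,
    }


# Build a throwaway archive so the sketch runs without real model files.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.mpta")
with zipfile.ZipFile(demo_path, "w") as archive:
    archive.writestr(MANIFEST_NAME, json.dumps({
        "redis": {"host": "localhost", "port": 6379},
        "pipeline": {
            "modelId": "demo-detector",
            "modelFile": "demo.pt",
            "minConfidence": 0.5,
            "triggerClasses": ["car"],
        },
    }))

summary = describe_top_level(read_manifest(demo_path))
print(summary)
```

A check like this can be handy before deploying a package, since it catches a missing `pipeline` object without loading any models.
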
### Top-Level Object Structure

| Key          | Type   | Required | Description                                             |
| ------------ | ------ | -------- | ------------------------------------------------------- |
| `pipeline`   | Object | Yes      | The root node object of the pipeline.                   |
| `redis`      | Object | No       | Configuration for connecting to a Redis server.         |
| `postgresql` | Object | No       | Configuration for connecting to a PostgreSQL database.  |

### Redis Configuration (`redis`)

| Key        | Type   | Required | Description                                             |
| ---------- | ------ | -------- | ------------------------------------------------------- |
| `host`     | String | Yes      | The hostname or IP address of the Redis server.         |
| `port`     | Number | Yes      | The port number of the Redis server.                    |
| `password` | String | No       | The password for Redis authentication.                  |
| `db`       | Number | No       | The Redis database number to use. Defaults to `0`.      |

### PostgreSQL Configuration (`postgresql`)

| Key        | Type   | Required | Description                                             |
| ---------- | ------ | -------- | ------------------------------------------------------- |
| `host`     | String | Yes      | The hostname or IP address of the PostgreSQL server.    |
| `port`     | Number | Yes      | The port number of the PostgreSQL server.               |
| `database` | String | Yes      | The database name to connect to.                        |
| `username` | String | Yes      | The username for database authentication.               |
| `password` | String | Yes      | The password for database authentication.               |

### Node Object Structure

| Key               | Type    | Required | Description |
| ----------------- | ------- | -------- | ----------- |
| `modelId`         | String  | Yes      | A unique identifier for this model node (e.g., "vehicle-detector"). |
| `modelFile`       | String  | Yes      | The path to the model file within the `.mpta` archive (e.g., "yolov8n.pt"). |
| `minConfidence`   | Float   | Yes      | The minimum confidence score (0.0 to 1.0) required for a detection to be considered valid and potentially trigger a branch. |
| `triggerClasses`  | Array   | Yes      | A list of class names that, when detected by the parent, can trigger this node. For the root node, this lists all classes of interest. |
| `crop`            | Boolean | No       | If `true`, the image is cropped to the parent's detection bounding box before being passed to this node's model. Defaults to `false`. |
| `cropClass`       | String  | No       | The specific class to use for cropping (e.g., "Frontal" for frontal view cropping). |
| `multiClass`      | Boolean | No       | If `true`, enables multi-class detection mode where multiple classes can be detected simultaneously. |
| `expectedClasses` | Array   | No       | When `multiClass` is true, defines which classes are expected. At least one must be detected for processing to continue. |
| `parallel`        | Boolean | No       | If `true`, this branch will be processed in parallel with other parallel branches. |
| `branches`        | Array   | No       | A list of child node objects that can be triggered by this node's detections. |
| `actions`         | Array   | No       | A list of actions to execute upon a successful detection in this node. |
| `parallelActions` | Array   | No       | A list of actions to execute after all specified branches have completed. |

### Action Object Structure

Actions allow the pipeline to interact with Redis and PostgreSQL databases. They are executed sequentially for a given detection.

#### Action Context & Dynamic Keys

All actions have access to a dynamic context for formatting keys and messages. The context is created for each detection event and includes:

- All key-value pairs from the detection result (e.g., `class`, `confidence`, `id`).
- `{timestamp_ms}`: The current Unix timestamp in milliseconds.
- `{timestamp}`: Formatted timestamp string (YYYY-MM-DDTHH-MM-SS).
- `{uuid}`: A unique identifier (UUID4) for the detection event.
- `{filename}`: Generated filename with UUID.
- `{camera_id}`: Full camera subscription identifier.
- `{display_id}`: Display identifier extracted from the subscription.
- `{session_id}`: Session ID for database operations.
- `{image_key}`: If a `redis_save_image` action has already been executed for this event, this placeholder will be replaced with the key where the image was stored.

#### `redis_save_image`

Saves the current image frame (or cropped sub-image) to a Redis key.

| Key              | Type   | Required | Description |
| ---------------- | ------ | -------- | ----------- |
| `type`           | String | Yes      | Must be `"redis_save_image"`. |
| `key`            | String | Yes      | The Redis key to save the image to. Can contain any of the dynamic placeholders. |
| `region`         | String | No       | Specific detected region to crop and save (e.g., "Frontal"). |
| `format`         | String | No       | Image format: "jpeg" or "png". Defaults to "jpeg". |
| `quality`        | Number | No       | JPEG quality (1-100). Defaults to 90. |
| `expire_seconds` | Number | No       | If provided, sets an expiration time (in seconds) for the Redis key. |

#### `redis_publish`

Publishes a message to a Redis channel.

| Key       | Type   | Required | Description |
| --------- | ------ | -------- | ----------- |
| `type`    | String | Yes      | Must be `"redis_publish"`. |
| `channel` | String | Yes      | The Redis channel to publish the message to. |
| `message` | String | Yes      | The message to publish. Can contain any of the dynamic placeholders, including `{image_key}`. |

#### `postgresql_update_combined`

Updates the PostgreSQL database with results from multiple branches after they complete.


| Key               | Type   | Required | Description |
| ----------------- | ------ | -------- | ----------- |
| `type`            | String | Yes      | Must be `"postgresql_update_combined"`. |
| `table`           | String | Yes      | The database table name (will be prefixed with `gas_station_1.` schema). |
| `key_field`       | String | Yes      | The field to use as the update key (typically "session_id"). |
| `key_value`       | String | Yes      | Template for the key value (e.g., "{session_id}"). |
| `waitForBranches` | Array  | Yes      | List of branch model IDs to wait for completion before executing update. |
| `fields`          | Object | Yes      | Field mapping object where keys are database columns and values are templates (e.g., "{branch.field}"). |

### Complete Example `pipeline.json`

This example demonstrates a comprehensive pipeline for vehicle detection with parallel classification and database integration:

```json
{
  "redis": {
    "host": "10.100.1.3",
    "port": 6379,
    "password": "your-redis-password",
    "db": 0
  },
  "postgresql": {
    "host": "10.100.1.3",
    "port": 5432,
    "database": "inference",
    "username": "root",
    "password": "your-db-password"
  },
  "pipeline": {
    "modelId": "car_frontal_detection_v1",
    "modelFile": "car_frontal_detection_v1.pt",
    "crop": false,
    "triggerClasses": ["Car", "Frontal"],
    "minConfidence": 0.8,
    "multiClass": true,
    "expectedClasses": ["Car", "Frontal"],
    "actions": [
      {
        "type": "redis_save_image",
        "region": "Frontal",
        "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
        "expire_seconds": 600,
        "format": "jpeg",
        "quality": 90
      },
      {
        "type": "redis_publish",
        "channel": "car_detections",
        "message": "{\"event\":\"frontal_detected\"}"
      }
    ],
    "branches": [
      {
        "modelId": "car_brand_cls_v1",
        "modelFile": "car_brand_cls_v1.pt",
        "crop": true,
        "cropClass": "Frontal",
        "resizeTarget": [224, 224],
        "triggerClasses": ["Frontal"],
        "minConfidence": 0.85,
        "parallel": true,
        "branches": []
      },
      {
        "modelId": "car_bodytype_cls_v1",
        "modelFile": "car_bodytype_cls_v1.pt",
        "crop": true,
        "cropClass": "Car",
        "resizeTarget": [224, 224],
        "triggerClasses": ["Car"],
        "minConfidence": 0.85,
        "parallel": true,
        "branches": []
      }
    ],
    "parallelActions": [
      {
        "type": "postgresql_update_combined",
        "table": "car_frontal_info",
        "key_field": "session_id",
        "key_value": "{session_id}",
        "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
        "fields": {
          "car_brand": "{car_brand_cls_v1.brand}",
          "car_body_type": "{car_bodytype_cls_v1.body_type}"
        }
      }
    ]
  }
}
```

## API Reference

The `pympta` module exposes two main functions.

### `load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict`

Loads, extracts, and parses an `.mpta` file to build a pipeline tree in memory. It also establishes Redis and PostgreSQL connections if configured in `pipeline.json`.

- **Parameters:**
  - `zip_source` (str): The file path to the local `.mpta` zip archive.
  - `target_dir` (str): A directory path where the archive's contents will be extracted.
- **Returns:**
  - A dictionary representing the root node of the pipeline, ready to be used with `run_pipeline`. Returns `None` if loading fails.

### `run_pipeline(frame, node: dict, return_bbox: bool = False, context: dict = None)`

Executes the inference pipeline on a single image frame.

- **Parameters:**
  - `frame`: The input image frame (e.g., a NumPy array from OpenCV).
  - `node` (dict): The pipeline node to execute (typically the root node returned by `load_pipeline_from_zip`).
  - `return_bbox` (bool): If `True`, the function returns a tuple `(detection, bounding_box)`. Otherwise, it returns only the `detection`.
  - `context` (dict): Optional context dictionary containing camera_id, display_id, session_id for action formatting.
- **Returns:**
  - The final detection result from the last executed node in the chain. A detection is a dictionary like `{'class': 'car', 'confidence': 0.95, 'id': 1}`.
    If no detection meets the criteria, it returns `None` (or `(None, None)` if `return_bbox` is `True`).

## Database Integration

The pipeline system includes automatic PostgreSQL database management:

### Table Schema (`gas_station_1.car_frontal_info`)

The system automatically creates and manages the following table structure:

```sql
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
    display_id VARCHAR(255),
    captured_timestamp VARCHAR(255),
    session_id VARCHAR(255) PRIMARY KEY,
    license_character VARCHAR(255) DEFAULT NULL,
    license_type VARCHAR(255) DEFAULT 'No model available',
    car_brand VARCHAR(255) DEFAULT NULL,
    car_model VARCHAR(255) DEFAULT NULL,
    car_body_type VARCHAR(255) DEFAULT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
```

### Workflow

1. **Initial Record Creation**: When both "Car" and "Frontal" are detected, an initial database record is created with a UUID session_id.
2. **Redis Storage**: Vehicle images are stored in Redis with keys containing the session_id.
3. **Parallel Classification**: Brand and body type classification run concurrently.
4. **Database Update**: After all branches complete, the database record is updated with classification results.

## Usage Example

This snippet shows how to use `pympta` with the enhanced features:

```python
import cv2
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline

# 1. Define paths
MPTA_FILE = "path/to/your/pipeline.mpta"
CACHE_DIR = ".mptacache"

# 2. Load the pipeline from the .mpta file
# This reads pipeline.json and loads the YOLO models into memory.
model_tree = load_pipeline_from_zip(MPTA_FILE, CACHE_DIR)
if not model_tree:
    print("Failed to load pipeline.")
    raise SystemExit(1)

# 3. Open a video source
cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 4. Run the pipeline on the current frame with context
    context = {
        "camera_id": "display-001;cam-001",
        "display_id": "display-001",
        "session_id": None,  # Will be generated automatically
    }
    detection_result, bounding_box = run_pipeline(
        frame, model_tree, return_bbox=True, context=context
    )

    # 5. Display the results
    if detection_result:
        print(f"Detected: {detection_result['class']} with confidence {detection_result['confidence']:.2f}")
        if bounding_box is not None:
            # OpenCV drawing functions need integer pixel coordinates.
            x1, y1, x2, y2 = map(int, bounding_box)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, detection_result['class'], (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)

    cv2.imshow("Pipeline Output", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
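
To make the `context` dictionary and the dynamic placeholders from "Action Context & Dynamic Keys" more concrete, the sketch below assembles a per-event context and expands the `redis_save_image` key template from the complete example. It is an illustration under stated assumptions, not `pympta`'s actual code: the helper names `build_action_context` and `expand` are hypothetical, expansion is done with Python's `str.format_map`, and the `{filename}` scheme (`<uuid>.jpg`) is a guess, since only "generated filename with UUID" is specified.

```python
import time
import uuid
from datetime import datetime


def build_action_context(detection, camera_id, display_id, session_id):
    """Assemble a placeholder context for one detection event (illustrative only)."""
    event_uuid = str(uuid.uuid4())
    context = dict(detection)  # detection keys such as class, confidence, id
    context.update({
        "timestamp_ms": int(time.time() * 1000),
        # Documented format: YYYY-MM-DDTHH-MM-SS
        "timestamp": datetime.now().strftime("%Y-%m-%dT%H-%M-%S"),
        "uuid": event_uuid,
        "filename": f"{event_uuid}.jpg",  # assumption: the real naming scheme is not specified
        "camera_id": camera_id,
        "display_id": display_id,
        "session_id": session_id,
    })
    return context


def expand(template, context):
    """Substitute {placeholder} fields; an assumption about how templates are resolved."""
    return template.format_map(context)


ctx = build_action_context(
    {"class": "Frontal", "confidence": 0.93, "id": 1},
    camera_id="display-001;cam-001",
    display_id="display-001",
    session_id="session-123",
)
key = expand("inference:{display_id}:{timestamp}:{session_id}:{filename}", ctx)
print(key)
```

With `str.format_map`, literal braces (such as those in a JSON `message`) would have to be doubled. How `pympta` itself treats such messages is not stated here, so that detail should be checked against the implementation.
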