diff --git a/pympta.md b/pympta.md index 2136480..ac61f4a 100644 --- a/pympta.md +++ b/pympta.md @@ -64,16 +64,26 @@ This file defines the entire pipeline logic. The root object contains a `pipelin ### Action Object Structure -Actions allow the pipeline to interact with Redis. +Actions allow the pipeline to interact with Redis. They are executed sequentially for a given detection. + +#### Action Context & Dynamic Keys + +All actions have access to a dynamic context for formatting keys and messages. The context is created for each detection event and includes: + +- All key-value pairs from the detection result (e.g., `class`, `confidence`, `id`). +- `{timestamp_ms}`: The current Unix timestamp in milliseconds. +- `{uuid}`: A unique identifier (UUID4) for the detection event. +- `{image_key}`: If a `redis_save_image` action has already been executed for this event, this placeholder will be replaced with the key where the image was stored. #### `redis_save_image` Saves the current image frame (or cropped sub-image) to a Redis key. -| Key | Type | Required | Description | -| ----- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | -| `type`| String | Yes | Must be `"redis_save_image"`. | -| `key` | String | Yes | The Redis key to save the image to. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. | +| Key | Type | Required | Description | +| ---------------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | +| `type` | String | Yes | Must be `"redis_save_image"`. | +| `key` | String | Yes | The Redis key to save the image to. Can contain any of the dynamic placeholders. | +| `expire_seconds` | Number | No | If provided, sets an expiration time (in seconds) for the Redis key. | #### `redis_publish` @@ -83,43 +93,37 @@ Publishes a message to a Redis channel. | --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | | `type` | String | Yes | Must be `"redis_publish"`. | | `channel` | String | Yes | The Redis channel to publish the message to. | -| `message` | String | Yes | The message to publish. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. | +| `message` | String | Yes | The message to publish. Can contain any of the dynamic placeholders, including `{image_key}`. | ### Example `pipeline.json` with Redis +This example demonstrates a pipeline that detects vehicles, saves a uniquely named image of each detection that expires in one hour, and then publishes a notification with the image key. + ```json { "redis": { - "host": "localhost", + "host": "redis.local", "port": 6379, - "password": "your-password" + "password": "your-super-secret-password" }, "pipeline": { "modelId": "vehicle-detector", "modelFile": "vehicle_model.pt", - "minConfidence": 0.5, + "minConfidence": 0.6, "triggerClasses": ["car", "truck"], "actions": [ { "type": "redis_save_image", - "key": "detection:image:{id}" + "key": "detections:{class}:{timestamp_ms}:{uuid}", + "expire_seconds": 3600 }, { "type": "redis_publish", - "channel": "detections", - "message": "Detected a {class} with ID {id}" + "channel": "vehicle_events", + "message": "{\"event\":\"new_detection\",\"class\":\"{class}\",\"confidence\":{confidence},\"image_key\":\"{image_key}\"}" } ], - "branches": [ - { - "modelId": "lpr-us", - "modelFile": "lpr_model.pt", - "minConfidence": 0.7, - "triggerClasses": ["car"], - "crop": true, - "branches": [] - } - ] + "branches": [] } } ``` diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index 4514182..f151b55 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -8,6 +8,8 @@ import zipfile import shutil import traceback import redis +import time +import uuid from ultralytics import YOLO from urllib.parse import urlparse @@ -195,16 +197,30 @@ def execute_actions(node, frame, detection_result): if not node["redis_client"] or not node["actions"]: return + # Create a dynamic context for this detection event + action_context = { + **detection_result, + "timestamp_ms": int(time.time() * 1000), + "uuid": str(uuid.uuid4()), + } + for action in node["actions"]: try: if action["type"] == "redis_save_image": - key = action["key"].format(**detection_result) + key = action["key"].format(**action_context) _, buffer = cv2.imencode('.jpg', frame) - node["redis_client"].set(key, buffer.tobytes()) - logger.info(f"Saved image to Redis with key: {key}") + expire_seconds = action.get("expire_seconds") + if expire_seconds: + node["redis_client"].setex(key, expire_seconds, buffer.tobytes()) + logger.info(f"Saved image to Redis with key: {key} (expires in {expire_seconds}s)") + else: + node["redis_client"].set(key, buffer.tobytes()) + logger.info(f"Saved image to Redis with key: {key}") + # Add the generated key to the context for subsequent actions + action_context["image_key"] = key elif action["type"] == "redis_publish": channel = action["channel"] - message = action["message"].format(**detection_result) + message = action["message"].format(**action_context) node["redis_client"].publish(channel, message) logger.info(f"Published message to Redis channel '{channel}': {message}") except Exception as e: