feat: enhance Redis action handling; add dynamic context for actions and support for expiration time
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				Build Backend Application and Docker Image / build-docker (push) Successful in 9m3s
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	Build Backend Application and Docker Image / build-docker (push) Successful in 9m3s
				
			This commit is contained in:
		
							parent
							
								
									769371a1a3
								
							
						
					
					
						commit
						f50585f26d
					
				
					 2 changed files with 46 additions and 26 deletions
				
			
		
							
								
								
									
										42
									
								
								pympta.md
									
										
									
									
									
								
							
							
						
						
									
										42
									
								
								pympta.md
									
										
									
									
									
								
							| 
						 | 
					@ -64,16 +64,26 @@ This file defines the entire pipeline logic. The root object contains a `pipelin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
### Action Object Structure
 | 
					### Action Object Structure
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Actions allow the pipeline to interact with Redis.
 | 
					Actions allow the pipeline to interact with Redis. They are executed sequentially for a given detection.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#### Action Context & Dynamic Keys
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					All actions have access to a dynamic context for formatting keys and messages. The context is created for each detection event and includes:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					- All key-value pairs from the detection result (e.g., `class`, `confidence`, `id`).
 | 
				
			||||||
 | 
					- `{timestamp_ms}`: The current Unix timestamp in milliseconds.
 | 
				
			||||||
 | 
					- `{uuid}`: A unique identifier (UUID4) for the detection event.
 | 
				
			||||||
 | 
					- `{image_key}`: If a `redis_save_image` action has already been executed for this event, this placeholder will be replaced with the key where the image was stored.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#### `redis_save_image`
 | 
					#### `redis_save_image`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Saves the current image frame (or cropped sub-image) to a Redis key.
 | 
					Saves the current image frame (or cropped sub-image) to a Redis key.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| Key              | Type   | Required | Description                                                                                             |
 | 
					| Key              | Type   | Required | Description                                                                                             |
 | 
				
			||||||
| ----- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
 | 
					| ---------------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
 | 
				
			||||||
| `type`           | String | Yes      | Must be `"redis_save_image"`.                                                                           |
 | 
					| `type`           | String | Yes      | Must be `"redis_save_image"`.                                                                           |
 | 
				
			||||||
| `key` | String | Yes      | The Redis key to save the image to. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. |
 | 
					| `key`            | String | Yes      | The Redis key to save the image to. Can contain any of the dynamic placeholders.                        |
 | 
				
			||||||
 | 
					| `expire_seconds` | Number | No       | If provided, sets an expiration time (in seconds) for the Redis key.                                    |
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#### `redis_publish`
 | 
					#### `redis_publish`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -83,44 +93,38 @@ Publishes a message to a Redis channel.
 | 
				
			||||||
| --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
 | 
					| --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
 | 
				
			||||||
| `type`    | String | Yes      | Must be `"redis_publish"`.                                                                              |
 | 
					| `type`    | String | Yes      | Must be `"redis_publish"`.                                                                              |
 | 
				
			||||||
| `channel` | String | Yes      | The Redis channel to publish the message to.                                                            |
 | 
					| `channel` | String | Yes      | The Redis channel to publish the message to.                                                            |
 | 
				
			||||||
| `message` | String | Yes      | The message to publish. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. |
 | 
					| `message` | String | Yes      | The message to publish. Can contain any of the dynamic placeholders, including `{image_key}`.           |
 | 
				
			||||||
 | 
					
 | 
				
			||||||
### Example `pipeline.json` with Redis
 | 
					### Example `pipeline.json` with Redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					This example demonstrates a pipeline that detects vehicles, saves a uniquely named image of each detection that expires in one hour, and then publishes a notification with the image key.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
```json
 | 
					```json
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  "redis": {
 | 
					  "redis": {
 | 
				
			||||||
    "host": "localhost",
 | 
					    "host": "redis.local",
 | 
				
			||||||
    "port": 6379,
 | 
					    "port": 6379,
 | 
				
			||||||
    "password": "your-password"
 | 
					    "password": "your-super-secret-password"
 | 
				
			||||||
  },
 | 
					  },
 | 
				
			||||||
  "pipeline": {
 | 
					  "pipeline": {
 | 
				
			||||||
    "modelId": "vehicle-detector",
 | 
					    "modelId": "vehicle-detector",
 | 
				
			||||||
    "modelFile": "vehicle_model.pt",
 | 
					    "modelFile": "vehicle_model.pt",
 | 
				
			||||||
    "minConfidence": 0.5,
 | 
					    "minConfidence": 0.6,
 | 
				
			||||||
    "triggerClasses": ["car", "truck"],
 | 
					    "triggerClasses": ["car", "truck"],
 | 
				
			||||||
    "actions": [
 | 
					    "actions": [
 | 
				
			||||||
      {
 | 
					      {
 | 
				
			||||||
        "type": "redis_save_image",
 | 
					        "type": "redis_save_image",
 | 
				
			||||||
        "key": "detection:image:{id}"
 | 
					        "key": "detections:{class}:{timestamp_ms}:{uuid}",
 | 
				
			||||||
 | 
					        "expire_seconds": 3600
 | 
				
			||||||
      },
 | 
					      },
 | 
				
			||||||
      {
 | 
					      {
 | 
				
			||||||
        "type": "redis_publish",
 | 
					        "type": "redis_publish",
 | 
				
			||||||
        "channel": "detections",
 | 
					        "channel": "vehicle_events",
 | 
				
			||||||
        "message": "Detected a {class} with ID {id}"
 | 
					        "message": "{\"event\":\"new_detection\",\"class\":\"{class}\",\"confidence\":{confidence},\"image_key\":\"{image_key}\"}"
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
    "branches": [
 | 
					 | 
				
			||||||
      {
 | 
					 | 
				
			||||||
        "modelId": "lpr-us",
 | 
					 | 
				
			||||||
        "modelFile": "lpr_model.pt",
 | 
					 | 
				
			||||||
        "minConfidence": 0.7,
 | 
					 | 
				
			||||||
        "triggerClasses": ["car"],
 | 
					 | 
				
			||||||
        "crop": true,
 | 
					 | 
				
			||||||
    "branches": []
 | 
					    "branches": []
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
    ]
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
```
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -8,6 +8,8 @@ import zipfile
 | 
				
			||||||
import shutil
 | 
					import shutil
 | 
				
			||||||
import traceback
 | 
					import traceback
 | 
				
			||||||
import redis
 | 
					import redis
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
 | 
					import uuid
 | 
				
			||||||
from ultralytics import YOLO
 | 
					from ultralytics import YOLO
 | 
				
			||||||
from urllib.parse import urlparse
 | 
					from urllib.parse import urlparse
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -195,16 +197,30 @@ def execute_actions(node, frame, detection_result):
 | 
				
			||||||
    if not node["redis_client"] or not node["actions"]:
 | 
					    if not node["redis_client"] or not node["actions"]:
 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Create a dynamic context for this detection event
 | 
				
			||||||
 | 
					    action_context = {
 | 
				
			||||||
 | 
					        **detection_result,
 | 
				
			||||||
 | 
					        "timestamp_ms": int(time.time() * 1000),
 | 
				
			||||||
 | 
					        "uuid": str(uuid.uuid4()),
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for action in node["actions"]:
 | 
					    for action in node["actions"]:
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            if action["type"] == "redis_save_image":
 | 
					            if action["type"] == "redis_save_image":
 | 
				
			||||||
                key = action["key"].format(**detection_result)
 | 
					                key = action["key"].format(**action_context)
 | 
				
			||||||
                _, buffer = cv2.imencode('.jpg', frame)
 | 
					                _, buffer = cv2.imencode('.jpg', frame)
 | 
				
			||||||
 | 
					                expire_seconds = action.get("expire_seconds")
 | 
				
			||||||
 | 
					                if expire_seconds:
 | 
				
			||||||
 | 
					                    node["redis_client"].setex(key, expire_seconds, buffer.tobytes())
 | 
				
			||||||
 | 
					                    logger.info(f"Saved image to Redis with key: {key} (expires in {expire_seconds}s)")
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
                    node["redis_client"].set(key, buffer.tobytes())
 | 
					                    node["redis_client"].set(key, buffer.tobytes())
 | 
				
			||||||
                    logger.info(f"Saved image to Redis with key: {key}")
 | 
					                    logger.info(f"Saved image to Redis with key: {key}")
 | 
				
			||||||
 | 
					                # Add the generated key to the context for subsequent actions
 | 
				
			||||||
 | 
					                action_context["image_key"] = key
 | 
				
			||||||
            elif action["type"] == "redis_publish":
 | 
					            elif action["type"] == "redis_publish":
 | 
				
			||||||
                channel = action["channel"]
 | 
					                channel = action["channel"]
 | 
				
			||||||
                message = action["message"].format(**detection_result)
 | 
					                message = action["message"].format(**action_context)
 | 
				
			||||||
                node["redis_client"].publish(channel, message)
 | 
					                node["redis_client"].publish(channel, message)
 | 
				
			||||||
                logger.info(f"Published message to Redis channel '{channel}': {message}")
 | 
					                logger.info(f"Published message to Redis channel '{channel}': {message}")
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue