feat: integrate Redis support in pipeline execution; add actions for saving images and publishing messages

This commit is contained in:
Siwat Sirichai 2025-07-15 00:30:09 +07:00
parent a1f797f564
commit 769371a1a3
3 changed files with 250 additions and 5 deletions

View file

@ -7,13 +7,14 @@ import requests
import zipfile
import shutil
import traceback
import redis
from ultralytics import YOLO
from urllib.parse import urlparse
# Create a logger specifically for this module
logger = logging.getLogger("detector_worker.pympta")
def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict:
def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client) -> dict:
# Recursively load a model node from configuration.
model_path = os.path.join(mpta_dir, node_config["modelFile"])
if not os.path.exists(model_path):
@ -44,13 +45,15 @@ def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict:
"triggerClassIndices": trigger_class_indices,
"crop": node_config.get("crop", False),
"minConfidence": node_config.get("minConfidence", None),
"actions": node_config.get("actions", []),
"model": model,
"branches": []
"branches": [],
"redis_client": redis_client
}
logger.debug(f"Configured node {node_config['modelId']} with trigger classes: {node['triggerClasses']}")
for child in node_config.get("branches", []):
logger.debug(f"Loading branch for parent node {node_config['modelId']}")
node["branches"].append(load_pipeline_node(child, mpta_dir))
node["branches"].append(load_pipeline_node(child, mpta_dir, redis_client))
return node
def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict:
@ -158,7 +161,26 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict:
pipeline_config = json.load(f)
logger.info(f"Successfully loaded pipeline configuration from {pipeline_json_path}")
logger.debug(f"Pipeline config: {json.dumps(pipeline_config, indent=2)}")
return load_pipeline_node(pipeline_config["pipeline"], mpta_dir)
# Establish Redis connection if configured
redis_client = None
if "redis" in pipeline_config:
redis_config = pipeline_config["redis"]
try:
redis_client = redis.Redis(
host=redis_config["host"],
port=redis_config["port"],
password=redis_config.get("password"),
db=redis_config.get("db", 0),
decode_responses=True
)
redis_client.ping()
logger.info(f"Successfully connected to Redis at {redis_config['host']}:{redis_config['port']}")
except redis.exceptions.ConnectionError as e:
logger.error(f"Failed to connect to Redis: {e}")
redis_client = None
return load_pipeline_node(pipeline_config["pipeline"], mpta_dir, redis_client)
except json.JSONDecodeError as e:
logger.error(f"Error parsing pipeline.json: {str(e)}", exc_info=True)
return None
@ -169,6 +191,25 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict:
logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True)
return None
def execute_actions(node, frame, detection_result):
if not node["redis_client"] or not node["actions"]:
return
for action in node["actions"]:
try:
if action["type"] == "redis_save_image":
key = action["key"].format(**detection_result)
_, buffer = cv2.imencode('.jpg', frame)
node["redis_client"].set(key, buffer.tobytes())
logger.info(f"Saved image to Redis with key: {key}")
elif action["type"] == "redis_publish":
channel = action["channel"]
message = action["message"].format(**detection_result)
node["redis_client"].publish(channel, message)
logger.info(f"Published message to Redis channel '{channel}': {message}")
except Exception as e:
logger.error(f"Error executing action {action['type']}: {e}")
def run_pipeline(frame, node: dict, return_bbox: bool=False):
"""
- For detection nodes (task != 'classify'):
@ -206,6 +247,7 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False):
"confidence": top1_conf,
"id": None
}
execute_actions(node, frame, det)
return (det, None) if return_bbox else det
@ -254,9 +296,11 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False):
det2, _ = run_pipeline(sub, br, return_bbox=True)
if det2:
# return classification result + original bbox
execute_actions(br, sub, det2)
return (det2, best_box) if return_bbox else det2
# ─── No branch matched → return this detection ─────────────
execute_actions(node, frame, best_det)
return (best_det, best_box) if return_bbox else best_det
except Exception as e: