import os
import json
import logging
import zipfile
from typing import Optional

import torch
import cv2
import requests
from ultralytics import YOLO

# (connect, read) timeouts in seconds for the .mpta download. Without a timeout
# requests waits indefinitely on a stalled server.
_DOWNLOAD_TIMEOUT = (10, 60)


def load_pipeline_node(node_config: dict, models_dir: str) -> dict:
    """Recursively load a model node (and its branches) from configuration.

    Args:
        node_config: Node description. ``modelFile`` and ``modelId`` are
            required; ``triggerClasses``, ``crop``, ``minConfidence`` and
            ``branches`` are optional.
        models_dir: Directory that ``modelFile`` is resolved against.

    Returns:
        A dict holding the node settings, the loaded ``YOLO`` model under
        ``"model"`` and the recursively loaded children under ``"branches"``.

    Raises:
        FileNotFoundError: If the model file does not exist in ``models_dir``.
        KeyError: If ``modelFile`` or ``modelId`` is missing from the config.
    """
    model_path = os.path.join(models_dir, node_config["modelFile"])
    if not os.path.exists(model_path):
        logging.error(f"Model file {model_path} not found.")
        raise FileNotFoundError(f"Model file {model_path} not found.")

    logging.info(f"Loading model for node {node_config['modelId']} from {model_path}")
    model = YOLO(model_path)
    if torch.cuda.is_available():
        model.to("cuda")

    return {
        "modelId": node_config["modelId"],
        "modelFile": node_config["modelFile"],
        "triggerClasses": node_config.get("triggerClasses", []),
        "crop": node_config.get("crop", False),
        "minConfidence": node_config.get("minConfidence", None),
        "model": model,
        "branches": [
            load_pipeline_node(child, models_dir)
            for child in node_config.get("branches", [])
        ],
    }


def _remove_if_exists(path: str) -> None:
    """Delete ``path`` if present; a failed cleanup is logged, never raised."""
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        logging.warning(f"Could not remove {path}: {e}")


def load_pipeline_from_zip(zip_url: str, target_dir: str) -> Optional[dict]:
    """Download, extract, and load a pipeline configuration from a zip (.mpta) file.

    The archive is downloaded to ``target_dir/pipeline.mpta``, extracted into
    ``target_dir`` and removed again. ``pipeline.json`` from the extracted
    content is then parsed and its ``"pipeline"`` entry is loaded with
    ``load_pipeline_node``.

    Args:
        zip_url: URL of the .mpta archive.
        target_dir: Directory to extract into; created if missing.

    Returns:
        The root pipeline node, or ``None`` if the download, the extraction or
        the loading failed (the cause is logged).
    """
    os.makedirs(target_dir, exist_ok=True)
    zip_path = os.path.join(target_dir, "pipeline.mpta")

    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(zip_url, stream=True, timeout=_DOWNLOAD_TIMEOUT) as response:
            if response.status_code != 200:
                logging.error(f"Failed to download .mpta file (status {response.status_code})")
                return None
            with open(zip_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        logging.info(f"Downloaded .mpta file from {zip_url} to {zip_path}")
    except Exception as e:
        logging.error(f"Exception downloading .mpta file from {zip_url}: {e}")
        # Do not leave a truncated archive behind after a failed download.
        _remove_if_exists(zip_path)
        return None

    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(target_dir)
        logging.info(f"Extracted .mpta file to {target_dir}")
    except Exception as e:
        logging.error(f"Failed to extract .mpta file: {e}")
        return None
    finally:
        # The archive is only a transport container; drop it either way.
        _remove_if_exists(zip_path)

    pipeline_json_path = os.path.join(target_dir, "pipeline.json")
    if not os.path.exists(pipeline_json_path):
        logging.error("pipeline.json not found in the .mpta file")
        return None

    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(pipeline_json_path, "r", encoding="utf-8") as f:
            pipeline_config = json.load(f)
        return load_pipeline_node(pipeline_config["pipeline"], target_dir)
    except Exception as e:
        logging.error(f"Error loading pipeline.json: {e}")
        return None


def _format_result(detection, bbox, return_bbox: bool):
    """Shape a result according to the caller's ``return_bbox`` flag."""
    return (detection, bbox) if return_bbox else detection


def _select_best_tracked_box(results, model):
    """Return ``(detection, box)`` for the most confident box carrying a track id.

    Boxes without a track id are ignored. Returns ``(None, None)`` when no box
    qualifies.
    """
    detection = None
    best_box = None
    max_conf = -1
    for result in results:
        for box in result.boxes:
            box_cpu = box.cpu()
            conf = float(box_cpu.conf[0])
            if conf <= max_conf or getattr(box, "id", None) is None:
                continue
            max_conf = conf
            detection = {
                "class": model.names[int(box_cpu.cls[0])],
                "confidence": conf,
                "id": box.id.item(),
            }
            best_box = box_cpu
    return detection, best_box


def _clamped_bbox(box, frame):
    """Return the box as integer ``(x1, y1, x2, y2)`` clipped to the frame.

    Returns ``None`` when the clipped box has no area.
    """
    x1, y1, x2, y2 = map(int, box.xyxy[0])
    h, w = frame.shape[:2]
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(w, x2), min(h, y2)
    if x2 > x1 and y2 > y1:
        return (x1, y1, x2, y2)
    return None


def run_pipeline(frame, node: dict, return_bbox: bool = False):
    """
    Processes the frame with the given pipeline node.

    The node's model is run in tracking mode and the most confident tracked
    box becomes the node's detection. If the node has ``crop`` set, the frame
    is cropped to that box before it is handed to the branches. The first
    branch whose ``triggerClasses`` contains the detected class is run
    recursively; if it yields a detection, that result is returned, otherwise
    the next matching branch is tried and finally this node's own detection
    is returned.

    When return_bbox is True, the function returns a tuple (detection, bbox)
    where bbox is (x1,y1,x2,y2) for drawing. Otherwise, returns only the
    detection. A bbox coming from a branch is relative to the frame that
    branch received (i.e. the cropped frame when this node crops).

    On any error, or when nothing is detected, the detection is None (and the
    bbox is None as well).
    """
    try:
        results = node["model"].track(frame, stream=False, persist=True)
        detection, best_box = _select_best_tracked_box(results, node["model"])

        if detection is None:
            return _format_result(None, None, return_bbox)

        bbox = None
        if node.get("crop", False) and best_box is not None:
            bbox = _clamped_bbox(best_box, frame)
            if bbox is not None:
                x1, y1, x2, y2 = bbox
                frame = frame[y1:y2, x1:x2]

        for branch in node["branches"]:
            if detection["class"] not in branch.get("triggerClasses", []):
                continue

            min_conf = branch.get("minConfidence")
            if min_conf is not None and detection["confidence"] < min_conf:
                logging.debug(f"Confidence {detection['confidence']} below threshold {min_conf} for branch {branch['modelId']}.")
                # NOTE(review): this stops at the first matching branch whose
                # threshold is not met instead of trying later branches; kept
                # as in the original -- confirm this is intended.
                return _format_result(detection, bbox, return_bbox)

            res = run_pipeline(frame, branch, return_bbox)
            # With return_bbox=True an empty branch result is (None, None),
            # which is not None itself; inspect the detection so both modes
            # fall back to this node's detection in the same way.
            branch_detection = res[0] if return_bbox else res
            if branch_detection is not None:
                return res

        return _format_result(detection, bbox, return_bbox)
    except Exception as e:
        logging.error(f"Error running pipeline on node {node.get('modelId')}: {e}")
        return _format_result(None, None, return_bbox)