diff --git a/debug.py b/debug.py new file mode 100644 index 0000000..012ccde --- /dev/null +++ b/debug.py @@ -0,0 +1,143 @@ +import argparse +import os +import cv2 +import time +import logging +import shutil +import threading # added threading +import yaml # for silencing YOLO + +from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") + +# Silence YOLO logging +os.environ["YOLO_VERBOSE"] = "False" +for logger_name in ["ultralytics", "ultralytics.hub", "ultralytics.yolo.utils"]: + logging.getLogger(logger_name).setLevel(logging.WARNING) + +# Global variables for frame sharing +global_frame = None +global_ret = False +capture_running = False + +def video_capture_loop(cap): + global global_frame, global_ret, capture_running + while capture_running: + global_ret, global_frame = cap.read() + time.sleep(0.01) # slight delay to reduce CPU usage + +def clear_cache(cache_dir: str): + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir) + +def log_pipeline_flow(frame, model_tree, level=0): + """ + Wrapper around run_pipeline that logs the model flow and detection results. + Returns the same output as the original run_pipeline function. + """ + indent = " " * level + model_id = model_tree.get("modelId", "unknown") + logging.info(f"{indent}→ Running model: {model_id}") + + detection, bbox = run_pipeline(frame, model_tree, return_bbox=True) + + if detection: + confidence = detection.get("confidence", 0) * 100 + class_name = detection.get("class", "unknown") + object_id = detection.get("id", "N/A") + + logging.info(f"{indent}✓ Detected: {class_name} (ID: {object_id}, confidence: {confidence:.1f}%)") + + # Check if any branches were triggered + triggered = False + for branch in model_tree.get("branches", []): + trigger_classes = branch.get("triggerClasses", []) + min_conf = branch.get("minConfidence", 0) + + if class_name in trigger_classes and detection.get("confidence", 0) >= min_conf: + triggered = True + if branch.get("crop", False) and bbox: + x1, y1, x2, y2 = bbox + cropped_frame = frame[y1:y2, x1:x2] + logging.info(f"{indent} ⌊ Triggering branch with cropped region {x1},{y1} to {x2},{y2}") + branch_result = log_pipeline_flow(cropped_frame, branch, level + 1) + else: + logging.info(f"{indent} ⌊ Triggering branch with full frame") + branch_result = log_pipeline_flow(frame, branch, level + 1) + + if branch_result[0]: # If branch detection successful, return it + return branch_result + + if not triggered and model_tree.get("branches"): + logging.info(f"{indent} ⌊ No branches triggered") + else: + logging.info(f"{indent}✗ No detection for {model_id}") + + return detection, bbox + +def main(mpta_file: str, video_source: str): + global capture_running + CACHE_DIR = os.path.join(".", ".mptacache") + clear_cache(CACHE_DIR) + logging.info(f"Loading pipeline from local file: {mpta_file}") + model_tree = load_pipeline_from_zip(mpta_file, CACHE_DIR) + if model_tree is None: + logging.error("Failed to load pipeline.") + return + + cap = cv2.VideoCapture(video_source) + if not cap.isOpened(): + logging.error(f"Cannot open video source {video_source}") + return + + # Start video capture in a separate thread + capture_running = True + capture_thread = threading.Thread(target=video_capture_loop, args=(cap,)) + capture_thread.start() + + logging.info("Press 'q' to exit.") + try: + while True: + # Use the global frame and ret updated by the thread + if not global_ret or global_frame is None: + continue # wait until a frame is available + + frame = global_frame.copy() # local copy to work with + + # Replace run_pipeline with our logging version + detection, bbox = log_pipeline_flow(frame, model_tree) + + # Stop if "honda" is detected + if detection and detection.get("class", "").lower() == "toyota": + logging.info("Detected 'toyota'. Stopping pipeline.") + break + + if bbox: + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) + label = detection["class"] if detection else "Detection" + cv2.putText(frame, label, (x1, y1 - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2) + + cv2.imshow("Pipeline Webcam", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + finally: + # Stop capture thread and cleanup + capture_running = False + capture_thread.join() + cap.release() + cv2.destroyAllWindows() + clear_cache(CACHE_DIR) + logging.info("Cleaned up .mptacache directory on shutdown.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run pipeline webcam utility.") + parser.add_argument("--mpta-file", type=str, required=True, help="Path to the local pipeline mpta (ZIP) file.") + parser.add_argument("--video", type=str, default="0", help="Video source (default webcam index 0).") + args = parser.parse_args() + video_source = int(args.video) if args.video.isdigit() else args.video + main(args.mpta_file, video_source) diff --git a/demoa.mpta b/demoa.mpta new file mode 100644 index 0000000..7471d5d Binary files /dev/null and b/demoa.mpta differ diff --git a/pipeline.log b/pipeline.log new file mode 100644 index 0000000..d3a14c7 --- /dev/null +++ b/pipeline.log @@ -0,0 +1,23 @@ +2025-05-12 18:10:04,590 [INFO] Loading pipeline from local file: demoa.mpta +2025-05-12 18:10:04,610 [INFO] Copied local .mpta file from demoa.mpta to .\.mptacache\pipeline.mpta +2025-05-12 18:10:04,901 [INFO] Extracted .mpta file to .\.mptacache +2025-05-12 18:10:04,905 [INFO] Loading model for node DetectionDraft from .\.mptacache\demoa\DetectionDraft.pt +2025-05-12 18:10:05,083 [INFO] Loading model for node ClassificationDraft from .\.mptacache\demoa\ClassificationDraft.pt +2025-05-12 18:10:08,035 [INFO] Press 'q' to exit. +2025-05-12 18:10:12,217 [INFO] Cleaned up .mptacache directory on shutdown. +2025-05-12 18:13:08,465 [INFO] Loading pipeline from local file: demoa.mpta +2025-05-12 18:13:08,512 [INFO] Copied local .mpta file from demoa.mpta to .\.mptacache\pipeline.mpta +2025-05-12 18:13:08,769 [INFO] Extracted .mpta file to .\.mptacache +2025-05-12 18:13:08,773 [INFO] Loading model for node DetectionDraft from .\.mptacache\demoa\DetectionDraft.pt +2025-05-12 18:13:09,083 [INFO] Loading model for node ClassificationDraft from .\.mptacache\demoa\ClassificationDraft.pt +2025-05-12 18:13:12,187 [INFO] Press 'q' to exit. +2025-05-12 18:13:14,146 [INFO] → Running model: DetectionDraft +2025-05-12 18:13:17,119 [INFO] Cleaned up .mptacache directory on shutdown. +2025-05-12 18:14:25,665 [INFO] Loading pipeline from local file: demoa.mpta +2025-05-12 18:14:25,687 [INFO] Copied local .mpta file from demoa.mpta to .\.mptacache\pipeline.mpta +2025-05-12 18:14:25,953 [INFO] Extracted .mpta file to .\.mptacache +2025-05-12 18:14:25,957 [INFO] Loading model for node DetectionDraft from .\.mptacache\demoa\DetectionDraft.pt +2025-05-12 18:14:26,138 [INFO] Loading model for node ClassificationDraft from .\.mptacache\demoa\ClassificationDraft.pt +2025-05-12 18:14:29,171 [INFO] Press 'q' to exit. +2025-05-12 18:14:30,146 [INFO] → Running model: DetectionDraft +2025-05-12 18:14:32,080 [INFO] Cleaned up .mptacache directory on shutdown. diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index 05a566d..fc58f3b 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -3,172 +3,180 @@ import json import logging import torch import cv2 -import requests import zipfile import shutil from ultralytics import YOLO from urllib.parse import urlparse def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: - # Recursively load a model node from configuration. model_path = os.path.join(mpta_dir, node_config["modelFile"]) if not os.path.exists(model_path): logging.error(f"Model file {model_path} not found.") raise FileNotFoundError(f"Model file {model_path} not found.") - logging.info(f"Loading model for node {node_config['modelId']} from {model_path}") + logging.info(f"Loading model {node_config['modelId']} from {model_path}") model = YOLO(model_path) if torch.cuda.is_available(): model.to("cuda") - node = { + + # map triggerClasses names → indices for YOLO + names = model.names # idx -> class name + trigger_names = node_config.get("triggerClasses", []) + trigger_inds = [i for i, nm in names.items() if nm in trigger_names] + + return { "modelId": node_config["modelId"], "modelFile": node_config["modelFile"], - "triggerClasses": node_config.get("triggerClasses", []), + "triggerClasses": trigger_names, + "triggerClassIndices": trigger_inds, "crop": node_config.get("crop", False), - "minConfidence": node_config.get("minConfidence", None), + "minConfidence": node_config.get("minConfidence", 0.0), "model": model, - "branches": [] + "branches": [ + load_pipeline_node(child, mpta_dir) + for child in node_config.get("branches", []) + ] } - for child in node_config.get("branches", []): - node["branches"].append(load_pipeline_node(child, mpta_dir)) - return node def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: os.makedirs(target_dir, exist_ok=True) zip_path = os.path.join(target_dir, "pipeline.mpta") - - # Parse the source; only local files are supported here. parsed = urlparse(zip_source) if parsed.scheme in ("", "file"): - local_path = parsed.path if parsed.scheme == "file" else zip_source - if os.path.exists(local_path): - try: - shutil.copy(local_path, zip_path) - logging.info(f"Copied local .mpta file from {local_path} to {zip_path}") - except Exception as e: - logging.error(f"Failed to copy local .mpta file from {local_path}: {e}") - return None - else: - logging.error(f"Local file {local_path} does not exist.") + local = parsed.path if parsed.scheme == "file" else zip_source + if not os.path.exists(local): + logging.error(f"Local file {local} does not exist.") return None + shutil.copy(local, zip_path) else: - logging.error("HTTP download functionality has been moved. Use a local file path here.") + logging.error("HTTP download not supported; use local file.") return None - try: - with zipfile.ZipFile(zip_path, "r") as zip_ref: - zip_ref.extractall(target_dir) - logging.info(f"Extracted .mpta file to {target_dir}") - except Exception as e: - logging.error(f"Failed to extract .mpta file: {e}") - return None - finally: - if os.path.exists(zip_path): - os.remove(zip_path) - pipeline_name = os.path.basename(zip_source) - pipeline_name = os.path.splitext(pipeline_name)[0] - mpta_dir = os.path.join(target_dir, pipeline_name) - pipeline_json_path = os.path.join(mpta_dir, "pipeline.json") - if not os.path.exists(pipeline_json_path): - logging.error("pipeline.json not found in the .mpta file") + with zipfile.ZipFile(zip_path, "r") as z: + z.extractall(target_dir) + os.remove(zip_path) + + base = os.path.splitext(os.path.basename(zip_source))[0] + mpta_dir = os.path.join(target_dir, base) + cfg = os.path.join(mpta_dir, "pipeline.json") + if not os.path.exists(cfg): + logging.error("pipeline.json not found in archive.") return None - try: - with open(pipeline_json_path, "r") as f: - pipeline_config = json.load(f) - return load_pipeline_node(pipeline_config["pipeline"], mpta_dir) - except Exception as e: - logging.error(f"Error loading pipeline.json: {e}") - return None + with open(cfg) as f: + pipeline_config = json.load(f) + return load_pipeline_node(pipeline_config["pipeline"], mpta_dir) -def run_pipeline(frame, node: dict, return_bbox: bool = False): + +def run_pipeline(frame, node: dict, return_bbox: bool=False): """ - Processes the frame with the given pipeline node. When return_bbox is True, - the function returns a tuple (detection, bbox) where bbox is (x1,y1,x2,y2) - for drawing. Otherwise, returns only the detection. + - For detection nodes (task != 'classify'): + • runs `track(..., classes=triggerClassIndices)` + • picks top box ≥ minConfidence + • optionally crops & resizes → recurse into child + • else returns (det_dict, bbox) + - For classify nodes: + • runs `predict()` + • returns top (class,confidence) and no bbox """ try: - # Check model type and use appropriate method - model_task = getattr(node["model"], "task", None) + task = getattr(node["model"], "task", None) + + # ─── Classification stage ─────────────────────────────────── + # if task == "classify": + # results = node["model"].predict(frame, stream=False) + # dets = [] + # for r in results: + # probs = r.probs + # if probs is not None: + # # sort descending + # idxs = probs.argsort(descending=True) + # for cid in idxs: + # dets.append({ + # "class": node["model"].names[int(cid)], + # "confidence": float(probs[int(cid)]), + # "id": None + # }) + # if not dets: + # return (None, None) if return_bbox else None + + # best = dets[0] + # return (best, None) if return_bbox else best - if model_task == "classify": - # Classification models need to use predict() instead of track() - logging.debug(f"Running classification model: {node.get('modelId')}") + if task == "classify": + # run the classifier and grab its top-1 directly via the Probs API results = node["model"].predict(frame, stream=False) - detection = None - best_box = None - - # Process classification results - for r in results: - probs = r.probs - if probs is not None and len(probs) > 0: - # Get the most confident class - class_id = int(probs.top1) - conf = float(probs.top1conf) - detection = { - "class": node["model"].names[class_id], - "confidence": conf, - "id": None # Classification doesn't have tracking IDs - } - - # Classification doesn't produce bounding boxes - bbox = None - - else: - # Detection/segmentation models use tracking - logging.debug(f"Running detection/tracking model: {node.get('modelId')}") - results = node["model"].track(frame, stream=False, persist=True) - detection = None - best_box = None - max_conf = -1 + # nothing returned? + if not results: + return (None, None) if return_bbox else None - for r in results: - for box in r.boxes: - box_cpu = box.cpu() - conf = float(box_cpu.conf[0]) - if conf > max_conf and hasattr(box, "id") and box.id is not None: - max_conf = conf - detection = { - "class": node["model"].names[int(box_cpu.cls[0])], - "confidence": conf, - "id": box.id.item() - } - best_box = box_cpu + # take the first result's probs object + r = results[0] + probs = r.probs + if probs is None: + return (None, None) if return_bbox else None - bbox = None - # Calculate bbox if best_box exists - if detection and best_box is not None: - coords = best_box.xyxy[0] - x1, y1, x2, y2 = map(int, coords) - h, w = frame.shape[:2] - x1, y1 = max(0, x1), max(0, y1) - x2, y2 = min(w, x2), min(h, y2) - if x2 > x1 and y2 > y1: - bbox = (x1, y1, x2, y2) - if node.get("crop", False): - frame = frame[y1:y2, x1:x2] + # get the top-1 class index and its confidence + top1_idx = int(probs.top1) + top1_conf = float(probs.top1conf) + + det = { + "class": node["model"].names[top1_idx], + "confidence": top1_conf, + "id": None + } + return (det, None) if return_bbox else det + + + # ─── Detection stage ──────────────────────────────────────── + # only look for your triggerClasses + tk = node["triggerClassIndices"] + res = node["model"].track( + frame, + stream=False, + persist=True, + **({"classes": tk} if tk else {}) + )[0] + + dets, boxes = [], [] + for box in res.boxes: + conf = float(box.cpu().conf[0]) + cid = int(box.cpu().cls[0]) + name = node["model"].names[cid] + if conf < node["minConfidence"]: + continue + xy = box.cpu().xyxy[0] + x1,y1,x2,y2 = map(int, xy) + dets.append({"class": name, "confidence": conf, + "id": box.id.item() if hasattr(box, "id") else None}) + boxes.append((x1, y1, x2, y2)) + + if not dets: + return (None, None) if return_bbox else None + + # take highest‐confidence + best_idx = max(range(len(dets)), key=lambda i: dets[i]["confidence"]) + best_det = dets[best_idx] + best_box = boxes[best_idx] + + # ─── Branch (classification) ─────────────────────────────── + for br in node["branches"]: + if (best_det["class"] in br["triggerClasses"] + and best_det["confidence"] >= br["minConfidence"]): + # crop if requested + sub = frame + if br["crop"]: + x1,y1,x2,y2 = best_box + sub = frame[y1:y2, x1:x2] + sub = cv2.resize(sub, (224, 224)) + + det2, _ = run_pipeline(sub, br, return_bbox=True) + if det2: + # return classification result + original bbox + return (det2, best_box) if return_bbox else det2 + + # ─── No branch matched → return this detection ───────────── + return (best_det, best_box) if return_bbox else best_det - if detection is not None: - for branch in node["branches"]: - if detection["class"] in branch.get("triggerClasses", []): - min_conf = branch.get("minConfidence") - if min_conf is not None and detection["confidence"] < min_conf: - logging.debug(f"Confidence {detection['confidence']} below threshold {min_conf} for branch {branch['modelId']}.") - if return_bbox: - return detection, bbox - return detection - res = run_pipeline(frame, branch, return_bbox) - if res is not None: - if return_bbox: - return res - return res - if return_bbox: - return detection, bbox - return detection - if return_bbox: - return None, None - return None except Exception as e: - logging.error(f"Error running pipeline on node {node.get('modelId')}: {e}") - if return_bbox: - return None, None - return None + logging.error(f"Error in node {node.get('modelId')}: {e}") + return (None, None) if return_bbox else None