import argparse import os import cv2 import time import logging import shutil import threading # added threading import yaml # for silencing YOLO from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") # Silence YOLO logging os.environ["YOLO_VERBOSE"] = "False" for logger_name in ["ultralytics", "ultralytics.hub", "ultralytics.yolo.utils"]: logging.getLogger(logger_name).setLevel(logging.WARNING) # Global variables for frame sharing global_frame = None global_ret = False capture_running = False def video_capture_loop(cap): global global_frame, global_ret, capture_running while capture_running: global_ret, global_frame = cap.read() time.sleep(0.01) # slight delay to reduce CPU usage def clear_cache(cache_dir: str): if os.path.exists(cache_dir): shutil.rmtree(cache_dir) def log_pipeline_flow(frame, model_tree, level=0): """ Wrapper around run_pipeline that logs the model flow and detection results. Returns the same output as the original run_pipeline function. """ indent = " " * level model_id = model_tree.get("modelId", "unknown") logging.info(f"{indent}→ Running model: {model_id}") detection, bbox = run_pipeline(frame, model_tree, return_bbox=True) if detection: confidence = detection.get("confidence", 0) * 100 class_name = detection.get("class", "unknown") object_id = detection.get("id", "N/A") logging.info(f"{indent}✓ Detected: {class_name} (ID: {object_id}, confidence: {confidence:.1f}%)") # Check if any branches were triggered triggered = False for branch in model_tree.get("branches", []): trigger_classes = branch.get("triggerClasses", []) min_conf = branch.get("minConfidence", 0) if class_name in trigger_classes and detection.get("confidence", 0) >= min_conf: triggered = True if branch.get("crop", False) and bbox: x1, y1, x2, y2 = bbox cropped_frame = frame[y1:y2, x1:x2] logging.info(f"{indent} ⌊ Triggering branch with cropped region {x1},{y1} to {x2},{y2}") branch_result = log_pipeline_flow(cropped_frame, branch, level + 1) else: logging.info(f"{indent} ⌊ Triggering branch with full frame") branch_result = log_pipeline_flow(frame, branch, level + 1) if branch_result[0]: # If branch detection successful, return it return branch_result if not triggered and model_tree.get("branches"): logging.info(f"{indent} ⌊ No branches triggered") else: logging.info(f"{indent}✗ No detection for {model_id}") return detection, bbox def main(mpta_file: str, video_source: str): global capture_running CACHE_DIR = os.path.join(".", ".mptacache") clear_cache(CACHE_DIR) logging.info(f"Loading pipeline from local file: {mpta_file}") model_tree = load_pipeline_from_zip(mpta_file, CACHE_DIR) if model_tree is None: logging.error("Failed to load pipeline.") return cap = cv2.VideoCapture(video_source) if not cap.isOpened(): logging.error(f"Cannot open video source {video_source}") return # Start video capture in a separate thread capture_running = True capture_thread = threading.Thread(target=video_capture_loop, args=(cap,)) capture_thread.start() logging.info("Press 'q' to exit.") try: while True: # Use the global frame and ret updated by the thread if not global_ret or global_frame is None: continue # wait until a frame is available frame = global_frame.copy() # local copy to work with # Replace run_pipeline with our logging version detection, bbox = log_pipeline_flow(frame, model_tree) # Stop if "honda" is detected if detection and detection.get("class", "").lower() == "toyota": logging.info("Detected 'toyota'. Stopping pipeline.") break if bbox: x1, y1, x2, y2 = bbox cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) label = detection["class"] if detection else "Detection" cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2) cv2.imshow("Pipeline Webcam", frame) if cv2.waitKey(1) & 0xFF == ord('q'): break finally: # Stop capture thread and cleanup capture_running = False capture_thread.join() cap.release() cv2.destroyAllWindows() clear_cache(CACHE_DIR) logging.info("Cleaned up .mptacache directory on shutdown.") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run pipeline webcam utility.") parser.add_argument("--mpta-file", type=str, required=True, help="Path to the local pipeline mpta (ZIP) file.") parser.add_argument("--video", type=str, default="0", help="Video source (default webcam index 0).") args = parser.parse_args() video_source = int(args.video) if args.video.isdigit() else args.video main(args.mpta_file, video_source)