137 lines
5.2 KiB
Python
Executable file
137 lines
5.2 KiB
Python
Executable file
import argparse
|
|
import os
|
|
import cv2
|
|
import time
|
|
import logging
|
|
import shutil
|
|
import threading # added threading
|
|
import yaml # for silencing YOLO
|
|
|
|
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline
|
|
|
|
# Configure root logging: INFO level, timestamped "[LEVEL] message" records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# Silence YOLO logging: set the verbosity environment flag and raise the
# ultralytics loggers to WARNING so only warnings and errors are emitted.
# NOTE(review): this runs after the imports above; presumably the flag must be
# set before ultralytics is first imported to take effect -- confirm.
os.environ["YOLO_VERBOSE"] = "False"
for logger_name in ("ultralytics", "ultralytics.hub", "ultralytics.yolo.utils"):
    logging.getLogger(logger_name).setLevel(logging.WARNING)

# Global variables for frame sharing between the capture thread and main():
global_frame = None      # most recent frame returned by cap.read()
global_ret = False       # success flag returned alongside that frame
capture_running = False  # main() sets this True to run / False to stop the capture loop
|
|
|
|
def video_capture_loop(cap):
    """Continuously read frames from *cap* and publish them in module globals.

    Intended as a thread target: it keeps looping for as long as the
    module-level ``capture_running`` flag is truthy. Every iteration stores
    the result of ``cap.read()`` in ``global_ret`` (success flag) and
    ``global_frame`` (the frame), in that order.

    Args:
        cap: Object exposing ``read()`` that returns a ``(ret, frame)`` pair
            (``main`` passes a ``cv2.VideoCapture``).
    """
    global global_frame, global_ret, capture_running
    while True:
        if not capture_running:
            break
        grabbed, image = cap.read()
        # Publish the flag first, then the frame (same order as tuple unpacking).
        global_ret = grabbed
        global_frame = image
        time.sleep(0.01)  # slight delay to reduce CPU usage
|
|
|
|
def clear_cache(cache_dir: str):
    """Remove *cache_dir* and all of its contents; do nothing if it is absent.

    Args:
        cache_dir: Path of the directory tree to delete.
    """
    if not os.path.exists(cache_dir):
        return
    shutil.rmtree(cache_dir)
|
|
|
|
def log_pipeline_flow(frame, model_tree, level=0):
    """Run ``run_pipeline`` on *frame* and log each step of the model tree.

    The model named by ``model_tree["modelId"]`` is run first. When it yields
    a detection, every entry of ``model_tree["branches"]`` whose
    ``triggerClasses`` contains the detected class and whose ``minConfidence``
    does not exceed the detection confidence is run recursively, either on the
    region cut out by the bounding box (when the branch sets ``crop`` and a
    bbox is available) or on the full frame. The first branch that produces a
    detection short-circuits and its result is returned.

    Args:
        frame: Image passed to ``run_pipeline``; it is sliced as
            ``frame[y1:y2, x1:x2]`` when a branch requests cropping.
        model_tree: Mapping describing the current pipeline node.
        level: Recursion depth, used only to indent the log lines.

    Returns:
        A ``(detection, bbox)`` pair: the first successful branch result if
        any, otherwise this node's own result.

    NOTE(review): a bbox returned from a cropped branch presumably uses the
    crop's coordinate system rather than the full frame's -- confirm against
    how callers draw it.
    """
    indent = " " * level
    model_id = model_tree.get("modelId", "unknown")
    logging.info(f"{indent}→ Running model: {model_id}")

    detection, bbox = run_pipeline(frame, model_tree, return_bbox=True)

    # Guard clause: nothing detected at this node, so there are no branches to try.
    if not detection:
        logging.info(f"{indent}✗ No detection for {model_id}")
        return detection, bbox

    raw_confidence = detection.get("confidence", 0)
    confidence = raw_confidence * 100  # percentage, for display only
    class_name = detection.get("class", "unknown")
    object_id = detection.get("id", "N/A")

    logging.info(f"{indent}✓ Detected: {class_name} (ID: {object_id}, confidence: {confidence:.1f}%)")

    # Walk the branches and descend into every one this detection triggers.
    branches = model_tree.get("branches", [])
    any_fired = False
    for branch in branches:
        wanted_classes = branch.get("triggerClasses", [])
        threshold = branch.get("minConfidence", 0)

        if not (class_name in wanted_classes and raw_confidence >= threshold):
            continue

        any_fired = True
        if branch.get("crop", False) and bbox:
            x1, y1, x2, y2 = bbox
            branch_input = frame[y1:y2, x1:x2]
            logging.info(f"{indent} ⌊ Triggering branch with cropped region {x1},{y1} to {x2},{y2}")
        else:
            branch_input = frame
            logging.info(f"{indent} ⌊ Triggering branch with full frame")

        outcome = log_pipeline_flow(branch_input, branch, level + 1)

        if outcome[0]:  # If branch detection successful, return it
            return outcome

    if branches and not any_fired:
        logging.info(f"{indent} ⌊ No branches triggered")

    return detection, bbox
|
|
|
|
def main(mpta_file: str, video_source: str):
    """Load a pipeline archive and run it live on a video source with a preview.

    The cache directory ``./.mptacache`` is wiped, the pipeline is loaded from
    *mpta_file* into it, and a background thread starts reading frames from
    *video_source*. The main loop copies the latest frame, runs
    ``log_pipeline_flow`` on it, draws the returned bounding box and class
    label, and shows the result in the "Pipeline Webcam" window until 'q' is
    pressed. On exit the thread is stopped, the capture and windows are
    released, and the cache directory is removed again.

    Args:
        mpta_file: Path to the local pipeline archive.
        video_source: Value handed to ``cv2.VideoCapture`` (the command-line
            entry point passes either an ``int`` index or a string).
    """
    global capture_running

    CACHE_DIR = os.path.join(".", ".mptacache")
    clear_cache(CACHE_DIR)
    logging.info(f"Loading pipeline from local file: {mpta_file}")
    model_tree = load_pipeline_from_zip(mpta_file, CACHE_DIR)
    if model_tree is None:
        logging.error("Failed to load pipeline.")
        return

    cap = cv2.VideoCapture(video_source)
    if not cap.isOpened():
        logging.error(f"Cannot open video source {video_source}")
        cap.release()  # do not leak the capture handle on the failure path
        return

    # Start video capture in a separate thread
    capture_running = True
    capture_thread = threading.Thread(target=video_capture_loop, args=(cap,))
    capture_thread.start()

    logging.info("Press 'q' to exit.")
    window_shown = False
    try:
        while True:
            # Snapshot the shared state once: the capture thread may replace
            # global_frame (possibly with None) between a check and a later
            # use, so never read the globals twice within one iteration.
            ret, latest = global_ret, global_frame
            if not ret or latest is None:
                # No frame available. Avoid a busy spin, and once the preview
                # window exists keep pumping GUI events so 'q' still quits
                # even if the source has stopped delivering frames.
                if window_shown:
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        break
                else:
                    time.sleep(0.01)
                continue

            frame = latest.copy()  # local copy to draw on

            # Run the pipeline through the logging wrapper
            detection, bbox = log_pipeline_flow(frame, model_tree)

            if bbox:
                x1, y1, x2, y2 = bbox
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                # .get() keeps a detection without a "class" key from raising.
                label = detection.get("class", "Detection") if detection else "Detection"
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)

            cv2.imshow("Pipeline Webcam", frame)
            window_shown = True
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Stop capture thread and cleanup
        capture_running = False
        capture_thread.join()
        cap.release()
        cv2.destroyAllWindows()
        clear_cache(CACHE_DIR)
        logging.info("Cleaned up .mptacache directory on shutdown.")
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the archive path and the video source,
    # then hand both to main().
    parser = argparse.ArgumentParser(description="Run pipeline webcam utility.")
    parser.add_argument("--mpta-file", type=str, required=True, help="Path to the local pipeline mpta (ZIP) file.")
    parser.add_argument("--video", type=str, default="0", help="Video source (default webcam index 0).")
    args = parser.parse_args()
    # A purely numeric value is treated as a device index, anything else is
    # passed through as a string. isdecimal() is used instead of isdigit()
    # because isdigit() also accepts characters such as superscript digits
    # that int() cannot convert, which would raise ValueError.
    video_source = int(args.video) if args.video.isdecimal() else args.video
    main(args.mpta_file, video_source)
|