openable webcam pipeline

This commit is contained in:
Siwat Sirichai 2025-02-23 20:58:48 +07:00
parent ee0071284e
commit e1da9a6dca
4 changed files with 101 additions and 50 deletions

61
pipeline_webcam.py Normal file → Executable file
View file

@ -1,17 +1,26 @@
#!/usr/bin/python3
import argparse
import os
import cv2
import time
import logging
import shutil
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
def main(mpta_url: str, video_source: str):
extraction_dir = os.path.join("models", "webcam_pipeline")
logging.info(f"Loading pipeline from {mpta_url}")
model_tree = load_pipeline_from_zip(mpta_url, extraction_dir)
def clear_cache(cache_dir: str):
if os.path.exists(cache_dir):
shutil.rmtree(cache_dir)
os.makedirs(cache_dir, exist_ok=True)
def main(mpta_file: str, video_source: str):
CACHE_DIR = os.path.join(".", ".mptacache")
clear_cache(CACHE_DIR)
logging.info(f"Loading pipeline from local file: {mpta_file}")
model_tree = load_pipeline_from_zip(mpta_file, CACHE_DIR)
if model_tree is None:
logging.error("Failed to load pipeline.")
return
@ -22,31 +31,35 @@ def main(mpta_url: str, video_source: str):
return
logging.info("Press 'q' to exit.")
while True:
ret, frame = cap.read()
if not ret:
logging.error("Failed to capture frame.")
break
try:
while True:
ret, frame = cap.read()
if not ret:
logging.error("Failed to capture frame.")
break
detection, bbox = run_pipeline(frame, model_tree, return_bbox=True)
if bbox:
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = detection["class"] if detection else "Detection"
cv2.putText(frame, label, (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
detection, bbox = run_pipeline(frame, model_tree, return_bbox=True)
if bbox:
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = detection["class"] if detection else "Detection"
cv2.putText(frame, label, (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
cv2.imshow("Pipeline Webcam", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
cv2.imshow("Pipeline Webcam", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
cap.release()
cv2.destroyAllWindows()
# Clear cache on shutdown
clear_cache(CACHE_DIR)
logging.info("Cleaned up .mptacache directory on shutdown.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run pipeline webcam utility.")
parser.add_argument("--mpta-url", type=str, required=True, help="URL to the pipeline mpta (ZIP) file.")
parser.add_argument("--mpta-file", type=str, required=True, help="Path to the local pipeline mpta (ZIP) file.")
parser.add_argument("--video", type=str, default="0", help="Video source (default webcam index 0).")
args = parser.parse_args()
video_source = int(args.video) if args.video.isdigit() else args.video
main(args.mpta_url, video_source)
main(args.mpta_file, video_source)