From e1da9a6dcacfbfbe96ad97eee949feb7aecedfe0 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 23 Feb 2025 20:58:48 +0700 Subject: [PATCH] openable webcam pipeline --- .gitignore | 10 ++++--- app.py | 36 ++++++++++++++++++++++--- pipeline_webcam.py | 61 ++++++++++++++++++++++++++----------------- siwatsystem/pympta.py | 44 ++++++++++++++++++------------- 4 files changed, 101 insertions(+), 50 deletions(-) mode change 100644 => 100755 pipeline_webcam.py diff --git a/.gitignore b/.gitignore index 74a4d74..e5fcf20 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ - -/__pycache__ -models +/models app.log -*.pt \ No newline at end of file +*.pt + +# All pycache directories +__pycache__/ +.mptacache \ No newline at end of file diff --git a/app.py b/app.py index ce80974..cbf6186 100644 --- a/app.py +++ b/app.py @@ -60,6 +60,24 @@ WORKER_TIMEOUT_MS = 10000 streams_lock = threading.Lock() models_lock = threading.Lock() +# Add helper to download mpta ZIP file from a remote URL +def download_mpta(url: str, dest_path: str) -> str: + try: + os.makedirs(os.path.dirname(dest_path), exist_ok=True) + response = requests.get(url, stream=True) + if response.status_code == 200: + with open(dest_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + logging.info(f"Downloaded mpta file from {url} to {dest_path}") + return dest_path + else: + logging.error(f"Failed to download mpta file (status code {response.status_code})") + return None + except Exception as e: + logging.error(f"Exception downloading mpta file from {url}: {e}") + return None + #################################################### # Detection and frame processing functions #################################################### @@ -212,7 +230,7 @@ async def detect(websocket: WebSocket): payload = data.get("payload", {}) camera_id = payload.get("cameraIdentifier") rtsp_url = payload.get("rtspUrl") - model_url = payload.get("modelUrl") # ZIP file URL + model_url = payload.get("modelUrl") # may be remote or local modelId = payload.get("modelId") modelName = payload.get("modelName") @@ -221,12 +239,22 @@ async def detect(websocket: WebSocket): if camera_id not in models: models[camera_id] = {} if modelId not in models[camera_id]: - logging.info(f"Downloading model from {model_url}") + logging.info(f"Loading model from {model_url}") extraction_dir = os.path.join("models", camera_id, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) - model_tree = load_pipeline_from_zip(model_url, extraction_dir) + # If model_url is remote, download it first. + parsed = urlparse(model_url) + if parsed.scheme in ("http", "https"): + local_mpta = os.path.join(extraction_dir, os.path.basename(parsed.path)) + local_path = download_mpta(model_url, local_mpta) + if not local_path: + logging.error("Failed to download the remote mpta file.") + continue + model_tree = load_pipeline_from_zip(local_path, extraction_dir) + else: + model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: - logging.error("Failed to load model from ZIP file.") + logging.error("Failed to load model from mpta file.") continue models[camera_id][modelId] = model_tree logging.info(f"Loaded model {modelId} for camera {camera_id}") diff --git a/pipeline_webcam.py b/pipeline_webcam.py old mode 100644 new mode 100755 index d13af2a..8c1ca3e --- a/pipeline_webcam.py +++ b/pipeline_webcam.py @@ -1,17 +1,26 @@ +#!/usr/bin/python3 + import argparse import os import cv2 import time import logging +import shutil from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s") -def main(mpta_url: str, video_source: str): - extraction_dir = os.path.join("models", "webcam_pipeline") - logging.info(f"Loading pipeline from {mpta_url}") - model_tree = load_pipeline_from_zip(mpta_url, extraction_dir) +def clear_cache(cache_dir: str): + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir) + os.makedirs(cache_dir, exist_ok=True) + +def main(mpta_file: str, video_source: str): + CACHE_DIR = os.path.join(".", ".mptacache") + clear_cache(CACHE_DIR) + logging.info(f"Loading pipeline from local file: {mpta_file}") + model_tree = load_pipeline_from_zip(mpta_file, CACHE_DIR) if model_tree is None: logging.error("Failed to load pipeline.") return @@ -22,31 +31,35 @@ def main(mpta_url: str, video_source: str): return logging.info("Press 'q' to exit.") - while True: - ret, frame = cap.read() - if not ret: - logging.error("Failed to capture frame.") - break + try: + while True: + ret, frame = cap.read() + if not ret: + logging.error("Failed to capture frame.") + break - detection, bbox = run_pipeline(frame, model_tree, return_bbox=True) - if bbox: - x1, y1, x2, y2 = bbox - cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) - label = detection["class"] if detection else "Detection" - cv2.putText(frame, label, (x1, y1 - 10), - cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2) + detection, bbox = run_pipeline(frame, model_tree, return_bbox=True) + if bbox: + x1, y1, x2, y2 = bbox + cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) + label = detection["class"] if detection else "Detection" + cv2.putText(frame, label, (x1, y1 - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2) - cv2.imshow("Pipeline Webcam", frame) - if cv2.waitKey(1) & 0xFF == ord('q'): - break - - cap.release() - cv2.destroyAllWindows() + cv2.imshow("Pipeline Webcam", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + finally: + cap.release() + cv2.destroyAllWindows() + # Clear cache on shutdown + clear_cache(CACHE_DIR) + logging.info("Cleaned up .mptacache directory on shutdown.") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run pipeline webcam utility.") - parser.add_argument("--mpta-url", type=str, required=True, help="URL to the pipeline mpta (ZIP) file.") + parser.add_argument("--mpta-file", type=str, required=True, help="Path to the local pipeline mpta (ZIP) file.") parser.add_argument("--video", type=str, default="0", help="Video source (default webcam index 0).") args = parser.parse_args() video_source = int(args.video) if args.video.isdigit() else args.video - main(args.mpta_url, video_source) + main(args.mpta_file, video_source) diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index b5f05d1..20b2770 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -5,11 +5,13 @@ import torch import cv2 import requests import zipfile +import shutil from ultralytics import YOLO +from urllib.parse import urlparse -def load_pipeline_node(node_config: dict, models_dir: str) -> dict: +def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: # Recursively load a model node from configuration. - model_path = os.path.join(models_dir, node_config["modelFile"]) + model_path = os.path.join(mpta_dir, node_config["modelFile"]) if not os.path.exists(model_path): logging.error(f"Model file {model_path} not found.") raise FileNotFoundError(f"Model file {model_path} not found.") @@ -27,25 +29,29 @@ def load_pipeline_node(node_config: dict, models_dir: str) -> dict: "branches": [] } for child in node_config.get("branches", []): - node["branches"].append(load_pipeline_node(child, models_dir)) + node["branches"].append(load_pipeline_node(child, mpta_dir)) return node -def load_pipeline_from_zip(zip_url: str, target_dir: str) -> dict: - # Download, extract, and load a pipeline configuration from a zip (.mpta) file. +def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: os.makedirs(target_dir, exist_ok=True) zip_path = os.path.join(target_dir, "pipeline.mpta") - try: - response = requests.get(zip_url, stream=True) - if response.status_code == 200: - with open(zip_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - logging.info(f"Downloaded .mpta file from {zip_url} to {zip_path}") + + # Parse the source; only local files are supported here. + parsed = urlparse(zip_source) + if parsed.scheme in ("", "file"): + local_path = parsed.path if parsed.scheme == "file" else zip_source + if os.path.exists(local_path): + try: + shutil.copy(local_path, zip_path) + logging.info(f"Copied local .mpta file from {local_path} to {zip_path}") + except Exception as e: + logging.error(f"Failed to copy local .mpta file from {local_path}: {e}") + return None else: - logging.error(f"Failed to download .mpta file (status {response.status_code})") + logging.error(f"Local file {local_path} does not exist.") return None - except Exception as e: - logging.error(f"Exception downloading .mpta file from {zip_url}: {e}") + else: + logging.error("HTTP download functionality has been moved. Use a local file path here.") return None try: @@ -58,8 +64,10 @@ def load_pipeline_from_zip(zip_url: str, target_dir: str) -> dict: finally: if os.path.exists(zip_path): os.remove(zip_path) - - pipeline_json_path = os.path.join(target_dir, "pipeline.json") + pipeline_name = os.path.basename(zip_source) + pipeline_name = os.path.splitext(pipeline_name)[0] + mpta_dir = os.path.join(target_dir, pipeline_name) + pipeline_json_path = os.path.join(mpta_dir, "pipeline.json") if not os.path.exists(pipeline_json_path): logging.error("pipeline.json not found in the .mpta file") return None @@ -67,7 +75,7 @@ def load_pipeline_from_zip(zip_url: str, target_dir: str) -> dict: try: with open(pipeline_json_path, "r") as f: pipeline_config = json.load(f) - return load_pipeline_node(pipeline_config["pipeline"], target_dir) + return load_pipeline_node(pipeline_config["pipeline"], mpta_dir) except Exception as e: logging.error(f"Error loading pipeline.json: {e}") return None