update webcam output

This commit is contained in:
Siwat Sirichai 2025-04-01 19:12:23 +07:00
parent 7911245ff9
commit 192b96d658
3 changed files with 115 additions and 30 deletions

4
.gitignore vendored
View file

@ -4,4 +4,6 @@ app.log
# All pycache directories # All pycache directories
__pycache__/ __pycache__/
.mptacache .mptacache
mptas

View file

@ -5,10 +5,17 @@ import time
import logging import logging
import shutil import shutil
import threading # added threading import threading # added threading
import yaml # for silencing YOLO
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s") # Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Silence YOLO logging
os.environ["YOLO_VERBOSE"] = "False"
for logger_name in ["ultralytics", "ultralytics.hub", "ultralytics.yolo.utils"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
# Global variables for frame sharing # Global variables for frame sharing
global_frame = None global_frame = None
@ -25,6 +32,51 @@ def clear_cache(cache_dir: str):
if os.path.exists(cache_dir): if os.path.exists(cache_dir):
shutil.rmtree(cache_dir) shutil.rmtree(cache_dir)
def log_pipeline_flow(frame, model_tree, level=0):
"""
Wrapper around run_pipeline that logs the model flow and detection results.
Returns the same output as the original run_pipeline function.
"""
indent = " " * level
model_id = model_tree.get("modelId", "unknown")
logging.info(f"{indent}→ Running model: {model_id}")
detection, bbox = run_pipeline(frame, model_tree, return_bbox=True)
if detection:
confidence = detection.get("confidence", 0) * 100
class_name = detection.get("class", "unknown")
object_id = detection.get("id", "N/A")
logging.info(f"{indent}✓ Detected: {class_name} (ID: {object_id}, confidence: {confidence:.1f}%)")
# Check if any branches were triggered
triggered = False
for branch in model_tree.get("branches", []):
trigger_classes = branch.get("triggerClasses", [])
min_conf = branch.get("minConfidence", 0)
if class_name in trigger_classes and detection.get("confidence", 0) >= min_conf:
triggered = True
if branch.get("crop", False) and bbox:
x1, y1, x2, y2 = bbox
cropped_frame = frame[y1:y2, x1:x2]
logging.info(f"{indent} ⌊ Triggering branch with cropped region {x1},{y1} to {x2},{y2}")
branch_result = log_pipeline_flow(cropped_frame, branch, level + 1)
else:
logging.info(f"{indent} ⌊ Triggering branch with full frame")
branch_result = log_pipeline_flow(frame, branch, level + 1)
if branch_result[0]: # If branch detection successful, return it
return branch_result
if not triggered and model_tree.get("branches"):
logging.info(f"{indent} ⌊ No branches triggered")
else:
logging.info(f"{indent}✗ No detection for {model_id}")
return detection, bbox
def main(mpta_file: str, video_source: str): def main(mpta_file: str, video_source: str):
global capture_running global capture_running
CACHE_DIR = os.path.join(".", ".mptacache") CACHE_DIR = os.path.join(".", ".mptacache")
@ -52,9 +104,11 @@ def main(mpta_file: str, video_source: str):
if not global_ret or global_frame is None: if not global_ret or global_frame is None:
continue # wait until a frame is available continue # wait until a frame is available
frame = global_frame # local copy to work with frame = global_frame.copy() # local copy to work with
detection, bbox = run_pipeline(frame, model_tree, return_bbox=True) # Replace run_pipeline with our logging version
detection, bbox = log_pipeline_flow(frame, model_tree)
if bbox: if bbox:
x1, y1, x2, y2 = bbox x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

View file

@ -87,36 +87,65 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False):
for drawing. Otherwise, returns only the detection. for drawing. Otherwise, returns only the detection.
""" """
try: try:
results = node["model"].track(frame, stream=False, persist=True) # Check model type and use appropriate method
detection = None model_task = getattr(node["model"], "task", None)
best_box = None
max_conf = -1 if model_task == "classify":
# Classification models need to use predict() instead of track()
for r in results: logging.debug(f"Running classification model: {node.get('modelId')}")
for box in r.boxes: results = node["model"].predict(frame, stream=False)
box_cpu = box.cpu() detection = None
conf = float(box_cpu.conf[0]) best_box = None
if conf > max_conf and hasattr(box, "id") and box.id is not None:
max_conf = conf # Process classification results
for r in results:
probs = r.probs
if probs is not None and len(probs) > 0:
# Get the most confident class
class_id = int(probs.top1)
conf = float(probs.top1conf)
detection = { detection = {
"class": node["model"].names[int(box_cpu.cls[0])], "class": node["model"].names[class_id],
"confidence": conf, "confidence": conf,
"id": box.id.item() "id": None # Classification doesn't have tracking IDs
} }
best_box = box_cpu
# Classification doesn't produce bounding boxes
bbox = None
else:
# Detection/segmentation models use tracking
logging.debug(f"Running detection/tracking model: {node.get('modelId')}")
results = node["model"].track(frame, stream=False, persist=True)
detection = None
best_box = None
max_conf = -1
bbox = None for r in results:
# Modified bounding box calculation: always compute bbox if best_box exists for box in r.boxes:
if detection and best_box is not None: box_cpu = box.cpu()
coords = best_box.xyxy[0] conf = float(box_cpu.conf[0])
x1, y1, x2, y2 = map(int, coords) if conf > max_conf and hasattr(box, "id") and box.id is not None:
h, w = frame.shape[:2] max_conf = conf
x1, y1 = max(0, x1), max(0, y1) detection = {
x2, y2 = min(w, x2), min(h, y2) "class": node["model"].names[int(box_cpu.cls[0])],
if x2 > x1 and y2 > y1: "confidence": conf,
bbox = (x1, y1, x2, y2) "id": box.id.item()
if node.get("crop", False): }
frame = frame[y1:y2, x1:x2] best_box = box_cpu
bbox = None
# Calculate bbox if best_box exists
if detection and best_box is not None:
coords = best_box.xyxy[0]
x1, y1, x2, y2 = map(int, coords)
h, w = frame.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w, x2), min(h, y2)
if x2 > x1 and y2 > y1:
bbox = (x1, y1, x2, y2)
if node.get("crop", False):
frame = frame[y1:y2, x1:x2]
if detection is not None: if detection is not None:
for branch in node["branches"]: for branch in node["branches"]: