From 18c62a23709642c840c23da2252064584997835a Mon Sep 17 00:00:00 2001
From: ziesorx
Date: Sun, 10 Aug 2025 15:01:18 +0700
Subject: [PATCH] Add feature 2: vehicle detection with image storage in Redis

---
 app.py                |  23 ++--
 siwatsystem/pympta.py | 304 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 262 insertions(+), 65 deletions(-)

diff --git a/app.py b/app.py
index d78b2bf..09cb227 100644
--- a/app.py
+++ b/app.py
@@ -239,7 +239,20 @@ async def detect(websocket: WebSocket):
                         logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
                         start_time = time.time()
-                        detection_result = run_pipeline(cropped_frame, model_tree)
+
+                        # Extract display identifier for session ID lookup
+                        subscription_parts = stream["subscriptionIdentifier"].split(';')
+                        display_identifier = subscription_parts[0] if subscription_parts else None
+                        session_id = session_ids.get(display_identifier) if display_identifier else None
+
+                        # Create context for pipeline execution
+                        pipeline_context = {
+                            "camera_id": camera_id,
+                            "display_id": display_identifier,
+                            "session_id": session_id
+                        }
+
+                        detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)
                         process_time = (time.time() - start_time) * 1000
                         logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms")
@@ -298,11 +311,6 @@
                                 if key not in ["box", "id"]:  # Skip internal fields
                                     detection_dict[key] = value
-                        # Extract display identifier for session ID lookup
-                        subscription_parts = stream["subscriptionIdentifier"].split(';')
-                        display_identifier = subscription_parts[0] if subscription_parts else None
-                        session_id = session_ids.get(display_identifier) if display_identifier else None
-
                         detection_data = {
                             "type": "imageDetection",
                             "subscriptionIdentifier": stream["subscriptionIdentifier"],
@@ -322,9 +330,6 @@
                             logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}")
 
                             # Log session ID if available
-                            subscription_parts = stream["subscriptionIdentifier"].split(';')
-                            display_identifier = subscription_parts[0] if subscription_parts else None
-                            session_id = session_ids.get(display_identifier) if display_identifier else None
                             if session_id:
                                 logger.debug(f"Detection associated with session ID: {session_id}")
diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py
index 3642dee..bf95ac9 100644
--- a/siwatsystem/pympta.py
+++ b/siwatsystem/pympta.py
@@ -3,13 +3,13 @@ import json
 import logging
 import torch
 import cv2
-import requests
 import zipfile
 import shutil
 import traceback
 import redis
 import time
 import uuid
+import concurrent.futures
 from ultralytics import YOLO
 from urllib.parse import urlparse
 from .database import DatabaseManager
@@ -45,6 +45,29 @@
     return True
 
+def crop_region_by_class(frame, regions_dict, class_name):
+    """Crop a specific region from frame based on detected class."""
+    if class_name not in regions_dict:
+        logger.warning(f"Class '{class_name}' not found in detected regions")
+        return None
+
+    bbox = regions_dict[class_name]['bbox']
+    x1, y1, x2, y2 = bbox
+    cropped = frame[y1:y2, x1:x2]
+
+    if cropped.size == 0:
+        logger.warning(f"Empty crop for class '{class_name}' with bbox {bbox}")
+        return None
+
+    return cropped
+
+def format_action_context(base_context, additional_context=None):
+    """Format action context with dynamic values."""
dynamic values.""" + context = {**base_context} + if additional_context: + context.update(additional_context) + return context + def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client, db_manager=None) -> dict: # Recursively load a model node from configuration. model_path = os.path.join(mpta_dir, node_config["modelFile"]) @@ -249,22 +272,53 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True) return None -def execute_actions(node, frame, detection_result): +def execute_actions(node, frame, detection_result, regions_dict=None): if not node["redis_client"] or not node["actions"]: return # Create a dynamic context for this detection event + from datetime import datetime action_context = { **detection_result, "timestamp_ms": int(time.time() * 1000), "uuid": str(uuid.uuid4()), + "timestamp": datetime.now().strftime("%Y-%m-%dT%H-%M-%S"), + "filename": f"{uuid.uuid4()}.jpg" } for action in node["actions"]: try: if action["type"] == "redis_save_image": key = action["key"].format(**action_context) - _, buffer = cv2.imencode('.jpg', frame) + + # Check if we need to crop a specific region + region_name = action.get("region") + image_to_save = frame + + if region_name and regions_dict: + cropped_image = crop_region_by_class(frame, regions_dict, region_name) + if cropped_image is not None: + image_to_save = cropped_image + logger.debug(f"Cropped region '{region_name}' for redis_save_image") + else: + logger.warning(f"Could not crop region '{region_name}', saving full frame instead") + + # Encode image with specified format and quality (default to JPEG) + img_format = action.get("format", "jpeg").lower() + quality = action.get("quality", 90) + + if img_format == "jpeg": + encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality] + success, buffer = cv2.imencode('.jpg', image_to_save, encode_params) + elif img_format == "png": + success, buffer = cv2.imencode('.png', image_to_save) + else: + success, buffer = cv2.imencode('.jpg', image_to_save, [cv2.IMWRITE_JPEG_QUALITY, quality]) + + if not success: + logger.error(f"Failed to encode image for redis_save_image") + continue + expire_seconds = action.get("expire_seconds") if expire_seconds: node["redis_client"].setex(key, expire_seconds, buffer.tobytes()) @@ -272,59 +326,101 @@ def execute_actions(node, frame, detection_result): else: node["redis_client"].set(key, buffer.tobytes()) logger.info(f"Saved image to Redis with key: {key}") - # Add the generated key to the context for subsequent actions action_context["image_key"] = key elif action["type"] == "redis_publish": channel = action["channel"] - message = action["message"].format(**action_context) - node["redis_client"].publish(channel, message) - logger.info(f"Published message to Redis channel '{channel}': {message}") + try: + # Handle JSON message format by creating it programmatically + message_template = action["message"] + + # Check if the message is JSON-like (starts and ends with braces) + if message_template.strip().startswith('{') and message_template.strip().endswith('}'): + # Create JSON data programmatically to avoid formatting issues + json_data = {} + + # Add common fields + json_data["event"] = "frontal_detected" + json_data["display_id"] = action_context.get("display_id", "unknown") + json_data["session_id"] = action_context.get("session_id") + json_data["timestamp"] = action_context.get("timestamp", "") + json_data["image_key"] = action_context.get("image_key", "") + + # Convert to JSON 
+                        message = json.dumps(json_data)
+                    else:
+                        # Use regular string formatting for non-JSON messages
+                        message = message_template.format(**action_context)
+
+                    # Publish to Redis
+                    if not node["redis_client"]:
+                        logger.error("Redis client is None, cannot publish message")
+                        continue
+
+                    # Test Redis connection
+                    try:
+                        node["redis_client"].ping()
+                        logger.debug("Redis connection is active")
+                    except Exception as ping_error:
+                        logger.error(f"Redis connection test failed: {ping_error}")
+                        continue
+
+                    result = node["redis_client"].publish(channel, message)
+                    logger.info(f"Published message to Redis channel '{channel}': {message}")
+                    logger.info(f"Redis publish result (subscribers count): {result}")
+
+                    # Additional debug info
+                    if result == 0:
+                        logger.warning(f"No subscribers listening to channel '{channel}'")
+                    else:
+                        logger.info(f"Message delivered to {result} subscriber(s)")
+
+                except KeyError as e:
+                    logger.error(f"Missing key in redis_publish message template: {e}")
+                    logger.debug(f"Available context keys: {list(action_context.keys())}")
+                except Exception as e:
+                    logger.error(f"Error in redis_publish action: {e}")
+                    logger.debug(f"Message template: {action['message']}")
+                    logger.debug(f"Available context keys: {list(action_context.keys())}")
+                    import traceback
+                    logger.debug(f"Full traceback: {traceback.format_exc()}")
         except Exception as e:
             logger.error(f"Error executing action {action['type']}: {e}")
 
-def run_pipeline(frame, node: dict, return_bbox: bool=False):
+def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
     """
-    - For detection nodes (task != 'classify'):
-      • runs `track(..., classes=triggerClassIndices)`
-      • picks top box ≥ minConfidence
-      • optionally crops & resizes → recurse into child
-      • else returns (det_dict, bbox)
-    - For classify nodes:
-      • runs `predict()`
-      • returns top (class,confidence) and no bbox
+    Enhanced pipeline that supports:
+    - Multi-class detection (detecting multiple classes simultaneously)
+    - Parallel branch processing
+    - Region-based actions and cropping
+    - Context passing for session/camera information
     """
     try:
         task = getattr(node["model"], "task", None)
 
         # ─── Classification stage ───────────────────────────────────
         if task == "classify":
-            # run the classifier and grab its top-1 directly via the Probs API
             results = node["model"].predict(frame, stream=False)
-            # nothing returned?
             if not results:
                 return (None, None) if return_bbox else None
 
-            # take the first result's probs object
-            r     = results[0]
+            r = results[0]
             probs = r.probs
             if probs is None:
                 return (None, None) if return_bbox else None
 
-            # get the top-1 class index and its confidence
-            top1_idx  = int(probs.top1)
+            top1_idx = int(probs.top1)
             top1_conf = float(probs.top1conf)
 
             det = {
                 "class": node["model"].names[top1_idx],
                 "confidence": top1_conf,
-                "id": None
+                "id": None,
+                node["model"].names[top1_idx]: node["model"].names[top1_idx]  # Add class name as key
             }
             execute_actions(node, frame, det)
             return (det, None) if return_bbox else det
-
-        # ─── Detection stage ────────────────────────────────────────
-        # only look for your triggerClasses
+        # ─── Detection stage - Multi-class support ──────────────────
         tk = node["triggerClassIndices"]
         res = node["model"].track(
             frame,
@@ -333,48 +429,144 @@
             stream=False,
             persist=True,
             **({"classes": tk} if tk else {})
         )[0]
 
-        dets, boxes = [], []
+        # Collect all detections above confidence threshold
+        all_detections = []
+        all_boxes = []
+        regions_dict = {}
+
         for box in res.boxes:
             conf = float(box.cpu().conf[0])
-            cid  = int(box.cpu().cls[0])
+            cid = int(box.cpu().cls[0])
             name = node["model"].names[cid]
+
             if conf < node["minConfidence"]:
                 continue
+
             xy = box.cpu().xyxy[0]
-            x1,y1,x2,y2 = map(int, xy)
-            dets.append({"class": name, "confidence": conf,
-                         "id": box.id.item() if hasattr(box, "id") else None})
-            boxes.append((x1, y1, x2, y2))
+            x1, y1, x2, y2 = map(int, xy)
+            bbox = (x1, y1, x2, y2)
+
+            detection = {
+                "class": name,
+                "confidence": conf,
+                "id": box.id.item() if hasattr(box, "id") else None,
+                "bbox": bbox
+            }
+
+            all_detections.append(detection)
+            all_boxes.append(bbox)
+
+            # Store highest confidence detection for each class
+            if name not in regions_dict or conf > regions_dict[name]["confidence"]:
+                regions_dict[name] = {
+                    "bbox": bbox,
+                    "confidence": conf,
+                    "detection": detection
+                }
 
-        if not dets:
+        if not all_detections:
             return (None, None) if return_bbox else None
 
-        # take highest‐confidence
-        best_idx = max(range(len(dets)), key=lambda i: dets[i]["confidence"])
-        best_det = dets[best_idx]
-        best_box = boxes[best_idx]
+        # ─── Multi-class validation ─────────────────────────────────
+        if node.get("multiClass", False) and node.get("expectedClasses"):
+            expected_classes = node["expectedClasses"]
+            detected_classes = list(regions_dict.keys())
+
+            # Check if all expected classes are detected
+            missing_classes = [cls for cls in expected_classes if cls not in detected_classes]
+            if missing_classes:
+                logger.debug(f"Missing expected classes: {missing_classes}. Detected: {detected_classes}")
Detected: {detected_classes}") + return (None, None) if return_bbox else None + + logger.info(f"Multi-class detection success: {detected_classes}") - # ─── Branch (classification) ─────────────────────────────── - for br in node["branches"]: - if (best_det["class"] in br["triggerClasses"] - and best_det["confidence"] >= br["minConfidence"]): - # crop if requested - sub = frame - if br["crop"]: - x1,y1,x2,y2 = best_box - sub = frame[y1:y2, x1:x2] - sub = cv2.resize(sub, (224, 224)) + # ─── Execute actions with region information ──────────────── + detection_result = { + "detections": all_detections, + "regions": regions_dict, + **(context or {}) + } + execute_actions(node, frame, detection_result, regions_dict) - det2, _ = run_pipeline(sub, br, return_bbox=True) - if det2: - # return classification result + original bbox - execute_actions(br, sub, det2) - return (det2, best_box) if return_bbox else det2 + # ─── Parallel branch processing ───────────────────────────── + if node["branches"]: + branch_results = {} + + # Filter branches that should be triggered + active_branches = [] + for br in node["branches"]: + trigger_classes = br.get("triggerClasses", []) + min_conf = br.get("minConfidence", 0) + + # Check if any detected class matches branch trigger + for det_class in regions_dict: + if (det_class in trigger_classes and + regions_dict[det_class]["confidence"] >= min_conf): + active_branches.append(br) + break + + if active_branches: + if node.get("parallel", False) or any(br.get("parallel", False) for br in active_branches): + # Run branches in parallel + with concurrent.futures.ThreadPoolExecutor(max_workers=len(active_branches)) as executor: + futures = {} + + for br in active_branches: + crop_class = br.get("cropClass", br.get("triggerClasses", [])[0] if br.get("triggerClasses") else None) + sub_frame = frame + + if br.get("crop", False) and crop_class: + cropped = crop_region_by_class(frame, regions_dict, crop_class) + if cropped is not None: + sub_frame = cv2.resize(cropped, (224, 224)) + else: + continue + + future = executor.submit(run_pipeline, sub_frame, br, True, context) + futures[future] = br + + # Collect results + for future in concurrent.futures.as_completed(futures): + br = futures[future] + try: + result, _ = future.result() + if result: + branch_results[br["modelId"]] = result + logger.info(f"Branch {br['modelId']} completed: {result}") + except Exception as e: + logger.error(f"Branch {br['modelId']} failed: {e}") + else: + # Run branches sequentially + for br in active_branches: + crop_class = br.get("cropClass", br.get("triggerClasses", [])[0] if br.get("triggerClasses") else None) + sub_frame = frame + + if br.get("crop", False) and crop_class: + cropped = crop_region_by_class(frame, regions_dict, crop_class) + if cropped is not None: + sub_frame = cv2.resize(cropped, (224, 224)) + else: + continue + + result, _ = run_pipeline(sub_frame, br, True, context) + if result: + branch_results[br["modelId"]] = result + logger.info(f"Branch {br['modelId']} completed: {result}") - # ─── No branch matched → return this detection ───────────── - execute_actions(node, frame, best_det) - return (best_det, best_box) if return_bbox else best_det + # Store branch results in detection_result for parallel actions + detection_result["branch_results"] = branch_results + + # ─── Return detection result ──────────────────────────────── + primary_detection = max(all_detections, key=lambda x: x["confidence"]) + primary_bbox = primary_detection["bbox"] + + # Add branch results to primary 
+        if "branch_results" in detection_result:
+            primary_detection["branch_results"] = detection_result["branch_results"]
+
+        return (primary_detection, primary_bbox) if return_bbox else primary_detection
 
     except Exception as e:
-        logging.error(f"Error in node {node.get('modelId')}: {e}")
+        logger.error(f"Error in node {node.get('modelId')}: {e}")
+        traceback.print_exc()
        return (None, None) if return_bbox else None
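
Configuration note: the new code reads several pipeline.json fields via
action.get(...) / node.get(...) above ("region", "format", "quality",
"expire_seconds", "multiClass", "expectedClasses", "parallel", "crop",
"cropClass"). As a minimal sketch of a node that exercises these paths
(not part of this patch; the model files, class names, key template, and
channel name are illustrative only):

    {
      "modelId": "vehicle_detector",
      "modelFile": "detector.pt",
      "minConfidence": 0.5,
      "multiClass": true,
      "expectedClasses": ["car", "frontal"],
      "actions": [
        {
          "type": "redis_save_image",
          "region": "frontal",
          "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
          "format": "jpeg",
          "quality": 90,
          "expire_seconds": 600
        },
        {
          "type": "redis_publish",
          "channel": "detections",
          "message": "{}"
        }
      ],
      "branches": [
        {
          "modelId": "frontal_classifier",
          "modelFile": "classifier.pt",
          "parallel": true,
          "crop": true,
          "cropClass": "frontal",
          "triggerClasses": ["frontal"],
          "minConfidence": 0.6
        }
      ]
    }

Because execute_actions() builds brace-delimited redis_publish messages
programmatically (the template body itself is ignored in that path), a
subscriber on the channel receives a payload shaped like:

    {"event": "frontal_detected", "display_id": "...", "session_id": ...,
     "timestamp": "YYYY-MM-DDTHH-MM-SS", "image_key": "inference:..."}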