import os
import re
import json
import logging
import torch
import cv2
import zipfile
import shutil
import traceback
import redis
import time
import uuid
import concurrent.futures
from ultralytics import YOLO
from urllib.parse import urlparse
from datetime import datetime

from .database import DatabaseManager

# Create a logger specifically for this module
logger = logging.getLogger("detector_worker.pympta")

# Global camera-aware stability tracking
# Structure: {camera_id: {model_id: {"track_stability_counters": {track_id: count},
#                                    "stable_tracks": set(), "session_state": {...}}}}
_camera_stability_tracking = {}

# Session timeout configuration (waiting for backend sessionId)
_session_timeout_seconds = 15


def validate_redis_config(redis_config: dict) -> bool:
    """Validate Redis configuration parameters."""
    required_fields = ["host", "port"]
    for field in required_fields:
        if field not in redis_config:
            logger.error(f"Missing required Redis config field: {field}")
            return False

    if not isinstance(redis_config["port"], int) or redis_config["port"] <= 0:
        logger.error(f"Invalid Redis port: {redis_config['port']}")
        return False

    return True


def validate_postgresql_config(pg_config: dict) -> bool:
    """Validate PostgreSQL configuration parameters."""
    required_fields = ["host", "port", "database", "username", "password"]
    for field in required_fields:
        if field not in pg_config:
            logger.error(f"Missing required PostgreSQL config field: {field}")
            return False

    if not isinstance(pg_config["port"], int) or pg_config["port"] <= 0:
        logger.error(f"Invalid PostgreSQL port: {pg_config['port']}")
        return False

    return True


def crop_region_by_class(frame, regions_dict, class_name):
    """Crop a specific region from the frame based on a detected class."""
    if class_name not in regions_dict:
        logger.warning(f"Class '{class_name}' not found in detected regions")
        return None

    bbox = regions_dict[class_name]['bbox']
    x1, y1, x2, y2 = bbox
    cropped = frame[y1:y2, x1:x2]

    if cropped.size == 0:
        logger.warning(f"Empty crop for class '{class_name}' with bbox {bbox}")
        return None

    return cropped


def format_action_context(base_context, additional_context=None):
    """Format action context with dynamic values."""
    context = {**base_context}
    if additional_context:
        context.update(additional_context)
    return context

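
# Illustrative sketch only (never called at runtime): the regions_dict consumed by
# crop_region_by_class() is assumed to look like the structure built later in
# run_detection_with_tracking(), i.e. class name -> {"bbox", "confidence", ...}.
# The class name and coordinates below are made up; numpy is assumed to be
# available since ultralytics/cv2 already depend on it.
def _example_crop_region_usage():
    import numpy as np  # local import so the example stays self-contained

    frame = np.zeros((480, 640, 3), dtype=np.uint8)  # dummy black frame
    regions_dict = {
        "frontal": {"bbox": (100, 120, 300, 360), "confidence": 0.91},
    }
    crop = crop_region_by_class(frame, regions_dict, "frontal")
    # crop is a 240x200 view of the frame, or None if the class/bbox is invalid
    return crop
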
def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client, db_manager=None) -> dict:
    """Recursively load a model node from configuration."""
    model_path = os.path.join(mpta_dir, node_config["modelFile"])
    if not os.path.exists(model_path):
        logger.error(f"Model file {model_path} not found. Current directory: {os.getcwd()}")
        logger.error(f"Directory content: {os.listdir(os.path.dirname(model_path))}")
        raise FileNotFoundError(f"Model file {model_path} not found.")

    logger.info(f"Loading model for node {node_config['modelId']} from {model_path}")
    model = YOLO(model_path)
    if torch.cuda.is_available():
        logger.info(f"CUDA available. Moving model {node_config['modelId']} to GPU VRAM")
        model.to("cuda")
    else:
        logger.info(f"CUDA not available. Using CPU for model {node_config['modelId']}")

    # Prepare trigger class indices for optimization
    trigger_classes = node_config.get("triggerClasses", [])
    trigger_class_indices = None
    if trigger_classes and hasattr(model, "names"):
        # Convert class names to indices for the model
        trigger_class_indices = [i for i, name in model.names.items() if name in trigger_classes]
        logger.debug(f"Converted trigger classes to indices: {trigger_class_indices}")

    # Extract stability threshold from tracking config
    tracking_config = node_config.get("tracking", {"enabled": True, "reidConfigPath": "botsort.yaml"})
    stability_threshold = tracking_config.get("stabilityThreshold", 1)

    node = {
        "modelId": node_config["modelId"],
        "modelFile": node_config["modelFile"],
        "triggerClasses": trigger_classes,
        "triggerClassIndices": trigger_class_indices,
        "classMapping": node_config.get("classMapping", {}),
        "crop": node_config.get("crop", False),
        "cropClass": node_config.get("cropClass"),
        "minConfidence": node_config.get("minConfidence", None),
        "multiClass": node_config.get("multiClass", False),
        "expectedClasses": node_config.get("expectedClasses", []),
        "parallel": node_config.get("parallel", False),
        "actions": node_config.get("actions", []),
        "parallelActions": node_config.get("parallelActions", []),
        "tracking": tracking_config,
        "stabilityThreshold": stability_threshold,
        "model": model,
        "branches": [],
        "redis_client": redis_client,
        "db_manager": db_manager
    }
    logger.debug(f"Configured node {node_config['modelId']} with trigger classes: {node['triggerClasses']}")

    for child in node_config.get("branches", []):
        logger.debug(f"Loading branch for parent node {node_config['modelId']}")
        node["branches"].append(load_pipeline_node(child, mpta_dir, redis_client, db_manager))

    return node

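
# Illustrative node configuration (never loaded at runtime). The keys mirror what
# load_pipeline_node() reads above; the model IDs, file names, class names and
# thresholds are made-up placeholders.
_EXAMPLE_NODE_CONFIG = {
    "modelId": "frontal_detection_v1",
    "modelFile": "frontal_detection_v1.pt",
    "triggerClasses": ["Car", "Frontal"],
    "classMapping": {"Car": "car"},
    "crop": False,
    "minConfidence": 0.5,
    "multiClass": True,
    "expectedClasses": ["Car", "Frontal"],
    "tracking": {"enabled": True, "reidConfigPath": "botsort.yaml", "stabilityThreshold": 4},
    "actions": [],
    "parallelActions": [],
    "branches": [
        {
            "modelId": "car_brand_cls_v1",
            "modelFile": "car_brand_cls_v1.pt",
            "triggerClasses": ["Frontal"],
            "minConfidence": 0.7,
            "crop": True,
            "cropClass": "Frontal",
            "branches": [],
        }
    ],
}
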
def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict:
    """Extract a local .mpta archive and build the pipeline node tree from its pipeline.json."""
    logger.info(f"Attempting to load pipeline from {zip_source} to {target_dir}")
    os.makedirs(target_dir, exist_ok=True)
    zip_path = os.path.join(target_dir, "pipeline.mpta")

    # Parse the source; only local files are supported here.
    parsed = urlparse(zip_source)
    if parsed.scheme in ("", "file"):
        local_path = parsed.path if parsed.scheme == "file" else zip_source
        logger.debug(f"Checking if local file exists: {local_path}")
        if os.path.exists(local_path):
            try:
                shutil.copy(local_path, zip_path)
                logger.info(f"Copied local .mpta file from {local_path} to {zip_path}")
            except Exception as e:
                logger.error(f"Failed to copy local .mpta file from {local_path}: {str(e)}", exc_info=True)
                return None
        else:
            logger.error(f"Local file {local_path} does not exist. Current directory: {os.getcwd()}")
            # List all subdirectories of the models directory to help debugging
            if os.path.exists("models"):
                logger.error(f"Content of models directory: {os.listdir('models')}")
                for root, dirs, files in os.walk("models"):
                    logger.error(f"Directory {root} contains subdirs: {dirs} and files: {files}")
            else:
                logger.error("The models directory doesn't exist")
            return None
    else:
        logger.error(f"HTTP download functionality has been moved. Use a local file path here. Received: {zip_source}")
        return None

    try:
        if not os.path.exists(zip_path):
            logger.error(f"Zip file not found at expected location: {zip_path}")
            return None

        logger.debug(f"Extracting .mpta file from {zip_path} to {target_dir}")
        # Extract contents and track the directories created
        extracted_dirs = []
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            file_list = zip_ref.namelist()
            logger.debug(f"Files in .mpta archive: {file_list}")

            # Track the top-level directories before extracting
            for file_path in file_list:
                parts = file_path.split('/')
                if len(parts) > 1:
                    top_dir = parts[0]
                    if top_dir and top_dir not in extracted_dirs:
                        extracted_dirs.append(top_dir)

            # Now extract the files
            zip_ref.extractall(target_dir)

        logger.info(f"Successfully extracted .mpta file to {target_dir}")
        logger.debug(f"Extracted directories: {extracted_dirs}")

        # Check what was actually created after extraction
        actual_dirs = [d for d in os.listdir(target_dir) if os.path.isdir(os.path.join(target_dir, d))]
        logger.debug(f"Actual directories created: {actual_dirs}")
    except zipfile.BadZipFile as e:
        logger.error(f"Bad zip file {zip_path}: {str(e)}", exc_info=True)
        return None
    except Exception as e:
        logger.error(f"Failed to extract .mpta file {zip_path}: {str(e)}", exc_info=True)
        return None
    finally:
        if os.path.exists(zip_path):
            os.remove(zip_path)
            logger.debug(f"Removed temporary zip file: {zip_path}")

    # Use the first extracted directory if it exists, otherwise use the expected name
    pipeline_name = os.path.basename(zip_source)
    pipeline_name = os.path.splitext(pipeline_name)[0]

    # Find the directory with pipeline.json
    mpta_dir = None
    # First try the expected directory name
    expected_dir = os.path.join(target_dir, pipeline_name)
    if os.path.exists(expected_dir) and os.path.exists(os.path.join(expected_dir, "pipeline.json")):
        mpta_dir = expected_dir
        logger.debug(f"Found pipeline.json in the expected directory: {mpta_dir}")
    else:
        # Look through all subdirectories for pipeline.json
        for subdir in actual_dirs:
            potential_dir = os.path.join(target_dir, subdir)
            if os.path.exists(os.path.join(potential_dir, "pipeline.json")):
                mpta_dir = potential_dir
                logger.info(f"Found pipeline.json in directory: {mpta_dir} (different from expected: {expected_dir})")
                break

    if not mpta_dir:
        logger.error(f"Could not find pipeline.json in any extracted directory. Directory content: {os.listdir(target_dir)}")
        return None

    pipeline_json_path = os.path.join(mpta_dir, "pipeline.json")
    if not os.path.exists(pipeline_json_path):
        logger.error(f"pipeline.json not found in the .mpta file. Files in directory: {os.listdir(mpta_dir)}")
        return None

    try:
        with open(pipeline_json_path, "r") as f:
            pipeline_config = json.load(f)
            logger.info(f"Successfully loaded pipeline configuration from {pipeline_json_path}")
            logger.debug(f"Pipeline config: {json.dumps(pipeline_config, indent=2)}")

            # Establish Redis connection if configured
            redis_client = None
            if "redis" in pipeline_config:
                redis_config = pipeline_config["redis"]
                if not validate_redis_config(redis_config):
                    logger.error("Invalid Redis configuration, skipping Redis connection")
                else:
                    try:
                        redis_client = redis.Redis(
                            host=redis_config["host"],
                            port=redis_config["port"],
                            password=redis_config.get("password"),
                            db=redis_config.get("db", 0),
                            decode_responses=True
                        )
                        redis_client.ping()
                        logger.info(f"Successfully connected to Redis at {redis_config['host']}:{redis_config['port']}")
                    except redis.exceptions.ConnectionError as e:
                        logger.error(f"Failed to connect to Redis: {e}")
                        redis_client = None

            # Establish PostgreSQL connection if configured
            db_manager = None
            if "postgresql" in pipeline_config:
                pg_config = pipeline_config["postgresql"]
                if not validate_postgresql_config(pg_config):
                    logger.error("Invalid PostgreSQL configuration, skipping database connection")
                else:
                    try:
                        db_manager = DatabaseManager(pg_config)
                        if db_manager.connect():
                            logger.info(f"Successfully connected to PostgreSQL at {pg_config['host']}:{pg_config['port']}")
                        else:
                            logger.error("Failed to connect to PostgreSQL")
                            db_manager = None
                    except Exception as e:
                        logger.error(f"Error initializing PostgreSQL connection: {e}")
                        db_manager = None

            return load_pipeline_node(pipeline_config["pipeline"], mpta_dir, redis_client, db_manager)
    except json.JSONDecodeError as e:
        logger.error(f"Error parsing pipeline.json: {str(e)}", exc_info=True)
        return None
    except KeyError as e:
        logger.error(f"Missing key in pipeline.json: {str(e)}", exc_info=True)
        return None
    except Exception as e:
        logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True)
        return None

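
# Illustrative top-level pipeline.json content (never loaded at runtime). The three
# sections match the keys load_pipeline_from_zip() looks for; hosts, credentials and
# model names are placeholders. See _EXAMPLE_NODE_CONFIG above for the node schema.
_EXAMPLE_PIPELINE_JSON = {
    "redis": {"host": "localhost", "port": 6379, "password": None, "db": 0},
    "postgresql": {
        "host": "localhost",
        "port": 5432,
        "database": "detections_db",   # placeholder database name
        "username": "detector",        # placeholder credentials
        "password": "change-me",
    },
    "pipeline": {
        "modelId": "frontal_detection_v1",
        "modelFile": "frontal_detection_v1.pt",
        "branches": [],
    },
}
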
def execute_actions(node, frame, detection_result, regions_dict=None):
    """Execute the Redis actions configured on a node for one detection event."""
    if not node["redis_client"] or not node["actions"]:
        return

    # Create a dynamic context for this detection event
    action_context = {
        **detection_result,
        "timestamp_ms": int(time.time() * 1000),
        "uuid": str(uuid.uuid4()),
        "timestamp": datetime.now().strftime("%Y-%m-%dT%H-%M-%S"),
        "filename": f"{uuid.uuid4()}.jpg"
    }

    for action in node["actions"]:
        try:
            if action["type"] == "redis_save_image":
                key = action["key"].format(**action_context)

                # Check if we need to crop a specific region
                region_name = action.get("region")
                image_to_save = frame
                if region_name and regions_dict:
                    cropped_image = crop_region_by_class(frame, regions_dict, region_name)
                    if cropped_image is not None:
                        image_to_save = cropped_image
                        logger.debug(f"Cropped region '{region_name}' for redis_save_image")
                    else:
                        logger.warning(f"Could not crop region '{region_name}', saving full frame instead")

                # Encode image with the specified format and quality (default to JPEG)
                img_format = action.get("format", "jpeg").lower()
                quality = action.get("quality", 90)
                if img_format == "jpeg":
                    encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
                    success, buffer = cv2.imencode('.jpg', image_to_save, encode_params)
                elif img_format == "png":
                    success, buffer = cv2.imencode('.png', image_to_save)
                else:
                    success, buffer = cv2.imencode('.jpg', image_to_save, [cv2.IMWRITE_JPEG_QUALITY, quality])

                if not success:
                    logger.error("Failed to encode image for redis_save_image")
                    continue

                expire_seconds = action.get("expire_seconds")
                if expire_seconds:
                    node["redis_client"].setex(key, expire_seconds, buffer.tobytes())
                    logger.info(f"Saved image to Redis with key: {key} (expires in {expire_seconds}s)")
                else:
                    node["redis_client"].set(key, buffer.tobytes())
                    logger.info(f"Saved image to Redis with key: {key}")
                action_context["image_key"] = key

            elif action["type"] == "redis_publish":
                channel = action["channel"]
                try:
                    # Handle JSON message format by creating it programmatically
                    message_template = action["message"]

                    # Check if the message is JSON-like (starts and ends with braces)
                    if message_template.strip().startswith('{') and message_template.strip().endswith('}'):
                        # Create the JSON payload programmatically to avoid formatting issues
                        json_data = {
                            "event": "frontal_detected",
                            "display_id": action_context.get("display_id", "unknown"),
                            "session_id": action_context.get("session_id"),
                            "timestamp": action_context.get("timestamp", ""),
                            "image_key": action_context.get("image_key", "")
                        }
                        message = json.dumps(json_data)
                    else:
                        # Use regular string formatting for non-JSON messages
                        message = message_template.format(**action_context)

                    # Publish to Redis
                    if not node["redis_client"]:
                        logger.error("Redis client is None, cannot publish message")
                        continue

                    # Test the Redis connection before publishing
                    try:
                        node["redis_client"].ping()
                        logger.debug("Redis connection is active")
                    except Exception as ping_error:
                        logger.error(f"Redis connection test failed: {ping_error}")
                        continue

                    result = node["redis_client"].publish(channel, message)
                    logger.info(f"Published message to Redis channel '{channel}': {message}")
                    logger.info(f"Redis publish result (subscriber count): {result}")
                    if result == 0:
                        logger.warning(f"No subscribers listening to channel '{channel}'")
                    else:
                        logger.info(f"Message delivered to {result} subscriber(s)")
                except KeyError as e:
                    logger.error(f"Missing key in redis_publish message template: {e}")
                    logger.debug(f"Available context keys: {list(action_context.keys())}")
                except Exception as e:
                    logger.error(f"Error in redis_publish action: {e}")
                    logger.debug(f"Message template: {action['message']}")
                    logger.debug(f"Available context keys: {list(action_context.keys())}")
                    logger.debug(f"Full traceback: {traceback.format_exc()}")
        except Exception as e:
            logger.error(f"Error executing action {action['type']}: {e}")

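
# Illustrative "actions" list for a node (never used at runtime). The action types,
# option names and template placeholders all come from execute_actions() above; the
# Redis key pattern and channel name are made up.
_EXAMPLE_NODE_ACTIONS = [
    {
        "type": "redis_save_image",
        "key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
        "region": "frontal",        # optional: crop this detected region before saving
        "format": "jpeg",
        "quality": 90,
        "expire_seconds": 600,
    },
    {
        "type": "redis_publish",
        "channel": "car_detections",
        # JSON-like messages are rebuilt programmatically inside execute_actions()
        "message": '{"event": "frontal_detected"}',
    },
]
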
def execute_parallel_actions(node, frame, detection_result, regions_dict):
    """Execute parallel actions after all required branches have completed."""
    if not node.get("parallelActions"):
        return

    logger.debug("Executing parallel actions...")
    branch_results = detection_result.get("branch_results", {})

    for action in node["parallelActions"]:
        try:
            action_type = action.get("type")
            logger.debug(f"Processing parallel action: {action_type}")

            if action_type == "postgresql_update_combined":
                # Check if all required branches have completed
                wait_for_branches = action.get("waitForBranches", [])
                missing_branches = [branch for branch in wait_for_branches if branch not in branch_results]

                if missing_branches:
                    logger.warning(f"Cannot execute postgresql_update_combined: missing branch results for {missing_branches}")
                    continue

                logger.info(f"All required branches completed: {wait_for_branches}")
                # Execute the database update
                execute_postgresql_update_combined(node, action, detection_result, branch_results)
            else:
                logger.warning(f"Unknown parallel action type: {action_type}")
        except Exception as e:
            logger.error(f"Error executing parallel action {action.get('type', 'unknown')}: {e}")
            logger.debug(f"Full traceback: {traceback.format_exc()}")

def execute_postgresql_update_combined(node, action, detection_result, branch_results):
    """Execute a PostgreSQL update with combined branch results."""
    if not node.get("db_manager"):
        logger.error("No database manager available for postgresql_update_combined action")
        return

    try:
        table = action["table"]
        key_field = action["key_field"]
        key_value_template = action["key_value"]
        fields = action["fields"]

        # Create context for key value formatting
        action_context = {**detection_result}
        key_value = key_value_template.format(**action_context)

        logger.info(f"Executing database update: table={table}, {key_field}={key_value}")

        # Process field mappings
        mapped_fields = {}
        for db_field, value_template in fields.items():
            try:
                mapped_value = resolve_field_mapping(value_template, branch_results, action_context)
                if mapped_value is not None:
                    mapped_fields[db_field] = mapped_value
                    logger.debug(f"Mapped field: {db_field} = {mapped_value}")
                else:
                    logger.warning(f"Could not resolve field mapping for {db_field}: {value_template}")
            except Exception as e:
                logger.error(f"Error mapping field {db_field} with template '{value_template}': {e}")

        if not mapped_fields:
            logger.warning("No fields mapped successfully, skipping database update")
            return

        # Execute the database update
        success = node["db_manager"].execute_update(table, key_field, key_value, mapped_fields)
        if success:
            logger.info(f"Successfully updated database: {table} with {len(mapped_fields)} fields")
        else:
            logger.error(f"Failed to update database: {table}")
    except KeyError as e:
        logger.error(f"Missing required field in postgresql_update_combined action: {e}")
    except Exception as e:
        logger.error(f"Error in postgresql_update_combined action: {e}")
        logger.debug(f"Full traceback: {traceback.format_exc()}")


def resolve_field_mapping(value_template, branch_results, action_context):
    """Resolve field mapping templates like {car_brand_cls_v1.brand}."""
    try:
        # Handle simple context variables first (non-branch references)
        if '.' not in value_template:
            return value_template.format(**action_context)

        # Handle branch result references like {model_id.field}
        branch_refs = re.findall(r'\{([^}]+\.[^}]+)\}', value_template)
        resolved_template = value_template

        for ref in branch_refs:
            try:
                model_id, field_name = ref.split('.', 1)
                if model_id in branch_results:
                    branch_data = branch_results[model_id]
                    if field_name in branch_data:
                        field_value = branch_data[field_name]
                        resolved_template = resolved_template.replace(f'{{{ref}}}', str(field_value))
                        logger.debug(f"Resolved {ref} to {field_value}")
                    else:
                        logger.warning(f"Field '{field_name}' not found in branch '{model_id}' results. Available fields: {list(branch_data.keys())}")
                        return None
                else:
                    logger.warning(f"Branch '{model_id}' not found in results. Available branches: {list(branch_results.keys())}")
                    return None
            except ValueError:
                logger.error(f"Invalid branch reference format: {ref}")
                return None

        # Format any remaining simple variables
        try:
            return resolved_template.format(**action_context)
        except KeyError as e:
            logger.warning(f"Could not resolve context variable in template: {e}")
            return resolved_template
    except Exception as e:
        logger.error(f"Error resolving field mapping '{value_template}': {e}")
        return None

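
# Minimal sketch of how resolve_field_mapping() is expected to behave, using the
# template syntax from its docstring. The branch IDs, field values and session ID
# below are made up; the function is never called at import time.
def _example_field_mapping():
    branch_results = {
        "car_brand_cls_v1": {"brand": "Toyota"},
        "car_bodytype_cls_v1": {"body_type": "Sedan"},
    }
    action_context = {"session_id": "sess-123"}
    brand = resolve_field_mapping("{car_brand_cls_v1.brand}", branch_results, action_context)
    keyval = resolve_field_mapping("{session_id}", branch_results, action_context)
    return brand, keyval  # ("Toyota", "sess-123")
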
def run_detection_with_tracking(frame, node, context=None):
    """
    Run YOLO detection with optional BoT-SORT tracking.

    Args:
        frame: Input frame/image
        node: Pipeline node configuration with model and settings
        context: Optional context information (camera info, session data, etc.)

    Returns:
        tuple: (all_detections, regions_dict) where:
            - all_detections: List of all detection objects
            - regions_dict: Dict mapping class names to highest-confidence detections

    Configuration options in node:
        - model: YOLO model instance
        - triggerClassIndices: List of class indices to detect (None for all classes)
        - minConfidence: Minimum confidence threshold
        - multiClass: Whether to enable multi-class detection mode
        - expectedClasses: List of expected class names for multi-class validation
        - tracking: Dict with tracking configuration
            - enabled: Boolean to enable/disable tracking
            - reidConfigPath: Path to ReID config file (default: "botsort.yaml")
    """
    try:
        # Extract tracking configuration
        tracking_config = node.get("tracking", {})
        tracking_enabled = tracking_config.get("enabled", True)
        reid_config_path = tracking_config.get("reidConfigPath", "botsort.yaml")

        # Check if we need to reset the tracker after a cooldown
        camera_id = context.get("camera_id", "unknown") if context else "unknown"
        model_id = node.get("modelId", "unknown")
        stability_data = get_camera_stability_data(camera_id, model_id)
        session_state = stability_data["session_state"]

        if session_state.get("reset_tracker_on_resume", False):
            # Reset the YOLO tracker so new cars get fresh track IDs
            if hasattr(node["model"], 'trackers') and node["model"].trackers:
                node["model"].trackers.clear()  # Clear tracker state
                logger.info(f"Camera {camera_id}: Reset YOLO tracker - new cars will get fresh track IDs")
            session_state["reset_tracker_on_resume"] = False  # Clear the flag

        # Get the tracking zone from runtime context (camera-specific)
        tracking_zone = context.get("trackingZone", []) if context else []

        # Prepare class filtering
        trigger_class_indices = node.get("triggerClassIndices")
        class_filter = {"classes": trigger_class_indices} if trigger_class_indices else {}

        logger.debug(f"Running detection for {node['modelId']} - tracking: {tracking_enabled}, classes: {node.get('triggerClasses', 'all')}")

        if tracking_enabled and tracking_zone:
            # Use tracking with zone validation
            logger.debug(f"Using tracking with ReID config: {reid_config_path}")
            res = node["model"].track(frame, stream=False, persist=True, tracker=reid_config_path, **class_filter)[0]
        elif tracking_enabled:
            # Use tracking without zone restriction
            logger.debug("Using tracking without zone restriction")
            res = node["model"].track(frame, stream=False, persist=True, **class_filter)[0]
        else:
            # Use detection only (no tracking)
            logger.debug("Using detection only (tracking disabled)")
            res = node["model"].predict(frame, stream=False, **class_filter)[0]

        # Process detection results
        candidate_detections = []
        min_confidence = node.get("minConfidence", 0.0)

        if res.boxes is None or len(res.boxes) == 0:
            logger.debug("No detections found")
            return [], {}

        logger.debug(f"Processing {len(res.boxes)} raw detections")

        # First pass: collect all valid detections
        for i, box in enumerate(res.boxes):
            # Extract detection data
            conf = float(box.cpu().conf[0])
            cls_id = int(box.cpu().cls[0])
            class_name = node["model"].names[cls_id]

            # Apply confidence filtering
            if conf < min_confidence:
                logger.debug(f"Detection {i} '{class_name}' rejected: {conf:.3f} < {min_confidence}")
                continue

            # Extract bounding box
            xy = box.cpu().xyxy[0]
            x1, y1, x2, y2 = map(int, xy)
            bbox = (x1, y1, x2, y2)

            # Extract tracking ID if available
            track_id = None
            if hasattr(box, "id") and box.id is not None:
                track_id = int(box.id.item())

            # Apply tracking zone validation if enabled
            if tracking_enabled and tracking_zone:
                bbox_center_x = (x1 + x2) // 2
                bbox_center_y = (y1 + y2) // 2

                # Check if the detection center is within the tracking zone
                if not _point_in_polygon((bbox_center_x, bbox_center_y), tracking_zone):
                    logger.debug(f"Detection {i} '{class_name}' outside tracking zone")
                    continue

            # Create the detection object
            detection = {
                "class": class_name,
                "confidence": conf,
                "id": track_id,
                "bbox": bbox,
                "class_id": cls_id
            }
            candidate_detections.append(detection)
            logger.debug(f"Detection {i} candidate: {class_name} (conf={conf:.3f}, id={track_id}, bbox={bbox})")

        # Second pass: select only the highest-confidence detection overall
        if not candidate_detections:
            logger.debug("No valid candidate detections found")
            return [], {}

        # Find the single highest-confidence detection across all detected classes
        best_detection = max(candidate_detections, key=lambda x: x["confidence"])
        original_class = best_detection["class"]
        logger.info(f"Selected highest confidence detection: {original_class} (conf={best_detection['confidence']:.3f})")

        # Apply class mapping if configured
        mapped_class = original_class
        class_mapping = node.get("classMapping", {})
        if original_class in class_mapping:
            mapped_class = class_mapping[original_class]
            logger.info(f"Class mapping applied: {original_class} → {mapped_class}")

        # Update the detection object with the mapped class
        best_detection["class"] = mapped_class
        best_detection["original_class"] = original_class  # Keep the original for reference

        # Keep only the best detection with the mapped class
        all_detections = [best_detection]
        regions_dict = {
            mapped_class: {
                "bbox": best_detection["bbox"],
                "confidence": best_detection["confidence"],
                "detection": best_detection,
                "track_id": best_detection["id"]
            }
        }

        # Multi-class validation
        if node.get("multiClass", False) and node.get("expectedClasses"):
            expected_classes = node["expectedClasses"]
            detected_classes = list(regions_dict.keys())
            logger.debug(f"Multi-class validation: expected={expected_classes}, detected={detected_classes}")

            # Check for required classes (flexible - at least one must match)
            matching_classes = [cls for cls in expected_classes if cls in detected_classes]
            if not matching_classes:
                logger.warning("Multi-class validation failed: no expected classes detected")
                return [], {}
            logger.info(f"Multi-class validation passed: {matching_classes} detected")

        logger.info(f"Detection completed: {len(all_detections)} detections, {len(regions_dict)} unique classes")

        # Update stability tracking for detections with track IDs (requires camera_id from context)
        camera_id = context.get("camera_id", "unknown") if context else "unknown"
        update_track_stability(node, all_detections, camera_id)

        return all_detections, regions_dict
    except Exception as e:
        logger.error(f"Error in detection_with_tracking for {node.get('modelId', 'unknown')}: {e}")
        logger.debug(f"Detection error traceback: {traceback.format_exc()}")
        return [], {}

def _point_in_polygon(point, polygon):
    """Check if a point is inside a polygon using the ray-casting algorithm."""
    if not polygon or len(polygon) < 3:
        return True  # No zone restriction if the polygon is invalid

    x, y = point
    n = len(polygon)
    inside = False

    p1x, p1y = polygon[0]
    for i in range(1, n + 1):
        p2x, p2y = polygon[i % n]
        if y > min(p1y, p2y):
            if y <= max(p1y, p2y):
                if x <= max(p1x, p2x):
                    if p1y != p2y:
                        xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
                    if p1x == p2x or x <= xinters:
                        inside = not inside
        p1x, p1y = p2x, p2y

    return inside

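
# Quick illustrative check of the ray-casting helper above (never executed at import
# time): a 100x100 square zone with one inside point and one outside point.
def _example_point_in_polygon():
    zone = [(0, 0), (100, 0), (100, 100), (0, 100)]
    assert _point_in_polygon((50, 50), zone) is True
    assert _point_in_polygon((150, 50), zone) is False
    # Fewer than 3 vertices means "no zone restriction", so every point passes.
    assert _point_in_polygon((150, 50), []) is True
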
def get_camera_stability_data(camera_id, model_id):
    """Get or create stability tracking data for a specific camera and model."""
    global _camera_stability_tracking
    if camera_id not in _camera_stability_tracking:
        _camera_stability_tracking[camera_id] = {}
    if model_id not in _camera_stability_tracking[camera_id]:
        logger.warning(f"Camera {camera_id}: Creating NEW stability data for {model_id} - this will reset any cooldown!")
        _camera_stability_tracking[camera_id][model_id] = {
            "track_stability_counters": {},
            "stable_tracks": set(),
            "session_state": {
                "active": True,
                "waiting_for_backend_session": False,
                "wait_start_time": 0.0,
                "reset_tracker_on_resume": False
            }
        }
    return _camera_stability_tracking[camera_id][model_id]


def update_track_stability(node, detections, camera_id):
    """Update stability counters for tracked objects per camera."""
    stability_threshold = node.get("stabilityThreshold", 1)
    model_id = node.get("modelId", "unknown")

    # Get camera-specific stability data
    stability_data = get_camera_stability_data(camera_id, model_id)
    track_counters = stability_data["track_stability_counters"]
    stable_tracks = stability_data["stable_tracks"]

    # Get current track IDs from detections
    current_track_ids = set()
    for detection in detections:
        track_id = detection.get("id")
        if track_id is not None:
            current_track_ids.add(track_id)
            # Increment the counter for this track
            track_counters[track_id] = track_counters.get(track_id, 0) + 1

            # Check if the track becomes stable
            if track_counters[track_id] >= stability_threshold and track_id not in stable_tracks:
                stable_tracks.add(track_id)
                logger.info(f"Camera {camera_id}: Track ID {track_id} became stable after {track_counters[track_id]} detections (threshold: {stability_threshold})")

    # Clean up counters for tracks that disappeared
    disappeared_tracks = set(track_counters.keys()) - current_track_ids
    for track_id in disappeared_tracks:
        logger.debug(f"Camera {camera_id}: Track ID {track_id} disappeared, removing from counters")
        track_counters.pop(track_id, None)
        stable_tracks.discard(track_id)

    logger.debug(f"Camera {camera_id}: Track stability: active={list(current_track_ids)}, stable={list(stable_tracks)}, counters={track_counters}")


def check_stable_tracks(camera_id, model_id, regions_dict):
    """Check if any stable tracks match the detected classes for a specific camera."""
    # Get camera-specific stability data
    stability_data = get_camera_stability_data(camera_id, model_id)
    stable_tracks = stability_data["stable_tracks"]

    if not stable_tracks:
        return False, []

    # Check if any detection in regions_dict has a stable track ID
    stable_detections = []
    for class_name, region_data in regions_dict.items():
        detection = region_data.get("detection", {})
        track_id = detection.get("id")
        if track_id is not None and track_id in stable_tracks:
            stable_detections.append((class_name, track_id))
            logger.debug(f"Camera {camera_id}: Found stable detection: {class_name} with stable track ID {track_id}")

    has_stable_tracks = len(stable_detections) > 0
    return has_stable_tracks, stable_detections

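
# Minimal sketch of the per-camera stability flow (never executed at import time).
# The node dict only carries the keys read by update_track_stability(); the camera
# ID, track ID and confidences are made up.
def _example_track_stability():
    node = {"modelId": "frontal_detection_v1", "stabilityThreshold": 3}
    detections = [{"class": "frontal", "confidence": 0.9, "id": 7, "bbox": (0, 0, 10, 10)}]

    # Three consecutive frames reporting the same track ID reach the threshold.
    for _ in range(3):
        update_track_stability(node, detections, camera_id="cam-01")

    regions_dict = {"frontal": {"bbox": (0, 0, 10, 10), "confidence": 0.9, "detection": detections[0]}}
    stable, stable_dets = check_stable_tracks("cam-01", "frontal_detection_v1", regions_dict)
    cleanup_camera_stability("cam-01")  # tidy up the module-level state again
    return stable, stable_dets  # (True, [("frontal", 7)])
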
def reset_tracking_state(camera_id, model_id, reason="session ended"):
    """Reset tracking state after session completion or timeout."""
    stability_data = get_camera_stability_data(camera_id, model_id)
    session_state = stability_data["session_state"]

    # Clear all tracking data for a fresh start
    stability_data["track_stability_counters"].clear()
    stability_data["stable_tracks"].clear()
    session_state["active"] = True
    session_state["waiting_for_backend_session"] = False
    session_state["wait_start_time"] = 0.0
    session_state["reset_tracker_on_resume"] = True

    logger.info(f"Camera {camera_id}: Reset tracking state - {reason}")
    logger.info(f"Camera {camera_id}: Cleared stability counters and stable tracks for fresh session")


def is_camera_active(camera_id, model_id):
    """Check if the camera should be processing detections."""
    stability_data = get_camera_stability_data(camera_id, model_id)
    session_state = stability_data["session_state"]

    # Check if waiting for the backend sessionId has timed out
    if session_state.get("waiting_for_backend_session", False):
        current_time = time.time()
        wait_start_time = session_state.get("wait_start_time", 0)
        elapsed_time = current_time - wait_start_time

        if elapsed_time >= _session_timeout_seconds:
            logger.warning(f"Camera {camera_id}: Backend sessionId timeout ({_session_timeout_seconds}s) - resetting tracking")
            reset_tracking_state(camera_id, model_id, "backend sessionId timeout")
            return True
        else:
            remaining_time = _session_timeout_seconds - elapsed_time
            logger.debug(f"Camera {camera_id}: Still waiting for backend sessionId - {remaining_time:.1f}s remaining")
            return False

    return session_state.get("active", True)


def cleanup_camera_stability(camera_id):
    """Clean up stability tracking data when a camera is disconnected."""
    global _camera_stability_tracking
    if camera_id in _camera_stability_tracking:
        del _camera_stability_tracking[camera_id]
        logger.info(f"Cleaned up stability tracking data for camera {camera_id}")


def validate_pipeline_execution(node, regions_dict):
    """
    Pre-validate that all required branches will execute successfully before
    committing to Redis actions and database records.

    Returns:
        - (True, []) if the pipeline can execute completely
        - (False, missing_branches) if some required branches won't execute
    """
    # Get all branches that parallel actions are waiting for
    required_branches = set()
    for action in node.get("parallelActions", []):
        if action.get("type") == "postgresql_update_combined":
            wait_for_branches = action.get("waitForBranches", [])
            required_branches.update(wait_for_branches)

    if not required_branches:
        # No parallel actions requiring specific branches
        logger.debug("No parallel actions with waitForBranches - validation passes")
        return True, []

    logger.debug(f"Pre-validation: checking if required branches {list(required_branches)} will execute")

    # Check each required branch
    missing_branches = []
    for branch in node.get("branches", []):
        branch_id = branch["modelId"]

        if branch_id not in required_branches:
            continue  # This branch is not required by parallel actions

        # Check if this branch would be triggered
        trigger_classes = branch.get("triggerClasses", [])
        min_conf = branch.get("minConfidence", 0)

        branch_triggered = False
        for det_class in regions_dict:
            det_confidence = regions_dict[det_class]["confidence"]
            if det_class in trigger_classes and det_confidence >= min_conf:
                branch_triggered = True
                logger.debug(f"Pre-validation: branch {branch_id} WILL be triggered by {det_class} (conf={det_confidence:.3f} >= {min_conf})")
                break

        if not branch_triggered:
            missing_branches.append(branch_id)
            logger.warning(f"Pre-validation: branch {branch_id} will NOT be triggered - no matching classes or insufficient confidence")
            logger.debug(f"  Required: {trigger_classes} with min_conf={min_conf}")
            logger.debug(f"  Available: {[(cls, regions_dict[cls]['confidence']) for cls in regions_dict]}")

    if missing_branches:
        logger.error(f"Pipeline pre-validation FAILED: required branches {missing_branches} will not execute")
        return False, missing_branches
    else:
        logger.info(f"Pipeline pre-validation PASSED: all required branches {list(required_branches)} will execute")
        return True, []

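
# Illustrative parallelActions configuration and pre-validation call (never used at
# runtime). The field names follow validate_pipeline_execution() and
# execute_postgresql_update_combined() above; the table, column and branch names
# are placeholders.
def _example_pipeline_validation():
    node = {
        "parallelActions": [{
            "type": "postgresql_update_combined",
            "table": "car_info",                 # placeholder table name
            "key_field": "session_id",
            "key_value": "{session_id}",
            "waitForBranches": ["car_brand_cls_v1"],
            "fields": {"brand": "{car_brand_cls_v1.brand}"},
        }],
        "branches": [{
            "modelId": "car_brand_cls_v1",
            "triggerClasses": ["frontal"],
            "minConfidence": 0.7,
        }],
    }
    regions_dict = {"frontal": {"bbox": (0, 0, 10, 10), "confidence": 0.85}}
    return validate_pipeline_execution(node, regions_dict)  # (True, [])
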
def run_pipeline(frame, node: dict, return_bbox: bool = False, context=None):
    """
    Enhanced pipeline that supports:
    - Multi-class detection (detecting multiple classes simultaneously)
    - Parallel branch processing
    - Region-based actions and cropping
    - Context passing for session/camera information
    """
    try:
        # Extract the backend sessionId from context at the start of the function
        backend_session_id = context.get("backend_session_id") if context else None
        camera_id = context.get("camera_id", "unknown") if context else "unknown"
        model_id = node.get("modelId", "unknown")

        if backend_session_id:
            logger.info(f"PIPELINE USING BACKEND SESSION_ID: {backend_session_id} for camera {camera_id}")
        else:
            logger.debug(f"No backend session_id in pipeline context for camera {camera_id}")

        task = getattr(node["model"], "task", None)

        # ─── Classification stage ───────────────────────────────────────
        if task == "classify":
            results = node["model"].predict(frame, stream=False)
            if not results:
                return (None, None) if return_bbox else None

            r = results[0]
            probs = r.probs
            if probs is None:
                return (None, None) if return_bbox else None

            top1_idx = int(probs.top1)
            top1_conf = float(probs.top1conf)
            class_name = node["model"].names[top1_idx]

            det = {
                "class": class_name,
                "confidence": top1_conf,
                "id": None,
                class_name: class_name  # Add the class name as a key for backward compatibility
            }

            # Add specific field mappings for database operations based on model type
            model_id = node.get("modelId", "").lower()
            if "brand" in model_id or "brand_cls" in model_id:
                det["brand"] = class_name
            elif "bodytype" in model_id or "body" in model_id:
                det["body_type"] = class_name
            elif "color" in model_id:
                det["color"] = class_name

            execute_actions(node, frame, det, context.get("regions_dict") if context else None)
            return (det, None) if return_bbox else det

        # ─── Session management check ───────────────────────────────────
        if not is_camera_active(camera_id, model_id):
            logger.debug(f"Camera {camera_id}: Waiting for backend sessionId, skipping pipeline")
            return (None, None) if return_bbox else None

        # ─── Detection stage - using the structured detection function ──
        all_detections, regions_dict = run_detection_with_tracking(frame, node, context)

        if not all_detections:
            logger.warning("No detections from structured detection function - returning null")
            return (None, None) if return_bbox else None

        # Extract bounding boxes for compatibility
        all_boxes = [det["bbox"] for det in all_detections]

        # ─── Stability validation (only for the root pipeline node) ─────
        stability_threshold = node.get("stabilityThreshold", 1)
        if stability_threshold > 1:
            # Extract camera_id for the stability check
            camera_id = context.get("camera_id", "unknown") if context else "unknown"
            model_id = node.get("modelId", "unknown")

            # Check if we have stable tracks for this specific camera
            has_stable_tracks, stable_detections = check_stable_tracks(camera_id, model_id, regions_dict)

            if not has_stable_tracks:
                logger.info(f"Camera {camera_id}: Track not stable yet (threshold: {stability_threshold}) - validation only, skipping branches")
                # Return early with just the detection result, no branch processing
                primary_detection = max(all_detections, key=lambda x: x["confidence"]) if all_detections else {"class": "none", "confidence": 0.0, "bbox": [0, 0, 0, 0]}
                primary_bbox = primary_detection.get("bbox", [0, 0, 0, 0])
                return (primary_detection, primary_bbox) if return_bbox else primary_detection

            logger.info(f"Camera {camera_id}: Stable tracks {[det[1] for det in stable_detections]} detected - checking backend sessionId")

            # Check if we need to wait for the backend sessionId
            if not backend_session_id:
                logger.info(f"Camera {camera_id}: Stable car detected, waiting for backend sessionId...")
                stability_data = get_camera_stability_data(camera_id, model_id)
                session_state = stability_data["session_state"]

                if not session_state.get("waiting_for_backend_session", False):
                    # Start waiting for the backend sessionId
                    session_state["waiting_for_backend_session"] = True
                    session_state["wait_start_time"] = time.time()
                    logger.info(f"Camera {camera_id}: WAITING FOR BACKEND SESSION_ID (timeout: {_session_timeout_seconds}s)")
                    logger.info("Stable car detected - sending imageDetection to trigger backend session creation")

                # Return the detection to signal the backend, but don't proceed with the pipeline
                primary_detection = max(all_detections, key=lambda x: x["confidence"]) if all_detections else {"class": "none", "confidence": 0.0, "bbox": [0, 0, 0, 0]}
                primary_bbox = primary_detection.get("bbox", [0, 0, 0, 0])
                return (primary_detection, primary_bbox) if return_bbox else primary_detection

            logger.info(f"Camera {camera_id}: BACKEND SESSION_ID AVAILABLE ({backend_session_id}) - proceeding with full pipeline")

        # ─── Pre-validate pipeline execution ────────────────────────────
        pipeline_valid, missing_branches = validate_pipeline_execution(node, regions_dict)

        if not pipeline_valid:
            logger.error(f"Pipeline execution validation FAILED - required branches {missing_branches} cannot execute")
            logger.error("Aborting pipeline: no Redis actions or database records will be created")
            return (None, None) if return_bbox else None

        # ─── Execute actions with region information ────────────────────
        detection_result = {
            "detections": all_detections,
            "regions": regions_dict,
            **(context or {})
        }

        # ─── Check backend sessionId before database operations ─────────
        if node.get("db_manager") and regions_dict:
            detected_classes = list(regions_dict.keys())
            logger.debug(f"Valid detections found - checking for backend sessionId: {detected_classes}")

            if not backend_session_id:
                logger.error(f"Camera {camera_id}: No backend sessionId available - cannot proceed with database operations")
                logger.error(f"Camera {camera_id}: Pipeline requires backend sessionId for Redis/PostgreSQL operations")
                # Reset tracking and wait for a new stable car
                reset_tracking_state(camera_id, model_id, "missing backend sessionId")
                return (None, None) if return_bbox else None

            # Use the backend sessionId for database operations
            if detected_classes:
                display_id = detection_result.get("display_id", "unknown")
                timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")

                inserted_session_id = node["db_manager"].insert_initial_detection(
                    display_id=display_id,
                    captured_timestamp=timestamp,
                    session_id=backend_session_id  # Use the backend sessionId
                )

                if inserted_session_id:
                    detection_result["session_id"] = inserted_session_id
                    detection_result["timestamp"] = timestamp
                    logger.info(f"DATABASE RECORD CREATED with backend session_id: {inserted_session_id}")
                    logger.debug(f"Database record: display_id={display_id}, timestamp={timestamp}")
                else:
                    logger.error(f"Failed to create database record with backend session_id: {backend_session_id}")
                    reset_tracking_state(camera_id, model_id, "database insertion failed")
                    return (None, None) if return_bbox else None

        # Execute actions for the root node only if it doesn't have branches
        # Branch nodes with actions will execute them after branch processing
        if not node.get("branches") or node.get("modelId") == "yolo11n":
            execute_actions(node, frame, detection_result, regions_dict)

        # ─── Branch processing (no stability check here) ────────────────
        if node["branches"]:
            branch_results = {}

            # Extract camera_id for logging
            camera_id = detection_result.get("camera_id", context.get("camera_id", "unknown") if context else "unknown")

            # Filter branches that should be triggered
            active_branches = []
            for br in node["branches"]:
                trigger_classes = br.get("triggerClasses", [])
                min_conf = br.get("minConfidence", 0)

                logger.debug(f"Evaluating branch {br['modelId']}: trigger_classes={trigger_classes}, min_conf={min_conf}")

                # Check if any detected class matches the branch trigger
                branch_triggered = False
                for det_class in regions_dict:
                    det_confidence = regions_dict[det_class]["confidence"]
                    logger.debug(f"  Checking detected class '{det_class}' (confidence={det_confidence:.3f}) against triggers {trigger_classes}")

                    if det_class in trigger_classes and det_confidence >= min_conf:
                        active_branches.append(br)
                        branch_triggered = True
                        logger.info(f"Branch {br['modelId']} activated by class '{det_class}' (conf={det_confidence:.3f} >= {min_conf})")
                        break

                if not branch_triggered:
                    logger.debug(f"Branch {br['modelId']} not triggered - no matching classes or insufficient confidence")

            if active_branches:
                if node.get("parallel", False) or any(br.get("parallel", False) for br in active_branches):
                    # Run branches in parallel
                    with concurrent.futures.ThreadPoolExecutor(max_workers=len(active_branches)) as executor:
                        futures = {}

                        for br in active_branches:
                            sub_frame = frame
                            crop_class = br.get("cropClass")

                            logger.info(f"Starting parallel branch: {br['modelId']}, cropClass: {crop_class}")

                            if br.get("crop", False) and crop_class:
                                if crop_class in regions_dict:
                                    cropped = crop_region_by_class(frame, regions_dict, crop_class)
                                    if cropped is not None:
                                        sub_frame = cropped  # Use the cropped image without manual resizing
                                        logger.debug(f"Successfully cropped {crop_class} region for {br['modelId']} - model will handle resizing")
                                    else:
                                        logger.warning(f"Failed to crop {crop_class} region for {br['modelId']}, skipping branch")
                                        continue
                                else:
                                    logger.warning(f"Crop class {crop_class} not found in detected regions for {br['modelId']}, skipping branch")
                                    continue

                            # Add regions_dict to the context for child branches
                            branch_context = dict(context) if context else {}
                            branch_context["regions_dict"] = regions_dict

                            future = executor.submit(run_pipeline, sub_frame, br, True, branch_context)
                            futures[future] = br

                        # Collect results
                        for future in concurrent.futures.as_completed(futures):
                            br = futures[future]
                            try:
                                result, _ = future.result()
                                if result:
                                    branch_results[br["modelId"]] = result
                                    logger.info(f"Branch {br['modelId']} completed: {result}")
                            except Exception as e:
                                logger.error(f"Branch {br['modelId']} failed: {e}")
                else:
                    # Run branches sequentially
                    for br in active_branches:
                        sub_frame = frame
                        crop_class = br.get("cropClass")

                        logger.info(f"Starting sequential branch: {br['modelId']}, cropClass: {crop_class}")

                        if br.get("crop", False) and crop_class:
                            if crop_class in regions_dict:
                                cropped = crop_region_by_class(frame, regions_dict, crop_class)
                                if cropped is not None:
                                    sub_frame = cropped  # Use the cropped image without manual resizing
                                    logger.debug(f"Successfully cropped {crop_class} region for {br['modelId']} - model will handle resizing")
                                else:
                                    logger.warning(f"Failed to crop {crop_class} region for {br['modelId']}, skipping branch")
                                    continue
                            else:
                                logger.warning(f"Crop class {crop_class} not found in detected regions for {br['modelId']}, skipping branch")
                                continue

                        try:
                            # Add regions_dict to the context for child branches
                            branch_context = dict(context) if context else {}
                            branch_context["regions_dict"] = regions_dict

                            result, _ = run_pipeline(sub_frame, br, True, branch_context)
                            if result:
                                branch_results[br["modelId"]] = result
                                logger.info(f"Branch {br['modelId']} completed: {result}")
                            else:
                                logger.warning(f"Branch {br['modelId']} returned no result")
                        except Exception as e:
                            logger.error(f"Error in sequential branch {br['modelId']}: {e}")
                            logger.debug(f"Branch error traceback: {traceback.format_exc()}")

            # Store branch results in detection_result for parallel actions
            detection_result["branch_results"] = branch_results

        # ─── Execute parallel actions ───────────────────────────────────
        if node.get("parallelActions") and "branch_results" in detection_result:
            execute_parallel_actions(node, frame, detection_result, regions_dict)

        # ─── Note: tracking will be reset when the backend sends setSessionId: null ──
        logger.info(f"Camera {camera_id}: Pipeline completed successfully - waiting for backend to end session")

        # ─── Execute actions after successful detection AND branch processing ────────
        # This ensures detection nodes (like frontal_detection_v1) execute their actions
        # after completing both detection and branch processing
        if node.get("actions") and regions_dict and node.get("modelId") != "yolo11n":
            # Execute actions for branch detection nodes, skip the root to avoid duplication
            logger.debug(f"Executing post-detection actions for branch node {node.get('modelId')}")
            execute_actions(node, frame, detection_result, regions_dict)

        # ─── Return detection result ────────────────────────────────────
        primary_detection = max(all_detections, key=lambda x: x["confidence"])
        primary_bbox = primary_detection["bbox"]

        # Add branch results and session_id to the primary detection for compatibility
        if "branch_results" in detection_result:
            primary_detection["branch_results"] = detection_result["branch_results"]
        if "session_id" in detection_result:
            primary_detection["session_id"] = detection_result["session_id"]

        return (primary_detection, primary_bbox) if return_bbox else primary_detection
    except Exception as e:
        logger.error(f"Error in node {node.get('modelId')}: {e}")
        traceback.print_exc()
        return (None, None) if return_bbox else None
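
# End-to-end usage sketch (never executed at import time). Assumptions: the .mpta
# path, camera ID, display ID and tracking zone are placeholders, and in the real
# worker the frame and backend session come from the camera stream and WebSocket
# handler that import this module.
def _example_run_pipeline_once():
    import numpy as np  # ultralytics already depends on numpy

    model_tree = load_pipeline_from_zip("models/sample_pipeline.mpta", "models/sample_pipeline")
    if model_tree is None:
        return None

    frame = np.zeros((720, 1280, 3), dtype=np.uint8)  # stand-in for a camera frame
    context = {
        "camera_id": "cam-01",
        "display_id": "display-01",
        "backend_session_id": "sess-123",   # normally assigned by the backend
        "trackingZone": [(100, 100), (1180, 100), (1180, 620), (100, 620)],
    }
    detection, bbox = run_pipeline(frame, model_tree, return_bbox=True, context=context)
    return detection, bbox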