diff --git a/core/detection/branches.py b/core/detection/branches.py index 247c5f8..9359ea8 100644 --- a/core/detection/branches.py +++ b/core/detection/branches.py @@ -35,6 +35,9 @@ class BranchProcessor: # Branch models cache self.branch_models: Dict[str, YOLOWrapper] = {} + # Dynamic field mapping: branch_id → output_field_name (e.g., {"car_brand_cls_v3": "brand"}) + self.branch_output_fields: Dict[str, str] = {} + # Thread pool for parallel execution self.executor = ThreadPoolExecutor(max_workers=4) @@ -68,6 +71,9 @@ class BranchProcessor: self.redis_manager = redis_manager self.db_manager = db_manager + # Parse field mappings from parallelActions to enable dynamic field extraction + self._parse_branch_output_fields(pipeline_config) + # Pre-load branch models if they exist branches = getattr(pipeline_config, 'branches', []) if branches: @@ -141,6 +147,46 @@ class BranchProcessor: logger.error(f"Error loading branch model {getattr(branch_config, 'model_id', 'unknown')}: {e}") return None + def _parse_branch_output_fields(self, pipeline_config: Any) -> None: + """ + Parse parallelActions.fields to determine what output field each branch produces. + Creates dynamic mapping from branch_id to output field name. + + Example: + Input: parallelActions.fields = {"car_brand": "{car_brand_cls_v3.brand}"} + Output: self.branch_output_fields = {"car_brand_cls_v3": "brand"} + + Args: + pipeline_config: Pipeline configuration object + """ + try: + if not pipeline_config or not hasattr(pipeline_config, 'parallel_actions'): + logger.debug("[FIELD MAPPING] No parallelActions found in pipeline config") + return + + for action in pipeline_config.parallel_actions: + if action.type.value == 'postgresql_update_combined': + fields = action.params.get('fields', {}) + + # Parse each field template to extract branch_id and field_name + for db_field_name, template in fields.items(): + # Template format: "{branch_id.field_name}" + if template.startswith('{') and template.endswith('}'): + var_name = template[1:-1] # Remove { } + + if '.' in var_name: + branch_id, field_name = var_name.split('.', 1) + + # Store the mapping + self.branch_output_fields[branch_id] = field_name + + logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'") + + logger.info(f"[FIELD MAPPING] Parsed {len(self.branch_output_fields)} branch output field mappings") + + except Exception as e: + logger.error(f"[FIELD MAPPING] Error parsing branch output fields: {e}", exc_info=True) + async def execute_branches(self, frame: np.ndarray, branches: List[Any], @@ -350,10 +396,11 @@ class BranchProcessor: logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}") if trigger_classes: - # Check if any parent detection matches our trigger classes + # Check if any parent detection matches our trigger classes (case-insensitive) should_execute = False for trigger_class in trigger_classes: - if trigger_class in detected_regions: + # Case-insensitive comparison for robustness + if trigger_class.lower() in [k.lower() for k in detected_regions.keys()]: should_execute = True logger.info(f"[TRIGGER CHECK] {branch_id}: Found '{trigger_class}' in parent detections - branch will execute") break @@ -410,16 +457,15 @@ class BranchProcessor: region = detected_regions[crop_class] confidence = region.get('confidence', 0.0) - # Only use detections above min_confidence - if confidence >= min_confidence: - bbox = region['bbox'] - area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) # width * height + # Select largest bbox (no confidence filtering - parent already validated it) + bbox = region['bbox'] + area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) # width * height - # Choose biggest bbox among valid detections - if area > best_area: - best_region = region - best_class = crop_class - best_area = area + # Choose biggest bbox among available detections + if area > best_area: + best_region = region + best_class = crop_class + best_area = area if best_region: bbox = best_region['bbox'] @@ -478,17 +524,25 @@ class BranchProcessor: top_indices = probs.top5 # Get top 5 predictions top_conf = probs.top5conf.cpu().numpy() - for idx, conf in zip(top_indices, top_conf): - if conf >= min_confidence: - class_name = model.model.names[int(idx)] - logger.debug(f"[CLASSIFICATION RESULT {len(branch_detections)+1}] {branch_id}: '{class_name}', conf={conf:.3f}") + # For classification: take only TOP-1 prediction (not all top-5) + # This prevents empty results when all top-5 predictions are below threshold + if len(top_indices) > 0 and len(top_conf) > 0: + top_idx = top_indices[0] + top_confidence = float(top_conf[0]) + + # Apply minConfidence threshold to top-1 only + if top_confidence >= min_confidence: + class_name = model.model.names[int(top_idx)] + logger.info(f"[CLASSIFICATION TOP-1] {branch_id}: '{class_name}', conf={top_confidence:.3f}") # For classification, use full input frame dimensions as bbox branch_detections.append({ 'class_name': class_name, - 'confidence': float(conf), + 'confidence': top_confidence, 'bbox': [0, 0, input_frame.shape[1], input_frame.shape[0]] }) + else: + logger.warning(f"[CLASSIFICATION FILTERED] {branch_id}: Top prediction conf={top_confidence:.3f} < threshold={min_confidence}") else: logger.warning(f"[UNKNOWN MODEL] {branch_id}: Model results have no .boxes or .probs") @@ -499,22 +553,27 @@ class BranchProcessor: logger.info(f"[FINAL RESULTS] {branch_id}: {len(branch_detections)} detections processed") - # Extract best result for classification models + # Determine output field name from dynamic mapping (parsed from parallelActions.fields) + output_field = self.branch_output_fields.get(branch_id) + + # Always initialize the field (even if None) to ensure it exists for database update + if output_field: + result['result'][output_field] = None + logger.debug(f"[FIELD INIT] {branch_id}: Initialized field '{output_field}' = None") + + # Extract best detection if available if branch_detections: best_detection = max(branch_detections, key=lambda x: x['confidence']) logger.info(f"[BEST DETECTION] {branch_id}: '{best_detection['class_name']}' with confidence {best_detection['confidence']:.3f}") - # Add classification-style results for database operations - if 'brand' in branch_id.lower(): - result['result']['brand'] = best_detection['class_name'] - elif 'body' in branch_id.lower() or 'bodytype' in branch_id.lower(): - result['result']['body_type'] = best_detection['class_name'] - elif 'front_rear' in branch_id.lower(): - result['result']['front_rear'] = best_detection['confidence'] - - logger.info(f"[CLASSIFICATION RESULT] {branch_id}: Extracted classification fields") + # Set the output field value using dynamic mapping + if output_field: + result['result'][output_field] = best_detection['class_name'] + logger.info(f"[FIELD SET] {branch_id}: Set field '{output_field}' = '{best_detection['class_name']}'") + else: + logger.warning(f"[NO MAPPING] {branch_id}: No output field defined in parallelActions.fields") else: - logger.warning(f"[NO RESULTS] {branch_id}: No detections found") + logger.warning(f"[NO RESULTS] {branch_id}: No detections found, field '{output_field}' remains None") # Execute branch actions if this branch found valid detections actions_executed = [] diff --git a/core/streaming/readers/ffmpeg_rtsp.py b/core/streaming/readers/ffmpeg_rtsp.py index 88f45ae..e469c9e 100644 --- a/core/streaming/readers/ffmpeg_rtsp.py +++ b/core/streaming/readers/ffmpeg_rtsp.py @@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader): cmd = [ 'ffmpeg', # DO NOT REMOVE - '-hwaccel', 'cuda', - '-hwaccel_device', '0', + # '-hwaccel', 'cuda', + # '-hwaccel_device', '0', # Real-time input flags '-fflags', 'nobuffer+genpts', '-flags', 'low_delay',