fix: classification top-1, dynamic result field, removed crop filter

Some checks failed
Build Worker Base and Application Images / check-base-changes (push) Failing after 16s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m31s
Build Worker Base and Application Images / deploy-stack (push) Successful in 25s
parent 498b285e80
commit 5e59e00c55
2 changed files with 88 additions and 29 deletions
@@ -35,6 +35,9 @@ class BranchProcessor:
         # Branch models cache
         self.branch_models: Dict[str, YOLOWrapper] = {}
 
+        # Dynamic field mapping: branch_id → output_field_name (e.g., {"car_brand_cls_v3": "brand"})
+        self.branch_output_fields: Dict[str, str] = {}
+
         # Thread pool for parallel execution
         self.executor = ThreadPoolExecutor(max_workers=4)
 
@@ -68,6 +71,9 @@ class BranchProcessor:
             self.redis_manager = redis_manager
             self.db_manager = db_manager
 
+            # Parse field mappings from parallelActions to enable dynamic field extraction
+            self._parse_branch_output_fields(pipeline_config)
+
             # Pre-load branch models if they exist
             branches = getattr(pipeline_config, 'branches', [])
             if branches:
@@ -141,6 +147,46 @@ class BranchProcessor:
             logger.error(f"Error loading branch model {getattr(branch_config, 'model_id', 'unknown')}: {e}")
             return None
 
+    def _parse_branch_output_fields(self, pipeline_config: Any) -> None:
+        """
+        Parse parallelActions.fields to determine what output field each branch produces.
+        Creates dynamic mapping from branch_id to output field name.
+
+        Example:
+            Input: parallelActions.fields = {"car_brand": "{car_brand_cls_v3.brand}"}
+            Output: self.branch_output_fields = {"car_brand_cls_v3": "brand"}
+
+        Args:
+            pipeline_config: Pipeline configuration object
+        """
+        try:
+            if not pipeline_config or not hasattr(pipeline_config, 'parallel_actions'):
+                logger.debug("[FIELD MAPPING] No parallelActions found in pipeline config")
+                return
+
+            for action in pipeline_config.parallel_actions:
+                if action.type.value == 'postgresql_update_combined':
+                    fields = action.params.get('fields', {})
+
+                    # Parse each field template to extract branch_id and field_name
+                    for db_field_name, template in fields.items():
+                        # Template format: "{branch_id.field_name}"
+                        if template.startswith('{') and template.endswith('}'):
+                            var_name = template[1:-1]  # Remove { }
+
+                            if '.' in var_name:
+                                branch_id, field_name = var_name.split('.', 1)
+
+                                # Store the mapping
+                                self.branch_output_fields[branch_id] = field_name
+
+                                logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'")
+
+            logger.info(f"[FIELD MAPPING] Parsed {len(self.branch_output_fields)} branch output field mappings")
+
+        except Exception as e:
+            logger.error(f"[FIELD MAPPING] Error parsing branch output fields: {e}", exc_info=True)
+
     async def execute_branches(self,
                              frame: np.ndarray,
                              branches: List[Any],
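For reference, a minimal standalone run of the parsing logic above. The first field comes from the docstring example; the second branch ("car_bodytype_cls_v1") is a hypothetical addition used only for illustration.

# Standalone sketch of the template parsing done by _parse_branch_output_fields.
fields = {
    "car_brand": "{car_brand_cls_v3.brand}",              # from the docstring example
    "car_body_type": "{car_bodytype_cls_v1.body_type}",   # hypothetical second branch
}

branch_output_fields = {}
for db_field_name, template in fields.items():
    if template.startswith('{') and template.endswith('}'):
        var_name = template[1:-1]                          # e.g. "car_brand_cls_v3.brand"
        if '.' in var_name:
            branch_id, field_name = var_name.split('.', 1)
            branch_output_fields[branch_id] = field_name

print(branch_output_fields)
# {'car_brand_cls_v3': 'brand', 'car_bodytype_cls_v1': 'body_type'}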
@@ -350,10 +396,11 @@ class BranchProcessor:
             logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}")
 
         if trigger_classes:
-            # Check if any parent detection matches our trigger classes
+            # Check if any parent detection matches our trigger classes (case-insensitive)
             should_execute = False
             for trigger_class in trigger_classes:
-                if trigger_class in detected_regions:
+                # Case-insensitive comparison for robustness
+                if trigger_class.lower() in [k.lower() for k in detected_regions.keys()]:
                     should_execute = True
                     logger.info(f"[TRIGGER CHECK] {branch_id}: Found '{trigger_class}' in parent detections - branch will execute")
                     break
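A small design note, not part of the commit: the lowered key list is rebuilt for every trigger class; building the lowered set once outside the loop is equivalent.

# Equivalent sketch: lower-case the detected region names once, outside the loop.
detected_lower = {name.lower() for name in detected_regions}
should_execute = any(tc.lower() in detected_lower for tc in trigger_classes)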
@@ -410,16 +457,15 @@ class BranchProcessor:
                         region = detected_regions[crop_class]
                         confidence = region.get('confidence', 0.0)
 
-                        # Only use detections above min_confidence
-                        if confidence >= min_confidence:
-                            bbox = region['bbox']
-                            area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])  # width * height
-
-                            # Choose biggest bbox among valid detections
-                            if area > best_area:
-                                best_region = region
-                                best_class = crop_class
-                                best_area = area
+                        # Select largest bbox (no confidence filtering - parent already validated it)
+                        bbox = region['bbox']
+                        area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])  # width * height
+
+                        # Choose biggest bbox among available detections
+                        if area > best_area:
+                            best_region = region
+                            best_class = crop_class
+                            best_area = area
 
                 if best_region:
                     bbox = best_region['bbox']
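The same "largest region wins" rule can also be written with max(). This is only a sketch: it assumes the surrounding loop iterates over a collection of crop class names (called crop_classes here) and that each region stores bbox as [x1, y1, x2, y2].

# Sketch: pick the crop class whose region has the largest bbox area.
def bbox_area(region):
    x1, y1, x2, y2 = region['bbox']
    return (x2 - x1) * (y2 - y1)

candidates = {c: detected_regions[c] for c in crop_classes if c in detected_regions}
if candidates:
    best_class, best_region = max(candidates.items(), key=lambda kv: bbox_area(kv[1]))
    best_area = bbox_area(best_region)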
@@ -478,17 +524,25 @@ class BranchProcessor:
                     top_indices = probs.top5  # Get top 5 predictions
                     top_conf = probs.top5conf.cpu().numpy()
 
-                    for idx, conf in zip(top_indices, top_conf):
-                        if conf >= min_confidence:
-                            class_name = model.model.names[int(idx)]
-                            logger.debug(f"[CLASSIFICATION RESULT {len(branch_detections)+1}] {branch_id}: '{class_name}', conf={conf:.3f}")
+                    # For classification: take only TOP-1 prediction (not all top-5)
+                    # This prevents empty results when all top-5 predictions are below threshold
+                    if len(top_indices) > 0 and len(top_conf) > 0:
+                        top_idx = top_indices[0]
+                        top_confidence = float(top_conf[0])
+
+                        # Apply minConfidence threshold to top-1 only
+                        if top_confidence >= min_confidence:
+                            class_name = model.model.names[int(top_idx)]
+                            logger.info(f"[CLASSIFICATION TOP-1] {branch_id}: '{class_name}', conf={top_confidence:.3f}")
 
                             # For classification, use full input frame dimensions as bbox
                             branch_detections.append({
                                 'class_name': class_name,
-                                'confidence': float(conf),
+                                'confidence': top_confidence,
                                 'bbox': [0, 0, input_frame.shape[1], input_frame.shape[0]]
                             })
+                        else:
+                            logger.warning(f"[CLASSIFICATION FILTERED] {branch_id}: Top prediction conf={top_confidence:.3f} < threshold={min_confidence}")
                 else:
                     logger.warning(f"[UNKNOWN MODEL] {branch_id}: Model results have no .boxes or .probs")
 
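Side note, an assumption about the ultralytics API rather than something from this commit: the Probs object also exposes top1/top1conf directly, so an equivalent top-1 read could avoid indexing into the top-5 arrays.

# Equivalent sketch using ultralytics' top-1 accessors instead of indexing top5.
top_idx = int(probs.top1)               # index of the single best class
top_confidence = float(probs.top1conf)  # its confidence score
if top_confidence >= min_confidence:
    class_name = model.model.names[top_idx]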
@@ -499,22 +553,27 @@ class BranchProcessor:
 
             logger.info(f"[FINAL RESULTS] {branch_id}: {len(branch_detections)} detections processed")
 
-            # Extract best result for classification models
+            # Determine output field name from dynamic mapping (parsed from parallelActions.fields)
+            output_field = self.branch_output_fields.get(branch_id)
+
+            # Always initialize the field (even if None) to ensure it exists for database update
+            if output_field:
+                result['result'][output_field] = None
+                logger.debug(f"[FIELD INIT] {branch_id}: Initialized field '{output_field}' = None")
+
+            # Extract best detection if available
             if branch_detections:
                 best_detection = max(branch_detections, key=lambda x: x['confidence'])
                 logger.info(f"[BEST DETECTION] {branch_id}: '{best_detection['class_name']}' with confidence {best_detection['confidence']:.3f}")
 
-                # Add classification-style results for database operations
-                if 'brand' in branch_id.lower():
-                    result['result']['brand'] = best_detection['class_name']
-                elif 'body' in branch_id.lower() or 'bodytype' in branch_id.lower():
-                    result['result']['body_type'] = best_detection['class_name']
-                elif 'front_rear' in branch_id.lower():
-                    result['result']['front_rear'] = best_detection['confidence']
-
-                logger.info(f"[CLASSIFICATION RESULT] {branch_id}: Extracted classification fields")
+                # Set the output field value using dynamic mapping
+                if output_field:
+                    result['result'][output_field] = best_detection['class_name']
+                    logger.info(f"[FIELD SET] {branch_id}: Set field '{output_field}' = '{best_detection['class_name']}'")
+                else:
+                    logger.warning(f"[NO MAPPING] {branch_id}: No output field defined in parallelActions.fields")
             else:
-                logger.warning(f"[NO RESULTS] {branch_id}: No detections found")
+                logger.warning(f"[NO RESULTS] {branch_id}: No detections found, field '{output_field}' remains None")
 
             # Execute branch actions if this branch found valid detections
             actions_executed = []
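For reference, a worked example of the new behaviour; the branch id comes from the docstring example, while the detected class and confidence values are hypothetical.

# Hypothetical before/after for a brand-classification branch.
# Before: the output key was guessed from the branch id ('brand' in branch_id.lower()).
# After:  the key comes from parallelActions.fields and is pre-initialized to None,
#         so the database update always sees the column even when nothing is detected.
branch_output_fields = {"car_brand_cls_v3": "brand"}
result = {"result": {}}

output_field = branch_output_fields.get("car_brand_cls_v3")    # -> "brand"
result["result"][output_field] = None                          # [FIELD INIT]

best_detection = {"class_name": "Toyota", "confidence": 0.91}  # illustrative values
result["result"][output_field] = best_detection["class_name"]  # [FIELD SET]
assert result == {"result": {"brand": "Toyota"}}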
@@ -113,8 +113,8 @@ class FFmpegRTSPReader(VideoReader):
         cmd = [
             'ffmpeg',
             # DO NOT REMOVE
-            '-hwaccel', 'cuda',
-            '-hwaccel_device', '0',
+            # '-hwaccel', 'cuda',
+            # '-hwaccel_device', '0',
             # Real-time input flags
             '-fflags', 'nobuffer+genpts',
             '-flags', 'low_delay',
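If CUDA decode ever needs to come back selectively rather than staying commented out, one option (a sketch only, not part of this commit) is to append the same -hwaccel flags only when a GPU is actually visible; the nvidia-smi probe below is an illustrative heuristic.

# Sketch (not in the commit): opt back into CUDA decode when a GPU is visible.
import shutil
import subprocess
from typing import List, Optional

def build_ffmpeg_input_flags(use_cuda: Optional[bool] = None) -> List[str]:
    if use_cuda is None:
        # Crude availability check: nvidia-smi exists and lists at least one device.
        use_cuda = (
            shutil.which('nvidia-smi') is not None
            and subprocess.run(['nvidia-smi', '-L'], capture_output=True).returncode == 0
        )
    flags = []
    if use_cuda:
        flags += ['-hwaccel', 'cuda', '-hwaccel_device', '0']
    flags += ['-fflags', 'nobuffer+genpts', '-flags', 'low_delay']
    return flags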