dev #28
4 changed files with 212 additions and 61 deletions
@@ -103,10 +103,4 @@ jobs:
       - name: Deploy stack
         run: |
           echo "Pulling and starting containers on server..."
-          if [ "${{ github.ref_name }}" = "main" ]; then
-            echo "Deploying production stack..."
-            ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.production.yml pull && docker compose -f docker-compose.production.yml up -d"
-          else
-            echo "Deploying staging stack..."
-            ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml pull && docker compose -f docker-compose.staging.yml up -d"
-          fi
+          ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml pull && docker compose -f docker-compose.staging.yml -f docker-compose.production.yml up -d"
@@ -35,6 +35,9 @@ class BranchProcessor:
         # Branch models cache
         self.branch_models: Dict[str, YOLOWrapper] = {}
 
+        # Dynamic field mapping: branch_id → output_field_name (e.g., {"car_brand_cls_v3": "brand"})
+        self.branch_output_fields: Dict[str, str] = {}
+
         # Thread pool for parallel execution
         self.executor = ThreadPoolExecutor(max_workers=4)
 
@@ -68,6 +71,9 @@ class BranchProcessor:
             self.redis_manager = redis_manager
             self.db_manager = db_manager
 
+            # Parse field mappings from parallelActions to enable dynamic field extraction
+            self._parse_branch_output_fields(pipeline_config)
+
             # Pre-load branch models if they exist
             branches = getattr(pipeline_config, 'branches', [])
             if branches:
@@ -141,6 +147,46 @@ class BranchProcessor:
             logger.error(f"Error loading branch model {getattr(branch_config, 'model_id', 'unknown')}: {e}")
             return None
 
+    def _parse_branch_output_fields(self, pipeline_config: Any) -> None:
+        """
+        Parse parallelActions.fields to determine what output field each branch produces.
+        Creates dynamic mapping from branch_id to output field name.
+
+        Example:
+            Input: parallelActions.fields = {"car_brand": "{car_brand_cls_v3.brand}"}
+            Output: self.branch_output_fields = {"car_brand_cls_v3": "brand"}
+
+        Args:
+            pipeline_config: Pipeline configuration object
+        """
+        try:
+            if not pipeline_config or not hasattr(pipeline_config, 'parallel_actions'):
+                logger.debug("[FIELD MAPPING] No parallelActions found in pipeline config")
+                return
+
+            for action in pipeline_config.parallel_actions:
+                if action.type.value == 'postgresql_update_combined':
+                    fields = action.params.get('fields', {})
+
+                    # Parse each field template to extract branch_id and field_name
+                    for db_field_name, template in fields.items():
+                        # Template format: "{branch_id.field_name}"
+                        if template.startswith('{') and template.endswith('}'):
+                            var_name = template[1:-1]  # Remove { }
+
+                            if '.' in var_name:
+                                branch_id, field_name = var_name.split('.', 1)
+
+                                # Store the mapping
+                                self.branch_output_fields[branch_id] = field_name
+
+                                logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'")
+
+            logger.info(f"[FIELD MAPPING] Parsed {len(self.branch_output_fields)} branch output field mappings")
+
+        except Exception as e:
+            logger.error(f"[FIELD MAPPING] Error parsing branch output fields: {e}", exc_info=True)
+
     async def execute_branches(self,
                              frame: np.ndarray,
                              branches: List[Any],
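For reference, the rule this new method implements is small enough to check in isolation. A minimal standalone sketch using the docstring's own example; the function name here is illustrative, not part of the PR:

def parse_output_fields(fields: dict) -> dict:
    """Map branch_id -> output field name from '{branch_id.field_name}' templates."""
    mapping = {}
    for _db_field, template in fields.items():
        # Only templates of the exact form "{branch_id.field_name}" are recognized
        if template.startswith('{') and template.endswith('}') and '.' in template[1:-1]:
            branch_id, field_name = template[1:-1].split('.', 1)
            mapping[branch_id] = field_name
    return mapping

assert parse_output_fields({"car_brand": "{car_brand_cls_v3.brand}"}) == {"car_brand_cls_v3": "brand"}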
@@ -347,13 +393,19 @@ class BranchProcessor:
         trigger_classes = getattr(branch_config, 'trigger_classes', [])
         logger.info(f"[DETECTED REGIONS] {branch_id}: Available parent detections: {list(detected_regions.keys())}")
         for region_name, region_data in detected_regions.items():
-            logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}")
+            # Handle both list (new) and single dict (backward compat)
+            if isinstance(region_data, list):
+                for i, region in enumerate(region_data):
+                    logger.debug(f"[REGION DATA] {branch_id}: '{region_name}[{i}]' -> bbox={region.get('bbox')}, conf={region.get('confidence')}")
+            else:
+                logger.debug(f"[REGION DATA] {branch_id}: '{region_name}' -> bbox={region_data.get('bbox')}, conf={region_data.get('confidence')}")
 
         if trigger_classes:
-            # Check if any parent detection matches our trigger classes
+            # Check if any parent detection matches our trigger classes (case-insensitive)
             should_execute = False
             for trigger_class in trigger_classes:
-                if trigger_class in detected_regions:
+                # Case-insensitive comparison for robustness
+                if trigger_class.lower() in [k.lower() for k in detected_regions.keys()]:
                     should_execute = True
                     logger.info(f"[TRIGGER CHECK] {branch_id}: Found '{trigger_class}' in parent detections - branch will execute")
                     break
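One detail worth noting on the case-insensitive match: the lowercased key list is rebuilt for every trigger class inside the loop. A set built once gives the same behavior; a hypothetical equivalent, not what the diff ships:

def matches_trigger(trigger_classes, detected_regions):
    # Lowercase the detected class names once, then test each trigger against the set
    detected = {k.lower() for k in detected_regions}
    return any(t.lower() in detected for t in trigger_classes)

assert matches_trigger(['Car'], {'car': []}) is True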
@@ -407,19 +459,24 @@ class BranchProcessor:
 
                 for crop_class in crop_classes:
                     if crop_class in detected_regions:
-                        region = detected_regions[crop_class]
-                        confidence = region.get('confidence', 0.0)
-                        # Only use detections above min_confidence
-                        if confidence >= min_confidence:
+                        regions = detected_regions[crop_class]
+
+                        # Handle both list (new) and single dict (backward compat)
+                        if not isinstance(regions, list):
+                            regions = [regions]
+
+                        # Find largest bbox from all detections of this class
+                        for region in regions:
+                            confidence = region.get('confidence', 0.0)
                             bbox = region['bbox']
                             area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])  # width * height
 
-                            # Choose biggest bbox among valid detections
+                            # Choose biggest bbox among all available detections
                             if area > best_area:
                                 best_region = region
                                 best_class = crop_class
                                 best_area = area
                                 logger.debug(f"[CROP] Selected larger bbox for '{crop_class}': area={area:.0f}px², conf={confidence:.3f}")
 
                 if best_region:
                     bbox = best_region['bbox']
			
		||||
| 
						 | 
				
			
@@ -437,7 +494,6 @@ class BranchProcessor:
             logger.info(f"[INFERENCE START] {branch_id}: Running inference on {'cropped' if input_frame is not frame else 'full'} frame "
                        f"({input_frame.shape[1]}x{input_frame.shape[0]}) with confidence={min_confidence}")
 
-
             # Use .predict() method for both detection and classification models
             inference_start = time.time()
             detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
@@ -478,17 +534,25 @@ class BranchProcessor:
                     top_indices = probs.top5  # Get top 5 predictions
                     top_conf = probs.top5conf.cpu().numpy()
 
-                    for idx, conf in zip(top_indices, top_conf):
-                        if conf >= min_confidence:
-                            class_name = model.model.names[int(idx)]
-                            logger.debug(f"[CLASSIFICATION RESULT {len(branch_detections)+1}] {branch_id}: '{class_name}', conf={conf:.3f}")
+                    # For classification: take only TOP-1 prediction (not all top-5)
+                    # This prevents empty results when all top-5 predictions are below threshold
+                    if len(top_indices) > 0 and len(top_conf) > 0:
+                        top_idx = top_indices[0]
+                        top_confidence = float(top_conf[0])
+
+                        # Apply minConfidence threshold to top-1 only
+                        if top_confidence >= min_confidence:
+                            class_name = model.model.names[int(top_idx)]
+                            logger.info(f"[CLASSIFICATION TOP-1] {branch_id}: '{class_name}', conf={top_confidence:.3f}")
 
                             # For classification, use full input frame dimensions as bbox
                             branch_detections.append({
                                 'class_name': class_name,
-                                'confidence': float(conf),
+                                'confidence': top_confidence,
                                 'bbox': [0, 0, input_frame.shape[1], input_frame.shape[0]]
                             })
+                        else:
+                            logger.warning(f"[CLASSIFICATION FILTERED] {branch_id}: Top prediction conf={top_confidence:.3f} < threshold={min_confidence}")
                 else:
                     logger.warning(f"[UNKNOWN MODEL] {branch_id}: Model results have no .boxes or .probs")
 
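The shift from top-5 to top-1 changes what reaches branch_detections: before, every top-5 class above minConfidence was appended; now at most one entry is appended, and a below-threshold top-1 is logged instead of silently dropped. A sketch of both policies, assuming confidences arrive sorted descending as probs.top5conf provides:

def top5_policy(names, confs, min_conf):
    return [(n, c) for n, c in zip(names, confs) if c >= min_conf]

def top1_policy(names, confs, min_conf):
    if names and confs[0] >= min_conf:
        return [(names[0], confs[0])]
    return []  # caller logs [CLASSIFICATION FILTERED] and the field stays None

names, confs = ['Honda', 'Toyota'], [0.42, 0.31]
assert top1_policy(names, confs, 0.4) == [('Honda', 0.42)]
assert top1_policy(names, confs, 0.5) == []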
@@ -499,22 +563,27 @@ class BranchProcessor:
 
             logger.info(f"[FINAL RESULTS] {branch_id}: {len(branch_detections)} detections processed")
 
-            # Extract best result for classification models
+            # Determine output field name from dynamic mapping (parsed from parallelActions.fields)
+            output_field = self.branch_output_fields.get(branch_id)
+
+            # Always initialize the field (even if None) to ensure it exists for database update
+            if output_field:
+                result['result'][output_field] = None
+                logger.debug(f"[FIELD INIT] {branch_id}: Initialized field '{output_field}' = None")
+
+            # Extract best detection if available
             if branch_detections:
                 best_detection = max(branch_detections, key=lambda x: x['confidence'])
                 logger.info(f"[BEST DETECTION] {branch_id}: '{best_detection['class_name']}' with confidence {best_detection['confidence']:.3f}")
 
-                # Add classification-style results for database operations
-                if 'brand' in branch_id.lower():
-                    result['result']['brand'] = best_detection['class_name']
-                elif 'body' in branch_id.lower() or 'bodytype' in branch_id.lower():
-                    result['result']['body_type'] = best_detection['class_name']
-                elif 'front_rear' in branch_id.lower():
-                    result['result']['front_rear'] = best_detection['confidence']
-
-                logger.info(f"[CLASSIFICATION RESULT] {branch_id}: Extracted classification fields")
+                # Set the output field value using dynamic mapping
+                if output_field:
+                    result['result'][output_field] = best_detection['class_name']
+                    logger.info(f"[FIELD SET] {branch_id}: Set field '{output_field}' = '{best_detection['class_name']}'")
+                else:
+                    logger.warning(f"[NO MAPPING] {branch_id}: No output field defined in parallelActions.fields")
             else:
-                logger.warning(f"[NO RESULTS] {branch_id}: No detections found")
+                logger.warning(f"[NO RESULTS] {branch_id}: No detections found, field '{output_field}' remains None")
 
             # Execute branch actions if this branch found valid detections
             actions_executed = []
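The initialize-then-set pattern above guarantees the mapped key exists in result['result'] whether or not the branch produced a detection, so the combined database update writes null rather than hitting a missing key. Reduced to its core (field name illustrative):

result = {'result': {}}
output_field = 'brand'                  # looked up from branch_output_fields
result['result'][output_field] = None   # always present for the DB update
best_detection = None                   # branch found nothing this frame
if best_detection is not None:
    result['result'][output_field] = best_detection['class_name']
assert result['result'] == {'brand': None}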
@@ -631,10 +700,26 @@ class BranchProcessor:
             bbox = None
             if region_name and region_name in detected_regions:
                 # Crop the specified region
-                bbox = detected_regions[region_name]['bbox']
+                # Handle both list (new) and single dict (backward compat)
+                regions = detected_regions[region_name]
+                if isinstance(regions, list):
+                    # Multiple detections - select largest bbox
+                    if regions:
+                        best_region = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1]))
+                        bbox = best_region['bbox']
+                else:
+                    bbox = regions['bbox']
             elif region_name and region_name.lower() == 'frontal' and 'front_rear' in detected_regions:
                 # Special case: "frontal" region maps to "front_rear" detection
-                bbox = detected_regions['front_rear']['bbox']
+                # Handle both list (new) and single dict (backward compat)
+                regions = detected_regions['front_rear']
+                if isinstance(regions, list):
+                    # Multiple detections - select largest bbox
+                    if regions:
+                        best_region = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1]))
+                        bbox = best_region['bbox']
+                else:
+                    bbox = regions['bbox']
 
             if bbox is not None:
                 x1, y1, x2, y2 = [int(coord) for coord in bbox]
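This normalize-then-pick-largest block is now duplicated in both branches above and again in the redis_save_image path further down; a shared helper would keep the copies in sync. A hypothetical extraction, not something this PR adds:

def largest_bbox(region_entry):
    """Accept one region dict or a list of them; return the largest bbox, or None."""
    regions = region_entry if isinstance(region_entry, list) else [region_entry]
    if not regions:
        return None
    best = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1]))
    return best['bbox']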
@@ -199,6 +199,8 @@ class DetectionPipeline:
             Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"})
         """
         extracted = {}
+        missing_fields = []
+        available_fields = []
 
         try:
             for db_field_name, template in self.field_mappings.items():
@@ -215,12 +217,21 @@ class DetectionPipeline:
                                 result_data = branch_data['result']
                                 if isinstance(result_data, dict) and field_name in result_data:
                                     extracted[field_name] = result_data[field_name]
+                                    available_fields.append(f"{field_name}={result_data[field_name]}")
                                     logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}")
                                 else:
+                                    missing_fields.append(f"{field_name} (field not in branch {branch_id})")
                                     logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}")
                         else:
+                            missing_fields.append(f"{field_name} (branch {branch_id} missing)")
                             logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results")
 
+            # Log summary of extraction
+            if available_fields:
+                logger.info(f"[FIELD EXTRACTION] Available fields: {', '.join(available_fields)}")
+            if missing_fields:
+                logger.warning(f"[FIELD EXTRACTION] Missing fields (will be null): {', '.join(missing_fields)}")
+
         except Exception as e:
             logger.error(f"Error extracting fields from branches: {e}", exc_info=True)
 
@@ -338,7 +349,17 @@ class DetectionPipeline:
                 car_brand = extracted_fields.get('brand')
                 body_type = extracted_fields.get('body_type')
 
-                logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}")
+                # Log extraction results
+                fields_status = []
+                if car_brand is not None:
+                    fields_status.append(f"brand={car_brand}")
+                else:
+                    fields_status.append("brand=null")
+                if body_type is not None:
+                    fields_status.append(f"bodyType={body_type}")
+                else:
+                    fields_status.append("bodyType=null")
+                logger.info(f"[LICENSE PLATE] Extracted fields: {', '.join(fields_status)}")
 
                 # Clean up stored results after use
                 del self.session_processing_results[session_id_for_lookup]
@@ -367,7 +388,18 @@ class DetectionPipeline:
 
             # Send message
             await self.message_sender(detection_message)
-            logger.info(f"[COMBINED MESSAGE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'")
+
+            # Log with indication of partial results
+            null_fields = []
+            if car_brand is None:
+                null_fields.append('brand')
+            if body_type is None:
+                null_fields.append('bodyType')
+
+            if null_fields:
+                logger.info(f"[COMBINED MESSAGE] Sent imageDetection with PARTIAL results (null: {', '.join(null_fields)}) - brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'")
+            else:
+                logger.info(f"[COMBINED MESSAGE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'")
 
         except Exception as e:
             logger.error(f"Error sending license plate imageDetection message: {e}", exc_info=True)
@@ -495,11 +527,13 @@ class DetectionPipeline:
                         }
                         valid_detections.append(detection_info)
 
-                        # Store region for processing phase
-                        detected_regions[class_name] = {
+                        # Store region for processing phase (support multiple detections per class)
+                        if class_name not in detected_regions:
+                            detected_regions[class_name] = []
+                        detected_regions[class_name].append({
                             'bbox': bbox,
                             'confidence': confidence
-                        }
+                        })
                 else:
                     logger.warning("[DETECTION PHASE] No boxes found in detection results")
 
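With this change detected_regions maps each class name to a list of {'bbox', 'confidence'} entries rather than a single dict, which is what forces the isinstance checks in all the readers above. A defaultdict expresses the same accumulation; sketch with made-up detections:

from collections import defaultdict

detected_regions = defaultdict(list)
for class_name, bbox, confidence in [('car', [0, 0, 50, 50], 0.9), ('car', [5, 5, 80, 80], 0.8)]:
    detected_regions[class_name].append({'bbox': bbox, 'confidence': confidence})
assert len(detected_regions['car']) == 2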
@@ -951,14 +985,26 @@ class DetectionPipeline:
 
             if region_name and region_name in detected_regions:
                 # Crop the specified region
-                bbox = detected_regions[region_name]['bbox']
-                x1, y1, x2, y2 = [int(coord) for coord in bbox]
-                cropped = frame[y1:y2, x1:x2]
-                if cropped.size > 0:
-                    image_to_save = cropped
-                    logger.debug(f"Cropped region '{region_name}' for redis_save_image")
+                # Handle both list (new) and single dict (backward compat)
+                regions = detected_regions[region_name]
+                if isinstance(regions, list):
+                    # Multiple detections - select largest bbox
+                    if regions:
+                        best_region = max(regions, key=lambda r: (r['bbox'][2] - r['bbox'][0]) * (r['bbox'][3] - r['bbox'][1]))
+                        bbox = best_region['bbox']
+                    else:
+                        bbox = None
                 else:
-                    logger.warning(f"Empty crop for region '{region_name}', using full frame")
+                    bbox = regions['bbox']
+
+                if bbox:
+                    x1, y1, x2, y2 = [int(coord) for coord in bbox]
+                    cropped = frame[y1:y2, x1:x2]
+                    if cropped.size > 0:
+                        image_to_save = cropped
+                        logger.debug(f"Cropped region '{region_name}' for redis_save_image")
+                    else:
+                        logger.warning(f"Empty crop for region '{region_name}', using full frame")
 
             # Format key with context
             key = action.params['key'].format(**context)
@@ -1019,11 +1065,13 @@ class DetectionPipeline:
             wait_for_branches = action.params.get('waitForBranches', [])
             branch_results = context.get('branch_results', {})
 
-            # Check if all required branches have completed
-            for branch_id in wait_for_branches:
-                if branch_id not in branch_results:
-                    logger.warning(f"Branch {branch_id} result not available for database update")
-                    return {'status': 'error', 'message': f'Missing branch result: {branch_id}'}
+            # Log missing branches but don't block the update (allow partial results)
+            missing_branches = [b for b in wait_for_branches if b not in branch_results]
+            if missing_branches:
+                logger.warning(f"Some branches missing from results (will use null): {missing_branches}")
+            available_branches = [b for b in wait_for_branches if b in branch_results]
+            if available_branches:
+                logger.info(f"Available branches for database update: {available_branches}")
 
             # Prepare fields for database update
             table = action.params.get('table', 'car_frontal_info')
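waitForBranches is now advisory: available branches contribute their fields and missing ones fall through as null, via the field initialization in BranchProcessor. The partition itself is just two comprehensions; the branch ids below are illustrative, not from the real config:

wait_for_branches = ['car_brand_cls_v3', 'car_bodytype_cls_v1']   # hypothetical config
branch_results = {'car_brand_cls_v3': {'result': {'brand': 'Honda'}}}
missing = [b for b in wait_for_branches if b not in branch_results]
available = [b for b in wait_for_branches if b in branch_results]
assert missing == ['car_bodytype_cls_v1']
assert available == ['car_brand_cls_v3']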
@@ -350,10 +350,21 @@ class TrackingPipelineIntegration:
                     'session_id': session_id
                 }
 
+            # Fetch high-quality 2K snapshot for detection phase (not RTSP frame)
+            # This ensures bbox coordinates match the frame used in processing phase
+            logger.info(f"[DETECTION PHASE] Fetching 2K snapshot for vehicle {vehicle.track_id}")
+            snapshot_frame = self._fetch_snapshot()
+
+            if snapshot_frame is None:
+                logger.warning(f"[DETECTION PHASE] Failed to fetch snapshot, falling back to RTSP frame")
+                snapshot_frame = frame  # Fallback to RTSP if snapshot fails
+            else:
+                logger.info(f"[DETECTION PHASE] Using {snapshot_frame.shape[1]}x{snapshot_frame.shape[0]} snapshot for detection")
+
             # Execute only the detection phase (first phase)
             # This will run detection and send imageDetection message to backend
             detection_result = await self.detection_pipeline.execute_detection_phase(
-                frame=frame,
+                frame=snapshot_frame,  # Use 2K snapshot instead of RTSP frame
                 display_id=display_id,
                 subscription_id=subscription_id
             )
@@ -373,13 +384,13 @@ class TrackingPipelineIntegration:
             if detection_result['message_sent']:
                 # Store for later processing when sessionId is received
                 self.pending_processing_data[display_id] = {
-                    'frame': frame.copy(),  # Store copy of frame for processing phase
+                    'frame': snapshot_frame.copy(),  # Store copy of 2K snapshot (not RTSP frame!)
                    'vehicle': vehicle,
                     'subscription_id': subscription_id,
                     'detection_result': detection_result,
                     'timestamp': time.time()
                 }
-                logger.info(f"Stored processing data for {display_id}, waiting for sessionId from backend")
+                logger.info(f"Stored processing data ({snapshot_frame.shape[1]}x{snapshot_frame.shape[0]} frame) for {display_id}, waiting for sessionId from backend")
 
             return detection_result
 
@@ -413,14 +424,27 @@ class TrackingPipelineIntegration:
 
             logger.info(f"Executing processing phase for session {session_id}, vehicle {vehicle.track_id}")
 
-            # Capture high-quality snapshot for pipeline processing
-            logger.info(f"[PROCESSING PHASE] Fetching 2K snapshot for session {session_id}")
-            frame = self._fetch_snapshot()
+            # Reuse the snapshot from detection phase OR fetch fresh one if detection used RTSP fallback
+            detection_frame = processing_data['frame']
+            frame_height = detection_frame.shape[0]
 
-            if frame is None:
-                logger.warning(f"[PROCESSING PHASE] Failed to capture snapshot, falling back to RTSP frame")
-                # Fall back to RTSP frame if snapshot fails
-                frame = processing_data['frame']
+            # Check if detection phase used 2K snapshot (height > 1000) or RTSP fallback (height = 720)
+            if frame_height >= 1000:
+                # Detection used 2K snapshot - reuse it for consistent coordinates
+                logger.info(f"[PROCESSING PHASE] Reusing 2K snapshot from detection phase ({detection_frame.shape[1]}x{detection_frame.shape[0]})")
+                frame = detection_frame
+            else:
+                # Detection used RTSP fallback - need to fetch fresh 2K snapshot
+                logger.warning(f"[PROCESSING PHASE] Detection used RTSP fallback ({detection_frame.shape[1]}x{detection_frame.shape[0]}), fetching fresh 2K snapshot")
+                frame = self._fetch_snapshot()
+
+                if frame is None:
+                    logger.error(f"[PROCESSING PHASE] Failed to fetch snapshot and detection used RTSP - coordinate mismatch will occur!")
+                    logger.error(f"[PROCESSING PHASE] Cannot proceed with mismatched coordinates. Aborting processing phase.")
+                    return  # Cannot process safely - bbox coordinates won't match frame resolution
+                else:
+                    logger.warning(f"[PROCESSING PHASE] Fetched fresh 2K snapshot ({frame.shape[1]}x{frame.shape[0]}), but coordinates may not match exactly")
+                    logger.warning(f"[PROCESSING PHASE] Re-running detection on fresh snapshot is recommended but not implemented yet")
 
             # Extract detected regions from detection phase result if available
             detected_regions = detection_result.get('detected_regions', {})
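The reuse-or-refetch decision rests entirely on a frame-height heuristic: the diff's comments treat a height of at least 1000 px as proof the stored frame is the 2K snapshot and 720 px as the RTSP fallback. Isolated as a predicate (thresholds taken from those comments, shapes made up):

import numpy as np

def used_2k_snapshot(frame: np.ndarray) -> bool:
    # 2K snapshot height exceeds 1000 px; RTSP fallback frames are 720 px tall
    return frame.shape[0] >= 1000

assert used_2k_snapshot(np.zeros((1440, 2560, 3), dtype=np.uint8))
assert not used_2k_snapshot(np.zeros((720, 1280, 3), dtype=np.uint8))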