diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index 076cdc9..d395f3a 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -64,6 +64,10 @@ class DetectionPipeline: # SessionId to processing results mapping (for combining with license plate results) self.session_processing_results = {} + # Field mappings from parallelActions (e.g., {"car_brand": "{car_brand_cls_v3.brand}"}) + self.field_mappings = {} + self._parse_field_mappings() + # Statistics self.stats = { 'detections_processed': 0, @@ -74,6 +78,25 @@ class DetectionPipeline: logger.info("DetectionPipeline initialized") + def _parse_field_mappings(self): + """ + Parse field mappings from parallelActions.postgresql_update_combined.fields. + Extracts mappings like {"car_brand": "{car_brand_cls_v3.brand}"} for dynamic field resolution. + """ + try: + if not self.pipeline_config or not hasattr(self.pipeline_config, 'parallel_actions'): + return + + for action in self.pipeline_config.parallel_actions: + if action.type.value == 'postgresql_update_combined': + fields = action.params.get('fields', {}) + self.field_mappings = fields + logger.info(f"[FIELD MAPPINGS] Parsed from pipeline config: {self.field_mappings}") + break + + except Exception as e: + logger.error(f"Error parsing field mappings: {e}", exc_info=True) + async def initialize(self) -> bool: """ Initialize all pipeline components including models, Redis, and database. @@ -165,6 +188,44 @@ class DetectionPipeline: logger.error(f"Error initializing detection model: {e}", exc_info=True) return False + def _extract_fields_from_branches(self, branch_results: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract fields dynamically from branch results using field mappings. + + Args: + branch_results: Dictionary of branch execution results + + Returns: + Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"}) + """ + extracted = {} + + try: + for db_field_name, template in self.field_mappings.items(): + # Parse template like "{car_brand_cls_v3.brand}" -> branch_id="car_brand_cls_v3", field="brand" + if template.startswith('{') and template.endswith('}'): + var_name = template[1:-1] + if '.' in var_name: + branch_id, field_name = var_name.split('.', 1) + + # Look up value in branch_results + if branch_id in branch_results: + branch_data = branch_results[branch_id] + if isinstance(branch_data, dict) and 'result' in branch_data: + result_data = branch_data['result'] + if isinstance(result_data, dict) and field_name in result_data: + extracted[field_name] = result_data[field_name] + logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}") + else: + logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}") + else: + logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results") + + except Exception as e: + logger.error(f"Error extracting fields from branches: {e}", exc_info=True) + + return extracted + async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]): """ Callback for handling license plate results from LPR service. @@ -272,12 +333,12 @@ class DetectionPipeline: branch_results = self.session_processing_results[session_id_for_lookup] logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}") - if 'car_brand_cls_v2' in branch_results: - brand_result = branch_results['car_brand_cls_v2'].get('result', {}) - car_brand = brand_result.get('brand') - if 'car_bodytype_cls_v1' in branch_results: - bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) - body_type = bodytype_result.get('body_type') + # Extract fields dynamically using field mappings from pipeline config + extracted_fields = self._extract_fields_from_branches(branch_results) + car_brand = extracted_fields.get('brand') + body_type = extracted_fields.get('body_type') + + logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}") # Clean up stored results after use del self.session_processing_results[session_id_for_lookup] @@ -1003,7 +1064,7 @@ class DetectionPipeline: Resolve field template using branch results and context. Args: - template: Template string like "{car_brand_cls_v2.brand}" + template: Template string like "{car_brand_cls_v3.brand}" branch_results: Dictionary of branch execution results context: Detection context @@ -1015,7 +1076,7 @@ class DetectionPipeline: if template.startswith('{') and template.endswith('}'): var_name = template[1:-1] - # Check for branch result reference (e.g., "car_brand_cls_v2.brand") + # Check for branch result reference (e.g., "car_brand_cls_v3.brand") if '.' in var_name: branch_id, field_name = var_name.split('.', 1) if branch_id in branch_results: @@ -1061,17 +1122,10 @@ class DetectionPipeline: logger.warning("No session_id in context for processing results") return - # Extract car brand from car_brand_cls_v2 results - car_brand = None - if 'car_brand_cls_v2' in branch_results: - brand_result = branch_results['car_brand_cls_v2'].get('result', {}) - car_brand = brand_result.get('brand') - - # Extract body type from car_bodytype_cls_v1 results - body_type = None - if 'car_bodytype_cls_v1' in branch_results: - bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) - body_type = bodytype_result.get('body_type') + # Extract fields dynamically using field mappings from pipeline config + extracted_fields = self._extract_fields_from_branches(branch_results) + car_brand = extracted_fields.get('brand') + body_type = extracted_fields.get('body_type') logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: " f"brand={car_brand}, bodyType={body_type}") diff --git a/core/tracking/integration.py b/core/tracking/integration.py index 58afcec..8e0d8fa 100644 --- a/core/tracking/integration.py +++ b/core/tracking/integration.py @@ -521,19 +521,23 @@ class TrackingPipelineIntegration: logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}") # FALLBACK: Execute pipeline for POS-initiated sessions - # Use stored subscription_id instead of creating fake one - stored_subscription_id = self.display_to_subscription.get(display_id) - if stored_subscription_id: - logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}") + # Skip if session_id is None (no car present or car has left) + if session_id is not None: + # Use stored subscription_id instead of creating fake one + stored_subscription_id = self.display_to_subscription.get(display_id) + if stored_subscription_id: + logger.info(f"[FALLBACK] Triggering fallback pipeline for session {session_id} on display {display_id} with subscription {stored_subscription_id}") - # Trigger the fallback pipeline asynchronously with real subscription_id - asyncio.create_task(self._execute_fallback_pipeline( - display_id=display_id, - session_id=session_id, - subscription_id=stored_subscription_id - )) + # Trigger the fallback pipeline asynchronously with real subscription_id + asyncio.create_task(self._execute_fallback_pipeline( + display_id=display_id, + session_id=session_id, + subscription_id=stored_subscription_id + )) + else: + logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline") else: - logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline") + logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}") def clear_session_id(self, session_id: str): """