fix: run fallback pipeline only when there is a sessionId
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m28s
Build Worker Base and Application Images / deploy-stack (push) Successful in 14s
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m28s
Build Worker Base and Application Images / deploy-stack (push) Successful in 14s
This commit is contained in:
parent
e92efdbe11
commit
354ed9ce3c
2 changed files with 88 additions and 30 deletions
|
@ -64,6 +64,10 @@ class DetectionPipeline:
|
|||
# SessionId to processing results mapping (for combining with license plate results)
|
||||
self.session_processing_results = {}
|
||||
|
||||
# Field mappings from parallelActions (e.g., {"car_brand": "{car_brand_cls_v3.brand}"})
|
||||
self.field_mappings = {}
|
||||
self._parse_field_mappings()
|
||||
|
||||
# Statistics
|
||||
self.stats = {
|
||||
'detections_processed': 0,
|
||||
|
@ -74,6 +78,25 @@ class DetectionPipeline:
|
|||
|
||||
logger.info("DetectionPipeline initialized")
|
||||
|
||||
def _parse_field_mappings(self):
|
||||
"""
|
||||
Parse field mappings from parallelActions.postgresql_update_combined.fields.
|
||||
Extracts mappings like {"car_brand": "{car_brand_cls_v3.brand}"} for dynamic field resolution.
|
||||
"""
|
||||
try:
|
||||
if not self.pipeline_config or not hasattr(self.pipeline_config, 'parallel_actions'):
|
||||
return
|
||||
|
||||
for action in self.pipeline_config.parallel_actions:
|
||||
if action.type.value == 'postgresql_update_combined':
|
||||
fields = action.params.get('fields', {})
|
||||
self.field_mappings = fields
|
||||
logger.info(f"[FIELD MAPPINGS] Parsed from pipeline config: {self.field_mappings}")
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing field mappings: {e}", exc_info=True)
|
||||
|
||||
async def initialize(self) -> bool:
|
||||
"""
|
||||
Initialize all pipeline components including models, Redis, and database.
|
||||
|
@ -165,6 +188,44 @@ class DetectionPipeline:
|
|||
logger.error(f"Error initializing detection model: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
def _extract_fields_from_branches(self, branch_results: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Extract fields dynamically from branch results using field mappings.
|
||||
|
||||
Args:
|
||||
branch_results: Dictionary of branch execution results
|
||||
|
||||
Returns:
|
||||
Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"})
|
||||
"""
|
||||
extracted = {}
|
||||
|
||||
try:
|
||||
for db_field_name, template in self.field_mappings.items():
|
||||
# Parse template like "{car_brand_cls_v3.brand}" -> branch_id="car_brand_cls_v3", field="brand"
|
||||
if template.startswith('{') and template.endswith('}'):
|
||||
var_name = template[1:-1]
|
||||
if '.' in var_name:
|
||||
branch_id, field_name = var_name.split('.', 1)
|
||||
|
||||
# Look up value in branch_results
|
||||
if branch_id in branch_results:
|
||||
branch_data = branch_results[branch_id]
|
||||
if isinstance(branch_data, dict) and 'result' in branch_data:
|
||||
result_data = branch_data['result']
|
||||
if isinstance(result_data, dict) and field_name in result_data:
|
||||
extracted[field_name] = result_data[field_name]
|
||||
logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}")
|
||||
else:
|
||||
logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}")
|
||||
else:
|
||||
logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting fields from branches: {e}", exc_info=True)
|
||||
|
||||
return extracted
|
||||
|
||||
async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]):
|
||||
"""
|
||||
Callback for handling license plate results from LPR service.
|
||||
|
@ -272,12 +333,12 @@ class DetectionPipeline:
|
|||
branch_results = self.session_processing_results[session_id_for_lookup]
|
||||
logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}")
|
||||
|
||||
if 'car_brand_cls_v2' in branch_results:
|
||||
brand_result = branch_results['car_brand_cls_v2'].get('result', {})
|
||||
car_brand = brand_result.get('brand')
|
||||
if 'car_bodytype_cls_v1' in branch_results:
|
||||
bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
|
||||
body_type = bodytype_result.get('body_type')
|
||||
# Extract fields dynamically using field mappings from pipeline config
|
||||
extracted_fields = self._extract_fields_from_branches(branch_results)
|
||||
car_brand = extracted_fields.get('brand')
|
||||
body_type = extracted_fields.get('body_type')
|
||||
|
||||
logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}")
|
||||
|
||||
# Clean up stored results after use
|
||||
del self.session_processing_results[session_id_for_lookup]
|
||||
|
@ -1003,7 +1064,7 @@ class DetectionPipeline:
|
|||
Resolve field template using branch results and context.
|
||||
|
||||
Args:
|
||||
template: Template string like "{car_brand_cls_v2.brand}"
|
||||
template: Template string like "{car_brand_cls_v3.brand}"
|
||||
branch_results: Dictionary of branch execution results
|
||||
context: Detection context
|
||||
|
||||
|
@ -1015,7 +1076,7 @@ class DetectionPipeline:
|
|||
if template.startswith('{') and template.endswith('}'):
|
||||
var_name = template[1:-1]
|
||||
|
||||
# Check for branch result reference (e.g., "car_brand_cls_v2.brand")
|
||||
# Check for branch result reference (e.g., "car_brand_cls_v3.brand")
|
||||
if '.' in var_name:
|
||||
branch_id, field_name = var_name.split('.', 1)
|
||||
if branch_id in branch_results:
|
||||
|
@ -1061,17 +1122,10 @@ class DetectionPipeline:
|
|||
logger.warning("No session_id in context for processing results")
|
||||
return
|
||||
|
||||
# Extract car brand from car_brand_cls_v2 results
|
||||
car_brand = None
|
||||
if 'car_brand_cls_v2' in branch_results:
|
||||
brand_result = branch_results['car_brand_cls_v2'].get('result', {})
|
||||
car_brand = brand_result.get('brand')
|
||||
|
||||
# Extract body type from car_bodytype_cls_v1 results
|
||||
body_type = None
|
||||
if 'car_bodytype_cls_v1' in branch_results:
|
||||
bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
|
||||
body_type = bodytype_result.get('body_type')
|
||||
# Extract fields dynamically using field mappings from pipeline config
|
||||
extracted_fields = self._extract_fields_from_branches(branch_results)
|
||||
car_brand = extracted_fields.get('brand')
|
||||
body_type = extracted_fields.get('body_type')
|
||||
|
||||
logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: "
|
||||
f"brand={car_brand}, bodyType={body_type}")
|
||||
|
|
|
@ -521,6 +521,8 @@ class TrackingPipelineIntegration:
|
|||
logger.warning(f"No pending processing data found for display {display_id} when setting session {session_id}")
|
||||
|
||||
# FALLBACK: Execute pipeline for POS-initiated sessions
|
||||
# Skip if session_id is None (no car present or car has left)
|
||||
if session_id is not None:
|
||||
# Use stored subscription_id instead of creating fake one
|
||||
stored_subscription_id = self.display_to_subscription.get(display_id)
|
||||
if stored_subscription_id:
|
||||
|
@ -534,6 +536,8 @@ class TrackingPipelineIntegration:
|
|||
))
|
||||
else:
|
||||
logger.error(f"[FALLBACK] No subscription_id stored for display {display_id}, cannot execute fallback pipeline")
|
||||
else:
|
||||
logger.debug(f"[FALLBACK] Skipping pipeline execution for session_id=None on display {display_id}")
|
||||
|
||||
def clear_session_id(self, session_id: str):
|
||||
"""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue