fix: send every data that got result
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 8s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m46s
Build Worker Base and Application Images / deploy-stack (push) Successful in 9s

This commit is contained in:
ziesorx 2025-09-25 14:02:10 +07:00
parent 0cf0bc8b91
commit 270df1a457

View file

@ -352,6 +352,76 @@ class DetectionPipeline:
except Exception as e:
logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True)
async def _send_processing_results_message(self, subscription_id: str, branch_results: Dict[str, Any], session_id: Optional[str] = None):
"""
Send imageDetection message immediately with processing results, regardless of completeness.
Sends even if no results, partial results, or complete results are available.
Args:
subscription_id: Subscription identifier to send message to
branch_results: Branch processing results (may be empty or partial)
session_id: Session identifier for logging
"""
try:
if not self.message_sender:
logger.warning("No message sender configured, cannot send imageDetection")
return
# Import here to avoid circular imports
from ..communication.models import ImageDetectionMessage, DetectionData
# Extract classification results from branch results
car_brand = None
body_type = None
if branch_results:
# Extract car brand from car_brand_cls_v2 results
if 'car_brand_cls_v2' in branch_results:
brand_result = branch_results['car_brand_cls_v2'].get('result', {})
car_brand = brand_result.get('brand')
# Extract body type from car_bodytype_cls_v1 results
if 'car_bodytype_cls_v1' in branch_results:
bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {})
body_type = bodytype_result.get('body_type')
# Create detection data with available results (fields can be None)
detection_data_obj = DetectionData(
detection={
"carBrand": car_brand,
"carModel": None, # Not implemented yet
"bodyType": body_type,
"licensePlateText": None, # Will be updated later if available
"licensePlateConfidence": None
},
modelId=self.model_id,
modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
)
# Create imageDetection message
detection_message = ImageDetectionMessage(
subscriptionIdentifier=subscription_id,
data=detection_data_obj
)
# Send message
await self.message_sender(detection_message)
# Log what was sent
result_summary = []
if car_brand:
result_summary.append(f"brand='{car_brand}'")
if body_type:
result_summary.append(f"bodyType='{body_type}'")
if not result_summary:
result_summary.append("no classification results")
logger.info(f"[PROCESSING COMPLETE] Sent imageDetection with {', '.join(result_summary)} to '{subscription_id}'"
f"{f' (session {session_id})' if session_id else ''}")
except Exception as e:
logger.error(f"Error sending processing results imageDetection message: {e}", exc_info=True)
async def execute_detection_phase(self,
frame: np.ndarray,
display_id: str,
@ -593,19 +663,31 @@ class DetectionPipeline:
)
result['actions_executed'].extend(executed_parallel_actions)
# Store processing results for later combination with license plate data
# Send imageDetection message immediately with available results
await self._send_processing_results_message(subscription_id, result['branch_results'], session_id)
# Store processing results for later combination with license plate data if needed
if result['branch_results'] and session_id:
self.session_processing_results[session_id] = result['branch_results']
logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination")
logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for potential license plate combination")
logger.info(f"Processing phase completed for session {session_id}: "
f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions")
f"status={result.get('status', 'unknown')}, "
f"branches={len(result['branch_results'])}, "
f"actions={len(result['actions_executed'])}, "
f"processing_time={result.get('processing_time', 0):.3f}s")
except Exception as e:
logger.error(f"Error in processing phase: {e}", exc_info=True)
result['status'] = 'error'
result['message'] = str(e)
# Even if there was an error, send imageDetection message with whatever results we have
try:
await self._send_processing_results_message(subscription_id, result['branch_results'], session_id)
except Exception as send_error:
logger.error(f"Failed to send imageDetection message after processing error: {send_error}")
result['processing_time'] = time.time() - start_time
return result
@ -958,11 +1040,16 @@ class DetectionPipeline:
wait_for_branches = action.params.get('waitForBranches', [])
branch_results = context.get('branch_results', {})
# Check if all required branches have completed
for branch_id in wait_for_branches:
if branch_id not in branch_results:
logger.warning(f"Branch {branch_id} result not available for database update")
return {'status': 'error', 'message': f'Missing branch result: {branch_id}'}
# Log which branches are available vs. expected
missing_branches = [branch_id for branch_id in wait_for_branches if branch_id not in branch_results]
available_branches = [branch_id for branch_id in wait_for_branches if branch_id in branch_results]
if missing_branches:
logger.warning(f"Some branches missing for database update - available: {available_branches}, missing: {missing_branches}")
else:
logger.info(f"All expected branches available for database update: {available_branches}")
# Continue with update using whatever results are available (don't fail on missing branches)
# Prepare fields for database update
table = action.params.get('table', 'car_frontal_info')
@ -981,7 +1068,7 @@ class DetectionPipeline:
logger.warning(f"Failed to resolve field {field_name}: {e}")
resolved_fields[field_name] = None
# Execute database update
# Execute database update with available data
success = self.db_manager.execute_update(
table=table,
key_field=key_field,
@ -989,9 +1076,26 @@ class DetectionPipeline:
fields=resolved_fields
)
# Log the update result with details about what data was available
non_null_fields = {k: v for k, v in resolved_fields.items() if v is not None}
null_fields = [k for k, v in resolved_fields.items() if v is None]
if success:
return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields}
logger.info(f"[DATABASE UPDATE] Success for session {key_value}: "
f"updated {len(non_null_fields)} fields {list(non_null_fields.keys())}"
f"{f', {len(null_fields)} null fields {null_fields}' if null_fields else ''}")
return {
'status': 'success',
'table': table,
'key': f'{key_field}={key_value}',
'fields': resolved_fields,
'updated_fields': non_null_fields,
'null_fields': null_fields,
'available_branches': available_branches,
'missing_branches': missing_branches
}
else:
logger.error(f"[DATABASE UPDATE] Failed for session {key_value}")
return {'status': 'error', 'message': 'Database update failed'}
except Exception as e: