diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index 076cdc9..e13b739 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -352,6 +352,76 @@ class DetectionPipeline: except Exception as e: logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True) + async def _send_processing_results_message(self, subscription_id: str, branch_results: Dict[str, Any], session_id: Optional[str] = None): + """ + Send imageDetection message immediately with processing results, regardless of completeness. + Sends even if no results, partial results, or complete results are available. + + Args: + subscription_id: Subscription identifier to send message to + branch_results: Branch processing results (may be empty or partial) + session_id: Session identifier for logging + """ + try: + if not self.message_sender: + logger.warning("No message sender configured, cannot send imageDetection") + return + + # Import here to avoid circular imports + from ..communication.models import ImageDetectionMessage, DetectionData + + # Extract classification results from branch results + car_brand = None + body_type = None + + if branch_results: + # Extract car brand from car_brand_cls_v2 results + if 'car_brand_cls_v2' in branch_results: + brand_result = branch_results['car_brand_cls_v2'].get('result', {}) + car_brand = brand_result.get('brand') + + # Extract body type from car_bodytype_cls_v1 results + if 'car_bodytype_cls_v1' in branch_results: + bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) + body_type = bodytype_result.get('body_type') + + # Create detection data with available results (fields can be None) + detection_data_obj = DetectionData( + detection={ + "carBrand": car_brand, + "carModel": None, # Not implemented yet + "bodyType": body_type, + "licensePlateText": None, # Will be updated later if available + "licensePlateConfidence": None + }, + modelId=self.model_id, + modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model" + ) + + # Create imageDetection message + detection_message = ImageDetectionMessage( + subscriptionIdentifier=subscription_id, + data=detection_data_obj + ) + + # Send message + await self.message_sender(detection_message) + + # Log what was sent + result_summary = [] + if car_brand: + result_summary.append(f"brand='{car_brand}'") + if body_type: + result_summary.append(f"bodyType='{body_type}'") + if not result_summary: + result_summary.append("no classification results") + + logger.info(f"[PROCESSING COMPLETE] Sent imageDetection with {', '.join(result_summary)} to '{subscription_id}'" + f"{f' (session {session_id})' if session_id else ''}") + + except Exception as e: + logger.error(f"Error sending processing results imageDetection message: {e}", exc_info=True) + async def execute_detection_phase(self, frame: np.ndarray, display_id: str, @@ -593,19 +663,31 @@ class DetectionPipeline: ) result['actions_executed'].extend(executed_parallel_actions) - # Store processing results for later combination with license plate data + # Send imageDetection message immediately with available results + await self._send_processing_results_message(subscription_id, result['branch_results'], session_id) + + # Store processing results for later combination with license plate data if needed if result['branch_results'] and session_id: self.session_processing_results[session_id] = result['branch_results'] - logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination") + logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for potential license plate combination") logger.info(f"Processing phase completed for session {session_id}: " - f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions") + f"status={result.get('status', 'unknown')}, " + f"branches={len(result['branch_results'])}, " + f"actions={len(result['actions_executed'])}, " + f"processing_time={result.get('processing_time', 0):.3f}s") except Exception as e: logger.error(f"Error in processing phase: {e}", exc_info=True) result['status'] = 'error' result['message'] = str(e) + # Even if there was an error, send imageDetection message with whatever results we have + try: + await self._send_processing_results_message(subscription_id, result['branch_results'], session_id) + except Exception as send_error: + logger.error(f"Failed to send imageDetection message after processing error: {send_error}") + result['processing_time'] = time.time() - start_time return result @@ -958,11 +1040,16 @@ class DetectionPipeline: wait_for_branches = action.params.get('waitForBranches', []) branch_results = context.get('branch_results', {}) - # Check if all required branches have completed - for branch_id in wait_for_branches: - if branch_id not in branch_results: - logger.warning(f"Branch {branch_id} result not available for database update") - return {'status': 'error', 'message': f'Missing branch result: {branch_id}'} + # Log which branches are available vs. expected + missing_branches = [branch_id for branch_id in wait_for_branches if branch_id not in branch_results] + available_branches = [branch_id for branch_id in wait_for_branches if branch_id in branch_results] + + if missing_branches: + logger.warning(f"Some branches missing for database update - available: {available_branches}, missing: {missing_branches}") + else: + logger.info(f"All expected branches available for database update: {available_branches}") + + # Continue with update using whatever results are available (don't fail on missing branches) # Prepare fields for database update table = action.params.get('table', 'car_frontal_info') @@ -981,7 +1068,7 @@ class DetectionPipeline: logger.warning(f"Failed to resolve field {field_name}: {e}") resolved_fields[field_name] = None - # Execute database update + # Execute database update with available data success = self.db_manager.execute_update( table=table, key_field=key_field, @@ -989,9 +1076,26 @@ class DetectionPipeline: fields=resolved_fields ) + # Log the update result with details about what data was available + non_null_fields = {k: v for k, v in resolved_fields.items() if v is not None} + null_fields = [k for k, v in resolved_fields.items() if v is None] + if success: - return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields} + logger.info(f"[DATABASE UPDATE] Success for session {key_value}: " + f"updated {len(non_null_fields)} fields {list(non_null_fields.keys())}" + f"{f', {len(null_fields)} null fields {null_fields}' if null_fields else ''}") + return { + 'status': 'success', + 'table': table, + 'key': f'{key_field}={key_value}', + 'fields': resolved_fields, + 'updated_fields': non_null_fields, + 'null_fields': null_fields, + 'available_branches': available_branches, + 'missing_branches': missing_branches + } else: + logger.error(f"[DATABASE UPDATE] Failed for session {key_value}") return {'status': 'error', 'message': 'Database update failed'} except Exception as e: