fix: send partial results
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 3m40s
Build Worker Base and Application Images / deploy-stack (push) Successful in 15s

This commit is contained in:
ziesorx 2025-10-20 17:54:50 +07:00
parent 10c54bc6e0
commit 0dd1b9f5c2
2 changed files with 155 additions and 19 deletions

View file

@ -45,12 +45,17 @@ class BranchProcessor:
self.redis_manager = None
self.db_manager = None
# Branch execution timeout (seconds)
self.branch_timeout = 30.0
# Statistics
self.stats = {
'branches_processed': 0,
'parallel_executions': 0,
'total_processing_time': 0.0,
'models_loaded': 0
'models_loaded': 0,
'branches_timed_out': 0,
'branches_failed': 0
}
logger.info("BranchProcessor initialized")
@ -279,22 +284,46 @@ class BranchProcessor:
)
future_to_branch[future] = branch
# Collect results as they complete
for future in as_completed(future_to_branch):
# Collect results as they complete with timeout
try:
for future in as_completed(future_to_branch, timeout=self.branch_timeout):
branch = future_to_branch[future]
branch_id = getattr(branch, 'model_id', 'unknown')
try:
result = future.result()
# Get result with timeout to prevent indefinite hanging
result = future.result(timeout=self.branch_timeout)
results[branch_id] = result
logger.info(f"[PARALLEL COMPLETE] {branch_id}: Branch completed successfully")
except TimeoutError:
logger.error(f"[TIMEOUT] Branch {branch_id} exceeded timeout of {self.branch_timeout}s")
self.stats['branches_timed_out'] += 1
results[branch_id] = {
'status': 'timeout',
'message': f'Branch execution timeout after {self.branch_timeout}s',
'processing_time': self.branch_timeout
}
except Exception as e:
logger.error(f"Error in parallel branch {branch_id}: {e}")
logger.error(f"[ERROR] Error in parallel branch {branch_id}: {e}", exc_info=True)
self.stats['branches_failed'] += 1
results[branch_id] = {
'status': 'error',
'message': str(e),
'processing_time': 0.0
}
except TimeoutError:
# as_completed iterator timed out - mark remaining futures as timed out
logger.error(f"[TIMEOUT] Branch execution timeout after {self.branch_timeout}s - some branches did not complete")
for future, branch in future_to_branch.items():
branch_id = getattr(branch, 'model_id', 'unknown')
if branch_id not in results:
logger.error(f"[TIMEOUT] Branch {branch_id} did not complete within timeout")
self.stats['branches_timed_out'] += 1
results[branch_id] = {
'status': 'timeout',
'message': f'Branch did not complete within {self.branch_timeout}s timeout',
'processing_time': self.branch_timeout
}
# Flatten nested branch results to top level for database access
flattened_results = {}
@ -309,6 +338,24 @@ class BranchProcessor:
flattened_results[nested_branch_id] = nested_result
logger.info(f"[FLATTEN] Added nested branch {nested_branch_id} to top-level results")
# Log summary of branch execution results
succeeded = [bid for bid, res in results.items() if res.get('status') == 'success']
failed = [bid for bid, res in results.items() if res.get('status') == 'error']
timed_out = [bid for bid, res in results.items() if res.get('status') == 'timeout']
skipped = [bid for bid, res in results.items() if res.get('status') == 'skipped']
summary_parts = []
if succeeded:
summary_parts.append(f"{len(succeeded)} succeeded: {', '.join(succeeded)}")
if failed:
summary_parts.append(f"{len(failed)} FAILED: {', '.join(failed)}")
if timed_out:
summary_parts.append(f"{len(timed_out)} TIMED OUT: {', '.join(timed_out)}")
if skipped:
summary_parts.append(f"{len(skipped)} skipped: {', '.join(skipped)}")
logger.info(f"[PARALLEL SUMMARY] Branch execution completed: {' | '.join(summary_parts) if summary_parts else 'no branches'}")
return flattened_results
async def _execute_sequential_branches(self,
@ -496,9 +543,19 @@ class BranchProcessor:
# Use .predict() method for both detection and classification models
inference_start = time.time()
try:
detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
inference_time = time.time() - inference_start
logger.info(f"[INFERENCE DONE] {branch_id}: Predict completed in {inference_time:.3f}s using .predict() method")
except Exception as inference_error:
inference_time = time.time() - inference_start
logger.error(f"[INFERENCE ERROR] {branch_id}: Model inference failed after {inference_time:.3f}s: {inference_error}", exc_info=True)
return {
'status': 'error',
'branch_id': branch_id,
'message': f'Model inference failed: {str(inference_error)}',
'processing_time': time.time() - start_time
}
# Initialize branch_detections outside the conditional
branch_detections = []

View file

@ -445,6 +445,78 @@ class DetectionPipeline:
except Exception as e:
logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True)
async def _send_classification_results(self, subscription_id: str, session_id: str, branch_results: Dict[str, Any]):
    """
    Send an imageDetection message carrying classification results only.

    License plate fields are left null here; they are delivered later once
    the license plate callback fires, so this pushes partial results to the
    subscriber as soon as the processing phase completes.

    Args:
        subscription_id: Subscription identifier the message is addressed to
        session_id: Session identifier (used here for log correlation)
        branch_results: Dictionary of branch execution results
    """
    try:
        if not self.message_sender:
            logger.warning("No message sender configured, cannot send imageDetection")
            return

        # Deferred import to avoid a circular dependency at module load time
        from ..communication.models import ImageDetectionMessage, DetectionData

        # Pull the classification fields out of the branch outputs
        fields = self._extract_fields_from_branches(branch_results)
        car_brand = fields.get('brand')
        body_type = fields.get('body_type')

        # Describe exactly which values are populated vs null before sending
        fields_status = [
            f"brand={car_brand}" if car_brand is not None else "brand=null",
            f"bodyType={body_type}" if body_type is not None else "bodyType=null",
        ]
        logger.info(f"[CLASSIFICATION] Sending partial results for session {session_id}: {', '.join(fields_status)}")

        # Build the payload; plate fields stay None until the callback fills them in
        detection_data_obj = DetectionData(
            detection={
                "carBrand": car_brand,
                "carModel": None,  # Not implemented yet
                "bodyType": body_type,
                "licensePlateText": None,  # Will be sent later via license plate callback
                "licensePlateConfidence": None
            },
            modelId=self.model_id,
            modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
        )

        detection_message = ImageDetectionMessage(
            subscriptionIdentifier=subscription_id,
            data=detection_data_obj
        )

        await self.message_sender(detection_message)

        # Log whether the result set was partial (some nulls) or complete
        null_fields = [name for name, value in (('brand', car_brand), ('bodyType', body_type)) if value is None]
        if null_fields:
            logger.info(f"[PARTIAL RESULTS] Sent imageDetection with PARTIAL results (null: {', '.join(null_fields)}) - brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
        else:
            logger.info(f"[CLASSIFICATION COMPLETE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
    except Exception as e:
        logger.error(f"Error sending classification results imageDetection message: {e}", exc_info=True)
async def execute_detection_phase(self,
frame: np.ndarray,
display_id: str,
@ -693,6 +765,13 @@ class DetectionPipeline:
self.session_processing_results[session_id] = result['branch_results']
logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination")
# Send classification results immediately (license plate will come later via callback)
await self._send_classification_results(
subscription_id=subscription_id,
session_id=session_id,
branch_results=result['branch_results']
)
logger.info(f"Processing phase completed for session {session_id}: "
f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions")