python-detector-worker/core/detection/pipeline.py
ziesorx 354ed9ce3c
All checks were successful
Build Worker Base and Application Images / check-base-changes (push) Successful in 7s
Build Worker Base and Application Images / build-base (push) Has been skipped
Build Worker Base and Application Images / build-docker (push) Successful in 2m28s
Build Worker Base and Application Images / deploy-stack (push) Successful in 14s
fix: fallback when there is sessionId
2025-09-30 15:46:32 +07:00

1169 lines
No EOL
51 KiB
Python

"""
Detection Pipeline Module.
Main detection pipeline orchestration that coordinates detection flow and execution.
"""
import asyncio
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from ..models.inference import YOLOWrapper
from ..models.pipeline import PipelineParser
from .branches import BranchProcessor
from ..storage.redis import RedisManager
from ..storage.database import DatabaseManager
from ..storage.license_plate import LicensePlateManager
logger = logging.getLogger(__name__)
class DetectionPipeline:
"""
Main detection pipeline that orchestrates the complete detection flow.
Handles detection execution, branch coordination, and result aggregation.
"""
def __init__(self, pipeline_parser: PipelineParser, model_manager: Any, model_id: int, message_sender=None):
"""
Initialize detection pipeline.
Args:
pipeline_parser: Pipeline parser with loaded configuration
model_manager: Model manager for loading models
model_id: The model ID to use for loading models
message_sender: Optional callback function for sending WebSocket messages
"""
self.pipeline_parser = pipeline_parser
self.model_manager = model_manager
self.model_id = model_id
self.message_sender = message_sender
# Initialize components
self.branch_processor = BranchProcessor(model_manager, model_id)
self.redis_manager = None
self.db_manager = None
self.license_plate_manager = None
# Main detection model
self.detection_model: Optional[YOLOWrapper] = None
self.detection_model_id = None
# Thread pool for parallel processing
self.executor = ThreadPoolExecutor(max_workers=4)
# Pipeline configuration
self.pipeline_config = pipeline_parser.pipeline_config
# SessionId to subscriptionIdentifier mapping
self.session_to_subscription = {}
# SessionId to processing results mapping (for combining with license plate results)
self.session_processing_results = {}
# Field mappings from parallelActions (e.g., {"car_brand": "{car_brand_cls_v3.brand}"})
self.field_mappings = {}
self._parse_field_mappings()
# Statistics
self.stats = {
'detections_processed': 0,
'branches_executed': 0,
'actions_executed': 0,
'total_processing_time': 0.0
}
logger.info("DetectionPipeline initialized")
def _parse_field_mappings(self):
"""
Parse field mappings from parallelActions.postgresql_update_combined.fields.
Extracts mappings like {"car_brand": "{car_brand_cls_v3.brand}"} for dynamic field resolution.
"""
try:
if not self.pipeline_config or not hasattr(self.pipeline_config, 'parallel_actions'):
return
for action in self.pipeline_config.parallel_actions:
if action.type.value == 'postgresql_update_combined':
fields = action.params.get('fields', {})
self.field_mappings = fields
logger.info(f"[FIELD MAPPINGS] Parsed from pipeline config: {self.field_mappings}")
break
except Exception as e:
logger.error(f"Error parsing field mappings: {e}", exc_info=True)
async def initialize(self) -> bool:
"""
Initialize all pipeline components including models, Redis, and database.
Returns:
True if successful, False otherwise
"""
try:
# Initialize Redis connection
if self.pipeline_parser.redis_config:
self.redis_manager = RedisManager(self.pipeline_parser.redis_config.__dict__)
if not await self.redis_manager.initialize():
logger.error("Failed to initialize Redis connection")
return False
logger.info("Redis connection initialized")
# Initialize database connection
if self.pipeline_parser.postgresql_config:
self.db_manager = DatabaseManager(self.pipeline_parser.postgresql_config.__dict__)
if not self.db_manager.connect():
logger.error("Failed to initialize database connection")
return False
# Create required tables
if not self.db_manager.create_car_frontal_info_table():
logger.warning("Failed to create car_frontal_info table")
logger.info("Database connection initialized")
# Initialize license plate manager (using same Redis config as main Redis manager)
if self.pipeline_parser.redis_config:
self.license_plate_manager = LicensePlateManager(self.pipeline_parser.redis_config.__dict__)
if not await self.license_plate_manager.initialize(self._on_license_plate_result):
logger.error("Failed to initialize license plate manager")
return False
logger.info("License plate manager initialized")
# Initialize main detection model
if not await self._initialize_detection_model():
logger.error("Failed to initialize detection model")
return False
# Initialize branch processor
if not await self.branch_processor.initialize(
self.pipeline_config,
self.redis_manager,
self.db_manager
):
logger.error("Failed to initialize branch processor")
return False
logger.info("Detection pipeline initialization completed successfully")
return True
except Exception as e:
logger.error(f"Error initializing detection pipeline: {e}", exc_info=True)
return False
async def _initialize_detection_model(self) -> bool:
"""
Load and initialize the main detection model.
Returns:
True if successful, False otherwise
"""
try:
if not self.pipeline_config:
logger.warning("No pipeline configuration found")
return False
model_file = getattr(self.pipeline_config, 'model_file', None)
model_id = getattr(self.pipeline_config, 'model_id', None)
if not model_file:
logger.warning("No detection model file specified")
return False
# Load detection model
logger.info(f"Loading detection model: {model_id} ({model_file})")
self.detection_model = self.model_manager.get_yolo_model(self.model_id, model_file)
if not self.detection_model:
logger.error(f"Failed to load detection model {model_file} from model {self.model_id}")
return False
self.detection_model_id = model_id
logger.info(f"Detection model {model_id} loaded successfully")
return True
except Exception as e:
logger.error(f"Error initializing detection model: {e}", exc_info=True)
return False
def _extract_fields_from_branches(self, branch_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract fields dynamically from branch results using field mappings.
Args:
branch_results: Dictionary of branch execution results
Returns:
Dictionary with extracted field values (e.g., {"car_brand": "Honda", "body_type": "Sedan"})
"""
extracted = {}
try:
for db_field_name, template in self.field_mappings.items():
# Parse template like "{car_brand_cls_v3.brand}" -> branch_id="car_brand_cls_v3", field="brand"
if template.startswith('{') and template.endswith('}'):
var_name = template[1:-1]
if '.' in var_name:
branch_id, field_name = var_name.split('.', 1)
# Look up value in branch_results
if branch_id in branch_results:
branch_data = branch_results[branch_id]
if isinstance(branch_data, dict) and 'result' in branch_data:
result_data = branch_data['result']
if isinstance(result_data, dict) and field_name in result_data:
extracted[field_name] = result_data[field_name]
logger.debug(f"[DYNAMIC EXTRACT] {field_name}={result_data[field_name]} from branch {branch_id}")
else:
logger.debug(f"[DYNAMIC EXTRACT] Field '{field_name}' not found in branch {branch_id}")
else:
logger.debug(f"[DYNAMIC EXTRACT] Branch '{branch_id}' not in results")
except Exception as e:
logger.error(f"Error extracting fields from branches: {e}", exc_info=True)
return extracted
async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]):
"""
Callback for handling license plate results from LPR service.
Args:
session_id: Session identifier
license_data: License plate data including text and confidence
"""
try:
license_text = license_data.get('license_plate_text', '')
confidence = license_data.get('confidence', 0.0)
logger.info(f"[LICENSE PLATE CALLBACK] Session {session_id}: "
f"text='{license_text}', confidence={confidence:.3f}")
# Find matching subscriptionIdentifier for this sessionId
subscription_id = self.session_to_subscription.get(session_id)
if not subscription_id:
logger.warning(f"[LICENSE PLATE] No subscription found for sessionId '{session_id}' (type: {type(session_id)}), cannot send imageDetection")
logger.warning(f"[LICENSE PLATE DEBUG] Current session mappings: {dict(self.session_to_subscription)}")
# Try to find by type conversion in case of type mismatch
# Try as integer if session_id is string
if isinstance(session_id, str) and session_id.isdigit():
session_id_int = int(session_id)
subscription_id = self.session_to_subscription.get(session_id_int)
if subscription_id:
logger.info(f"[LICENSE PLATE] Found subscription using int conversion: '{session_id}' -> {session_id_int} -> '{subscription_id}'")
else:
logger.error(f"[LICENSE PLATE] Failed to find subscription with int conversion")
return
# Try as string if session_id is integer
elif isinstance(session_id, int):
session_id_str = str(session_id)
subscription_id = self.session_to_subscription.get(session_id_str)
if subscription_id:
logger.info(f"[LICENSE PLATE] Found subscription using string conversion: {session_id} -> '{session_id_str}' -> '{subscription_id}'")
else:
logger.error(f"[LICENSE PLATE] Failed to find subscription with string conversion")
return
else:
logger.error(f"[LICENSE PLATE] Failed to find subscription with any type conversion")
return
# Send imageDetection message with license plate data combined with processing results
await self._send_license_plate_message(subscription_id, license_text, confidence, session_id)
# Update database with license plate information if database manager is available
if self.db_manager and license_text:
success = self.db_manager.execute_update(
table='car_frontal_info',
key_field='session_id',
key_value=session_id,
fields={
'license_character': license_text,
'license_type': 'LPR_detected' # Mark as detected by LPR service
}
)
if success:
logger.info(f"[LICENSE PLATE] Updated database for session {session_id}")
else:
logger.warning(f"[LICENSE PLATE] Failed to update database for session {session_id}")
except Exception as e:
logger.error(f"Error in license plate result callback: {e}", exc_info=True)
async def _send_license_plate_message(self, subscription_id: str, license_text: str, confidence: float, session_id: str = None):
"""
Send imageDetection message with license plate data plus any available processing results.
Args:
subscription_id: Subscription identifier to send message to
license_text: License plate text
confidence: License plate confidence score
session_id: Session identifier for looking up processing results
"""
try:
if not self.message_sender:
logger.warning("No message sender configured, cannot send imageDetection")
return
# Import here to avoid circular imports
from ..communication.models import ImageDetectionMessage, DetectionData
# Get processing results for this session from stored results
car_brand = None
body_type = None
# Find session_id from session mappings (we need session_id as key)
session_id_for_lookup = None
# Try direct lookup first (if session_id is already the right type)
if session_id in self.session_processing_results:
session_id_for_lookup = session_id
else:
# Try to find by type conversion
for stored_session_id in self.session_processing_results.keys():
if str(stored_session_id) == str(session_id):
session_id_for_lookup = stored_session_id
break
if session_id_for_lookup and session_id_for_lookup in self.session_processing_results:
branch_results = self.session_processing_results[session_id_for_lookup]
logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}")
# Extract fields dynamically using field mappings from pipeline config
extracted_fields = self._extract_fields_from_branches(branch_results)
car_brand = extracted_fields.get('brand')
body_type = extracted_fields.get('body_type')
logger.info(f"[LICENSE PLATE] Extracted fields: brand={car_brand}, body_type={body_type}")
# Clean up stored results after use
del self.session_processing_results[session_id_for_lookup]
logger.debug(f"[LICENSE PLATE] Cleaned up stored results for session {session_id_for_lookup}")
else:
logger.warning(f"[LICENSE PLATE] No processing results found for session {session_id}")
# Create detection data with combined information
detection_data_obj = DetectionData(
detection={
"carBrand": car_brand,
"carModel": None,
"bodyType": body_type,
"licensePlateText": license_text,
"licensePlateConfidence": confidence
},
modelId=self.model_id,
modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
)
# Create imageDetection message
detection_message = ImageDetectionMessage(
subscriptionIdentifier=subscription_id,
data=detection_data_obj
)
# Send message
await self.message_sender(detection_message)
logger.info(f"[COMBINED MESSAGE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'")
except Exception as e:
logger.error(f"Error sending license plate imageDetection message: {e}", exc_info=True)
async def _send_initial_detection_message(self, subscription_id: str):
"""
Send initial imageDetection message when vehicle is first detected.
Args:
subscription_id: Subscription identifier to send message to
"""
try:
if not self.message_sender:
logger.warning("No message sender configured, cannot send imageDetection")
return
# Import here to avoid circular imports
from ..communication.models import ImageDetectionMessage, DetectionData
# Create detection data with all fields as None (vehicle just detected, no classification yet)
detection_data_obj = DetectionData(
detection={
"carBrand": None,
"carModel": None,
"bodyType": None,
"licensePlateText": None,
"licensePlateConfidence": None
},
modelId=self.model_id,
modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
)
# Create imageDetection message
detection_message = ImageDetectionMessage(
subscriptionIdentifier=subscription_id,
data=detection_data_obj
)
# Send message
await self.message_sender(detection_message)
logger.info(f"[INITIAL DETECTION] Sent imageDetection for vehicle detection to '{subscription_id}'")
except Exception as e:
logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True)
async def execute_detection_phase(self,
frame: np.ndarray,
display_id: str,
subscription_id: str) -> Dict[str, Any]:
"""
Execute only the detection phase - run main detection and send imageDetection message.
This is the first phase that runs when a vehicle is validated.
Args:
frame: Input frame to process
display_id: Display identifier
subscription_id: Subscription identifier
Returns:
Dictionary with detection phase results
"""
start_time = time.time()
result = {
'status': 'success',
'detections': [],
'message_sent': False,
'processing_time': 0.0,
'timestamp': datetime.now().isoformat()
}
try:
# Run main detection model
if not self.detection_model:
result['status'] = 'error'
result['message'] = 'Detection model not available'
return result
# Create detection context
detection_context = {
'display_id': display_id,
'subscription_id': subscription_id,
'timestamp': datetime.now().strftime("%Y-%m-%dT%H-%M-%S"),
'timestamp_ms': int(time.time() * 1000)
}
# Run inference on single snapshot using .predict() method
detection_results = self.detection_model.model.predict(
frame,
conf=getattr(self.pipeline_config, 'min_confidence', 0.6),
verbose=False
)
# Process detection results using clean logic
valid_detections = []
detected_regions = {}
if detection_results and len(detection_results) > 0:
result_obj = detection_results[0]
trigger_classes = getattr(self.pipeline_config, 'trigger_classes', [])
# Handle .predict() results which have .boxes for detection models
if hasattr(result_obj, 'boxes') and result_obj.boxes is not None:
logger.info(f"[DETECTION PHASE] Found {len(result_obj.boxes)} raw detections from {getattr(self.pipeline_config, 'model_id', 'unknown')}")
for i, box in enumerate(result_obj.boxes):
class_id = int(box.cls[0])
confidence = float(box.conf[0])
bbox = box.xyxy[0].cpu().numpy().tolist() # [x1, y1, x2, y2]
class_name = self.detection_model.model.names[class_id]
logger.info(f"[DETECTION PHASE {i+1}] {class_name}: bbox={bbox}, conf={confidence:.3f}")
# Check if detection matches trigger classes
if trigger_classes and class_name not in trigger_classes:
logger.debug(f"[DETECTION PHASE] Filtered '{class_name}' - not in trigger_classes {trigger_classes}")
continue
logger.info(f"[DETECTION PHASE] Accepted '{class_name}' - matches trigger_classes")
# Store detection info
detection_info = {
'class_name': class_name,
'confidence': confidence,
'bbox': bbox
}
valid_detections.append(detection_info)
# Store region for processing phase
detected_regions[class_name] = {
'bbox': bbox,
'confidence': confidence
}
else:
logger.warning("[DETECTION PHASE] No boxes found in detection results")
# Store detected_regions in result for processing phase
result['detected_regions'] = detected_regions
result['detections'] = valid_detections
# If we have valid detections, create session and send initial imageDetection
if valid_detections:
logger.info(f"Found {len(valid_detections)} valid detections, storing session mapping")
# Store mapping from display_id to subscriptionIdentifier (for detection phase)
# Note: We'll store session_id mapping later in processing phase
self.session_to_subscription[display_id] = subscription_id
logger.info(f"[SESSION MAPPING] Stored mapping: displayId '{display_id}' -> subscriptionIdentifier '{subscription_id}'")
# Send initial imageDetection message with empty detection data
await self._send_initial_detection_message(subscription_id)
logger.info(f"Detection phase completed - {len(valid_detections)} detections found for {display_id}")
result['message_sent'] = True
else:
logger.debug("No valid detections found in detection phase")
except Exception as e:
logger.error(f"Error in detection phase: {e}", exc_info=True)
result['status'] = 'error'
result['message'] = str(e)
result['processing_time'] = time.time() - start_time
return result
async def execute_processing_phase(self,
frame: np.ndarray,
display_id: str,
session_id: str,
subscription_id: str,
detected_regions: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Execute the processing phase - run branches and database operations after receiving sessionId.
This is the second phase that runs after backend sends setSessionId.
Args:
frame: Input frame to process
display_id: Display identifier
session_id: Session ID from backend
subscription_id: Subscription identifier
detected_regions: Pre-detected regions from detection phase
Returns:
Dictionary with processing phase results
"""
start_time = time.time()
result = {
'status': 'success',
'branch_results': {},
'actions_executed': [],
'session_id': session_id,
'processing_time': 0.0,
'timestamp': datetime.now().isoformat()
}
try:
# Create enhanced detection context with session_id
detection_context = {
'display_id': display_id,
'session_id': session_id,
'subscription_id': subscription_id,
'timestamp': datetime.now().strftime("%Y-%m-%dT%H-%M-%S"),
'timestamp_ms': int(time.time() * 1000),
'uuid': str(uuid.uuid4()),
'filename': f"{uuid.uuid4()}.jpg"
}
# If no detected_regions provided, re-run detection to get them
if not detected_regions:
# Use .predict() method for detection
detection_results = self.detection_model.model.predict(
frame,
conf=getattr(self.pipeline_config, 'min_confidence', 0.6),
verbose=False
)
detected_regions = {}
if detection_results and len(detection_results) > 0:
result_obj = detection_results[0]
if hasattr(result_obj, 'boxes') and result_obj.boxes is not None:
for box in result_obj.boxes:
class_id = int(box.cls[0])
confidence = float(box.conf[0])
bbox = box.xyxy[0].cpu().numpy().tolist() # [x1, y1, x2, y2]
class_name = self.detection_model.model.names[class_id]
detected_regions[class_name] = {
'bbox': bbox,
'confidence': confidence
}
# Store session mapping for license plate callback
if session_id:
self.session_to_subscription[session_id] = subscription_id
logger.info(f"[SESSION MAPPING] Stored mapping: sessionId '{session_id}' -> subscriptionIdentifier '{subscription_id}'")
# Initialize database record with session_id
if session_id and self.db_manager:
success = self.db_manager.insert_initial_detection(
display_id=display_id,
captured_timestamp=detection_context['timestamp'],
session_id=session_id
)
if success:
logger.info(f"Created initial database record with session {session_id}")
else:
logger.warning(f"Failed to create initial database record for session {session_id}")
# Execute branches in parallel
if hasattr(self.pipeline_config, 'branches') and self.pipeline_config.branches:
branch_results = await self.branch_processor.execute_branches(
frame=frame,
branches=self.pipeline_config.branches,
detected_regions=detected_regions,
detection_context=detection_context
)
result['branch_results'] = branch_results
logger.info(f"Executed {len(branch_results)} branches for session {session_id}")
# Execute immediate actions (non-parallel)
immediate_actions = getattr(self.pipeline_config, 'actions', [])
if immediate_actions:
executed_actions = await self._execute_immediate_actions(
actions=immediate_actions,
frame=frame,
detected_regions=detected_regions,
detection_context=detection_context
)
result['actions_executed'].extend(executed_actions)
# Execute parallel actions (after all branches complete)
parallel_actions = getattr(self.pipeline_config, 'parallel_actions', [])
if parallel_actions:
# Add branch results to context
enhanced_context = {**detection_context}
if result['branch_results']:
enhanced_context['branch_results'] = result['branch_results']
executed_parallel_actions = await self._execute_parallel_actions(
actions=parallel_actions,
frame=frame,
detected_regions=detected_regions,
context=enhanced_context
)
result['actions_executed'].extend(executed_parallel_actions)
# Store processing results for later combination with license plate data
if result['branch_results'] and session_id:
self.session_processing_results[session_id] = result['branch_results']
logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination")
logger.info(f"Processing phase completed for session {session_id}: "
f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions")
except Exception as e:
logger.error(f"Error in processing phase: {e}", exc_info=True)
result['status'] = 'error'
result['message'] = str(e)
result['processing_time'] = time.time() - start_time
return result
async def execute_detection(self,
frame: np.ndarray,
display_id: str,
session_id: Optional[str] = None,
subscription_id: Optional[str] = None) -> Dict[str, Any]:
"""
Execute the main detection pipeline on a frame.
Args:
frame: Input frame to process
display_id: Display identifier
session_id: Optional session ID
subscription_id: Optional subscription identifier
Returns:
Dictionary with detection results
"""
start_time = time.time()
result = {
'status': 'success',
'detections': [],
'branch_results': {},
'actions_executed': [],
'session_id': session_id,
'processing_time': 0.0,
'timestamp': datetime.now().isoformat()
}
try:
# Update stats
self.stats['detections_processed'] += 1
# Run main detection model
if not self.detection_model:
result['status'] = 'error'
result['message'] = 'Detection model not available'
return result
# Create detection context
detection_context = {
'display_id': display_id,
'session_id': session_id,
'subscription_id': subscription_id,
'timestamp': datetime.now().strftime("%Y-%m-%dT%H-%M-%S"),
'timestamp_ms': int(time.time() * 1000),
'uuid': str(uuid.uuid4()),
'filename': f"{uuid.uuid4()}.jpg"
}
# Run inference on single snapshot using .predict() method
detection_results = self.detection_model.model.predict(
frame,
conf=getattr(self.pipeline_config, 'min_confidence', 0.6),
verbose=False
)
# Process detection results
detected_regions = {}
valid_detections = []
if detection_results and len(detection_results) > 0:
result_obj = detection_results[0]
trigger_classes = getattr(self.pipeline_config, 'trigger_classes', [])
# Handle .predict() results which have .boxes for detection models
if hasattr(result_obj, 'boxes') and result_obj.boxes is not None:
logger.info(f"[PIPELINE RAW] Found {len(result_obj.boxes)} raw detections from {getattr(self.pipeline_config, 'model_id', 'unknown')}")
for i, box in enumerate(result_obj.boxes):
class_id = int(box.cls[0])
confidence = float(box.conf[0])
bbox = box.xyxy[0].cpu().numpy().tolist() # [x1, y1, x2, y2]
class_name = self.detection_model.model.names[class_id]
logger.info(f"[PIPELINE RAW {i+1}] {class_name}: bbox={bbox}, conf={confidence:.3f}")
# Check if detection matches trigger classes
if trigger_classes and class_name not in trigger_classes:
continue
# Store detection info
detection_info = {
'class_name': class_name,
'confidence': confidence,
'bbox': bbox
}
valid_detections.append(detection_info)
# Store region for cropping
detected_regions[class_name] = {
'bbox': bbox,
'confidence': confidence
}
logger.info(f"[PIPELINE DETECTION] {class_name}: bbox={bbox}, conf={confidence:.3f}")
result['detections'] = valid_detections
# If we have valid detections, proceed with branches and actions
if valid_detections:
logger.info(f"Found {len(valid_detections)} valid detections for pipeline processing")
# Initialize database record if session_id is provided
if session_id and self.db_manager:
success = self.db_manager.insert_initial_detection(
display_id=display_id,
captured_timestamp=detection_context['timestamp'],
session_id=session_id
)
if not success:
logger.warning(f"Failed to create initial database record for session {session_id}")
# Execute branches in parallel
if hasattr(self.pipeline_config, 'branches') and self.pipeline_config.branches:
branch_results = await self.branch_processor.execute_branches(
frame=frame,
branches=self.pipeline_config.branches,
detected_regions=detected_regions,
detection_context=detection_context
)
result['branch_results'] = branch_results
self.stats['branches_executed'] += len(branch_results)
# Execute immediate actions (non-parallel)
immediate_actions = getattr(self.pipeline_config, 'actions', [])
if immediate_actions:
executed_actions = await self._execute_immediate_actions(
actions=immediate_actions,
frame=frame,
detected_regions=detected_regions,
detection_context=detection_context
)
result['actions_executed'].extend(executed_actions)
# Execute parallel actions (after all branches complete)
parallel_actions = getattr(self.pipeline_config, 'parallel_actions', [])
if parallel_actions:
# Add branch results to context
enhanced_context = {**detection_context}
if result['branch_results']:
enhanced_context['branch_results'] = result['branch_results']
executed_parallel_actions = await self._execute_parallel_actions(
actions=parallel_actions,
frame=frame,
detected_regions=detected_regions,
context=enhanced_context
)
result['actions_executed'].extend(executed_parallel_actions)
self.stats['actions_executed'] += len(result['actions_executed'])
else:
logger.debug("No valid detections found for pipeline processing")
except Exception as e:
logger.error(f"Error in detection pipeline execution: {e}", exc_info=True)
result['status'] = 'error'
result['message'] = str(e)
# Update timing
processing_time = time.time() - start_time
result['processing_time'] = processing_time
self.stats['total_processing_time'] += processing_time
return result
async def _execute_immediate_actions(self,
actions: List[Dict],
frame: np.ndarray,
detected_regions: Dict[str, Any],
detection_context: Dict[str, Any]) -> List[Dict]:
"""
Execute immediate actions (non-parallel).
Args:
actions: List of action configurations
frame: Input frame
detected_regions: Dictionary of detected regions
detection_context: Detection context data
Returns:
List of executed action results
"""
executed_actions = []
for action in actions:
try:
action_type = action.type.value
logger.debug(f"Executing immediate action: {action_type}")
if action_type == 'redis_save_image':
result = await self._execute_redis_save_image(
action, frame, detected_regions, detection_context
)
elif action_type == 'redis_publish':
result = await self._execute_redis_publish(
action, detection_context
)
else:
logger.warning(f"Unknown immediate action type: {action_type}")
result = {'status': 'error', 'message': f'Unknown action type: {action_type}'}
executed_actions.append({
'action_type': action_type,
'result': result
})
except Exception as e:
logger.error(f"Error executing immediate action {action_type}: {e}", exc_info=True)
executed_actions.append({
'action_type': action.type.value,
'result': {'status': 'error', 'message': str(e)}
})
return executed_actions
async def _execute_parallel_actions(self,
actions: List[Dict],
frame: np.ndarray,
detected_regions: Dict[str, Any],
context: Dict[str, Any]) -> List[Dict]:
"""
Execute parallel actions (after branches complete).
Args:
actions: List of parallel action configurations
frame: Input frame
detected_regions: Dictionary of detected regions
context: Enhanced context with branch results
Returns:
List of executed action results
"""
executed_actions = []
for action in actions:
try:
action_type = action.type.value
logger.debug(f"Executing parallel action: {action_type}")
if action_type == 'postgresql_update_combined':
result = await self._execute_postgresql_update_combined(action, context)
# Update session state with processing results after database update
if result.get('status') == 'success':
await self._update_session_with_processing_results(context)
else:
logger.warning(f"Unknown parallel action type: {action_type}")
result = {'status': 'error', 'message': f'Unknown action type: {action_type}'}
executed_actions.append({
'action_type': action_type,
'result': result
})
except Exception as e:
logger.error(f"Error executing parallel action {action_type}: {e}", exc_info=True)
executed_actions.append({
'action_type': action.type.value,
'result': {'status': 'error', 'message': str(e)}
})
return executed_actions
async def _execute_redis_save_image(self,
action: Dict,
frame: np.ndarray,
detected_regions: Dict[str, Any],
context: Dict[str, Any]) -> Dict[str, Any]:
"""Execute redis_save_image action."""
if not self.redis_manager:
return {'status': 'error', 'message': 'Redis not available'}
try:
# Get image to save (cropped or full frame)
image_to_save = frame
region_name = action.params.get('region')
if region_name and region_name in detected_regions:
# Crop the specified region
bbox = detected_regions[region_name]['bbox']
x1, y1, x2, y2 = [int(coord) for coord in bbox]
cropped = frame[y1:y2, x1:x2]
if cropped.size > 0:
image_to_save = cropped
logger.debug(f"Cropped region '{region_name}' for redis_save_image")
else:
logger.warning(f"Empty crop for region '{region_name}', using full frame")
# Format key with context
key = action.params['key'].format(**context)
# Save image to Redis
result = await self.redis_manager.save_image(
key=key,
image=image_to_save,
expire_seconds=action.params.get('expire_seconds'),
image_format=action.params.get('format', 'jpeg'),
quality=action.params.get('quality', 90)
)
if result:
# Add image_key to context for subsequent actions
context['image_key'] = key
return {'status': 'success', 'key': key}
else:
return {'status': 'error', 'message': 'Failed to save image to Redis'}
except Exception as e:
logger.error(f"Error in redis_save_image action: {e}", exc_info=True)
return {'status': 'error', 'message': str(e)}
async def _execute_redis_publish(self, action: Dict, context: Dict[str, Any]) -> Dict[str, Any]:
"""Execute redis_publish action."""
if not self.redis_manager:
return {'status': 'error', 'message': 'Redis not available'}
try:
channel = action.params['channel']
message_template = action.params['message']
# Format message with context
message = message_template.format(**context)
# Publish message
result = await self.redis_manager.publish_message(channel, message)
if result >= 0: # Redis publish returns number of subscribers
return {'status': 'success', 'subscribers': result, 'channel': channel}
else:
return {'status': 'error', 'message': 'Failed to publish message to Redis'}
except Exception as e:
logger.error(f"Error in redis_publish action: {e}", exc_info=True)
return {'status': 'error', 'message': str(e)}
async def _execute_postgresql_update_combined(self,
action: Dict,
context: Dict[str, Any]) -> Dict[str, Any]:
"""Execute postgresql_update_combined action."""
if not self.db_manager:
return {'status': 'error', 'message': 'Database not available'}
try:
# Wait for required branches if specified
wait_for_branches = action.params.get('waitForBranches', [])
branch_results = context.get('branch_results', {})
# Check if all required branches have completed
for branch_id in wait_for_branches:
if branch_id not in branch_results:
logger.warning(f"Branch {branch_id} result not available for database update")
return {'status': 'error', 'message': f'Missing branch result: {branch_id}'}
# Prepare fields for database update
table = action.params.get('table', 'car_frontal_info')
key_field = action.params.get('key_field', 'session_id')
key_value = action.params.get('key_value', '{session_id}').format(**context)
field_mappings = action.params.get('fields', {})
# Resolve field values using branch results
resolved_fields = {}
for field_name, field_template in field_mappings.items():
try:
# Replace template variables with actual values from branch results
resolved_value = self._resolve_field_template(field_template, branch_results, context)
resolved_fields[field_name] = resolved_value
except Exception as e:
logger.warning(f"Failed to resolve field {field_name}: {e}")
resolved_fields[field_name] = None
# Execute database update
success = self.db_manager.execute_update(
table=table,
key_field=key_field,
key_value=key_value,
fields=resolved_fields
)
if success:
return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields}
else:
return {'status': 'error', 'message': 'Database update failed'}
except Exception as e:
logger.error(f"Error in postgresql_update_combined action: {e}", exc_info=True)
return {'status': 'error', 'message': str(e)}
def _resolve_field_template(self, template: str, branch_results: Dict, context: Dict) -> str:
"""
Resolve field template using branch results and context.
Args:
template: Template string like "{car_brand_cls_v3.brand}"
branch_results: Dictionary of branch execution results
context: Detection context
Returns:
Resolved field value
"""
try:
# Handle simple context variables first
if template.startswith('{') and template.endswith('}'):
var_name = template[1:-1]
# Check for branch result reference (e.g., "car_brand_cls_v3.brand")
if '.' in var_name:
branch_id, field_name = var_name.split('.', 1)
if branch_id in branch_results:
branch_data = branch_results[branch_id]
# Look for the field in branch results
if isinstance(branch_data, dict) and 'result' in branch_data:
result_data = branch_data['result']
if isinstance(result_data, dict) and field_name in result_data:
return str(result_data[field_name])
logger.warning(f"Field {field_name} not found in branch {branch_id} results")
return None
else:
logger.warning(f"Branch {branch_id} not found in results")
return None
# Simple context variable
elif var_name in context:
return str(context[var_name])
logger.warning(f"Template variable {var_name} not found in context or branch results")
return None
# Return template as-is if not a template variable
return template
except Exception as e:
logger.error(f"Error resolving field template {template}: {e}")
return None
async def _update_session_with_processing_results(self, context: Dict[str, Any]):
"""
Update session state with processing results from branch execution.
Args:
context: Detection context containing branch results and session info
"""
try:
branch_results = context.get('branch_results', {})
session_id = context.get('session_id', '')
subscription_id = context.get('subscription_id', '')
if not session_id:
logger.warning("No session_id in context for processing results")
return
# Extract fields dynamically using field mappings from pipeline config
extracted_fields = self._extract_fields_from_branches(branch_results)
car_brand = extracted_fields.get('brand')
body_type = extracted_fields.get('body_type')
logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: "
f"brand={car_brand}, bodyType={body_type}")
except Exception as e:
logger.error(f"Error updating session with processing results: {e}", exc_info=True)
def get_statistics(self) -> Dict[str, Any]:
"""Get detection pipeline statistics."""
branch_stats = self.branch_processor.get_statistics() if self.branch_processor else {}
license_stats = self.license_plate_manager.get_statistics() if self.license_plate_manager else {}
return {
'pipeline': self.stats,
'branches': branch_stats,
'license_plate': license_stats,
'redis_available': self.redis_manager is not None,
'database_available': self.db_manager is not None,
'detection_model_loaded': self.detection_model is not None
}
def cleanup(self):
"""Cleanup resources."""
if self.executor:
self.executor.shutdown(wait=False)
if self.redis_manager:
self.redis_manager.cleanup()
if self.db_manager:
self.db_manager.disconnect()
if self.branch_processor:
self.branch_processor.cleanup()
if self.license_plate_manager:
# Schedule cleanup task and track it to prevent warnings
cleanup_task = asyncio.create_task(self.license_plate_manager.close())
cleanup_task.add_done_callback(lambda _: None) # Suppress "Task exception was never retrieved"
logger.info("Detection pipeline cleaned up")