From 476f19cabe8ce5b8a55c72757ded703c9306dff0 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Wed, 24 Sep 2025 22:01:26 +0700 Subject: [PATCH] Refactor: done phase 5 --- core/detection/branches.py | 210 ++++++++++++++++++++ core/detection/pipeline.py | 357 +++++++++++++++++++++++----------- core/storage/license_plate.py | 282 +++++++++++++++++++++++++++ 3 files changed, 740 insertions(+), 109 deletions(-) create mode 100644 core/storage/license_plate.py diff --git a/core/detection/branches.py b/core/detection/branches.py index 47cd7fc..e0ca1df 100644 --- a/core/detection/branches.py +++ b/core/detection/branches.py @@ -520,6 +520,58 @@ class BranchProcessor: else: logger.warning(f"[NO RESULTS] {branch_id}: No detections found") + # Execute branch actions if this branch found valid detections + actions_executed = [] + branch_actions = getattr(branch_config, 'actions', []) + if branch_actions and branch_detections: + logger.info(f"[BRANCH ACTIONS] {branch_id}: Executing {len(branch_actions)} actions") + + # Create detected_regions from THIS branch's detections for actions + branch_detected_regions = {} + for detection in branch_detections: + branch_detected_regions[detection['class_name']] = { + 'bbox': detection['bbox'], + 'confidence': detection['confidence'] + } + + for action in branch_actions: + try: + action_type = action.type.value # Access the enum value + logger.info(f"[ACTION EXECUTE] {branch_id}: Executing action '{action_type}'") + + if action_type == 'redis_save_image': + action_result = self._execute_redis_save_image_sync( + action, input_frame, branch_detected_regions, detection_context + ) + elif action_type == 'redis_publish': + action_result = self._execute_redis_publish_sync( + action, detection_context + ) + else: + logger.warning(f"[ACTION UNKNOWN] {branch_id}: Unknown action type '{action_type}'") + action_result = {'status': 'error', 'message': f'Unknown action type: {action_type}'} + + actions_executed.append({ + 'action_type': action_type, + 'result': action_result + }) + + logger.info(f"[ACTION COMPLETE] {branch_id}: Action '{action_type}' result: {action_result.get('status')}") + + except Exception as e: + action_type = getattr(action, 'type', None) + if action_type: + action_type = action_type.value if hasattr(action_type, 'value') else str(action_type) + logger.error(f"[ACTION ERROR] {branch_id}: Error executing action '{action_type}': {e}", exc_info=True) + actions_executed.append({ + 'action_type': action_type, + 'result': {'status': 'error', 'message': str(e)} + }) + + # Add actions executed to result + if actions_executed: + result['actions_executed'] = actions_executed + # Handle nested branches ONLY if parent found valid detections nested_branches = getattr(branch_config, 'branches', []) if nested_branches: @@ -566,6 +618,164 @@ class BranchProcessor: return result + def _execute_redis_save_image_sync(self, + action: Dict, + frame: np.ndarray, + detected_regions: Dict[str, Any], + context: Dict[str, Any]) -> Dict[str, Any]: + """Execute redis_save_image action synchronously.""" + if not self.redis_manager: + return {'status': 'error', 'message': 'Redis not available'} + + try: + # Get image to save (cropped or full frame) + image_to_save = frame + region_name = action.params.get('region') + + bbox = None + if region_name and region_name in detected_regions: + # Crop the specified region + bbox = detected_regions[region_name]['bbox'] + elif region_name and region_name.lower() == 'frontal' and 'front_rear' in detected_regions: + # Special case: "frontal" region maps to "front_rear" detection + bbox = detected_regions['front_rear']['bbox'] + + if bbox is not None: + x1, y1, x2, y2 = [int(coord) for coord in bbox] + cropped = frame[y1:y2, x1:x2] + if cropped.size > 0: + image_to_save = cropped + logger.debug(f"Cropped region '{region_name}' for redis_save_image") + else: + logger.warning(f"Empty crop for region '{region_name}', using full frame") + + # Format key with context + key = action.params['key'].format(**context) + + # Convert image to bytes + import cv2 + image_format = action.params.get('format', 'jpeg') + quality = action.params.get('quality', 90) + + if image_format.lower() == 'jpeg': + encode_param = [cv2.IMWRITE_JPEG_QUALITY, quality] + _, image_bytes = cv2.imencode('.jpg', image_to_save, encode_param) + else: + _, image_bytes = cv2.imencode('.png', image_to_save) + + # Save to Redis synchronously using a sync Redis client + try: + import redis + import cv2 + + # Create a synchronous Redis client with same connection details + sync_redis = redis.Redis( + host=self.redis_manager.host, + port=self.redis_manager.port, + password=self.redis_manager.password, + db=self.redis_manager.db, + decode_responses=False, # We're storing binary data + socket_timeout=self.redis_manager.socket_timeout, + socket_connect_timeout=self.redis_manager.socket_connect_timeout + ) + + # Encode the image + if image_format.lower() == 'jpeg': + encode_param = [cv2.IMWRITE_JPEG_QUALITY, quality] + success, encoded_image = cv2.imencode('.jpg', image_to_save, encode_param) + else: + success, encoded_image = cv2.imencode('.png', image_to_save) + + if not success: + return {'status': 'error', 'message': 'Failed to encode image'} + + # Save to Redis with expiration + expire_seconds = action.params.get('expire_seconds', 600) + result = sync_redis.setex(key, expire_seconds, encoded_image.tobytes()) + + sync_redis.close() # Clean up connection + + if result: + # Add image_key to context for subsequent actions + context['image_key'] = key + return {'status': 'success', 'key': key} + else: + return {'status': 'error', 'message': 'Failed to save image to Redis'} + + except Exception as redis_error: + logger.error(f"Error calling Redis from sync context: {redis_error}") + return {'status': 'error', 'message': f'Redis operation failed: {redis_error}'} + + except Exception as e: + logger.error(f"Error in redis_save_image action: {e}", exc_info=True) + return {'status': 'error', 'message': str(e)} + + def _execute_redis_publish_sync(self, action: Dict, context: Dict[str, Any]) -> Dict[str, Any]: + """Execute redis_publish action synchronously.""" + if not self.redis_manager: + return {'status': 'error', 'message': 'Redis not available'} + + try: + channel = action.params['channel'] + message_template = action.params['message'] + + # Debug the message template + logger.debug(f"Message template: {repr(message_template)}") + logger.debug(f"Context keys: {list(context.keys())}") + + # Format message with context - handle JSON string formatting carefully + # The message template contains JSON which causes issues with .format() + # Use string replacement instead of format to avoid JSON brace conflicts + try: + # Ensure image_key is available for message formatting + if 'image_key' not in context: + context['image_key'] = '' # Default empty value if redis_save_image failed + + # Use string replacement to avoid JSON formatting issues + message = message_template + for key, value in context.items(): + placeholder = '{' + key + '}' + message = message.replace(placeholder, str(value)) + + logger.debug(f"Formatted message using replacement: {message}") + except Exception as e: + logger.error(f"Message formatting failed: {e}") + logger.error(f"Template: {repr(message_template)}") + logger.error(f"Context: {context}") + return {'status': 'error', 'message': f'Message formatting failed: {e}'} + + # Publish message synchronously using a sync Redis client + try: + import redis + + # Create a synchronous Redis client with same connection details + sync_redis = redis.Redis( + host=self.redis_manager.host, + port=self.redis_manager.port, + password=self.redis_manager.password, + db=self.redis_manager.db, + decode_responses=True, # For publishing text messages + socket_timeout=self.redis_manager.socket_timeout, + socket_connect_timeout=self.redis_manager.socket_connect_timeout + ) + + # Publish message + result = sync_redis.publish(channel, message) + sync_redis.close() # Clean up connection + + if result >= 0: # Redis publish returns number of subscribers + return {'status': 'success', 'subscribers': result, 'channel': channel} + else: + return {'status': 'error', 'message': 'Failed to publish message to Redis'} + + except Exception as redis_error: + logger.error(f"Error calling Redis from sync context: {redis_error}") + return {'status': 'error', 'message': f'Redis operation failed: {redis_error}'} + + except Exception as e: + logger.error(f"Error in redis_publish action: {e}", exc_info=True) + return {'status': 'error', 'message': str(e)} + def get_statistics(self) -> Dict[str, Any]: """Get branch processor statistics.""" return { diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index b52fd45..cfab8dd 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -2,6 +2,7 @@ Detection Pipeline Module. Main detection pipeline orchestration that coordinates detection flow and execution. """ +import asyncio import logging import time import uuid @@ -15,6 +16,7 @@ from ..models.pipeline import PipelineParser from .branches import BranchProcessor from ..storage.redis import RedisManager from ..storage.database import DatabaseManager +from ..storage.license_plate import LicensePlateManager logger = logging.getLogger(__name__) @@ -42,6 +44,7 @@ class DetectionPipeline: self.branch_processor = BranchProcessor(model_manager) self.redis_manager = None self.db_manager = None + self.license_plate_manager = None # Main detection model self.detection_model: Optional[YOLOWrapper] = None @@ -53,6 +56,12 @@ class DetectionPipeline: # Pipeline configuration self.pipeline_config = pipeline_parser.pipeline_config + # SessionId to subscriptionIdentifier mapping + self.session_to_subscription = {} + + # SessionId to processing results mapping (for combining with license plate results) + self.session_processing_results = {} + # Statistics self.stats = { 'detections_processed': 0, @@ -90,6 +99,15 @@ class DetectionPipeline: logger.warning("Failed to create car_frontal_info table") logger.info("Database connection initialized") + # Initialize license plate manager (using same Redis config as main Redis manager) + if self.pipeline_parser.redis_config: + self.license_plate_manager = LicensePlateManager(self.pipeline_parser.redis_config.__dict__) + if not await self.license_plate_manager.initialize(self._on_license_plate_result): + logger.error("Failed to initialize license plate manager") + return False + logger.info("License plate manager initialized") + + # Initialize main detection model if not await self._initialize_detection_model(): logger.error("Failed to initialize detection model") @@ -154,6 +172,193 @@ class DetectionPipeline: logger.error(f"Error initializing detection model: {e}", exc_info=True) return False + async def _on_license_plate_result(self, session_id: str, license_data: Dict[str, Any]): + """ + Callback for handling license plate results from LPR service. + + Args: + session_id: Session identifier + license_data: License plate data including text and confidence + """ + try: + license_text = license_data.get('license_plate_text', '') + confidence = license_data.get('confidence', 0.0) + + logger.info(f"[LICENSE PLATE CALLBACK] Session {session_id}: " + f"text='{license_text}', confidence={confidence:.3f}") + + # Find matching subscriptionIdentifier for this sessionId + subscription_id = self.session_to_subscription.get(session_id) + + if not subscription_id: + logger.warning(f"[LICENSE PLATE] No subscription found for sessionId '{session_id}' (type: {type(session_id)}), cannot send imageDetection") + logger.warning(f"[LICENSE PLATE DEBUG] Current session mappings: {dict(self.session_to_subscription)}") + + # Try to find by type conversion in case of type mismatch + # Try as integer if session_id is string + if isinstance(session_id, str) and session_id.isdigit(): + session_id_int = int(session_id) + subscription_id = self.session_to_subscription.get(session_id_int) + if subscription_id: + logger.info(f"[LICENSE PLATE] Found subscription using int conversion: '{session_id}' -> {session_id_int} -> '{subscription_id}'") + else: + logger.error(f"[LICENSE PLATE] Failed to find subscription with int conversion") + return + # Try as string if session_id is integer + elif isinstance(session_id, int): + session_id_str = str(session_id) + subscription_id = self.session_to_subscription.get(session_id_str) + if subscription_id: + logger.info(f"[LICENSE PLATE] Found subscription using string conversion: {session_id} -> '{session_id_str}' -> '{subscription_id}'") + else: + logger.error(f"[LICENSE PLATE] Failed to find subscription with string conversion") + return + else: + logger.error(f"[LICENSE PLATE] Failed to find subscription with any type conversion") + return + + # Send imageDetection message with license plate data combined with processing results + await self._send_license_plate_message(subscription_id, license_text, confidence, session_id) + + # Update database with license plate information if database manager is available + if self.db_manager and license_text: + success = self.db_manager.execute_update( + table='car_frontal_info', + key_field='session_id', + key_value=session_id, + fields={ + 'license_character': license_text, + 'license_type': 'LPR_detected' # Mark as detected by LPR service + } + ) + if success: + logger.info(f"[LICENSE PLATE] Updated database for session {session_id}") + else: + logger.warning(f"[LICENSE PLATE] Failed to update database for session {session_id}") + + except Exception as e: + logger.error(f"Error in license plate result callback: {e}", exc_info=True) + + + async def _send_license_plate_message(self, subscription_id: str, license_text: str, confidence: float, session_id: str = None): + """ + Send imageDetection message with license plate data plus any available processing results. + + Args: + subscription_id: Subscription identifier to send message to + license_text: License plate text + confidence: License plate confidence score + session_id: Session identifier for looking up processing results + """ + try: + if not self.message_sender: + logger.warning("No message sender configured, cannot send imageDetection") + return + + # Import here to avoid circular imports + from ..communication.models import ImageDetectionMessage, DetectionData + + # Get processing results for this session from stored results + car_brand = None + body_type = None + + # Find session_id from session mappings (we need session_id as key) + session_id_for_lookup = None + + # Try direct lookup first (if session_id is already the right type) + if session_id in self.session_processing_results: + session_id_for_lookup = session_id + else: + # Try to find by type conversion + for stored_session_id in self.session_processing_results.keys(): + if str(stored_session_id) == str(session_id): + session_id_for_lookup = stored_session_id + break + + if session_id_for_lookup and session_id_for_lookup in self.session_processing_results: + branch_results = self.session_processing_results[session_id_for_lookup] + logger.info(f"[LICENSE PLATE] Retrieved processing results for session {session_id_for_lookup}") + + if 'car_brand_cls_v2' in branch_results: + brand_result = branch_results['car_brand_cls_v2'].get('result', {}) + car_brand = brand_result.get('brand') + if 'car_bodytype_cls_v1' in branch_results: + bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) + body_type = bodytype_result.get('body_type') + + # Clean up stored results after use + del self.session_processing_results[session_id_for_lookup] + logger.debug(f"[LICENSE PLATE] Cleaned up stored results for session {session_id_for_lookup}") + else: + logger.warning(f"[LICENSE PLATE] No processing results found for session {session_id}") + + # Create detection data with combined information + detection_data_obj = DetectionData( + detection={ + "carBrand": car_brand, + "carModel": None, + "bodyType": body_type, + "licensePlateText": license_text, + "licensePlateConfidence": confidence + }, + modelId=52, # Default model ID + modelName="yolo11m" # Default model name + ) + + # Create imageDetection message + detection_message = ImageDetectionMessage( + subscriptionIdentifier=subscription_id, + data=detection_data_obj + ) + + # Send message + await self.message_sender(detection_message) + logger.info(f"[COMBINED MESSAGE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}', license='{license_text}' to '{subscription_id}'") + + except Exception as e: + logger.error(f"Error sending license plate imageDetection message: {e}", exc_info=True) + + async def _send_initial_detection_message(self, subscription_id: str): + """ + Send initial imageDetection message when vehicle is first detected. + + Args: + subscription_id: Subscription identifier to send message to + """ + try: + if not self.message_sender: + logger.warning("No message sender configured, cannot send imageDetection") + return + + # Import here to avoid circular imports + from ..communication.models import ImageDetectionMessage, DetectionData + + # Create detection data with all fields as None (vehicle just detected, no classification yet) + detection_data_obj = DetectionData( + detection={ + "carBrand": None, + "carModel": None, + "bodyType": None, + "licensePlateText": None, + "licensePlateConfidence": None + }, + modelId=52, # Default model ID + modelName="yolo11m" # Default model name + ) + + # Create imageDetection message + detection_message = ImageDetectionMessage( + subscriptionIdentifier=subscription_id, + data=detection_data_obj + ) + + # Send message + await self.message_sender(detection_message) + logger.info(f"[INITIAL DETECTION] Sent imageDetection for vehicle detection to '{subscription_id}'") + + except Exception as e: + logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True) + async def execute_detection_phase(self, frame: np.ndarray, display_id: str, @@ -249,21 +454,20 @@ class DetectionPipeline: result['detections'] = valid_detections - # If we have valid detections, send imageDetection message with empty detection + # If we have valid detections, create session and send initial imageDetection if valid_detections: - logger.info(f"Found {len(valid_detections)} valid detections, sending imageDetection message") + logger.info(f"Found {len(valid_detections)} valid detections, storing session mapping") - # Send imageDetection with empty detection data - message_sent = await self._send_image_detection_message( - subscription_id=subscription_id, - detection_context=detection_context - ) - result['message_sent'] = message_sent + # Store mapping from display_id to subscriptionIdentifier (for detection phase) + # Note: We'll store session_id mapping later in processing phase + self.session_to_subscription[display_id] = subscription_id + logger.info(f"[SESSION MAPPING] Stored mapping: displayId '{display_id}' -> subscriptionIdentifier '{subscription_id}'") - if message_sent: - logger.info(f"Detection phase completed - imageDetection message sent for {display_id}") - else: - logger.warning(f"Failed to send imageDetection message for {display_id}") + # Send initial imageDetection message with empty detection data + await self._send_initial_detection_message(subscription_id) + + logger.info(f"Detection phase completed - {len(valid_detections)} detections found for {display_id}") + result['message_sent'] = True else: logger.debug("No valid detections found in detection phase") @@ -341,6 +545,11 @@ class DetectionPipeline: 'confidence': confidence } + # Store session mapping for license plate callback + if session_id: + self.session_to_subscription[session_id] = subscription_id + logger.info(f"[SESSION MAPPING] Stored mapping: sessionId '{session_id}' -> subscriptionIdentifier '{subscription_id}'") + # Initialize database record with session_id if session_id and self.db_manager: success = self.db_manager.insert_initial_detection( @@ -391,6 +600,11 @@ class DetectionPipeline: ) result['actions_executed'].extend(executed_parallel_actions) + # Store processing results for later combination with license plate data + if result['branch_results'] and session_id: + self.session_processing_results[session_id] = result['branch_results'] + logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination") + logger.info(f"Processing phase completed for session {session_id}: " f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions") @@ -402,57 +616,6 @@ class DetectionPipeline: result['processing_time'] = time.time() - start_time return result - async def _send_image_detection_message(self, - subscription_id: str, - detection_context: Dict[str, Any]) -> bool: - """ - Send imageDetection message with empty detection data to backend. - - Args: - subscription_id: Subscription identifier - detection_context: Detection context data - - Returns: - True if message sent successfully, False otherwise - """ - try: - if not self.message_sender: - logger.warning("No message sender available for imageDetection") - return False - - # Import here to avoid circular imports - from ..communication.messages import create_image_detection - - # Create empty detection data as specified - detection_data = {} - - # Get model info from pipeline configuration - model_id = 52 # Default model ID - model_name = "yolo11m" # Default - - if self.pipeline_config: - model_name = getattr(self.pipeline_config, 'model_id', 'yolo11m') - # Try to extract numeric model ID from pipeline context, fallback to default - if hasattr(self.pipeline_config, 'model_id'): - # For now, use default model ID since pipeline config stores string identifiers - model_id = 52 - - # Create imageDetection message - detection_message = create_image_detection( - subscription_identifier=subscription_id, - detection_data=detection_data, - model_id=model_id, - model_name=model_name - ) - - # Send to backend via WebSocket - await self.message_sender(detection_message) - logger.info(f"[DETECTION PHASE] Sent imageDetection with empty detection: {detection_data}") - return True - - except Exception as e: - logger.error(f"Error sending imageDetection message: {e}", exc_info=True) - return False async def execute_detection(self, frame: np.ndarray, @@ -697,9 +860,9 @@ class DetectionPipeline: if action_type == 'postgresql_update_combined': result = await self._execute_postgresql_update_combined(action, context) - # Send imageDetection message with actual processing results after database update + # Update session state with processing results after database update if result.get('status') == 'success': - await self._send_processing_results_message(context) + await self._update_session_with_processing_results(context) else: logger.warning(f"Unknown parallel action type: {action_type}") result = {'status': 'error', 'message': f'Unknown action type: {action_type}'} @@ -889,76 +1052,49 @@ class DetectionPipeline: logger.error(f"Error resolving field template {template}: {e}") return None - async def _send_processing_results_message(self, context: Dict[str, Any]): + async def _update_session_with_processing_results(self, context: Dict[str, Any]): """ - Send imageDetection message with actual processing results after database update. + Update session state with processing results from branch execution. Args: - context: Detection context containing branch results and subscription info + context: Detection context containing branch results and session info """ try: branch_results = context.get('branch_results', {}) + session_id = context.get('session_id', '') + subscription_id = context.get('subscription_id', '') - # Extract detection results from branch results - detection_data = { - "carBrand": None, - "carModel": None, - "bodyType": None, - "licensePlateText": None, - "licensePlateConfidence": None - } + if not session_id: + logger.warning("No session_id in context for processing results") + return # Extract car brand from car_brand_cls_v2 results + car_brand = None if 'car_brand_cls_v2' in branch_results: brand_result = branch_results['car_brand_cls_v2'].get('result', {}) - detection_data["carBrand"] = brand_result.get('brand') + car_brand = brand_result.get('brand') # Extract body type from car_bodytype_cls_v1 results + body_type = None if 'car_bodytype_cls_v1' in branch_results: bodytype_result = branch_results['car_bodytype_cls_v1'].get('result', {}) - detection_data["bodyType"] = bodytype_result.get('body_type') + body_type = bodytype_result.get('body_type') - # Create detection message - subscription_id = context.get('subscription_id', '') - # Get the actual numeric model ID from context - model_id_value = context.get('model_id', 52) - if isinstance(model_id_value, str): - try: - model_id_value = int(model_id_value) - except (ValueError, TypeError): - model_id_value = 52 - model_name = str(getattr(self.pipeline_config, 'model_id', 'unknown')) - - logger.debug(f"Creating DetectionData with modelId={model_id_value}, modelName='{model_name}'") - - from core.communication.models import ImageDetectionMessage, DetectionData - detection_data_obj = DetectionData( - detection=detection_data, - modelId=model_id_value, - modelName=model_name - ) - detection_message = ImageDetectionMessage( - subscriptionIdentifier=subscription_id, - data=detection_data_obj - ) - - # Send to backend via WebSocket - if self.message_sender: - await self.message_sender(detection_message) - logger.info(f"[RESULTS] Sent imageDetection with processing results: {detection_data}") - else: - logger.warning("No message sender available for processing results") + logger.info(f"[PROCESSING RESULTS] Completed for session {session_id}: " + f"brand={car_brand}, bodyType={body_type}") except Exception as e: - logger.error(f"Error sending processing results message: {e}", exc_info=True) + logger.error(f"Error updating session with processing results: {e}", exc_info=True) def get_statistics(self) -> Dict[str, Any]: """Get detection pipeline statistics.""" branch_stats = self.branch_processor.get_statistics() if self.branch_processor else {} + license_stats = self.license_plate_manager.get_statistics() if self.license_plate_manager else {} return { 'pipeline': self.stats, 'branches': branch_stats, + 'license_plate': license_stats, 'redis_available': self.redis_manager is not None, 'database_available': self.db_manager is not None, 'detection_model_loaded': self.detection_model is not None @@ -978,4 +1114,7 @@ class DetectionPipeline: if self.branch_processor: self.branch_processor.cleanup() + if self.license_plate_manager: + asyncio.create_task(self.license_plate_manager.close()) + logger.info("Detection pipeline cleaned up") \ No newline at end of file diff --git a/core/storage/license_plate.py b/core/storage/license_plate.py new file mode 100644 index 0000000..b0c7194 --- /dev/null +++ b/core/storage/license_plate.py @@ -0,0 +1,282 @@ +""" +License Plate Manager Module. +Handles Redis subscription to license plate results from LPR service. +""" +import logging +import json +import asyncio +from typing import Dict, Optional, Any, Callable +import redis.asyncio as redis + +logger = logging.getLogger(__name__) + + +class LicensePlateManager: + """ + Manages license plate result subscription from Redis channel. + Subscribes to 'license_results' channel for license plate data from LPR service. + """ + + def __init__(self, redis_config: Dict[str, Any]): + """ + Initialize license plate manager with Redis configuration. + + Args: + redis_config: Redis configuration dictionary + """ + self.config = redis_config + self.redis_client: Optional[redis.Redis] = None + self.pubsub = None + self.subscription_task = None + self.callback = None + + # Connection parameters + self.host = redis_config.get('host', 'localhost') + self.port = redis_config.get('port', 6379) + self.password = redis_config.get('password') + self.db = redis_config.get('db', 0) + + # License plate data cache - store recent results by session_id + self.license_plate_cache: Dict[str, Dict[str, Any]] = {} + self.cache_ttl = 300 # 5 minutes TTL for cached results + + logger.info(f"LicensePlateManager initialized for {self.host}:{self.port}") + + async def initialize(self, callback: Optional[Callable] = None) -> bool: + """ + Initialize Redis connection and start subscription to license_results channel. + + Args: + callback: Optional callback function for processing license plate results + + Returns: + True if successful, False otherwise + """ + try: + # Create Redis connection + self.redis_client = redis.Redis( + host=self.host, + port=self.port, + password=self.password, + db=self.db, + decode_responses=True + ) + + # Test connection + await self.redis_client.ping() + logger.info(f"Connected to Redis for license plate subscription") + + # Set callback + self.callback = callback + + # Start subscription + await self._start_subscription() + + return True + + except Exception as e: + logger.error(f"Failed to initialize license plate manager: {e}", exc_info=True) + return False + + async def _start_subscription(self): + """Start Redis subscription to license_results channel.""" + try: + if not self.redis_client: + logger.error("Redis client not initialized") + return + + # Create pubsub and subscribe + self.pubsub = self.redis_client.pubsub() + await self.pubsub.subscribe('license_results') + + logger.info("Subscribed to Redis channel: license_results") + + # Start listening task + self.subscription_task = asyncio.create_task(self._listen_for_messages()) + + except Exception as e: + logger.error(f"Error starting license plate subscription: {e}", exc_info=True) + + async def _listen_for_messages(self): + """Listen for messages on the license_results channel.""" + try: + if not self.pubsub: + return + + async for message in self.pubsub.listen(): + if message['type'] == 'message': + try: + # Log the raw message from Redis channel + logger.info(f"[LICENSE PLATE RAW] Received from 'license_results' channel: {message['data']}") + + # Parse the license plate result message + data = json.loads(message['data']) + logger.info(f"[LICENSE PLATE PARSED] Parsed JSON data: {data}") + await self._process_license_plate_result(data) + except json.JSONDecodeError as e: + logger.error(f"[LICENSE PLATE ERROR] Invalid JSON in license plate message: {e}") + logger.error(f"[LICENSE PLATE ERROR] Raw message was: {message['data']}") + except Exception as e: + logger.error(f"Error processing license plate message: {e}", exc_info=True) + + except asyncio.CancelledError: + logger.info("License plate subscription task cancelled") + except Exception as e: + logger.error(f"Error in license plate message listener: {e}", exc_info=True) + + async def _process_license_plate_result(self, data: Dict[str, Any]): + """ + Process incoming license plate result from LPR service. + + Expected message format (from actual LPR service): + { + "session_id": "511", + "license_character": "ข3184" + } + or + { + "session_id": "508", + "display_id": "test3", + "license_plate_text": "ABC-123", + "confidence": 0.95, + "timestamp": "2025-09-24T21:10:00Z" + } + + Args: + data: License plate result data + """ + try: + session_id = data.get('session_id') + if not session_id: + logger.warning("License plate result missing session_id") + return + + # Handle different message formats + # Format 1: {"session_id": "511", "license_character": "ข3184"} + # Format 2: {"session_id": "508", "license_plate_text": "ABC-123", "confidence": 0.95, ...} + license_plate_text = data.get('license_plate_text') or data.get('license_character') + confidence = data.get('confidence', 1.0) # Default confidence for LPR service results + display_id = data.get('display_id', '') + timestamp = data.get('timestamp', '') + + logger.info(f"[LICENSE PLATE] Received result for session {session_id}: " + f"text='{license_plate_text}', confidence={confidence:.3f}") + + # Store in cache + self.license_plate_cache[session_id] = { + 'license_plate_text': license_plate_text, + 'confidence': confidence, + 'display_id': display_id, + 'timestamp': timestamp, + 'received_at': asyncio.get_event_loop().time() + } + + # Call callback if provided + if self.callback: + await self.callback(session_id, { + 'license_plate_text': license_plate_text, + 'confidence': confidence, + 'display_id': display_id, + 'timestamp': timestamp + }) + + except Exception as e: + logger.error(f"Error processing license plate result: {e}", exc_info=True) + + def get_license_plate_result(self, session_id: str) -> Optional[Dict[str, Any]]: + """ + Get cached license plate result for a session. + + Args: + session_id: Session identifier + + Returns: + License plate result dictionary or None if not found + """ + if session_id not in self.license_plate_cache: + return None + + result = self.license_plate_cache[session_id] + + # Check TTL + current_time = asyncio.get_event_loop().time() + if current_time - result.get('received_at', 0) > self.cache_ttl: + # Expired, remove from cache + del self.license_plate_cache[session_id] + return None + + return { + 'license_plate_text': result.get('license_plate_text'), + 'confidence': result.get('confidence'), + 'display_id': result.get('display_id'), + 'timestamp': result.get('timestamp') + } + + def cleanup_expired_results(self): + """Remove expired license plate results from cache.""" + try: + current_time = asyncio.get_event_loop().time() + expired_sessions = [] + + for session_id, result in self.license_plate_cache.items(): + if current_time - result.get('received_at', 0) > self.cache_ttl: + expired_sessions.append(session_id) + + for session_id in expired_sessions: + del self.license_plate_cache[session_id] + logger.debug(f"Removed expired license plate result for session {session_id}") + + except Exception as e: + logger.error(f"Error cleaning up expired license plate results: {e}", exc_info=True) + + async def close(self): + """Close Redis connection and cleanup resources.""" + try: + # Cancel subscription task first + if self.subscription_task and not self.subscription_task.done(): + self.subscription_task.cancel() + try: + await self.subscription_task + except asyncio.CancelledError: + logger.debug("License plate subscription task cancelled successfully") + except Exception as e: + logger.warning(f"Error waiting for subscription task cancellation: {e}") + + # Close pubsub connection properly + if self.pubsub: + try: + # First unsubscribe from channels + await self.pubsub.unsubscribe('license_results') + # Then close the pubsub connection + await self.pubsub.aclose() + except Exception as e: + logger.warning(f"Error closing pubsub connection: {e}") + finally: + self.pubsub = None + + # Close Redis connection + if self.redis_client: + try: + await self.redis_client.aclose() + except Exception as e: + logger.warning(f"Error closing Redis connection: {e}") + finally: + self.redis_client = None + + # Clear cache + self.license_plate_cache.clear() + + logger.info("License plate manager closed successfully") + + except Exception as e: + logger.error(f"Error closing license plate manager: {e}", exc_info=True) + + def get_statistics(self) -> Dict[str, Any]: + """Get license plate manager statistics.""" + return { + 'cached_results': len(self.license_plate_cache), + 'connected': self.redis_client is not None, + 'subscribed': self.pubsub is not None, + 'host': self.host, + 'port': self.port + } \ No newline at end of file