diff --git a/core/detection/branches.py b/core/detection/branches.py index 61b6dbb..89881b2 100644 --- a/core/detection/branches.py +++ b/core/detection/branches.py @@ -43,7 +43,7 @@ class BranchProcessor: # Storage managers (set during initialization) self.redis_manager = None - self.db_manager = None + # self.db_manager = None # Disabled - PostgreSQL operations moved to microservices # Branch execution timeout (seconds) self.branch_timeout = 30.0 @@ -60,21 +60,21 @@ class BranchProcessor: logger.info("BranchProcessor initialized") - async def initialize(self, pipeline_config: Any, redis_manager: Any, db_manager: Any) -> bool: + async def initialize(self, pipeline_config: Any, redis_manager: Any, db_manager: Any = None) -> bool: """ Initialize branch processor with pipeline configuration. Args: pipeline_config: Pipeline configuration object redis_manager: Redis manager instance - db_manager: Database manager instance + db_manager: Database manager instance (deprecated, not used) Returns: True if successful, False otherwise """ try: self.redis_manager = redis_manager - self.db_manager = db_manager + # self.db_manager = db_manager # Disabled - PostgreSQL operations moved to microservices # Parse field mappings from parallelActions to enable dynamic field extraction self._parse_branch_output_fields(pipeline_config) @@ -170,22 +170,25 @@ class BranchProcessor: return for action in pipeline_config.parallel_actions: + # Skip PostgreSQL actions - they are disabled if action.type.value == 'postgresql_update_combined': - fields = action.params.get('fields', {}) - - # Parse each field template to extract branch_id and field_name - for db_field_name, template in fields.items(): - # Template format: "{branch_id.field_name}" - if template.startswith('{') and template.endswith('}'): - var_name = template[1:-1] # Remove { } - - if '.' in var_name: - branch_id, field_name = var_name.split('.', 1) - - # Store the mapping - self.branch_output_fields[branch_id] = field_name - - logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'") + logger.debug(f"[FIELD MAPPING] Skipping PostgreSQL action (disabled)") + continue # Skip field parsing for disabled PostgreSQL operations + # fields = action.params.get('fields', {}) + # + # # Parse each field template to extract branch_id and field_name + # for db_field_name, template in fields.items(): + # # Template format: "{branch_id.field_name}" + # if template.startswith('{') and template.endswith('}'): + # var_name = template[1:-1] # Remove { } + # + # if '.' in var_name: + # branch_id, field_name = var_name.split('.', 1) + # + # # Store the mapping + # self.branch_output_fields[branch_id] = field_name + # + # logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'") logger.info(f"[FIELD MAPPING] Parsed {len(self.branch_output_fields)} branch output field mappings") diff --git a/core/detection/pipeline.py b/core/detection/pipeline.py index d71f525..26654cc 100644 --- a/core/detection/pipeline.py +++ b/core/detection/pipeline.py @@ -15,7 +15,7 @@ from ..models.inference import YOLOWrapper from ..models.pipeline import PipelineParser from .branches import BranchProcessor from ..storage.redis import RedisManager -from ..storage.database import DatabaseManager +# from ..storage.database import DatabaseManager # Disabled - PostgreSQL moved to microservices from ..storage.license_plate import LicensePlateManager logger = logging.getLogger(__name__) @@ -45,7 +45,7 @@ class DetectionPipeline: # Initialize components self.branch_processor = BranchProcessor(model_manager, model_id) self.redis_manager = None - self.db_manager = None + # self.db_manager = None # Disabled - PostgreSQL operations moved to microservices self.license_plate_manager = None # Main detection model @@ -113,16 +113,18 @@ class DetectionPipeline: return False logger.info("Redis connection initialized") - # Initialize database connection - if self.pipeline_parser.postgresql_config: - self.db_manager = DatabaseManager(self.pipeline_parser.postgresql_config.__dict__) - if not self.db_manager.connect(): - logger.error("Failed to initialize database connection") - return False - # Create required tables - if not self.db_manager.create_car_frontal_info_table(): - logger.warning("Failed to create car_frontal_info table") - logger.info("Database connection initialized") + # PostgreSQL database connection DISABLED - operations moved to microservices + # Database operations are now handled by backend services via WebSocket + # if self.pipeline_parser.postgresql_config: + # self.db_manager = DatabaseManager(self.pipeline_parser.postgresql_config.__dict__) + # if not self.db_manager.connect(): + # logger.error("Failed to initialize database connection") + # return False + # # Create required tables + # if not self.db_manager.create_car_frontal_info_table(): + # logger.warning("Failed to create car_frontal_info table") + # logger.info("Database connection initialized") + logger.info("PostgreSQL operations disabled - using WebSocket for data communication") # Initialize license plate manager (using same Redis config as main Redis manager) if self.pipeline_parser.redis_config: @@ -138,11 +140,11 @@ class DetectionPipeline: logger.error("Failed to initialize detection model") return False - # Initialize branch processor + # Initialize branch processor (db_manager=None since PostgreSQL is disabled) if not await self.branch_processor.initialize( self.pipeline_config, self.redis_manager, - self.db_manager + db_manager=None # PostgreSQL disabled ): logger.error("Failed to initialize branch processor") return False @@ -283,23 +285,25 @@ class DetectionPipeline: return # Send imageDetection message with license plate data combined with processing results + # This is the PRIMARY data flow to backend - WebSocket is critical, keep this! await self._send_license_plate_message(subscription_id, license_text, confidence, session_id) - # Update database with license plate information if database manager is available - if self.db_manager and license_text: - success = self.db_manager.execute_update( - table='car_frontal_info', - key_field='session_id', - key_value=session_id, - fields={ - 'license_character': license_text, - 'license_type': 'LPR_detected' # Mark as detected by LPR service - } - ) - if success: - logger.info(f"[LICENSE PLATE] Updated database for session {session_id}") - else: - logger.warning(f"[LICENSE PLATE] Failed to update database for session {session_id}") + # PostgreSQL database update DISABLED - backend handles data via WebSocket messages + # if self.db_manager and license_text: + # success = self.db_manager.execute_update( + # table='car_frontal_info', + # key_field='session_id', + # key_value=session_id, + # fields={ + # 'license_character': license_text, + # 'license_type': 'LPR_detected' # Mark as detected by LPR service + # } + # ) + # if success: + # logger.info(f"[LICENSE PLATE] Updated database for session {session_id}") + # else: + # logger.warning(f"[LICENSE PLATE] Failed to update database for session {session_id}") + logger.debug(f"[LICENSE PLATE] Data sent via WebSocket for session {session_id}") except Exception as e: logger.error(f"Error in license plate result callback: {e}", exc_info=True) @@ -710,17 +714,18 @@ class DetectionPipeline: self.session_to_subscription[session_id] = subscription_id logger.info(f"[SESSION MAPPING] Stored mapping: sessionId '{session_id}' -> subscriptionIdentifier '{subscription_id}'") - # Initialize database record with session_id - if session_id and self.db_manager: - success = self.db_manager.insert_initial_detection( - display_id=display_id, - captured_timestamp=detection_context['timestamp'], - session_id=session_id - ) - if success: - logger.info(f"Created initial database record with session {session_id}") - else: - logger.warning(f"Failed to create initial database record for session {session_id}") + # PostgreSQL database insert DISABLED - backend handles data via WebSocket + # if session_id and self.db_manager: + # success = self.db_manager.insert_initial_detection( + # display_id=display_id, + # captured_timestamp=detection_context['timestamp'], + # session_id=session_id + # ) + # if success: + # logger.info(f"Created initial database record with session {session_id}") + # else: + # logger.warning(f"Failed to create initial database record for session {session_id}") + logger.debug(f"Session {session_id} will be communicated via WebSocket") # Execute branches in parallel if hasattr(self.pipeline_config, 'branches') and self.pipeline_config.branches: @@ -886,15 +891,16 @@ class DetectionPipeline: if valid_detections: logger.info(f"Found {len(valid_detections)} valid detections for pipeline processing") - # Initialize database record if session_id is provided - if session_id and self.db_manager: - success = self.db_manager.insert_initial_detection( - display_id=display_id, - captured_timestamp=detection_context['timestamp'], - session_id=session_id - ) - if not success: - logger.warning(f"Failed to create initial database record for session {session_id}") + # PostgreSQL database insert DISABLED - backend handles data via WebSocket + # if session_id and self.db_manager: + # success = self.db_manager.insert_initial_detection( + # display_id=display_id, + # captured_timestamp=detection_context['timestamp'], + # session_id=session_id + # ) + # if not success: + # logger.warning(f"Failed to create initial database record for session {session_id}") + logger.debug(f"Detection results for session {session_id} will be sent via WebSocket") # Execute branches in parallel if hasattr(self.pipeline_config, 'branches') and self.pipeline_config.branches: @@ -1025,11 +1031,16 @@ class DetectionPipeline: logger.debug(f"Executing parallel action: {action_type}") if action_type == 'postgresql_update_combined': - result = await self._execute_postgresql_update_combined(action, context) + # PostgreSQL action SKIPPED - database operations disabled + logger.info(f"Skipping PostgreSQL action '{action_type}' (disabled)") + result = {'status': 'skipped', 'message': 'PostgreSQL operations disabled'} - # Update session state with processing results after database update - if result.get('status') == 'success': - await self._update_session_with_processing_results(context) + # Still update session state for WebSocket messaging + await self._update_session_with_processing_results(context) + + # result = await self._execute_postgresql_update_combined(action, context) + # if result.get('status') == 'success': + # await self._update_session_with_processing_results(context) else: logger.warning(f"Unknown parallel action type: {action_type}") result = {'status': 'error', 'message': f'Unknown action type: {action_type}'} @@ -1132,59 +1143,61 @@ class DetectionPipeline: logger.error(f"Error in redis_publish action: {e}", exc_info=True) return {'status': 'error', 'message': str(e)} - async def _execute_postgresql_update_combined(self, - action: Dict, - context: Dict[str, Any]) -> Dict[str, Any]: - """Execute postgresql_update_combined action.""" - if not self.db_manager: - return {'status': 'error', 'message': 'Database not available'} - - try: - # Wait for required branches if specified - wait_for_branches = action.params.get('waitForBranches', []) - branch_results = context.get('branch_results', {}) - - # Log missing branches but don't block the update (allow partial results) - missing_branches = [b for b in wait_for_branches if b not in branch_results] - if missing_branches: - logger.warning(f"Some branches missing from results (will use null): {missing_branches}") - available_branches = [b for b in wait_for_branches if b in branch_results] - if available_branches: - logger.info(f"Available branches for database update: {available_branches}") - - # Prepare fields for database update - table = action.params.get('table', 'car_frontal_info') - key_field = action.params.get('key_field', 'session_id') - key_value = action.params.get('key_value', '{session_id}').format(**context) - field_mappings = action.params.get('fields', {}) - - # Resolve field values using branch results - resolved_fields = {} - for field_name, field_template in field_mappings.items(): - try: - # Replace template variables with actual values from branch results - resolved_value = self._resolve_field_template(field_template, branch_results, context) - resolved_fields[field_name] = resolved_value - except Exception as e: - logger.warning(f"Failed to resolve field {field_name}: {e}") - resolved_fields[field_name] = None - - # Execute database update - success = self.db_manager.execute_update( - table=table, - key_field=key_field, - key_value=key_value, - fields=resolved_fields - ) - - if success: - return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields} - else: - return {'status': 'error', 'message': 'Database update failed'} - - except Exception as e: - logger.error(f"Error in postgresql_update_combined action: {e}", exc_info=True) - return {'status': 'error', 'message': str(e)} + # PostgreSQL update method DISABLED - database operations moved to microservices + # This method is no longer used as data flows via WebSocket messages to backend + # async def _execute_postgresql_update_combined(self, + # action: Dict, + # context: Dict[str, Any]) -> Dict[str, Any]: + # """Execute postgresql_update_combined action.""" + # if not self.db_manager: + # return {'status': 'error', 'message': 'Database not available'} + # + # try: + # # Wait for required branches if specified + # wait_for_branches = action.params.get('waitForBranches', []) + # branch_results = context.get('branch_results', {}) + # + # # Log missing branches but don't block the update (allow partial results) + # missing_branches = [b for b in wait_for_branches if b not in branch_results] + # if missing_branches: + # logger.warning(f"Some branches missing from results (will use null): {missing_branches}") + # available_branches = [b for b in wait_for_branches if b in branch_results] + # if available_branches: + # logger.info(f"Available branches for database update: {available_branches}") + # + # # Prepare fields for database update + # table = action.params.get('table', 'car_frontal_info') + # key_field = action.params.get('key_field', 'session_id') + # key_value = action.params.get('key_value', '{session_id}').format(**context) + # field_mappings = action.params.get('fields', {}) + # + # # Resolve field values using branch results + # resolved_fields = {} + # for field_name, field_template in field_mappings.items(): + # try: + # # Replace template variables with actual values from branch results + # resolved_value = self._resolve_field_template(field_template, branch_results, context) + # resolved_fields[field_name] = resolved_value + # except Exception as e: + # logger.warning(f"Failed to resolve field {field_name}: {e}") + # resolved_fields[field_name] = None + # + # # Execute database update + # success = self.db_manager.execute_update( + # table=table, + # key_field=key_field, + # key_value=key_value, + # fields=resolved_fields + # ) + # + # if success: + # return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields} + # else: + # return {'status': 'error', 'message': 'Database update failed'} + # + # except Exception as e: + # logger.error(f"Error in postgresql_update_combined action: {e}", exc_info=True) + # return {'status': 'error', 'message': str(e)} def _resolve_field_template(self, template: str, branch_results: Dict, context: Dict) -> str: """ @@ -1270,7 +1283,7 @@ class DetectionPipeline: 'branches': branch_stats, 'license_plate': license_stats, 'redis_available': self.redis_manager is not None, - 'database_available': self.db_manager is not None, + # 'database_available': self.db_manager is not None, # PostgreSQL disabled 'detection_model_loaded': self.detection_model is not None } @@ -1282,8 +1295,9 @@ class DetectionPipeline: if self.redis_manager: self.redis_manager.cleanup() - if self.db_manager: - self.db_manager.disconnect() + # PostgreSQL disconnect DISABLED - database operations moved to microservices + # if self.db_manager: + # self.db_manager.disconnect() if self.branch_processor: self.branch_processor.cleanup() diff --git a/core/models/__init__.py b/core/models/__init__.py index c817eb2..fa2c71a 100644 --- a/core/models/__init__.py +++ b/core/models/__init__.py @@ -11,7 +11,7 @@ from .pipeline import ( Action, ActionType, RedisConfig, - PostgreSQLConfig + # PostgreSQLConfig # Disabled - moved to microservices ) from .inference import ( YOLOWrapper, @@ -32,7 +32,7 @@ __all__ = [ 'Action', 'ActionType', 'RedisConfig', - 'PostgreSQLConfig', + # 'PostgreSQLConfig', # Disabled - moved to microservices # Inference 'YOLOWrapper', diff --git a/core/models/pipeline.py b/core/models/pipeline.py index de5667b..3ae7463 100644 --- a/core/models/pipeline.py +++ b/core/models/pipeline.py @@ -16,6 +16,8 @@ class ActionType(Enum): """Supported action types in pipeline""" REDIS_SAVE_IMAGE = "redis_save_image" REDIS_PUBLISH = "redis_publish" + # PostgreSQL actions below are DEPRECATED - kept for backward compatibility only + # These actions will be silently skipped during pipeline execution POSTGRESQL_UPDATE = "postgresql_update" POSTGRESQL_UPDATE_COMBINED = "postgresql_update_combined" POSTGRESQL_INSERT = "postgresql_insert" @@ -41,7 +43,15 @@ class RedisConfig: @dataclass class PostgreSQLConfig: - """PostgreSQL connection configuration""" + """ + PostgreSQL connection configuration - DISABLED + + NOTE: This configuration is kept for backward compatibility with existing + pipeline.json files, but PostgreSQL operations are disabled. All database + operations have been moved to microservices architecture. + + This config will be parsed but not used for any database connections. + """ host: str port: int database: str @@ -50,6 +60,7 @@ class PostgreSQLConfig: @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'PostgreSQLConfig': + """Parse PostgreSQL config from dict (kept for backward compatibility)""" return cls( host=data['host'], port=data.get('port', 5432), @@ -272,17 +283,19 @@ class PipelineParser: if not self._validate_actions(self.pipeline_config): return False - # Validate parallel actions + # Validate parallel actions (PostgreSQL actions are skipped) for action in self.pipeline_config.parallel_actions: if action.type == ActionType.POSTGRESQL_UPDATE_COMBINED: - wait_for = action.params.get('waitForBranches', []) - if wait_for: - # Check that referenced branches exist - branch_ids = self._get_all_branch_ids(self.pipeline_config) - for branch_id in wait_for: - if branch_id not in branch_ids: - logger.error(f"Referenced branch '{branch_id}' in waitForBranches not found") - return False + logger.warning(f"PostgreSQL parallel action {action.type.value} found but will be SKIPPED (PostgreSQL disabled)") + # Skip validation for PostgreSQL actions since they won't be executed + # wait_for = action.params.get('waitForBranches', []) + # if wait_for: + # # Check that referenced branches exist + # branch_ids = self._get_all_branch_ids(self.pipeline_config) + # for branch_id in wait_for: + # if branch_id not in branch_ids: + # logger.error(f"Referenced branch '{branch_id}' in waitForBranches not found") + # return False logger.info("Pipeline configuration validated successfully") return True @@ -305,11 +318,14 @@ class PipelineParser: logger.error(f"Action {action.type} requires Redis configuration") return False - # Validate PostgreSQL actions need PostgreSQL config + # PostgreSQL actions are disabled - log warning instead of failing + # Kept for backward compatibility with existing pipeline.json files if action.type in [ActionType.POSTGRESQL_UPDATE, ActionType.POSTGRESQL_UPDATE_COMBINED, ActionType.POSTGRESQL_INSERT]: - if not self.postgresql_config: - logger.error(f"Action {action.type} requires PostgreSQL configuration") - return False + logger.warning(f"PostgreSQL action {action.type.value} found but will be SKIPPED (PostgreSQL disabled)") + # Do not fail validation - just skip these actions during execution + # if not self.postgresql_config: + # logger.error(f"Action {action.type} requires PostgreSQL configuration") + # return False # Recursively validate branches if hasattr(config, 'branches'): diff --git a/core/storage/__init__.py b/core/storage/__init__.py index 973837a..b2ff324 100644 --- a/core/storage/__init__.py +++ b/core/storage/__init__.py @@ -1,10 +1,13 @@ """ Storage module for the Python Detector Worker. -This module provides Redis and PostgreSQL operations for data persistence +This module provides Redis operations for data persistence and caching in the detection pipeline. + +Note: PostgreSQL operations have been disabled as database functionality +has been moved to microservices architecture. """ from .redis import RedisManager -from .database import DatabaseManager +# from .database import DatabaseManager # Disabled - moved to microservices -__all__ = ['RedisManager', 'DatabaseManager'] \ No newline at end of file +__all__ = ['RedisManager'] # Removed 'DatabaseManager' \ No newline at end of file diff --git a/core/storage/database.py b/core/storage/database.py index a90df97..4715768 100644 --- a/core/storage/database.py +++ b/core/storage/database.py @@ -1,357 +1,369 @@ """ -Database Operations Module. -Handles PostgreSQL operations for the detection pipeline. +Database Operations Module - DISABLED + +NOTE: This module has been disabled as PostgreSQL database operations have been +moved to microservices architecture. All database connections, reads, and writes +are now handled by separate backend services. + +The detection worker now communicates results via: +- WebSocket imageDetection messages (primary data flow to backend) +- Redis image storage and pub/sub (temporary storage) + +Original functionality: PostgreSQL operations for the detection pipeline. +Status: Commented out - DO NOT ENABLE without updating architecture """ -import psycopg2 -import psycopg2.extras + +# All PostgreSQL functionality below has been commented out +# import psycopg2 +# import psycopg2.extras from typing import Optional, Dict, Any import logging -import uuid +# import uuid logger = logging.getLogger(__name__) - -class DatabaseManager: - """ - Manages PostgreSQL connections and operations for the detection pipeline. - Handles database operations and schema management. - """ - - def __init__(self, config: Dict[str, Any]): - """ - Initialize database manager with configuration. - - Args: - config: Database configuration dictionary - """ - self.config = config - self.connection: Optional[psycopg2.extensions.connection] = None - - def connect(self) -> bool: - """ - Connect to PostgreSQL database. - - Returns: - True if successful, False otherwise - """ - try: - self.connection = psycopg2.connect( - host=self.config['host'], - port=self.config['port'], - database=self.config['database'], - user=self.config['username'], - password=self.config['password'] - ) - logger.info("PostgreSQL connection established successfully") - return True - except Exception as e: - logger.error(f"Failed to connect to PostgreSQL: {e}") - return False - - def disconnect(self): - """Disconnect from PostgreSQL database.""" - if self.connection: - self.connection.close() - self.connection = None - logger.info("PostgreSQL connection closed") - - def is_connected(self) -> bool: - """ - Check if database connection is active. - - Returns: - True if connected, False otherwise - """ - try: - if self.connection and not self.connection.closed: - cur = self.connection.cursor() - cur.execute("SELECT 1") - cur.fetchone() - cur.close() - return True - except: - pass - return False - - def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool: - """ - Update car information in the database. - - Args: - session_id: Session identifier - brand: Car brand - model: Car model - body_type: Car body type - - Returns: - True if successful, False otherwise - """ - if not self.is_connected(): - if not self.connect(): - return False - - try: - cur = self.connection.cursor() - query = """ - INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at) - VALUES (%s, %s, %s, %s, NOW()) - ON CONFLICT (session_id) - DO UPDATE SET - car_brand = EXCLUDED.car_brand, - car_model = EXCLUDED.car_model, - car_body_type = EXCLUDED.car_body_type, - updated_at = NOW() - """ - cur.execute(query, (session_id, brand, model, body_type)) - self.connection.commit() - cur.close() - logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})") - return True - except Exception as e: - logger.error(f"Failed to update car info: {e}") - if self.connection: - self.connection.rollback() - return False - - def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool: - """ - Execute a dynamic update query on the database. - - Args: - table: Table name - key_field: Primary key field name - key_value: Primary key value - fields: Dictionary of fields to update - - Returns: - True if successful, False otherwise - """ - if not self.is_connected(): - if not self.connect(): - return False - - try: - cur = self.connection.cursor() - - # Build the UPDATE query dynamically - set_clauses = [] - values = [] - - for field, value in fields.items(): - if value == "NOW()": - set_clauses.append(f"{field} = NOW()") - else: - set_clauses.append(f"{field} = %s") - values.append(value) - - # Add schema prefix if table doesn't already have it - full_table_name = table if '.' in table else f"gas_station_1.{table}" - - query = f""" - INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())}) - VALUES (%s, {', '.join(['%s'] * len(fields))}) - ON CONFLICT ({key_field}) - DO UPDATE SET {', '.join(set_clauses)} - """ - - # Add key_value to the beginning of values list - all_values = [key_value] + list(fields.values()) + values - - cur.execute(query, all_values) - self.connection.commit() - cur.close() - logger.info(f"Updated {table} for {key_field}={key_value}") - return True - except Exception as e: - logger.error(f"Failed to execute update on {table}: {e}") - if self.connection: - self.connection.rollback() - return False - - def create_car_frontal_info_table(self) -> bool: - """ - Create the car_frontal_info table in gas_station_1 schema if it doesn't exist. - - Returns: - True if successful, False otherwise - """ - if not self.is_connected(): - if not self.connect(): - return False - - try: - # Since the database already exists, just verify connection - cur = self.connection.cursor() - - # Simple verification that the table exists - cur.execute(""" - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_schema = 'gas_station_1' - AND table_name = 'car_frontal_info' - ) - """) - - table_exists = cur.fetchone()[0] - cur.close() - - if table_exists: - logger.info("Verified car_frontal_info table exists") - return True - else: - logger.error("car_frontal_info table does not exist in the database") - return False - - except Exception as e: - logger.error(f"Failed to create car_frontal_info table: {e}") - if self.connection: - self.connection.rollback() - return False - - def insert_initial_detection(self, display_id: str, captured_timestamp: str, session_id: str = None) -> str: - """ - Insert initial detection record and return the session_id. - - Args: - display_id: Display identifier - captured_timestamp: Timestamp of the detection - session_id: Optional session ID, generates one if not provided - - Returns: - Session ID string or None on error - """ - if not self.is_connected(): - if not self.connect(): - return None - - # Generate session_id if not provided - if not session_id: - session_id = str(uuid.uuid4()) - - try: - # Ensure table exists - if not self.create_car_frontal_info_table(): - logger.error("Failed to create/verify table before insertion") - return None - - cur = self.connection.cursor() - insert_query = """ - INSERT INTO gas_station_1.car_frontal_info - (display_id, captured_timestamp, session_id, license_character, license_type, car_brand, car_model, car_body_type) - VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL) - ON CONFLICT (session_id) DO NOTHING - """ - - cur.execute(insert_query, (display_id, captured_timestamp, session_id)) - self.connection.commit() - cur.close() - logger.info(f"Inserted initial detection record with session_id: {session_id}") - return session_id - - except Exception as e: - logger.error(f"Failed to insert initial detection record: {e}") - if self.connection: - self.connection.rollback() - return None - - def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: - """ - Get session information from the database. - - Args: - session_id: Session identifier - - Returns: - Dictionary with session data or None if not found - """ - if not self.is_connected(): - if not self.connect(): - return None - - try: - cur = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) - query = "SELECT * FROM gas_station_1.car_frontal_info WHERE session_id = %s" - cur.execute(query, (session_id,)) - result = cur.fetchone() - cur.close() - - if result: - return dict(result) - else: - logger.debug(f"No session info found for session_id: {session_id}") - return None - - except Exception as e: - logger.error(f"Failed to get session info: {e}") - return None - - def delete_session(self, session_id: str) -> bool: - """ - Delete session record from the database. - - Args: - session_id: Session identifier - - Returns: - True if successful, False otherwise - """ - if not self.is_connected(): - if not self.connect(): - return False - - try: - cur = self.connection.cursor() - query = "DELETE FROM gas_station_1.car_frontal_info WHERE session_id = %s" - cur.execute(query, (session_id,)) - rows_affected = cur.rowcount - self.connection.commit() - cur.close() - - if rows_affected > 0: - logger.info(f"Deleted session record: {session_id}") - return True - else: - logger.warning(f"No session record found to delete: {session_id}") - return False - - except Exception as e: - logger.error(f"Failed to delete session: {e}") - if self.connection: - self.connection.rollback() - return False - - def get_statistics(self) -> Dict[str, Any]: - """ - Get database statistics. - - Returns: - Dictionary with database statistics - """ - stats = { - 'connected': self.is_connected(), - 'host': self.config.get('host', 'unknown'), - 'port': self.config.get('port', 'unknown'), - 'database': self.config.get('database', 'unknown') - } - - if self.is_connected(): - try: - cur = self.connection.cursor() - - # Get table record count - cur.execute("SELECT COUNT(*) FROM gas_station_1.car_frontal_info") - stats['total_records'] = cur.fetchone()[0] - - # Get recent records count (last hour) - cur.execute(""" - SELECT COUNT(*) FROM gas_station_1.car_frontal_info - WHERE created_at > NOW() - INTERVAL '1 hour' - """) - stats['recent_records'] = cur.fetchone()[0] - - cur.close() - except Exception as e: - logger.warning(f"Failed to get database statistics: {e}") - stats['error'] = str(e) - - return stats \ No newline at end of file +# DatabaseManager class is disabled - all methods commented out +# class DatabaseManager: +# """ +# Manages PostgreSQL connections and operations for the detection pipeline. +# Handles database operations and schema management. +# """ +# +# def __init__(self, config: Dict[str, Any]): +# """ +# Initialize database manager with configuration. +# +# Args: +# config: Database configuration dictionary +# """ +# self.config = config +# self.connection: Optional[psycopg2.extensions.connection] = None +# +# def connect(self) -> bool: +# """ +# Connect to PostgreSQL database. +# +# Returns: +# True if successful, False otherwise +# """ +# try: +# self.connection = psycopg2.connect( +# host=self.config['host'], +# port=self.config['port'], +# database=self.config['database'], +# user=self.config['username'], +# password=self.config['password'] +# ) +# logger.info("PostgreSQL connection established successfully") +# return True +# except Exception as e: +# logger.error(f"Failed to connect to PostgreSQL: {e}") +# return False +# +# def disconnect(self): +# """Disconnect from PostgreSQL database.""" +# if self.connection: +# self.connection.close() +# self.connection = None +# logger.info("PostgreSQL connection closed") +# +# def is_connected(self) -> bool: +# """ +# Check if database connection is active. +# +# Returns: +# True if connected, False otherwise +# """ +# try: +# if self.connection and not self.connection.closed: +# cur = self.connection.cursor() +# cur.execute("SELECT 1") +# cur.fetchone() +# cur.close() +# return True +# except: +# pass +# return False +# +# def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool: +# """ +# Update car information in the database. +# +# Args: +# session_id: Session identifier +# brand: Car brand +# model: Car model +# body_type: Car body type +# +# Returns: +# True if successful, False otherwise +# """ +# if not self.is_connected(): +# if not self.connect(): +# return False +# +# try: +# cur = self.connection.cursor() +# query = """ +# INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at) +# VALUES (%s, %s, %s, %s, NOW()) +# ON CONFLICT (session_id) +# DO UPDATE SET +# car_brand = EXCLUDED.car_brand, +# car_model = EXCLUDED.car_model, +# car_body_type = EXCLUDED.car_body_type, +# updated_at = NOW() +# """ +# cur.execute(query, (session_id, brand, model, body_type)) +# self.connection.commit() +# cur.close() +# logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})") +# return True +# except Exception as e: +# logger.error(f"Failed to update car info: {e}") +# if self.connection: +# self.connection.rollback() +# return False +# +# def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool: +# """ +# Execute a dynamic update query on the database. +# +# Args: +# table: Table name +# key_field: Primary key field name +# key_value: Primary key value +# fields: Dictionary of fields to update +# +# Returns: +# True if successful, False otherwise +# """ +# if not self.is_connected(): +# if not self.connect(): +# return False +# +# try: +# cur = self.connection.cursor() +# +# # Build the UPDATE query dynamically +# set_clauses = [] +# values = [] +# +# for field, value in fields.items(): +# if value == "NOW()": +# set_clauses.append(f"{field} = NOW()") +# else: +# set_clauses.append(f"{field} = %s") +# values.append(value) +# +# # Add schema prefix if table doesn't already have it +# full_table_name = table if '.' in table else f"gas_station_1.{table}" +# +# query = f""" +# INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())}) +# VALUES (%s, {', '.join(['%s'] * len(fields))}) +# ON CONFLICT ({key_field}) +# DO UPDATE SET {', '.join(set_clauses)} +# """ +# +# # Add key_value to the beginning of values list +# all_values = [key_value] + list(fields.values()) + values +# +# cur.execute(query, all_values) +# self.connection.commit() +# cur.close() +# logger.info(f"Updated {table} for {key_field}={key_value}") +# return True +# except Exception as e: +# logger.error(f"Failed to execute update on {table}: {e}") +# if self.connection: +# self.connection.rollback() +# return False +# +# def create_car_frontal_info_table(self) -> bool: +# """ +# Create the car_frontal_info table in gas_station_1 schema if it doesn't exist. +# +# Returns: +# True if successful, False otherwise +# """ +# if not self.is_connected(): +# if not self.connect(): +# return False +# +# try: +# # Since the database already exists, just verify connection +# cur = self.connection.cursor() +# +# # Simple verification that the table exists +# cur.execute(""" +# SELECT EXISTS ( +# SELECT FROM information_schema.tables +# WHERE table_schema = 'gas_station_1' +# AND table_name = 'car_frontal_info' +# ) +# """) +# +# table_exists = cur.fetchone()[0] +# cur.close() +# +# if table_exists: +# logger.info("Verified car_frontal_info table exists") +# return True +# else: +# logger.error("car_frontal_info table does not exist in the database") +# return False +# +# except Exception as e: +# logger.error(f"Failed to create car_frontal_info table: {e}") +# if self.connection: +# self.connection.rollback() +# return False +# +# def insert_initial_detection(self, display_id: str, captured_timestamp: str, session_id: str = None) -> str: +# """ +# Insert initial detection record and return the session_id. +# +# Args: +# display_id: Display identifier +# captured_timestamp: Timestamp of the detection +# session_id: Optional session ID, generates one if not provided +# +# Returns: +# Session ID string or None on error +# """ +# if not self.is_connected(): +# if not self.connect(): +# return None +# +# # Generate session_id if not provided +# if not session_id: +# session_id = str(uuid.uuid4()) +# +# try: +# # Ensure table exists +# if not self.create_car_frontal_info_table(): +# logger.error("Failed to create/verify table before insertion") +# return None +# +# cur = self.connection.cursor() +# insert_query = """ +# INSERT INTO gas_station_1.car_frontal_info +# (display_id, captured_timestamp, session_id, license_character, license_type, car_brand, car_model, car_body_type) +# VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL) +# ON CONFLICT (session_id) DO NOTHING +# """ +# +# cur.execute(insert_query, (display_id, captured_timestamp, session_id)) +# self.connection.commit() +# cur.close() +# logger.info(f"Inserted initial detection record with session_id: {session_id}") +# return session_id +# +# except Exception as e: +# logger.error(f"Failed to insert initial detection record: {e}") +# if self.connection: +# self.connection.rollback() +# return None +# +# def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: +# """ +# Get session information from the database. +# +# Args: +# session_id: Session identifier +# +# Returns: +# Dictionary with session data or None if not found +# """ +# if not self.is_connected(): +# if not self.connect(): +# return None +# +# try: +# cur = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) +# query = "SELECT * FROM gas_station_1.car_frontal_info WHERE session_id = %s" +# cur.execute(query, (session_id,)) +# result = cur.fetchone() +# cur.close() +# +# if result: +# return dict(result) +# else: +# logger.debug(f"No session info found for session_id: {session_id}") +# return None +# +# except Exception as e: +# logger.error(f"Failed to get session info: {e}") +# return None +# +# def delete_session(self, session_id: str) -> bool: +# """ +# Delete session record from the database. +# +# Args: +# session_id: Session identifier +# +# Returns: +# True if successful, False otherwise +# """ +# if not self.is_connected(): +# if not self.connect(): +# return False +# +# try: +# cur = self.connection.cursor() +# query = "DELETE FROM gas_station_1.car_frontal_info WHERE session_id = %s" +# cur.execute(query, (session_id,)) +# rows_affected = cur.rowcount +# self.connection.commit() +# cur.close() +# +# if rows_affected > 0: +# logger.info(f"Deleted session record: {session_id}") +# return True +# else: +# logger.warning(f"No session record found to delete: {session_id}") +# return False +# +# except Exception as e: +# logger.error(f"Failed to delete session: {e}") +# if self.connection: +# self.connection.rollback() +# return False +# +# def get_statistics(self) -> Dict[str, Any]: +# """ +# Get database statistics. +# +# Returns: +# Dictionary with database statistics +# """ +# stats = { +# 'connected': self.is_connected(), +# 'host': self.config.get('host', 'unknown'), +# 'port': self.config.get('port', 'unknown'), +# 'database': self.config.get('database', 'unknown') +# } +# +# if self.is_connected(): +# try: +# cur = self.connection.cursor() +# +# # Get table record count +# cur.execute("SELECT COUNT(*) FROM gas_station_1.car_frontal_info") +# stats['total_records'] = cur.fetchone()[0] +# +# # Get recent records count (last hour) +# cur.execute(""" +# SELECT COUNT(*) FROM gas_station_1.car_frontal_info +# WHERE created_at > NOW() - INTERVAL '1 hour' +# """) +# stats['recent_records'] = cur.fetchone()[0] +# +# cur.close() +# except Exception as e: +# logger.warning(f"Failed to get database statistics: {e}") +# stats['error'] = str(e) +# +# return stats