"""
Database Operations Module.

Handles PostgreSQL operations for the detection pipeline.
"""

import logging
import uuid
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
import psycopg2.extras

logger = logging.getLogger(__name__)


class DatabaseManager:
    """
    Manages a single PostgreSQL connection for the detection pipeline.

    Every public operation lazily (re)connects, runs inside its own
    transaction, and reports failure through its return value instead of
    raising, so callers can treat the database as best-effort.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize database manager with configuration.

        Args:
            config: Database configuration dictionary. ``connect`` reads the
                keys ``host``, ``port``, ``database``, ``username`` and
                ``password`` from it.
        """
        self.config = config
        self.connection: Optional[psycopg2.extensions.connection] = None

    def connect(self) -> bool:
        """
        Connect to PostgreSQL database.

        The new connection is opened first and only then swapped in, so a
        failed attempt leaves any existing connection untouched. A previously
        held connection is closed after the swap so it is not leaked.

        Returns:
            True if successful, False otherwise
        """
        try:
            new_connection = psycopg2.connect(
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                user=self.config['username'],
                password=self.config['password'],
            )
        except Exception as e:
            # Also covers KeyError for a missing configuration key.
            logger.error("Failed to connect to PostgreSQL: %s", e)
            return False

        previous, self.connection = self.connection, new_connection
        if previous is not None and not previous.closed:
            try:
                previous.close()
            except Exception as e:
                logger.debug("Ignoring error while closing stale connection: %s", e)

        logger.info("PostgreSQL connection established successfully")
        return True

    def disconnect(self) -> None:
        """Disconnect from PostgreSQL database."""
        # Drop the reference first so a failing close() cannot leave a dead
        # handle behind on the instance.
        connection, self.connection = self.connection, None
        if connection is not None:
            connection.close()
            logger.info("PostgreSQL connection closed")

    def is_connected(self) -> bool:
        """
        Check if database connection is active.

        Sends ``SELECT 1`` to the server. If no transaction was open before
        the probe, the transaction implicitly started by the probe is rolled
        back again; otherwise the session would stay "idle in transaction"
        and a later ``NOW()`` would report that old transaction's start time.

        Returns:
            True if connected, False otherwise
        """
        connection = self.connection
        if connection is None or connection.closed:
            return False
        try:
            was_idle = (
                connection.get_transaction_status()
                == psycopg2.extensions.TRANSACTION_STATUS_IDLE
            )
            with connection.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
            if was_idle:
                # Only undo the transaction this probe opened itself.
                connection.rollback()
            return True
        except Exception as e:
            logger.debug("PostgreSQL connection check failed: %s", e)
            return False

    def _ensure_connection(self) -> bool:
        """Return True when a live connection exists, reconnecting if needed."""
        return self.is_connected() or self.connect()

    def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool:
        """
        Update car information in the database.

        Upserts the row identified by ``session_id`` in
        ``gas_station_1.car_frontal_info`` (schema-qualified like every other
        statement in this class).

        Args:
            session_id: Session identifier
            brand: Car brand
            model: Car model
            body_type: Car body type

        Returns:
            True if successful, False otherwise
        """
        if not self._ensure_connection():
            return False

        query = """
            INSERT INTO gas_station_1.car_frontal_info
                (session_id, car_brand, car_model, car_body_type, updated_at)
            VALUES (%s, %s, %s, %s, NOW())
            ON CONFLICT (session_id)
            DO UPDATE SET
                car_brand = EXCLUDED.car_brand,
                car_model = EXCLUDED.car_model,
                car_body_type = EXCLUDED.car_body_type,
                updated_at = NOW()
        """
        try:
            # `with connection` commits on success and rolls back on error;
            # `with cursor` closes the cursor on both paths.
            with self.connection as conn, conn.cursor() as cur:
                cur.execute(query, (session_id, brand, model, body_type))
            logger.info(
                "Updated car info for session %s: %s %s (%s)",
                session_id, brand, model, body_type,
            )
            return True
        except Exception as e:
            logger.error("Failed to update car info: %s", e)
            return False

    @staticmethod
    def _build_upsert_query(
        full_table_name: str,
        key_field: str,
        key_value: str,
        fields: Dict[str, str],
    ) -> Tuple[str, List[str]]:
        """
        Build the INSERT ... ON CONFLICT DO UPDATE statement for execute_update.

        A field whose value is the exact string ``"NOW()"`` is emitted as the
        SQL function call in both the VALUES list and the SET list; every
        other value is bound as a parameter.

        Args:
            full_table_name: Table name, already schema-qualified
            key_field: Conflict-target column name
            key_value: Value for the key column
            fields: Non-empty mapping of column name to value

        Returns:
            Tuple of (query text, parameters in placeholder order)
        """
        columns = [key_field]
        insert_exprs = ["%s"]
        insert_params = [key_value]
        set_clauses = []
        update_params = []

        for field, value in fields.items():
            columns.append(field)
            if value == "NOW()":
                insert_exprs.append("NOW()")
                set_clauses.append(f"{field} = NOW()")
            else:
                insert_exprs.append("%s")
                insert_params.append(value)
                set_clauses.append(f"{field} = %s")
                update_params.append(value)

        # NOTE(review): table and column names are interpolated verbatim
        # because identifiers cannot be bound as parameters. They must come
        # from trusted configuration, never from end-user input.
        query = (
            f"INSERT INTO {full_table_name} ({', '.join(columns)}) "
            f"VALUES ({', '.join(insert_exprs)}) "
            f"ON CONFLICT ({key_field}) DO UPDATE SET {', '.join(set_clauses)}"
        )
        return query, insert_params + update_params

    def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool:
        """
        Execute a dynamic update query on the database.

        Despite the name this is an upsert: the row is inserted, and on a
        conflict on ``key_field`` the given fields are updated instead.

        Args:
            table: Table name; ``gas_station_1.`` is prepended when the name
                contains no dot
            key_field: Primary key field name
            key_value: Primary key value
            fields: Dictionary of fields to update. The value ``"NOW()"`` is
                treated as the SQL function rather than a string.

        Returns:
            True if successful, False otherwise (including when ``fields`` is
            empty, which cannot form a valid statement)
        """
        if not fields:
            logger.warning("No fields supplied for update on %s; nothing to do", table)
            return False

        if not self._ensure_connection():
            return False

        # Add schema prefix if table doesn't already have it
        full_table_name = table if '.' in table else f"gas_station_1.{table}"

        try:
            query, params = self._build_upsert_query(
                full_table_name, key_field, key_value, fields
            )
            with self.connection as conn, conn.cursor() as cur:
                cur.execute(query, params)
            logger.info("Updated %s for %s=%s", table, key_field, key_value)
            return True
        except Exception as e:
            logger.error("Failed to execute update on %s: %s", table, e)
            return False

    def create_car_frontal_info_table(self) -> bool:
        """
        Verify that gas_station_1.car_frontal_info exists.

        The name is kept for existing callers, but no DDL is issued: the
        method only looks the table up in ``information_schema.tables``.

        Returns:
            True if the table exists, False if it is missing or the check
            itself failed
        """
        if not self._ensure_connection():
            return False

        try:
            with self.connection as conn, conn.cursor() as cur:
                cur.execute("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_schema = 'gas_station_1'
                        AND table_name = 'car_frontal_info'
                    )
                """)
                table_exists = cur.fetchone()[0]
        except Exception as e:
            logger.error("Failed to verify car_frontal_info table: %s", e)
            return False

        if table_exists:
            logger.info("Verified car_frontal_info table exists")
            return True

        logger.error("car_frontal_info table does not exist in the database")
        return False

    def insert_initial_detection(
        self,
        display_id: str,
        captured_timestamp: str,
        session_id: Optional[str] = None,
    ) -> Optional[str]:
        """
        Insert initial detection record and return the session_id.

        If a row with the same session_id already exists it is left untouched
        (``ON CONFLICT DO NOTHING``) and the session_id is still returned.

        Args:
            display_id: Display identifier
            captured_timestamp: Timestamp of the detection
            session_id: Optional session ID, generates one if not provided

        Returns:
            Session ID string or None on error
        """
        if not self._ensure_connection():
            return None

        # Generate session_id if not provided
        if not session_id:
            session_id = str(uuid.uuid4())

        if not self.create_car_frontal_info_table():
            logger.error("Failed to create/verify table before insertion")
            return None

        insert_query = """
            INSERT INTO gas_station_1.car_frontal_info
                (display_id, captured_timestamp, session_id, license_character,
                 license_type, car_brand, car_model, car_body_type)
            VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL)
            ON CONFLICT (session_id) DO NOTHING
        """
        try:
            with self.connection as conn, conn.cursor() as cur:
                cur.execute(insert_query, (display_id, captured_timestamp, session_id))
                inserted = cur.rowcount > 0
        except Exception as e:
            logger.error("Failed to insert initial detection record: %s", e)
            return None

        if inserted:
            logger.info("Inserted initial detection record with session_id: %s", session_id)
        else:
            logger.info("Detection record already exists for session_id: %s", session_id)
        return session_id

    def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
        """
        Get session information from the database.

        Args:
            session_id: Session identifier

        Returns:
            Dictionary with session data, or None if not found or on error
        """
        if not self._ensure_connection():
            return None

        query = "SELECT * FROM gas_station_1.car_frontal_info WHERE session_id = %s"
        try:
            # The surrounding `with connection` ends the read transaction so
            # the session is not left idle in transaction (or aborted).
            with self.connection as conn, conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor
            ) as cur:
                cur.execute(query, (session_id,))
                row = cur.fetchone()
        except Exception as e:
            logger.error("Failed to get session info: %s", e)
            return None

        if row:
            return dict(row)

        logger.debug("No session info found for session_id: %s", session_id)
        return None

    def delete_session(self, session_id: str) -> bool:
        """
        Delete session record from the database.

        Args:
            session_id: Session identifier

        Returns:
            True if a row was deleted, False if none matched or on error
        """
        if not self._ensure_connection():
            return False

        query = "DELETE FROM gas_station_1.car_frontal_info WHERE session_id = %s"
        try:
            with self.connection as conn, conn.cursor() as cur:
                cur.execute(query, (session_id,))
                rows_affected = cur.rowcount
        except Exception as e:
            logger.error("Failed to delete session: %s", e)
            return False

        if rows_affected > 0:
            logger.info("Deleted session record: %s", session_id)
            return True

        logger.warning("No session record found to delete: %s", session_id)
        return False

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get database statistics.

        Returns:
            Dictionary with the keys ``connected``, ``host``, ``port`` and
            ``database``; when connected also ``total_records`` and
            ``recent_records`` (rows created in the last hour), or ``error``
            with the failure message if a count query failed.
        """
        # Probe once and reuse the answer instead of pinging the server twice.
        connected = self.is_connected()
        stats: Dict[str, Any] = {
            'connected': connected,
            'host': self.config.get('host', 'unknown'),
            'port': self.config.get('port', 'unknown'),
            'database': self.config.get('database', 'unknown'),
        }
        if not connected:
            return stats

        try:
            with self.connection as conn, conn.cursor() as cur:
                # Get table record count
                cur.execute("SELECT COUNT(*) FROM gas_station_1.car_frontal_info")
                stats['total_records'] = cur.fetchone()[0]

                # Get recent records count (last hour)
                cur.execute("""
                    SELECT COUNT(*) FROM gas_station_1.car_frontal_info
                    WHERE created_at > NOW() - INTERVAL '1 hour'
                """)
                stats['recent_records'] = cur.fetchone()[0]
        except Exception as e:
            logger.warning("Failed to get database statistics: %s", e)
            stats['error'] = str(e)

        return stats