import psycopg2 import psycopg2.extras from typing import Optional, Dict, Any import logging logger = logging.getLogger(__name__) class DatabaseManager: def __init__(self, config: Dict[str, Any]): self.config = config self.connection: Optional[psycopg2.extensions.connection] = None def connect(self) -> bool: try: self.connection = psycopg2.connect( host=self.config['host'], port=self.config['port'], database=self.config['database'], user=self.config['username'], password=self.config['password'] ) logger.info("PostgreSQL connection established successfully") return True except Exception as e: logger.error(f"Failed to connect to PostgreSQL: {e}") return False def disconnect(self): if self.connection: self.connection.close() self.connection = None logger.info("PostgreSQL connection closed") def is_connected(self) -> bool: try: if self.connection and not self.connection.closed: cur = self.connection.cursor() cur.execute("SELECT 1") cur.fetchone() cur.close() return True except: pass return False def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool: if not self.is_connected(): if not self.connect(): return False try: cur = self.connection.cursor() query = """ INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at) VALUES (%s, %s, %s, %s, NOW()) ON CONFLICT (session_id) DO UPDATE SET car_brand = EXCLUDED.car_brand, car_model = EXCLUDED.car_model, car_body_type = EXCLUDED.car_body_type, updated_at = NOW() """ cur.execute(query, (session_id, brand, model, body_type)) self.connection.commit() cur.close() logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})") return True except Exception as e: logger.error(f"Failed to update car info: {e}") if self.connection: self.connection.rollback() return False def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool: if not self.is_connected(): if not self.connect(): return False try: cur = self.connection.cursor() # Build the UPDATE query dynamically set_clauses = [] values = [] for field, value in fields.items(): if value == "NOW()": set_clauses.append(f"{field} = NOW()") else: set_clauses.append(f"{field} = %s") values.append(value) query = f""" INSERT INTO {table} ({key_field}, {', '.join(fields.keys())}) VALUES (%s, {', '.join(['%s'] * len(fields))}) ON CONFLICT ({key_field}) DO UPDATE SET {', '.join(set_clauses)} """ # Add key_value to the beginning of values list all_values = [key_value] + list(fields.values()) + values cur.execute(query, all_values) self.connection.commit() cur.close() logger.info(f"Updated {table} for {key_field}={key_value}") return True except Exception as e: logger.error(f"Failed to execute update on {table}: {e}") if self.connection: self.connection.rollback() return False