python-detector-worker/core/storage/database.py
2025-09-24 20:29:31 +07:00

357 lines
No EOL
12 KiB
Python

"""
Database Operations Module.
Handles PostgreSQL operations for the detection pipeline.
"""
import psycopg2
import psycopg2.extras
from typing import Optional, Dict, Any
import logging
import uuid
logger = logging.getLogger(__name__)
class DatabaseManager:
"""
Manages PostgreSQL connections and operations for the detection pipeline.
Handles database operations and schema management.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize database manager with configuration.
Args:
config: Database configuration dictionary
"""
self.config = config
self.connection: Optional[psycopg2.extensions.connection] = None
def connect(self) -> bool:
"""
Connect to PostgreSQL database.
Returns:
True if successful, False otherwise
"""
try:
self.connection = psycopg2.connect(
host=self.config['host'],
port=self.config['port'],
database=self.config['database'],
user=self.config['username'],
password=self.config['password']
)
logger.info("PostgreSQL connection established successfully")
return True
except Exception as e:
logger.error(f"Failed to connect to PostgreSQL: {e}")
return False
def disconnect(self):
"""Disconnect from PostgreSQL database."""
if self.connection:
self.connection.close()
self.connection = None
logger.info("PostgreSQL connection closed")
def is_connected(self) -> bool:
"""
Check if database connection is active.
Returns:
True if connected, False otherwise
"""
try:
if self.connection and not self.connection.closed:
cur = self.connection.cursor()
cur.execute("SELECT 1")
cur.fetchone()
cur.close()
return True
except:
pass
return False
def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool:
"""
Update car information in the database.
Args:
session_id: Session identifier
brand: Car brand
model: Car model
body_type: Car body type
Returns:
True if successful, False otherwise
"""
if not self.is_connected():
if not self.connect():
return False
try:
cur = self.connection.cursor()
query = """
INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (session_id)
DO UPDATE SET
car_brand = EXCLUDED.car_brand,
car_model = EXCLUDED.car_model,
car_body_type = EXCLUDED.car_body_type,
updated_at = NOW()
"""
cur.execute(query, (session_id, brand, model, body_type))
self.connection.commit()
cur.close()
logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})")
return True
except Exception as e:
logger.error(f"Failed to update car info: {e}")
if self.connection:
self.connection.rollback()
return False
def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool:
"""
Execute a dynamic update query on the database.
Args:
table: Table name
key_field: Primary key field name
key_value: Primary key value
fields: Dictionary of fields to update
Returns:
True if successful, False otherwise
"""
if not self.is_connected():
if not self.connect():
return False
try:
cur = self.connection.cursor()
# Build the UPDATE query dynamically
set_clauses = []
values = []
for field, value in fields.items():
if value == "NOW()":
set_clauses.append(f"{field} = NOW()")
else:
set_clauses.append(f"{field} = %s")
values.append(value)
# Add schema prefix if table doesn't already have it
full_table_name = table if '.' in table else f"gas_station_1.{table}"
query = f"""
INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
VALUES (%s, {', '.join(['%s'] * len(fields))})
ON CONFLICT ({key_field})
DO UPDATE SET {', '.join(set_clauses)}
"""
# Add key_value to the beginning of values list
all_values = [key_value] + list(fields.values()) + values
cur.execute(query, all_values)
self.connection.commit()
cur.close()
logger.info(f"Updated {table} for {key_field}={key_value}")
return True
except Exception as e:
logger.error(f"Failed to execute update on {table}: {e}")
if self.connection:
self.connection.rollback()
return False
def create_car_frontal_info_table(self) -> bool:
"""
Create the car_frontal_info table in gas_station_1 schema if it doesn't exist.
Returns:
True if successful, False otherwise
"""
if not self.is_connected():
if not self.connect():
return False
try:
# Since the database already exists, just verify connection
cur = self.connection.cursor()
# Simple verification that the table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'gas_station_1'
AND table_name = 'car_frontal_info'
)
""")
table_exists = cur.fetchone()[0]
cur.close()
if table_exists:
logger.info("Verified car_frontal_info table exists")
return True
else:
logger.error("car_frontal_info table does not exist in the database")
return False
except Exception as e:
logger.error(f"Failed to create car_frontal_info table: {e}")
if self.connection:
self.connection.rollback()
return False
def insert_initial_detection(self, display_id: str, captured_timestamp: str, session_id: str = None) -> str:
"""
Insert initial detection record and return the session_id.
Args:
display_id: Display identifier
captured_timestamp: Timestamp of the detection
session_id: Optional session ID, generates one if not provided
Returns:
Session ID string or None on error
"""
if not self.is_connected():
if not self.connect():
return None
# Generate session_id if not provided
if not session_id:
session_id = str(uuid.uuid4())
try:
# Ensure table exists
if not self.create_car_frontal_info_table():
logger.error("Failed to create/verify table before insertion")
return None
cur = self.connection.cursor()
insert_query = """
INSERT INTO gas_station_1.car_frontal_info
(display_id, captured_timestamp, session_id, license_character, license_type, car_brand, car_model, car_body_type)
VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL)
ON CONFLICT (session_id) DO NOTHING
"""
cur.execute(insert_query, (display_id, captured_timestamp, session_id))
self.connection.commit()
cur.close()
logger.info(f"Inserted initial detection record with session_id: {session_id}")
return session_id
except Exception as e:
logger.error(f"Failed to insert initial detection record: {e}")
if self.connection:
self.connection.rollback()
return None
def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
"""
Get session information from the database.
Args:
session_id: Session identifier
Returns:
Dictionary with session data or None if not found
"""
if not self.is_connected():
if not self.connect():
return None
try:
cur = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
query = "SELECT * FROM gas_station_1.car_frontal_info WHERE session_id = %s"
cur.execute(query, (session_id,))
result = cur.fetchone()
cur.close()
if result:
return dict(result)
else:
logger.debug(f"No session info found for session_id: {session_id}")
return None
except Exception as e:
logger.error(f"Failed to get session info: {e}")
return None
def delete_session(self, session_id: str) -> bool:
"""
Delete session record from the database.
Args:
session_id: Session identifier
Returns:
True if successful, False otherwise
"""
if not self.is_connected():
if not self.connect():
return False
try:
cur = self.connection.cursor()
query = "DELETE FROM gas_station_1.car_frontal_info WHERE session_id = %s"
cur.execute(query, (session_id,))
rows_affected = cur.rowcount
self.connection.commit()
cur.close()
if rows_affected > 0:
logger.info(f"Deleted session record: {session_id}")
return True
else:
logger.warning(f"No session record found to delete: {session_id}")
return False
except Exception as e:
logger.error(f"Failed to delete session: {e}")
if self.connection:
self.connection.rollback()
return False
def get_statistics(self) -> Dict[str, Any]:
"""
Get database statistics.
Returns:
Dictionary with database statistics
"""
stats = {
'connected': self.is_connected(),
'host': self.config.get('host', 'unknown'),
'port': self.config.get('port', 'unknown'),
'database': self.config.get('database', 'unknown')
}
if self.is_connected():
try:
cur = self.connection.cursor()
# Get table record count
cur.execute("SELECT COUNT(*) FROM gas_station_1.car_frontal_info")
stats['total_records'] = cur.fetchone()[0]
# Get recent records count (last hour)
cur.execute("""
SELECT COUNT(*) FROM gas_station_1.car_frontal_info
WHERE created_at > NOW() - INTERVAL '1 hour'
""")
stats['recent_records'] = cur.fetchone()[0]
cur.close()
except Exception as e:
logger.warning(f"Failed to get database statistics: {e}")
stats['error'] = str(e)
return stats