From a1d358aead6f5baa8d8165988fbd5dc1335f3a8c Mon Sep 17 00:00:00 2001 From: ziesorx Date: Sun, 10 Aug 2025 13:11:38 +0700 Subject: [PATCH] Done setup and integration redis and postgresql --- requirements.txt | 5 +- siwatsystem/database.py | 112 ++++++++++++++++++++++++++++++++++++++++ siwatsystem/pympta.py | 90 ++++++++++++++++++++++++++------ 3 files changed, 189 insertions(+), 18 deletions(-) create mode 100644 siwatsystem/database.py diff --git a/requirements.txt b/requirements.txt index 133b3a2..c0691b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,7 @@ opencv-python websockets fastapi[standard] redis -urllib3<2.0.0 \ No newline at end of file +urllib3<2.0.0 +psycopg2-binary +scipy +filterpy \ No newline at end of file diff --git a/siwatsystem/database.py b/siwatsystem/database.py new file mode 100644 index 0000000..d298bdf --- /dev/null +++ b/siwatsystem/database.py @@ -0,0 +1,112 @@ +import psycopg2 +import psycopg2.extras +from typing import Optional, Dict, Any +import logging + +logger = logging.getLogger(__name__) + +class DatabaseManager: + def __init__(self, config: Dict[str, Any]): + self.config = config + self.connection: Optional[psycopg2.extensions.connection] = None + + def connect(self) -> bool: + try: + self.connection = psycopg2.connect( + host=self.config['host'], + port=self.config['port'], + database=self.config['database'], + user=self.config['username'], + password=self.config['password'] + ) + logger.info("PostgreSQL connection established successfully") + return True + except Exception as e: + logger.error(f"Failed to connect to PostgreSQL: {e}") + return False + + def disconnect(self): + if self.connection: + self.connection.close() + self.connection = None + logger.info("PostgreSQL connection closed") + + def is_connected(self) -> bool: + try: + if self.connection and not self.connection.closed: + cur = self.connection.cursor() + cur.execute("SELECT 1") + cur.fetchone() + cur.close() + return True + except: + pass + return False + + def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool: + if not self.is_connected(): + if not self.connect(): + return False + + try: + cur = self.connection.cursor() + query = """ + INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at) + VALUES (%s, %s, %s, %s, NOW()) + ON CONFLICT (session_id) + DO UPDATE SET + car_brand = EXCLUDED.car_brand, + car_model = EXCLUDED.car_model, + car_body_type = EXCLUDED.car_body_type, + updated_at = NOW() + """ + cur.execute(query, (session_id, brand, model, body_type)) + self.connection.commit() + cur.close() + logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})") + return True + except Exception as e: + logger.error(f"Failed to update car info: {e}") + if self.connection: + self.connection.rollback() + return False + + def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool: + if not self.is_connected(): + if not self.connect(): + return False + + try: + cur = self.connection.cursor() + + # Build the UPDATE query dynamically + set_clauses = [] + values = [] + + for field, value in fields.items(): + if value == "NOW()": + set_clauses.append(f"{field} = NOW()") + else: + set_clauses.append(f"{field} = %s") + values.append(value) + + query = f""" + INSERT INTO {table} ({key_field}, {', '.join(fields.keys())}) + VALUES (%s, {', '.join(['%s'] * len(fields))}) + ON CONFLICT ({key_field}) + DO UPDATE SET {', '.join(set_clauses)} + """ + + # Add key_value to the beginning of values list + all_values = [key_value] + list(fields.values()) + values + + cur.execute(query, all_values) + self.connection.commit() + cur.close() + logger.info(f"Updated {table} for {key_field}={key_value}") + return True + except Exception as e: + logger.error(f"Failed to execute update on {table}: {e}") + if self.connection: + self.connection.rollback() + return False \ No newline at end of file diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index f151b55..3642dee 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -12,11 +12,40 @@ import time import uuid from ultralytics import YOLO from urllib.parse import urlparse +from .database import DatabaseManager # Create a logger specifically for this module logger = logging.getLogger("detector_worker.pympta") -def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client) -> dict: +def validate_redis_config(redis_config: dict) -> bool: + """Validate Redis configuration parameters.""" + required_fields = ["host", "port"] + for field in required_fields: + if field not in redis_config: + logger.error(f"Missing required Redis config field: {field}") + return False + + if not isinstance(redis_config["port"], int) or redis_config["port"] <= 0: + logger.error(f"Invalid Redis port: {redis_config['port']}") + return False + + return True + +def validate_postgresql_config(pg_config: dict) -> bool: + """Validate PostgreSQL configuration parameters.""" + required_fields = ["host", "port", "database", "username", "password"] + for field in required_fields: + if field not in pg_config: + logger.error(f"Missing required PostgreSQL config field: {field}") + return False + + if not isinstance(pg_config["port"], int) or pg_config["port"] <= 0: + logger.error(f"Invalid PostgreSQL port: {pg_config['port']}") + return False + + return True + +def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client, db_manager=None) -> dict: # Recursively load a model node from configuration. model_path = os.path.join(mpta_dir, node_config["modelFile"]) if not os.path.exists(model_path): @@ -46,16 +75,22 @@ def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client) -> dict: "triggerClasses": trigger_classes, "triggerClassIndices": trigger_class_indices, "crop": node_config.get("crop", False), + "cropClass": node_config.get("cropClass"), "minConfidence": node_config.get("minConfidence", None), + "multiClass": node_config.get("multiClass", False), + "expectedClasses": node_config.get("expectedClasses", []), + "parallel": node_config.get("parallel", False), "actions": node_config.get("actions", []), + "parallelActions": node_config.get("parallelActions", []), "model": model, "branches": [], - "redis_client": redis_client + "redis_client": redis_client, + "db_manager": db_manager } logger.debug(f"Configured node {node_config['modelId']} with trigger classes: {node['triggerClasses']}") for child in node_config.get("branches", []): logger.debug(f"Loading branch for parent node {node_config['modelId']}") - node["branches"].append(load_pipeline_node(child, mpta_dir, redis_client)) + node["branches"].append(load_pipeline_node(child, mpta_dir, redis_client, db_manager)) return node def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: @@ -168,21 +203,42 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: redis_client = None if "redis" in pipeline_config: redis_config = pipeline_config["redis"] - try: - redis_client = redis.Redis( - host=redis_config["host"], - port=redis_config["port"], - password=redis_config.get("password"), - db=redis_config.get("db", 0), - decode_responses=True - ) - redis_client.ping() - logger.info(f"Successfully connected to Redis at {redis_config['host']}:{redis_config['port']}") - except redis.exceptions.ConnectionError as e: - logger.error(f"Failed to connect to Redis: {e}") - redis_client = None + if not validate_redis_config(redis_config): + logger.error("Invalid Redis configuration, skipping Redis connection") + else: + try: + redis_client = redis.Redis( + host=redis_config["host"], + port=redis_config["port"], + password=redis_config.get("password"), + db=redis_config.get("db", 0), + decode_responses=True + ) + redis_client.ping() + logger.info(f"Successfully connected to Redis at {redis_config['host']}:{redis_config['port']}") + except redis.exceptions.ConnectionError as e: + logger.error(f"Failed to connect to Redis: {e}") + redis_client = None - return load_pipeline_node(pipeline_config["pipeline"], mpta_dir, redis_client) + # Establish PostgreSQL connection if configured + db_manager = None + if "postgresql" in pipeline_config: + pg_config = pipeline_config["postgresql"] + if not validate_postgresql_config(pg_config): + logger.error("Invalid PostgreSQL configuration, skipping database connection") + else: + try: + db_manager = DatabaseManager(pg_config) + if db_manager.connect(): + logger.info(f"Successfully connected to PostgreSQL at {pg_config['host']}:{pg_config['port']}") + else: + logger.error("Failed to connect to PostgreSQL") + db_manager = None + except Exception as e: + logger.error(f"Error initializing PostgreSQL connection: {e}") + db_manager = None + + return load_pipeline_node(pipeline_config["pipeline"], mpta_dir, redis_client, db_manager) except json.JSONDecodeError as e: logger.error(f"Error parsing pipeline.json: {str(e)}", exc_info=True) return None