refactor: remove all PostgreSQL-related code #30
6 changed files with 545 additions and 497 deletions
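With this change the worker reports results to the backend over the WebSocket instead of writing to PostgreSQL. The sketch below illustrates that replacement flow; the `send_image_detection` helper and the exact payload field names are assumptions for illustration (only the `imageDetection` message type and the license-plate fields appear in this diff), not the worker's actual code.

```python
# Minimal sketch of the replacement data flow, not the worker's actual code.
# Assumes a connected websocket object with an async send(); the payload field
# names are inferred from this diff and may not match the real message schema.
import json

async def send_image_detection(websocket, subscription_id: str, session_id: str,
                               license_text: str, confidence: float) -> None:
    """Report a result to the backend over WebSocket instead of PostgreSQL."""
    message = {
        "type": "imageDetection",
        "subscriptionIdentifier": subscription_id,
        "sessionId": session_id,
        "data": {
            "licenseCharacter": license_text,   # assumed field name
            "licenseType": "LPR_detected",      # value taken from the old DB write
            "confidence": confidence,
        },
    }
    await websocket.send(json.dumps(message))
```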
@@ -43,7 +43,7 @@ class BranchProcessor:
 
         # Storage managers (set during initialization)
         self.redis_manager = None
-        self.db_manager = None
+        # self.db_manager = None  # Disabled - PostgreSQL operations moved to microservices
 
         # Branch execution timeout (seconds)
         self.branch_timeout = 30.0
@@ -60,21 +60,21 @@ class BranchProcessor:
 
         logger.info("BranchProcessor initialized")
 
-    async def initialize(self, pipeline_config: Any, redis_manager: Any, db_manager: Any) -> bool:
+    async def initialize(self, pipeline_config: Any, redis_manager: Any, db_manager: Any = None) -> bool:
         """
         Initialize branch processor with pipeline configuration.
 
         Args:
             pipeline_config: Pipeline configuration object
             redis_manager: Redis manager instance
-            db_manager: Database manager instance
+            db_manager: Database manager instance (deprecated, not used)
 
         Returns:
             True if successful, False otherwise
         """
         try:
             self.redis_manager = redis_manager
-            self.db_manager = db_manager
+            # self.db_manager = db_manager  # Disabled - PostgreSQL operations moved to microservices
 
             # Parse field mappings from parallelActions to enable dynamic field extraction
             self._parse_branch_output_fields(pipeline_config)
@@ -170,22 +170,25 @@ class BranchProcessor:
             return
 
         for action in pipeline_config.parallel_actions:
+            # Skip PostgreSQL actions - they are disabled
             if action.type.value == 'postgresql_update_combined':
-                fields = action.params.get('fields', {})
-
-                # Parse each field template to extract branch_id and field_name
-                for db_field_name, template in fields.items():
-                    # Template format: "{branch_id.field_name}"
-                    if template.startswith('{') and template.endswith('}'):
-                        var_name = template[1:-1]  # Remove { }
-
-                        if '.' in var_name:
-                            branch_id, field_name = var_name.split('.', 1)
-
-                            # Store the mapping
-                            self.branch_output_fields[branch_id] = field_name
-
-                            logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'")
+                logger.debug(f"[FIELD MAPPING] Skipping PostgreSQL action (disabled)")
+                continue  # Skip field parsing for disabled PostgreSQL operations
+                # fields = action.params.get('fields', {})
+                #
+                # # Parse each field template to extract branch_id and field_name
+                # for db_field_name, template in fields.items():
+                #     # Template format: "{branch_id.field_name}"
+                #     if template.startswith('{') and template.endswith('}'):
+                #         var_name = template[1:-1]  # Remove { }
+                #
+                #         if '.' in var_name:
+                #             branch_id, field_name = var_name.split('.', 1)
+                #
+                #             # Store the mapping
+                #             self.branch_output_fields[branch_id] = field_name
+                #
+                #             logger.info(f"[FIELD MAPPING] Branch '{branch_id}' → outputs field '{field_name}'")
 
         logger.info(f"[FIELD MAPPING] Parsed {len(self.branch_output_fields)} branch output field mappings")
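For reference, the mapping rule that the disabled block implemented can be read in isolation. A minimal standalone sketch of the `{branch_id.field_name}` template parsing, mirroring the commented-out code above (the function name and the example branch id are illustrative only):

```python
# Standalone sketch of the "{branch_id.field_name}" parsing the disabled block
# performed; not new API from this PR.
def parse_output_fields(fields: dict) -> dict:
    """Map branch_id -> field_name from templates like "{some_branch.brand}"."""
    mappings = {}
    for _db_field, template in fields.items():
        if template.startswith('{') and template.endswith('}'):
            var_name = template[1:-1]  # strip the braces
            if '.' in var_name:
                branch_id, field_name = var_name.split('.', 1)
                mappings[branch_id] = field_name
    return mappings

print(parse_output_fields({"car_brand": "{car_brand_cls.brand}"}))
# -> {'car_brand_cls': 'brand'}  (branch id here is illustrative)
```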
@@ -15,7 +15,7 @@ from ..models.inference import YOLOWrapper
 from ..models.pipeline import PipelineParser
 from .branches import BranchProcessor
 from ..storage.redis import RedisManager
-from ..storage.database import DatabaseManager
+# from ..storage.database import DatabaseManager  # Disabled - PostgreSQL moved to microservices
 from ..storage.license_plate import LicensePlateManager
 
 logger = logging.getLogger(__name__)
@@ -45,7 +45,7 @@ class DetectionPipeline:
         # Initialize components
         self.branch_processor = BranchProcessor(model_manager, model_id)
         self.redis_manager = None
-        self.db_manager = None
+        # self.db_manager = None  # Disabled - PostgreSQL operations moved to microservices
         self.license_plate_manager = None
 
         # Main detection model
@@ -113,16 +113,18 @@ class DetectionPipeline:
                 return False
             logger.info("Redis connection initialized")
 
-        # Initialize database connection
-        if self.pipeline_parser.postgresql_config:
-            self.db_manager = DatabaseManager(self.pipeline_parser.postgresql_config.__dict__)
-            if not self.db_manager.connect():
-                logger.error("Failed to initialize database connection")
-                return False
-            # Create required tables
-            if not self.db_manager.create_car_frontal_info_table():
-                logger.warning("Failed to create car_frontal_info table")
-            logger.info("Database connection initialized")
+        # PostgreSQL database connection DISABLED - operations moved to microservices
+        # Database operations are now handled by backend services via WebSocket
+        # if self.pipeline_parser.postgresql_config:
+        #     self.db_manager = DatabaseManager(self.pipeline_parser.postgresql_config.__dict__)
+        #     if not self.db_manager.connect():
+        #         logger.error("Failed to initialize database connection")
+        #         return False
+        #     # Create required tables
+        #     if not self.db_manager.create_car_frontal_info_table():
+        #         logger.warning("Failed to create car_frontal_info table")
+        #     logger.info("Database connection initialized")
+        logger.info("PostgreSQL operations disabled - using WebSocket for data communication")
 
         # Initialize license plate manager (using same Redis config as main Redis manager)
         if self.pipeline_parser.redis_config:
@@ -138,11 +140,11 @@ class DetectionPipeline:
             logger.error("Failed to initialize detection model")
             return False
 
-        # Initialize branch processor
+        # Initialize branch processor (db_manager=None since PostgreSQL is disabled)
         if not await self.branch_processor.initialize(
             self.pipeline_config,
             self.redis_manager,
-            self.db_manager
+            db_manager=None  # PostgreSQL disabled
         ):
             logger.error("Failed to initialize branch processor")
             return False
@@ -283,23 +285,25 @@ class DetectionPipeline:
                 return
 
             # Send imageDetection message with license plate data combined with processing results
+            # This is the PRIMARY data flow to backend - WebSocket is critical, keep this!
             await self._send_license_plate_message(subscription_id, license_text, confidence, session_id)
 
-            # Update database with license plate information if database manager is available
-            if self.db_manager and license_text:
-                success = self.db_manager.execute_update(
-                    table='car_frontal_info',
-                    key_field='session_id',
-                    key_value=session_id,
-                    fields={
-                        'license_character': license_text,
-                        'license_type': 'LPR_detected'  # Mark as detected by LPR service
-                    }
-                )
-                if success:
-                    logger.info(f"[LICENSE PLATE] Updated database for session {session_id}")
-                else:
-                    logger.warning(f"[LICENSE PLATE] Failed to update database for session {session_id}")
+            # PostgreSQL database update DISABLED - backend handles data via WebSocket messages
+            # if self.db_manager and license_text:
+            #     success = self.db_manager.execute_update(
+            #         table='car_frontal_info',
+            #         key_field='session_id',
+            #         key_value=session_id,
+            #         fields={
+            #             'license_character': license_text,
+            #             'license_type': 'LPR_detected'  # Mark as detected by LPR service
+            #         }
+            #     )
+            #     if success:
+            #         logger.info(f"[LICENSE PLATE] Updated database for session {session_id}")
+            #     else:
+            #         logger.warning(f"[LICENSE PLATE] Failed to update database for session {session_id}")
+            logger.debug(f"[LICENSE PLATE] Data sent via WebSocket for session {session_id}")
 
         except Exception as e:
             logger.error(f"Error in license plate result callback: {e}", exc_info=True)
@@ -710,17 +714,18 @@ class DetectionPipeline:
             self.session_to_subscription[session_id] = subscription_id
             logger.info(f"[SESSION MAPPING] Stored mapping: sessionId '{session_id}' -> subscriptionIdentifier '{subscription_id}'")
 
-            # Initialize database record with session_id
-            if session_id and self.db_manager:
-                success = self.db_manager.insert_initial_detection(
-                    display_id=display_id,
-                    captured_timestamp=detection_context['timestamp'],
-                    session_id=session_id
-                )
-                if success:
-                    logger.info(f"Created initial database record with session {session_id}")
-                else:
-                    logger.warning(f"Failed to create initial database record for session {session_id}")
+            # PostgreSQL database insert DISABLED - backend handles data via WebSocket
+            # if session_id and self.db_manager:
+            #     success = self.db_manager.insert_initial_detection(
+            #         display_id=display_id,
+            #         captured_timestamp=detection_context['timestamp'],
+            #         session_id=session_id
+            #     )
+            #     if success:
+            #         logger.info(f"Created initial database record with session {session_id}")
+            #     else:
+            #         logger.warning(f"Failed to create initial database record for session {session_id}")
+            logger.debug(f"Session {session_id} will be communicated via WebSocket")
 
             # Execute branches in parallel
             if hasattr(self.pipeline_config, 'branches') and self.pipeline_config.branches:
@@ -886,15 +891,16 @@ class DetectionPipeline:
         if valid_detections:
             logger.info(f"Found {len(valid_detections)} valid detections for pipeline processing")
 
-            # Initialize database record if session_id is provided
-            if session_id and self.db_manager:
-                success = self.db_manager.insert_initial_detection(
-                    display_id=display_id,
-                    captured_timestamp=detection_context['timestamp'],
-                    session_id=session_id
-                )
-                if not success:
-                    logger.warning(f"Failed to create initial database record for session {session_id}")
+            # PostgreSQL database insert DISABLED - backend handles data via WebSocket
+            # if session_id and self.db_manager:
+            #     success = self.db_manager.insert_initial_detection(
+            #         display_id=display_id,
+            #         captured_timestamp=detection_context['timestamp'],
+            #         session_id=session_id
+            #     )
+            #     if not success:
+            #         logger.warning(f"Failed to create initial database record for session {session_id}")
+            logger.debug(f"Detection results for session {session_id} will be sent via WebSocket")
 
             # Execute branches in parallel
             if hasattr(self.pipeline_config, 'branches') and self.pipeline_config.branches:
@@ -1025,11 +1031,16 @@ class DetectionPipeline:
             logger.debug(f"Executing parallel action: {action_type}")
 
             if action_type == 'postgresql_update_combined':
-                result = await self._execute_postgresql_update_combined(action, context)
+                # PostgreSQL action SKIPPED - database operations disabled
+                logger.info(f"Skipping PostgreSQL action '{action_type}' (disabled)")
+                result = {'status': 'skipped', 'message': 'PostgreSQL operations disabled'}
 
-                # Update session state with processing results after database update
-                if result.get('status') == 'success':
-                    await self._update_session_with_processing_results(context)
+                # Still update session state for WebSocket messaging
+                await self._update_session_with_processing_results(context)
+
+                # result = await self._execute_postgresql_update_combined(action, context)
+                # if result.get('status') == 'success':
+                #     await self._update_session_with_processing_results(context)
             else:
                 logger.warning(f"Unknown parallel action type: {action_type}")
                 result = {'status': 'error', 'message': f'Unknown action type: {action_type}'}
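Because skipped PostgreSQL actions now return `{'status': 'skipped', ...}` rather than `'success'` or `'error'`, any code that aggregates parallel-action results should treat `'skipped'` as non-fatal. A minimal sketch of such a consumer (`all_actions_ok` is a hypothetical helper, not code from this PR):

```python
# Hypothetical consumer of parallel-action results; not code from this PR.
def all_actions_ok(results: dict) -> bool:
    """Treat 'skipped' PostgreSQL actions as non-fatal; only failures count."""
    for result in results.values():
        status = result.get('status')
        if status == 'skipped':
            continue  # disabled PostgreSQL actions land here by design
        if status != 'success':
            return False
    return True
```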
@@ -1132,59 +1143,61 @@ class DetectionPipeline:
             logger.error(f"Error in redis_publish action: {e}", exc_info=True)
             return {'status': 'error', 'message': str(e)}
 
-    async def _execute_postgresql_update_combined(self,
-                                                  action: Dict,
-                                                  context: Dict[str, Any]) -> Dict[str, Any]:
-        """Execute postgresql_update_combined action."""
-        if not self.db_manager:
-            return {'status': 'error', 'message': 'Database not available'}
-
-        try:
-            # Wait for required branches if specified
-            wait_for_branches = action.params.get('waitForBranches', [])
-            branch_results = context.get('branch_results', {})
-
-            # Log missing branches but don't block the update (allow partial results)
-            missing_branches = [b for b in wait_for_branches if b not in branch_results]
-            if missing_branches:
-                logger.warning(f"Some branches missing from results (will use null): {missing_branches}")
-            available_branches = [b for b in wait_for_branches if b in branch_results]
-            if available_branches:
-                logger.info(f"Available branches for database update: {available_branches}")
-
-            # Prepare fields for database update
-            table = action.params.get('table', 'car_frontal_info')
-            key_field = action.params.get('key_field', 'session_id')
-            key_value = action.params.get('key_value', '{session_id}').format(**context)
-            field_mappings = action.params.get('fields', {})
-
-            # Resolve field values using branch results
-            resolved_fields = {}
-            for field_name, field_template in field_mappings.items():
-                try:
-                    # Replace template variables with actual values from branch results
-                    resolved_value = self._resolve_field_template(field_template, branch_results, context)
-                    resolved_fields[field_name] = resolved_value
-                except Exception as e:
-                    logger.warning(f"Failed to resolve field {field_name}: {e}")
-                    resolved_fields[field_name] = None
-
-            # Execute database update
-            success = self.db_manager.execute_update(
-                table=table,
-                key_field=key_field,
-                key_value=key_value,
-                fields=resolved_fields
-            )
-
-            if success:
-                return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields}
-            else:
-                return {'status': 'error', 'message': 'Database update failed'}
-
-        except Exception as e:
-            logger.error(f"Error in postgresql_update_combined action: {e}", exc_info=True)
-            return {'status': 'error', 'message': str(e)}
+    # PostgreSQL update method DISABLED - database operations moved to microservices
+    # This method is no longer used as data flows via WebSocket messages to backend
+    # async def _execute_postgresql_update_combined(self,
+    #                                               action: Dict,
+    #                                               context: Dict[str, Any]) -> Dict[str, Any]:
+    #     """Execute postgresql_update_combined action."""
+    #     if not self.db_manager:
+    #         return {'status': 'error', 'message': 'Database not available'}
+    #
+    #     try:
+    #         # Wait for required branches if specified
+    #         wait_for_branches = action.params.get('waitForBranches', [])
+    #         branch_results = context.get('branch_results', {})
+    #
+    #         # Log missing branches but don't block the update (allow partial results)
+    #         missing_branches = [b for b in wait_for_branches if b not in branch_results]
+    #         if missing_branches:
+    #             logger.warning(f"Some branches missing from results (will use null): {missing_branches}")
+    #         available_branches = [b for b in wait_for_branches if b in branch_results]
+    #         if available_branches:
+    #             logger.info(f"Available branches for database update: {available_branches}")
+    #
+    #         # Prepare fields for database update
+    #         table = action.params.get('table', 'car_frontal_info')
+    #         key_field = action.params.get('key_field', 'session_id')
+    #         key_value = action.params.get('key_value', '{session_id}').format(**context)
+    #         field_mappings = action.params.get('fields', {})
+    #
+    #         # Resolve field values using branch results
+    #         resolved_fields = {}
+    #         for field_name, field_template in field_mappings.items():
+    #             try:
+    #                 # Replace template variables with actual values from branch results
+    #                 resolved_value = self._resolve_field_template(field_template, branch_results, context)
+    #                 resolved_fields[field_name] = resolved_value
+    #             except Exception as e:
+    #                 logger.warning(f"Failed to resolve field {field_name}: {e}")
+    #                 resolved_fields[field_name] = None
+    #
+    #         # Execute database update
+    #         success = self.db_manager.execute_update(
+    #             table=table,
+    #             key_field=key_field,
+    #             key_value=key_value,
+    #             fields=resolved_fields
+    #         )
+    #
+    #         if success:
+    #             return {'status': 'success', 'table': table, 'key': f'{key_field}={key_value}', 'fields': resolved_fields}
+    #         else:
+    #             return {'status': 'error', 'message': 'Database update failed'}
+    #
+    #     except Exception as e:
+    #         logger.error(f"Error in postgresql_update_combined action: {e}", exc_info=True)
+    #         return {'status': 'error', 'message': str(e)}
 
     def _resolve_field_template(self, template: str, branch_results: Dict, context: Dict) -> str:
         """
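Note that `_resolve_field_template` itself is kept by this PR. Its assumed behavior, inferred from the `{branch_id.field_name}` and `{session_id}` template formats used above, is sketched here as a standalone function for illustration; the real method may differ in details:

```python
# Illustrative stand-in for _resolve_field_template, which this PR keeps.
def resolve_field_template(template: str, branch_results: dict, context: dict):
    if template.startswith('{') and template.endswith('}'):
        var_name = template[1:-1]
        if '.' in var_name:  # "{branch_id.field_name}" -> branch output lookup
            branch_id, field_name = var_name.split('.', 1)
            return branch_results.get(branch_id, {}).get(field_name)
        return context.get(var_name)  # "{session_id}" -> plain context key
    return template  # literal values pass through unchanged
```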
@@ -1270,7 +1283,7 @@ class DetectionPipeline:
             'branches': branch_stats,
             'license_plate': license_stats,
             'redis_available': self.redis_manager is not None,
-            'database_available': self.db_manager is not None,
+            # 'database_available': self.db_manager is not None,  # PostgreSQL disabled
             'detection_model_loaded': self.detection_model is not None
         }
 
@@ -1282,8 +1295,9 @@ class DetectionPipeline:
         if self.redis_manager:
             self.redis_manager.cleanup()
 
-        if self.db_manager:
-            self.db_manager.disconnect()
+        # PostgreSQL disconnect DISABLED - database operations moved to microservices
+        # if self.db_manager:
+        #     self.db_manager.disconnect()
 
         if self.branch_processor:
             self.branch_processor.cleanup()
@@ -11,7 +11,7 @@ from .pipeline import (
     Action,
     ActionType,
     RedisConfig,
-    PostgreSQLConfig
+    # PostgreSQLConfig  # Disabled - moved to microservices
 )
 from .inference import (
     YOLOWrapper,
@@ -32,7 +32,7 @@ __all__ = [
     'Action',
     'ActionType',
     'RedisConfig',
-    'PostgreSQLConfig',
+    # 'PostgreSQLConfig',  # Disabled - moved to microservices
 
     # Inference
     'YOLOWrapper',
@@ -16,6 +16,8 @@ class ActionType(Enum):
     """Supported action types in pipeline"""
     REDIS_SAVE_IMAGE = "redis_save_image"
     REDIS_PUBLISH = "redis_publish"
+    # PostgreSQL actions below are DEPRECATED - kept for backward compatibility only
+    # These actions will be silently skipped during pipeline execution
     POSTGRESQL_UPDATE = "postgresql_update"
     POSTGRESQL_UPDATE_COMBINED = "postgresql_update_combined"
     POSTGRESQL_INSERT = "postgresql_insert"
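One possible way to centralize the "silently skipped" rule described in the new comments is a single predicate over the deprecated members. This is a hedged sketch assumed to live in the same module as `ActionType`; the names `DEPRECATED_ACTIONS` and `is_executable` are invented here, not part of this PR:

```python
# Hedged sketch: centralize the skip rule for deprecated PostgreSQL actions.
# Assumes it lives alongside ActionType; these names are not in this PR.
DEPRECATED_ACTIONS = {
    ActionType.POSTGRESQL_UPDATE,
    ActionType.POSTGRESQL_UPDATE_COMBINED,
    ActionType.POSTGRESQL_INSERT,
}

def is_executable(action_type: ActionType) -> bool:
    """Return False for deprecated action types the pipeline must skip."""
    return action_type not in DEPRECATED_ACTIONS
```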
@@ -41,7 +43,15 @@ class RedisConfig:
 
 @dataclass
 class PostgreSQLConfig:
-    """PostgreSQL connection configuration"""
+    """
+    PostgreSQL connection configuration - DISABLED
+
+    NOTE: This configuration is kept for backward compatibility with existing
+    pipeline.json files, but PostgreSQL operations are disabled. All database
+    operations have been moved to microservices architecture.
+
+    This config will be parsed but not used for any database connections.
+    """
     host: str
     port: int
     database: str
@@ -50,6 +60,7 @@ class PostgreSQLConfig:
 
     @classmethod
     def from_dict(cls, data: Dict[str, Any]) -> 'PostgreSQLConfig':
+        """Parse PostgreSQL config from dict (kept for backward compatibility)"""
         return cls(
             host=data['host'],
             port=data.get('port', 5432),
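Parsing a legacy `pipeline.json` PostgreSQL block therefore still succeeds; it simply never opens a connection. A usage sketch with illustrative values only (the `username`/`password` keys are assumptions, based on how the now-disabled `DatabaseManager` read its config dict):

```python
# Usage sketch; all values are illustrative, not from this repo.
legacy_block = {
    "host": "db.internal.example",
    "port": 5432,
    "database": "detections",
    "username": "worker",     # assumed key
    "password": "change-me",  # assumed key
}
config = PostgreSQLConfig.from_dict(legacy_block)
assert config.port == 5432  # the 5432 default also applies when "port" is omitted
```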
@@ -272,17 +283,19 @@ class PipelineParser:
         if not self._validate_actions(self.pipeline_config):
             return False
 
-        # Validate parallel actions
+        # Validate parallel actions (PostgreSQL actions are skipped)
         for action in self.pipeline_config.parallel_actions:
             if action.type == ActionType.POSTGRESQL_UPDATE_COMBINED:
-                wait_for = action.params.get('waitForBranches', [])
-                if wait_for:
-                    # Check that referenced branches exist
-                    branch_ids = self._get_all_branch_ids(self.pipeline_config)
-                    for branch_id in wait_for:
-                        if branch_id not in branch_ids:
-                            logger.error(f"Referenced branch '{branch_id}' in waitForBranches not found")
-                            return False
+                logger.warning(f"PostgreSQL parallel action {action.type.value} found but will be SKIPPED (PostgreSQL disabled)")
+                # Skip validation for PostgreSQL actions since they won't be executed
+                # wait_for = action.params.get('waitForBranches', [])
+                # if wait_for:
+                #     # Check that referenced branches exist
+                #     branch_ids = self._get_all_branch_ids(self.pipeline_config)
+                #     for branch_id in wait_for:
+                #         if branch_id not in branch_ids:
+                #             logger.error(f"Referenced branch '{branch_id}' in waitForBranches not found")
+                #             return False
 
         logger.info("Pipeline configuration validated successfully")
         return True
@@ -305,11 +318,14 @@ class PipelineParser:
                 logger.error(f"Action {action.type} requires Redis configuration")
                 return False
 
-            # Validate PostgreSQL actions need PostgreSQL config
+            # PostgreSQL actions are disabled - log warning instead of failing
+            # Kept for backward compatibility with existing pipeline.json files
             if action.type in [ActionType.POSTGRESQL_UPDATE, ActionType.POSTGRESQL_UPDATE_COMBINED, ActionType.POSTGRESQL_INSERT]:
-                if not self.postgresql_config:
-                    logger.error(f"Action {action.type} requires PostgreSQL configuration")
-                    return False
+                logger.warning(f"PostgreSQL action {action.type.value} found but will be SKIPPED (PostgreSQL disabled)")
+                # Do not fail validation - just skip these actions during execution
+                # if not self.postgresql_config:
+                #     logger.error(f"Action {action.type} requires PostgreSQL configuration")
+                #     return False
 
         # Recursively validate branches
         if hasattr(config, 'branches'):
@@ -1,10 +1,13 @@
 """
 Storage module for the Python Detector Worker.
 
-This module provides Redis and PostgreSQL operations for data persistence
+This module provides Redis operations for data persistence
 and caching in the detection pipeline.
+
+Note: PostgreSQL operations have been disabled as database functionality
+has been moved to microservices architecture.
 """
 from .redis import RedisManager
-from .database import DatabaseManager
+# from .database import DatabaseManager  # Disabled - moved to microservices
 
-__all__ = ['RedisManager', 'DatabaseManager']
+__all__ = ['RedisManager']  # Removed 'DatabaseManager'
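A quick import-level sanity check for this change, assuming a hypothetical package path (substitute the project's real one):

```python
# "detector_worker.storage" is a hypothetical import path for illustration.
from detector_worker.storage import __all__ as storage_exports

assert storage_exports == ['RedisManager']  # 'DatabaseManager' is gone

try:
    from detector_worker.storage import DatabaseManager  # noqa: F401
except ImportError:
    print("DatabaseManager is no longer importable - as intended")
```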
@@ -1,357 +1,369 @@
 """
-Database Operations Module.
-Handles PostgreSQL operations for the detection pipeline.
+Database Operations Module - DISABLED
+
+NOTE: This module has been disabled as PostgreSQL database operations have been
+moved to microservices architecture. All database connections, reads, and writes
+are now handled by separate backend services.
+
+The detection worker now communicates results via:
+- WebSocket imageDetection messages (primary data flow to backend)
+- Redis image storage and pub/sub (temporary storage)
+
+Original functionality: PostgreSQL operations for the detection pipeline.
+Status: Commented out - DO NOT ENABLE without updating architecture
 """
-import psycopg2
-import psycopg2.extras
+
+# All PostgreSQL functionality below has been commented out
+# import psycopg2
+# import psycopg2.extras
 from typing import Optional, Dict, Any
 import logging
-import uuid
+# import uuid
 
 logger = logging.getLogger(__name__)
 
-class DatabaseManager:
-    """
-    Manages PostgreSQL connections and operations for the detection pipeline.
-    Handles database operations and schema management.
-    """
-
-    def __init__(self, config: Dict[str, Any]):
-        """
-        Initialize database manager with configuration.
-
-        Args:
-            config: Database configuration dictionary
-        """
-        self.config = config
-        self.connection: Optional[psycopg2.extensions.connection] = None
-
-    def connect(self) -> bool:
-        """
-        Connect to PostgreSQL database.
-
-        Returns:
-            True if successful, False otherwise
-        """
-        try:
-            self.connection = psycopg2.connect(
-                host=self.config['host'],
-                port=self.config['port'],
-                database=self.config['database'],
-                user=self.config['username'],
-                password=self.config['password']
-            )
-            logger.info("PostgreSQL connection established successfully")
-            return True
-        except Exception as e:
-            logger.error(f"Failed to connect to PostgreSQL: {e}")
-            return False
-
-    def disconnect(self):
-        """Disconnect from PostgreSQL database."""
-        if self.connection:
-            self.connection.close()
-            self.connection = None
-            logger.info("PostgreSQL connection closed")
-
-    def is_connected(self) -> bool:
-        """
-        Check if database connection is active.
-
-        Returns:
-            True if connected, False otherwise
-        """
-        try:
-            if self.connection and not self.connection.closed:
-                cur = self.connection.cursor()
-                cur.execute("SELECT 1")
-                cur.fetchone()
-                cur.close()
-                return True
-        except:
-            pass
-        return False
-
-    def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool:
-        """
-        Update car information in the database.
-
-        Args:
-            session_id: Session identifier
-            brand: Car brand
-            model: Car model
-            body_type: Car body type
-
-        Returns:
-            True if successful, False otherwise
-        """
-        if not self.is_connected():
-            if not self.connect():
-                return False
-
-        try:
-            cur = self.connection.cursor()
-            query = """
-                INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at)
-                VALUES (%s, %s, %s, %s, NOW())
-                ON CONFLICT (session_id)
-                DO UPDATE SET
-                    car_brand = EXCLUDED.car_brand,
-                    car_model = EXCLUDED.car_model,
-                    car_body_type = EXCLUDED.car_body_type,
-                    updated_at = NOW()
-            """
-            cur.execute(query, (session_id, brand, model, body_type))
-            self.connection.commit()
-            cur.close()
-            logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})")
-            return True
-        except Exception as e:
-            logger.error(f"Failed to update car info: {e}")
-            if self.connection:
-                self.connection.rollback()
-            return False
-
-    def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool:
-        """
-        Execute a dynamic update query on the database.
-
-        Args:
-            table: Table name
-            key_field: Primary key field name
-            key_value: Primary key value
-            fields: Dictionary of fields to update
-
-        Returns:
-            True if successful, False otherwise
-        """
-        if not self.is_connected():
-            if not self.connect():
-                return False
-
-        try:
-            cur = self.connection.cursor()
-
-            # Build the UPDATE query dynamically
-            set_clauses = []
-            values = []
-
-            for field, value in fields.items():
-                if value == "NOW()":
-                    set_clauses.append(f"{field} = NOW()")
-                else:
-                    set_clauses.append(f"{field} = %s")
-                    values.append(value)
-
-            # Add schema prefix if table doesn't already have it
-            full_table_name = table if '.' in table else f"gas_station_1.{table}"
-
-            query = f"""
-                INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
-                VALUES (%s, {', '.join(['%s'] * len(fields))})
-                ON CONFLICT ({key_field})
-                DO UPDATE SET {', '.join(set_clauses)}
-            """
-
-            # Add key_value to the beginning of values list
-            all_values = [key_value] + list(fields.values()) + values
-
-            cur.execute(query, all_values)
-            self.connection.commit()
-            cur.close()
-            logger.info(f"Updated {table} for {key_field}={key_value}")
-            return True
-        except Exception as e:
-            logger.error(f"Failed to execute update on {table}: {e}")
-            if self.connection:
-                self.connection.rollback()
-            return False
-
-    def create_car_frontal_info_table(self) -> bool:
-        """
-        Create the car_frontal_info table in gas_station_1 schema if it doesn't exist.
-
-        Returns:
-            True if successful, False otherwise
-        """
-        if not self.is_connected():
-            if not self.connect():
-                return False
-
-        try:
-            # Since the database already exists, just verify connection
-            cur = self.connection.cursor()
-
-            # Simple verification that the table exists
-            cur.execute("""
-                SELECT EXISTS (
-                    SELECT FROM information_schema.tables
-                    WHERE table_schema = 'gas_station_1'
-                    AND table_name = 'car_frontal_info'
-                )
-            """)
-
-            table_exists = cur.fetchone()[0]
-            cur.close()
-
-            if table_exists:
-                logger.info("Verified car_frontal_info table exists")
-                return True
-            else:
-                logger.error("car_frontal_info table does not exist in the database")
-                return False
-
-        except Exception as e:
-            logger.error(f"Failed to create car_frontal_info table: {e}")
-            if self.connection:
-                self.connection.rollback()
-            return False
-
-    def insert_initial_detection(self, display_id: str, captured_timestamp: str, session_id: str = None) -> str:
-        """
-        Insert initial detection record and return the session_id.
-
-        Args:
-            display_id: Display identifier
-            captured_timestamp: Timestamp of the detection
-            session_id: Optional session ID, generates one if not provided
-
-        Returns:
-            Session ID string or None on error
-        """
-        if not self.is_connected():
-            if not self.connect():
-                return None
-
-        # Generate session_id if not provided
-        if not session_id:
-            session_id = str(uuid.uuid4())
-
-        try:
-            # Ensure table exists
-            if not self.create_car_frontal_info_table():
-                logger.error("Failed to create/verify table before insertion")
-                return None
-
-            cur = self.connection.cursor()
-            insert_query = """
-                INSERT INTO gas_station_1.car_frontal_info
-                (display_id, captured_timestamp, session_id, license_character, license_type, car_brand, car_model, car_body_type)
-                VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL)
-                ON CONFLICT (session_id) DO NOTHING
-            """
-
-            cur.execute(insert_query, (display_id, captured_timestamp, session_id))
-            self.connection.commit()
-            cur.close()
-            logger.info(f"Inserted initial detection record with session_id: {session_id}")
-            return session_id
-
-        except Exception as e:
-            logger.error(f"Failed to insert initial detection record: {e}")
-            if self.connection:
-                self.connection.rollback()
-            return None
-
-    def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
-        """
-        Get session information from the database.
-
-        Args:
-            session_id: Session identifier
-
-        Returns:
-            Dictionary with session data or None if not found
-        """
-        if not self.is_connected():
-            if not self.connect():
-                return None
-
-        try:
-            cur = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
-            query = "SELECT * FROM gas_station_1.car_frontal_info WHERE session_id = %s"
-            cur.execute(query, (session_id,))
-            result = cur.fetchone()
-            cur.close()
-
-            if result:
-                return dict(result)
-            else:
-                logger.debug(f"No session info found for session_id: {session_id}")
-                return None
-
-        except Exception as e:
-            logger.error(f"Failed to get session info: {e}")
-            return None
-
-    def delete_session(self, session_id: str) -> bool:
-        """
-        Delete session record from the database.
-
-        Args:
-            session_id: Session identifier
-
-        Returns:
-            True if successful, False otherwise
-        """
-        if not self.is_connected():
-            if not self.connect():
-                return False
-
-        try:
-            cur = self.connection.cursor()
-            query = "DELETE FROM gas_station_1.car_frontal_info WHERE session_id = %s"
-            cur.execute(query, (session_id,))
-            rows_affected = cur.rowcount
-            self.connection.commit()
-            cur.close()
-
-            if rows_affected > 0:
-                logger.info(f"Deleted session record: {session_id}")
-                return True
-            else:
-                logger.warning(f"No session record found to delete: {session_id}")
-                return False
-
-        except Exception as e:
-            logger.error(f"Failed to delete session: {e}")
-            if self.connection:
-                self.connection.rollback()
-            return False
-
-    def get_statistics(self) -> Dict[str, Any]:
-        """
-        Get database statistics.
-
-        Returns:
-            Dictionary with database statistics
-        """
-        stats = {
-            'connected': self.is_connected(),
-            'host': self.config.get('host', 'unknown'),
-            'port': self.config.get('port', 'unknown'),
-            'database': self.config.get('database', 'unknown')
-        }
-
-        if self.is_connected():
-            try:
-                cur = self.connection.cursor()
-
-                # Get table record count
-                cur.execute("SELECT COUNT(*) FROM gas_station_1.car_frontal_info")
-                stats['total_records'] = cur.fetchone()[0]
-
-                # Get recent records count (last hour)
-                cur.execute("""
-                    SELECT COUNT(*) FROM gas_station_1.car_frontal_info
-                    WHERE created_at > NOW() - INTERVAL '1 hour'
-                """)
-                stats['recent_records'] = cur.fetchone()[0]
-
-                cur.close()
-            except Exception as e:
-                logger.warning(f"Failed to get database statistics: {e}")
-                stats['error'] = str(e)
-
-        return stats
+# DatabaseManager class is disabled - all methods commented out
+# class DatabaseManager:
+#     """
+#     Manages PostgreSQL connections and operations for the detection pipeline.
+#     Handles database operations and schema management.
+#     """
+#
+#     def __init__(self, config: Dict[str, Any]):
+#         """
+#         Initialize database manager with configuration.
+#
+#         Args:
+#             config: Database configuration dictionary
+#         """
+#         self.config = config
+#         self.connection: Optional[psycopg2.extensions.connection] = None
+#
+#     def connect(self) -> bool:
+#         """
+#         Connect to PostgreSQL database.
+#
+#         Returns:
+#             True if successful, False otherwise
+#         """
+#         try:
+#             self.connection = psycopg2.connect(
+#                 host=self.config['host'],
+#                 port=self.config['port'],
+#                 database=self.config['database'],
+#                 user=self.config['username'],
+#                 password=self.config['password']
+#             )
+#             logger.info("PostgreSQL connection established successfully")
+#             return True
+#         except Exception as e:
+#             logger.error(f"Failed to connect to PostgreSQL: {e}")
+#             return False
+#
+#     def disconnect(self):
+#         """Disconnect from PostgreSQL database."""
+#         if self.connection:
+#             self.connection.close()
+#             self.connection = None
+#             logger.info("PostgreSQL connection closed")
+#
+#     def is_connected(self) -> bool:
+#         """
+#         Check if database connection is active.
+#
+#         Returns:
+#             True if connected, False otherwise
+#         """
+#         try:
+#             if self.connection and not self.connection.closed:
+#                 cur = self.connection.cursor()
+#                 cur.execute("SELECT 1")
+#                 cur.fetchone()
+#                 cur.close()
+#                 return True
+#         except:
+#             pass
+#         return False
+#
+#     def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool:
+#         """
+#         Update car information in the database.
+#
+#         Args:
+#             session_id: Session identifier
+#             brand: Car brand
+#             model: Car model
+#             body_type: Car body type
+#
+#         Returns:
+#             True if successful, False otherwise
+#         """
+#         if not self.is_connected():
+#             if not self.connect():
+#                 return False
+#
+#         try:
+#             cur = self.connection.cursor()
+#             query = """
+#                 INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at)
+#                 VALUES (%s, %s, %s, %s, NOW())
+#                 ON CONFLICT (session_id)
+#                 DO UPDATE SET
+#                     car_brand = EXCLUDED.car_brand,
+#                     car_model = EXCLUDED.car_model,
+#                     car_body_type = EXCLUDED.car_body_type,
+#                     updated_at = NOW()
+#             """
+#             cur.execute(query, (session_id, brand, model, body_type))
+#             self.connection.commit()
+#             cur.close()
+#             logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})")
+#             return True
+#         except Exception as e:
+#             logger.error(f"Failed to update car info: {e}")
+#             if self.connection:
+#                 self.connection.rollback()
+#             return False
+#
+#     def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool:
+#         """
+#         Execute a dynamic update query on the database.
+#
+#         Args:
+#             table: Table name
+#             key_field: Primary key field name
+#             key_value: Primary key value
+#             fields: Dictionary of fields to update
+#
+#         Returns:
+#             True if successful, False otherwise
+#         """
+#         if not self.is_connected():
+#             if not self.connect():
+#                 return False
+#
+#         try:
+#             cur = self.connection.cursor()
+#
+#             # Build the UPDATE query dynamically
+#             set_clauses = []
+#             values = []
+#
+#             for field, value in fields.items():
+#                 if value == "NOW()":
+#                     set_clauses.append(f"{field} = NOW()")
+#                 else:
+#                     set_clauses.append(f"{field} = %s")
+#                     values.append(value)
+#
+#             # Add schema prefix if table doesn't already have it
+#             full_table_name = table if '.' in table else f"gas_station_1.{table}"
+#
+#             query = f"""
+#                 INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
+#                 VALUES (%s, {', '.join(['%s'] * len(fields))})
+#                 ON CONFLICT ({key_field})
+#                 DO UPDATE SET {', '.join(set_clauses)}
+#             """
+#
+#             # Add key_value to the beginning of values list
+#             all_values = [key_value] + list(fields.values()) + values
+#
+#             cur.execute(query, all_values)
+#             self.connection.commit()
+#             cur.close()
+#             logger.info(f"Updated {table} for {key_field}={key_value}")
+#             return True
+#         except Exception as e:
+#             logger.error(f"Failed to execute update on {table}: {e}")
+#             if self.connection:
+#                 self.connection.rollback()
+#             return False
+#
+#     def create_car_frontal_info_table(self) -> bool:
+#         """
+#         Create the car_frontal_info table in gas_station_1 schema if it doesn't exist.
+#
+#         Returns:
+#             True if successful, False otherwise
+#         """
+#         if not self.is_connected():
+#             if not self.connect():
+#                 return False
+#
+#         try:
+#             # Since the database already exists, just verify connection
+#             cur = self.connection.cursor()
+#
+#             # Simple verification that the table exists
+#             cur.execute("""
+#                 SELECT EXISTS (
+#                     SELECT FROM information_schema.tables
+#                     WHERE table_schema = 'gas_station_1'
+#                     AND table_name = 'car_frontal_info'
+#                 )
+#             """)
+#
+#             table_exists = cur.fetchone()[0]
+#             cur.close()
+#
+#             if table_exists:
+#                 logger.info("Verified car_frontal_info table exists")
+#                 return True
+#             else:
+#                 logger.error("car_frontal_info table does not exist in the database")
+#                 return False
+#
+#         except Exception as e:
+#             logger.error(f"Failed to create car_frontal_info table: {e}")
+#             if self.connection:
+#                 self.connection.rollback()
+#             return False
+#
+#     def insert_initial_detection(self, display_id: str, captured_timestamp: str, session_id: str = None) -> str:
+#         """
+#         Insert initial detection record and return the session_id.
+#
+#         Args:
+#             display_id: Display identifier
+#             captured_timestamp: Timestamp of the detection
+#             session_id: Optional session ID, generates one if not provided
+#
+#         Returns:
+#             Session ID string or None on error
+#         """
+#         if not self.is_connected():
+#             if not self.connect():
+#                 return None
+#
+#         # Generate session_id if not provided
+#         if not session_id:
+#             session_id = str(uuid.uuid4())
+#
+#         try:
+#             # Ensure table exists
+#             if not self.create_car_frontal_info_table():
+#                 logger.error("Failed to create/verify table before insertion")
+#                 return None
+#
+#             cur = self.connection.cursor()
+#             insert_query = """
+#                 INSERT INTO gas_station_1.car_frontal_info
+#                 (display_id, captured_timestamp, session_id, license_character, license_type, car_brand, car_model, car_body_type)
+#                 VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL)
+#                 ON CONFLICT (session_id) DO NOTHING
+#             """
+#
+#             cur.execute(insert_query, (display_id, captured_timestamp, session_id))
+#             self.connection.commit()
+#             cur.close()
+#             logger.info(f"Inserted initial detection record with session_id: {session_id}")
+#             return session_id
+#
+#         except Exception as e:
+#             logger.error(f"Failed to insert initial detection record: {e}")
+#             if self.connection:
+#                 self.connection.rollback()
+#             return None
+#
+#     def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
+#         """
+#         Get session information from the database.
+#
+#         Args:
+#             session_id: Session identifier
+#
+#         Returns:
+#             Dictionary with session data or None if not found
+#         """
+#         if not self.is_connected():
+#             if not self.connect():
+#                 return None
+#
+#         try:
+#             cur = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+#             query = "SELECT * FROM gas_station_1.car_frontal_info WHERE session_id = %s"
+#             cur.execute(query, (session_id,))
+#             result = cur.fetchone()
+#             cur.close()
+#
+#             if result:
+#                 return dict(result)
+#             else:
+#                 logger.debug(f"No session info found for session_id: {session_id}")
+#                 return None
+#
+#         except Exception as e:
+#             logger.error(f"Failed to get session info: {e}")
+#             return None
+#
+#     def delete_session(self, session_id: str) -> bool:
+#         """
+#         Delete session record from the database.
+#
+#         Args:
+#             session_id: Session identifier
+#
+#         Returns:
+#             True if successful, False otherwise
+#         """
+#         if not self.is_connected():
+#             if not self.connect():
+#                 return False
+#
+#         try:
+#             cur = self.connection.cursor()
+#             query = "DELETE FROM gas_station_1.car_frontal_info WHERE session_id = %s"
+#             cur.execute(query, (session_id,))
+#             rows_affected = cur.rowcount
+#             self.connection.commit()
+#             cur.close()
+#
+#             if rows_affected > 0:
+#                 logger.info(f"Deleted session record: {session_id}")
+#                 return True
+#             else:
+#                 logger.warning(f"No session record found to delete: {session_id}")
+#                 return False
+#
+#         except Exception as e:
+#             logger.error(f"Failed to delete session: {e}")
+#             if self.connection:
+#                 self.connection.rollback()
+#             return False
+#
+#     def get_statistics(self) -> Dict[str, Any]:
+#         """
+#         Get database statistics.
+#
+#         Returns:
+#             Dictionary with database statistics
+#         """
+#         stats = {
+#             'connected': self.is_connected(),
+#             'host': self.config.get('host', 'unknown'),
+#             'port': self.config.get('port', 'unknown'),
+#             'database': self.config.get('database', 'unknown')
+#         }
+#
+#         if self.is_connected():
+#             try:
+#                 cur = self.connection.cursor()
+#
+#                 # Get table record count
+#                 cur.execute("SELECT COUNT(*) FROM gas_station_1.car_frontal_info")
+#                 stats['total_records'] = cur.fetchone()[0]
+#
+#                 # Get recent records count (last hour)
+#                 cur.execute("""
+#                     SELECT COUNT(*) FROM gas_station_1.car_frontal_info
+#                     WHERE created_at > NOW() - INTERVAL '1 hour'
+#                 """)
+#                 stats['recent_records'] = cur.fetchone()[0]
+#
+#                 cur.close()
+#             except Exception as e:
+#                 logger.warning(f"Failed to get database statistics: {e}")
+#                 stats['error'] = str(e)
+#
+#         return stats
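The Redis pub/sub leg mentioned in the new module docstring can be exercised independently of the worker. A minimal sketch using redis-py's asyncio client, where the channel name and payload shape are assumptions, not this repo's conventions:

```python
# Minimal sketch of the Redis pub/sub data path; channel name and payload
# shape are assumptions for illustration only.
import json
import redis.asyncio as redis

async def publish_detection(session_id: str, payload: dict) -> None:
    """Publish a detection event for downstream services to consume."""
    async with redis.Redis() as client:  # connection details are deployment-specific
        await client.publish("detections", json.dumps({"sessionId": session_id, **payload}))

# e.g. asyncio.run(publish_detection("abc-123", {"carBrand": "Toyota"}))
```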