"""
Field mapping and template resolution for dynamic database operations.

This module provides functionality for resolving field mapping templates
that reference branch results and context variables for database operations.
"""
|
|
|
|
import re
|
|
import logging
|
|
from typing import Dict, Any, Optional, List, Union
|
|
|
|
from ..core.exceptions import FieldMappingError
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FieldMapper:
    """
    Field mapping resolver for dynamic template resolution.

    This class handles the resolution of field mapping templates that can reference:
    - Branch results (e.g., {car_brand_cls_v1.brand})
    - Context variables (e.g., {session_id})
    - Nested field lookups with fallback strategies
    """

    # Matches placeholders of the form {model_id.field_name}; the captured
    # group is the text between the braces.
    _BRANCH_REF_PATTERN = re.compile(r'\{([^}]+\.[^}]+)\}')

    # Alternative keys tried, in order, when the requested field is missing
    # from a branch result.
    _FIELD_ALTERNATIVES = {
        'brand': ['car_brand', 'brand_name', 'detected_brand'],
        'body_type': ['car_body_type', 'bodytype', 'body', 'car_type'],
        'model': ['car_model', 'model_name', 'detected_model'],
        'color': ['car_color', 'color_name', 'detected_color'],
    }

    def __init__(self):
        """Initialize field mapper."""
        pass

    @staticmethod
    def _escape_braces(text: str) -> str:
        """Double every brace so ``str.format`` emits *text* literally."""
        return text.replace('{', '{{').replace('}', '}}')

    def _extract_branch_references(self, template: str) -> List[str]:
        """Extract branch references (``model_id.field_name``) from a template string."""
        return self._BRANCH_REF_PATTERN.findall(template)

    def _resolve_simple_template(self, template: str, context: Dict[str, Any]) -> str:
        """
        Resolve a template that contains no branch references.

        Args:
            template: Template string using ``str.format`` placeholders
            context: Context variables available to the template

        Returns:
            The formatted string, or the template unchanged when a
            placeholder names a variable missing from ``context``
        """
        try:
            result = template.format(**context)
            logger.debug(f"Simple template resolved: '{template}' -> '{result}'")
            return result
        except KeyError as e:
            logger.warning(f"Could not resolve context variable in simple template: {e}")
            return template

    def _find_fallback_value(self,
                             branch_data: Dict[str, Any],
                             field_name: str,
                             model_id: str) -> Optional[Any]:
        """
        Find a substitute value when ``field_name`` may be missing from a branch result.

        Lookup order: the exact field name, then the generic 'class' key,
        then the known alternative names for the field.

        Args:
            branch_data: Result of a single branch
            field_name: Field requested by the template
            model_id: Branch identifier, used for log messages only

        Returns:
            The stored value (not stringified), or None when nothing matches
            or ``branch_data`` is not a dictionary
        """
        if not isinstance(branch_data, dict):
            logger.error(f"Branch data for '{model_id}' is not a dictionary: {type(branch_data)}")
            return None

        # First, try the exact field name
        if field_name in branch_data:
            return branch_data[field_name]

        # Then try 'class' field as fallback
        if 'class' in branch_data:
            fallback_value = branch_data['class']
            logger.info(f"Using 'class' field as fallback for '{field_name}': '{fallback_value}'")
            return fallback_value

        # The "class name used as a key" lookups for brand/body_type that used
        # to follow required 'class' to be present, so they could never run
        # after the return above; they were dropped without changing results.

        # Additional fallback strategies for common field mappings
        for alternative in self._FIELD_ALTERNATIVES.get(field_name, ()):
            if alternative in branch_data:
                fallback_value = branch_data[alternative]
                logger.info(f"Found '{field_name}' value using alternative field '{alternative}': '{fallback_value}'")
                return fallback_value

        return None

    def _resolve_branch_reference(self,
                                  ref: str,
                                  branch_results: Dict[str, Any]) -> Optional[str]:
        """
        Resolve a single branch reference of the form ``model_id.field_name``.

        Args:
            ref: Reference text without the surrounding braces
            branch_results: Dictionary of branch execution results

        Returns:
            The referenced value converted with ``str()``, or None when the
            reference is malformed, the branch is unknown, the branch data is
            not a dictionary, or no field/fallback matches
        """
        try:
            # Split on the first dot only, so the field part may itself contain dots.
            model_id, separator, field_name = ref.partition('.')
            if not separator:
                logger.error(f"Invalid branch reference format: {ref}")
                return None
            logger.debug(f"Processing branch reference: model_id='{model_id}', field_name='{field_name}'")

            if model_id not in branch_results:
                logger.warning(f"Branch '{model_id}' not found in results. Available branches: {list(branch_results.keys())}")
                return None

            branch_data = branch_results[model_id]
            logger.debug(f"Branch '{model_id}' data: {branch_data}")

            # Reject non-dict results explicitly instead of relying on a
            # TypeError from `in` / indexing being swallowed below.
            if not isinstance(branch_data, dict):
                logger.error(f"Branch data for '{model_id}' is not a dictionary: {type(branch_data)}")
                return None

            if field_name in branch_data:
                field_value = branch_data[field_name]
                logger.info(f"✅ Resolved {ref} to '{field_value}'")
                return str(field_value)

            logger.warning(f"Field '{field_name}' not found in branch '{model_id}' results.")
            logger.debug(f"Available fields in '{model_id}': {list(branch_data.keys())}")

            # Try fallback strategies
            fallback_value = self._find_fallback_value(branch_data, field_name, model_id)
            if fallback_value is not None:
                logger.info(f"✅ Resolved {ref} to '{fallback_value}' (using fallback)")
                return str(fallback_value)

            logger.error(f"No suitable field found for '{field_name}' in branch '{model_id}'")
            logger.debug(f"Branch data structure: {branch_data}")
            return None

        except Exception as e:
            # Best-effort boundary: a bad reference must not abort the caller.
            logger.error(f"Error resolving branch reference '{ref}': {e}")
            return None

    def resolve_field_mapping(self,
                              value_template: str,
                              branch_results: Dict[str, Any],
                              action_context: Dict[str, Any]) -> Optional[str]:
        """
        Resolve field mapping templates like {car_brand_cls_v1.brand}.

        Args:
            value_template: Template string with placeholders
            branch_results: Dictionary of branch execution results
            action_context: Context variables for template resolution

        Returns:
            Resolved string value or None if resolution failed. When a
            context variable is missing, the template is returned with the
            branch references resolved and the context placeholders left as-is.
        """
        try:
            logger.debug(f"Resolving field mapping: '{value_template}'")
            logger.debug(f"Available branch results: {list(branch_results.keys())}")

            # Handle simple context variables first (non-branch references)
            if '.' not in value_template:
                return self._resolve_simple_template(value_template, action_context)

            # Handle branch result references like {model_id.field}
            branch_refs = self._extract_branch_references(value_template)
            logger.debug(f"Found branch references: {branch_refs}")

            # Resolve each distinct reference once, preserving first-seen order.
            resolved_values = {}
            for ref in dict.fromkeys(branch_refs):
                resolved_value = self._resolve_branch_reference(ref, branch_results)
                if resolved_value is None:
                    logger.error(f"Failed to resolve branch reference: {ref}")
                    return None
                resolved_values[ref] = resolved_value

            pattern = self._BRANCH_REF_PATTERN
            # Single-pass substitution: text coming from a resolved value is
            # never re-scanned for further placeholders.
            resolved_template = pattern.sub(
                lambda match: resolved_values[match.group(1)], value_template)
            # Same substitution with braces escaped, so a value such as
            # "A{B}" is not mistaken for a placeholder by str.format below.
            format_ready = pattern.sub(
                lambda match: self._escape_braces(resolved_values[match.group(1)]),
                value_template)

            # Format any remaining simple variables
            try:
                final_value = format_ready.format(**action_context)
                logger.debug(f"Final resolved value: '{final_value}'")
                return final_value
            except KeyError as e:
                logger.warning(f"Could not resolve context variable in template: {e}")
                return resolved_template

        except Exception as e:
            logger.error(f"Error resolving field mapping '{value_template}': {e}")
            return None

    def resolve_multiple_fields(self,
                                field_templates: Dict[str, str],
                                branch_results: Dict[str, Any],
                                action_context: Dict[str, Any]) -> Dict[str, str]:
        """
        Resolve multiple field mappings at once.

        Args:
            field_templates: Dictionary mapping field names to templates
            branch_results: Dictionary of branch execution results
            action_context: Context variables for template resolution

        Returns:
            Dictionary mapping field names to resolved values; fields whose
            template could not be resolved are omitted
        """
        resolved_fields = {}

        for field_name, template in field_templates.items():
            try:
                resolved_value = self.resolve_field_mapping(template, branch_results, action_context)
                if resolved_value is not None:
                    resolved_fields[field_name] = resolved_value
                    logger.debug(f"Successfully resolved field '{field_name}': {resolved_value}")
                else:
                    logger.warning(f"Failed to resolve field '{field_name}' with template: {template}")
            except Exception as e:
                # One failing field must not prevent the others from resolving.
                logger.error(f"Error resolving field '{field_name}' with template '{template}': {e}")

        return resolved_fields

    def validate_template(self, template: str) -> Dict[str, Any]:
        """
        Validate a field mapping template and return analysis.

        Args:
            template: Template string to validate

        Returns:
            Dictionary with validation results and analysis: ``valid``,
            ``errors``, ``warnings``, ``branch_references``,
            ``context_references``, ``has_branch_refs`` and ``has_context_refs``
        """
        analysis = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "branch_references": [],
            "context_references": [],
            "has_branch_refs": False,
            "has_context_refs": False
        }

        try:
            # Extract all placeholders
            all_refs = re.findall(r'\{([^}]+)\}', template)

            for ref in all_refs:
                if '.' in ref:
                    # This is a branch reference
                    analysis["branch_references"].append(ref)
                    analysis["has_branch_refs"] = True

                    # Validate format: exactly one dot with text on both sides
                    parts = ref.split('.')
                    if len(parts) != 2:
                        analysis["errors"].append(f"Invalid branch reference format: {ref}")
                        analysis["valid"] = False
                    elif not parts[0] or not parts[1]:
                        analysis["errors"].append(f"Empty model_id or field_name in reference: {ref}")
                        analysis["valid"] = False
                else:
                    # This is a context reference
                    analysis["context_references"].append(ref)
                    analysis["has_context_refs"] = True

            # Check for common issues
            if analysis["has_branch_refs"] and not analysis["has_context_refs"]:
                analysis["warnings"].append("Template only uses branch references, consider adding context info")

            if not analysis["branch_references"] and not analysis["context_references"]:
                analysis["warnings"].append("Template has no placeholders - it's a static value")

        except Exception as e:
            analysis["valid"] = False
            analysis["errors"].append(f"Template analysis failed: {e}")

        return analysis
|
|
|
|
|
|
# Global field mapper instance; the module-level convenience functions in
# this file delegate to it.
field_mapper = FieldMapper()
|
|
|
|
|
|
# ===== CONVENIENCE FUNCTIONS =====
# These provide the same interface as the original functions in pympta.py
|
|
|
|
def resolve_field_mapping(value_template: str,
                          branch_results: Dict[str, Any],
                          action_context: Dict[str, Any]) -> Optional[str]:
    """
    Resolve a single field mapping template such as {car_brand_cls_v1.brand}.

    Thin wrapper that forwards its arguments, unchanged, to the shared
    ``field_mapper`` instance and returns whatever that call returns.
    """
    resolved = field_mapper.resolve_field_mapping(
        value_template,
        branch_results,
        action_context,
    )
    return resolved
|
|
|
|
|
|
def resolve_multiple_field_mappings(field_templates: Dict[str, str],
                                    branch_results: Dict[str, Any],
                                    action_context: Dict[str, Any]) -> Dict[str, str]:
    """
    Resolve a batch of field mapping templates in one call.

    Thin wrapper that forwards its arguments, unchanged, to
    ``field_mapper.resolve_multiple_fields`` and returns its result.
    """
    resolved = field_mapper.resolve_multiple_fields(
        field_templates,
        branch_results,
        action_context,
    )
    return resolved
|
|
|
|
|
|
def validate_field_mapping_template(template: str) -> Dict[str, Any]:
    """
    Analyse a field mapping template and report validation results.

    Thin wrapper that forwards ``template`` to
    ``field_mapper.validate_template`` and returns its analysis dictionary.
    """
    report = field_mapper.validate_template(template)
    return report
|
|
|
|
|
|
def get_available_field_mappings(branch_results: Dict[str, Any]) -> Dict[str, List[str]]:
    """
    List the field names each branch result exposes.

    Args:
        branch_results: Dictionary of branch execution results

    Returns:
        Dictionary mapping every model ID to the keys of its result; a
        branch whose result is not a dictionary maps to an empty list
        (and a warning is logged)
    """
    fields_by_model = {}

    for model_id, branch_data in branch_results.items():
        # Guard clause: non-dict results expose no fields.
        if not isinstance(branch_data, dict):
            logger.warning(f"Branch '{model_id}' data is not a dictionary: {type(branch_data)}")
            fields_by_model[model_id] = []
            continue

        fields_by_model[model_id] = [key for key in branch_data.keys()]

    return fields_by_model
|
|
|
|
|
|
def create_field_mapping_examples(branch_results: Dict[str, Any]) -> List[str]:
    """
    Create example field mapping templates based on available branch results.

    Args:
        branch_results: Dictionary of branch execution results

    Returns:
        List of example template strings of the form ``{model_id.field_name}``,
        one per key of every branch whose result is a dictionary, in
        iteration order; non-dictionary results are skipped
    """
    return [
        f"{{{model_id}.{field_name}}}"
        for model_id, branch_data in branch_results.items()
        if isinstance(branch_data, dict)
        for field_name in branch_data
    ]