Compare commits


2 commits

Author      SHA1        Message         Date
Pongsatorn  39394caa8e  Finish          2025-08-29 00:57:32 +07:00
Pongsatorn  85b49ddf0f  fix postgresql  2025-08-29 00:34:16 +07:00
4 changed files with 1717 additions and 90 deletions

app.py (177 changed lines)

@@ -600,22 +600,7 @@ async def detect(websocket: WebSocket):
                 "timestamp": time.time()
             }
-            # Cache the detection dict for lightweight mode reuse
-            branch_results = detection_result.get("branch_results", {})
-            cached_dict = {
-                "carModel": branch_results.get("car_brand_cls_v1", {}).get("model"),
-                "carBrand": branch_results.get("car_brand_cls_v1", {}).get("brand"),
-                "carYear": None,
-                "bodyType": branch_results.get("car_bodytype_cls_v1", {}).get("body_type"),
-                "licensePlateText": None,
-                "licensePlateConfidence": None
-            }
-            pipeline_state["cached_detection_dict"] = cached_dict
-            # Log what was cached for debugging
-            logger.info(f"💾 Camera {camera_id}: CACHING DETECTION DICT:")
-            logger.info(f"💾 Camera {camera_id}: - Full branch_results: {branch_results}")
-            logger.info(f"💾 Camera {camera_id}: - Cached dict: {cached_dict}")
+            # Note: Will cache detection_dict after branch processing completes
             # Store the stable track ID for lightweight monitoring
             track_id = detection_result.get("track_id") or detection_result.get("id")
@@ -768,25 +753,28 @@ async def detect(websocket: WebSocket):
                 detection_dict = None
                 logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}")
             else:
-                # Car detected - check if we have sessionId to determine what to send
-                if backend_session_id:
-                    # Have sessionId - send full detection_dict for database updates
-                    detection_dict = {
-                        "carModel": None,
-                        "carBrand": None,
-                        "carYear": None,
-                        "bodyType": None,
-                        "licensePlateText": None,
-                        "licensePlateConfidence": None
-                    }
-                    logger.info(f"📤 SENDING FULL DETECTION_DICT - send_detections mode with sessionId {backend_session_id} (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
-                else:
-                    # No sessionId - send empty detection_dict to trigger backend to generate sessionId
-                    detection_dict = {}
-                    logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode without sessionId, triggering backend to generate sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
+                # Car detected in send_detections mode - ALWAYS send empty dict to trigger backend sessionId
+                # Purpose: Tell backend "car is here, please create sessionId"
+                detection_dict = {}
+                logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode, requesting backend to create sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
+                if backend_session_id:
+                    logger.debug(f"🔄 Camera {camera_id}: Note - sessionId {backend_session_id} exists but still in send_detections mode (transition pending)")
         elif detection_result.get("class") == "none":
-            # "None" detection in other modes (lightweight) - car left or absent for 3 frames
-            detection_dict = None
-            logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
+            # "None" detection - skip override if lightweight mode already made the decision
+            if current_mode == "lightweight":
+                # Lightweight mode already set detection_result correctly, don't override
+                logger.debug(f"🪶 Camera {camera_id}: Lightweight mode - respecting detection_result decision")
+                if detection_result is None:
+                    detection_dict = None
+                    logger.info(f"📤 LIGHTWEIGHT SENDING 'NONE' - Reset conditions met for camera {camera_id}")
+                else:
+                    # detection_result should be the cached_detection_dict
+                    detection_dict = detection_result
+                    logger.info(f"💾 LIGHTWEIGHT SENDING CACHED - Maintaining session for camera {camera_id}")
+            else:
+                # Other modes - send null to clear session
+                detection_dict = None
+                logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
         elif detection_result.get("cached_mode", False):
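The rewritten send_detections branch above reduces the worker's output to three payload shapes, decided by detection content rather than by whether a sessionId is known. A minimal sketch of the three shapes (field values are hypothetical, not taken from this diff):

# The three "detection" payload shapes an imageDetection message can carry:
detection_none = None    # car absent -> backend should clear the session
detection_empty = {}     # car present, no data yet -> backend should create a sessionId
detection_full = {       # populated once full-pipeline branch processing completes
    "carModel": "Camry", "carBrand": "Toyota", "carYear": None,   # hypothetical values
    "bodyType": "Sedan", "licensePlateText": None, "licensePlateConfidence": None,
}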
@@ -800,10 +788,11 @@ async def detect(websocket: WebSocket):
                 "licensePlateText": None,
                 "licensePlateConfidence": None
             }
-            logger.info(f"💾 Camera {camera_id}: SENDING CACHED DETECTION_DICT to backend:")
-            logger.info(f"💾 Camera {camera_id}: - Cached branch_results: {cached_dict}")
-            logger.info(f"💾 Camera {camera_id}: - Final detection_dict: {detection_dict}")
-            logger.info(f"💾 Camera {camera_id}: - Track ID: {detection_result.get('track_id')} (lightweight mode)")
+        elif detection_result and "carBrand" in detection_result:
+            # Lightweight mode - detection_result IS the cached detection dict
+            detection_dict = detection_result
+            logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT MODE - using detection_result as detection_dict:")
+            logger.info(f"💾 Camera {camera_id}: - detection_dict: {detection_dict}")
         else:
             # Valid detection - convert to backend format (will be populated by branch processing)
             detection_dict = {
@@ -855,6 +844,13 @@ async def detect(websocket: WebSocket):
                 logger.debug(f"Processing branch results: {branch_results}")
                 process_branch_results(branch_results)
                 logger.info(f"Detection payload after branch processing: {detection_dict}")
+                # Cache the detection_dict for lightweight mode (after branch processing completes)
+                if current_mode == "full_pipeline":
+                    pipeline_state = get_or_init_session_pipeline_state(camera_id)
+                    pipeline_state["cached_detection_dict"] = detection_dict.copy()
+                    logger.info(f"💾 Camera {camera_id}: CACHED DETECTION DICT after branch processing: {detection_dict}")
             else:
                 logger.debug("No branch results found in detection result")
@@ -870,17 +866,17 @@ async def detect(websocket: WebSocket):
                 }
             }
-            # Add session ID to detection data (NOT for "none" detections - backend uses absence of sessionId to know to clear the session)
-            if session_id and detection_result.get("class") != "none":
-                detection_data["sessionId"] = session_id
-                logger.debug(f"Including sessionId {session_id} in WebSocket message")
-            elif detection_result.get("class") == "none":
-                logger.debug(f"NOT including sessionId in 'none' detection - backend should clear session")
+            # SessionId should NEVER be sent from worker to backend - it's uni-directional (backend -> worker only)
+            # Backend manages sessionIds independently based on detection content
+            logger.debug(f"TX message prepared (no sessionId) - detection_dict type: {type(detection_dict)}")
             # Log detection details
-            if detection_result.get("class") != "none":
+            if detection_result and "class" in detection_result and detection_result.get("class") != "none":
                 confidence = detection_result.get("confidence", 0.0)
                 logger.info(f"Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
+            elif detection_result and "carBrand" in detection_result:
+                # Lightweight mode cached detection dict - different format
+                logger.info(f"Camera {camera_id}: Using cached detection dict (lightweight mode) - {detection_result.get('carBrand', 'Unknown')} {detection_result.get('bodyType', '')}")
             # Send detection data to backend (session gating handled above in processing logic)
             logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}")
@@ -888,6 +884,16 @@ async def detect(websocket: WebSocket):
                 ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
                 await websocket.send_json(detection_data)
                 logger.debug(f"Sent detection data to client for camera {camera_id}")
+                # Cache the detection data for potential resubscriptions (only if not null detection)
+                if detection_dict is not None and detection_result.get("class") != "none":
+                    cached_detections[camera_id] = detection_data.copy()
+                    logger.debug(f"Cached detection for camera {camera_id}: {detection_dict}")
+                else:
+                    # Don't cache null/none detections - let them reset properly
+                    cached_detections.pop(camera_id, None)
+                    logger.debug(f"Not caching null/none detection for camera {camera_id}")
             except RuntimeError as e:
                 if "websocket.close" in str(e):
                     logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}")
@@ -895,13 +901,13 @@ async def detect(websocket: WebSocket):
                 else:
                     raise
-            # Log status after sending
-            if session_id and detection_result.get("class") != "none":
-                logger.info(f"📤 WEBSOCKET RESPONSE with sessionId: {session_id} for camera {camera_id}")
-            elif detection_result.get("class") == "none":
+            # Log status after sending (no sessionId sent to backend)
+            if detection_dict is None:
                 logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}")
+            elif detection_dict == {}:
+                logger.info(f"📡 SENT empty detection - backend should create sessionId for camera {camera_id}")
             else:
-                logger.info(f"📡 Detection data sent for camera {camera_id}")
+                logger.info(f"📡 SENT detection data - backend manages sessionId independently for camera {camera_id}")
             return persistent_data
         except Exception as e:
             logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
@@ -1145,16 +1151,46 @@ async def detect(websocket: WebSocket):
                 # Check if parameters changed
                 if has_subscription_changed(desired_sub, current_stream):
                     logger.info(f"Parameters changed for {subscription_id}, resubscribing")
-                    await unsubscribe_internal(subscription_id)
-                    await subscribe_internal(desired_sub, websocket)
+                    logger.debug(f"Parameter comparison for {subscription_id}:")
+                    logger.debug(f"  rtspUrl: '{desired_sub.get('rtspUrl')}' vs '{current_stream.get('rtsp_url')}'")
+                    logger.debug(f"  snapshotUrl: '{desired_sub.get('snapshotUrl')}' vs '{current_stream.get('snapshot_url')}'")
+                    logger.debug(f"  modelUrl: '{extract_model_file_identifier(desired_sub.get('modelUrl'))}' vs '{extract_model_file_identifier(current_stream.get('modelUrl'))}'")
+                    logger.debug(f"  modelId: {desired_sub.get('modelId')} vs {current_stream.get('modelId')}")
+                    # Preserve detection state for resubscription
+                    cached_detection = cached_detections.get(subscription_id)
+                    logger.debug(f"Preserving detection state for resubscription: {cached_detection is not None}")
+                    await unsubscribe_internal(subscription_id, preserve_detection=True)
+                    await subscribe_internal(desired_sub, websocket, cached_detection=cached_detection)
         # Add new subscriptions
         for subscription_id in to_add:
             desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
             await subscribe_internal(desired_sub, websocket)
+    def extract_model_file_identifier(model_url):
+        """Extract the core model file identifier from S3 URLs, ignoring timestamp parameters"""
+        if not model_url:
+            return None
+        # For S3 URLs, extract just the path portion before query parameters
+        try:
+            from urllib.parse import urlparse
+            parsed = urlparse(model_url)
+            # Return the path which contains the actual model file identifier
+            # e.g. "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta"
+            return parsed.path
+        except Exception as e:
+            logger.warning(f"Failed to parse model URL {model_url}: {e}")
+            return model_url
     def has_subscription_changed(desired_sub, current_stream):
         """Check if subscription parameters have changed"""
+        # Smart model URL comparison - ignore timestamp changes in signed URLs
+        desired_model_id = extract_model_file_identifier(desired_sub.get("modelUrl"))
+        current_model_id = extract_model_file_identifier(current_stream.get("modelUrl"))
         return (
             desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
             desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or
@@ -1164,10 +1200,11 @@ async def detect(websocket: WebSocket):
             desired_sub.get("cropX2") != current_stream.get("cropX2") or
             desired_sub.get("cropY2") != current_stream.get("cropY2") or
             desired_sub.get("modelId") != current_stream.get("modelId") or
-            desired_sub.get("modelName") != current_stream.get("modelName")
+            desired_sub.get("modelName") != current_stream.get("modelName") or
+            desired_model_id != current_model_id
         )
-    async def subscribe_internal(subscription, websocket):
+    async def subscribe_internal(subscription, websocket, cached_detection=None):
         """Internal subscription logic extracted from original subscribe handler"""
         subscriptionIdentifier = subscription.get("subscriptionIdentifier")
         rtsp_url = subscription.get("rtspUrl")
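A quick usage sketch of the new helper: the host and signatures below are made up, while the path matches the example in the function's own comment:

a = "https://s3.example.com/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta?X-Amz-Signature=aaa"
b = "https://s3.example.com/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta?X-Amz-Signature=bbb"
assert extract_model_file_identifier(a) == extract_model_file_identifier(b) == \
    "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta"
# A re-signed URL for the same .mpta file no longer triggers a needless resubscribe.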
@@ -1274,21 +1311,29 @@ async def detect(websocket: WebSocket):
             "buffer": buffer, "thread": thread, "stop_event": stop_event,
             "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
             "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
-            "mode": mode, "camera_url": camera_url, "modelUrl": model_url
+            "mode": mode, "camera_url": camera_url, "modelUrl": model_url,
+            # Always store both URLs for comparison consistency
+            "rtsp_url": rtsp_url,
+            "snapshot_url": snapshot_url,
+            "snapshot_interval": snapshot_interval
         }
-        if mode == "snapshot":
-            stream_info["snapshot_url"] = snapshot_url
-            stream_info["snapshot_interval"] = snapshot_interval
-        elif mode == "rtsp":
-            stream_info["rtsp_url"] = rtsp_url
+        if mode == "rtsp":
             stream_info["cap"] = shared_stream["cap"]
         streams[camera_id] = stream_info
         subscription_to_camera[camera_id] = camera_url
         logger.info(f"Subscribed to camera {camera_id}")
-        # Send initial "none" detection to backend on camera connect
-        initial_detection_data = {
-            "type": "imageDetection",
-            "subscriptionIdentifier": subscriptionIdentifier,
+        # Send initial detection to backend - use cached if available, otherwise "none"
+        if cached_detection:
+            # Restore cached detection with updated timestamp (RESUBSCRIPTION STATUS UPDATE)
+            initial_detection_data = cached_detection.copy()
+            initial_detection_data["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
+            logger.info(f"📡 RESUBSCRIPTION: Restoring cached detection for camera {camera_id}")
+            logger.debug(f"📡 RESUBSCRIPTION: Cached detection has sessionId: {initial_detection_data.get('sessionId', 'None')}")
+        else:
+            # Send "none" detection for new subscriptions
+            initial_detection_data = {
+                "type": "imageDetection",
+                "subscriptionIdentifier": subscriptionIdentifier,
@@ -1299,13 +1344,16 @@ async def detect(websocket: WebSocket):
                 "modelName": modelName
             }
         }
+            logger.info(f"📡 NEW SUBSCRIPTION: Sending initial 'none' detection for camera {camera_id}")
         ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}")
         await websocket.send_json(initial_detection_data)
-        logger.info(f"📡 Sent initial 'none' detection to backend for camera {camera_id}")
-        logger.debug(f"Initial detection data: {initial_detection_data}")
-    async def unsubscribe_internal(subscription_id):
+        logger.debug(f"Initial detection data sent (resubscription={cached_detection is not None}): {initial_detection_data}")
+        # This cached detection was just a one-time status update for resubscription
+        # Normal frame processing will continue independently
+    async def unsubscribe_internal(subscription_id, preserve_detection=False):
         """Internal unsubscription logic"""
         if subscription_id in streams:
             stream = streams.pop(subscription_id)
@@ -1323,13 +1371,14 @@ async def detect(websocket: WebSocket):
                 del camera_streams[camera_url]
         latest_frames.pop(subscription_id, None)
-        cached_detections.pop(subscription_id, None)  # Clear cached detection
+        if not preserve_detection:
+            cached_detections.pop(subscription_id, None)  # Clear cached detection only if not preserving
         frame_skip_flags.pop(subscription_id, None)  # Clear frame skip flag
         camera_states.pop(subscription_id, None)  # Clear camera state
         cached_full_pipeline_results.pop(subscription_id, None)  # Clear cached pipeline results
         session_pipeline_states.pop(subscription_id, None)  # Clear session pipeline state
         cleanup_camera_stability(subscription_id)
-        logger.info(f"Unsubscribed from camera {subscription_id}")
+        logger.info(f"Unsubscribed from camera {subscription_id} (preserve_detection={preserve_detection})")
     async def process_streams():
         logger.info("Started processing streams")


@@ -80,37 +80,50 @@ class DatabaseManager:
         try:
             cur = self.connection.cursor()
-            # Build the UPDATE query dynamically
+            # Build the INSERT and UPDATE query dynamically
+            insert_placeholders = []
+            insert_values = [key_value]  # Start with key_value
             set_clauses = []
-            values = []
+            update_values = []
             for field, value in fields.items():
                 if value == "NOW()":
+                    # Special handling for NOW()
+                    insert_placeholders.append("NOW()")
                     set_clauses.append(f"{field} = NOW()")
                 else:
+                    insert_placeholders.append("%s")
+                    insert_values.append(value)
                     set_clauses.append(f"{field} = %s")
-                    values.append(value)
+                    update_values.append(value)
             # Add schema prefix if table doesn't already have it
             full_table_name = table if '.' in table else f"gas_station_1.{table}"
+            # Build the complete query
             query = f"""
                 INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
-                VALUES (%s, {', '.join(['%s'] * len(fields))})
+                VALUES (%s, {', '.join(insert_placeholders)})
                 ON CONFLICT ({key_field})
                 DO UPDATE SET {', '.join(set_clauses)}
             """
-            # Add key_value to the beginning of values list
-            all_values = [key_value] + list(fields.values()) + values
+            # Combine values for the query: insert_values + update_values
+            all_values = insert_values + update_values
+            logger.debug(f"SQL Query: {query}")
+            logger.debug(f"Values: {all_values}")
             cur.execute(query, all_values)
             self.connection.commit()
             cur.close()
-            logger.info(f"Updated {table} for {key_field}={key_value}")
+            logger.info(f"Updated {table} for {key_field}={key_value} with fields: {fields}")
             return True
         except Exception as e:
-            logger.error(f"Failed to execute update on {table}: {e}")
+            logger.error(f"❌ Failed to execute update on {table}: {e}")
+            logger.debug(f"Query: {query if 'query' in locals() else 'Query not built'}")
+            logger.debug(f"Values: {all_values if 'all_values' in locals() else 'Values not prepared'}")
             if self.connection:
                 self.connection.rollback()
             return False
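The practical effect of the "fix postgresql" commit: NOW() is inlined into the SQL instead of being bound as the literal string 'NOW()'. A sketch with hypothetical inputs (table and field names assumed):

# execute_update("car_frontal_info", "session_id", "abc123",
#                {"car_brand": "Toyota", "updated_at": "NOW()"})
# now builds:
#   INSERT INTO gas_station_1.car_frontal_info (session_id, car_brand, updated_at)
#   VALUES (%s, %s, NOW())
#   ON CONFLICT (session_id)
#   DO UPDATE SET car_brand = %s, updated_at = NOW()
# with all_values = ["abc123", "Toyota", "Toyota"]: placeholders and parameters
# line up, and the string 'NOW()' never reaches a timestamp column.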


@@ -453,6 +453,7 @@ def execute_postgresql_update_combined(node, action, detection_result, branch_re
         key_value = key_value_template.format(**action_context)
         logger.info(f"Executing database update: table={table}, {key_field}={key_value}")
+        logger.debug(f"Available branch results: {list(branch_results.keys())}")
         # Process field mappings
         mapped_fields = {}
@@ -461,26 +462,38 @@ def execute_postgresql_update_combined(node, action, detection_result, branch_re
                 mapped_value = resolve_field_mapping(value_template, branch_results, action_context)
                 if mapped_value is not None:
                     mapped_fields[db_field] = mapped_value
-                    logger.debug(f"Mapped field: {db_field} = {mapped_value}")
+                    logger.info(f"Mapped field: {db_field} = {mapped_value}")
                 else:
                     logger.warning(f"Could not resolve field mapping for {db_field}: {value_template}")
+                    logger.debug(f"Available branch results: {branch_results}")
             except Exception as e:
                 logger.error(f"Error mapping field {db_field} with template '{value_template}': {e}")
+                import traceback
+                logger.debug(f"Field mapping error traceback: {traceback.format_exc()}")
         if not mapped_fields:
             logger.warning("No fields mapped successfully, skipping database update")
+            logger.debug(f"Branch results available: {branch_results}")
+            logger.debug(f"Field templates: {fields}")
             return
+        # Add updated_at field automatically
+        mapped_fields["updated_at"] = "NOW()"
         # Execute the database update
+        logger.info(f"Attempting database update with fields: {mapped_fields}")
         success = node["db_manager"].execute_update(table, key_field, key_value, mapped_fields)
         if success:
-            logger.info(f"Successfully updated database: {table} with {len(mapped_fields)} fields")
+            logger.info(f"✅ Successfully updated database: {table} with {len(mapped_fields)} fields")
+            logger.info(f"Updated fields: {mapped_fields}")
         else:
-            logger.error(f"Failed to update database: {table}")
+            logger.error(f"❌ Failed to update database: {table}")
+            logger.error(f"Attempted update with: {key_field}={key_value}, fields={mapped_fields}")
     except KeyError as e:
         logger.error(f"Missing required field in postgresql_update_combined action: {e}")
+        logger.debug(f"Action config: {action}")
     except Exception as e:
         logger.error(f"Error in postgresql_update_combined action: {e}")
         import traceback
@@ -489,27 +502,67 @@ def resolve_field_mapping(value_template, branch_results, action_context):
 def resolve_field_mapping(value_template, branch_results, action_context):
     """Resolve field mapping templates like {car_brand_cls_v1.brand}."""
     try:
+        logger.debug(f"Resolving field mapping: '{value_template}'")
+        logger.debug(f"Available branch results: {list(branch_results.keys())}")
         # Handle simple context variables first (non-branch references)
         if not '.' in value_template:
-            return value_template.format(**action_context)
+            result = value_template.format(**action_context)
+            logger.debug(f"Simple template resolved: '{value_template}' -> '{result}'")
+            return result
         # Handle branch result references like {model_id.field}
         import re
         branch_refs = re.findall(r'\{([^}]+\.[^}]+)\}', value_template)
+        logger.debug(f"Found branch references: {branch_refs}")
         resolved_template = value_template
         for ref in branch_refs:
             try:
                 model_id, field_name = ref.split('.', 1)
+                logger.debug(f"Processing branch reference: model_id='{model_id}', field_name='{field_name}'")
                 if model_id in branch_results:
                     branch_data = branch_results[model_id]
+                    logger.debug(f"Branch '{model_id}' data: {branch_data}")
                     if field_name in branch_data:
                         field_value = branch_data[field_name]
                         resolved_template = resolved_template.replace(f'{{{ref}}}', str(field_value))
-                        logger.debug(f"Resolved {ref} to {field_value}")
+                        logger.info(f"Resolved {ref} to '{field_value}'")
                     else:
-                        logger.warning(f"Field '{field_name}' not found in branch '{model_id}' results. Available fields: {list(branch_data.keys())}")
-                        return None
+                        logger.warning(f"Field '{field_name}' not found in branch '{model_id}' results.")
+                        logger.debug(f"Available fields in '{model_id}': {list(branch_data.keys())}")
+                        # Try alternative field names based on the class result and model type
+                        if isinstance(branch_data, dict):
+                            fallback_value = None
+                            # First, try the exact field name
+                            if field_name in branch_data:
+                                fallback_value = branch_data[field_name]
+                            # Then try 'class' field as fallback
+                            elif 'class' in branch_data:
+                                fallback_value = branch_data['class']
+                                logger.info(f"Using 'class' field as fallback for '{field_name}': '{fallback_value}'")
+                            # For brand models, also check if the class name exists as a key
+                            elif field_name == 'brand' and branch_data.get('class') in branch_data:
+                                fallback_value = branch_data[branch_data['class']]
+                                logger.info(f"Found brand value using class name as key: '{fallback_value}'")
+                            # For body_type models, also check if the class name exists as a key
+                            elif field_name == 'body_type' and branch_data.get('class') in branch_data:
+                                fallback_value = branch_data[branch_data['class']]
+                                logger.info(f"Found body_type value using class name as key: '{fallback_value}'")
+                            if fallback_value is not None:
+                                resolved_template = resolved_template.replace(f'{{{ref}}}', str(fallback_value))
+                                logger.info(f"✅ Resolved {ref} to '{fallback_value}' (using fallback)")
+                            else:
+                                logger.error(f"No suitable field found for '{field_name}' in branch '{model_id}'")
+                                logger.debug(f"Branch data structure: {branch_data}")
+                                return None
+                        else:
+                            logger.error(f"Branch data for '{model_id}' is not a dictionary: {type(branch_data)}")
+                            return None
                 else:
                     logger.warning(f"Branch '{model_id}' not found in results. Available branches: {list(branch_results.keys())}")
@@ -521,6 +574,7 @@ def resolve_field_mapping(value_template, branch_results, action_context):
         # Format any remaining simple variables
         try:
             final_value = resolved_template.format(**action_context)
+            logger.debug(f"Final resolved value: '{final_value}'")
             return final_value
         except KeyError as e:
             logger.warning(f"Could not resolve context variable in template: {e}")
@@ -528,6 +582,8 @@ def resolve_field_mapping(value_template, branch_results, action_context):
     except Exception as e:
         logger.error(f"Error resolving field mapping '{value_template}': {e}")
+        import traceback
+        logger.debug(f"Field mapping error traceback: {traceback.format_exc()}")
         return None
 def run_detection_with_tracking(frame, node, context=None):
@@ -1720,6 +1776,12 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
                     if result:
                         branch_results[br["modelId"]] = result
                         logger.info(f"Branch {br['modelId']} completed: {result}")
+                        # Collect nested branch results if they exist
+                        if "branch_results" in result:
+                            for nested_id, nested_result in result["branch_results"].items():
+                                branch_results[nested_id] = nested_result
+                                logger.info(f"Collected nested branch result: {nested_id} = {nested_result}")
                 except Exception as e:
                     logger.error(f"Branch {br['modelId']} failed: {e}")
             else:
@@ -1760,6 +1822,12 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
                     if result:
                         branch_results[br["modelId"]] = result
                         logger.info(f"Branch {br['modelId']} completed: {result}")
+                        # Collect nested branch results if they exist
+                        if "branch_results" in result:
+                            for nested_id, nested_result in result["branch_results"].items():
+                                branch_results[nested_id] = nested_result
+                                logger.info(f"Collected nested branch result: {nested_id} = {nested_result}")
                     else:
                         logger.warning(f"Branch {br['modelId']} returned no result")
                 except Exception as e:
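The nested-result collection matters for two-level pipelines, where a detection branch itself runs classification branches. A sketch of the flattening; the parent branch ID is hypothetical, while the two classifier IDs appear elsewhere in this diff:

parent = {
    "class": "car_front",  # hypothetical parent branch output
    "branch_results": {
        "car_brand_cls_v1": {"brand": "Toyota"},
        "car_bodytype_cls_v1": {"body_type": "Sedan"},
    },
}
branch_results = {"car_frontal_det_v1": parent}  # hypothetical parent model ID
# The new hoisting step flattens nested results so templates like
# {car_brand_cls_v1.brand} can resolve against branch_results directly:
for nested_id, nested_result in parent.get("branch_results", {}).items():
    branch_results[nested_id] = nested_result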

File diff suppressed because it is too large.