Compare commits
	
		
			2 commits
		
	
	
		
			80d9c925de
			...
			39394caa8e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
							 | 
						39394caa8e | ||
| 
							 | 
						85b49ddf0f | 
					 4 changed files with 1717 additions and 90 deletions
				
			
		
							
								
								
									
										199
									
								
								app.py
									
										
									
									
									
								
							
							
						
						
									
										199
									
								
								app.py
									
										
									
									
									
								
							| 
						 | 
					@ -600,22 +600,7 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                        "timestamp": time.time()
 | 
					                        "timestamp": time.time()
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    # Cache the detection dict for lightweight mode reuse
 | 
					                    # Note: Will cache detection_dict after branch processing completes
 | 
				
			||||||
                    branch_results = detection_result.get("branch_results", {})
 | 
					 | 
				
			||||||
                    cached_dict = {
 | 
					 | 
				
			||||||
                        "carModel": branch_results.get("car_brand_cls_v1", {}).get("model"),
 | 
					 | 
				
			||||||
                        "carBrand": branch_results.get("car_brand_cls_v1", {}).get("brand"),
 | 
					 | 
				
			||||||
                        "carYear": None,
 | 
					 | 
				
			||||||
                        "bodyType": branch_results.get("car_bodytype_cls_v1", {}).get("body_type"),
 | 
					 | 
				
			||||||
                        "licensePlateText": None,
 | 
					 | 
				
			||||||
                        "licensePlateConfidence": None
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                    pipeline_state["cached_detection_dict"] = cached_dict
 | 
					 | 
				
			||||||
                    
 | 
					 | 
				
			||||||
                    # Log what was cached for debugging
 | 
					 | 
				
			||||||
                    logger.info(f"💾 Camera {camera_id}: CACHING DETECTION DICT:")
 | 
					 | 
				
			||||||
                    logger.info(f"💾 Camera {camera_id}: - Full branch_results: {branch_results}")
 | 
					 | 
				
			||||||
                    logger.info(f"💾 Camera {camera_id}: - Cached dict: {cached_dict}")
 | 
					 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    # Store the stable track ID for lightweight monitoring
 | 
					                    # Store the stable track ID for lightweight monitoring
 | 
				
			||||||
                    track_id = detection_result.get("track_id") or detection_result.get("id")
 | 
					                    track_id = detection_result.get("track_id") or detection_result.get("id")
 | 
				
			||||||
| 
						 | 
					@ -768,27 +753,30 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                    detection_dict = None
 | 
					                    detection_dict = None
 | 
				
			||||||
                    logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}")
 | 
					                    logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}")
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    # Car detected - check if we have sessionId to determine what to send
 | 
					                    # Car detected in send_detections mode - ALWAYS send empty dict to trigger backend sessionId
 | 
				
			||||||
 | 
					                    # Purpose: Tell backend "car is here, please create sessionId"
 | 
				
			||||||
 | 
					                    detection_dict = {}
 | 
				
			||||||
 | 
					                    logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode, requesting backend to create sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
                    if backend_session_id:
 | 
					                    if backend_session_id:
 | 
				
			||||||
                        # Have sessionId - send full detection_dict for database updates
 | 
					                        logger.debug(f"🔄 Camera {camera_id}: Note - sessionId {backend_session_id} exists but still in send_detections mode (transition pending)")
 | 
				
			||||||
                        detection_dict = {
 | 
					 | 
				
			||||||
                            "carModel": None,
 | 
					 | 
				
			||||||
                            "carBrand": None,
 | 
					 | 
				
			||||||
                            "carYear": None,
 | 
					 | 
				
			||||||
                            "bodyType": None,
 | 
					 | 
				
			||||||
                            "licensePlateText": None,
 | 
					 | 
				
			||||||
                            "licensePlateConfidence": None
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                        logger.info(f"📤 SENDING FULL DETECTION_DICT - send_detections mode with sessionId {backend_session_id} (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
 | 
					 | 
				
			||||||
                    else:
 | 
					 | 
				
			||||||
                        # No sessionId - send empty detection_dict to trigger backend to generate sessionId
 | 
					 | 
				
			||||||
                        detection_dict = {}
 | 
					 | 
				
			||||||
                        logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode without sessionId, triggering backend to generate sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
 | 
					 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
            elif detection_result.get("class") == "none":
 | 
					            elif detection_result.get("class") == "none":
 | 
				
			||||||
                # "None" detection in other modes (lightweight) - car left or absent for 3 frames
 | 
					                # "None" detection - skip override if lightweight mode already made the decision
 | 
				
			||||||
                detection_dict = None
 | 
					                if current_mode == "lightweight":
 | 
				
			||||||
                logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
 | 
					                    # Lightweight mode already set detection_result correctly, don't override
 | 
				
			||||||
 | 
					                    logger.debug(f"🪶 Camera {camera_id}: Lightweight mode - respecting detection_result decision")
 | 
				
			||||||
 | 
					                    if detection_result is None:
 | 
				
			||||||
 | 
					                        detection_dict = None
 | 
				
			||||||
 | 
					                        logger.info(f"📤 LIGHTWEIGHT SENDING 'NONE' - Reset conditions met for camera {camera_id}")
 | 
				
			||||||
 | 
					                    else:
 | 
				
			||||||
 | 
					                        # detection_result should be the cached_detection_dict
 | 
				
			||||||
 | 
					                        detection_dict = detection_result
 | 
				
			||||||
 | 
					                        logger.info(f"💾 LIGHTWEIGHT SENDING CACHED - Maintaining session for camera {camera_id}")
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    # Other modes - send null to clear session
 | 
				
			||||||
 | 
					                    detection_dict = None
 | 
				
			||||||
 | 
					                    logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
 | 
				
			||||||
            elif detection_result.get("cached_mode", False):
 | 
					            elif detection_result.get("cached_mode", False):
 | 
				
			||||||
                # Cached mode in lightweight - use cached detection dict directly
 | 
					                # Cached mode in lightweight - use cached detection dict directly
 | 
				
			||||||
                cached_dict = detection_result.get("branch_results", {})
 | 
					                cached_dict = detection_result.get("branch_results", {})
 | 
				
			||||||
| 
						 | 
					@ -800,10 +788,11 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                    "licensePlateText": None,
 | 
					                    "licensePlateText": None,
 | 
				
			||||||
                    "licensePlateConfidence": None
 | 
					                    "licensePlateConfidence": None
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                logger.info(f"💾 Camera {camera_id}: SENDING CACHED DETECTION_DICT to backend:")
 | 
					            elif detection_result and "carBrand" in detection_result:
 | 
				
			||||||
                logger.info(f"💾 Camera {camera_id}: - Cached branch_results: {cached_dict}")
 | 
					                # Lightweight mode - detection_result IS the cached detection dict
 | 
				
			||||||
                logger.info(f"💾 Camera {camera_id}: - Final detection_dict: {detection_dict}")
 | 
					                detection_dict = detection_result
 | 
				
			||||||
                logger.info(f"💾 Camera {camera_id}: - Track ID: {detection_result.get('track_id')} (lightweight mode)")
 | 
					                logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT MODE - using detection_result as detection_dict:")
 | 
				
			||||||
 | 
					                logger.info(f"💾 Camera {camera_id}: - detection_dict: {detection_dict}")
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                # Valid detection - convert to backend format (will be populated by branch processing)
 | 
					                # Valid detection - convert to backend format (will be populated by branch processing)
 | 
				
			||||||
                detection_dict = {
 | 
					                detection_dict = {
 | 
				
			||||||
| 
						 | 
					@ -855,6 +844,13 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                    logger.debug(f"Processing branch results: {branch_results}")
 | 
					                    logger.debug(f"Processing branch results: {branch_results}")
 | 
				
			||||||
                    process_branch_results(branch_results)
 | 
					                    process_branch_results(branch_results)
 | 
				
			||||||
                    logger.info(f"Detection payload after branch processing: {detection_dict}")
 | 
					                    logger.info(f"Detection payload after branch processing: {detection_dict}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    # Cache the detection_dict for lightweight mode (after branch processing completes)
 | 
				
			||||||
 | 
					                    if current_mode == "full_pipeline":
 | 
				
			||||||
 | 
					                        pipeline_state = get_or_init_session_pipeline_state(camera_id)
 | 
				
			||||||
 | 
					                        pipeline_state["cached_detection_dict"] = detection_dict.copy()
 | 
				
			||||||
 | 
					                        logger.info(f"💾 Camera {camera_id}: CACHED DETECTION DICT after branch processing: {detection_dict}")
 | 
				
			||||||
 | 
					                        
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    logger.debug("No branch results found in detection result")
 | 
					                    logger.debug("No branch results found in detection result")
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
| 
						 | 
					@ -870,17 +866,17 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Add session ID to detection data (NOT for "none" detections - backend uses absence of sessionId to know to clear the session)
 | 
					            # SessionId should NEVER be sent from worker to backend - it's uni-directional (backend -> worker only)
 | 
				
			||||||
            if session_id and detection_result.get("class") != "none":
 | 
					            # Backend manages sessionIds independently based on detection content
 | 
				
			||||||
                detection_data["sessionId"] = session_id
 | 
					            logger.debug(f"TX message prepared (no sessionId) - detection_dict type: {type(detection_dict)}")
 | 
				
			||||||
                logger.debug(f"Including sessionId {session_id} in WebSocket message")
 | 
					 | 
				
			||||||
            elif detection_result.get("class") == "none":
 | 
					 | 
				
			||||||
                logger.debug(f"NOT including sessionId in 'none' detection - backend should clear session")
 | 
					 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Log detection details
 | 
					            # Log detection details
 | 
				
			||||||
            if detection_result.get("class") != "none":
 | 
					            if detection_result and "class" in detection_result and detection_result.get("class") != "none":
 | 
				
			||||||
                confidence = detection_result.get("confidence", 0.0)
 | 
					                confidence = detection_result.get("confidence", 0.0)
 | 
				
			||||||
                logger.info(f"Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
 | 
					                logger.info(f"Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
 | 
				
			||||||
 | 
					            elif detection_result and "carBrand" in detection_result:
 | 
				
			||||||
 | 
					                # Lightweight mode cached detection dict - different format
 | 
				
			||||||
 | 
					                logger.info(f"Camera {camera_id}: Using cached detection dict (lightweight mode) - {detection_result.get('carBrand', 'Unknown')} {detection_result.get('bodyType', '')}")
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Send detection data to backend (session gating handled above in processing logic)
 | 
					            # Send detection data to backend (session gating handled above in processing logic)
 | 
				
			||||||
            logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}")
 | 
					            logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}")
 | 
				
			||||||
| 
						 | 
					@ -888,6 +884,16 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
 | 
					                ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
 | 
				
			||||||
                await websocket.send_json(detection_data)
 | 
					                await websocket.send_json(detection_data)
 | 
				
			||||||
                logger.debug(f"Sent detection data to client for camera {camera_id}")
 | 
					                logger.debug(f"Sent detection data to client for camera {camera_id}")
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					                # Cache the detection data for potential resubscriptions (only if not null detection)
 | 
				
			||||||
 | 
					                if detection_dict is not None and detection_result.get("class") != "none":
 | 
				
			||||||
 | 
					                    cached_detections[camera_id] = detection_data.copy()
 | 
				
			||||||
 | 
					                    logger.debug(f"Cached detection for camera {camera_id}: {detection_dict}")
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    # Don't cache null/none detections - let them reset properly
 | 
				
			||||||
 | 
					                    cached_detections.pop(camera_id, None)
 | 
				
			||||||
 | 
					                    logger.debug(f"Not caching null/none detection for camera {camera_id}")
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
            except RuntimeError as e:
 | 
					            except RuntimeError as e:
 | 
				
			||||||
                if "websocket.close" in str(e):
 | 
					                if "websocket.close" in str(e):
 | 
				
			||||||
                    logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}")
 | 
					                    logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}")
 | 
				
			||||||
| 
						 | 
					@ -895,13 +901,13 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    raise
 | 
					                    raise
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Log status after sending
 | 
					            # Log status after sending (no sessionId sent to backend)
 | 
				
			||||||
            if session_id and detection_result.get("class") != "none":
 | 
					            if detection_dict is None:
 | 
				
			||||||
                logger.info(f"📤 WEBSOCKET RESPONSE with sessionId: {session_id} for camera {camera_id}")
 | 
					 | 
				
			||||||
            elif detection_result.get("class") == "none":
 | 
					 | 
				
			||||||
                logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}")
 | 
					                logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}")
 | 
				
			||||||
 | 
					            elif detection_dict == {}:
 | 
				
			||||||
 | 
					                logger.info(f"📡 SENT empty detection - backend should create sessionId for camera {camera_id}")
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                logger.info(f"📡 Detection data sent for camera {camera_id}")
 | 
					                logger.info(f"📡 SENT detection data - backend manages sessionId independently for camera {camera_id}")
 | 
				
			||||||
            return persistent_data
 | 
					            return persistent_data
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
 | 
					            logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
 | 
				
			||||||
| 
						 | 
					@ -1145,16 +1151,46 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                # Check if parameters changed
 | 
					                # Check if parameters changed
 | 
				
			||||||
                if has_subscription_changed(desired_sub, current_stream):
 | 
					                if has_subscription_changed(desired_sub, current_stream):
 | 
				
			||||||
                    logger.info(f"Parameters changed for {subscription_id}, resubscribing")
 | 
					                    logger.info(f"Parameters changed for {subscription_id}, resubscribing")
 | 
				
			||||||
                    await unsubscribe_internal(subscription_id)
 | 
					                    logger.debug(f"Parameter comparison for {subscription_id}:")
 | 
				
			||||||
                    await subscribe_internal(desired_sub, websocket)
 | 
					                    logger.debug(f"  rtspUrl: '{desired_sub.get('rtspUrl')}' vs '{current_stream.get('rtsp_url')}'")
 | 
				
			||||||
 | 
					                    logger.debug(f"  snapshotUrl: '{desired_sub.get('snapshotUrl')}' vs '{current_stream.get('snapshot_url')}'")
 | 
				
			||||||
 | 
					                    logger.debug(f"  modelUrl: '{extract_model_file_identifier(desired_sub.get('modelUrl'))}' vs '{extract_model_file_identifier(current_stream.get('modelUrl'))}'")
 | 
				
			||||||
 | 
					                    logger.debug(f"  modelId: {desired_sub.get('modelId')} vs {current_stream.get('modelId')}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    # Preserve detection state for resubscription 
 | 
				
			||||||
 | 
					                    cached_detection = cached_detections.get(subscription_id)
 | 
				
			||||||
 | 
					                    logger.debug(f"Preserving detection state for resubscription: {cached_detection is not None}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    await unsubscribe_internal(subscription_id, preserve_detection=True)
 | 
				
			||||||
 | 
					                    await subscribe_internal(desired_sub, websocket, cached_detection=cached_detection)
 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
            # Add new subscriptions
 | 
					            # Add new subscriptions
 | 
				
			||||||
            for subscription_id in to_add:
 | 
					            for subscription_id in to_add:
 | 
				
			||||||
                desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
 | 
					                desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
 | 
				
			||||||
                await subscribe_internal(desired_sub, websocket)
 | 
					                await subscribe_internal(desired_sub, websocket)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def extract_model_file_identifier(model_url):
 | 
				
			||||||
 | 
					        """Extract the core model file identifier from S3 URLs, ignoring timestamp parameters"""
 | 
				
			||||||
 | 
					        if not model_url:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # For S3 URLs, extract just the path portion before query parameters
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            from urllib.parse import urlparse
 | 
				
			||||||
 | 
					            parsed = urlparse(model_url)
 | 
				
			||||||
 | 
					            # Return the path which contains the actual model file identifier
 | 
				
			||||||
 | 
					            # e.g. "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta"
 | 
				
			||||||
 | 
					            return parsed.path
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            logger.warning(f"Failed to parse model URL {model_url}: {e}")
 | 
				
			||||||
 | 
					            return model_url
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def has_subscription_changed(desired_sub, current_stream):
 | 
					    def has_subscription_changed(desired_sub, current_stream):
 | 
				
			||||||
        """Check if subscription parameters have changed"""
 | 
					        """Check if subscription parameters have changed"""
 | 
				
			||||||
 | 
					        # Smart model URL comparison - ignore timestamp changes in signed URLs
 | 
				
			||||||
 | 
					        desired_model_id = extract_model_file_identifier(desired_sub.get("modelUrl"))
 | 
				
			||||||
 | 
					        current_model_id = extract_model_file_identifier(current_stream.get("modelUrl"))
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
        return (
 | 
					        return (
 | 
				
			||||||
            desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
 | 
					            desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
 | 
				
			||||||
            desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or  
 | 
					            desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or  
 | 
				
			||||||
| 
						 | 
					@ -1164,10 +1200,11 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
            desired_sub.get("cropX2") != current_stream.get("cropX2") or
 | 
					            desired_sub.get("cropX2") != current_stream.get("cropX2") or
 | 
				
			||||||
            desired_sub.get("cropY2") != current_stream.get("cropY2") or
 | 
					            desired_sub.get("cropY2") != current_stream.get("cropY2") or
 | 
				
			||||||
            desired_sub.get("modelId") != current_stream.get("modelId") or
 | 
					            desired_sub.get("modelId") != current_stream.get("modelId") or
 | 
				
			||||||
            desired_sub.get("modelName") != current_stream.get("modelName")
 | 
					            desired_sub.get("modelName") != current_stream.get("modelName") or
 | 
				
			||||||
 | 
					            desired_model_id != current_model_id
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def subscribe_internal(subscription, websocket):
 | 
					    async def subscribe_internal(subscription, websocket, cached_detection=None):
 | 
				
			||||||
        """Internal subscription logic extracted from original subscribe handler"""
 | 
					        """Internal subscription logic extracted from original subscribe handler"""
 | 
				
			||||||
        subscriptionIdentifier = subscription.get("subscriptionIdentifier")
 | 
					        subscriptionIdentifier = subscription.get("subscriptionIdentifier")
 | 
				
			||||||
        rtsp_url = subscription.get("rtspUrl") 
 | 
					        rtsp_url = subscription.get("rtspUrl") 
 | 
				
			||||||
| 
						 | 
					@ -1274,38 +1311,49 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                "buffer": buffer, "thread": thread, "stop_event": stop_event,
 | 
					                "buffer": buffer, "thread": thread, "stop_event": stop_event,
 | 
				
			||||||
                "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
 | 
					                "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
 | 
				
			||||||
                "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
 | 
					                "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
 | 
				
			||||||
                "mode": mode, "camera_url": camera_url, "modelUrl": model_url
 | 
					                "mode": mode, "camera_url": camera_url, "modelUrl": model_url,
 | 
				
			||||||
 | 
					                # Always store both URLs for comparison consistency
 | 
				
			||||||
 | 
					                "rtsp_url": rtsp_url,
 | 
				
			||||||
 | 
					                "snapshot_url": snapshot_url,
 | 
				
			||||||
 | 
					                "snapshot_interval": snapshot_interval
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            if mode == "snapshot":
 | 
					            if mode == "rtsp":
 | 
				
			||||||
                stream_info["snapshot_url"] = snapshot_url
 | 
					 | 
				
			||||||
                stream_info["snapshot_interval"] = snapshot_interval
 | 
					 | 
				
			||||||
            elif mode == "rtsp":
 | 
					 | 
				
			||||||
                stream_info["rtsp_url"] = rtsp_url
 | 
					 | 
				
			||||||
                stream_info["cap"] = shared_stream["cap"]
 | 
					                stream_info["cap"] = shared_stream["cap"]
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            streams[camera_id] = stream_info
 | 
					            streams[camera_id] = stream_info
 | 
				
			||||||
            subscription_to_camera[camera_id] = camera_url
 | 
					            subscription_to_camera[camera_id] = camera_url
 | 
				
			||||||
            logger.info(f"Subscribed to camera {camera_id}")
 | 
					            logger.info(f"Subscribed to camera {camera_id}")
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Send initial "none" detection to backend on camera connect
 | 
					            # Send initial detection to backend - use cached if available, otherwise "none"
 | 
				
			||||||
            initial_detection_data = {
 | 
					            if cached_detection:
 | 
				
			||||||
                "type": "imageDetection",
 | 
					                # Restore cached detection with updated timestamp (RESUBSCRIPTION STATUS UPDATE)
 | 
				
			||||||
                "subscriptionIdentifier": subscriptionIdentifier,
 | 
					                initial_detection_data = cached_detection.copy()
 | 
				
			||||||
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
 | 
					                initial_detection_data["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
 | 
				
			||||||
                "data": {
 | 
					                logger.info(f"📡 RESUBSCRIPTION: Restoring cached detection for camera {camera_id}")
 | 
				
			||||||
                    "detection": None,
 | 
					                logger.debug(f"📡 RESUBSCRIPTION: Cached detection has sessionId: {initial_detection_data.get('sessionId', 'None')}")
 | 
				
			||||||
                    "modelId": modelId,
 | 
					            else:
 | 
				
			||||||
                    "modelName": modelName
 | 
					                # Send "none" detection for new subscriptions
 | 
				
			||||||
 | 
					                initial_detection_data = {
 | 
				
			||||||
 | 
					                    "type": "imageDetection",
 | 
				
			||||||
 | 
					                    "subscriptionIdentifier": subscriptionIdentifier,
 | 
				
			||||||
 | 
					                    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
 | 
				
			||||||
 | 
					                    "data": {
 | 
				
			||||||
 | 
					                        "detection": None,
 | 
				
			||||||
 | 
					                        "modelId": modelId,
 | 
				
			||||||
 | 
					                        "modelName": modelName
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					                logger.info(f"📡 NEW SUBSCRIPTION: Sending initial 'none' detection for camera {camera_id}")
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}")
 | 
					            ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}")
 | 
				
			||||||
            await websocket.send_json(initial_detection_data)
 | 
					            await websocket.send_json(initial_detection_data)
 | 
				
			||||||
            logger.info(f"📡 Sent initial 'none' detection to backend for camera {camera_id}")
 | 
					            logger.debug(f"Initial detection data sent (resubscription={cached_detection is not None}): {initial_detection_data}")
 | 
				
			||||||
            logger.debug(f"Initial detection data: {initial_detection_data}")
 | 
					 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
    async def unsubscribe_internal(subscription_id):
 | 
					            # This cached detection was just a one-time status update for resubscription
 | 
				
			||||||
 | 
					            # Normal frame processing will continue independently
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def unsubscribe_internal(subscription_id, preserve_detection=False):
 | 
				
			||||||
        """Internal unsubscription logic"""
 | 
					        """Internal unsubscription logic"""
 | 
				
			||||||
        if subscription_id in streams:
 | 
					        if subscription_id in streams:
 | 
				
			||||||
            stream = streams.pop(subscription_id)
 | 
					            stream = streams.pop(subscription_id)
 | 
				
			||||||
| 
						 | 
					@ -1323,13 +1371,14 @@ async def detect(websocket: WebSocket):
 | 
				
			||||||
                    del camera_streams[camera_url]
 | 
					                    del camera_streams[camera_url]
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            latest_frames.pop(subscription_id, None)
 | 
					            latest_frames.pop(subscription_id, None)
 | 
				
			||||||
            cached_detections.pop(subscription_id, None)  # Clear cached detection
 | 
					            if not preserve_detection:
 | 
				
			||||||
 | 
					                cached_detections.pop(subscription_id, None)  # Clear cached detection only if not preserving
 | 
				
			||||||
            frame_skip_flags.pop(subscription_id, None)   # Clear frame skip flag
 | 
					            frame_skip_flags.pop(subscription_id, None)   # Clear frame skip flag
 | 
				
			||||||
            camera_states.pop(subscription_id, None)     # Clear camera state
 | 
					            camera_states.pop(subscription_id, None)     # Clear camera state
 | 
				
			||||||
            cached_full_pipeline_results.pop(subscription_id, None)  # Clear cached pipeline results
 | 
					            cached_full_pipeline_results.pop(subscription_id, None)  # Clear cached pipeline results
 | 
				
			||||||
            session_pipeline_states.pop(subscription_id, None)  # Clear session pipeline state
 | 
					            session_pipeline_states.pop(subscription_id, None)  # Clear session pipeline state
 | 
				
			||||||
            cleanup_camera_stability(subscription_id)
 | 
					            cleanup_camera_stability(subscription_id)
 | 
				
			||||||
            logger.info(f"Unsubscribed from camera {subscription_id}")
 | 
					            logger.info(f"Unsubscribed from camera {subscription_id} (preserve_detection={preserve_detection})")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def process_streams():
 | 
					    async def process_streams():
 | 
				
			||||||
        logger.info("Started processing streams")
 | 
					        logger.info("Started processing streams")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -80,37 +80,50 @@ class DatabaseManager:
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            cur = self.connection.cursor()
 | 
					            cur = self.connection.cursor()
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Build the UPDATE query dynamically
 | 
					            # Build the INSERT and UPDATE query dynamically
 | 
				
			||||||
 | 
					            insert_placeholders = []
 | 
				
			||||||
 | 
					            insert_values = [key_value]  # Start with key_value
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
            set_clauses = []
 | 
					            set_clauses = []
 | 
				
			||||||
            values = []
 | 
					            update_values = []
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            for field, value in fields.items():
 | 
					            for field, value in fields.items():
 | 
				
			||||||
                if value == "NOW()":
 | 
					                if value == "NOW()":
 | 
				
			||||||
 | 
					                    # Special handling for NOW()
 | 
				
			||||||
 | 
					                    insert_placeholders.append("NOW()")
 | 
				
			||||||
                    set_clauses.append(f"{field} = NOW()")
 | 
					                    set_clauses.append(f"{field} = NOW()")
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
 | 
					                    insert_placeholders.append("%s")
 | 
				
			||||||
 | 
					                    insert_values.append(value)
 | 
				
			||||||
                    set_clauses.append(f"{field} = %s")
 | 
					                    set_clauses.append(f"{field} = %s")
 | 
				
			||||||
                    values.append(value)
 | 
					                    update_values.append(value)
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Add schema prefix if table doesn't already have it
 | 
					            # Add schema prefix if table doesn't already have it
 | 
				
			||||||
            full_table_name = table if '.' in table else f"gas_station_1.{table}"
 | 
					            full_table_name = table if '.' in table else f"gas_station_1.{table}"
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
 | 
					            # Build the complete query
 | 
				
			||||||
            query = f"""
 | 
					            query = f"""
 | 
				
			||||||
            INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
 | 
					            INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
 | 
				
			||||||
            VALUES (%s, {', '.join(['%s'] * len(fields))})
 | 
					            VALUES (%s, {', '.join(insert_placeholders)})
 | 
				
			||||||
            ON CONFLICT ({key_field})
 | 
					            ON CONFLICT ({key_field})
 | 
				
			||||||
            DO UPDATE SET {', '.join(set_clauses)}
 | 
					            DO UPDATE SET {', '.join(set_clauses)}
 | 
				
			||||||
            """
 | 
					            """
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            # Add key_value to the beginning of values list
 | 
					            # Combine values for the query: insert_values + update_values
 | 
				
			||||||
            all_values = [key_value] + list(fields.values()) + values
 | 
					            all_values = insert_values + update_values
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            logger.debug(f"SQL Query: {query}")
 | 
				
			||||||
 | 
					            logger.debug(f"Values: {all_values}")
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            cur.execute(query, all_values)
 | 
					            cur.execute(query, all_values)
 | 
				
			||||||
            self.connection.commit()
 | 
					            self.connection.commit()
 | 
				
			||||||
            cur.close()
 | 
					            cur.close()
 | 
				
			||||||
            logger.info(f"Updated {table} for {key_field}={key_value}")
 | 
					            logger.info(f"✅ Updated {table} for {key_field}={key_value} with fields: {fields}")
 | 
				
			||||||
            return True
 | 
					            return True
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            logger.error(f"Failed to execute update on {table}: {e}")
 | 
					            logger.error(f"❌ Failed to execute update on {table}: {e}")
 | 
				
			||||||
 | 
					            logger.debug(f"Query: {query if 'query' in locals() else 'Query not built'}")
 | 
				
			||||||
 | 
					            logger.debug(f"Values: {all_values if 'all_values' in locals() else 'Values not prepared'}")
 | 
				
			||||||
            if self.connection:
 | 
					            if self.connection:
 | 
				
			||||||
                self.connection.rollback()
 | 
					                self.connection.rollback()
 | 
				
			||||||
            return False
 | 
					            return False
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -453,6 +453,7 @@ def execute_postgresql_update_combined(node, action, detection_result, branch_re
 | 
				
			||||||
        key_value = key_value_template.format(**action_context)
 | 
					        key_value = key_value_template.format(**action_context)
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        logger.info(f"Executing database update: table={table}, {key_field}={key_value}")
 | 
					        logger.info(f"Executing database update: table={table}, {key_field}={key_value}")
 | 
				
			||||||
 | 
					        logger.debug(f"Available branch results: {list(branch_results.keys())}")
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        # Process field mappings
 | 
					        # Process field mappings
 | 
				
			||||||
        mapped_fields = {}
 | 
					        mapped_fields = {}
 | 
				
			||||||
| 
						 | 
					@ -461,26 +462,38 @@ def execute_postgresql_update_combined(node, action, detection_result, branch_re
 | 
				
			||||||
                mapped_value = resolve_field_mapping(value_template, branch_results, action_context)
 | 
					                mapped_value = resolve_field_mapping(value_template, branch_results, action_context)
 | 
				
			||||||
                if mapped_value is not None:
 | 
					                if mapped_value is not None:
 | 
				
			||||||
                    mapped_fields[db_field] = mapped_value
 | 
					                    mapped_fields[db_field] = mapped_value
 | 
				
			||||||
                    logger.debug(f"Mapped field: {db_field} = {mapped_value}")
 | 
					                    logger.info(f"Mapped field: {db_field} = {mapped_value}")
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    logger.warning(f"Could not resolve field mapping for {db_field}: {value_template}")
 | 
					                    logger.warning(f"Could not resolve field mapping for {db_field}: {value_template}")
 | 
				
			||||||
 | 
					                    logger.debug(f"Available branch results: {branch_results}")
 | 
				
			||||||
            except Exception as e:
 | 
					            except Exception as e:
 | 
				
			||||||
                logger.error(f"Error mapping field {db_field} with template '{value_template}': {e}")
 | 
					                logger.error(f"Error mapping field {db_field} with template '{value_template}': {e}")
 | 
				
			||||||
 | 
					                import traceback
 | 
				
			||||||
 | 
					                logger.debug(f"Field mapping error traceback: {traceback.format_exc()}")
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        if not mapped_fields:
 | 
					        if not mapped_fields:
 | 
				
			||||||
            logger.warning("No fields mapped successfully, skipping database update")
 | 
					            logger.warning("No fields mapped successfully, skipping database update")
 | 
				
			||||||
 | 
					            logger.debug(f"Branch results available: {branch_results}")
 | 
				
			||||||
 | 
					            logger.debug(f"Field templates: {fields}")
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
 | 
					        # Add updated_at field automatically
 | 
				
			||||||
 | 
					        mapped_fields["updated_at"] = "NOW()"
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
        # Execute the database update
 | 
					        # Execute the database update
 | 
				
			||||||
 | 
					        logger.info(f"Attempting database update with fields: {mapped_fields}")
 | 
				
			||||||
        success = node["db_manager"].execute_update(table, key_field, key_value, mapped_fields)
 | 
					        success = node["db_manager"].execute_update(table, key_field, key_value, mapped_fields)
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        if success:
 | 
					        if success:
 | 
				
			||||||
            logger.info(f"Successfully updated database: {table} with {len(mapped_fields)} fields")
 | 
					            logger.info(f"✅ Successfully updated database: {table} with {len(mapped_fields)} fields")
 | 
				
			||||||
 | 
					            logger.info(f"Updated fields: {mapped_fields}")
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            logger.error(f"Failed to update database: {table}")
 | 
					            logger.error(f"❌ Failed to update database: {table}")
 | 
				
			||||||
 | 
					            logger.error(f"Attempted update with: {key_field}={key_value}, fields={mapped_fields}")
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
    except KeyError as e:
 | 
					    except KeyError as e:
 | 
				
			||||||
        logger.error(f"Missing required field in postgresql_update_combined action: {e}")
 | 
					        logger.error(f"Missing required field in postgresql_update_combined action: {e}")
 | 
				
			||||||
 | 
					        logger.debug(f"Action config: {action}")
 | 
				
			||||||
    except Exception as e:
 | 
					    except Exception as e:
 | 
				
			||||||
        logger.error(f"Error in postgresql_update_combined action: {e}")
 | 
					        logger.error(f"Error in postgresql_update_combined action: {e}")
 | 
				
			||||||
        import traceback
 | 
					        import traceback
 | 
				
			||||||
| 
						 | 
					@ -489,28 +502,68 @@ def execute_postgresql_update_combined(node, action, detection_result, branch_re
 | 
				
			||||||
def resolve_field_mapping(value_template, branch_results, action_context):
 | 
					def resolve_field_mapping(value_template, branch_results, action_context):
 | 
				
			||||||
    """Resolve field mapping templates like {car_brand_cls_v1.brand}."""
 | 
					    """Resolve field mapping templates like {car_brand_cls_v1.brand}."""
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
 | 
					        logger.debug(f"Resolving field mapping: '{value_template}'")
 | 
				
			||||||
 | 
					        logger.debug(f"Available branch results: {list(branch_results.keys())}")
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
        # Handle simple context variables first (non-branch references)
 | 
					        # Handle simple context variables first (non-branch references)
 | 
				
			||||||
        if not '.' in value_template:
 | 
					        if not '.' in value_template:
 | 
				
			||||||
            return value_template.format(**action_context)
 | 
					            result = value_template.format(**action_context)
 | 
				
			||||||
 | 
					            logger.debug(f"Simple template resolved: '{value_template}' -> '{result}'")
 | 
				
			||||||
 | 
					            return result
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        # Handle branch result references like {model_id.field}
 | 
					        # Handle branch result references like {model_id.field}
 | 
				
			||||||
        import re
 | 
					        import re
 | 
				
			||||||
        branch_refs = re.findall(r'\{([^}]+\.[^}]+)\}', value_template)
 | 
					        branch_refs = re.findall(r'\{([^}]+\.[^}]+)\}', value_template)
 | 
				
			||||||
 | 
					        logger.debug(f"Found branch references: {branch_refs}")
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        resolved_template = value_template
 | 
					        resolved_template = value_template
 | 
				
			||||||
        for ref in branch_refs:
 | 
					        for ref in branch_refs:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                model_id, field_name = ref.split('.', 1)
 | 
					                model_id, field_name = ref.split('.', 1)
 | 
				
			||||||
 | 
					                logger.debug(f"Processing branch reference: model_id='{model_id}', field_name='{field_name}'")
 | 
				
			||||||
                
 | 
					                
 | 
				
			||||||
                if model_id in branch_results:
 | 
					                if model_id in branch_results:
 | 
				
			||||||
                    branch_data = branch_results[model_id]
 | 
					                    branch_data = branch_results[model_id]
 | 
				
			||||||
 | 
					                    logger.debug(f"Branch '{model_id}' data: {branch_data}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
                    if field_name in branch_data:
 | 
					                    if field_name in branch_data:
 | 
				
			||||||
                        field_value = branch_data[field_name]
 | 
					                        field_value = branch_data[field_name]
 | 
				
			||||||
                        resolved_template = resolved_template.replace(f'{{{ref}}}', str(field_value))
 | 
					                        resolved_template = resolved_template.replace(f'{{{ref}}}', str(field_value))
 | 
				
			||||||
                        logger.debug(f"Resolved {ref} to {field_value}")
 | 
					                        logger.info(f"✅ Resolved {ref} to '{field_value}'")
 | 
				
			||||||
                    else:
 | 
					                    else:
 | 
				
			||||||
                        logger.warning(f"Field '{field_name}' not found in branch '{model_id}' results. Available fields: {list(branch_data.keys())}")
 | 
					                        logger.warning(f"Field '{field_name}' not found in branch '{model_id}' results.")
 | 
				
			||||||
                        return None
 | 
					                        logger.debug(f"Available fields in '{model_id}': {list(branch_data.keys())}")
 | 
				
			||||||
 | 
					                        
 | 
				
			||||||
 | 
					                        # Try alternative field names based on the class result and model type
 | 
				
			||||||
 | 
					                        if isinstance(branch_data, dict):
 | 
				
			||||||
 | 
					                            fallback_value = None
 | 
				
			||||||
 | 
					                            
 | 
				
			||||||
 | 
					                            # First, try the exact field name
 | 
				
			||||||
 | 
					                            if field_name in branch_data:
 | 
				
			||||||
 | 
					                                fallback_value = branch_data[field_name]
 | 
				
			||||||
 | 
					                            # Then try 'class' field as fallback
 | 
				
			||||||
 | 
					                            elif 'class' in branch_data:
 | 
				
			||||||
 | 
					                                fallback_value = branch_data['class']
 | 
				
			||||||
 | 
					                                logger.info(f"Using 'class' field as fallback for '{field_name}': '{fallback_value}'")
 | 
				
			||||||
 | 
					                            # For brand models, also check if the class name exists as a key
 | 
				
			||||||
 | 
					                            elif field_name == 'brand' and branch_data.get('class') in branch_data:
 | 
				
			||||||
 | 
					                                fallback_value = branch_data[branch_data['class']]
 | 
				
			||||||
 | 
					                                logger.info(f"Found brand value using class name as key: '{fallback_value}'")
 | 
				
			||||||
 | 
					                            # For body_type models, also check if the class name exists as a key
 | 
				
			||||||
 | 
					                            elif field_name == 'body_type' and branch_data.get('class') in branch_data:
 | 
				
			||||||
 | 
					                                fallback_value = branch_data[branch_data['class']]
 | 
				
			||||||
 | 
					                                logger.info(f"Found body_type value using class name as key: '{fallback_value}'")
 | 
				
			||||||
 | 
					                            
 | 
				
			||||||
 | 
					                            if fallback_value is not None:
 | 
				
			||||||
 | 
					                                resolved_template = resolved_template.replace(f'{{{ref}}}', str(fallback_value))
 | 
				
			||||||
 | 
					                                logger.info(f"✅ Resolved {ref} to '{fallback_value}' (using fallback)")
 | 
				
			||||||
 | 
					                            else:
 | 
				
			||||||
 | 
					                                logger.error(f"No suitable field found for '{field_name}' in branch '{model_id}'")
 | 
				
			||||||
 | 
					                                logger.debug(f"Branch data structure: {branch_data}")
 | 
				
			||||||
 | 
					                                return None
 | 
				
			||||||
 | 
					                        else:
 | 
				
			||||||
 | 
					                            logger.error(f"Branch data for '{model_id}' is not a dictionary: {type(branch_data)}")
 | 
				
			||||||
 | 
					                            return None
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    logger.warning(f"Branch '{model_id}' not found in results. Available branches: {list(branch_results.keys())}")
 | 
					                    logger.warning(f"Branch '{model_id}' not found in results. Available branches: {list(branch_results.keys())}")
 | 
				
			||||||
                    return None
 | 
					                    return None
 | 
				
			||||||
| 
						 | 
					@ -521,6 +574,7 @@ def resolve_field_mapping(value_template, branch_results, action_context):
 | 
				
			||||||
        # Format any remaining simple variables
 | 
					        # Format any remaining simple variables
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            final_value = resolved_template.format(**action_context)
 | 
					            final_value = resolved_template.format(**action_context)
 | 
				
			||||||
 | 
					            logger.debug(f"Final resolved value: '{final_value}'")
 | 
				
			||||||
            return final_value
 | 
					            return final_value
 | 
				
			||||||
        except KeyError as e:
 | 
					        except KeyError as e:
 | 
				
			||||||
            logger.warning(f"Could not resolve context variable in template: {e}")
 | 
					            logger.warning(f"Could not resolve context variable in template: {e}")
 | 
				
			||||||
| 
						 | 
					@ -528,6 +582,8 @@ def resolve_field_mapping(value_template, branch_results, action_context):
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
    except Exception as e:
 | 
					    except Exception as e:
 | 
				
			||||||
        logger.error(f"Error resolving field mapping '{value_template}': {e}")
 | 
					        logger.error(f"Error resolving field mapping '{value_template}': {e}")
 | 
				
			||||||
 | 
					        import traceback
 | 
				
			||||||
 | 
					        logger.debug(f"Field mapping error traceback: {traceback.format_exc()}")
 | 
				
			||||||
        return None
 | 
					        return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def run_detection_with_tracking(frame, node, context=None):
 | 
					def run_detection_with_tracking(frame, node, context=None):
 | 
				
			||||||
| 
						 | 
					@ -1720,6 +1776,12 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
 | 
				
			||||||
                                if result:
 | 
					                                if result:
 | 
				
			||||||
                                    branch_results[br["modelId"]] = result
 | 
					                                    branch_results[br["modelId"]] = result
 | 
				
			||||||
                                    logger.info(f"Branch {br['modelId']} completed: {result}")
 | 
					                                    logger.info(f"Branch {br['modelId']} completed: {result}")
 | 
				
			||||||
 | 
					                                    
 | 
				
			||||||
 | 
					                                    # Collect nested branch results if they exist
 | 
				
			||||||
 | 
					                                    if "branch_results" in result:
 | 
				
			||||||
 | 
					                                        for nested_id, nested_result in result["branch_results"].items():
 | 
				
			||||||
 | 
					                                            branch_results[nested_id] = nested_result
 | 
				
			||||||
 | 
					                                            logger.info(f"Collected nested branch result: {nested_id} = {nested_result}")
 | 
				
			||||||
                            except Exception as e:
 | 
					                            except Exception as e:
 | 
				
			||||||
                                logger.error(f"Branch {br['modelId']} failed: {e}")
 | 
					                                logger.error(f"Branch {br['modelId']} failed: {e}")
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
| 
						 | 
					@ -1760,6 +1822,12 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
 | 
				
			||||||
                            if result:
 | 
					                            if result:
 | 
				
			||||||
                                branch_results[br["modelId"]] = result
 | 
					                                branch_results[br["modelId"]] = result
 | 
				
			||||||
                                logger.info(f"Branch {br['modelId']} completed: {result}")
 | 
					                                logger.info(f"Branch {br['modelId']} completed: {result}")
 | 
				
			||||||
 | 
					                                
 | 
				
			||||||
 | 
					                                # Collect nested branch results if they exist
 | 
				
			||||||
 | 
					                                if "branch_results" in result:
 | 
				
			||||||
 | 
					                                    for nested_id, nested_result in result["branch_results"].items():
 | 
				
			||||||
 | 
					                                        branch_results[nested_id] = nested_result
 | 
				
			||||||
 | 
					                                        logger.info(f"Collected nested branch result: {nested_id} = {nested_result}")
 | 
				
			||||||
                            else:
 | 
					                            else:
 | 
				
			||||||
                                logger.warning(f"Branch {br['modelId']} returned no result")
 | 
					                                logger.warning(f"Branch {br['modelId']} returned no result")
 | 
				
			||||||
                        except Exception as e:
 | 
					                        except Exception as e:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										1497
									
								
								websocket_comm.log
									
										
									
									
									
								
							
							
						
						
									
										1497
									
								
								websocket_comm.log
									
										
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue