fix postgresql
This commit is contained in:
parent
80d9c925de
commit
85b49ddf0f
2 changed files with 1468 additions and 75 deletions
199
app.py
199
app.py
|
@ -600,22 +600,7 @@ async def detect(websocket: WebSocket):
|
|||
"timestamp": time.time()
|
||||
}
|
||||
|
||||
# Cache the detection dict for lightweight mode reuse
|
||||
branch_results = detection_result.get("branch_results", {})
|
||||
cached_dict = {
|
||||
"carModel": branch_results.get("car_brand_cls_v1", {}).get("model"),
|
||||
"carBrand": branch_results.get("car_brand_cls_v1", {}).get("brand"),
|
||||
"carYear": None,
|
||||
"bodyType": branch_results.get("car_bodytype_cls_v1", {}).get("body_type"),
|
||||
"licensePlateText": None,
|
||||
"licensePlateConfidence": None
|
||||
}
|
||||
pipeline_state["cached_detection_dict"] = cached_dict
|
||||
|
||||
# Log what was cached for debugging
|
||||
logger.info(f"💾 Camera {camera_id}: CACHING DETECTION DICT:")
|
||||
logger.info(f"💾 Camera {camera_id}: - Full branch_results: {branch_results}")
|
||||
logger.info(f"💾 Camera {camera_id}: - Cached dict: {cached_dict}")
|
||||
# Note: Will cache detection_dict after branch processing completes
|
||||
|
||||
# Store the stable track ID for lightweight monitoring
|
||||
track_id = detection_result.get("track_id") or detection_result.get("id")
|
||||
|
@ -768,27 +753,30 @@ async def detect(websocket: WebSocket):
|
|||
detection_dict = None
|
||||
logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}")
|
||||
else:
|
||||
# Car detected - check if we have sessionId to determine what to send
|
||||
# Car detected in send_detections mode - ALWAYS send empty dict to trigger backend sessionId
|
||||
# Purpose: Tell backend "car is here, please create sessionId"
|
||||
detection_dict = {}
|
||||
logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode, requesting backend to create sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
|
||||
|
||||
if backend_session_id:
|
||||
# Have sessionId - send full detection_dict for database updates
|
||||
detection_dict = {
|
||||
"carModel": None,
|
||||
"carBrand": None,
|
||||
"carYear": None,
|
||||
"bodyType": None,
|
||||
"licensePlateText": None,
|
||||
"licensePlateConfidence": None
|
||||
}
|
||||
logger.info(f"📤 SENDING FULL DETECTION_DICT - send_detections mode with sessionId {backend_session_id} (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
|
||||
else:
|
||||
# No sessionId - send empty detection_dict to trigger backend to generate sessionId
|
||||
detection_dict = {}
|
||||
logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode without sessionId, triggering backend to generate sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")
|
||||
logger.debug(f"🔄 Camera {camera_id}: Note - sessionId {backend_session_id} exists but still in send_detections mode (transition pending)")
|
||||
|
||||
elif detection_result.get("class") == "none":
|
||||
# "None" detection in other modes (lightweight) - car left or absent for 3 frames
|
||||
detection_dict = None
|
||||
logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
|
||||
# "None" detection - skip override if lightweight mode already made the decision
|
||||
if current_mode == "lightweight":
|
||||
# Lightweight mode already set detection_result correctly, don't override
|
||||
logger.debug(f"🪶 Camera {camera_id}: Lightweight mode - respecting detection_result decision")
|
||||
if detection_result is None:
|
||||
detection_dict = None
|
||||
logger.info(f"📤 LIGHTWEIGHT SENDING 'NONE' - Reset conditions met for camera {camera_id}")
|
||||
else:
|
||||
# detection_result should be the cached_detection_dict
|
||||
detection_dict = detection_result
|
||||
logger.info(f"💾 LIGHTWEIGHT SENDING CACHED - Maintaining session for camera {camera_id}")
|
||||
else:
|
||||
# Other modes - send null to clear session
|
||||
detection_dict = None
|
||||
logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
|
||||
elif detection_result.get("cached_mode", False):
|
||||
# Cached mode in lightweight - use cached detection dict directly
|
||||
cached_dict = detection_result.get("branch_results", {})
|
||||
|
@ -800,10 +788,11 @@ async def detect(websocket: WebSocket):
|
|||
"licensePlateText": None,
|
||||
"licensePlateConfidence": None
|
||||
}
|
||||
logger.info(f"💾 Camera {camera_id}: SENDING CACHED DETECTION_DICT to backend:")
|
||||
logger.info(f"💾 Camera {camera_id}: - Cached branch_results: {cached_dict}")
|
||||
logger.info(f"💾 Camera {camera_id}: - Final detection_dict: {detection_dict}")
|
||||
logger.info(f"💾 Camera {camera_id}: - Track ID: {detection_result.get('track_id')} (lightweight mode)")
|
||||
elif detection_result and "carBrand" in detection_result:
|
||||
# Lightweight mode - detection_result IS the cached detection dict
|
||||
detection_dict = detection_result
|
||||
logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT MODE - using detection_result as detection_dict:")
|
||||
logger.info(f"💾 Camera {camera_id}: - detection_dict: {detection_dict}")
|
||||
else:
|
||||
# Valid detection - convert to backend format (will be populated by branch processing)
|
||||
detection_dict = {
|
||||
|
@ -855,6 +844,13 @@ async def detect(websocket: WebSocket):
|
|||
logger.debug(f"Processing branch results: {branch_results}")
|
||||
process_branch_results(branch_results)
|
||||
logger.info(f"Detection payload after branch processing: {detection_dict}")
|
||||
|
||||
# Cache the detection_dict for lightweight mode (after branch processing completes)
|
||||
if current_mode == "full_pipeline":
|
||||
pipeline_state = get_or_init_session_pipeline_state(camera_id)
|
||||
pipeline_state["cached_detection_dict"] = detection_dict.copy()
|
||||
logger.info(f"💾 Camera {camera_id}: CACHED DETECTION DICT after branch processing: {detection_dict}")
|
||||
|
||||
else:
|
||||
logger.debug("No branch results found in detection result")
|
||||
|
||||
|
@ -870,17 +866,17 @@ async def detect(websocket: WebSocket):
|
|||
}
|
||||
}
|
||||
|
||||
# Add session ID to detection data (NOT for "none" detections - backend uses absence of sessionId to know to clear the session)
|
||||
if session_id and detection_result.get("class") != "none":
|
||||
detection_data["sessionId"] = session_id
|
||||
logger.debug(f"Including sessionId {session_id} in WebSocket message")
|
||||
elif detection_result.get("class") == "none":
|
||||
logger.debug(f"NOT including sessionId in 'none' detection - backend should clear session")
|
||||
# SessionId should NEVER be sent from worker to backend - it's uni-directional (backend -> worker only)
|
||||
# Backend manages sessionIds independently based on detection content
|
||||
logger.debug(f"TX message prepared (no sessionId) - detection_dict type: {type(detection_dict)}")
|
||||
|
||||
# Log detection details
|
||||
if detection_result.get("class") != "none":
|
||||
if detection_result and "class" in detection_result and detection_result.get("class") != "none":
|
||||
confidence = detection_result.get("confidence", 0.0)
|
||||
logger.info(f"Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
|
||||
elif detection_result and "carBrand" in detection_result:
|
||||
# Lightweight mode cached detection dict - different format
|
||||
logger.info(f"Camera {camera_id}: Using cached detection dict (lightweight mode) - {detection_result.get('carBrand', 'Unknown')} {detection_result.get('bodyType', '')}")
|
||||
|
||||
# Send detection data to backend (session gating handled above in processing logic)
|
||||
logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}")
|
||||
|
@ -888,6 +884,16 @@ async def detect(websocket: WebSocket):
|
|||
ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
|
||||
await websocket.send_json(detection_data)
|
||||
logger.debug(f"Sent detection data to client for camera {camera_id}")
|
||||
|
||||
# Cache the detection data for potential resubscriptions (only if not null detection)
|
||||
if detection_dict is not None and detection_result.get("class") != "none":
|
||||
cached_detections[camera_id] = detection_data.copy()
|
||||
logger.debug(f"Cached detection for camera {camera_id}: {detection_dict}")
|
||||
else:
|
||||
# Don't cache null/none detections - let them reset properly
|
||||
cached_detections.pop(camera_id, None)
|
||||
logger.debug(f"Not caching null/none detection for camera {camera_id}")
|
||||
|
||||
except RuntimeError as e:
|
||||
if "websocket.close" in str(e):
|
||||
logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}")
|
||||
|
@ -895,13 +901,13 @@ async def detect(websocket: WebSocket):
|
|||
else:
|
||||
raise
|
||||
|
||||
# Log status after sending
|
||||
if session_id and detection_result.get("class") != "none":
|
||||
logger.info(f"📤 WEBSOCKET RESPONSE with sessionId: {session_id} for camera {camera_id}")
|
||||
elif detection_result.get("class") == "none":
|
||||
# Log status after sending (no sessionId sent to backend)
|
||||
if detection_dict is None:
|
||||
logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}")
|
||||
elif detection_dict == {}:
|
||||
logger.info(f"📡 SENT empty detection - backend should create sessionId for camera {camera_id}")
|
||||
else:
|
||||
logger.info(f"📡 Detection data sent for camera {camera_id}")
|
||||
logger.info(f"📡 SENT detection data - backend manages sessionId independently for camera {camera_id}")
|
||||
return persistent_data
|
||||
except Exception as e:
|
||||
logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
|
||||
|
@ -1145,16 +1151,46 @@ async def detect(websocket: WebSocket):
|
|||
# Check if parameters changed
|
||||
if has_subscription_changed(desired_sub, current_stream):
|
||||
logger.info(f"Parameters changed for {subscription_id}, resubscribing")
|
||||
await unsubscribe_internal(subscription_id)
|
||||
await subscribe_internal(desired_sub, websocket)
|
||||
logger.debug(f"Parameter comparison for {subscription_id}:")
|
||||
logger.debug(f" rtspUrl: '{desired_sub.get('rtspUrl')}' vs '{current_stream.get('rtsp_url')}'")
|
||||
logger.debug(f" snapshotUrl: '{desired_sub.get('snapshotUrl')}' vs '{current_stream.get('snapshot_url')}'")
|
||||
logger.debug(f" modelUrl: '{extract_model_file_identifier(desired_sub.get('modelUrl'))}' vs '{extract_model_file_identifier(current_stream.get('modelUrl'))}'")
|
||||
logger.debug(f" modelId: {desired_sub.get('modelId')} vs {current_stream.get('modelId')}")
|
||||
|
||||
# Preserve detection state for resubscription
|
||||
cached_detection = cached_detections.get(subscription_id)
|
||||
logger.debug(f"Preserving detection state for resubscription: {cached_detection is not None}")
|
||||
|
||||
await unsubscribe_internal(subscription_id, preserve_detection=True)
|
||||
await subscribe_internal(desired_sub, websocket, cached_detection=cached_detection)
|
||||
|
||||
# Add new subscriptions
|
||||
for subscription_id in to_add:
|
||||
desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
|
||||
await subscribe_internal(desired_sub, websocket)
|
||||
|
||||
def extract_model_file_identifier(model_url):
|
||||
"""Extract the core model file identifier from S3 URLs, ignoring timestamp parameters"""
|
||||
if not model_url:
|
||||
return None
|
||||
|
||||
# For S3 URLs, extract just the path portion before query parameters
|
||||
try:
|
||||
from urllib.parse import urlparse
|
||||
parsed = urlparse(model_url)
|
||||
# Return the path which contains the actual model file identifier
|
||||
# e.g. "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta"
|
||||
return parsed.path
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to parse model URL {model_url}: {e}")
|
||||
return model_url
|
||||
|
||||
def has_subscription_changed(desired_sub, current_stream):
|
||||
"""Check if subscription parameters have changed"""
|
||||
# Smart model URL comparison - ignore timestamp changes in signed URLs
|
||||
desired_model_id = extract_model_file_identifier(desired_sub.get("modelUrl"))
|
||||
current_model_id = extract_model_file_identifier(current_stream.get("modelUrl"))
|
||||
|
||||
return (
|
||||
desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
|
||||
desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or
|
||||
|
@ -1164,10 +1200,11 @@ async def detect(websocket: WebSocket):
|
|||
desired_sub.get("cropX2") != current_stream.get("cropX2") or
|
||||
desired_sub.get("cropY2") != current_stream.get("cropY2") or
|
||||
desired_sub.get("modelId") != current_stream.get("modelId") or
|
||||
desired_sub.get("modelName") != current_stream.get("modelName")
|
||||
desired_sub.get("modelName") != current_stream.get("modelName") or
|
||||
desired_model_id != current_model_id
|
||||
)
|
||||
|
||||
async def subscribe_internal(subscription, websocket):
|
||||
async def subscribe_internal(subscription, websocket, cached_detection=None):
|
||||
"""Internal subscription logic extracted from original subscribe handler"""
|
||||
subscriptionIdentifier = subscription.get("subscriptionIdentifier")
|
||||
rtsp_url = subscription.get("rtspUrl")
|
||||
|
@ -1274,38 +1311,49 @@ async def detect(websocket: WebSocket):
|
|||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
||||
"modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
|
||||
"cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
|
||||
"mode": mode, "camera_url": camera_url, "modelUrl": model_url
|
||||
"mode": mode, "camera_url": camera_url, "modelUrl": model_url,
|
||||
# Always store both URLs for comparison consistency
|
||||
"rtsp_url": rtsp_url,
|
||||
"snapshot_url": snapshot_url,
|
||||
"snapshot_interval": snapshot_interval
|
||||
}
|
||||
|
||||
if mode == "snapshot":
|
||||
stream_info["snapshot_url"] = snapshot_url
|
||||
stream_info["snapshot_interval"] = snapshot_interval
|
||||
elif mode == "rtsp":
|
||||
stream_info["rtsp_url"] = rtsp_url
|
||||
if mode == "rtsp":
|
||||
stream_info["cap"] = shared_stream["cap"]
|
||||
|
||||
streams[camera_id] = stream_info
|
||||
subscription_to_camera[camera_id] = camera_url
|
||||
logger.info(f"Subscribed to camera {camera_id}")
|
||||
|
||||
# Send initial "none" detection to backend on camera connect
|
||||
initial_detection_data = {
|
||||
"type": "imageDetection",
|
||||
"subscriptionIdentifier": subscriptionIdentifier,
|
||||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"data": {
|
||||
"detection": None,
|
||||
"modelId": modelId,
|
||||
"modelName": modelName
|
||||
# Send initial detection to backend - use cached if available, otherwise "none"
|
||||
if cached_detection:
|
||||
# Restore cached detection with updated timestamp (RESUBSCRIPTION STATUS UPDATE)
|
||||
initial_detection_data = cached_detection.copy()
|
||||
initial_detection_data["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
||||
logger.info(f"📡 RESUBSCRIPTION: Restoring cached detection for camera {camera_id}")
|
||||
logger.debug(f"📡 RESUBSCRIPTION: Cached detection has sessionId: {initial_detection_data.get('sessionId', 'None')}")
|
||||
else:
|
||||
# Send "none" detection for new subscriptions
|
||||
initial_detection_data = {
|
||||
"type": "imageDetection",
|
||||
"subscriptionIdentifier": subscriptionIdentifier,
|
||||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"data": {
|
||||
"detection": None,
|
||||
"modelId": modelId,
|
||||
"modelName": modelName
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info(f"📡 NEW SUBSCRIPTION: Sending initial 'none' detection for camera {camera_id}")
|
||||
|
||||
ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}")
|
||||
await websocket.send_json(initial_detection_data)
|
||||
logger.info(f"📡 Sent initial 'none' detection to backend for camera {camera_id}")
|
||||
logger.debug(f"Initial detection data: {initial_detection_data}")
|
||||
logger.debug(f"Initial detection data sent (resubscription={cached_detection is not None}): {initial_detection_data}")
|
||||
|
||||
# This cached detection was just a one-time status update for resubscription
|
||||
# Normal frame processing will continue independently
|
||||
|
||||
async def unsubscribe_internal(subscription_id):
|
||||
async def unsubscribe_internal(subscription_id, preserve_detection=False):
|
||||
"""Internal unsubscription logic"""
|
||||
if subscription_id in streams:
|
||||
stream = streams.pop(subscription_id)
|
||||
|
@ -1323,13 +1371,14 @@ async def detect(websocket: WebSocket):
|
|||
del camera_streams[camera_url]
|
||||
|
||||
latest_frames.pop(subscription_id, None)
|
||||
cached_detections.pop(subscription_id, None) # Clear cached detection
|
||||
if not preserve_detection:
|
||||
cached_detections.pop(subscription_id, None) # Clear cached detection only if not preserving
|
||||
frame_skip_flags.pop(subscription_id, None) # Clear frame skip flag
|
||||
camera_states.pop(subscription_id, None) # Clear camera state
|
||||
cached_full_pipeline_results.pop(subscription_id, None) # Clear cached pipeline results
|
||||
session_pipeline_states.pop(subscription_id, None) # Clear session pipeline state
|
||||
cleanup_camera_stability(subscription_id)
|
||||
logger.info(f"Unsubscribed from camera {subscription_id}")
|
||||
logger.info(f"Unsubscribed from camera {subscription_id} (preserve_detection={preserve_detection})")
|
||||
|
||||
async def process_streams():
|
||||
logger.info("Started processing streams")
|
||||
|
|
1344
websocket_comm.log
1344
websocket_comm.log
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue