From 07eddd3f0de708300a5ed49e5c436e001cd881a9 Mon Sep 17 00:00:00 2001 From: Pongsatorn Date: Sat, 23 Aug 2025 18:38:50 +0700 Subject: [PATCH] update sessionID backend --- app.py | 60 +++++++++++++--- siwatsystem/pympta.py | 158 +++++++++++++++++++++++------------------- 2 files changed, 137 insertions(+), 81 deletions(-) diff --git a/app.py b/app.py index 757a6df..6e75963 100644 --- a/app.py +++ b/app.py @@ -293,12 +293,22 @@ async def detect(websocket: WebSocket): subscription_parts = stream["subscriptionIdentifier"].split(';') display_identifier = subscription_parts[0] if subscription_parts else None - # Create context for pipeline execution (session_id will be generated by pipeline) + # Get backend session ID if available + backend_session_id = session_ids.get(display_identifier) + + # Create context for pipeline execution with backend sessionId pipeline_context = { "camera_id": camera_id, - "display_id": display_identifier + "display_id": display_identifier, + "backend_session_id": backend_session_id } + if backend_session_id: + logger.info(f"๐Ÿ”ฅ USING BACKEND SESSION_ID: {backend_session_id} for camera {camera_id} (display: {display_identifier})") + logger.debug(f"Pipeline context includes backend session_id: {backend_session_id}") + else: + logger.debug(f"โŒ No backend session_id available for display: {display_identifier} (session_ids: {session_ids})") + detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context) process_time = (time.time() - start_time) * 1000 logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms") @@ -306,13 +316,17 @@ async def detect(websocket: WebSocket): # Log the raw detection result for debugging logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}") - # Extract session_id from pipeline result (generated during database record creation) + # Extract session_id from pipeline result (uses backend sessionId) session_id = None if detection_result and isinstance(detection_result, dict): - # Check if pipeline generated a session_id (happens when Car+Frontal detected together) + # Check if pipeline used backend session_id for operations if "session_id" in detection_result: session_id = detection_result["session_id"] - logger.debug(f"Extracted session_id from pipeline result: {session_id}") + logger.debug(f"Pipeline used session_id: {session_id}") + elif backend_session_id: + # Use backend session_id even if pipeline didn't return it + session_id = backend_session_id + logger.debug(f"Using backend session_id for WebSocket response: {session_id}") # Process detection result - run_pipeline returns the primary detection directly if detection_result and isinstance(detection_result, dict) and "class" in detection_result: @@ -376,10 +390,13 @@ async def detect(websocket: WebSocket): } } - # Add session ID if available (generated by pipeline when Car+Frontal detected) + # Add session ID if available (from backend or pipeline operations) if session_id is not None: detection_data["sessionId"] = session_id + logger.info(f"๐Ÿ“ค WEBSOCKET RESPONSE with sessionId: {session_id} for camera {camera_id}") logger.debug(f"Added session_id to WebSocket response: {session_id}") + else: + logger.debug(f"๐Ÿ“ค WebSocket response WITHOUT sessionId for camera {camera_id}") if highest_confidence_detection.get("class") != "none": confidence = highest_confidence_detection.get("confidence", 0.0) @@ -1131,14 +1148,41 @@ async def detect(websocket: WebSocket): display_identifier = payload.get("displayIdentifier") session_id = payload.get("sessionId") + logger.info(f"๐Ÿ†” BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId={session_id}") + logger.debug(f"Full setSessionId payload: {payload}") + if display_identifier: # Store session ID for this display if session_id is None: session_ids.pop(display_identifier, None) - logger.info(f"Cleared session ID for display {display_identifier}") + logger.info(f"๐Ÿšซ BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} - resetting tracking") + + # Reset tracking state for all cameras associated with this display + with streams_lock: + for camera_id, stream in streams.items(): + if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): + # Import here to avoid circular import + from siwatsystem.pympta import reset_tracking_state + model_id = stream.get("modelId", "unknown") + reset_tracking_state(camera_id, model_id, "backend session ended") + logger.info(f"Reset tracking for camera {camera_id} (display: {display_identifier})") else: session_ids[display_identifier] = session_id - logger.info(f"Set session ID {session_id} for display {display_identifier}") + logger.info(f"โœ… BACKEND SESSION STARTED: Set session ID {session_id} for display {display_identifier}") + logger.debug(f"Current session_ids dict: {session_ids}") + + # Clear waiting state for cameras associated with this display + with streams_lock: + for camera_id, stream in streams.items(): + if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): + from siwatsystem.pympta import get_camera_stability_data + model_id = stream.get("modelId", "unknown") + stability_data = get_camera_stability_data(camera_id, model_id) + session_state = stability_data["session_state"] + if session_state.get("waiting_for_backend_session", False): + session_state["waiting_for_backend_session"] = False + session_state["wait_start_time"] = 0.0 + logger.info(f"๐Ÿš€ PIPELINE UNBLOCKED: Backend sessionId {session_id} received - camera {camera_id} can proceed with pipeline") elif msg_type == "patchSession": session_id = data.get("sessionId") diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index 64218d6..719fc93 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -22,8 +22,8 @@ logger = logging.getLogger("detector_worker.pympta") # Structure: {camera_id: {model_id: {"track_stability_counters": {track_id: count}, "stable_tracks": set(), "session_state": {...}}}} _camera_stability_tracking = {} -# Timer-based cooldown configuration (for testing) -_cooldown_duration_seconds = 30 +# Session timeout configuration (waiting for backend sessionId) +_session_timeout_seconds = 15 def validate_redis_config(redis_config: dict) -> bool: """Validate Redis configuration parameters.""" @@ -759,7 +759,8 @@ def get_camera_stability_data(camera_id, model_id): "stable_tracks": set(), "session_state": { "active": True, - "cooldown_until": 0.0, + "waiting_for_backend_session": False, + "wait_start_time": 0.0, "reset_tracker_on_resume": False } } @@ -822,70 +823,50 @@ def check_stable_tracks(camera_id, model_id, regions_dict): has_stable_tracks = len(stable_detections) > 0 return has_stable_tracks, stable_detections -def start_cooldown_timer(camera_id, model_id): - """Start 30-second cooldown timer after successful pipeline completion.""" +def reset_tracking_state(camera_id, model_id, reason="session ended"): + """Reset tracking state after session completion or timeout.""" stability_data = get_camera_stability_data(camera_id, model_id) session_state = stability_data["session_state"] - # Start timer-based cooldown - cooldown_until = time.time() + _cooldown_duration_seconds - session_state["cooldown_until"] = cooldown_until - session_state["active"] = False - session_state["reset_tracker_on_resume"] = True # Flag to reset YOLO tracker + # Clear all tracking data for fresh start + stability_data["track_stability_counters"].clear() + stability_data["stable_tracks"].clear() + session_state["active"] = True + session_state["waiting_for_backend_session"] = False + session_state["wait_start_time"] = 0.0 + session_state["reset_tracker_on_resume"] = True - logger.info(f"Camera {camera_id}: ๐Ÿ›‘ Starting {_cooldown_duration_seconds}s cooldown timer (until: {cooldown_until:.2f})") - - # DO NOT clear tracking state here - preserve it during cooldown - # Tracking state will be cleared when cooldown expires and new session starts + logger.info(f"Camera {camera_id}: ๐Ÿ”„ Reset tracking state - {reason}") + logger.info(f"Camera {camera_id}: ๐Ÿงน Cleared stability counters and stable tracks for fresh session") def is_camera_active(camera_id, model_id): - """Check if camera should be processing detections (timer-based cooldown).""" + """Check if camera should be processing detections.""" stability_data = get_camera_stability_data(camera_id, model_id) session_state = stability_data["session_state"] - # Check if cooldown timer has expired - if not session_state["active"]: + # Check if waiting for backend sessionId has timed out + if session_state.get("waiting_for_backend_session", False): current_time = time.time() - cooldown_until = session_state["cooldown_until"] - remaining_time = cooldown_until - current_time + wait_start_time = session_state.get("wait_start_time", 0) + elapsed_time = current_time - wait_start_time - if current_time >= cooldown_until: - session_state["active"] = True - session_state["reset_tracker_on_resume"] = True # Ensure tracker reset flag is set - - # Clear tracking state NOW - before new detection session starts - stability_data["track_stability_counters"].clear() - stability_data["stable_tracks"].clear() - - logger.info(f"Camera {camera_id}: ๐Ÿ“ข Cooldown timer ended, resuming detection with fresh track IDs") - logger.info(f"Camera {camera_id}: ๐Ÿงน Cleared stability counters and stable tracks for fresh session") + if elapsed_time >= _session_timeout_seconds: + logger.warning(f"Camera {camera_id}: Backend sessionId timeout ({_session_timeout_seconds}s) - resetting tracking") + reset_tracking_state(camera_id, model_id, "backend sessionId timeout") + return True else: - logger.debug(f"Camera {camera_id}: Still in cooldown - {remaining_time:.1f}s remaining") + remaining_time = _session_timeout_seconds - elapsed_time + logger.debug(f"Camera {camera_id}: Still waiting for backend sessionId - {remaining_time:.1f}s remaining") + return False - return session_state["active"] + return session_state.get("active", True) def cleanup_camera_stability(camera_id): - """Clean up stability tracking data when a camera is disconnected, preserving cooldown timers.""" + """Clean up stability tracking data when a camera is disconnected.""" global _camera_stability_tracking if camera_id in _camera_stability_tracking: - # Check if any models are still in cooldown before cleanup - models_in_cooldown = [] - for model_id, model_data in _camera_stability_tracking[camera_id].items(): - session_state = model_data.get("session_state", {}) - if not session_state.get("active", True) and time.time() < session_state.get("cooldown_until", 0): - cooldown_remaining = session_state["cooldown_until"] - time.time() - models_in_cooldown.append((model_id, cooldown_remaining)) - logger.warning(f"โš ๏ธ Camera {camera_id}: Model {model_id} is in cooldown ({cooldown_remaining:.1f}s remaining) - preserving timer!") - - if models_in_cooldown: - # DO NOT clear any tracking data during cooldown - preserve everything - logger.warning(f"โš ๏ธ Camera {camera_id}: PRESERVING ALL data during cooldown - no cleanup performed!") - logger.warning(f" - Track IDs will reset only AFTER cooldown expires") - logger.warning(f" - Stability counters preserved until cooldown ends") - else: - # Safe to delete everything - no active cooldowns - del _camera_stability_tracking[camera_id] - logger.info(f"Cleaned up stability tracking data for camera {camera_id} (no active cooldowns)") + del _camera_stability_tracking[camera_id] + logger.info(f"Cleaned up stability tracking data for camera {camera_id}") def validate_pipeline_execution(node, regions_dict): """ @@ -955,6 +936,16 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None): - Context passing for session/camera information """ try: + # Extract backend sessionId from context at the start of function + backend_session_id = context.get("backend_session_id") if context else None + camera_id = context.get("camera_id", "unknown") if context else "unknown" + model_id = node.get("modelId", "unknown") + + if backend_session_id: + logger.info(f"๐Ÿ”‘ PIPELINE USING BACKEND SESSION_ID: {backend_session_id} for camera {camera_id}") + else: + logger.debug(f"โŒ No backend session_id in pipeline context for camera {camera_id}") + task = getattr(node["model"], "task", None) # โ”€โ”€โ”€ Classification stage โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ @@ -992,11 +983,8 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None): return (det, None) if return_bbox else det # โ”€โ”€โ”€ Session management check โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - camera_id = context.get("camera_id", "unknown") if context else "unknown" - model_id = node.get("modelId", "unknown") - if not is_camera_active(camera_id, model_id): - logger.info(f"โฐ Camera {camera_id}: Tracker stopped - in cooldown period, skipping all detection") + logger.debug(f"โฐ Camera {camera_id}: Waiting for backend sessionId, skipping pipeline") return (None, None) if return_bbox else None # โ”€โ”€โ”€ Detection stage - Using structured detection function โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ @@ -1026,7 +1014,27 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None): primary_bbox = primary_detection.get("bbox", [0, 0, 0, 0]) return (primary_detection, primary_bbox) if return_bbox else primary_detection else: - logger.info(f"Camera {camera_id}: Stable tracks {[det[1] for det in stable_detections]} detected - proceeding with full pipeline") + logger.info(f"Camera {camera_id}: Stable tracks {[det[1] for det in stable_detections]} detected - checking backend sessionId") + + # Check if we need to wait for backend sessionId + if not backend_session_id: + logger.info(f"Camera {camera_id}: Stable car detected, waiting for backend sessionId...") + stability_data = get_camera_stability_data(camera_id, model_id) + session_state = stability_data["session_state"] + + if not session_state.get("waiting_for_backend_session", False): + # Start waiting for backend sessionId + session_state["waiting_for_backend_session"] = True + session_state["wait_start_time"] = time.time() + logger.info(f"โณ Camera {camera_id}: WAITING FOR BACKEND SESSION_ID (timeout: {_session_timeout_seconds}s)") + logger.info(f"๐Ÿ“ก Stable car detected - sending imageDetection to trigger backend session creation") + + # Return detection to signal backend, but don't proceed with pipeline + primary_detection = max(all_detections, key=lambda x: x["confidence"]) if all_detections else {"class": "none", "confidence": 0.0, "bbox": [0, 0, 0, 0]} + primary_bbox = primary_detection.get("bbox", [0, 0, 0, 0]) + return (primary_detection, primary_bbox) if return_bbox else primary_detection + + logger.info(f"๐Ÿš€ Camera {camera_id}: BACKEND SESSION_ID AVAILABLE ({backend_session_id}) - proceeding with full pipeline") # โ”€โ”€โ”€ Pre-validate pipeline execution โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ pipeline_valid, missing_branches = validate_pipeline_execution(node, regions_dict) @@ -1043,36 +1051,40 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None): **(context or {}) } - # โ”€โ”€โ”€ Create initial database record when valid detection found โ”€โ”€โ”€โ”€ + # โ”€โ”€โ”€ Check backend sessionId before database operations โ”€โ”€โ”€โ”€ + if node.get("db_manager") and regions_dict: - # Create database record if we have any valid detection (after class mapping and filtering) detected_classes = list(regions_dict.keys()) - logger.debug(f"Valid detections found for database record: {detected_classes}") + logger.debug(f"Valid detections found - checking for backend sessionId: {detected_classes}") - # Always create record if we have valid detections that passed all filters + if not backend_session_id: + logger.error(f"๐Ÿšซ Camera {camera_id}: No backend sessionId available - cannot proceed with database operations") + logger.error(f"๐Ÿšซ Camera {camera_id}: Pipeline requires backend sessionId for Redis/PostgreSQL operations") + # Reset tracking and wait for new stable car + reset_tracking_state(camera_id, model_id, "missing backend sessionId") + return (None, None) if return_bbox else None + + # Use backend sessionId for database operations if detected_classes: - # Generate UUID session_id since client session is None for now - import uuid as uuid_lib from datetime import datetime - generated_session_id = str(uuid_lib.uuid4()) - - # Insert initial detection record display_id = detection_result.get("display_id", "unknown") timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S") inserted_session_id = node["db_manager"].insert_initial_detection( display_id=display_id, captured_timestamp=timestamp, - session_id=generated_session_id + session_id=backend_session_id # Use backend sessionId ) if inserted_session_id: - # Update detection_result with the generated session_id for actions and branches detection_result["session_id"] = inserted_session_id - detection_result["timestamp"] = timestamp # Update with proper timestamp - logger.info(f"Created initial database record with session_id: {inserted_session_id}") - else: - logger.debug("Database record not created - no valid detections found after filtering") + detection_result["timestamp"] = timestamp + logger.info(f"๐Ÿ’พ DATABASE RECORD CREATED with backend session_id: {inserted_session_id}") + logger.debug(f"Database record: display_id={display_id}, timestamp={timestamp}") + else: + logger.error(f"Failed to create database record with backend session_id: {backend_session_id}") + reset_tracking_state(camera_id, model_id, "database insertion failed") + return (None, None) if return_bbox else None # Execute actions for root node only if it doesn't have branches # Branch nodes with actions will execute them after branch processing @@ -1194,9 +1206,8 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None): if node.get("parallelActions") and "branch_results" in detection_result: execute_parallel_actions(node, frame, detection_result, regions_dict) - # โ”€โ”€โ”€ Start 30s cooldown timer after successful pipeline completion โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - start_cooldown_timer(camera_id, model_id) - logger.info(f"Camera {camera_id}: Pipeline completed successfully, starting 30s cooldown") + # โ”€โ”€โ”€ Note: Tracking will be reset when backend sends setSessionId: null โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + logger.info(f"Camera {camera_id}: Pipeline completed successfully - waiting for backend to end session") # โ”€โ”€โ”€ Execute actions after successful detection AND branch processing โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # This ensures detection nodes (like frontal_detection_v1) execute their actions @@ -1220,5 +1231,6 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None): except Exception as e: logger.error(f"Error in node {node.get('modelId')}: {e}") + import traceback traceback.print_exc() return (None, None) if return_bbox else None