from typing import Any, Dict import os import json import time import queue import torch import cv2 import numpy as np import base64 import logging import threading import requests import asyncio import psutil import zipfile import ssl import urllib3 import subprocess import tempfile from urllib.parse import urlparse from requests.adapters import HTTPAdapter from urllib3.util.ssl_ import create_urllib3_context from fastapi import FastAPI, WebSocket, HTTPException from fastapi.websockets import WebSocketDisconnect from fastapi.responses import Response from websockets.exceptions import ConnectionClosedError from ultralytics import YOLO # Import shared pipeline functions from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline, cleanup_camera_stability app = FastAPI() # Global dictionaries to keep track of models and streams # "models" now holds a nested dict: { camera_id: { modelId: model_tree } } models: Dict[str, Dict[str, Any]] = {} streams: Dict[str, Dict[str, Any]] = {} # Store session IDs per display session_ids: Dict[str, int] = {} # Track shared camera streams by camera URL camera_streams: Dict[str, Dict[str, Any]] = {} # Map subscriptions to their camera URL subscription_to_camera: Dict[str, str] = {} # Store latest frames for REST API access (separate from processing buffer) latest_frames: Dict[str, Any] = {} # Store cached detection dict after successful pipeline completion cached_detections: Dict[str, Dict[str, Any]] = {} # Track frame skipping for pipeline buffer after detection frame_skip_flags: Dict[str, bool] = {} # Track camera connection states for immediate error handling camera_states: Dict[str, Dict[str, Any]] = {} # Track session ID states and pipeline modes per camera session_pipeline_states: Dict[str, Dict[str, Any]] = {} # Store full pipeline results for caching cached_full_pipeline_results: Dict[str, Dict[str, Any]] = {} with open("config.json", "r") as f: config = json.load(f) poll_interval = config.get("poll_interval_ms", 100) reconnect_interval = config.get("reconnect_interval_sec", 5) TARGET_FPS = config.get("target_fps", 10) poll_interval = 1000 / TARGET_FPS logging.info(f"Poll interval: {poll_interval}ms") max_streams = config.get("max_streams", 5) max_retries = config.get("max_retries", 3) # Configure logging logging.basicConfig( level=logging.INFO, # Set to INFO level for less verbose output format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.FileHandler("detector_worker.log"), # Write logs to a file logging.StreamHandler() # Also output to console ] ) # Create a logger specifically for this application logger = logging.getLogger("detector_worker") logger.setLevel(logging.DEBUG) # Set app-specific logger to DEBUG level # Create WebSocket communication logger ws_logger = logging.getLogger("websocket_comm") ws_logger.setLevel(logging.INFO) ws_handler = logging.FileHandler("websocket_comm.log", encoding='utf-8') ws_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") ws_handler.setFormatter(ws_formatter) ws_logger.addHandler(ws_handler) ws_logger.propagate = False # Don't propagate to root logger # Ensure all other libraries (including root) use at least INFO level logging.getLogger().setLevel(logging.INFO) logger.info("Starting detector worker application") logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: {max_streams}, Max retries: {max_retries}") ws_logger.info("WebSocket communication logging started - TX/RX format") logger.info("WebSocket communication will be logged to websocket_comm.log") # Ensure the models directory exists os.makedirs("models", exist_ok=True) logger.info("Ensured models directory exists") # Constants for heartbeat and timeouts HEARTBEAT_INTERVAL = 2 # seconds WORKER_TIMEOUT_MS = 10000 logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds") # Locks for thread-safe operations streams_lock = threading.Lock() models_lock = threading.Lock() logger.debug("Initialized thread locks") # Add helper to download mpta ZIP file from a remote URL def download_mpta(url: str, dest_path: str) -> str: try: logger.info(f"Starting download of model from {url} to {dest_path}") os.makedirs(os.path.dirname(dest_path), exist_ok=True) response = requests.get(url, stream=True) if response.status_code == 200: file_size = int(response.headers.get('content-length', 0)) logger.info(f"Model file size: {file_size/1024/1024:.2f} MB") downloaded = 0 with open(dest_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) downloaded += len(chunk) if file_size > 0 and downloaded % (file_size // 10) < 8192: # Log approximately every 10% logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%") logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}") return dest_path else: logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}") return None except Exception as e: logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) return None # Add helper to fetch snapshot image from HTTP/HTTPS URL def fetch_snapshot(url: str): try: from requests.auth import HTTPBasicAuth, HTTPDigestAuth import requests.adapters import urllib3 # Parse URL to extract credentials parsed = urlparse(url) # Prepare headers - some cameras require User-Agent and specific headers headers = { 'User-Agent': 'Mozilla/5.0 (compatible; DetectorWorker/1.0)', 'Accept': 'image/jpeg,image/*,*/*', 'Connection': 'close', 'Cache-Control': 'no-cache' } # Create a session with custom adapter for better connection handling session = requests.Session() adapter = requests.adapters.HTTPAdapter( pool_connections=1, pool_maxsize=1, max_retries=urllib3.util.retry.Retry( total=2, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504] ) ) session.mount('http://', adapter) session.mount('https://', adapter) # Reconstruct URL without credentials clean_url = f"{parsed.scheme}://{parsed.hostname}" if parsed.port: clean_url += f":{parsed.port}" clean_url += parsed.path if parsed.query: clean_url += f"?{parsed.query}" auth = None response = None if parsed.username and parsed.password: # Try HTTP Digest authentication first (common for IP cameras) try: auth = HTTPDigestAuth(parsed.username, parsed.password) response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True) if response.status_code == 200: logger.debug(f"Successfully authenticated using HTTP Digest for {clean_url}") elif response.status_code == 401: # If Digest fails, try Basic auth logger.debug(f"HTTP Digest failed, trying Basic auth for {clean_url}") auth = HTTPBasicAuth(parsed.username, parsed.password) response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True) if response.status_code == 200: logger.debug(f"Successfully authenticated using HTTP Basic for {clean_url}") except Exception as auth_error: logger.debug(f"Authentication setup error: {auth_error}") # Fallback to original URL with embedded credentials response = session.get(url, headers=headers, timeout=(5, 15), stream=True) else: # No credentials in URL, make request as-is response = session.get(url, headers=headers, timeout=(5, 15), stream=True) if response and response.status_code == 200: # Read content with size limit to prevent memory issues content = b'' max_size = 10 * 1024 * 1024 # 10MB limit for chunk in response.iter_content(chunk_size=8192): content += chunk if len(content) > max_size: logger.error(f"Snapshot too large (>{max_size} bytes) from {clean_url}") return None # Convert response content to numpy array nparr = np.frombuffer(content, np.uint8) # Decode image frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is not None: logger.debug(f"Successfully fetched snapshot from {clean_url}, shape: {frame.shape}, size: {len(content)} bytes") return frame else: logger.error(f"Failed to decode image from snapshot URL: {clean_url} (content size: {len(content)} bytes)") return None elif response: logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {clean_url}") # Log response headers and first part of content for debugging logger.debug(f"Response headers: {dict(response.headers)}") if len(response.content) < 1000: logger.debug(f"Response content: {response.content[:500]}") return None else: logger.error(f"No response received from snapshot URL: {clean_url}") return None except requests.exceptions.Timeout as e: logger.error(f"Timeout fetching snapshot from {url}: {str(e)}") return None except requests.exceptions.ConnectionError as e: logger.error(f"Connection error fetching snapshot from {url}: {str(e)}") return None except Exception as e: logger.error(f"Exception fetching snapshot from {url}: {str(e)}", exc_info=True) return None # Helper to get crop coordinates from stream def get_crop_coords(stream): return { "cropX1": stream.get("cropX1"), "cropY1": stream.get("cropY1"), "cropX2": stream.get("cropX2"), "cropY2": stream.get("cropY2") } # Camera state management functions def set_camera_connected(camera_id, connected=True, error_msg=None): """Set camera connection state and track error information""" current_time = time.time() if camera_id not in camera_states: camera_states[camera_id] = { "connected": True, "last_error": None, "last_error_time": None, "consecutive_failures": 0, "disconnection_notified": False } state = camera_states[camera_id] was_connected = state["connected"] if connected: state["connected"] = True state["consecutive_failures"] = 0 state["disconnection_notified"] = False if not was_connected: logger.info(f"ðŸ“ķ CAMERA RECONNECTED: {camera_id}") else: state["connected"] = False state["last_error"] = error_msg state["last_error_time"] = current_time state["consecutive_failures"] += 1 # Distinguish between temporary and permanent disconnection is_permanent = state["consecutive_failures"] >= 3 if was_connected and is_permanent: logger.error(f"ðŸ“ĩ CAMERA DISCONNECTED: {camera_id} - {error_msg} (consecutive failures: {state['consecutive_failures']})") logger.info(f"ðŸšĻ CAMERA ERROR DETECTED - Will send detection: null to reset backend session for {camera_id}") def is_camera_connected(camera_id): """Check if camera is currently connected""" return camera_states.get(camera_id, {}).get("connected", True) def should_notify_disconnection(camera_id): """Check if we should notify backend about disconnection""" state = camera_states.get(camera_id, {}) is_disconnected = not state.get("connected", True) not_yet_notified = not state.get("disconnection_notified", False) has_enough_failures = state.get("consecutive_failures", 0) >= 3 return is_disconnected and not_yet_notified and has_enough_failures def mark_disconnection_notified(camera_id): """Mark that we've notified backend about this disconnection""" if camera_id in camera_states: camera_states[camera_id]["disconnection_notified"] = True logger.debug(f"Marked disconnection notification sent for camera {camera_id}") def get_or_init_session_pipeline_state(camera_id): """Get or initialize session pipeline state for a camera""" if camera_id not in session_pipeline_states: session_pipeline_states[camera_id] = { "mode": "validation_detecting", # "validation_detecting", "send_detections", "waiting_for_session_id", "full_pipeline", "lightweight", "car_gone_waiting" "session_id_received": False, "full_pipeline_completed": False, "absence_counter": 0, "max_absence_frames": 3, "yolo_inference_enabled": True, # Controls whether to run YOLO inference "cached_detection_dict": None, # Cached detection dict for lightweight mode "stable_track_id": None # The stable track ID we're monitoring } return session_pipeline_states[camera_id] def update_session_pipeline_mode(camera_id, new_mode, session_id=None): """Update session pipeline mode and related state""" state = get_or_init_session_pipeline_state(camera_id) old_mode = state["mode"] state["mode"] = new_mode if session_id: state["session_id_received"] = True state["absence_counter"] = 0 # Reset absence counter when session starts logger.info(f"📊 Camera {camera_id}: Pipeline mode changed from '{old_mode}' to '{new_mode}'") return state #################################################### # REST API endpoint for image retrieval #################################################### @app.get("/camera/{camera_id}/image") async def get_camera_image(camera_id: str): """ Get the current frame from a camera as JPEG image """ try: # URL decode the camera_id to handle encoded characters like %3B for semicolon from urllib.parse import unquote original_camera_id = camera_id camera_id = unquote(camera_id) logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'") with streams_lock: if camera_id not in streams: logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") # Check if we have a cached frame for this camera if camera_id not in latest_frames: logger.warning(f"No cached frame available for camera '{camera_id}'.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") frame = latest_frames[camera_id] logger.debug(f"Retrieved cached frame for camera '{camera_id}', frame shape: {frame.shape}") # Encode frame as JPEG success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) if not success: raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") # Return image as binary response return Response(content=buffer_img.tobytes(), media_type="image/jpeg") except HTTPException: raise except Exception as e: logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") #################################################### # Detection and frame processing functions #################################################### @app.websocket("/") async def detect(websocket: WebSocket): logger.info("WebSocket connection accepted") persistent_data_dict = {} async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data): try: # Check camera connection state first - handle disconnection immediately if should_notify_disconnection(camera_id): logger.error(f"ðŸšĻ CAMERA DISCONNECTION DETECTED: {camera_id} - sending immediate detection: null") # Clear cached detections and occupancy state cached_detections.pop(camera_id, None) frame_skip_flags.pop(camera_id, None) cached_full_pipeline_results.pop(camera_id, None) # Clear cached pipeline results session_pipeline_states.pop(camera_id, None) # Reset session pipeline state # Reset pipeline state immediately from siwatsystem.pympta import reset_tracking_state model_id = stream.get("modelId", "unknown") reset_tracking_state(camera_id, model_id, "camera disconnected") # Send immediate detection: null to backend detection_data = { "type": "imageDetection", "subscriptionIdentifier": stream["subscriptionIdentifier"], "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "data": { "detection": None, # null detection for disconnection "modelId": stream["modelId"], "modelName": stream["modelName"] } } try: ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") await websocket.send_json(detection_data) except RuntimeError as e: if "websocket.close" in str(e): logger.warning(f"WebSocket connection closed - cannot send disconnection signal for camera {camera_id}") return persistent_data else: raise mark_disconnection_notified(camera_id) logger.info(f"ðŸ“Ą SENT DISCONNECTION SIGNAL - detection: null for camera {camera_id}, backend should clear session") return persistent_data # Apply crop if specified cropped_frame = frame if all(coord is not None for coord in [stream.get("cropX1"), stream.get("cropY1"), stream.get("cropX2"), stream.get("cropY2")]): cropX1, cropY1, cropX2, cropY2 = stream["cropX1"], stream["cropY1"], stream["cropX2"], stream["cropY2"] cropped_frame = frame[cropY1:cropY2, cropX1:cropX2] logger.debug(f"Applied crop coordinates ({cropX1}, {cropY1}, {cropX2}, {cropY2}) to frame for camera {camera_id}") logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}") start_time = time.time() # Extract display identifier for pipeline context subscription_parts = stream["subscriptionIdentifier"].split(';') display_identifier = subscription_parts[0] if subscription_parts else None # Get backend session ID if available backend_session_id = session_ids.get(display_identifier) # Get or initialize session pipeline state pipeline_state = get_or_init_session_pipeline_state(camera_id) current_mode = pipeline_state["mode"] logger.debug(f"🔍 SESSIONID LOOKUP: display='{display_identifier}', session_id={repr(backend_session_id)}, mode='{current_mode}'") logger.debug(f"🔍 Available session_ids: {session_ids}") # ═══ SESSION ID-BASED PROCESSING MODE ═══ if not backend_session_id: # No session ID - keep current mode if it's validation_detecting or send_detections if current_mode not in ["validation_detecting", "send_detections", "waiting_for_session_id"]: update_session_pipeline_mode(camera_id, "validation_detecting") current_mode = "validation_detecting" logger.debug(f"🔍 Camera {camera_id}: No session ID - in {current_mode} mode") else: # Session ID available - switch to full pipeline mode if current_mode in ["send_detections", "waiting_for_session_id"]: # Session ID just arrived - switch to full pipeline mode update_session_pipeline_mode(camera_id, "full_pipeline", backend_session_id) current_mode = "full_pipeline" logger.info(f"ðŸ”Ĩ Camera {camera_id}: Session ID received ({backend_session_id}) - switching to FULL PIPELINE mode") # Create context for pipeline execution pipeline_context = { "camera_id": camera_id, "display_id": display_identifier, "backend_session_id": backend_session_id } start_time = time.time() detection_result = None if current_mode == "validation_detecting": # ═══ TRACK VALIDATION MODE ═══ # Run tracking-based validation with track ID stability logger.debug(f"🔍 Camera {camera_id}: In validation_detecting mode - running track-based validation") # Get tracking configuration from model_tree tracking_config = model_tree.get("tracking", {}) tracking_enabled = tracking_config.get("enabled", True) stability_threshold = tracking_config.get("stabilityThreshold", 4) # Default to "none" - only proceed after track validation detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]} if tracking_enabled: # Run full tracking detection to get track IDs from siwatsystem.pympta import run_detection_with_tracking all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context) if track_validation_result.get("validation_complete", False): # Track validation completed - we have stable track IDs stable_tracks = track_validation_result.get("stable_tracks", []) logger.info(f"ðŸŽŊ Camera {camera_id}: TRACK VALIDATION COMPLETED - stable tracks: {stable_tracks}") # Switch to send_detections mode update_session_pipeline_mode(camera_id, "send_detections") # Send the best detection with stable track if all_detections: # Find detection with stable track ID stable_detection = None for detection in all_detections: if detection.get("id") in stable_tracks: stable_detection = detection break if stable_detection: detection_result = { "class": stable_detection.get("class", "car"), "confidence": stable_detection.get("confidence", 0.0), "bbox": stable_detection.get("bbox", [0, 0, 0, 0]), "track_id": stable_detection.get("id") } logger.info(f"🚗 Camera {camera_id}: SENDING STABLE DETECTION - track ID {detection_result['track_id']}") else: logger.warning(f"⚠ïļ Camera {camera_id}: Stable tracks found but no matching detection") else: # Track validation still in progress stable_tracks = track_validation_result.get("stable_tracks", []) current_tracks = track_validation_result.get("current_tracks", []) if current_tracks: track_id = current_tracks[0] if current_tracks else "None" stable_status = "STABLE" if stable_tracks else "validating" logger.info(f"🔍 Camera {camera_id}: TRACK VALIDATION - car track_id {track_id} ({stable_status}, need {stability_threshold} consecutive frames)") else: logger.debug(f"ðŸ‘ŧ Camera {camera_id}: No car detected") logger.debug(f"ðŸ“Ī Camera {camera_id}: Sending 'none' (track validation in progress)") else: # Tracking disabled - fall back to basic detection validation logger.debug(f"🔍 Camera {camera_id}: Tracking disabled - using basic detection validation") from siwatsystem.pympta import run_lightweight_detection basic_detection = run_lightweight_detection(cropped_frame, model_tree) if basic_detection and basic_detection.get("car_detected"): best_detection = basic_detection.get("best_detection") # Increment validation counter for basic detection pipeline_state["validation_counter"] += 1 current_count = pipeline_state["validation_counter"] threshold = pipeline_state["validation_threshold"] if current_count >= threshold: update_session_pipeline_mode(camera_id, "send_detections") detection_result = { "class": best_detection.get("class", "car"), "confidence": best_detection.get("confidence", 0.0), "bbox": best_detection.get("bbox", [0, 0, 0, 0]) } logger.info(f"ðŸŽŊ Camera {camera_id}: BASIC VALIDATION COMPLETED after {current_count} frames") else: logger.info(f"📊 Camera {camera_id}: Basic validation progress {current_count}/{threshold}") else: # Reset validation counter if pipeline_state["validation_counter"] > 0: pipeline_state["validation_counter"] = 0 logger.info(f"🔄 Camera {camera_id}: Reset validation counter (no detection)") elif current_mode == "send_detections": # ═══ SEND DETECTIONS MODE ═══ # Validation completed - now send detection_dict for car detections, detection: null for no car logger.debug(f"ðŸ“Ī Camera {camera_id}: In send_detections mode - sending detection_dict for cars") from siwatsystem.pympta import run_lightweight_detection basic_detection = run_lightweight_detection(cropped_frame, model_tree) if basic_detection and basic_detection.get("car_detected"): # Car detected - send detection_dict best_detection = basic_detection.get("best_detection") detection_result = { "class": best_detection.get("class", "car"), "confidence": best_detection.get("confidence", 0.0), "bbox": best_detection.get("bbox", [0, 0, 0, 0]) } logger.info(f"🚗 Camera {camera_id}: SENDING DETECTION_DICT - {detection_result['class']} (conf={detection_result['confidence']:.3f}) - backend should generate session ID") else: # No car detected - send "none" detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]} logger.debug(f"ðŸ‘ŧ Camera {camera_id}: No car detected - sending 'none'") elif current_mode == "waiting_for_session_id": # ═══ WAITING FOR SESSION ID MODE ═══ # Stop processing snapshots, wait for session ID logger.debug(f"âģ Camera {camera_id}: In waiting_for_session_id mode - not processing snapshots") return persistent_data # Don't process or send anything elif current_mode == "full_pipeline": # ═══ FULL PIPELINE MODE ═══ logger.info(f"ðŸ”Ĩ Camera {camera_id}: Running FULL PIPELINE (detection + branches + Redis + PostgreSQL)") detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context) if detection_result and isinstance(detection_result, dict): # Cache the full pipeline result cached_full_pipeline_results[camera_id] = { "result": detection_result.copy(), "timestamp": time.time() } # Note: Will cache detection_dict after branch processing completes # Store the stable track ID for lightweight monitoring track_id = detection_result.get("track_id") or detection_result.get("id") if track_id is not None: pipeline_state["stable_track_id"] = track_id logger.info(f"ðŸ’ū Camera {camera_id}: Cached stable track_id={track_id}") else: logger.warning(f"⚠ïļ Camera {camera_id}: No track_id found in detection_result: {detection_result.keys()}") # Switch to lightweight mode update_session_pipeline_mode(camera_id, "lightweight") logger.info(f"✅ Camera {camera_id}: Full pipeline completed - switching to LIGHTWEIGHT mode") elif current_mode == "lightweight": # ═══ ENHANCED LIGHTWEIGHT MODE ═══ # Only run YOLO11n.pt to check stable track presence, use cached detection dict stable_track_id = pipeline_state.get("stable_track_id") cached_detection_dict = pipeline_state.get("cached_detection_dict") logger.debug(f"ðŸŠķ Camera {camera_id}: LIGHTWEIGHT MODE - monitoring stable track_id={stable_track_id}") if not pipeline_state.get("yolo_inference_enabled", True): # YOLO inference disabled - car considered gone, wait for reset logger.debug(f"🛑 Camera {camera_id}: YOLO inference disabled - waiting for reset") detection_result = None # Don't send anything else: # Run lightweight YOLO inference to check track presence only (no full pipeline) from siwatsystem.pympta import run_detection_with_tracking all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context) # OPTION A: Car presence only (track ID kept for internal use) any_car_detected = len(all_detections) > 0 current_tracks = track_validation_result.get("current_tracks", []) logger.debug(f"ðŸŠķ Camera {camera_id}: LIGHTWEIGHT - any_cars={any_car_detected} (main decision), current_tracks={current_tracks} (internal only)") if not any_car_detected: # NO cars detected at all - increment absence counter pipeline_state["absence_counter"] += 1 absence_count = pipeline_state["absence_counter"] max_absence = 2 # Changed from 3 to 2 consecutive frames logger.info(f"ðŸ‘ŧ Camera {camera_id}: NO CARS detected - absence {absence_count}/{max_absence}") # Check robust AND condition: backend confirmed AND detection confirmed backend_confirmed_gone = (backend_session_id is None) detection_confirmed_gone = (absence_count >= max_absence) logger.debug(f"🔍 Camera {camera_id}: Reset conditions - backend_null={backend_confirmed_gone}, absence_2frames={detection_confirmed_gone}") if backend_confirmed_gone and detection_confirmed_gone: # BOTH conditions met - RESET TO VALIDATION PHASE logger.info(f"🔄 Camera {camera_id}: ROBUST RESET - both conditions met (backend=null AND absence>=2)") # Clear all state and prepare for next car cached_full_pipeline_results.pop(camera_id, None) pipeline_state["cached_detection_dict"] = None pipeline_state["stable_track_id"] = None pipeline_state["absence_counter"] = 0 pipeline_state["yolo_inference_enabled"] = True # Re-enable for next car # Clear stability tracking data for this camera from siwatsystem.pympta import reset_camera_stability_tracking reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown")) # Switch back to validation phase - ready for next car update_session_pipeline_mode(camera_id, "detection_dict") logger.info(f"🔄 Camera {camera_id}: RESET TO VALIDATION - model ready for next car") detection_result = None # Stop sending data during reset else: # One or both conditions not met - keep sending cached detection dict if cached_detection_dict: detection_result = cached_detection_dict # Always send cached data logger.info(f"âģ Camera {camera_id}: NO CARS absence {absence_count}/2, backend_null={backend_confirmed_gone} - sending cached detection dict") else: logger.warning(f"⚠ïļ Camera {camera_id}: NO CARS but no cached detection dict available") detection_result = None else: # Cars detected - reset absence counter, send cached detection dict pipeline_state["absence_counter"] = 0 # Reset absence since cars are present if cached_detection_dict: detection_result = cached_detection_dict # Always send cached data logger.info(f"ðŸŠķ Camera {camera_id}: CARS DETECTED - sending cached detection dict:") logger.info(f"ðŸŠķ Camera {camera_id}: - Cached dict: {cached_detection_dict}") logger.info(f"ðŸŠķ Camera {camera_id}: - Track info (internal): {current_tracks}") else: logger.warning(f"⚠ïļ Camera {camera_id}: Cars detected but no cached detection dict available") detection_result = None elif current_mode == "car_gone_waiting": # ═══ CAR GONE WAITING STATE ═══ # Car is gone (both conditions met), YOLO inference disabled, waiting for new session logger.debug(f"🛑 Camera {camera_id}: CAR GONE WAITING - YOLO inference stopped") # Check if backend has started a new session (indicates new car scenario) if backend_session_id is not None: # Backend started new session - re-enable YOLO and reset to validation pipeline_state["yolo_inference_enabled"] = True pipeline_state["absence_counter"] = 0 pipeline_state["stable_track_id"] = None pipeline_state["cached_detection_dict"] = None # Clear stability tracking data for this camera from siwatsystem.pympta import reset_camera_stability_tracking reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown")) update_session_pipeline_mode(camera_id, "validation_detecting") logger.info(f"🔄 Camera {camera_id}: New session detected (id={backend_session_id}) - re-enabling YOLO inference") logger.info(f"✅ Camera {camera_id}: Reset to validation mode - cleared all tracking, ready for new car detection") # Don't run detection this frame - let next frame start fresh detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]} else: # Still waiting - no sessionId, no detection to send logger.debug(f"🛑 Camera {camera_id}: Car gone waiting - no YOLO inference, no data sent") detection_result = None process_time = (time.time() - start_time) * 1000 logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms (mode: {current_mode})") # Skip processing if no detection result (blocked by session gating) if detection_result is None: logger.debug(f"No detection result to process for camera {camera_id}") return persistent_data # Log the raw detection result for debugging logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}") # Extract session_id from pipeline result (always use backend sessionId) session_id = backend_session_id logger.debug(f"Using backend session_id: {session_id}") # Process detection result based on current mode if current_mode == "validation_detecting": # ═══ VALIDATION DETECTING MODE ═══ # Always send detection: null during validation phase detection_dict = None logger.debug(f"🔍 SENDING 'NONE' - validation_detecting mode for camera {camera_id}") elif current_mode == "send_detections": # ═══ SEND DETECTIONS MODE ═══ if detection_result.get("class") == "none": # No car detected - send detection: null detection_dict = None logger.debug(f"ðŸ“Ī SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}") else: # Car detected in send_detections mode - ALWAYS send empty dict to trigger backend sessionId # Purpose: Tell backend "car is here, please create sessionId" detection_dict = {} logger.info(f"ðŸ“Ī SENDING EMPTY DETECTION_DICT - send_detections mode, requesting backend to create sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}") if backend_session_id: logger.debug(f"🔄 Camera {camera_id}: Note - sessionId {backend_session_id} exists but still in send_detections mode (transition pending)") elif detection_result.get("class") == "none": # "None" detection - skip override if lightweight mode already made the decision if current_mode == "lightweight": # Lightweight mode already set detection_result correctly, don't override logger.debug(f"ðŸŠķ Camera {camera_id}: Lightweight mode - respecting detection_result decision") if detection_result is None: detection_dict = None logger.info(f"ðŸ“Ī LIGHTWEIGHT SENDING 'NONE' - Reset conditions met for camera {camera_id}") else: # detection_result should be the cached_detection_dict detection_dict = detection_result logger.info(f"ðŸ’ū LIGHTWEIGHT SENDING CACHED - Maintaining session for camera {camera_id}") else: # Other modes - send null to clear session detection_dict = None logger.info(f"ðŸ“Ī SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}") elif detection_result.get("cached_mode", False): # Cached mode in lightweight - use cached detection dict directly cached_dict = detection_result.get("branch_results", {}) detection_dict = cached_dict if cached_dict else { "carModel": None, "carBrand": None, "carYear": None, "bodyType": None, "licensePlateText": None, "licensePlateConfidence": None } elif detection_result and "carBrand" in detection_result: # Lightweight mode - detection_result IS the cached detection dict detection_dict = detection_result logger.info(f"ðŸ’ū Camera {camera_id}: LIGHTWEIGHT MODE - using detection_result as detection_dict:") logger.info(f"ðŸ’ū Camera {camera_id}: - detection_dict: {detection_dict}") else: # Valid detection - convert to backend format (will be populated by branch processing) detection_dict = { "carModel": None, "carBrand": None, "carYear": None, "bodyType": None, "licensePlateText": None, "licensePlateConfidence": None } # Extract and process branch results from parallel classification (only for valid detections, skip cached mode) if detection_result.get("class") != "none" and "branch_results" in detection_result and not detection_result.get("cached_mode", False): def process_branch_results(branch_results, depth=0): """Recursively process branch results including nested branches.""" if not isinstance(branch_results, dict): return indent = " " * depth for branch_id, branch_data in branch_results.items(): if isinstance(branch_data, dict): logger.debug(f"{indent}Processing branch {branch_id}: {branch_data}") # Map common classification fields to backend-expected names if "brand" in branch_data: detection_dict["carBrand"] = branch_data["brand"] logger.debug(f"{indent}Mapped carBrand: {branch_data['brand']}") if "body_type" in branch_data: detection_dict["bodyType"] = branch_data["body_type"] logger.debug(f"{indent}Mapped bodyType: {branch_data['body_type']}") if "class" in branch_data: class_name = branch_data["class"] # Map based on branch/model type if "brand" in branch_id.lower(): detection_dict["carBrand"] = class_name logger.debug(f"{indent}Mapped carBrand from class: {class_name}") elif "bodytype" in branch_id.lower() or "body" in branch_id.lower(): detection_dict["bodyType"] = class_name logger.debug(f"{indent}Mapped bodyType from class: {class_name}") # Process nested branch results recursively if "branch_results" in branch_data: logger.debug(f"{indent}Processing nested branches in {branch_id}") process_branch_results(branch_data["branch_results"], depth + 1) branch_results = detection_result.get("branch_results", {}) if branch_results: logger.debug(f"Processing branch results: {branch_results}") process_branch_results(branch_results) logger.info(f"Detection payload after branch processing: {detection_dict}") # Cache the detection_dict for lightweight mode (after branch processing completes) if current_mode == "full_pipeline": pipeline_state = get_or_init_session_pipeline_state(camera_id) pipeline_state["cached_detection_dict"] = detection_dict.copy() logger.info(f"ðŸ’ū Camera {camera_id}: CACHED DETECTION DICT after branch processing: {detection_dict}") else: logger.debug("No branch results found in detection result") detection_data = { "type": "imageDetection", "subscriptionIdentifier": stream["subscriptionIdentifier"], "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), # "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + f".{int(time.time() * 1000) % 1000:03d}Z", "data": { "detection": detection_dict, "modelId": stream["modelId"], "modelName": stream["modelName"] } } # SessionId should NEVER be sent from worker to backend - it's uni-directional (backend -> worker only) # Backend manages sessionIds independently based on detection content logger.debug(f"TX message prepared (no sessionId) - detection_dict type: {type(detection_dict)}") # Log detection details if detection_result and "class" in detection_result and detection_result.get("class") != "none": confidence = detection_result.get("confidence", 0.0) logger.info(f"Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}") elif detection_result and "carBrand" in detection_result: # Lightweight mode cached detection dict - different format logger.info(f"Camera {camera_id}: Using cached detection dict (lightweight mode) - {detection_result.get('carBrand', 'Unknown')} {detection_result.get('bodyType', '')}") # Send detection data to backend (session gating handled above in processing logic) logger.debug(f"ðŸ“Ī SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}") try: ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") await websocket.send_json(detection_data) logger.debug(f"Sent detection data to client for camera {camera_id}") # Cache the detection data for potential resubscriptions (only if not null detection) if detection_dict is not None and detection_result.get("class") != "none": cached_detections[camera_id] = detection_data.copy() logger.debug(f"Cached detection for camera {camera_id}: {detection_dict}") else: # Don't cache null/none detections - let them reset properly cached_detections.pop(camera_id, None) logger.debug(f"Not caching null/none detection for camera {camera_id}") except RuntimeError as e: if "websocket.close" in str(e): logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}") return persistent_data else: raise # Log status after sending (no sessionId sent to backend) if detection_dict is None: logger.info(f"ðŸ“Ą SENT 'none' detection - backend should clear session for camera {camera_id}") elif detection_dict == {}: logger.info(f"ðŸ“Ą SENT empty detection - backend should create sessionId for camera {camera_id}") else: logger.info(f"ðŸ“Ą SENT detection data - backend manages sessionId independently for camera {camera_id}") return persistent_data except Exception as e: logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) return persistent_data def frame_reader(camera_id, cap, buffer, stop_event): retries = 0 logger.info(f"Starting frame reader thread for camera {camera_id}") frame_count = 0 last_log_time = time.time() try: # Log initial camera status and properties if cap.isOpened(): width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}") set_camera_connected(camera_id, True) else: logger.error(f"Camera {camera_id} failed to open initially") set_camera_connected(camera_id, False, "Failed to open camera initially") while not stop_event.is_set(): try: if not cap.isOpened(): logger.error(f"Camera {camera_id} is not open before trying to read") # Attempt to reopen cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) time.sleep(reconnect_interval) continue logger.debug(f"Attempting to read frame from camera {camera_id}") ret, frame = cap.read() if not ret: error_msg = f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}" logger.warning(error_msg) set_camera_connected(camera_id, False, error_msg) cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader") set_camera_connected(camera_id, False, "Max retries reached") break # Re-open logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}") set_camera_connected(camera_id, False, "Failed to reopen RTSP stream") continue logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}") set_camera_connected(camera_id, True) continue # Successfully read a frame frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}") retries = 0 set_camera_connected(camera_id, True) # Mark as connected on successful frame read # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Short sleep to avoid CPU overuse time.sleep(0.01) except cv2.error as e: error_msg = f"OpenCV error for camera {camera_id}: {e}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after OpenCV error for camera {camera_id}") set_camera_connected(camera_id, False, "Max retries reached after OpenCV error") break logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") set_camera_connected(camera_id, False, "Failed to reopen after OpenCV error") continue logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}") set_camera_connected(camera_id, True) except Exception as e: error_msg = f"Unexpected error for camera {camera_id}: {str(e)}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) cap.release() break except Exception as e: logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Frame reader thread for camera {camera_id} is exiting") if cap and cap.isOpened(): cap.release() def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event): """Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals""" retries = 0 consecutive_failures = 0 # Track consecutive failures for backoff logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}") frame_count = 0 last_log_time = time.time() # Initialize camera state set_camera_connected(camera_id, True) try: interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s") while not stop_event.is_set(): try: start_time = time.time() frame = fetch_snapshot(snapshot_url) if frame is None: consecutive_failures += 1 error_msg = f"Failed to fetch snapshot for camera: {camera_id}, consecutive failures: {consecutive_failures}" logger.warning(error_msg) set_camera_connected(camera_id, False, error_msg) retries += 1 # Check network connectivity with a simple ping-like test if consecutive_failures % 5 == 1: # Every 5th failure, test connectivity try: test_response = requests.get(snapshot_url, timeout=(2, 5), stream=False) logger.info(f"Camera {camera_id}: Connectivity test result: {test_response.status_code}") except Exception as test_error: logger.warning(f"Camera {camera_id}: Connectivity test failed: {test_error}") if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader") set_camera_connected(camera_id, False, "Max retries reached for snapshot camera") break # Exponential backoff based on consecutive failures backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s logger.debug(f"Camera {camera_id}: Backing off for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})") if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep break # Exit if stop_event is set during backoff continue # Successfully fetched a frame - reset consecutive failures consecutive_failures = 0 # Reset backoff on success frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}") retries = 0 set_camera_connected(camera_id, True) # Mark as connected on successful snapshot # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Wait for the specified interval elapsed = time.time() - start_time sleep_time = max(interval_seconds - elapsed, 0) if sleep_time > 0: time.sleep(sleep_time) except Exception as e: consecutive_failures += 1 error_msg = f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after error for snapshot camera {camera_id}") set_camera_connected(camera_id, False, "Max retries reached after error") break # Exponential backoff for exceptions too backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s logger.debug(f"Camera {camera_id}: Exception backoff for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})") if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep break # Exit if stop_event is set during backoff except Exception as e: logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Snapshot reader thread for camera {camera_id} is exiting") async def reconcile_subscriptions(desired_subscriptions, websocket): """ Declarative reconciliation: Compare desired vs current subscriptions and make changes """ logger.info(f"Reconciling subscriptions: {len(desired_subscriptions)} desired") with streams_lock: # Get current subscriptions current_subscription_ids = set(streams.keys()) desired_subscription_ids = set(sub["subscriptionIdentifier"] for sub in desired_subscriptions) # Find what to add and remove to_add = desired_subscription_ids - current_subscription_ids to_remove = current_subscription_ids - desired_subscription_ids to_check_for_changes = current_subscription_ids & desired_subscription_ids logger.info(f"Reconciliation: {len(to_add)} to add, {len(to_remove)} to remove, {len(to_check_for_changes)} to check for changes") # Remove subscriptions that are no longer wanted for subscription_id in to_remove: await unsubscribe_internal(subscription_id) # Check existing subscriptions for parameter changes for subscription_id in to_check_for_changes: desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id) current_stream = streams[subscription_id] # Check if parameters changed if has_subscription_changed(desired_sub, current_stream): logger.info(f"Parameters changed for {subscription_id}, resubscribing") logger.debug(f"Parameter comparison for {subscription_id}:") logger.debug(f" rtspUrl: '{desired_sub.get('rtspUrl')}' vs '{current_stream.get('rtsp_url')}'") logger.debug(f" snapshotUrl: '{desired_sub.get('snapshotUrl')}' vs '{current_stream.get('snapshot_url')}'") logger.debug(f" modelUrl: '{extract_model_file_identifier(desired_sub.get('modelUrl'))}' vs '{extract_model_file_identifier(current_stream.get('modelUrl'))}'") logger.debug(f" modelId: {desired_sub.get('modelId')} vs {current_stream.get('modelId')}") # Preserve detection state for resubscription cached_detection = cached_detections.get(subscription_id) logger.debug(f"Preserving detection state for resubscription: {cached_detection is not None}") await unsubscribe_internal(subscription_id, preserve_detection=True) await subscribe_internal(desired_sub, websocket, cached_detection=cached_detection) # Add new subscriptions for subscription_id in to_add: desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id) await subscribe_internal(desired_sub, websocket) def extract_model_file_identifier(model_url): """Extract the core model file identifier from S3 URLs, ignoring timestamp parameters""" if not model_url: return None # For S3 URLs, extract just the path portion before query parameters try: from urllib.parse import urlparse parsed = urlparse(model_url) # Return the path which contains the actual model file identifier # e.g. "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta" return parsed.path except Exception as e: logger.warning(f"Failed to parse model URL {model_url}: {e}") return model_url def has_subscription_changed(desired_sub, current_stream): """Check if subscription parameters have changed""" # Smart model URL comparison - ignore timestamp changes in signed URLs desired_model_id = extract_model_file_identifier(desired_sub.get("modelUrl")) current_model_id = extract_model_file_identifier(current_stream.get("modelUrl")) return ( desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or desired_sub.get("snapshotInterval") != current_stream.get("snapshot_interval") or desired_sub.get("cropX1") != current_stream.get("cropX1") or desired_sub.get("cropY1") != current_stream.get("cropY1") or desired_sub.get("cropX2") != current_stream.get("cropX2") or desired_sub.get("cropY2") != current_stream.get("cropY2") or desired_sub.get("modelId") != current_stream.get("modelId") or desired_sub.get("modelName") != current_stream.get("modelName") or desired_model_id != current_model_id ) async def subscribe_internal(subscription, websocket, cached_detection=None): """Internal subscription logic extracted from original subscribe handler""" subscriptionIdentifier = subscription.get("subscriptionIdentifier") rtsp_url = subscription.get("rtspUrl") snapshot_url = subscription.get("snapshotUrl") snapshot_interval = subscription.get("snapshotInterval") model_url = subscription.get("modelUrl") modelId = subscription.get("modelId") modelName = subscription.get("modelName") cropX1 = subscription.get("cropX1") cropY1 = subscription.get("cropY1") cropX2 = subscription.get("cropX2") cropY2 = subscription.get("cropY2") # Extract camera_id from subscriptionIdentifier parts = subscriptionIdentifier.split(';') if len(parts) != 2: logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}") return display_identifier, camera_identifier = parts camera_id = subscriptionIdentifier # Load model if needed if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # Handle model loading (same as original) parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" local_mpta = os.path.join(extraction_dir, filename) local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download model from {model_url}") return model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: if not os.path.exists(model_url): logger.error(f"Model file not found: {model_url}") return model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId}") return if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree # Create stream (same logic as original) if camera_id and (rtsp_url or snapshot_url) and len(streams) < max_streams: camera_url = snapshot_url if snapshot_url else rtsp_url # Check if we already have a stream for this camera URL shared_stream = camera_streams.get(camera_url) if shared_stream: # Reuse existing stream buffer = shared_stream["buffer"] stop_event = shared_stream["stop_event"] thread = shared_stream["thread"] mode = shared_stream["mode"] shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 else: # Create new stream buffer = queue.Queue(maxsize=1) stop_event = threading.Event() if snapshot_url and snapshot_interval: thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1 } camera_streams[camera_url] = shared_stream elif rtsp_url: cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") return thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1 } camera_streams[camera_url] = shared_stream else: logger.error(f"No valid URL provided for camera {camera_id}") return # Create stream info stream_info = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2, "mode": mode, "camera_url": camera_url, "modelUrl": model_url, # Always store both URLs for comparison consistency "rtsp_url": rtsp_url, "snapshot_url": snapshot_url, "snapshot_interval": snapshot_interval } if mode == "rtsp": stream_info["cap"] = shared_stream["cap"] streams[camera_id] = stream_info subscription_to_camera[camera_id] = camera_url logger.info(f"Subscribed to camera {camera_id}") # Send initial detection to backend - use cached if available, otherwise "none" if cached_detection: # Restore cached detection with updated timestamp (RESUBSCRIPTION STATUS UPDATE) initial_detection_data = cached_detection.copy() initial_detection_data["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) logger.info(f"ðŸ“Ą RESUBSCRIPTION: Restoring cached detection for camera {camera_id}") logger.debug(f"ðŸ“Ą RESUBSCRIPTION: Cached detection has sessionId: {initial_detection_data.get('sessionId', 'None')}") else: # Send "none" detection for new subscriptions initial_detection_data = { "type": "imageDetection", "subscriptionIdentifier": subscriptionIdentifier, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "data": { "detection": None, "modelId": modelId, "modelName": modelName } } logger.info(f"ðŸ“Ą NEW SUBSCRIPTION: Sending initial 'none' detection for camera {camera_id}") ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}") await websocket.send_json(initial_detection_data) logger.debug(f"Initial detection data sent (resubscription={cached_detection is not None}): {initial_detection_data}") # This cached detection was just a one-time status update for resubscription # Normal frame processing will continue independently async def unsubscribe_internal(subscription_id, preserve_detection=False): """Internal unsubscription logic""" if subscription_id in streams: stream = streams.pop(subscription_id) camera_url = subscription_to_camera.pop(subscription_id, None) if camera_url and camera_url in camera_streams: shared_stream = camera_streams[camera_url] shared_stream["ref_count"] -= 1 if shared_stream["ref_count"] <= 0: shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() del camera_streams[camera_url] latest_frames.pop(subscription_id, None) if not preserve_detection: cached_detections.pop(subscription_id, None) # Clear cached detection only if not preserving frame_skip_flags.pop(subscription_id, None) # Clear frame skip flag camera_states.pop(subscription_id, None) # Clear camera state cached_full_pipeline_results.pop(subscription_id, None) # Clear cached pipeline results session_pipeline_states.pop(subscription_id, None) # Clear session pipeline state cleanup_camera_stability(subscription_id) logger.info(f"Unsubscribed from camera {subscription_id} (preserve_detection={preserve_detection})") async def process_streams(): logger.info("Started processing streams") try: while True: start_time = time.time() with streams_lock: current_streams = list(streams.items()) if current_streams: logger.debug(f"Processing {len(current_streams)} active streams") else: logger.debug("No active streams to process") for camera_id, stream in current_streams: buffer = stream["buffer"] if buffer.empty(): logger.debug(f"Frame buffer is empty for camera {camera_id}") continue logger.debug(f"Got frame from buffer for camera {camera_id}") frame = buffer.get() # Cache the frame for REST API access latest_frames[camera_id] = frame.copy() logger.debug(f"Cached frame for REST API access for camera {camera_id}") with models_lock: model_tree = models.get(camera_id, {}).get(stream["modelId"]) if not model_tree: logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}") continue logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}") key = (camera_id, stream["modelId"]) persistent_data = persistent_data_dict.get(key, {}) logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}") updated_persistent_data = await handle_detection( camera_id, stream, frame, websocket, model_tree, persistent_data ) persistent_data_dict[key] = updated_persistent_data elapsed_time = (time.time() - start_time) * 1000 # ms sleep_time = max(poll_interval - elapsed_time, 0) logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms") await asyncio.sleep(sleep_time / 1000.0) except asyncio.CancelledError: logger.info("Stream processing task cancelled") except Exception as e: logger.error(f"Error in process_streams: {str(e)}", exc_info=True) async def send_heartbeat(): while True: try: cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, # Include all subscription parameters for proper change detection "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) logger.debug(f"Sent stateReport as heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, {len(camera_connections)} active cameras") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: logger.error(f"Error sending stateReport heartbeat: {e}") break async def on_message(): while True: try: msg = await websocket.receive_text() ws_logger.info(f"RX <- {msg}") logger.debug(f"Received message: {msg}") data = json.loads(msg) msg_type = data.get("type") if msg_type == "setSubscriptionList": # Declarative approach: Backend sends list of subscriptions this worker should have desired_subscriptions = data.get("subscriptions", []) logger.info(f"Received subscription list with {len(desired_subscriptions)} subscriptions") await reconcile_subscriptions(desired_subscriptions, websocket) elif msg_type == "subscribe": # Legacy support - convert single subscription to list payload = data.get("payload", {}) await reconcile_subscriptions([payload], websocket) elif msg_type == "unsubscribe": # Legacy support - remove subscription payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") # Remove from current subscriptions and reconcile current_subs = [] with streams_lock: for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"] != subscriptionIdentifier: # Convert stream back to subscription format current_subs.append({ "subscriptionIdentifier": stream["subscriptionIdentifier"], "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), "modelId": stream["modelId"], "modelName": stream["modelName"], "modelUrl": stream.get("modelUrl", ""), "cropX1": stream.get("cropX1"), "cropY1": stream.get("cropY1"), "cropX2": stream.get("cropX2"), "cropY2": stream.get("cropY2") }) await reconcile_subscriptions(current_subs, websocket) elif msg_type == "old_subscribe_logic_removed": if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # If model_url is remote, download it first. parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): logger.info(f"Downloading remote .mpta file from {model_url}") filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" local_mpta = os.path.join(extraction_dir, filename) logger.debug(f"Download destination: {local_mpta}") local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download the remote .mpta file from {model_url}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to download model from {model_url}" } ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}") await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: logger.info(f"Loading local .mpta file from {model_url}") # Check if file exists before attempting to load if not os.path.exists(model_url): logger.error(f"Local .mpta file not found: {model_url}") logger.debug(f"Current working directory: {os.getcwd()}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Model file not found: {model_url}" } ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}") await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId} from .mpta file for camera {camera_id}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to load model {modelId}" } await websocket.send_json(error_response) continue if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree logger.info(f"Successfully loaded model {modelId} for camera {camera_id}") logger.debug(f"Model extraction directory: {extraction_dir}") if camera_id and (rtsp_url or snapshot_url): with streams_lock: # Determine camera URL for shared stream management camera_url = snapshot_url if snapshot_url else rtsp_url if camera_id not in streams and len(streams) < max_streams: # Check if we already have a stream for this camera URL shared_stream = camera_streams.get(camera_url) if shared_stream: # Reuse existing stream logger.info(f"Reusing existing stream for camera URL: {camera_url}") buffer = shared_stream["buffer"] stop_event = shared_stream["stop_event"] thread = shared_stream["thread"] mode = shared_stream["mode"] # Increment reference count shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 else: # Create new stream buffer = queue.Queue(maxsize=1) stop_event = threading.Event() if snapshot_url and snapshot_interval: logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}") thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" # Store shared stream info shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1 } camera_streams[camera_url] = shared_stream elif rtsp_url: logger.info(f"Creating new RTSP stream for camera {camera_id}: {rtsp_url}") cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") continue thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" # Store shared stream info shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1 } camera_streams[camera_url] = shared_stream else: logger.error(f"No valid URL provided for camera {camera_id}") continue # Create stream info for this subscription stream_info = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2, "mode": mode, "camera_url": camera_url } if mode == "snapshot": stream_info["snapshot_url"] = snapshot_url stream_info["snapshot_interval"] = snapshot_interval elif mode == "rtsp": stream_info["rtsp_url"] = rtsp_url stream_info["cap"] = shared_stream["cap"] streams[camera_id] = stream_info subscription_to_camera[camera_id] = camera_url elif camera_id and camera_id in streams: # If already subscribed, unsubscribe first logger.info(f"Resubscribing to camera {camera_id}") # Note: Keep models in memory for reuse across subscriptions elif msg_type == "unsubscribe": payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") camera_id = subscriptionIdentifier with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) camera_url = subscription_to_camera.pop(camera_id, None) if camera_url and camera_url in camera_streams: shared_stream = camera_streams[camera_url] shared_stream["ref_count"] -= 1 # If no more references, stop the shared stream if shared_stream["ref_count"] <= 0: logger.info(f"Stopping shared stream for camera URL: {camera_url}") shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() del camera_streams[camera_url] else: logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references") # Clean up cached frame and stability tracking latest_frames.pop(camera_id, None) cached_detections.pop(camera_id, None) # Clear cached detection frame_skip_flags.pop(camera_id, None) # Clear frame skip flag camera_states.pop(camera_id, None) # Clear camera state cleanup_camera_stability(camera_id) logger.info(f"Unsubscribed from camera {camera_id}") # Note: Keep models in memory for potential reuse elif msg_type == "requestState": cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, # Include all subscription parameters for proper change detection "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) elif msg_type == "setSessionId": payload = data.get("payload", {}) display_identifier = payload.get("displayIdentifier") session_id = payload.get("sessionId") # Debug sessionId value types and contents session_id_type = type(session_id).__name__ if session_id is None: logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId=None (type: {session_id_type})") logger.info(f"🔄 BACKEND WANTS TO CLEAR SESSION for display {display_identifier}") elif session_id == "null": logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='null' (type: {session_id_type})") logger.info(f"🔄 BACKEND SENT STRING 'null' for display {display_identifier}") elif session_id == "": logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='' (empty string, type: {session_id_type})") logger.info(f"🔄 BACKEND SENT EMPTY STRING for display {display_identifier}") else: logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='{session_id}' (type: {session_id_type}, length: {len(str(session_id))})") logger.info(f"🔄 BACKEND CREATED/UPDATED SESSION: {session_id} for display {display_identifier}") logger.debug(f"Full setSessionId payload: {payload}") logger.debug(f"WebSocket message raw data: {json.dumps(data, indent=2)}") logger.debug(f"Current active cameras: {list(streams.keys())}") if display_identifier: # Store session ID for this display if session_id is None or session_id == "null" or session_id == "": old_session_id = session_ids.get(display_identifier) session_ids.pop(display_identifier, None) if session_id is None: logger.info(f"ðŸšŦ BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received None") elif session_id == "null": logger.info(f"ðŸšŦ BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received string 'null'") elif session_id == "": logger.info(f"ðŸšŦ BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received empty string") logger.debug(f"Session IDs after clearing: {session_ids}") # Reset tracking state for all cameras associated with this display with streams_lock: affected_cameras = [] for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): affected_cameras.append(camera_id) # Import here to avoid circular import from siwatsystem.pympta import reset_tracking_state model_id = stream.get("modelId", "unknown") reset_tracking_state(camera_id, model_id, "backend session ended") logger.info(f"Reset tracking for camera {camera_id} (display: {display_identifier})") logger.debug(f"Reset tracking for {len(affected_cameras)} cameras: {affected_cameras}") else: old_session_id = session_ids.get(display_identifier) session_ids[display_identifier] = session_id logger.info(f"✅ BACKEND SESSION STARTED: Set session ID {session_id} for display {display_identifier} (previous: {old_session_id})") logger.debug(f"Session IDs after update: {session_ids}") logger.debug(f"ðŸŽŊ CMS Backend created sessionId {session_id} after receiving detection data") # Clear waiting state for cameras associated with this display with streams_lock: affected_cameras = [] for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): affected_cameras.append(camera_id) from siwatsystem.pympta import get_camera_stability_data model_id = stream.get("modelId", "unknown") stability_data = get_camera_stability_data(camera_id, model_id) session_state = stability_data["session_state"] if session_state.get("waiting_for_backend_session", False): session_state["waiting_for_backend_session"] = False session_state["wait_start_time"] = 0.0 logger.info(f"🚀 PIPELINE UNBLOCKED: Backend sessionId {session_id} received - camera {camera_id} can proceed with database operations") logger.debug(f"📋 Camera {camera_id}: SessionId {session_id} now available for future database operations") logger.debug(f"Updated session state for {len(affected_cameras)} cameras: {affected_cameras}") else: logger.warning(f"ðŸšĻ Invalid setSessionId message: missing displayIdentifier in payload") elif msg_type == "patchSession": session_id = data.get("sessionId") patch_data = data.get("data", {}) # For now, just acknowledge the patch - actual implementation depends on backend requirements response = { "type": "patchSessionResult", "payload": { "sessionId": session_id, "success": True, "message": "Session patch acknowledged" } } ws_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") await websocket.send_json(response) logger.info(f"Acknowledged patch for session {session_id}") else: logger.error(f"Unknown message type: {msg_type}") except json.JSONDecodeError: logger.error("Received invalid JSON message") except (WebSocketDisconnect, ConnectionClosedError) as e: logger.warning(f"WebSocket disconnected: {e}") break except Exception as e: logger.error(f"Error handling message: {e}") break try: await websocket.accept() stream_task = asyncio.create_task(process_streams()) heartbeat_task = asyncio.create_task(send_heartbeat()) message_task = asyncio.create_task(on_message()) await asyncio.gather(heartbeat_task, message_task) except Exception as e: logger.error(f"Error in detect websocket: {e}") finally: stream_task.cancel() await stream_task with streams_lock: # Clean up shared camera streams for camera_url, shared_stream in camera_streams.items(): shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() while not shared_stream["buffer"].empty(): try: shared_stream["buffer"].get_nowait() except queue.Empty: pass logger.info(f"Released shared camera stream for {camera_url}") streams.clear() camera_streams.clear() subscription_to_camera.clear() with models_lock: models.clear() latest_frames.clear() cached_detections.clear() frame_skip_flags.clear() camera_states.clear() cached_full_pipeline_results.clear() session_pipeline_states.clear() session_ids.clear() logger.info("WebSocket connection closed")