from typing import Any, Dict import os import json import time import queue import torch import cv2 import numpy as np import base64 import logging import threading import requests import asyncio import psutil import zipfile import ssl import urllib3 import subprocess import tempfile from urllib.parse import urlparse from requests.adapters import HTTPAdapter from urllib3.util.ssl_ import create_urllib3_context from fastapi import FastAPI, WebSocket, HTTPException from fastapi.websockets import WebSocketDisconnect from fastapi.responses import Response from websockets.exceptions import ConnectionClosedError from ultralytics import YOLO # Import shared pipeline functions from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline, cleanup_camera_stability app = FastAPI() # Global dictionaries to keep track of models and streams # "models" now holds a nested dict: { camera_id: { modelId: model_tree } } models: Dict[str, Dict[str, Any]] = {} streams: Dict[str, Dict[str, Any]] = {} # Store session IDs per display session_ids: Dict[str, int] = {} # Track shared camera streams by camera URL camera_streams: Dict[str, Dict[str, Any]] = {} # Map subscriptions to their camera URL subscription_to_camera: Dict[str, str] = {} # Store latest frames for REST API access (separate from processing buffer) latest_frames: Dict[str, Any] = {} # Store cached detection dict after successful pipeline completion cached_detections: Dict[str, Dict[str, Any]] = {} # Track frame skipping for pipeline buffer after detection frame_skip_flags: Dict[str, bool] = {} # Track camera connection states for immediate error handling camera_states: Dict[str, Dict[str, Any]] = {} # Track session ID states and pipeline modes per camera session_pipeline_states: Dict[str, Dict[str, Any]] = {} # Store full pipeline results for caching cached_full_pipeline_results: Dict[str, Dict[str, Any]] = {} with open("config.json", "r") as f: config = json.load(f) poll_interval = config.get("poll_interval_ms", 100) reconnect_interval = config.get("reconnect_interval_sec", 5) TARGET_FPS = config.get("target_fps", 10) poll_interval = 1000 / TARGET_FPS logging.info(f"Poll interval: {poll_interval}ms") max_streams = config.get("max_streams", 5) max_retries = config.get("max_retries", 3) # Configure logging logging.basicConfig( level=logging.INFO, # Set to INFO level for less verbose output format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.FileHandler("detector_worker.log"), # Write logs to a file logging.StreamHandler() # Also output to console ] ) # Create a logger specifically for this application logger = logging.getLogger("detector_worker") logger.setLevel(logging.DEBUG) # Set app-specific logger to DEBUG level # Create WebSocket communication logger ws_logger = logging.getLogger("websocket_comm") ws_logger.setLevel(logging.INFO) ws_handler = logging.FileHandler("websocket_comm.log", encoding='utf-8') ws_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") ws_handler.setFormatter(ws_formatter) ws_logger.addHandler(ws_handler) ws_logger.propagate = False # Don't propagate to root logger # Ensure all other libraries (including root) use at least INFO level logging.getLogger().setLevel(logging.INFO) logger.info("Starting detector worker application") logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: {max_streams}, Max retries: {max_retries}") ws_logger.info("WebSocket communication logging started - TX/RX format") logger.info("WebSocket communication will be logged to websocket_comm.log") # Ensure the models directory exists os.makedirs("models", exist_ok=True) logger.info("Ensured models directory exists") # Constants for heartbeat and timeouts HEARTBEAT_INTERVAL = 2 # seconds WORKER_TIMEOUT_MS = 10000 logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds") # Locks for thread-safe operations streams_lock = threading.Lock() models_lock = threading.Lock() logger.debug("Initialized thread locks") # Add helper to download mpta ZIP file from a remote URL def download_mpta(url: str, dest_path: str) -> str: try: logger.info(f"Starting download of model from {url} to {dest_path}") os.makedirs(os.path.dirname(dest_path), exist_ok=True) response = requests.get(url, stream=True) if response.status_code == 200: file_size = int(response.headers.get('content-length', 0)) logger.info(f"Model file size: {file_size/1024/1024:.2f} MB") downloaded = 0 with open(dest_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) downloaded += len(chunk) if file_size > 0 and downloaded % (file_size // 10) < 8192: # Log approximately every 10% logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%") logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}") return dest_path else: logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}") return None except Exception as e: logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) return None # Add helper to fetch snapshot image from HTTP/HTTPS URL def fetch_snapshot(url: str): try: from requests.auth import HTTPBasicAuth, HTTPDigestAuth import requests.adapters import urllib3 # Parse URL to extract credentials parsed = urlparse(url) # Prepare headers - some cameras require User-Agent and specific headers headers = { 'User-Agent': 'Mozilla/5.0 (compatible; DetectorWorker/1.0)', 'Accept': 'image/jpeg,image/*,*/*', 'Connection': 'close', 'Cache-Control': 'no-cache' } # Create a session with custom adapter for better connection handling session = requests.Session() adapter = requests.adapters.HTTPAdapter( pool_connections=1, pool_maxsize=1, max_retries=urllib3.util.retry.Retry( total=2, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504] ) ) session.mount('http://', adapter) session.mount('https://', adapter) # Reconstruct URL without credentials clean_url = f"{parsed.scheme}://{parsed.hostname}" if parsed.port: clean_url += f":{parsed.port}" clean_url += parsed.path if parsed.query: clean_url += f"?{parsed.query}" auth = None response = None if parsed.username and parsed.password: # Try HTTP Digest authentication first (common for IP cameras) try: auth = HTTPDigestAuth(parsed.username, parsed.password) response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True) if response.status_code == 200: logger.debug(f"Successfully authenticated using HTTP Digest for {clean_url}") elif response.status_code == 401: # If Digest fails, try Basic auth logger.debug(f"HTTP Digest failed, trying Basic auth for {clean_url}") auth = HTTPBasicAuth(parsed.username, parsed.password) response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True) if response.status_code == 200: logger.debug(f"Successfully authenticated using HTTP Basic for {clean_url}") except Exception as auth_error: logger.debug(f"Authentication setup error: {auth_error}") # Fallback to original URL with embedded credentials response = session.get(url, headers=headers, timeout=(5, 15), stream=True) else: # No credentials in URL, make request as-is response = session.get(url, headers=headers, timeout=(5, 15), stream=True) if response and response.status_code == 200: # Read content with size limit to prevent memory issues content = b'' max_size = 10 * 1024 * 1024 # 10MB limit for chunk in response.iter_content(chunk_size=8192): content += chunk if len(content) > max_size: logger.error(f"Snapshot too large (>{max_size} bytes) from {clean_url}") return None # Convert response content to numpy array nparr = np.frombuffer(content, np.uint8) # Decode image frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is not None: logger.debug(f"Successfully fetched snapshot from {clean_url}, shape: {frame.shape}, size: {len(content)} bytes") return frame else: logger.error(f"Failed to decode image from snapshot URL: {clean_url} (content size: {len(content)} bytes)") return None elif response: logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {clean_url}") # Log response headers and first part of content for debugging logger.debug(f"Response headers: {dict(response.headers)}") if len(response.content) < 1000: logger.debug(f"Response content: {response.content[:500]}") return None else: logger.error(f"No response received from snapshot URL: {clean_url}") return None except requests.exceptions.Timeout as e: logger.error(f"Timeout fetching snapshot from {url}: {str(e)}") return None except requests.exceptions.ConnectionError as e: logger.error(f"Connection error fetching snapshot from {url}: {str(e)}") return None except Exception as e: logger.error(f"Exception fetching snapshot from {url}: {str(e)}", exc_info=True) return None # Helper to get crop coordinates from stream def get_crop_coords(stream): return { "cropX1": stream.get("cropX1"), "cropY1": stream.get("cropY1"), "cropX2": stream.get("cropX2"), "cropY2": stream.get("cropY2") } # Camera state management functions def set_camera_connected(camera_id, connected=True, error_msg=None): """Set camera connection state and track error information""" current_time = time.time() if camera_id not in camera_states: camera_states[camera_id] = { "connected": True, "last_error": None, "last_error_time": None, "consecutive_failures": 0, "disconnection_notified": False } state = camera_states[camera_id] was_connected = state["connected"] if connected: state["connected"] = True state["consecutive_failures"] = 0 state["disconnection_notified"] = False if not was_connected: logger.info(f"📶 CAMERA RECONNECTED: {camera_id}") else: state["connected"] = False state["last_error"] = error_msg state["last_error_time"] = current_time state["consecutive_failures"] += 1 # Distinguish between temporary and permanent disconnection is_permanent = state["consecutive_failures"] >= 3 if was_connected and is_permanent: logger.error(f"📵 CAMERA DISCONNECTED: {camera_id} - {error_msg} (consecutive failures: {state['consecutive_failures']})") logger.info(f"🚨 CAMERA ERROR DETECTED - Will send detection: null to reset backend session for {camera_id}") def is_camera_connected(camera_id): """Check if camera is currently connected""" return camera_states.get(camera_id, {}).get("connected", True) def should_notify_disconnection(camera_id): """Check if we should notify backend about disconnection""" state = camera_states.get(camera_id, {}) is_disconnected = not state.get("connected", True) not_yet_notified = not state.get("disconnection_notified", False) has_enough_failures = state.get("consecutive_failures", 0) >= 3 return is_disconnected and not_yet_notified and has_enough_failures def mark_disconnection_notified(camera_id): """Mark that we've notified backend about this disconnection""" if camera_id in camera_states: camera_states[camera_id]["disconnection_notified"] = True logger.debug(f"Marked disconnection notification sent for camera {camera_id}") def get_or_init_session_pipeline_state(camera_id): """Get or initialize session pipeline state for a camera""" if camera_id not in session_pipeline_states: session_pipeline_states[camera_id] = { "mode": "validation_detecting", # "validation_detecting", "send_detections", "waiting_for_session_id", "full_pipeline", "lightweight" "session_id_received": False, "full_pipeline_completed": False, "absence_counter": 0, "max_absence_frames": 3 # Removed validation_counter and validation_threshold - now using only track-based validation } return session_pipeline_states[camera_id] def update_session_pipeline_mode(camera_id, new_mode, session_id=None): """Update session pipeline mode and related state""" state = get_or_init_session_pipeline_state(camera_id) old_mode = state["mode"] state["mode"] = new_mode if session_id: state["session_id_received"] = True state["absence_counter"] = 0 # Reset absence counter when session starts logger.info(f"📊 Camera {camera_id}: Pipeline mode changed from '{old_mode}' to '{new_mode}'") return state #################################################### # REST API endpoint for image retrieval #################################################### @app.get("/camera/{camera_id}/image") async def get_camera_image(camera_id: str): """ Get the current frame from a camera as JPEG image """ try: # URL decode the camera_id to handle encoded characters like %3B for semicolon from urllib.parse import unquote original_camera_id = camera_id camera_id = unquote(camera_id) logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'") with streams_lock: if camera_id not in streams: logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") # Check if we have a cached frame for this camera if camera_id not in latest_frames: logger.warning(f"No cached frame available for camera '{camera_id}'.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") frame = latest_frames[camera_id] logger.debug(f"Retrieved cached frame for camera '{camera_id}', frame shape: {frame.shape}") # Encode frame as JPEG success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) if not success: raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") # Return image as binary response return Response(content=buffer_img.tobytes(), media_type="image/jpeg") except HTTPException: raise except Exception as e: logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") #################################################### # Detection and frame processing functions #################################################### @app.websocket("/") async def detect(websocket: WebSocket): logger.info("WebSocket connection accepted") persistent_data_dict = {} async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data): try: # Check camera connection state first - handle disconnection immediately if should_notify_disconnection(camera_id): logger.error(f"🚨 CAMERA DISCONNECTION DETECTED: {camera_id} - sending immediate detection: null") # Clear cached detections and occupancy state cached_detections.pop(camera_id, None) frame_skip_flags.pop(camera_id, None) cached_full_pipeline_results.pop(camera_id, None) # Clear cached pipeline results session_pipeline_states.pop(camera_id, None) # Reset session pipeline state # Reset pipeline state immediately from siwatsystem.pympta import reset_tracking_state model_id = stream.get("modelId", "unknown") reset_tracking_state(camera_id, model_id, "camera disconnected") # Send immediate detection: null to backend detection_data = { "type": "imageDetection", "subscriptionIdentifier": stream["subscriptionIdentifier"], "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "data": { "detection": None, # null detection for disconnection "modelId": stream["modelId"], "modelName": stream["modelName"] } } try: ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") await websocket.send_json(detection_data) except RuntimeError as e: if "websocket.close" in str(e): logger.warning(f"WebSocket connection closed - cannot send disconnection signal for camera {camera_id}") return persistent_data else: raise mark_disconnection_notified(camera_id) logger.info(f"📡 SENT DISCONNECTION SIGNAL - detection: null for camera {camera_id}, backend should clear session") return persistent_data # Apply crop if specified cropped_frame = frame if all(coord is not None for coord in [stream.get("cropX1"), stream.get("cropY1"), stream.get("cropX2"), stream.get("cropY2")]): cropX1, cropY1, cropX2, cropY2 = stream["cropX1"], stream["cropY1"], stream["cropX2"], stream["cropY2"] cropped_frame = frame[cropY1:cropY2, cropX1:cropX2] logger.debug(f"Applied crop coordinates ({cropX1}, {cropY1}, {cropX2}, {cropY2}) to frame for camera {camera_id}") logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}") start_time = time.time() # Extract display identifier for pipeline context subscription_parts = stream["subscriptionIdentifier"].split(';') display_identifier = subscription_parts[0] if subscription_parts else None # Get backend session ID if available backend_session_id = session_ids.get(display_identifier) # Get or initialize session pipeline state pipeline_state = get_or_init_session_pipeline_state(camera_id) current_mode = pipeline_state["mode"] logger.debug(f"🔍 SESSIONID LOOKUP: display='{display_identifier}', session_id={repr(backend_session_id)}, mode='{current_mode}'") logger.debug(f"🔍 Available session_ids: {session_ids}") # ═══ SESSION ID-BASED PROCESSING MODE ═══ if not backend_session_id: # No session ID - keep current mode if it's validation_detecting or send_detections if current_mode not in ["validation_detecting", "send_detections", "waiting_for_session_id"]: update_session_pipeline_mode(camera_id, "validation_detecting") current_mode = "validation_detecting" logger.debug(f"🔍 Camera {camera_id}: No session ID - in {current_mode} mode") else: # Session ID available - switch to full pipeline mode if current_mode in ["send_detections", "waiting_for_session_id"]: # Session ID just arrived - switch to full pipeline mode update_session_pipeline_mode(camera_id, "full_pipeline", backend_session_id) current_mode = "full_pipeline" logger.info(f"🔥 Camera {camera_id}: Session ID received ({backend_session_id}) - switching to FULL PIPELINE mode") # Create context for pipeline execution pipeline_context = { "camera_id": camera_id, "display_id": display_identifier, "backend_session_id": backend_session_id } start_time = time.time() detection_result = None if current_mode == "validation_detecting": # ═══ TRACK VALIDATION MODE ═══ # Run tracking-based validation with track ID stability logger.debug(f"🔍 Camera {camera_id}: In validation_detecting mode - running track-based validation") # Get tracking configuration from model_tree tracking_config = model_tree.get("tracking", {}) tracking_enabled = tracking_config.get("enabled", True) stability_threshold = tracking_config.get("stabilityThreshold", 4) # Default to "none" - only proceed after track validation detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]} if tracking_enabled: # Run full tracking detection to get track IDs from siwatsystem.pympta import run_detection_with_tracking all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context) if track_validation_result.get("validation_complete", False): # Track validation completed - we have stable track IDs stable_tracks = track_validation_result.get("stable_tracks", []) logger.info(f"🎯 Camera {camera_id}: TRACK VALIDATION COMPLETED - stable tracks: {stable_tracks}") # Switch to send_detections mode update_session_pipeline_mode(camera_id, "send_detections") # Send the best detection with stable track if all_detections: # Find detection with stable track ID stable_detection = None for detection in all_detections: if detection.get("id") in stable_tracks: stable_detection = detection break if stable_detection: detection_result = { "class": stable_detection.get("class", "car"), "confidence": stable_detection.get("confidence", 0.0), "bbox": stable_detection.get("bbox", [0, 0, 0, 0]), "track_id": stable_detection.get("id") } logger.info(f"🚗 Camera {camera_id}: SENDING STABLE DETECTION - track ID {detection_result['track_id']}") else: logger.warning(f"⚠️ Camera {camera_id}: Stable tracks found but no matching detection") else: # Track validation still in progress stable_tracks = track_validation_result.get("stable_tracks", []) current_tracks = track_validation_result.get("current_tracks", []) if current_tracks: track_id = current_tracks[0] if current_tracks else "None" stable_status = "STABLE" if stable_tracks else "validating" logger.info(f"🔍 Camera {camera_id}: TRACK VALIDATION - car track_id {track_id} ({stable_status}, need {stability_threshold} consecutive frames)") else: logger.debug(f"👻 Camera {camera_id}: No car detected") logger.debug(f"📤 Camera {camera_id}: Sending 'none' (track validation in progress)") else: # Tracking disabled - fall back to basic detection validation logger.debug(f"🔍 Camera {camera_id}: Tracking disabled - using basic detection validation") from siwatsystem.pympta import run_lightweight_detection basic_detection = run_lightweight_detection(cropped_frame, model_tree) if basic_detection and basic_detection.get("car_detected"): best_detection = basic_detection.get("best_detection") # Increment validation counter for basic detection pipeline_state["validation_counter"] += 1 current_count = pipeline_state["validation_counter"] threshold = pipeline_state["validation_threshold"] if current_count >= threshold: update_session_pipeline_mode(camera_id, "send_detections") detection_result = { "class": best_detection.get("class", "car"), "confidence": best_detection.get("confidence", 0.0), "bbox": best_detection.get("bbox", [0, 0, 0, 0]) } logger.info(f"🎯 Camera {camera_id}: BASIC VALIDATION COMPLETED after {current_count} frames") else: logger.info(f"📊 Camera {camera_id}: Basic validation progress {current_count}/{threshold}") else: # Reset validation counter if pipeline_state["validation_counter"] > 0: pipeline_state["validation_counter"] = 0 logger.info(f"🔄 Camera {camera_id}: Reset validation counter (no detection)") elif current_mode == "send_detections": # ═══ SEND DETECTIONS MODE ═══ # Validation completed - now send detection_dict for car detections, detection: null for no car logger.debug(f"📤 Camera {camera_id}: In send_detections mode - sending detection_dict for cars") from siwatsystem.pympta import run_lightweight_detection basic_detection = run_lightweight_detection(cropped_frame, model_tree) if basic_detection and basic_detection.get("car_detected"): # Car detected - send detection_dict best_detection = basic_detection.get("best_detection") detection_result = { "class": best_detection.get("class", "car"), "confidence": best_detection.get("confidence", 0.0), "bbox": best_detection.get("bbox", [0, 0, 0, 0]) } logger.info(f"🚗 Camera {camera_id}: SENDING DETECTION_DICT - {detection_result['class']} (conf={detection_result['confidence']:.3f}) - backend should generate session ID") else: # No car detected - send "none" detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]} logger.debug(f"👻 Camera {camera_id}: No car detected - sending 'none'") elif current_mode == "waiting_for_session_id": # ═══ WAITING FOR SESSION ID MODE ═══ # Stop processing snapshots, wait for session ID logger.debug(f"⏳ Camera {camera_id}: In waiting_for_session_id mode - not processing snapshots") return persistent_data # Don't process or send anything elif current_mode == "full_pipeline": # ═══ FULL PIPELINE MODE ═══ logger.info(f"🔥 Camera {camera_id}: Running FULL PIPELINE (detection + branches + Redis + PostgreSQL)") detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context) if detection_result and isinstance(detection_result, dict): # Cache the full pipeline result cached_full_pipeline_results[camera_id] = { "result": detection_result.copy(), "timestamp": time.time() } # Switch to lightweight mode update_session_pipeline_mode(camera_id, "lightweight") logger.info(f"✅ Camera {camera_id}: Full pipeline completed - switching to LIGHTWEIGHT mode") elif current_mode == "lightweight": # ═══ LIGHTWEIGHT MODE ═══ # Use tracking to check for stable car presence from siwatsystem.pympta import run_detection_with_tracking all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context) stable_tracks = track_validation_result.get("stable_tracks", []) current_tracks = track_validation_result.get("current_tracks", []) stable_tracks_present = bool(set(stable_tracks) & set(current_tracks)) if stable_tracks_present: # Stable tracked car still present - use cached result pipeline_state["absence_counter"] = 0 if camera_id in cached_full_pipeline_results: detection_result = cached_full_pipeline_results[camera_id]["result"] logger.debug(f"🔄 Camera {camera_id}: Stable tracked car still present - using cached detection") else: logger.warning(f"⚠️ Camera {camera_id}: Stable tracked car detected but no cached result available") detection_result = None else: # No stable tracked cars - increment absence counter pipeline_state["absence_counter"] += 1 absence_count = pipeline_state["absence_counter"] max_absence = pipeline_state["max_absence_frames"] logger.debug(f"👻 Camera {camera_id}: No stable tracked cars - absence {absence_count}/{max_absence}") if absence_count >= max_absence: # Send "none" detection and reset to validation mode detection_result = { "class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0], "branch_results": {} } cached_full_pipeline_results.pop(camera_id, None) # Clear cache update_session_pipeline_mode(camera_id, "validation_detecting") logger.info(f"📤 Camera {camera_id}: Stable tracked cars absent for {absence_count} frames - sending 'none' and resetting to track validation") else: # Still within absence tolerance - use cached result if camera_id in cached_full_pipeline_results: detection_result = cached_full_pipeline_results[camera_id]["result"] logger.debug(f"⏳ Camera {camera_id}: Stable tracked cars absent {absence_count}/{max_absence} - still using cached detection") else: detection_result = None process_time = (time.time() - start_time) * 1000 logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms (mode: {current_mode})") # Skip processing if no detection result (blocked by session gating) if detection_result is None: logger.debug(f"No detection result to process for camera {camera_id}") return persistent_data # Log the raw detection result for debugging logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}") # Extract session_id from pipeline result (always use backend sessionId) session_id = backend_session_id logger.debug(f"Using backend session_id: {session_id}") # Process detection result based on current mode if current_mode == "validation_detecting": # ═══ VALIDATION DETECTING MODE ═══ # Always send detection: null during validation phase detection_dict = None logger.debug(f"🔍 SENDING 'NONE' - validation_detecting mode for camera {camera_id}") elif current_mode == "send_detections": # ═══ SEND DETECTIONS MODE ═══ if detection_result.get("class") == "none": # No car detected - send detection: null detection_dict = None logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}") else: # Car detected - check if we have sessionId to determine what to send if backend_session_id: # Have sessionId - send full detection_dict for database updates detection_dict = { "carModel": None, "carBrand": None, "carYear": None, "bodyType": None, "licensePlateText": None, "licensePlateConfidence": None } logger.info(f"📤 SENDING FULL DETECTION_DICT - send_detections mode with sessionId {backend_session_id} (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}") else: # No sessionId - send empty detection_dict to trigger backend to generate sessionId detection_dict = {} logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode without sessionId, triggering backend to generate sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}") elif detection_result.get("class") == "none": # "None" detection in other modes (lightweight) - car left or absent for 3 frames detection_dict = None logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}") else: # Valid detection - convert to backend format detection_dict = { "carModel": None, "carBrand": None, "carYear": None, "bodyType": None, "licensePlateText": None, "licensePlateConfidence": None } # Extract and process branch results from parallel classification (only for valid detections) if detection_result.get("class") != "none" and "branch_results" in detection_result: def process_branch_results(branch_results, depth=0): """Recursively process branch results including nested branches.""" if not isinstance(branch_results, dict): return indent = " " * depth for branch_id, branch_data in branch_results.items(): if isinstance(branch_data, dict): logger.debug(f"{indent}Processing branch {branch_id}: {branch_data}") # Map common classification fields to backend-expected names if "brand" in branch_data: detection_dict["carBrand"] = branch_data["brand"] logger.debug(f"{indent}Mapped carBrand: {branch_data['brand']}") if "body_type" in branch_data: detection_dict["bodyType"] = branch_data["body_type"] logger.debug(f"{indent}Mapped bodyType: {branch_data['body_type']}") if "class" in branch_data: class_name = branch_data["class"] # Map based on branch/model type if "brand" in branch_id.lower(): detection_dict["carBrand"] = class_name logger.debug(f"{indent}Mapped carBrand from class: {class_name}") elif "bodytype" in branch_id.lower() or "body" in branch_id.lower(): detection_dict["bodyType"] = class_name logger.debug(f"{indent}Mapped bodyType from class: {class_name}") # Process nested branch results recursively if "branch_results" in branch_data: logger.debug(f"{indent}Processing nested branches in {branch_id}") process_branch_results(branch_data["branch_results"], depth + 1) branch_results = detection_result.get("branch_results", {}) if branch_results: logger.debug(f"Processing branch results: {branch_results}") process_branch_results(branch_results) logger.info(f"Detection payload after branch processing: {detection_dict}") else: logger.debug("No branch results found in detection result") detection_data = { "type": "imageDetection", "subscriptionIdentifier": stream["subscriptionIdentifier"], "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), # "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + f".{int(time.time() * 1000) % 1000:03d}Z", "data": { "detection": detection_dict, "modelId": stream["modelId"], "modelName": stream["modelName"] } } # Add session ID to detection data (NOT for "none" detections - backend uses absence of sessionId to know to clear the session) if session_id and detection_result.get("class") != "none": detection_data["sessionId"] = session_id logger.debug(f"Including sessionId {session_id} in WebSocket message") elif detection_result.get("class") == "none": logger.debug(f"NOT including sessionId in 'none' detection - backend should clear session") # Log detection details if detection_result.get("class") != "none": confidence = detection_result.get("confidence", 0.0) logger.info(f"Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}") # Send detection data to backend (session gating handled above in processing logic) logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}") try: ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}") await websocket.send_json(detection_data) logger.debug(f"Sent detection data to client for camera {camera_id}") except RuntimeError as e: if "websocket.close" in str(e): logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}") return persistent_data else: raise # Log status after sending if session_id and detection_result.get("class") != "none": logger.info(f"📤 WEBSOCKET RESPONSE with sessionId: {session_id} for camera {camera_id}") elif detection_result.get("class") == "none": logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}") else: logger.info(f"📡 Detection data sent for camera {camera_id}") return persistent_data except Exception as e: logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) return persistent_data def frame_reader(camera_id, cap, buffer, stop_event): retries = 0 logger.info(f"Starting frame reader thread for camera {camera_id}") frame_count = 0 last_log_time = time.time() try: # Log initial camera status and properties if cap.isOpened(): width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}") set_camera_connected(camera_id, True) else: logger.error(f"Camera {camera_id} failed to open initially") set_camera_connected(camera_id, False, "Failed to open camera initially") while not stop_event.is_set(): try: if not cap.isOpened(): logger.error(f"Camera {camera_id} is not open before trying to read") # Attempt to reopen cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) time.sleep(reconnect_interval) continue logger.debug(f"Attempting to read frame from camera {camera_id}") ret, frame = cap.read() if not ret: error_msg = f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}" logger.warning(error_msg) set_camera_connected(camera_id, False, error_msg) cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader") set_camera_connected(camera_id, False, "Max retries reached") break # Re-open logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}") set_camera_connected(camera_id, False, "Failed to reopen RTSP stream") continue logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}") set_camera_connected(camera_id, True) continue # Successfully read a frame frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}") retries = 0 set_camera_connected(camera_id, True) # Mark as connected on successful frame read # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Short sleep to avoid CPU overuse time.sleep(0.01) except cv2.error as e: error_msg = f"OpenCV error for camera {camera_id}: {e}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after OpenCV error for camera {camera_id}") set_camera_connected(camera_id, False, "Max retries reached after OpenCV error") break logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") set_camera_connected(camera_id, False, "Failed to reopen after OpenCV error") continue logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}") set_camera_connected(camera_id, True) except Exception as e: error_msg = f"Unexpected error for camera {camera_id}: {str(e)}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) cap.release() break except Exception as e: logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Frame reader thread for camera {camera_id} is exiting") if cap and cap.isOpened(): cap.release() def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event): """Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals""" retries = 0 consecutive_failures = 0 # Track consecutive failures for backoff logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}") frame_count = 0 last_log_time = time.time() # Initialize camera state set_camera_connected(camera_id, True) try: interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s") while not stop_event.is_set(): try: start_time = time.time() frame = fetch_snapshot(snapshot_url) if frame is None: consecutive_failures += 1 error_msg = f"Failed to fetch snapshot for camera: {camera_id}, consecutive failures: {consecutive_failures}" logger.warning(error_msg) set_camera_connected(camera_id, False, error_msg) retries += 1 # Check network connectivity with a simple ping-like test if consecutive_failures % 5 == 1: # Every 5th failure, test connectivity try: test_response = requests.get(snapshot_url, timeout=(2, 5), stream=False) logger.info(f"Camera {camera_id}: Connectivity test result: {test_response.status_code}") except Exception as test_error: logger.warning(f"Camera {camera_id}: Connectivity test failed: {test_error}") if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader") set_camera_connected(camera_id, False, "Max retries reached for snapshot camera") break # Exponential backoff based on consecutive failures backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s logger.debug(f"Camera {camera_id}: Backing off for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})") if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep break # Exit if stop_event is set during backoff continue # Successfully fetched a frame - reset consecutive failures consecutive_failures = 0 # Reset backoff on success frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}") retries = 0 set_camera_connected(camera_id, True) # Mark as connected on successful snapshot # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Wait for the specified interval elapsed = time.time() - start_time sleep_time = max(interval_seconds - elapsed, 0) if sleep_time > 0: time.sleep(sleep_time) except Exception as e: consecutive_failures += 1 error_msg = f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after error for snapshot camera {camera_id}") set_camera_connected(camera_id, False, "Max retries reached after error") break # Exponential backoff for exceptions too backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s logger.debug(f"Camera {camera_id}: Exception backoff for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})") if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep break # Exit if stop_event is set during backoff except Exception as e: logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Snapshot reader thread for camera {camera_id} is exiting") async def reconcile_subscriptions(desired_subscriptions, websocket): """ Declarative reconciliation: Compare desired vs current subscriptions and make changes """ logger.info(f"Reconciling subscriptions: {len(desired_subscriptions)} desired") with streams_lock: # Get current subscriptions current_subscription_ids = set(streams.keys()) desired_subscription_ids = set(sub["subscriptionIdentifier"] for sub in desired_subscriptions) # Find what to add and remove to_add = desired_subscription_ids - current_subscription_ids to_remove = current_subscription_ids - desired_subscription_ids to_check_for_changes = current_subscription_ids & desired_subscription_ids logger.info(f"Reconciliation: {len(to_add)} to add, {len(to_remove)} to remove, {len(to_check_for_changes)} to check for changes") # Remove subscriptions that are no longer wanted for subscription_id in to_remove: await unsubscribe_internal(subscription_id) # Check existing subscriptions for parameter changes for subscription_id in to_check_for_changes: desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id) current_stream = streams[subscription_id] # Check if parameters changed if has_subscription_changed(desired_sub, current_stream): logger.info(f"Parameters changed for {subscription_id}, resubscribing") await unsubscribe_internal(subscription_id) await subscribe_internal(desired_sub, websocket) # Add new subscriptions for subscription_id in to_add: desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id) await subscribe_internal(desired_sub, websocket) def has_subscription_changed(desired_sub, current_stream): """Check if subscription parameters have changed""" return ( desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or desired_sub.get("snapshotInterval") != current_stream.get("snapshot_interval") or desired_sub.get("cropX1") != current_stream.get("cropX1") or desired_sub.get("cropY1") != current_stream.get("cropY1") or desired_sub.get("cropX2") != current_stream.get("cropX2") or desired_sub.get("cropY2") != current_stream.get("cropY2") or desired_sub.get("modelId") != current_stream.get("modelId") or desired_sub.get("modelName") != current_stream.get("modelName") ) async def subscribe_internal(subscription, websocket): """Internal subscription logic extracted from original subscribe handler""" subscriptionIdentifier = subscription.get("subscriptionIdentifier") rtsp_url = subscription.get("rtspUrl") snapshot_url = subscription.get("snapshotUrl") snapshot_interval = subscription.get("snapshotInterval") model_url = subscription.get("modelUrl") modelId = subscription.get("modelId") modelName = subscription.get("modelName") cropX1 = subscription.get("cropX1") cropY1 = subscription.get("cropY1") cropX2 = subscription.get("cropX2") cropY2 = subscription.get("cropY2") # Extract camera_id from subscriptionIdentifier parts = subscriptionIdentifier.split(';') if len(parts) != 2: logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}") return display_identifier, camera_identifier = parts camera_id = subscriptionIdentifier # Load model if needed if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # Handle model loading (same as original) parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" local_mpta = os.path.join(extraction_dir, filename) local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download model from {model_url}") return model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: if not os.path.exists(model_url): logger.error(f"Model file not found: {model_url}") return model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId}") return if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree # Create stream (same logic as original) if camera_id and (rtsp_url or snapshot_url) and len(streams) < max_streams: camera_url = snapshot_url if snapshot_url else rtsp_url # Check if we already have a stream for this camera URL shared_stream = camera_streams.get(camera_url) if shared_stream: # Reuse existing stream buffer = shared_stream["buffer"] stop_event = shared_stream["stop_event"] thread = shared_stream["thread"] mode = shared_stream["mode"] shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 else: # Create new stream buffer = queue.Queue(maxsize=1) stop_event = threading.Event() if snapshot_url and snapshot_interval: thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1 } camera_streams[camera_url] = shared_stream elif rtsp_url: cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") return thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1 } camera_streams[camera_url] = shared_stream else: logger.error(f"No valid URL provided for camera {camera_id}") return # Create stream info stream_info = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2, "mode": mode, "camera_url": camera_url, "modelUrl": model_url } if mode == "snapshot": stream_info["snapshot_url"] = snapshot_url stream_info["snapshot_interval"] = snapshot_interval elif mode == "rtsp": stream_info["rtsp_url"] = rtsp_url stream_info["cap"] = shared_stream["cap"] streams[camera_id] = stream_info subscription_to_camera[camera_id] = camera_url logger.info(f"Subscribed to camera {camera_id}") # Send initial "none" detection to backend on camera connect initial_detection_data = { "type": "imageDetection", "subscriptionIdentifier": subscriptionIdentifier, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "data": { "detection": None, "modelId": modelId, "modelName": modelName } } ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}") await websocket.send_json(initial_detection_data) logger.info(f"📡 Sent initial 'none' detection to backend for camera {camera_id}") logger.debug(f"Initial detection data: {initial_detection_data}") async def unsubscribe_internal(subscription_id): """Internal unsubscription logic""" if subscription_id in streams: stream = streams.pop(subscription_id) camera_url = subscription_to_camera.pop(subscription_id, None) if camera_url and camera_url in camera_streams: shared_stream = camera_streams[camera_url] shared_stream["ref_count"] -= 1 if shared_stream["ref_count"] <= 0: shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() del camera_streams[camera_url] latest_frames.pop(subscription_id, None) cached_detections.pop(subscription_id, None) # Clear cached detection frame_skip_flags.pop(subscription_id, None) # Clear frame skip flag camera_states.pop(subscription_id, None) # Clear camera state cached_full_pipeline_results.pop(subscription_id, None) # Clear cached pipeline results session_pipeline_states.pop(subscription_id, None) # Clear session pipeline state cleanup_camera_stability(subscription_id) logger.info(f"Unsubscribed from camera {subscription_id}") async def process_streams(): logger.info("Started processing streams") try: while True: start_time = time.time() with streams_lock: current_streams = list(streams.items()) if current_streams: logger.debug(f"Processing {len(current_streams)} active streams") else: logger.debug("No active streams to process") for camera_id, stream in current_streams: buffer = stream["buffer"] if buffer.empty(): logger.debug(f"Frame buffer is empty for camera {camera_id}") continue logger.debug(f"Got frame from buffer for camera {camera_id}") frame = buffer.get() # Cache the frame for REST API access latest_frames[camera_id] = frame.copy() logger.debug(f"Cached frame for REST API access for camera {camera_id}") with models_lock: model_tree = models.get(camera_id, {}).get(stream["modelId"]) if not model_tree: logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}") continue logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}") key = (camera_id, stream["modelId"]) persistent_data = persistent_data_dict.get(key, {}) logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}") updated_persistent_data = await handle_detection( camera_id, stream, frame, websocket, model_tree, persistent_data ) persistent_data_dict[key] = updated_persistent_data elapsed_time = (time.time() - start_time) * 1000 # ms sleep_time = max(poll_interval - elapsed_time, 0) logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms") await asyncio.sleep(sleep_time / 1000.0) except asyncio.CancelledError: logger.info("Stream processing task cancelled") except Exception as e: logger.error(f"Error in process_streams: {str(e)}", exc_info=True) async def send_heartbeat(): while True: try: cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, # Include all subscription parameters for proper change detection "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) logger.debug(f"Sent stateReport as heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, {len(camera_connections)} active cameras") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: logger.error(f"Error sending stateReport heartbeat: {e}") break async def on_message(): while True: try: msg = await websocket.receive_text() ws_logger.info(f"RX <- {msg}") logger.debug(f"Received message: {msg}") data = json.loads(msg) msg_type = data.get("type") if msg_type == "setSubscriptionList": # Declarative approach: Backend sends list of subscriptions this worker should have desired_subscriptions = data.get("subscriptions", []) logger.info(f"Received subscription list with {len(desired_subscriptions)} subscriptions") await reconcile_subscriptions(desired_subscriptions, websocket) elif msg_type == "subscribe": # Legacy support - convert single subscription to list payload = data.get("payload", {}) await reconcile_subscriptions([payload], websocket) elif msg_type == "unsubscribe": # Legacy support - remove subscription payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") # Remove from current subscriptions and reconcile current_subs = [] with streams_lock: for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"] != subscriptionIdentifier: # Convert stream back to subscription format current_subs.append({ "subscriptionIdentifier": stream["subscriptionIdentifier"], "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), "modelId": stream["modelId"], "modelName": stream["modelName"], "modelUrl": stream.get("modelUrl", ""), "cropX1": stream.get("cropX1"), "cropY1": stream.get("cropY1"), "cropX2": stream.get("cropX2"), "cropY2": stream.get("cropY2") }) await reconcile_subscriptions(current_subs, websocket) elif msg_type == "old_subscribe_logic_removed": if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # If model_url is remote, download it first. parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): logger.info(f"Downloading remote .mpta file from {model_url}") filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" local_mpta = os.path.join(extraction_dir, filename) logger.debug(f"Download destination: {local_mpta}") local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download the remote .mpta file from {model_url}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to download model from {model_url}" } ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}") await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: logger.info(f"Loading local .mpta file from {model_url}") # Check if file exists before attempting to load if not os.path.exists(model_url): logger.error(f"Local .mpta file not found: {model_url}") logger.debug(f"Current working directory: {os.getcwd()}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Model file not found: {model_url}" } ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}") await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId} from .mpta file for camera {camera_id}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to load model {modelId}" } await websocket.send_json(error_response) continue if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree logger.info(f"Successfully loaded model {modelId} for camera {camera_id}") logger.debug(f"Model extraction directory: {extraction_dir}") if camera_id and (rtsp_url or snapshot_url): with streams_lock: # Determine camera URL for shared stream management camera_url = snapshot_url if snapshot_url else rtsp_url if camera_id not in streams and len(streams) < max_streams: # Check if we already have a stream for this camera URL shared_stream = camera_streams.get(camera_url) if shared_stream: # Reuse existing stream logger.info(f"Reusing existing stream for camera URL: {camera_url}") buffer = shared_stream["buffer"] stop_event = shared_stream["stop_event"] thread = shared_stream["thread"] mode = shared_stream["mode"] # Increment reference count shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 else: # Create new stream buffer = queue.Queue(maxsize=1) stop_event = threading.Event() if snapshot_url and snapshot_interval: logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}") thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" # Store shared stream info shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1 } camera_streams[camera_url] = shared_stream elif rtsp_url: logger.info(f"Creating new RTSP stream for camera {camera_id}: {rtsp_url}") cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") continue thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" # Store shared stream info shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1 } camera_streams[camera_url] = shared_stream else: logger.error(f"No valid URL provided for camera {camera_id}") continue # Create stream info for this subscription stream_info = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2, "mode": mode, "camera_url": camera_url } if mode == "snapshot": stream_info["snapshot_url"] = snapshot_url stream_info["snapshot_interval"] = snapshot_interval elif mode == "rtsp": stream_info["rtsp_url"] = rtsp_url stream_info["cap"] = shared_stream["cap"] streams[camera_id] = stream_info subscription_to_camera[camera_id] = camera_url elif camera_id and camera_id in streams: # If already subscribed, unsubscribe first logger.info(f"Resubscribing to camera {camera_id}") # Note: Keep models in memory for reuse across subscriptions elif msg_type == "unsubscribe": payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") camera_id = subscriptionIdentifier with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) camera_url = subscription_to_camera.pop(camera_id, None) if camera_url and camera_url in camera_streams: shared_stream = camera_streams[camera_url] shared_stream["ref_count"] -= 1 # If no more references, stop the shared stream if shared_stream["ref_count"] <= 0: logger.info(f"Stopping shared stream for camera URL: {camera_url}") shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() del camera_streams[camera_url] else: logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references") # Clean up cached frame and stability tracking latest_frames.pop(camera_id, None) cached_detections.pop(camera_id, None) # Clear cached detection frame_skip_flags.pop(camera_id, None) # Clear frame skip flag camera_states.pop(camera_id, None) # Clear camera state cleanup_camera_stability(camera_id) logger.info(f"Unsubscribed from camera {camera_id}") # Note: Keep models in memory for potential reuse elif msg_type == "requestState": cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, # Include all subscription parameters for proper change detection "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) elif msg_type == "setSessionId": payload = data.get("payload", {}) display_identifier = payload.get("displayIdentifier") session_id = payload.get("sessionId") # Debug sessionId value types and contents session_id_type = type(session_id).__name__ if session_id is None: logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId=None (type: {session_id_type})") logger.info(f"🔄 BACKEND WANTS TO CLEAR SESSION for display {display_identifier}") elif session_id == "null": logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='null' (type: {session_id_type})") logger.info(f"🔄 BACKEND SENT STRING 'null' for display {display_identifier}") elif session_id == "": logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='' (empty string, type: {session_id_type})") logger.info(f"🔄 BACKEND SENT EMPTY STRING for display {display_identifier}") else: logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='{session_id}' (type: {session_id_type}, length: {len(str(session_id))})") logger.info(f"🔄 BACKEND CREATED/UPDATED SESSION: {session_id} for display {display_identifier}") logger.debug(f"Full setSessionId payload: {payload}") logger.debug(f"WebSocket message raw data: {json.dumps(data, indent=2)}") logger.debug(f"Current active cameras: {list(streams.keys())}") if display_identifier: # Store session ID for this display if session_id is None or session_id == "null" or session_id == "": old_session_id = session_ids.get(display_identifier) session_ids.pop(display_identifier, None) if session_id is None: logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received None") elif session_id == "null": logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received string 'null'") elif session_id == "": logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received empty string") logger.debug(f"Session IDs after clearing: {session_ids}") # Reset tracking state for all cameras associated with this display with streams_lock: affected_cameras = [] for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): affected_cameras.append(camera_id) # Import here to avoid circular import from siwatsystem.pympta import reset_tracking_state model_id = stream.get("modelId", "unknown") reset_tracking_state(camera_id, model_id, "backend session ended") logger.info(f"Reset tracking for camera {camera_id} (display: {display_identifier})") logger.debug(f"Reset tracking for {len(affected_cameras)} cameras: {affected_cameras}") else: old_session_id = session_ids.get(display_identifier) session_ids[display_identifier] = session_id logger.info(f"✅ BACKEND SESSION STARTED: Set session ID {session_id} for display {display_identifier} (previous: {old_session_id})") logger.debug(f"Session IDs after update: {session_ids}") logger.debug(f"🎯 CMS Backend created sessionId {session_id} after receiving detection data") # Clear waiting state for cameras associated with this display with streams_lock: affected_cameras = [] for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): affected_cameras.append(camera_id) from siwatsystem.pympta import get_camera_stability_data model_id = stream.get("modelId", "unknown") stability_data = get_camera_stability_data(camera_id, model_id) session_state = stability_data["session_state"] if session_state.get("waiting_for_backend_session", False): session_state["waiting_for_backend_session"] = False session_state["wait_start_time"] = 0.0 logger.info(f"🚀 PIPELINE UNBLOCKED: Backend sessionId {session_id} received - camera {camera_id} can proceed with database operations") logger.debug(f"📋 Camera {camera_id}: SessionId {session_id} now available for future database operations") logger.debug(f"Updated session state for {len(affected_cameras)} cameras: {affected_cameras}") else: logger.warning(f"🚨 Invalid setSessionId message: missing displayIdentifier in payload") elif msg_type == "patchSession": session_id = data.get("sessionId") patch_data = data.get("data", {}) # For now, just acknowledge the patch - actual implementation depends on backend requirements response = { "type": "patchSessionResult", "payload": { "sessionId": session_id, "success": True, "message": "Session patch acknowledged" } } ws_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") await websocket.send_json(response) logger.info(f"Acknowledged patch for session {session_id}") else: logger.error(f"Unknown message type: {msg_type}") except json.JSONDecodeError: logger.error("Received invalid JSON message") except (WebSocketDisconnect, ConnectionClosedError) as e: logger.warning(f"WebSocket disconnected: {e}") break except Exception as e: logger.error(f"Error handling message: {e}") break try: await websocket.accept() stream_task = asyncio.create_task(process_streams()) heartbeat_task = asyncio.create_task(send_heartbeat()) message_task = asyncio.create_task(on_message()) await asyncio.gather(heartbeat_task, message_task) except Exception as e: logger.error(f"Error in detect websocket: {e}") finally: stream_task.cancel() await stream_task with streams_lock: # Clean up shared camera streams for camera_url, shared_stream in camera_streams.items(): shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() while not shared_stream["buffer"].empty(): try: shared_stream["buffer"].get_nowait() except queue.Empty: pass logger.info(f"Released shared camera stream for {camera_url}") streams.clear() camera_streams.clear() subscription_to_camera.clear() with models_lock: models.clear() latest_frames.clear() cached_detections.clear() frame_skip_flags.clear() camera_states.clear() cached_full_pipeline_results.clear() session_pipeline_states.clear() session_ids.clear() logger.info("WebSocket connection closed")