from typing import Any, Dict
import os
import json
import time
import queue
import torch
import cv2
import numpy as np
import base64
import logging
import threading
import requests
import asyncio
import psutil
import zipfile
import ssl
import urllib3
import subprocess
import tempfile
import redis
from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.websockets import WebSocketDisconnect
from fastapi.responses import Response
from websockets.exceptions import ConnectionClosedError
from ultralytics import YOLO

# Import shared pipeline functions
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline, cleanup_camera_stability

app = FastAPI()

# Global dictionaries to keep track of models and streams
# "models" now holds a nested dict: { camera_id: { modelId: model_tree } }
models: Dict[str, Dict[str, Any]] = {}
streams: Dict[str, Dict[str, Any]] = {}
# Store session IDs per display
session_ids: Dict[str, int] = {}
# Track shared camera streams by camera URL
camera_streams: Dict[str, Dict[str, Any]] = {}
# Map subscriptions to their camera URL
subscription_to_camera: Dict[str, str] = {}
# Store latest frames for REST API access (separate from processing buffer)
latest_frames: Dict[str, Any] = {}
# Store cached detection dict after successful pipeline completion
cached_detections: Dict[str, Dict[str, Any]] = {}
# Enhanced caching system for LPR integration
session_detections: Dict[str, Dict[str, Any]] = {}  # session_id -> detection data
session_to_camera: Dict[str, str] = {}  # session_id -> camera_id
detection_timestamps: Dict[str, float] = {}  # session_id -> timestamp (for cleanup)
# Track frame skipping for pipeline buffer after detection
frame_skip_flags: Dict[str, bool] = {}
# Track camera connection states for immediate error handling
camera_states: Dict[str, Dict[str, Any]] = {}
# Track session ID states and
# pipeline modes per camera
session_pipeline_states: Dict[str, Dict[str, Any]] = {}
# Store full pipeline results for caching
cached_full_pipeline_results: Dict[str, Dict[str, Any]] = {}

with open("config.json", "r") as f:
    config = json.load(f)

reconnect_interval = config.get("reconnect_interval_sec", 5)
TARGET_FPS = config.get("target_fps", 10)
# The poll interval is derived from the target FPS; this overrides any
# "poll_interval_ms" value in config.json.
poll_interval = 1000 / TARGET_FPS
max_streams = config.get("max_streams", 5)
max_retries = config.get("max_retries", 3)

# Configure logging
# Note: nothing may log through the root logger before this call. A module-level
# logging.info() would install a default handler and turn basicConfig() into a no-op.
logging.basicConfig(
    level=logging.INFO,  # Set to INFO level for less verbose output
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.FileHandler("detector_worker.log"),  # Write logs to a file
        logging.StreamHandler()  # Also output to console
    ]
)

# Create a logger specifically for this application
logger = logging.getLogger("detector_worker")
logger.setLevel(logging.DEBUG)  # Set app-specific logger to DEBUG level

# Create WebSocket communication logger
ws_logger = logging.getLogger("websocket_comm")
ws_logger.setLevel(logging.INFO)
ws_handler = logging.FileHandler("websocket_comm.log", encoding='utf-8')
ws_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
ws_handler.setFormatter(ws_formatter)
ws_logger.addHandler(ws_handler)
ws_logger.propagate = False  # Don't propagate to root logger

# Ensure all other libraries (including root) use at least INFO level
logging.getLogger().setLevel(logging.INFO)

logger.info("Starting detector worker application")
logger.info(f"Poll interval: {poll_interval}ms")
logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: {max_streams}, Max retries: {max_retries}")
ws_logger.info("WebSocket communication logging started - TX/RX format")
logger.info("WebSocket communication will be logged to websocket_comm.log")

# Ensure the models directory exists
os.makedirs("models", exist_ok=True)
logger.info("Ensured models directory exists")

# Constants for
# heartbeat and timeouts
HEARTBEAT_INTERVAL = 2  # seconds

# Global Redis connection for LPR integration
redis_client_global = None
lpr_listener_thread = None
cleanup_timer_thread = None
lpr_integration_started = False


# Redis connection helper functions
def get_redis_config_from_model(camera_id: str) -> Dict[str, Any]:
    """Extract Redis configuration from loaded model pipeline.

    Returns None when no configuration can be found.
    """
    try:
        for model_id, model_tree in models.get(camera_id, {}).items():
            if hasattr(model_tree, 'get') and 'redis_client' in model_tree:
                # Extract config from existing Redis client
                client = model_tree['redis_client']
                if client:
                    return {
                        'host': client.connection_pool.connection_kwargs['host'],
                        'port': client.connection_pool.connection_kwargs['port'],
                        'password': client.connection_pool.connection_kwargs.get('password'),
                        'db': client.connection_pool.connection_kwargs.get('db', 0)
                    }
    except Exception as e:
        logger.debug(f"Could not extract Redis config from model: {e}")

    # Fallback - try to read from pipeline.json directly
    try:
        models_dir = "models"
        if os.path.exists(models_dir):
            for root, dirs, files in os.walk(models_dir):
                if "pipeline.json" in files:
                    with open(os.path.join(root, "pipeline.json"), 'r') as f:
                        # Named pipeline_config so it does not shadow the module-level config
                        pipeline_config = json.load(f)
                    if 'redis' in pipeline_config:
                        return pipeline_config['redis']
    except Exception as e:
        logger.debug(f"Could not read Redis config from pipeline.json: {e}")

    return None


def create_redis_connection() -> redis.Redis:
    """Create Redis connection using config from pipeline (returns None on failure)"""
    global redis_client_global

    if redis_client_global is not None:
        try:
            redis_client_global.ping()
            return redis_client_global
        except Exception:
            # Existing connection is unusable; drop it and reconnect below
            redis_client_global = None

    # Find any camera with a loaded model to get Redis config
    redis_config = None
    for camera_id in models.keys():
        redis_config = get_redis_config_from_model(camera_id)
        if redis_config:
            break

    if not redis_config:
        logger.error("No Redis configuration found in any loaded models")
        return None

    try:
        redis_client_global = redis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            password=redis_config.get('password'),
            db=redis_config.get('db', 0),
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )
        redis_client_global.ping()
        logger.info(f"✅ Connected to Redis for LPR at {redis_config['host']}:{redis_config['port']}")
        return redis_client_global
    except Exception as e:
        logger.error(f"❌ Failed to connect to Redis for LPR: {e}")
        redis_client_global = None
        return None


# LPR Integration Functions
def process_license_result(lpr_data: Dict[str, Any]):
    """Process incoming LPR result and update backend"""
    try:
        # Enhanced debugging for LPR data reception
        logger.info("=" * 60)
        logger.info("🚗 LPR SERVICE DATA RECEIVED")
        logger.info("=" * 60)
        logger.info(f"📥 Raw LPR data: {json.dumps(lpr_data, indent=2)}")

        session_id = str(lpr_data.get('session_id', ''))
        license_text = lpr_data.get('license_character', '')

        logger.info(f"🔍 Extracted session_id: '{session_id}'")
        logger.info(f"🔍 Extracted license_character: '{license_text}'")
        logger.info(f"📊 Current cached sessions count: {len(session_detections)}")
        logger.info(f"📊 Available session IDs: {list(session_detections.keys())}")

        # Find cached detection by session_id
        if session_id not in session_detections:
            logger.warning("❌ LPR SESSION ID NOT FOUND!")
            logger.warning(f" Looking for session_id: '{session_id}'")
            logger.warning(f" Available sessions: {list(session_detections.keys())}")
            logger.warning(f" Session count: {len(session_detections)}")

            # Additional debugging - show session timestamps
            if session_detections:
                logger.warning("📅 Available session details:")
                for sid, timestamp in detection_timestamps.items():
                    age = time.time() - timestamp
                    camera = session_to_camera.get(sid, 'unknown')
                    logger.warning(f" Session {sid}: camera={camera}, age={age:.1f}s")
            else:
                logger.warning(" No cached sessions available - worker may not have processed any detections yet")

            logger.warning("💡 Possible causes:")
            logger.warning(" 1. Session expired (TTL: 10 minutes)")
            logger.warning(" 2. Session ID mismatch between detection and LPR service")
            logger.warning(" 3. Detection was not cached (no sessionId from backend)")
            logger.warning(" 4. Worker restarted after detection but before LPR result")
            return

        # Get the original detection data
        detection_data = session_detections[session_id].copy()
        camera_id = session_to_camera.get(session_id, 'unknown')

        logger.info("✅ LPR SESSION FOUND!")
        logger.info(f" 📹 Camera ID: {camera_id}")
        logger.info(f" ⏰ Session age: {time.time() - detection_timestamps.get(session_id, 0):.1f} seconds")

        # Show original detection structure before update
        original_license = detection_data.get('data', {}).get('detection', {}).get('licensePlateText')
        logger.info(f" 🔍 Original licensePlateText: {original_license}")
        logger.info(f" 🆕 New licensePlateText: '{license_text}'")

        # Update licensePlateText in detection
        if 'data' in detection_data and 'detection' in detection_data['data']:
            detection_data['data']['detection']['licensePlateText'] = license_text

            logger.info("🎯 LICENSE PLATE UPDATE SUCCESS!")
            logger.info(f" ✅ Updated detection for session {session_id}")
            logger.info(f" ✅ Set licensePlateText = '{license_text}'")

            # Show full detection structure after update
            detection_dict = detection_data['data']['detection']
            logger.info("📋 Updated detection dictionary:")
            logger.info(f" carModel: {detection_dict.get('carModel')}")
            logger.info(f" carBrand: {detection_dict.get('carBrand')}")
            logger.info(f" bodyType: {detection_dict.get('bodyType')}")
            logger.info(f" licensePlateText: {detection_dict.get('licensePlateText')} ← UPDATED")
            logger.info(f" licensePlateConfidence: {detection_dict.get('licensePlateConfidence')}")
        else:
            logger.error("❌ INVALID DETECTION DATA STRUCTURE!")
            logger.error(f" Session {session_id} has malformed detection data")
            logger.error(f" Detection data keys: {list(detection_data.keys())}")
            if 'data' in detection_data:
                logger.error(f" Data keys: {list(detection_data['data'].keys())}")
            return

        # Update timestamp to indicate this is an LPR update
        detection_data['timestamp'] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

        # Update all caches with new data
        session_detections[session_id] = detection_data.copy()
        cached_detections[camera_id] = detection_data.copy()

        # CRITICAL: Also update the pipeline state cached detection dict (used by lightweight mode)
        if camera_id in session_pipeline_states:
            pipeline_state = session_pipeline_states[camera_id]
            current_cached_dict = pipeline_state.get("cached_detection_dict", {})

            # Update the pipeline cached detection dict with new license plate
            updated_dict = current_cached_dict.copy() if current_cached_dict else {}
            updated_dict['licensePlateText'] = license_text

            pipeline_state["cached_detection_dict"] = updated_dict
            logger.info(f"✅ LPR: Updated pipeline state cached_detection_dict for camera {camera_id}")
            logger.debug(f"🔍 Pipeline cached dict now: {updated_dict}")
        else:
            logger.warning(f"⚠️ Camera {camera_id} not found in session_pipeline_states - pipeline cache not updated")

        logger.info("📡 SENDING UPDATED DETECTION TO BACKEND")
        logger.info(f" 📹 Camera ID: {camera_id}")
        logger.info(f" 📨 Updated licensePlateText: '{license_text}'")
        logger.info(" 🔄 Updated both cache systems:")
        logger.info(f" 1️⃣ cached_detections[{camera_id}] ✅")
        logger.info(f" 2️⃣ session_pipeline_states[{camera_id}].cached_detection_dict ✅")

        # Log the full message being sent
        logger.info("📋 Updated detection data in cache:")
        logger.info(json.dumps(detection_data, indent=2))

        logger.info("✅ ALL CACHES UPDATED!")
        logger.info(f" 🎯 Lightweight mode will now use updated licensePlateText")
        logger.info(f" 📤 Backend will receive: licensePlateText = '{license_text}'")
        logger.info(" 🔄 Both cache systems synchronized with LPR data")

        logger.info("=" * 60)
        logger.info("🏁 LPR PROCESSING COMPLETE")
        logger.info(f" Session: {session_id}")
        logger.info(f" License: '{license_text}'")
        logger.info(f" Status: ✅ SUCCESS - DETECTION CACHE UPDATED")
        logger.info("=" * 60)

    except Exception as e:
        logger.error("=" * 60)
        logger.error("❌ LPR PROCESSING FAILED")
        logger.error("=" * 60)
        logger.error(f"Error: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        logger.error("=" * 60)


# LPR integration now uses cached detection mechanism instead of direct WebSocket sending
def license_results_listener():
    """Background thread to listen for LPR results from Redis"""
    logger.info("🎧 Starting LPR listener thread...")

    while True:
        try:
            redis_client = create_redis_connection()
            if not redis_client:
                logger.error("❌ No Redis connection available for LPR listener")
                time.sleep(10)
                continue

            pubsub = redis_client.pubsub()
            pubsub.subscribe("license_results")
            logger.info("✅ LPR listener subscribed to 'license_results' channel")

            for message in pubsub.listen():
                try:
                    if message['type'] == 'message':
                        logger.info("🔔 REDIS MESSAGE RECEIVED!")
                        logger.info(f" 📡 Channel: {message['channel']}")
                        logger.info(f" 📥 Raw data: {message['data']}")
                        logger.info(f" 📏 Data size: {len(str(message['data']))} bytes")

                        try:
                            lpr_data = json.loads(message['data'])
                            logger.info("✅ JSON parsing successful")
                            logger.info("🏁 Starting LPR processing...")
                            process_license_result(lpr_data)
                            logger.info("✅ LPR processing completed")
                        except json.JSONDecodeError as e:
                            logger.error("❌ JSON PARSING FAILED!")
                            logger.error(f" Error: {e}")
                            logger.error(f" Raw data: {message['data']}")
                            logger.error(f" Data type: {type(message['data'])}")
                        except Exception as e:
                            logger.error("❌ LPR PROCESSING ERROR!")
                            logger.error(f" Error: {e}")
                            import traceback
                            logger.error(f" Traceback: {traceback.format_exc()}")

                    elif message['type'] == 'subscribe':
                        logger.info(f"📡 LPR listener subscribed to channel: {message['channel']}")
                        logger.info("🎧 Ready to receive license plate results...")
                    elif message['type'] == 'unsubscribe':
                        logger.warning(f"📡 LPR listener unsubscribed from channel: {message['channel']}")
                    else:
                        logger.debug(f"📡 Redis message type: {message['type']}")

                except Exception as e:
                    logger.error(f"❌ Error in LPR message processing loop: {e}")
                    break

        except redis.exceptions.ConnectionError as e:
            logger.error(f"❌ Redis connection lost in LPR listener: {e}")
            time.sleep(5)  # Wait before reconnecting
        except Exception as e:
            logger.error(f"❌ Unexpected error in LPR listener: {e}")
            time.sleep(10)

    logger.warning("🛑 LPR listener thread stopped")


def cleanup_expired_sessions():
    """Remove sessions older than TTL (10 minutes)"""
    try:
        current_time = time.time()
        ttl_seconds = 600  # 10 minutes

        expired_sessions = [
            session_id for session_id, timestamp in detection_timestamps.items()
            if current_time - timestamp > ttl_seconds
        ]

        if expired_sessions:
            logger.info(f"🧹 Cleaning up {len(expired_sessions)} expired sessions")

            for session_id in expired_sessions:
                session_detections.pop(session_id, None)
                camera_id = session_to_camera.pop(session_id, None)
                detection_timestamps.pop(session_id, None)
                logger.debug(f"Cleaned up expired session: {session_id} (camera: {camera_id})")
        else:
            logger.debug(f"🧹 No expired sessions to clean up ({len(detection_timestamps)} active)")

    except Exception as e:
        logger.error(f"❌ Error in session cleanup: {e}")


def cleanup_timer():
    """Background thread for periodic session cleanup"""
    logger.info("⏰ Starting session cleanup timer thread...")

    while True:
        try:
            time.sleep(120)  # Run cleanup every 2 minutes
            cleanup_expired_sessions()
        except Exception as e:
            logger.error(f"❌ Error in cleanup timer: {e}")
            time.sleep(120)


def start_lpr_integration():
    """Start LPR integration threads"""
    global lpr_listener_thread, cleanup_timer_thread

    # Start LPR listener thread
    lpr_listener_thread = threading.Thread(target=license_results_listener, daemon=True, name="LPR-Listener")
    lpr_listener_thread.start()
    logger.info("✅ LPR listener thread started")

    # Start cleanup timer thread
    cleanup_timer_thread = \
        threading.Thread(target=cleanup_timer, daemon=True, name="Session-Cleanup")
    cleanup_timer_thread.start()
    logger.info("✅ Session cleanup timer thread started")


WORKER_TIMEOUT_MS = 10000
logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds")

# Locks for thread-safe operations
streams_lock = threading.Lock()
models_lock = threading.Lock()
logger.debug("Initialized thread locks")


# Add helper to download mpta ZIP file from a remote URL
def download_mpta(url: str, dest_path: str) -> str:
    """Download an mpta archive to dest_path; returns dest_path, or None on failure"""
    try:
        logger.info(f"Starting download of model from {url} to {dest_path}")
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
        response = requests.get(url, stream=True)
        if response.status_code == 200:
            file_size = int(response.headers.get('content-length', 0))
            logger.info(f"Model file size: {file_size/1024/1024:.2f} MB")
            downloaded = 0
            # Guard the step size: file_size // 10 is 0 for files under 10 bytes,
            # which would raise ZeroDivisionError in the modulo below
            progress_step = max(file_size // 10, 1)
            with open(dest_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    if file_size > 0 and downloaded % progress_step < 8192:  # Log approximately every 10%
                        logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%")
            logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}")
            return dest_path
        else:
            logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}")
            return None
    except Exception as e:
        logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True)
        return None


# Add helper to fetch snapshot image from HTTP/HTTPS URL
def fetch_snapshot(url: str):
    try:
        from requests.auth import HTTPBasicAuth, HTTPDigestAuth
        import requests.adapters
        import urllib3

        # Parse URL to extract credentials
        parsed = urlparse(url)

        # Prepare headers - some cameras require User-Agent and specific headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; DetectorWorker/1.0)',
            'Accept': 'image/jpeg,image/*,*/*',
            'Connection': 'close',
            'Cache-Control': 'no-cache'
        }

        # Create a session with custom adapter for better
        # connection handling
        session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=1,
            pool_maxsize=1,
            max_retries=urllib3.util.retry.Retry(
                total=2,
                backoff_factor=0.1,
                status_forcelist=[500, 502, 503, 504]
            )
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Reconstruct URL without credentials
        clean_url = f"{parsed.scheme}://{parsed.hostname}"
        if parsed.port:
            clean_url += f":{parsed.port}"
        clean_url += parsed.path
        if parsed.query:
            clean_url += f"?{parsed.query}"

        auth = None
        response = None

        if parsed.username and parsed.password:
            # Try HTTP Digest authentication first (common for IP cameras)
            try:
                auth = HTTPDigestAuth(parsed.username, parsed.password)
                response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True)
                if response.status_code == 200:
                    logger.debug(f"Successfully authenticated using HTTP Digest for {clean_url}")
                elif response.status_code == 401:
                    # If Digest fails, try Basic auth
                    logger.debug(f"HTTP Digest failed, trying Basic auth for {clean_url}")
                    auth = HTTPBasicAuth(parsed.username, parsed.password)
                    response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True)
                    if response.status_code == 200:
                        logger.debug(f"Successfully authenticated using HTTP Basic for {clean_url}")
            except Exception as auth_error:
                logger.debug(f"Authentication setup error: {auth_error}")
                # Fallback to original URL with embedded credentials
                response = session.get(url, headers=headers, timeout=(5, 15), stream=True)
        else:
            # No credentials in URL, make request as-is
            response = session.get(url, headers=headers, timeout=(5, 15), stream=True)

        if response and response.status_code == 200:
            # Read content with size limit to prevent memory issues
            content = b''
            max_size = 10 * 1024 * 1024  # 10MB limit
            for chunk in response.iter_content(chunk_size=8192):
                content += chunk
                if len(content) > max_size:
                    logger.error(f"Snapshot too large (>{max_size} bytes) from {clean_url}")
                    return None

            # Convert
            # response content to numpy array
            nparr = np.frombuffer(content, np.uint8)
            # Decode image
            frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if frame is not None:
                logger.debug(f"Successfully fetched snapshot from {clean_url}, shape: {frame.shape}, size: {len(content)} bytes")
                return frame
            else:
                logger.error(f"Failed to decode image from snapshot URL: {clean_url} (content size: {len(content)} bytes)")
                return None
        elif response is not None:
            # Compare against None: a requests Response is falsy for 4xx/5xx status
            # codes, so a plain truthiness test would skip this branch on errors
            logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {clean_url}")
            # Log response headers and first part of content for debugging
            logger.debug(f"Response headers: {dict(response.headers)}")
            if len(response.content) < 1000:
                logger.debug(f"Response content: {response.content[:500]}")
            return None
        else:
            logger.error(f"No response received from snapshot URL: {clean_url}")
            return None
    except requests.exceptions.Timeout as e:
        logger.error(f"Timeout fetching snapshot from {url}: {str(e)}")
        return None
    except requests.exceptions.ConnectionError as e:
        logger.error(f"Connection error fetching snapshot from {url}: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Exception fetching snapshot from {url}: {str(e)}", exc_info=True)
        return None


# Helper to get crop coordinates from stream
def get_crop_coords(stream):
    return {
        "cropX1": stream.get("cropX1"),
        "cropY1": stream.get("cropY1"),
        "cropX2": stream.get("cropX2"),
        "cropY2": stream.get("cropY2")
    }


# Camera state management functions
def set_camera_connected(camera_id, connected=True, error_msg=None):
    """Set camera connection state and track error information"""
    current_time = time.time()

    if camera_id not in camera_states:
        camera_states[camera_id] = {
            "connected": True,
            "last_error": None,
            "last_error_time": None,
            "consecutive_failures": 0,
            "disconnection_notified": False
        }

    state = camera_states[camera_id]
    was_connected = state["connected"]

    if connected:
        state["connected"] = True
        state["consecutive_failures"] = 0
        state["disconnection_notified"] = False
        if not was_connected:
            logger.info(f"📶 CAMERA RECONNECTED: {camera_id}")
    else:
        state["connected"] = False
        state["last_error"] = error_msg
        state["last_error_time"] = current_time
        state["consecutive_failures"] += 1

        # Distinguish between temporary and permanent disconnection
        is_permanent = state["consecutive_failures"] >= 3

        if was_connected and is_permanent:
            logger.error(f"📵 CAMERA DISCONNECTED: {camera_id} - {error_msg} (consecutive failures: {state['consecutive_failures']})")
            logger.info(f"🚨 CAMERA ERROR DETECTED - Will send detection: null to reset backend session for {camera_id}")


def is_camera_connected(camera_id):
    """Check if camera is currently connected"""
    return camera_states.get(camera_id, {}).get("connected", True)


def should_notify_disconnection(camera_id):
    """Check if we should notify backend about disconnection"""
    state = camera_states.get(camera_id, {})
    is_disconnected = not state.get("connected", True)
    not_yet_notified = not state.get("disconnection_notified", False)
    has_enough_failures = state.get("consecutive_failures", 0) >= 3
    return is_disconnected and not_yet_notified and has_enough_failures


def mark_disconnection_notified(camera_id):
    """Mark that we've notified backend about this disconnection"""
    if camera_id in camera_states:
        camera_states[camera_id]["disconnection_notified"] = True
        logger.debug(f"Marked disconnection notification sent for camera {camera_id}")


def get_or_init_session_pipeline_state(camera_id):
    """Get or initialize session pipeline state for a camera"""
    if camera_id not in session_pipeline_states:
        session_pipeline_states[camera_id] = {
            # One of: "validation_detecting", "send_detections", "waiting_for_session_id",
            # "full_pipeline", "lightweight", "car_gone_waiting"
            "mode": "validation_detecting",
            "session_id_received": False,
            "full_pipeline_completed": False,
            "absence_counter": 0,
            "validation_counter": 0,  # Counter for validation phase
            "validation_threshold": 4,  # Default validation threshold
            "max_absence_frames": 3,
            "yolo_inference_enabled": True,  # Controls whether to run YOLO inference
            "cached_detection_dict": None,  # Cached detection dict for lightweight mode
            "stable_track_id": None,  # The stable track ID we're monitoring
            "validated_detection": None  # Stored detection result from validation phase for full_pipeline reuse
        }
    return session_pipeline_states[camera_id]


def update_session_pipeline_mode(camera_id, new_mode, session_id=None):
    """Update session pipeline mode and related state"""
    state = get_or_init_session_pipeline_state(camera_id)
    old_mode = state["mode"]
    state["mode"] = new_mode

    # Reset counters based on mode transition
    if new_mode == "validation_detecting":
        # Transitioning to validation mode - reset both counters for fresh start
        old_validation_counter = state.get("validation_counter", 0)
        old_absence_counter = state.get("absence_counter", 0)
        state["validation_counter"] = 0
        state["absence_counter"] = 0
        if old_validation_counter > 0 or old_absence_counter > 0:
            logger.info(f"🧹 Camera {camera_id}: VALIDATION MODE RESET - validation_counter: {old_validation_counter}→0, absence_counter: {old_absence_counter}→0")

    if session_id:
        state["session_id_received"] = True
        state["absence_counter"] = 0  # Reset absence counter when session starts

    logger.info(f"📊 Camera {camera_id}: Pipeline mode changed from '{old_mode}' to '{new_mode}'")
    return state


####################################################
# REST API endpoint for image retrieval
####################################################
@app.get("/lpr/debug")
async def get_lpr_debug_info():
    """Debug endpoint to inspect LPR integration state"""
    try:
        return {
            "status": "success",
            "lpr_integration_started": lpr_integration_started,
            "redis_connected": (redis_client_global is not None and redis_client_global.ping()) if redis_client_global else False,
            "active_sessions": len(session_detections),
            "session_details": {
                session_id: {
                    "camera_id": session_to_camera.get(session_id, "unknown"),
                    "timestamp": detection_timestamps.get(session_id, 0),
                    "age_seconds":
                        time.time() - detection_timestamps.get(session_id, time.time()),
                    "has_license": session_detections[session_id].get('data', {}).get('detection', {}).get('licensePlateText') is not None
                }
                for session_id in session_detections.keys()
            },
            "thread_status": {
                "lpr_listener_alive": lpr_listener_thread.is_alive() if lpr_listener_thread else False,
                "cleanup_timer_alive": cleanup_timer_thread.is_alive() if cleanup_timer_thread else False
            },
            "cached_detections_by_camera": list(cached_detections.keys())
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "lpr_integration_started": lpr_integration_started
        }


@app.get("/camera/{camera_id}/image")
async def get_camera_image(camera_id: str):
    """
    Get the current frame from a camera as JPEG image
    """
    try:
        # URL decode the camera_id to handle encoded characters like %3B for semicolon
        from urllib.parse import unquote
        original_camera_id = camera_id
        camera_id = unquote(camera_id)
        logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'")

        with streams_lock:
            if camera_id not in streams:
                logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}")
                raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active")

            # Check if we have a cached frame for this camera
            if camera_id not in latest_frames:
                logger.warning(f"No cached frame available for camera '{camera_id}'.")
                raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")

            frame = latest_frames[camera_id]
            logger.debug(f"Retrieved cached frame for camera '{camera_id}', frame shape: {frame.shape}")

        # Encode frame as JPEG
        success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not success:
            raise HTTPException(status_code=500, detail="Failed to encode image as JPEG")

        # Return image as binary response
        return Response(content=buffer_img.tobytes(), media_type="image/jpeg")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


####################################################
# Detection and frame processing functions
####################################################
@app.websocket("/")
async def detect(websocket: WebSocket):
    logger.info("WebSocket connection accepted")
    persistent_data_dict = {}

    async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data):
        try:
            # Check camera connection state first - handle disconnection immediately
            if should_notify_disconnection(camera_id):
                logger.error(f"🚨 CAMERA DISCONNECTION DETECTED: {camera_id} - sending immediate detection: null")

                # Clear cached detections and occupancy state
                cached_detections.pop(camera_id, None)
                frame_skip_flags.pop(camera_id, None)
                cached_full_pipeline_results.pop(camera_id, None)  # Clear cached pipeline results
                session_pipeline_states.pop(camera_id, None)  # Reset session pipeline state

                # Reset pipeline state immediately
                from siwatsystem.pympta import reset_tracking_state
                model_id = stream.get("modelId", "unknown")
                reset_tracking_state(camera_id, model_id, "camera disconnected")

                # Send immediate detection: null to backend
                detection_data = {
                    "type": "imageDetection",
                    "subscriptionIdentifier": stream["subscriptionIdentifier"],
                    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                    "data": {
                        "detection": None,  # null detection for disconnection
                        "modelId": stream["modelId"],
                        "modelName": stream["modelName"]
                    }
                }

                try:
                    ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
                    await websocket.send_json(detection_data)
                except RuntimeError as e:
                    if "websocket.close" in str(e):
                        logger.warning(f"WebSocket connection closed - cannot send disconnection signal for camera {camera_id}")
                        return persistent_data
                    else:
                        raise

                mark_disconnection_notified(camera_id)
                logger.info(f"📡 SENT DISCONNECTION SIGNAL - detection: null for camera {camera_id}, backend should clear session")
                return persistent_data

            # Apply crop if specified
            cropped_frame = frame
            if all(coord is not None for coord in [stream.get("cropX1"), stream.get("cropY1"), stream.get("cropX2"), stream.get("cropY2")]):
                cropX1, cropY1, cropX2, cropY2 = stream["cropX1"], stream["cropY1"], stream["cropX2"], stream["cropY2"]
                cropped_frame = frame[cropY1:cropY2, cropX1:cropX2]
                logger.debug(f"Applied crop coordinates ({cropX1}, {cropY1}, {cropX2}, {cropY2}) to frame for camera {camera_id}")

            logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
            start_time = time.time()

            # Extract display identifier for pipeline context
            subscription_parts = stream["subscriptionIdentifier"].split(';')
            display_identifier = subscription_parts[0] if subscription_parts else None

            # Get backend session ID if available
            backend_session_id = session_ids.get(display_identifier)

            # Get or initialize session pipeline state
            pipeline_state = get_or_init_session_pipeline_state(camera_id)
            current_mode = pipeline_state["mode"]

            logger.debug(f"🔍 SESSIONID LOOKUP: display='{display_identifier}', session_id={repr(backend_session_id)}, mode='{current_mode}'")
            logger.debug(f"🔍 Available session_ids: {session_ids}")
            logger.debug(f"🔍 VALIDATED_DETECTION TRACE: {pipeline_state.get('validated_detection')}")

            # ═══ SESSION ID-BASED PROCESSING MODE ═══
            if not backend_session_id:
                # No session ID - keep current mode if it's validation_detecting or send_detections
                if current_mode not in ["validation_detecting", "send_detections", "waiting_for_session_id"]:
                    update_session_pipeline_mode(camera_id, "validation_detecting")
                    current_mode = "validation_detecting"
                logger.debug(f"🔍 Camera {camera_id}: No session ID - in {current_mode} mode")
            else:
                # Session ID available - switch to full pipeline mode
                if current_mode in ["send_detections", "waiting_for_session_id"]:
                    # Session ID just arrived - switch to full pipeline mode
                    update_session_pipeline_mode(camera_id, "full_pipeline", backend_session_id)
                    current_mode = "full_pipeline"
                    logger.info(f"🔥 Camera {camera_id}: Session ID received ({backend_session_id}) - switching to FULL PIPELINE mode")

            # Create context for pipeline execution
            pipeline_context = {
                "camera_id": camera_id,
                "display_id": display_identifier,
                "backend_session_id": backend_session_id,
                "current_mode": current_mode  # Pass current mode to pipeline
            }

            start_time = time.time()
            detection_result = None

            if current_mode == "validation_detecting":
                # ═══ TRACK VALIDATION MODE ═══
                # Run tracking-based validation with track ID stability
                logger.debug(f"🔍 Camera {camera_id}: In validation_detecting mode - running track-based validation")

                # Get tracking configuration from model_tree
                tracking_config = model_tree.get("tracking", {})
                tracking_enabled = tracking_config.get("enabled", True)
                stability_threshold = tracking_config.get("stabilityThreshold", 4)

                # Default to "none" - only proceed after track validation
                detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}

                if tracking_enabled:
                    # Run
full tracking detection to get track IDs from siwatsystem.pympta import run_detection_with_tracking all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context) if track_validation_result.get("validation_complete", False): # Track validation completed - we have stable track IDs stable_tracks = track_validation_result.get("stable_tracks", []) logger.info(f"๐ŸŽฏ Camera {camera_id}: TRACK VALIDATION COMPLETED - stable tracks: {stable_tracks}") # Switch to send_detections mode update_session_pipeline_mode(camera_id, "send_detections") # Send the best detection with stable track if all_detections: # Find detection with stable track ID stable_detection = None for detection in all_detections: if detection.get("id") in stable_tracks: stable_detection = detection break if stable_detection: detection_result = { "class": stable_detection.get("class", "car"), "confidence": stable_detection.get("confidence", 0.0), "bbox": stable_detection.get("bbox", [0, 0, 0, 0]), "track_id": stable_detection.get("id") } # Store validated detection for full_pipeline mode to reuse pipeline_state["validated_detection"] = detection_result.copy() logger.debug(f"๐Ÿ” Camera {camera_id}: VALIDATION DEBUG - storing detection_result = {detection_result}") logger.debug(f"๐Ÿ” Camera {camera_id}: VALIDATION DEBUG - pipeline_state after storing = {pipeline_state.get('validated_detection')}") logger.info(f"๐Ÿš— Camera {camera_id}: SENDING STABLE DETECTION - track ID {detection_result['track_id']}") logger.info(f"๐Ÿ’พ Camera {camera_id}: STORED VALIDATED DETECTION for full_pipeline reuse") else: logger.warning(f"โš ๏ธ Camera {camera_id}: Stable tracks found but no matching detection") else: # Track validation still in progress stable_tracks = track_validation_result.get("stable_tracks", []) current_tracks = track_validation_result.get("current_tracks", []) if current_tracks: track_id = current_tracks[0] if current_tracks else "None" 
                        stable_status = "STABLE" if stable_tracks else "validating"
                        logger.info(f"🔍 Camera {camera_id}: TRACK VALIDATION - car track_id {track_id} ({stable_status}, need {stability_threshold} consecutive frames)")
                    else:
                        logger.debug(f"👻 Camera {camera_id}: No car detected")

                    logger.debug(f"📤 Camera {camera_id}: Sending 'none' (track validation in progress)")
            else:
                # Tracking disabled - fall back to basic detection validation
                logger.debug(f"🔍 Camera {camera_id}: Tracking disabled - using basic detection validation")
                from siwatsystem.pympta import run_lightweight_detection
                basic_detection = run_lightweight_detection(cropped_frame, model_tree)

                if basic_detection and basic_detection.get("car_detected"):
                    best_detection = basic_detection.get("best_detection")

                    # Increment validation counter for basic detection
                    pipeline_state["validation_counter"] += 1
                    current_count = pipeline_state["validation_counter"]
                    threshold = pipeline_state["validation_threshold"]

                    if current_count >= threshold:
                        update_session_pipeline_mode(camera_id, "send_detections")
                        detection_result = {
                            "class": best_detection.get("class", "car"),
                            "confidence": best_detection.get("confidence", 0.0),
                            "bbox": best_detection.get("bbox", [0, 0, 0, 0])
                        }

                        # Store validated detection for full_pipeline mode to reuse
                        pipeline_state["validated_detection"] = detection_result.copy()
                        logger.debug(f"🔍 Camera {camera_id}: BASIC VALIDATION DEBUG - storing detection_result = {detection_result}")
                        logger.info(f"💾 Camera {camera_id}: STORED BASIC VALIDATED DETECTION for full_pipeline reuse")
                        logger.info(f"🎯 Camera {camera_id}: BASIC VALIDATION COMPLETED after {current_count} frames")
                    else:
                        logger.info(f"📊 Camera {camera_id}: Basic validation progress {current_count}/{threshold}")
                else:
                    # Reset validation counter
                    if pipeline_state["validation_counter"] > 0:
                        pipeline_state["validation_counter"] = 0
                        logger.info(f"🔄 Camera {camera_id}: Reset validation counter (no detection)")

        elif current_mode == "send_detections":
            # ═══ SEND DETECTIONS MODE ═══
            # Validation completed - now send detection_dict for car detections, detection: null for no car
            logger.debug(f"📤 Camera {camera_id}: In send_detections mode - sending detection_dict for cars")
            from siwatsystem.pympta import run_lightweight_detection
            basic_detection = run_lightweight_detection(cropped_frame, model_tree)

            if basic_detection and basic_detection.get("car_detected"):
                # Car detected - send detection_dict
                best_detection = basic_detection.get("best_detection")
                detection_result = {
                    "class": best_detection.get("class", "car"),
                    "confidence": best_detection.get("confidence", 0.0),
                    "bbox": best_detection.get("bbox", [0, 0, 0, 0])
                }
                logger.info(f"🚗 Camera {camera_id}: SENDING DETECTION_DICT - {detection_result['class']} (conf={detection_result['confidence']:.3f}) - backend should generate session ID")
            else:
                # No car detected - send "none"
                detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
                logger.debug(f"👻 Camera {camera_id}: No car detected - sending 'none'")

        elif current_mode == "waiting_for_session_id":
            # ═══ WAITING FOR SESSION ID MODE ═══
            # Stop processing snapshots, wait for session ID
            logger.debug(f"⏳ Camera {camera_id}: In waiting_for_session_id mode - not processing snapshots")
            return persistent_data  # Don't process or send anything

        elif current_mode == "full_pipeline":
            # ═══ FULL PIPELINE MODE ═══
            logger.info(f"🔥 Camera {camera_id}: Running FULL PIPELINE (classification branches + Redis + PostgreSQL)")

            # Use validated detection from validation phase instead of detecting again
            validated_detection = pipeline_state.get("validated_detection")
            logger.debug(f"🔍 Camera {camera_id}: FULL_PIPELINE DEBUG - validated_detection = {validated_detection}")
            logger.debug(f"🔍 Camera {camera_id}: FULL_PIPELINE DEBUG - pipeline_state keys = {list(pipeline_state.keys())}")

            if validated_detection:
                logger.info(f"🔄 Camera {camera_id}: Using validated detection for full pipeline: track_id={validated_detection.get('track_id')}")
                detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context, validated_detection=validated_detection)
                # Clear the validated detection after using it
                pipeline_state["validated_detection"] = None
            else:
                logger.warning(f"⚠️ Camera {camera_id}: No validated detection found for full pipeline - this shouldn't happen")
                detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)

            if detection_result and isinstance(detection_result, dict):
                # Cache the full pipeline result
                cached_full_pipeline_results[camera_id] = {
                    "result": detection_result.copy(),
                    "timestamp": time.time()
                }

                # Note: Will cache detection_dict after branch processing completes

                # Store the stable track ID for lightweight monitoring
                track_id = detection_result.get("track_id") or detection_result.get("id")
                if track_id is not None:
                    pipeline_state["stable_track_id"] = track_id
                    logger.info(f"💾 Camera {camera_id}: Cached stable track_id={track_id}")
                else:
                    logger.warning(f"⚠️ Camera {camera_id}: No track_id found in detection_result: {detection_result.keys()}")

                # Ensure we have a cached detection dict for lightweight mode
                if not pipeline_state.get("cached_detection_dict"):
                    # Create fallback cached detection dict if branch processing didn't populate it
                    fallback_detection = {
                        "carModel": None,
                        "carBrand": None,
                        "carYear": None,
                        "bodyType": None,
                        "licensePlateText": None,
                        "licensePlateConfidence": None
                    }
                    pipeline_state["cached_detection_dict"] = fallback_detection
                    logger.warning(f"⚠️ Camera {camera_id}: Created fallback cached detection dict (branch processing may have failed)")

                # Switch to lightweight mode
                update_session_pipeline_mode(camera_id, "lightweight")
                logger.info(f"✅ Camera {camera_id}: Full pipeline completed - switching to LIGHTWEIGHT mode")

        elif current_mode == "lightweight":
            # ═══ SIMPLIFIED LIGHTWEIGHT MODE ═══
            # Send cached detection dict + check for max_absence consecutive empty frames to reset
            stable_track_id = pipeline_state.get("stable_track_id")
            cached_detection_dict = pipeline_state.get("cached_detection_dict")

            logger.debug(f"🪶 Camera {camera_id}: LIGHTWEIGHT MODE - stable_track_id={stable_track_id}")

            if not pipeline_state.get("yolo_inference_enabled", True):
                # YOLO inference disabled - waiting for reset
                logger.debug(f"🛑 Camera {camera_id}: YOLO inference disabled - waiting for reset")
                detection_result = None  # Don't send anything
            else:
                # Run YOLO inference to check car presence for reset logic
                from siwatsystem.pympta import run_detection_with_tracking
                all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context)

                any_car_detected = len(all_detections) > 0
                current_tracks = track_validation_result.get("current_tracks", [])

                if any_car_detected:
                    # Car detected - reset absence counter, continue sending cached detection dict
                    pipeline_state["absence_counter"] = 0  # Reset absence since cars are present

                    if cached_detection_dict:
                        detection_result = cached_detection_dict  # Always send cached data
                        logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT - car detected, sending cached detection dict")
                    else:
                        logger.warning(f"⚠️ Camera {camera_id}: LIGHTWEIGHT - car detected but no cached detection dict available")
                        detection_result = None
                else:
                    # No car detected - increment absence counter
                    pipeline_state["absence_counter"] += 1
                    absence_count = pipeline_state["absence_counter"]
                    max_absence = 3  # Need 3 consecutive empty frames

                    logger.info(f"👻 Camera {camera_id}: LIGHTWEIGHT - no car detected (absence {absence_count}/{max_absence})")

                    if absence_count >= max_absence:
                        # SIMPLE RESET CONDITION: max_absence consecutive empty frames
                        logger.info(f"🔄 Camera {camera_id}: RESET CONDITION MET - {max_absence} consecutive empty frames")

                        # Clear all state and prepare for next car
                        cached_full_pipeline_results.pop(camera_id, None)
                        pipeline_state["cached_detection_dict"] = None
                        pipeline_state["stable_track_id"] = None
                        pipeline_state["validated_detection"] = None

                        old_absence_counter = pipeline_state["absence_counter"]
                        old_validation_counter = pipeline_state.get("validation_counter", 0)
                        pipeline_state["absence_counter"] = 0
                        pipeline_state["validation_counter"] = 0  # Clear validation counter
                        pipeline_state["yolo_inference_enabled"] = True

                        logger.info(f"🧹 Camera {camera_id}: CLEARING ALL COUNTERS - absence_counter: {old_absence_counter}→0, validation_counter: {old_validation_counter}→0")

                        # Clear stability tracking data for this camera
                        from siwatsystem.pympta import reset_camera_stability_tracking
                        reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown"))

                        # Switch back to validation phase
                        update_session_pipeline_mode(camera_id, "validation_detecting")
                        logger.info(f"✅ Camera {camera_id}: RESET TO VALIDATION COMPLETE - ready for new car")

                        # Now in validation mode - send what YOLO detection finds (will be null since no car)
                        detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
                    else:
                        # Still within absence threshold - continue sending cached detection dict
                        if cached_detection_dict:
                            detection_result = cached_detection_dict  # Send cached data
                            logger.info(f"⏳ Camera {camera_id}: LIGHTWEIGHT - no car but absence<{max_absence}, still sending cached detection dict")
                        else:
                            logger.warning(f"⚠️ Camera {camera_id}: LIGHTWEIGHT - no cached detection dict available")
                            detection_result = None

        elif current_mode == "car_gone_waiting":
            # ═══ CAR GONE WAITING STATE ═══
            # Car is gone (both conditions met), YOLO inference disabled, waiting for new session
            logger.debug(f"🛑 Camera {camera_id}: CAR GONE WAITING - YOLO inference stopped")

            # Check if backend has started a new session (indicates new car scenario)
            if backend_session_id is not None:
                # Backend started new session - re-enable YOLO and reset to validation
                pipeline_state["yolo_inference_enabled"] = True
                pipeline_state["absence_counter"] = 0
                pipeline_state["stable_track_id"] = None
                pipeline_state["cached_detection_dict"] = None
                pipeline_state["validated_detection"] = None

                # Clear stability tracking data for this camera
                from siwatsystem.pympta import reset_camera_stability_tracking
                reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown"))

                update_session_pipeline_mode(camera_id, "validation_detecting")
                logger.info(f"🔄 Camera {camera_id}: New session detected (id={backend_session_id}) - re-enabling YOLO inference")
                logger.info(f"✅ Camera {camera_id}: Reset to validation mode - cleared all tracking, ready for new car detection")

                # Don't run detection this frame - let next frame start fresh
                detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
            else:
                # Still waiting - no sessionId, no detection to send
                logger.debug(f"🛑 Camera {camera_id}: Car gone waiting - no YOLO inference, no data sent")
                detection_result = None

        process_time = (time.time() - start_time) * 1000
        logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms (mode: {current_mode})")

        # Skip processing if no detection result (blocked by session gating)
        if detection_result is None:
            logger.debug(f"No detection result to process for camera {camera_id}")
            return persistent_data

        # Log the raw detection result for debugging
        logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}")

        # Extract session_id from pipeline result (always use backend sessionId)
        session_id = backend_session_id
        logger.debug(f"Using backend session_id: {session_id}")

        # Process detection result based on current mode
        if current_mode == "validation_detecting":
            # ═══ VALIDATION DETECTING MODE ═══
            # Always send detection: null during validation phase
            detection_dict = None
            logger.debug(f"🔍 SENDING 'NONE' - validation_detecting mode for camera {camera_id}")

        elif current_mode == "send_detections":
            # ═══ SEND DETECTIONS MODE ═══
            if detection_result.get("class") == "none":
                # No car detected - send detection: null
                detection_dict = None
                logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}")
            else:
                # Car detected in send_detections mode - ALWAYS send empty dict to trigger backend sessionId
                # Purpose: Tell backend "car is here, please create sessionId"
                detection_dict = {}
                logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode, requesting backend to create sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")

                if backend_session_id:
                    logger.debug(f"🔄 Camera {camera_id}: Note - sessionId {backend_session_id} exists but still in send_detections mode (transition pending)")

        elif current_mode == "lightweight":
            # ═══ SIMPLIFIED LIGHTWEIGHT MODE DETECTION PROCESSING ═══
            if detection_result.get("class") == "none":
                # No car detected - this happens when resetting to validation
                detection_dict = None  # Send detection: null
                logger.info(f"🚫 LIGHTWEIGHT - no car detected, sending detection=null")
            elif isinstance(detection_result, dict) and ("carBrand" in detection_result or "carModel" in detection_result):
                # This is a cached detection dict - send it
                detection_dict = detection_result
                logger.info(f"💾 LIGHTWEIGHT - sending cached detection dict")
            else:
                logger.warning(f"⚠️ LIGHTWEIGHT - unexpected detection_result type: {type(detection_result)}")
                detection_dict = None

        elif detection_result.get("class") == "none":
            # Other modes - send null to clear session
            detection_dict = None
            logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")

        elif detection_result and "carBrand" in detection_result:
            # Handle cached detection dict format (fallback for compatibility)
            detection_dict = detection_result
            logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT MODE - using detection_result as detection_dict:")
            logger.info(f"💾 Camera {camera_id}: - detection_dict: {detection_dict}")

        else:
            # Valid detection - convert to backend format (will be populated by branch processing)
            detection_dict = {
                "carModel": None,
                "carBrand": None,
                "carYear": None,
                "bodyType": None,
                "licensePlateText": None,
                "licensePlateConfidence": None
            }

        # Extract and process branch results from parallel classification (only for valid detections, skip cached mode)
        if detection_result.get("class") != "none" and "branch_results" in detection_result and not detection_result.get("cached_mode", False):
            def process_branch_results(branch_results, depth=0):
                """Recursively process branch results including nested branches."""
                if not isinstance(branch_results, dict):
                    return

                indent = " " * depth
                for branch_id, branch_data in branch_results.items():
                    if isinstance(branch_data, dict):
                        logger.debug(f"{indent}Processing branch {branch_id}: {branch_data}")

                        # Map common classification fields to backend-expected names
                        if "brand" in branch_data:
                            detection_dict["carBrand"] = branch_data["brand"]
                            logger.debug(f"{indent}Mapped carBrand: {branch_data['brand']}")
                        if "body_type" in branch_data:
                            detection_dict["bodyType"] = branch_data["body_type"]
                            logger.debug(f"{indent}Mapped bodyType: {branch_data['body_type']}")
                        if "class" in branch_data:
                            class_name = branch_data["class"]

                            # Map based on branch/model type
                            if "brand" in branch_id.lower():
                                detection_dict["carBrand"] = class_name
                                logger.debug(f"{indent}Mapped carBrand from class: {class_name}")
                            elif "bodytype" in branch_id.lower() or "body" in branch_id.lower():
                                detection_dict["bodyType"] = class_name
                                logger.debug(f"{indent}Mapped bodyType from class: {class_name}")

                        # Process nested branch results recursively
                        if "branch_results" in branch_data:
                            logger.debug(f"{indent}Processing nested branches in {branch_id}")
                            process_branch_results(branch_data["branch_results"], depth + 1)

            branch_results = detection_result.get("branch_results", {})
            if branch_results:
                logger.debug(f"Processing branch results: {branch_results}")
                process_branch_results(branch_results)
                logger.info(f"Detection payload after branch processing: {detection_dict}")

                # Cache the detection_dict for lightweight mode (after branch processing completes)
                if current_mode == "full_pipeline":
                    pipeline_state = get_or_init_session_pipeline_state(camera_id)
                    pipeline_state["cached_detection_dict"] = detection_dict.copy()
                    logger.info(f"💾 Camera {camera_id}: CACHED DETECTION DICT after branch processing: {detection_dict}")
            else:
                logger.debug("No branch results found in detection result")

        detection_data = {
            "type": "imageDetection",
            "subscriptionIdentifier": stream["subscriptionIdentifier"],
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            # "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + f".{int(time.time() * 1000) % 1000:03d}Z",
            "data": {
                "detection": detection_dict,
                "modelId": stream["modelId"],
                "modelName": stream["modelName"]
            }
        }

        # SessionId should NEVER be sent from worker to backend - it's uni-directional (backend -> worker only)
        # Backend manages sessionIds independently based on detection content
        logger.debug(f"TX message prepared (no sessionId) - detection_dict type: {type(detection_dict)}")

        # Log detection details for different modes
        if current_mode == "lightweight":
            if detection_result and detection_result.get("class") == "none":
                logger.info(f"🚫 Camera {camera_id}: LIGHTWEIGHT - No car detected (resetting to validation)")
            elif isinstance(detection_result, dict) and ("carBrand" in detection_result or "carModel" in detection_result):
                logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT - Sending cached detection data")
            else:
                logger.info(f"🪶 Camera {camera_id}: LIGHTWEIGHT - Processing detection")
        elif detection_result and "class" in detection_result and detection_result.get("class") != "none":
            confidence = detection_result.get("confidence", 0.0)
            logger.info(f"🚗 Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}")

        # Send detection data to backend (session gating handled above in processing logic)
        logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}")

        try:
            ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
            await websocket.send_json(detection_data)
            logger.debug(f"Sent detection data to client for camera {camera_id}")

            # Cache the detection data for potential resubscriptions (only if not null detection)
            if detection_dict is not None and detection_result.get("class") != "none":
                cached_detections[camera_id] = detection_data.copy()
                logger.debug(f"Cached detection for camera {camera_id}: {detection_dict}")

                # Enhanced caching: Store by session_id for LPR integration.
                # detection_data never carries a 'sessionId' key (the worker does not send it),
                # so key the cache on the backend-provided session ID instead.
                session_id = backend_session_id
                if session_id:
                    session_id_str = str(session_id)
                    session_detections[session_id_str] = detection_data.copy()
                    session_to_camera[session_id_str] = camera_id
                    detection_timestamps[session_id_str] = time.time()
                    logger.debug(f"🔑 Cached detection for LPR by session_id {session_id_str}: {camera_id}")
            else:
                # Don't cache null/none detections - let them reset properly
                cached_detections.pop(camera_id, None)
                logger.debug(f"Not caching null/none detection for camera {camera_id}")
        except RuntimeError as e:
            if "websocket.close" in str(e):
                logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}")
                return persistent_data
            else:
                raise

        # Log status after sending (no sessionId sent to backend)
        if detection_dict is None:
            logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}")
        elif detection_dict == {}:
            logger.info(f"📡 SENT empty detection - backend should create sessionId for camera {camera_id}")
        else:
            logger.info(f"📡 SENT detection data - backend manages sessionId independently for camera {camera_id}")

        return persistent_data
    except Exception as e:
        logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
        return persistent_data


def frame_reader(camera_id, cap, buffer, stop_event):
    retries = 0
    logger.info(f"Starting frame reader thread for camera {camera_id}")
    frame_count = 0
    last_log_time = time.time()

    try:
        # Log initial camera status and properties
        if cap.isOpened():
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}")
            set_camera_connected(camera_id, True)
        else:
            logger.error(f"Camera {camera_id} failed to open initially")
            set_camera_connected(camera_id, False, "Failed to open camera initially")

        while not stop_event.is_set():
            try:
                if not cap.isOpened():
                    logger.error(f"Camera {camera_id} is not open before trying to read")
                    # Attempt to reopen
                    cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
                    time.sleep(reconnect_interval)
                    continue

                logger.debug(f"Attempting to read frame from camera {camera_id}")
                ret, frame = cap.read()

                if not ret:
                    error_msg = f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}"
                    logger.warning(error_msg)
                    set_camera_connected(camera_id, False, error_msg)
                    cap.release()
                    time.sleep(reconnect_interval)
                    retries += 1
                    if retries > max_retries and max_retries != -1:
                        logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader")
                        set_camera_connected(camera_id, False, "Max retries reached")
                        break
                    # Re-open
                    logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}")
                    cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
                    if not cap.isOpened():
                        logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
                        set_camera_connected(camera_id, False, "Failed to reopen RTSP stream")
                        continue
                    logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}")
                    set_camera_connected(camera_id, True)
                    continue

                # Successfully read a frame
                frame_count += 1
                current_time = time.time()
                # Log frame stats every 5 seconds
                if current_time - last_log_time > 5:
                    logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds")
                    frame_count = 0
                    last_log_time = current_time

                logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}")
                retries = 0
                set_camera_connected(camera_id, True)  # Mark as connected on successful frame read

                # Overwrite old frame if buffer is full
                if not buffer.empty():
                    try:
                        buffer.get_nowait()
                        logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}")
                    except queue.Empty:
                        pass

                buffer.put(frame)
                logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}")

                # Short sleep to avoid CPU overuse
                time.sleep(0.01)

            except cv2.error as e:
                error_msg = f"OpenCV error for camera {camera_id}: {e}"
                logger.error(error_msg, exc_info=True)
                set_camera_connected(camera_id, False, error_msg)
                cap.release()
                time.sleep(reconnect_interval)
                retries += 1
                if retries > max_retries and max_retries != -1:
                    logger.error(f"Max retries reached after OpenCV error for camera {camera_id}")
                    set_camera_connected(camera_id, False, "Max retries reached after OpenCV error")
                    break
                logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}")
                cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
                if not cap.isOpened():
                    logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
                    set_camera_connected(camera_id, False, "Failed to reopen after OpenCV error")
                    continue
                logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}")
                set_camera_connected(camera_id, True)
            except Exception as e:
                error_msg = f"Unexpected error for camera {camera_id}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                set_camera_connected(camera_id, False, error_msg)
                cap.release()
                break
    except Exception as e:
        logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", exc_info=True)
    finally:
        logger.info(f"Frame reader thread for camera {camera_id} is exiting")
        if cap and cap.isOpened():
            cap.release()


def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event):
    """Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals"""
    retries = 0
    consecutive_failures = 0  # Track consecutive failures for backoff
    logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}")
    frame_count = 0
    last_log_time = time.time()

    # Initialize camera state
    set_camera_connected(camera_id, True)

    try:
        interval_seconds = snapshot_interval / 1000.0  # Convert milliseconds to seconds
        logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s")

        while not stop_event.is_set():
            try:
                start_time = time.time()
                frame = fetch_snapshot(snapshot_url)

                if frame is None:
                    consecutive_failures += 1
                    error_msg = f"Failed to fetch snapshot for camera: {camera_id}, consecutive failures: {consecutive_failures}"
                    logger.warning(error_msg)
                    set_camera_connected(camera_id, False, error_msg)
                    retries += 1

                    # Check network connectivity with a simple ping-like test
                    if consecutive_failures % 5 == 1:  # Test on the 1st failure and every 5th one after it (1, 6, 11, ...)
                        try:
                            test_response = requests.get(snapshot_url, timeout=(2, 5), stream=False)
                            logger.info(f"Camera {camera_id}: Connectivity test result: {test_response.status_code}")
                        except Exception as test_error:
                            logger.warning(f"Camera {camera_id}: Connectivity test failed: {test_error}")

                    if retries > max_retries and max_retries != -1:
                        logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader")
                        set_camera_connected(camera_id, False, "Max retries reached for snapshot camera")
                        break

                    # Exponential backoff based on consecutive failures
                    backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2)))  # At least 1s, doubling per failure, capped at 2x the snapshot interval and at 30s
                    logger.debug(f"Camera {camera_id}: Backing off for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})")
if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep break # Exit if stop_event is set during backoff continue # Successfully fetched a frame - reset consecutive failures consecutive_failures = 0 # Reset backoff on success frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}") retries = 0 set_camera_connected(camera_id, True) # Mark as connected on successful snapshot # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. 
Buffer size: {buffer.qsize()}") # Wait for the specified interval elapsed = time.time() - start_time sleep_time = max(interval_seconds - elapsed, 0) if sleep_time > 0: time.sleep(sleep_time) except Exception as e: consecutive_failures += 1 error_msg = f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}" logger.error(error_msg, exc_info=True) set_camera_connected(camera_id, False, error_msg) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after error for snapshot camera {camera_id}") set_camera_connected(camera_id, False, "Max retries reached after error") break # Exponential backoff for exceptions too backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s logger.debug(f"Camera {camera_id}: Exception backoff for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})") if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep break # Exit if stop_event is set during backoff except Exception as e: logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Snapshot reader thread for camera {camera_id} is exiting") async def reconcile_subscriptions(desired_subscriptions, websocket): """ Declarative reconciliation: Compare desired vs current subscriptions and make changes """ logger.info(f"Reconciling subscriptions: {len(desired_subscriptions)} desired") with streams_lock: # Get current subscriptions current_subscription_ids = set(streams.keys()) desired_subscription_ids = set(sub["subscriptionIdentifier"] for sub in desired_subscriptions) # Find what to add and remove to_add = desired_subscription_ids - current_subscription_ids to_remove = current_subscription_ids - desired_subscription_ids to_check_for_changes = current_subscription_ids & desired_subscription_ids logger.info(f"Reconciliation: {len(to_add)} to add, {len(to_remove)} to remove, 
{len(to_check_for_changes)} to check for changes") # Remove subscriptions that are no longer wanted for subscription_id in to_remove: await unsubscribe_internal(subscription_id) # Check existing subscriptions for parameter changes for subscription_id in to_check_for_changes: desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id) current_stream = streams[subscription_id] # Check if parameters changed if has_subscription_changed(desired_sub, current_stream): logger.info(f"Parameters changed for {subscription_id}, resubscribing") logger.debug(f"Parameter comparison for {subscription_id}:") logger.debug(f" rtspUrl: '{desired_sub.get('rtspUrl')}' vs '{current_stream.get('rtsp_url')}'") logger.debug(f" snapshotUrl: '{desired_sub.get('snapshotUrl')}' vs '{current_stream.get('snapshot_url')}'") logger.debug(f" modelUrl: '{extract_model_file_identifier(desired_sub.get('modelUrl'))}' vs '{extract_model_file_identifier(current_stream.get('modelUrl'))}'") logger.debug(f" modelId: {desired_sub.get('modelId')} vs {current_stream.get('modelId')}") # Preserve detection state for resubscription cached_detection = cached_detections.get(subscription_id) logger.debug(f"Preserving detection state for resubscription: {cached_detection is not None}") await unsubscribe_internal(subscription_id, preserve_detection=True) await subscribe_internal(desired_sub, websocket, cached_detection=cached_detection) # Add new subscriptions for subscription_id in to_add: desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id) await subscribe_internal(desired_sub, websocket) def extract_model_file_identifier(model_url): """Extract the core model file identifier from S3 URLs, ignoring timestamp parameters""" if not model_url: return None # For S3 URLs, extract just the path portion before query parameters try: from urllib.parse import urlparse parsed = urlparse(model_url) # Return the path which 
contains the actual model file identifier # e.g. "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta" return parsed.path except Exception as e: logger.warning(f"Failed to parse model URL {model_url}: {e}") return model_url def has_subscription_changed(desired_sub, current_stream): """Check if subscription parameters have changed""" # Smart model URL comparison - ignore timestamp changes in signed URLs desired_model_id = extract_model_file_identifier(desired_sub.get("modelUrl")) current_model_id = extract_model_file_identifier(current_stream.get("modelUrl")) return ( desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or desired_sub.get("snapshotInterval") != current_stream.get("snapshot_interval") or desired_sub.get("cropX1") != current_stream.get("cropX1") or desired_sub.get("cropY1") != current_stream.get("cropY1") or desired_sub.get("cropX2") != current_stream.get("cropX2") or desired_sub.get("cropY2") != current_stream.get("cropY2") or desired_sub.get("modelId") != current_stream.get("modelId") or desired_sub.get("modelName") != current_stream.get("modelName") or desired_model_id != current_model_id ) async def subscribe_internal(subscription, websocket, cached_detection=None): """Internal subscription logic extracted from original subscribe handler""" subscriptionIdentifier = subscription.get("subscriptionIdentifier") rtsp_url = subscription.get("rtspUrl") snapshot_url = subscription.get("snapshotUrl") snapshot_interval = subscription.get("snapshotInterval") model_url = subscription.get("modelUrl") modelId = subscription.get("modelId") modelName = subscription.get("modelName") cropX1 = subscription.get("cropX1") cropY1 = subscription.get("cropY1") cropX2 = subscription.get("cropX2") cropY2 = subscription.get("cropY2") # Extract camera_id from subscriptionIdentifier parts = subscriptionIdentifier.split(';') if len(parts) != 2: logger.error(f"Invalid subscriptionIdentifier 
format: {subscriptionIdentifier}") return display_identifier, camera_identifier = parts camera_id = subscriptionIdentifier # Load model if needed if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # Handle model loading (same as original) parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" local_mpta = os.path.join(extraction_dir, filename) local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download model from {model_url}") return model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: if not os.path.exists(model_url): logger.error(f"Model file not found: {model_url}") return model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId}") return if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree # Start LPR integration threads after first model is loaded (only once) global lpr_integration_started if not lpr_integration_started and hasattr(model_tree, 'get') and model_tree.get('redis_client'): try: start_lpr_integration() lpr_integration_started = True logger.info("🚀 LPR integration started after first model load") except Exception as e: logger.error(f"❌ Failed to start LPR integration: {e}") # Create stream (same logic as original) if camera_id and (rtsp_url or snapshot_url) and len(streams) < max_streams: camera_url = snapshot_url if snapshot_url else rtsp_url # Check if we already have a stream for this camera URL shared_stream = camera_streams.get(camera_url) if shared_stream: # Reuse existing stream buffer = shared_stream["buffer"] stop_event = 
shared_stream["stop_event"] thread = shared_stream["thread"] mode = shared_stream["mode"] shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 else: # Create new stream buffer = queue.Queue(maxsize=1) stop_event = threading.Event() if snapshot_url and snapshot_interval: thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1 } camera_streams[camera_url] = shared_stream elif rtsp_url: cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") return thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1 } camera_streams[camera_url] = shared_stream else: logger.error(f"No valid URL provided for camera {camera_id}") return # Create stream info stream_info = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2, "mode": mode, "camera_url": camera_url, "modelUrl": model_url, # Always store both URLs for comparison consistency "rtsp_url": rtsp_url, "snapshot_url": snapshot_url, "snapshot_interval": snapshot_interval } if mode == "rtsp": stream_info["cap"] = shared_stream["cap"] streams[camera_id] = stream_info subscription_to_camera[camera_id] = camera_url logger.info(f"Subscribed to camera {camera_id}") # Send initial detection to backend - use cached if available, otherwise "none" if cached_detection: # Restore cached 
detection with updated timestamp (RESUBSCRIPTION STATUS UPDATE) initial_detection_data = cached_detection.copy() initial_detection_data["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) logger.info(f"📡 RESUBSCRIPTION: Restoring cached detection for camera {camera_id}") logger.debug(f"📡 RESUBSCRIPTION: Cached detection has sessionId: {initial_detection_data.get('sessionId', 'None')}") else: # Send "none" detection for new subscriptions initial_detection_data = { "type": "imageDetection", "subscriptionIdentifier": subscriptionIdentifier, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "data": { "detection": None, "modelId": modelId, "modelName": modelName } } logger.info(f"📡 NEW SUBSCRIPTION: Sending initial 'none' detection for camera {camera_id}") ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}") await websocket.send_json(initial_detection_data) logger.debug(f"Initial detection data sent (resubscription={cached_detection is not None}): {initial_detection_data}") # This cached detection was just a one-time status update for resubscription # Normal frame processing will continue independently async def unsubscribe_internal(subscription_id, preserve_detection=False): """Internal unsubscription logic""" if subscription_id in streams: stream = streams.pop(subscription_id) camera_url = subscription_to_camera.pop(subscription_id, None) if camera_url and camera_url in camera_streams: shared_stream = camera_streams[camera_url] shared_stream["ref_count"] -= 1 if shared_stream["ref_count"] <= 0: shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() del camera_streams[camera_url] latest_frames.pop(subscription_id, None) if not preserve_detection: cached_detections.pop(subscription_id, None) # Clear cached detection only if not preserving frame_skip_flags.pop(subscription_id, None) # Clear frame skip flag 
camera_states.pop(subscription_id, None) # Clear camera state cached_full_pipeline_results.pop(subscription_id, None) # Clear cached pipeline results session_pipeline_states.pop(subscription_id, None) # Clear session pipeline state cleanup_camera_stability(subscription_id) logger.info(f"Unsubscribed from camera {subscription_id} (preserve_detection={preserve_detection})") async def process_streams(): logger.info("Started processing streams") try: while True: start_time = time.time() with streams_lock: current_streams = list(streams.items()) if current_streams: logger.debug(f"Processing {len(current_streams)} active streams") else: logger.debug("No active streams to process") for camera_id, stream in current_streams: buffer = stream["buffer"] if buffer.empty(): logger.debug(f"Frame buffer is empty for camera {camera_id}") continue logger.debug(f"Got frame from buffer for camera {camera_id}") frame = buffer.get() # Cache the frame for REST API access latest_frames[camera_id] = frame.copy() logger.debug(f"Cached frame for REST API access for camera {camera_id}") with models_lock: model_tree = models.get(camera_id, {}).get(stream["modelId"]) if not model_tree: logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}") continue logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}") key = (camera_id, stream["modelId"]) persistent_data = persistent_data_dict.get(key, {}) logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}") updated_persistent_data = await handle_detection( camera_id, stream, frame, websocket, model_tree, persistent_data ) persistent_data_dict[key] = updated_persistent_data elapsed_time = (time.time() - start_time) * 1000 # ms sleep_time = max(poll_interval - elapsed_time, 0) logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms") await asyncio.sleep(sleep_time / 1000.0) except asyncio.CancelledError: logger.info("Stream 
processing task cancelled") except Exception as e: logger.error(f"Error in process_streams: {str(e)}", exc_info=True) async def send_heartbeat(): while True: try: cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, # Include all subscription parameters for proper change detection "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) logger.debug(f"Sent stateReport as heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, {len(camera_connections)} active cameras") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: logger.error(f"Error sending stateReport heartbeat: {e}") break async def on_message(): while True: try: msg = await websocket.receive_text() ws_logger.info(f"RX <- {msg}") logger.debug(f"Received message: {msg}") data = json.loads(msg) msg_type = data.get("type") if msg_type == "setSubscriptionList": # Declarative approach: Backend sends list of subscriptions this worker should have desired_subscriptions = data.get("subscriptions", []) logger.info(f"Received subscription list with {len(desired_subscriptions)} subscriptions") await reconcile_subscriptions(desired_subscriptions, websocket) elif 
msg_type == "subscribe": # Legacy support - convert single subscription to list payload = data.get("payload", {}) await reconcile_subscriptions([payload], websocket) elif msg_type == "unsubscribe": # Legacy support - remove subscription payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") # Remove from current subscriptions and reconcile current_subs = [] with streams_lock: for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"] != subscriptionIdentifier: # Convert stream back to subscription format current_subs.append({ "subscriptionIdentifier": stream["subscriptionIdentifier"], "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), "modelId": stream["modelId"], "modelName": stream["modelName"], "modelUrl": stream.get("modelUrl", ""), "cropX1": stream.get("cropX1"), "cropY1": stream.get("cropY1"), "cropX2": stream.get("cropX2"), "cropY2": stream.get("cropY2") }) await reconcile_subscriptions(current_subs, websocket) elif msg_type == "old_subscribe_logic_removed": if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # If model_url is remote, download it first. 
parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): logger.info(f"Downloading remote .mpta file from {model_url}") filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" local_mpta = os.path.join(extraction_dir, filename) logger.debug(f"Download destination: {local_mpta}") local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download the remote .mpta file from {model_url}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to download model from {model_url}" } ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}") await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: logger.info(f"Loading local .mpta file from {model_url}") # Check if file exists before attempting to load if not os.path.exists(model_url): logger.error(f"Local .mpta file not found: {model_url}") logger.debug(f"Current working directory: {os.getcwd()}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Model file not found: {model_url}" } ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}") await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId} from .mpta file for camera {camera_id}") error_response = { "type": "error", "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to load model {modelId}" } await websocket.send_json(error_response) continue if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree logger.info(f"Successfully loaded model {modelId} for camera {camera_id}") logger.debug(f"Model extraction directory: {extraction_dir}") # Start LPR integration threads after first model is loaded (only once) if not lpr_integration_started and 
hasattr(model_tree, 'get') and model_tree.get('redis_client'): try: start_lpr_integration() lpr_integration_started = True logger.info("🚀 LPR integration started after first model load") except Exception as e: logger.error(f"❌ Failed to start LPR integration: {e}") if camera_id and (rtsp_url or snapshot_url): with streams_lock: # Determine camera URL for shared stream management camera_url = snapshot_url if snapshot_url else rtsp_url if camera_id not in streams and len(streams) < max_streams: # Check if we already have a stream for this camera URL shared_stream = camera_streams.get(camera_url) if shared_stream: # Reuse existing stream logger.info(f"Reusing existing stream for camera URL: {camera_url}") buffer = shared_stream["buffer"] stop_event = shared_stream["stop_event"] thread = shared_stream["thread"] mode = shared_stream["mode"] # Increment reference count shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 else: # Create new stream buffer = queue.Queue(maxsize=1) stop_event = threading.Event() if snapshot_url and snapshot_interval: logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}") thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" # Store shared stream info shared_stream = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1 } camera_streams[camera_url] = shared_stream elif rtsp_url: logger.info(f"Creating new RTSP stream for camera {camera_id}: {rtsp_url}") cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") continue thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" # Store shared stream info shared_stream = { "buffer": 
buffer, "thread": thread, "stop_event": stop_event, "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1 } camera_streams[camera_url] = shared_stream else: logger.error(f"No valid URL provided for camera {camera_id}") continue # Create stream info for this subscription stream_info = { "buffer": buffer, "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2, "mode": mode, "camera_url": camera_url } if mode == "snapshot": stream_info["snapshot_url"] = snapshot_url stream_info["snapshot_interval"] = snapshot_interval elif mode == "rtsp": stream_info["rtsp_url"] = rtsp_url stream_info["cap"] = shared_stream["cap"] streams[camera_id] = stream_info subscription_to_camera[camera_id] = camera_url elif camera_id and camera_id in streams: # If already subscribed, unsubscribe first logger.info(f"Resubscribing to camera {camera_id}") # Note: Keep models in memory for reuse across subscriptions elif msg_type == "unsubscribe": payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") camera_id = subscriptionIdentifier with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) camera_url = subscription_to_camera.pop(camera_id, None) if camera_url and camera_url in camera_streams: shared_stream = camera_streams[camera_url] shared_stream["ref_count"] -= 1 # If no more references, stop the shared stream if shared_stream["ref_count"] <= 0: logger.info(f"Stopping shared stream for camera URL: {camera_url}") shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() del camera_streams[camera_url] else: logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references") # Clean up cached frame and stability tracking latest_frames.pop(camera_id, None) 
cached_detections.pop(camera_id, None) # Clear cached detection frame_skip_flags.pop(camera_id, None) # Clear frame skip flag camera_states.pop(camera_id, None) # Clear camera state cleanup_camera_stability(camera_id) logger.info(f"Unsubscribed from camera {camera_id}") # Note: Keep models in memory for potential reuse elif msg_type == "requestState": cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, # Include all subscription parameters for proper change detection "rtspUrl": stream.get("rtsp_url"), "snapshotUrl": stream.get("snapshot_url"), "snapshotInterval": stream.get("snapshot_interval"), **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) elif msg_type == "setSessionId": payload = data.get("payload", {}) display_identifier = payload.get("displayIdentifier") session_id = payload.get("sessionId") # Debug sessionId value types and contents session_id_type = type(session_id).__name__ if session_id is None: logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId=None (type: {session_id_type})") logger.info(f"🔄 BACKEND WANTS TO CLEAR SESSION for display {display_identifier}") elif session_id == "null": logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='null' (type: 
{session_id_type})") logger.info(f"🔄 BACKEND SENT STRING 'null' for display {display_identifier}") elif session_id == "": logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='' (empty string, type: {session_id_type})") logger.info(f"🔄 BACKEND SENT EMPTY STRING for display {display_identifier}") else: logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='{session_id}' (type: {session_id_type}, length: {len(str(session_id))})") logger.info(f"🔄 BACKEND CREATED/UPDATED SESSION: {session_id} for display {display_identifier}") logger.debug(f"Full setSessionId payload: {payload}") logger.debug(f"WebSocket message raw data: {json.dumps(data, indent=2)}") logger.debug(f"Current active cameras: {list(streams.keys())}") if display_identifier: # Store session ID for this display if session_id is None or session_id == "null" or session_id == "": old_session_id = session_ids.get(display_identifier) session_ids.pop(display_identifier, None) if session_id is None: logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received None") elif session_id == "null": logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received string 'null'") elif session_id == "": logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received empty string") logger.debug(f"Session IDs after clearing: {session_ids}") # Reset tracking state for all cameras associated with this display with streams_lock: affected_cameras = [] for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): affected_cameras.append(camera_id) # Import here to avoid circular import from siwatsystem.pympta import reset_tracking_state model_id = stream.get("modelId", "unknown") 
reset_tracking_state(camera_id, model_id, "backend session ended") logger.info(f"Reset tracking for camera {camera_id} (display: {display_identifier})") logger.debug(f"Reset tracking for {len(affected_cameras)} cameras: {affected_cameras}") else: old_session_id = session_ids.get(display_identifier) session_ids[display_identifier] = session_id logger.info(f"✅ BACKEND SESSION STARTED: Set session ID {session_id} for display {display_identifier} (previous: {old_session_id})") logger.debug(f"Session IDs after update: {session_ids}") logger.debug(f"🎯 CMS Backend created sessionId {session_id} after receiving detection data") # 🔑 LPR Integration: Retroactively cache the last detection by this new session_id session_id_str = str(session_id) logger.info(f"🔑 LPR: Attempting to retroactively cache detection for session_id {session_id_str}") # Find cameras associated with this display display_cameras = [] with streams_lock: for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): display_cameras.append(camera_id) logger.debug(f"🔍 Found {len(display_cameras)} cameras for display {display_identifier}: {display_cameras}") # Cache the most recent detection for each camera by the new session_id cached_count = 0 for camera_id in display_cameras: if camera_id in cached_detections: detection_data = cached_detections[camera_id].copy() # Add sessionId to the detection data detection_data['sessionId'] = session_id # Cache by session_id for LPR lookup session_detections[session_id_str] = detection_data session_to_camera[session_id_str] = camera_id detection_timestamps[session_id_str] = time.time() cached_count += 1 logger.info(f"✅ LPR: Cached detection for session_id {session_id_str} -> camera {camera_id}") logger.debug(f"🔍 Detection data: {detection_data.get('data', {}).get('detection', {})}") else: logger.debug(f"⚠️ No cached detection available for camera {camera_id}") if cached_count > 0: 
logger.info(f"🎉 LPR: Successfully cached {cached_count} detection(s) for session_id {session_id_str}") logger.info(f"📊 Total LPR sessions now cached: {len(session_detections)}") else: logger.warning(f"⚠️ LPR: No detections could be cached for session_id {session_id_str}") logger.warning(f" Display cameras: {display_cameras}") logger.warning(f" Available cached detections: {list(cached_detections.keys())}") # Clear waiting state for cameras associated with this display with streams_lock: affected_cameras = [] for camera_id, stream in streams.items(): if stream["subscriptionIdentifier"].startswith(display_identifier + ";"): affected_cameras.append(camera_id) from siwatsystem.pympta import get_camera_stability_data model_id = stream.get("modelId", "unknown") stability_data = get_camera_stability_data(camera_id, model_id) session_state = stability_data["session_state"] if session_state.get("waiting_for_backend_session", False): session_state["waiting_for_backend_session"] = False session_state["wait_start_time"] = 0.0 logger.info(f"🚀 PIPELINE UNBLOCKED: Backend sessionId {session_id} received - camera {camera_id} can proceed with database operations") logger.debug(f"📋 Camera {camera_id}: SessionId {session_id} now available for future database operations") logger.debug(f"Updated session state for {len(affected_cameras)} cameras: {affected_cameras}") else: logger.warning(f"🚨 Invalid setSessionId message: missing displayIdentifier in payload") elif msg_type == "patchSession": session_id = data.get("sessionId") patch_data = data.get("data", {}) # For now, just acknowledge the patch - actual implementation depends on backend requirements response = { "type": "patchSessionResult", "payload": { "sessionId": session_id, "success": True, "message": "Session patch acknowledged" } } ws_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}") await websocket.send_json(response) logger.info(f"Acknowledged patch for session {session_id}") else: 
logger.error(f"Unknown message type: {msg_type}") except json.JSONDecodeError: logger.error("Received invalid JSON message") except (WebSocketDisconnect, ConnectionClosedError) as e: logger.warning(f"WebSocket disconnected: {e}") break except Exception as e: logger.error(f"Error handling message: {e}") break try: await websocket.accept() stream_task = asyncio.create_task(process_streams()) heartbeat_task = asyncio.create_task(send_heartbeat()) message_task = asyncio.create_task(on_message()) await asyncio.gather(heartbeat_task, message_task) except Exception as e: logger.error(f"Error in detect websocket: {e}") finally: stream_task.cancel() await stream_task with streams_lock: # Clean up shared camera streams for camera_url, shared_stream in camera_streams.items(): shared_stream["stop_event"].set() shared_stream["thread"].join() if "cap" in shared_stream: shared_stream["cap"].release() while not shared_stream["buffer"].empty(): try: shared_stream["buffer"].get_nowait() except queue.Empty: pass logger.info(f"Released shared camera stream for {camera_url}") streams.clear() camera_streams.clear() subscription_to_camera.clear() with models_lock: models.clear() latest_frames.clear() cached_detections.clear() frame_skip_flags.clear() camera_states.clear() cached_full_pipeline_results.clear() session_pipeline_states.clear() session_ids.clear() # Clean up LPR integration caches session_detections.clear() session_to_camera.clear() detection_timestamps.clear() logger.info("WebSocket connection closed")