from typing import Any, Dict import os import json import time import queue import torch import cv2 import numpy as np import base64 import logging import threading import requests import asyncio import psutil import zipfile from urllib.parse import urlparse from fastapi import FastAPI, WebSocket, HTTPException from fastapi.websockets import WebSocketDisconnect from fastapi.responses import Response from websockets.exceptions import ConnectionClosedError from ultralytics import YOLO # Import shared pipeline functions from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline app = FastAPI() # Global dictionaries to keep track of models and streams # "models" now holds a nested dict: { camera_id: { modelId: model_tree } } models: Dict[str, Dict[str, Any]] = {} streams: Dict[str, Dict[str, Any]] = {} with open("config.json", "r") as f: config = json.load(f) poll_interval = config.get("poll_interval_ms", 100) reconnect_interval = config.get("reconnect_interval_sec", 5) TARGET_FPS = config.get("target_fps", 10) poll_interval = 1000 / TARGET_FPS logging.info(f"Poll interval: {poll_interval}ms") max_streams = config.get("max_streams", 5) max_retries = config.get("max_retries", 3) # Configure logging logging.basicConfig( level=logging.INFO, # Set to INFO level for less verbose output format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.FileHandler("detector_worker.log"), # Write logs to a file logging.StreamHandler() # Also output to console ] ) # Create a logger specifically for this application logger = logging.getLogger("detector_worker") logger.setLevel(logging.DEBUG) # Set app-specific logger to DEBUG level # Ensure all other libraries (including root) use at least INFO level logging.getLogger().setLevel(logging.INFO) logger.info("Starting detector worker application") logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: {max_streams}, Max retries: {max_retries}") # Ensure the models directory exists os.makedirs("models", exist_ok=True) logger.info("Ensured models directory exists") # Constants for heartbeat and timeouts HEARTBEAT_INTERVAL = 2 # seconds WORKER_TIMEOUT_MS = 10000 logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds") # Locks for thread-safe operations streams_lock = threading.Lock() models_lock = threading.Lock() logger.debug("Initialized thread locks") # Add helper to download mpta ZIP file from a remote URL def download_mpta(url: str, dest_path: str) -> str: try: logger.info(f"Starting download of model from {url} to {dest_path}") os.makedirs(os.path.dirname(dest_path), exist_ok=True) response = requests.get(url, stream=True) if response.status_code == 200: file_size = int(response.headers.get('content-length', 0)) logger.info(f"Model file size: {file_size/1024/1024:.2f} MB") downloaded = 0 with open(dest_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) downloaded += len(chunk) if file_size > 0 and downloaded % (file_size // 10) < 8192: # Log approximately every 10% logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%") logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}") return dest_path else: logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}") return None except Exception as e: logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) return None # Add helper to fetch snapshot image from HTTP/HTTPS URL def fetch_snapshot(url: str): try: response = requests.get(url, timeout=10) if response.status_code == 200: # Convert response content to numpy array nparr = np.frombuffer(response.content, np.uint8) # Decode image frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is not None: logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}") return frame else: logger.error(f"Failed to decode image from snapshot URL: {url}") return None else: logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {url}") return None except Exception as e: logger.error(f"Exception fetching snapshot from {url}: {str(e)}") return None # Helper to get crop coordinates from stream def get_crop_coords(stream): return { "cropX1": stream.get("cropX1"), "cropY1": stream.get("cropY1"), "cropX2": stream.get("cropX2"), "cropY2": stream.get("cropY2") } #################################################### # REST API endpoint for image retrieval #################################################### @app.get("/camera/{camera_id}/image") async def get_camera_image(camera_id: str): """ Get the current frame from a camera as JPEG image """ try: with streams_lock: if camera_id not in streams: logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") stream = streams[camera_id] buffer = stream["buffer"] logger.debug(f"Camera '{camera_id}' buffer size: {buffer.qsize()}, buffer empty: {buffer.empty()}") logger.debug(f"Buffer queue contents: {getattr(buffer, 'queue', None)}") if buffer.empty(): logger.warning(f"No frame available for camera '{camera_id}'. Buffer is empty.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") # Get the latest frame (non-blocking) try: frame = buffer.queue[-1] # Get the most recent frame without removing it except IndexError: logger.warning(f"Buffer queue is empty for camera '{camera_id}' when trying to access last frame.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") # Encode frame as JPEG success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) if not success: raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") # Return image as binary response return Response(content=buffer_img.tobytes(), media_type="image/jpeg") except HTTPException: raise except Exception as e: logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") #################################################### # Detection and frame processing functions #################################################### @app.websocket("/") async def detect(websocket: WebSocket): logger.info("WebSocket connection accepted") persistent_data_dict = {} async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data): try: logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}") start_time = time.time() detection_result = run_pipeline(frame, model_tree) process_time = (time.time() - start_time) * 1000 logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms") # Log the raw detection result for debugging logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}") # Direct class result (no detections/classifications structure) if detection_result and isinstance(detection_result, dict) and "class" in detection_result and "confidence" in detection_result: highest_confidence_detection = { "class": detection_result.get("class", "none"), "confidence": detection_result.get("confidence", 1.0), "box": [0, 0, 0, 0] # Empty bounding box for classifications } # Handle case when no detections found or result is empty elif not detection_result or not detection_result.get("detections"): # Check if we have classification results if detection_result and detection_result.get("classifications"): # Get the highest confidence classification classifications = detection_result.get("classifications", []) highest_confidence_class = max(classifications, key=lambda x: x.get("confidence", 0)) if classifications else None if highest_confidence_class: highest_confidence_detection = { "class": highest_confidence_class.get("class", "none"), "confidence": highest_confidence_class.get("confidence", 1.0), "box": [0, 0, 0, 0] # Empty bounding box for classifications } else: highest_confidence_detection = { "class": "none", "confidence": 1.0, "box": [0, 0, 0, 0] } else: highest_confidence_detection = { "class": "none", "confidence": 1.0, "box": [0, 0, 0, 0] } else: # Find detection with highest confidence detections = detection_result.get("detections", []) highest_confidence_detection = max(detections, key=lambda x: x.get("confidence", 0)) if detections else { "class": "none", "confidence": 1.0, "box": [0, 0, 0, 0] } detection_data = { "type": "imageDetection", "subscriptionIdentifier": stream["subscriptionIdentifier"], "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime()), "data": { "detection": highest_confidence_detection, # Send only the highest confidence detection "modelId": stream["modelId"], "modelName": stream["modelName"] } } if highest_confidence_detection["class"] != "none": logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}") await websocket.send_json(detection_data) logger.debug(f"Sent detection data to client for camera {camera_id}:\n{json.dumps(detection_data, indent=2)}") return persistent_data except Exception as e: logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) return persistent_data def frame_reader(camera_id, cap, buffer, stop_event): retries = 0 logger.info(f"Starting frame reader thread for camera {camera_id}") frame_count = 0 last_log_time = time.time() try: # Log initial camera status and properties if cap.isOpened(): width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}") else: logger.error(f"Camera {camera_id} failed to open initially") while not stop_event.is_set(): try: if not cap.isOpened(): logger.error(f"Camera {camera_id} is not open before trying to read") # Attempt to reopen cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) time.sleep(reconnect_interval) continue logger.debug(f"Attempting to read frame from camera {camera_id}") ret, frame = cap.read() if not ret: logger.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}") cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader") break # Re-open logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}") continue logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}") continue # Successfully read a frame frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}") retries = 0 # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Short sleep to avoid CPU overuse time.sleep(0.01) except cv2.error as e: logger.error(f"OpenCV error for camera {camera_id}: {e}", exc_info=True) cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after OpenCV error for camera {camera_id}") break logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") continue logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}") except Exception as e: logger.error(f"Unexpected error for camera {camera_id}: {str(e)}", exc_info=True) cap.release() break except Exception as e: logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Frame reader thread for camera {camera_id} is exiting") if cap and cap.isOpened(): cap.release() def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event): """Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals""" retries = 0 logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}") frame_count = 0 last_log_time = time.time() try: interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s") while not stop_event.is_set(): try: start_time = time.time() frame = fetch_snapshot(snapshot_url) if frame is None: logger.warning(f"Failed to fetch snapshot for camera: {camera_id}, retry {retries+1}/{max_retries}") retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader") break time.sleep(min(interval_seconds, reconnect_interval)) continue # Successfully fetched a frame frame_count += 1 current_time = time.time() # Log frame stats every 5 seconds if current_time - last_log_time > 5: logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds") frame_count = 0 last_log_time = current_time logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}") retries = 0 # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}") except queue.Empty: pass buffer.put(frame) logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Wait for the specified interval elapsed = time.time() - start_time sleep_time = max(interval_seconds - elapsed, 0) if sleep_time > 0: time.sleep(sleep_time) except Exception as e: logger.error(f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}", exc_info=True) retries += 1 if retries > max_retries and max_retries != -1: logger.error(f"Max retries reached after error for snapshot camera {camera_id}") break time.sleep(min(interval_seconds, reconnect_interval)) except Exception as e: logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True) finally: logger.info(f"Snapshot reader thread for camera {camera_id} is exiting") async def process_streams(): logger.info("Started processing streams") try: while True: start_time = time.time() with streams_lock: current_streams = list(streams.items()) if current_streams: logger.debug(f"Processing {len(current_streams)} active streams") else: logger.debug("No active streams to process") for camera_id, stream in current_streams: buffer = stream["buffer"] if buffer.empty(): logger.debug(f"Frame buffer is empty for camera {camera_id}") continue logger.debug(f"Got frame from buffer for camera {camera_id}") frame = buffer.get() with models_lock: model_tree = models.get(camera_id, {}).get(stream["modelId"]) if not model_tree: logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}") continue logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}") key = (camera_id, stream["modelId"]) persistent_data = persistent_data_dict.get(key, {}) logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}") updated_persistent_data = await handle_detection( camera_id, stream, frame, websocket, model_tree, persistent_data ) persistent_data_dict[key] = updated_persistent_data elapsed_time = (time.time() - start_time) * 1000 # ms sleep_time = max(poll_interval - elapsed_time, 0) logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms") await asyncio.sleep(sleep_time / 1000.0) except asyncio.CancelledError: logger.info("Stream processing task cancelled") except Exception as e: logger.error(f"Error in process_streams: {str(e)}", exc_info=True) async def send_heartbeat(): while True: try: cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) logger.debug(f"Sent stateReport as heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, {len(camera_connections)} active cameras") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: logger.error(f"Error sending stateReport heartbeat: {e}") break async def on_message(): while True: try: msg = await websocket.receive_text() logger.debug(f"Received message: {msg}") data = json.loads(msg) msg_type = data.get("type") if msg_type == "subscribe": payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") rtsp_url = payload.get("rtspUrl") snapshot_url = payload.get("snapshotUrl") snapshot_interval = payload.get("snapshotInterval") model_url = payload.get("modelUrl") modelId = payload.get("modelId") modelName = payload.get("modelName") cropX1 = payload.get("cropX1") cropY1 = payload.get("cropY1") cropX2 = payload.get("cropX2") cropY2 = payload.get("cropY2") camera_id = subscriptionIdentifier # Use subscriptionIdentifier as camera_id for mapping if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = os.path.join("models", camera_id, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # If model_url is remote, download it first. parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): logger.info(f"Downloading remote model from {model_url}") local_mpta = os.path.join(extraction_dir, os.path.basename(parsed.path)) logger.debug(f"Download destination: {local_mpta}") local_path = download_mpta(model_url, local_mpta) if not local_path: logger.error(f"Failed to download the remote mpta file from {model_url}") error_response = { "type": "error", "cameraIdentifier": camera_id, "error": f"Failed to download model from {model_url}" } await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: logger.info(f"Loading local model from {model_url}") # Check if file exists before attempting to load if not os.path.exists(model_url): logger.error(f"Local model file not found: {model_url}") logger.debug(f"Current working directory: {os.getcwd()}") error_response = { "type": "error", "cameraIdentifier": camera_id, "error": f"Model file not found: {model_url}" } await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: logger.error(f"Failed to load model {modelId} from mpta file for camera {camera_id}") error_response = { "type": "error", "cameraIdentifier": camera_id, "error": f"Failed to load model {modelId}" } await websocket.send_json(error_response) continue if camera_id not in models: models[camera_id] = {} models[camera_id][modelId] = model_tree logger.info(f"Successfully loaded model {modelId} for camera {camera_id}") success_response = { "type": "modelLoaded", "cameraIdentifier": camera_id, "modelId": modelId } await websocket.send_json(success_response) if camera_id and (rtsp_url or snapshot_url): with streams_lock: if camera_id not in streams and len(streams) < max_streams: buffer = queue.Queue(maxsize=1) stop_event = threading.Event() stream_info = { "buffer": buffer, "thread": None, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier, "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2 } if snapshot_url and snapshot_interval: logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() stream_info.update({ "snapshot_url": snapshot_url, "snapshot_interval": snapshot_interval, "mode": "snapshot" }) stream_info["thread"] = thread streams[camera_id] = stream_info elif rtsp_url: logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") continue thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() stream_info.update({ "cap": cap, "rtsp_url": rtsp_url, "mode": "rtsp" }) stream_info["thread"] = thread streams[camera_id] = stream_info else: logger.error(f"No valid URL provided for camera {camera_id}") continue elif camera_id and camera_id in streams: # If already subscribed, unsubscribe first stream = streams.pop(camera_id) stream["stop_event"].set() stream["thread"].join() if "cap" in stream: stream["cap"].release() logger.info(f"Unsubscribed from camera {camera_id} for resubscription") with models_lock: if camera_id in models and modelId in models[camera_id]: del models[camera_id][modelId] if not models[camera_id]: del models[camera_id] elif msg_type == "unsubscribe": payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") camera_id = subscriptionIdentifier with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) stream["stop_event"].set() stream["thread"].join() if "cap" in stream: stream["cap"].release() with models_lock: if camera_id in models: del models[camera_id] elif msg_type == "requestState": cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], "online": True, **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] state_report = { "type": "stateReport", "cpuUsage": cpu_usage, "memoryUsage": memory_usage, "gpuUsage": gpu_usage, "gpuMemoryUsage": gpu_memory_usage, "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) else: logger.error(f"Unknown message type: {msg_type}") except json.JSONDecodeError: logger.error("Received invalid JSON message") except (WebSocketDisconnect, ConnectionClosedError) as e: logger.warning(f"WebSocket disconnected: {e}") break except Exception as e: logger.error(f"Error handling message: {e}") break try: await websocket.accept() stream_task = asyncio.create_task(process_streams()) heartbeat_task = asyncio.create_task(send_heartbeat()) message_task = asyncio.create_task(on_message()) await asyncio.gather(heartbeat_task, message_task) except Exception as e: logger.error(f"Error in detect websocket: {e}") finally: stream_task.cancel() await stream_task with streams_lock: for camera_id, stream in streams.items(): stream["stop_event"].set() stream["thread"].join() # Only release cap if it exists (RTSP mode) if "cap" in stream: stream["cap"].release() while not stream["buffer"].empty(): try: stream["buffer"].get_nowait() except queue.Empty: pass logger.info(f"Released camera {camera_id} and cleaned up resources") streams.clear() with models_lock: models.clear() logger.info("WebSocket connection closed")