from fastapi import FastAPI, WebSocket from ultralytics import YOLO import torch import cv2 import base64 import numpy as np import json import logging app = FastAPI() model = YOLO("yolov8n.pt") if torch.cuda.is_available(): model.to('cuda') with open("config.json", "r") as f: config = json.load(f) poll_interval = config.get("poll_interval_ms", 100) TARGET_FPS = config.get("target_fps", 10) # Add TARGET_FPS poll_interval = 1000 / TARGET_FPS # Adjust poll_interval based on TARGET_FPS logging.info(f"Poll interval: {poll_interval}ms") max_streams = config.get("max_streams", 5) # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("app.log"), logging.StreamHandler() ] ) @app.websocket("/") async def detect(websocket: WebSocket): import asyncio import time logging.info("WebSocket connection accepted") # Move streams inside the detect function streams = {} async def process_streams(): logging.info("Started processing streams") while True: start_time = time.time() # Round-robin processing for camera_id, cap in list(streams.items()): logging.debug(f"Processing camera: {camera_id}") ret, frame = cap.read() if not ret: logging.warning(f"Failed to read frame from camera: {camera_id}") continue results = model(frame, stream=False) boxes = [] for r in results: for box in r.boxes: boxes.append({ "class": int(box.cls[0]), "confidence": float(box.conf[0]), }) # Broadcast to all subscribers of this URL detection_data = { "type": "imageDetection", "cameraIdentifier": camera_id, "timestamp": time.time(), "data": { "detections": boxes } } logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") await websocket.send_json(detection_data) elapsed_time = (time.time() - start_time) * 1000 # in ms sleep_time = max(poll_interval - elapsed_time, 0) logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms") await asyncio.sleep(sleep_time / 1000.0) await websocket.accept() task = asyncio.create_task(process_streams()) try: while True: msg = await websocket.receive_text() logging.debug(f"Received message: {msg}") data = json.loads(msg) camera_id = data.get("cameraIdentifier") rtsp_url = data.get("rtspUrl") if camera_id and rtsp_url: if camera_id not in streams and len(streams) < max_streams: streams[camera_id] = cv2.VideoCapture(rtsp_url) logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}") elif camera_id and camera_id in streams: cap = streams.pop(camera_id) cap.release() logging.info(f"Unsubscribed from camera {camera_id}") elif data.get("command") == "stop": logging.info("Received stop command") break except Exception as e: logging.error(f"Error in WebSocket connection: {e}") finally: task.cancel() for camera_id, cap in streams.items(): cap.release() logging.info(f"Released camera {camera_id}") streams.clear() logging.info("WebSocket connection closed") await websocket.close()