From d5e0ebbe9d802ce8bb03d30dc1ca05cc70ecc8eb Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Fri, 10 Jan 2025 12:58:09 +0700 Subject: [PATCH] optimize latency --- app.py | 82 +++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/app.py b/app.py index 51f4e75..4c77b43 100644 --- a/app.py +++ b/app.py @@ -6,6 +6,8 @@ import base64 import numpy as np import json import logging +import threading +import queue app = FastAPI() @@ -43,38 +45,49 @@ async def detect(websocket: WebSocket): logging.info("WebSocket connection accepted") # Move streams inside the detect function - streams = {} + streams = {} # Modify to store {camera_id: {'cap': VideoCapture, 'buffer': Queue, 'thread': Thread}} + + def frame_reader(camera_id, cap, buffer): + while True: + ret, frame = cap.read() + if not ret: + logging.warning(f"Failed to read frame from camera: {camera_id}") + break + if not buffer.empty(): + try: + buffer.get_nowait() # Discard the old frame + except queue.Empty: + pass + buffer.put(frame) async def process_streams(): logging.info("Started processing streams") while True: start_time = time.time() # Round-robin processing - for camera_id, cap in list(streams.items()): - logging.debug(f"Processing camera: {camera_id}") - ret, frame = cap.read() - if not ret: - logging.warning(f"Failed to read frame from camera: {camera_id}") - continue - results = model(frame, stream=False) - boxes = [] - for r in results: - for box in r.boxes: - boxes.append({ - "class": class_names[int(box.cls[0])], # Use class name - "confidence": float(box.conf[0]), - }) - # Broadcast to all subscribers of this URL - detection_data = { - "type": "imageDetection", - "cameraIdentifier": camera_id, - "timestamp": time.time(), - "data": { - "detections": boxes + for camera_id, stream in list(streams.items()): + buffer = stream['buffer'] + if not buffer.empty(): + frame = buffer.get() + results = model(frame, stream=False) + boxes = [] + for r in results: + for box in r.boxes: + boxes.append({ + "class": class_names[int(box.cls[0])], # Use class name + "confidence": float(box.conf[0]), + }) + # Broadcast to all subscribers of this URL + detection_data = { + "type": "imageDetection", + "cameraIdentifier": camera_id, + "timestamp": time.time(), + "data": { + "detections": boxes + } } - } - logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") - await websocket.send_json(detection_data) + logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") + await websocket.send_json(detection_data) elapsed_time = (time.time() - start_time) * 1000 # in ms sleep_time = max(poll_interval - elapsed_time, 0) logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms") @@ -93,12 +106,17 @@ async def detect(websocket: WebSocket): if camera_id and rtsp_url: if camera_id not in streams and len(streams) < max_streams: - streams[camera_id] = cv2.VideoCapture(rtsp_url) + cap = cv2.VideoCapture(rtsp_url) + buffer = queue.Queue(maxsize=1) + thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer)) + thread.daemon = True + thread.start() + streams[camera_id] = {'cap': cap, 'buffer': buffer, 'thread': thread} logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}") - elif camera_id and camera_id in streams: - cap = streams.pop(camera_id) - cap.release() - logging.info(f"Unsubscribed from camera {camera_id}") + elif camera_id and camera_id in streams: + stream = streams.pop(camera_id) + stream['cap'].release() + logging.info(f"Unsubscribed from camera {camera_id}") elif data.get("command") == "stop": logging.info("Received stop command") break @@ -106,8 +124,8 @@ async def detect(websocket: WebSocket): logging.error(f"Error in WebSocket connection: {e}") finally: task.cancel() - for camera_id, cap in streams.items(): - cap.release() + for camera_id, stream in streams.items(): + stream['cap'].release() logging.info(f"Released camera {camera_id}") streams.clear() logging.info("WebSocket connection closed")