From af26c1477c87b8aba178a05ef33c433eba0f3d2c Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sat, 11 Jan 2025 01:29:07 +0700 Subject: [PATCH] gracefully stop WS connection --- app.py | 192 ++++++++++++++++++++++++++++++++++------------------ config.json | 4 +- 2 files changed, 130 insertions(+), 66 deletions(-) diff --git a/app.py b/app.py index 4c77b43..1d5c515 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,6 @@ from fastapi import FastAPI, WebSocket +from fastapi.websockets import WebSocketDisconnect +from websockets.exceptions import ConnectionClosedError from ultralytics import YOLO import torch import cv2 @@ -22,10 +24,12 @@ with open("config.json", "r") as f: config = json.load(f) poll_interval = config.get("poll_interval_ms", 100) +reconnect_interval = config.get("reconnect_interval_sec", 5) # New setting TARGET_FPS = config.get("target_fps", 10) # Add TARGET_FPS poll_interval = 1000 / TARGET_FPS # Adjust poll_interval based on TARGET_FPS logging.info(f"Poll interval: {poll_interval}ms") max_streams = config.get("max_streams", 5) +max_retries = config.get("max_retries", 3) # Configure logging logging.basicConfig( @@ -44,89 +48,147 @@ async def detect(websocket: WebSocket): logging.info("WebSocket connection accepted") - # Move streams inside the detect function - streams = {} # Modify to store {camera_id: {'cap': VideoCapture, 'buffer': Queue, 'thread': Thread}} + streams = {} - def frame_reader(camera_id, cap, buffer): - while True: - ret, frame = cap.read() - if not ret: - logging.warning(f"Failed to read frame from camera: {camera_id}") + def frame_reader(camera_id, cap, buffer, stop_event): + import time + retries = 0 + while not stop_event.is_set(): + try: + ret, frame = cap.read() + if not ret: + logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}") + cap.release() + time.sleep(reconnect_interval) + retries += 1 + if retries > max_retries: + logging.error(f"Max retries reached for camera: {camera_id}") + break + # Re-open the VideoCapture + cap = cv2.VideoCapture(streams[camera_id]['rtsp_url']) + if not cap.isOpened(): + logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}") + continue + continue + retries = 0 # Reset on success + if not buffer.empty(): + try: + buffer.get_nowait() # Discard the old frame + except queue.Empty: + pass + buffer.put(frame) + except cv2.error as e: + logging.error(f"OpenCV error for camera {camera_id}: {e}") + cap.release() + time.sleep(reconnect_interval) + retries += 1 + if retries > max_retries: + logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}") + break + # Re-open the VideoCapture + cap = cv2.VideoCapture(streams[camera_id]['rtsp_url']) + if not cap.isOpened(): + logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") + continue + except Exception as e: + logging.error(f"Unexpected error for camera {camera_id}: {e}") + cap.release() break - if not buffer.empty(): - try: - buffer.get_nowait() # Discard the old frame - except queue.Empty: - pass - buffer.put(frame) async def process_streams(): logging.info("Started processing streams") - while True: - start_time = time.time() - # Round-robin processing - for camera_id, stream in list(streams.items()): - buffer = stream['buffer'] - if not buffer.empty(): - frame = buffer.get() - results = model(frame, stream=False) - boxes = [] - for r in results: - for box in r.boxes: - boxes.append({ - "class": class_names[int(box.cls[0])], # Use class name - "confidence": float(box.conf[0]), - }) - # Broadcast to all subscribers of this URL - detection_data = { - "type": "imageDetection", - "cameraIdentifier": camera_id, - "timestamp": time.time(), - "data": { - "detections": boxes + try: + while True: + start_time = time.time() + # Round-robin processing + for camera_id, stream in list(streams.items()): + buffer = stream['buffer'] + if not buffer.empty(): + frame = buffer.get() + results = model(frame, stream=False) + boxes = [] + for r in results: + for box in r.boxes: + boxes.append({ + "class": class_names[int(box.cls[0])], + "confidence": float(box.conf[0]), + }) + # Broadcast to all subscribers of this URL + detection_data = { + "type": "imageDetection", + "cameraIdentifier": camera_id, + "timestamp": time.time(), + "data": { + "detections": boxes + } } - } - logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") - await websocket.send_json(detection_data) - elapsed_time = (time.time() - start_time) * 1000 # in ms - sleep_time = max(poll_interval - elapsed_time, 0) - logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms") - await asyncio.sleep(sleep_time / 1000.0) + logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") + await websocket.send_json(detection_data) + elapsed_time = (time.time() - start_time) * 1000 # in ms + sleep_time = max(poll_interval - elapsed_time, 0) + logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms") + await asyncio.sleep(sleep_time / 1000.0) + except asyncio.CancelledError: + logging.info("Stream processing task cancelled") + except Exception as e: + logging.error(f"Error in process_streams: {e}") await websocket.accept() task = asyncio.create_task(process_streams()) try: while True: - msg = await websocket.receive_text() - logging.debug(f"Received message: {msg}") - data = json.loads(msg) - camera_id = data.get("cameraIdentifier") - rtsp_url = data.get("rtspUrl") + try: + msg = await websocket.receive_text() + logging.debug(f"Received message: {msg}") + data = json.loads(msg) + camera_id = data.get("cameraIdentifier") + rtsp_url = data.get("rtspUrl") - if camera_id and rtsp_url: - if camera_id not in streams and len(streams) < max_streams: - cap = cv2.VideoCapture(rtsp_url) - buffer = queue.Queue(maxsize=1) - thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer)) - thread.daemon = True - thread.start() - streams[camera_id] = {'cap': cap, 'buffer': buffer, 'thread': thread} - logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}") - elif camera_id and camera_id in streams: - stream = streams.pop(camera_id) - stream['cap'].release() - logging.info(f"Unsubscribed from camera {camera_id}") - elif data.get("command") == "stop": - logging.info("Received stop command") + if camera_id and rtsp_url: + if camera_id not in streams and len(streams) < max_streams: + cap = cv2.VideoCapture(rtsp_url) + if not cap.isOpened(): + logging.error(f"Failed to open RTSP stream for camera {camera_id}") + continue + buffer = queue.Queue(maxsize=1) + stop_event = threading.Event() + thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) + thread.daemon = True + thread.start() + streams[camera_id] = { + 'cap': cap, + 'buffer': buffer, + 'thread': thread, + 'rtsp_url': rtsp_url, + 'stop_event': stop_event + } + logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}") + elif camera_id and camera_id in streams: + stream = streams.pop(camera_id) + stream['cap'].release() + logging.info(f"Unsubscribed from camera {camera_id}") + elif data.get("command") == "stop": + logging.info("Received stop command") + break + except json.JSONDecodeError: + logging.error("Received invalid JSON message") + except (WebSocketDisconnect, ConnectionClosedError) as e: + logging.warning(f"WebSocket disconnected: {e}") + break + except Exception as e: + logging.error(f"Error handling message: {e}") break except Exception as e: - logging.error(f"Error in WebSocket connection: {e}") + logging.error(f"Unexpected error in WebSocket connection: {e}") finally: task.cancel() + await task for camera_id, stream in streams.items(): + stream['stop_event'].set() + stream['thread'].join() stream['cap'].release() - logging.info(f"Released camera {camera_id}") + stream['buffer'].queue.clear() + logging.info(f"Released camera {camera_id} and cleaned up resources") streams.clear() logging.info("WebSocket connection closed") - await websocket.close() diff --git a/config.json b/config.json index 1cd0989..b9ffa8f 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,7 @@ { "poll_interval_ms": 100, "max_streams": 5, - "target_fps": 2 + "target_fps": 2, + "reconnect_interval_sec": 5, + "max_retries": 3 }