gracefully stop WS connection

This commit is contained in:
Siwat Sirichai 2025-01-11 01:29:07 +07:00
parent d5e0ebbe9d
commit af26c1477c
2 changed files with 130 additions and 66 deletions

192
app.py
View file

@ -1,4 +1,6 @@
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
from websockets.exceptions import ConnectionClosedError
from ultralytics import YOLO
import torch
import cv2
@ -22,10 +24,12 @@ with open("config.json", "r") as f:
config = json.load(f)
poll_interval = config.get("poll_interval_ms", 100)
reconnect_interval = config.get("reconnect_interval_sec", 5) # New setting
TARGET_FPS = config.get("target_fps", 10) # Add TARGET_FPS
poll_interval = 1000 / TARGET_FPS # Adjust poll_interval based on TARGET_FPS
logging.info(f"Poll interval: {poll_interval}ms")
max_streams = config.get("max_streams", 5)
max_retries = config.get("max_retries", 3)
# Configure logging
logging.basicConfig(
@ -44,89 +48,147 @@ async def detect(websocket: WebSocket):
logging.info("WebSocket connection accepted")
# Move streams inside the detect function
streams = {} # Modify to store {camera_id: {'cap': VideoCapture, 'buffer': Queue, 'thread': Thread}}
streams = {}
def frame_reader(camera_id, cap, buffer):
while True:
ret, frame = cap.read()
if not ret:
logging.warning(f"Failed to read frame from camera: {camera_id}")
def frame_reader(camera_id, cap, buffer, stop_event):
import time
retries = 0
while not stop_event.is_set():
try:
ret, frame = cap.read()
if not ret:
logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
cap.release()
time.sleep(reconnect_interval)
retries += 1
if retries > max_retries:
logging.error(f"Max retries reached for camera: {camera_id}")
break
# Re-open the VideoCapture
cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
if not cap.isOpened():
logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
continue
continue
retries = 0 # Reset on success
if not buffer.empty():
try:
buffer.get_nowait() # Discard the old frame
except queue.Empty:
pass
buffer.put(frame)
except cv2.error as e:
logging.error(f"OpenCV error for camera {camera_id}: {e}")
cap.release()
time.sleep(reconnect_interval)
retries += 1
if retries > max_retries:
logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}")
break
# Re-open the VideoCapture
cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
if not cap.isOpened():
logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
continue
except Exception as e:
logging.error(f"Unexpected error for camera {camera_id}: {e}")
cap.release()
break
if not buffer.empty():
try:
buffer.get_nowait() # Discard the old frame
except queue.Empty:
pass
buffer.put(frame)
async def process_streams():
logging.info("Started processing streams")
while True:
start_time = time.time()
# Round-robin processing
for camera_id, stream in list(streams.items()):
buffer = stream['buffer']
if not buffer.empty():
frame = buffer.get()
results = model(frame, stream=False)
boxes = []
for r in results:
for box in r.boxes:
boxes.append({
"class": class_names[int(box.cls[0])], # Use class name
"confidence": float(box.conf[0]),
})
# Broadcast to all subscribers of this URL
detection_data = {
"type": "imageDetection",
"cameraIdentifier": camera_id,
"timestamp": time.time(),
"data": {
"detections": boxes
try:
while True:
start_time = time.time()
# Round-robin processing
for camera_id, stream in list(streams.items()):
buffer = stream['buffer']
if not buffer.empty():
frame = buffer.get()
results = model(frame, stream=False)
boxes = []
for r in results:
for box in r.boxes:
boxes.append({
"class": class_names[int(box.cls[0])],
"confidence": float(box.conf[0]),
})
# Broadcast to all subscribers of this URL
detection_data = {
"type": "imageDetection",
"cameraIdentifier": camera_id,
"timestamp": time.time(),
"data": {
"detections": boxes
}
}
}
logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
await websocket.send_json(detection_data)
elapsed_time = (time.time() - start_time) * 1000 # in ms
sleep_time = max(poll_interval - elapsed_time, 0)
logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms")
await asyncio.sleep(sleep_time / 1000.0)
logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
await websocket.send_json(detection_data)
elapsed_time = (time.time() - start_time) * 1000 # in ms
sleep_time = max(poll_interval - elapsed_time, 0)
logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms")
await asyncio.sleep(sleep_time / 1000.0)
except asyncio.CancelledError:
logging.info("Stream processing task cancelled")
except Exception as e:
logging.error(f"Error in process_streams: {e}")
await websocket.accept()
task = asyncio.create_task(process_streams())
try:
while True:
msg = await websocket.receive_text()
logging.debug(f"Received message: {msg}")
data = json.loads(msg)
camera_id = data.get("cameraIdentifier")
rtsp_url = data.get("rtspUrl")
try:
msg = await websocket.receive_text()
logging.debug(f"Received message: {msg}")
data = json.loads(msg)
camera_id = data.get("cameraIdentifier")
rtsp_url = data.get("rtspUrl")
if camera_id and rtsp_url:
if camera_id not in streams and len(streams) < max_streams:
cap = cv2.VideoCapture(rtsp_url)
buffer = queue.Queue(maxsize=1)
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer))
thread.daemon = True
thread.start()
streams[camera_id] = {'cap': cap, 'buffer': buffer, 'thread': thread}
logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}")
elif camera_id and camera_id in streams:
stream = streams.pop(camera_id)
stream['cap'].release()
logging.info(f"Unsubscribed from camera {camera_id}")
elif data.get("command") == "stop":
logging.info("Received stop command")
if camera_id and rtsp_url:
if camera_id not in streams and len(streams) < max_streams:
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
logging.error(f"Failed to open RTSP stream for camera {camera_id}")
continue
buffer = queue.Queue(maxsize=1)
stop_event = threading.Event()
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
thread.daemon = True
thread.start()
streams[camera_id] = {
'cap': cap,
'buffer': buffer,
'thread': thread,
'rtsp_url': rtsp_url,
'stop_event': stop_event
}
logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}")
elif camera_id and camera_id in streams:
stream = streams.pop(camera_id)
stream['cap'].release()
logging.info(f"Unsubscribed from camera {camera_id}")
elif data.get("command") == "stop":
logging.info("Received stop command")
break
except json.JSONDecodeError:
logging.error("Received invalid JSON message")
except (WebSocketDisconnect, ConnectionClosedError) as e:
logging.warning(f"WebSocket disconnected: {e}")
break
except Exception as e:
logging.error(f"Error handling message: {e}")
break
except Exception as e:
logging.error(f"Error in WebSocket connection: {e}")
logging.error(f"Unexpected error in WebSocket connection: {e}")
finally:
task.cancel()
await task
for camera_id, stream in streams.items():
stream['stop_event'].set()
stream['thread'].join()
stream['cap'].release()
logging.info(f"Released camera {camera_id}")
stream['buffer'].queue.clear()
logging.info(f"Released camera {camera_id} and cleaned up resources")
streams.clear()
logging.info("WebSocket connection closed")
await websocket.close()

View file

@ -1,5 +1,7 @@
{
"poll_interval_ms": 100,
"max_streams": 5,
"target_fps": 2
"target_fps": 2,
"reconnect_interval_sec": 5,
"max_retries": 3
}