python-detector-worker/app.py
2025-01-10 12:58:09 +07:00

132 lines
4.8 KiB
Python

from fastapi import FastAPI, WebSocket
from ultralytics import YOLO
import torch
import cv2
import base64
import numpy as np
import json
import logging
import threading
import queue
# Configure logging before anything logs. The first module-level logging call
# (logging.info, ...) implicitly installs a default root handler, after which
# logging.basicConfig() silently does nothing - so it has to run first for the
# file handler, level and format below to take effect.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)

app = FastAPI()

# Load the detection model once at import time; use the GPU when available.
model = YOLO("yolov8n.pt")
if torch.cuda.is_available():
    model.to('cuda')

# Retrieve class names from the model
class_names = model.names

with open("config.json", "r") as f:
    config = json.load(f)

# The polling period (in milliseconds) is derived from the target frame rate.
# NOTE: a "poll_interval_ms" key in config.json is not used; the value read
# from it was always overwritten by the FPS-derived interval.
TARGET_FPS = config.get("target_fps", 10)
if TARGET_FPS <= 0:
    # 0 would divide by zero; a negative rate would make the loop never sleep.
    raise ValueError(f"target_fps must be positive, got {TARGET_FPS!r}")
poll_interval = 1000 / TARGET_FPS
logging.info(f"Poll interval: {poll_interval}ms")

# Maximum number of simultaneously subscribed cameras per connection.
max_streams = config.get("max_streams", 5)
@app.websocket("/")
async def detect(websocket: WebSocket):
    """Serve one WebSocket client: manage its camera streams and push detections.

    Incoming messages are JSON text frames:
      * ``cameraIdentifier`` and ``rtspUrl`` both set: subscribe to that camera
        (ignored if it is already subscribed or ``max_streams`` is reached).
      * ``cameraIdentifier`` of a subscribed camera, without ``rtspUrl``:
        unsubscribe from it.
      * ``{"command": "stop"}``: end the session.

    For every subscribed camera a reader thread keeps the newest frame in a
    one-slot queue, and a background task runs the model over those frames and
    sends an ``imageDetection`` JSON message per processed frame.

    Args:
        websocket: The client connection; accepted here and closed on exit.
    """
    import asyncio
    import time

    from fastapi import WebSocketDisconnect

    logging.info("WebSocket connection accepted")

    # Per-connection state:
    # {camera_id: {'cap': VideoCapture, 'buffer': Queue, 'thread': Thread, 'stop': Event}}
    streams = {}

    def frame_reader(camera_id, cap, buffer, stop_event):
        """Read frames from `cap` into `buffer`, keeping only the newest one.

        Runs in its own thread until `stop_event` is set or a read fails.
        The thread owns `cap` and releases it on exit, so the capture is never
        released from another thread while a read is in progress.
        """
        try:
            while not stop_event.is_set():
                ret, frame = cap.read()
                if not ret:
                    logging.warning(f"Failed to read frame from camera: {camera_id}")
                    break
                # Discard the old frame (if any) so the consumer always gets
                # the latest one. This thread is the only producer, so the
                # put() below never blocks on the one-slot queue.
                try:
                    buffer.get_nowait()
                except queue.Empty:
                    pass
                buffer.put(frame)
        finally:
            cap.release()
            logging.info(f"Released camera {camera_id}")

    async def process_streams():
        """Run detection round-robin over the subscribed cameras, paced by poll_interval."""
        logging.info("Started processing streams")
        while True:
            start_time = time.monotonic()
            # Iterate over a snapshot: subscriptions can change while this
            # coroutine is suspended in an await.
            for camera_id, stream in list(streams.items()):
                try:
                    frame = stream['buffer'].get_nowait()
                except queue.Empty:
                    continue  # No new frame for this camera since the last pass
                # NOTE: inference is a synchronous call and blocks the event
                # loop (including message handling) while it runs.
                results = model(frame, stream=False)
                boxes = []
                for r in results:
                    for box in r.boxes:
                        boxes.append({
                            "class": class_names[int(box.cls[0])],  # Use class name
                            "confidence": float(box.conf[0]),
                        })
                # Send the detections for this camera to this client
                detection_data = {
                    "type": "imageDetection",
                    "cameraIdentifier": camera_id,
                    "timestamp": time.time(),
                    "data": {
                        "detections": boxes
                    }
                }
                logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
                await websocket.send_json(detection_data)
            elapsed_time = (time.monotonic() - start_time) * 1000  # in ms
            sleep_time = max(poll_interval - elapsed_time, 0)
            logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms")
            await asyncio.sleep(sleep_time / 1000.0)

    await websocket.accept()
    task = asyncio.create_task(process_streams())

    try:
        while True:
            msg = await websocket.receive_text()
            logging.debug(f"Received message: {msg}")
            data = json.loads(msg)
            camera_id = data.get("cameraIdentifier")
            rtsp_url = data.get("rtspUrl")

            # NOTE(review): the nesting of these branches was reconstructed
            # from a copy of the file that had lost its indentation - confirm
            # the subscribe/unsubscribe protocol against the client.
            if camera_id and rtsp_url:
                if camera_id not in streams and len(streams) < max_streams:
                    cap = cv2.VideoCapture(rtsp_url)
                    buffer = queue.Queue(maxsize=1)
                    stop_event = threading.Event()
                    thread = threading.Thread(
                        target=frame_reader,
                        args=(camera_id, cap, buffer, stop_event),
                    )
                    thread.daemon = True
                    thread.start()
                    streams[camera_id] = {
                        'cap': cap,
                        'buffer': buffer,
                        'thread': thread,
                        'stop': stop_event,
                    }
                    logging.info(f"Subscribed to camera {camera_id} with URL {rtsp_url}")
            elif camera_id and camera_id in streams:
                stream = streams.pop(camera_id)
                # Ask the reader thread to stop; it releases the capture itself.
                stream['stop'].set()
                logging.info(f"Unsubscribed from camera {camera_id}")
            elif data.get("command") == "stop":
                logging.info("Received stop command")
                break
    except WebSocketDisconnect:
        # The client going away is a normal end of session, not an error.
        logging.info("WebSocket client disconnected")
    except Exception as e:
        logging.error(f"Error in WebSocket connection: {e}")
    finally:
        # Stop the detection task and collect its outcome so a failure inside
        # it (e.g. a send on a closed socket) is reported instead of lost.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logging.error(f"Error in stream processing task: {e}")

        # Signal every reader thread to stop; each releases its own capture.
        for stream in streams.values():
            stream['stop'].set()
        streams.clear()
        logging.info("WebSocket connection closed")

        # Closing may fail when the connection is already closed (for
        # example the client disconnected first); that is not an error here.
        try:
            await websocket.close()
        except (RuntimeError, WebSocketDisconnect):
            pass