From ffe9c90747dc66a83383626140333611fc3a566e Mon Sep 17 00:00:00 2001
From: Siwat Sirichai
Date: Tue, 14 Jan 2025 23:54:07 +0700
Subject: [PATCH] thread safety

---
 app.py | 439 ++++++++++++++++++++++++++-------------------------
 1 file changed, 199 insertions(+), 240 deletions(-)

diff --git a/app.py b/app.py
index bde8d83..0941d3b 100644
--- a/app.py
+++ b/app.py
@@ -34,7 +34,7 @@ max_retries = config.get("max_retries", 3)
 
 # Configure logging
 logging.basicConfig(
-    level=logging.INFO,
+    level=logging.DEBUG,
     format="%(asctime)s [%(levelname)s] %(message)s",
     handlers=[
         logging.FileHandler("app.log"),
@@ -49,6 +49,10 @@ os.makedirs("models", exist_ok=True)
 HEARTBEAT_INTERVAL = 2  # seconds
 WORKER_TIMEOUT_MS = 10000
 
+# Add a lock for thread-safe operations on shared resources
+streams_lock = threading.Lock()
+models_lock = threading.Lock()
+
 @app.websocket("/")
 async def detect(websocket: WebSocket):
     import asyncio
@@ -61,77 +65,84 @@ async def detect(websocket: WebSocket):
     # This function is user-modifiable
     # Save data you want to persist across frames in the persistent_data dictionary
     async def handle_detection(camera_id, stream, frame, websocket, model: YOLO, persistent_data):
-        boxes = []
-        for r in model.track(frame, stream=False, persist=True):
-            for box in r.boxes:
-                track_id = None
-                if hasattr(box, "id") and box.id is not None:
-                    track_id = box.id.item()
-                box_cpu = box.cpu()
-                boxes.append({
-                    "class": model.names[int(box_cpu.cls[0])],
-                    "confidence": float(box_cpu.conf[0]),
-                    "id": track_id,
-                })
-        # Broadcast to all subscribers of this URL
-        detection_data = {
-            "type": "imageDetection",
-            "cameraIdentifier": camera_id,
-            "timestamp": time.time(),
-            "data": {
-                "detections": boxes,
-                "modelId": stream['modelId'],
-                "modelName": stream['modelName']
+        try:
+            boxes = []
+            for r in model.track(frame, stream=False, persist=True):
+                for box in r.boxes:
+                    track_id = None
+                    if hasattr(box, "id") and box.id is not None:
+                        track_id = box.id.item()
+                    box_cpu = box.cpu()
+                    boxes.append({
+                        "class": model.names[int(box_cpu.cls[0])],
+                        "confidence": float(box_cpu.conf[0]),
+                        "id": track_id,
+                    })
+            # Broadcast to all subscribers of this URL
+            detection_data = {
+                "type": "imageDetection",
+                "cameraIdentifier": camera_id,
+                "timestamp": time.time(),
+                "data": {
+                    "detections": boxes,
+                    "modelId": stream['modelId'],
+                    "modelName": stream['modelName']
+                }
             }
-        }
-        logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
-        await websocket.send_json(detection_data)
-        return persistent_data
+            logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
+            await websocket.send_json(detection_data)
+            return persistent_data
+        except Exception as e:
+            logging.error(f"Error in handle_detection for camera {camera_id}: {e}")
+            return persistent_data
 
     def frame_reader(camera_id, cap, buffer, stop_event):
         import time
         retries = 0
-        while not stop_event.is_set():
-            try:
-                ret, frame = cap.read()
-                if not ret:
-                    logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
+        try:
+            while not stop_event.is_set():
+                try:
+                    ret, frame = cap.read()
+                    if not ret:
+                        logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
+                        cap.release()
+                        time.sleep(reconnect_interval)
+                        retries += 1
+                        if retries > max_retries:
+                            logging.error(f"Max retries reached for camera: {camera_id}")
+                            break
+                        # Re-open the VideoCapture
+                        cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
+                        if not cap.isOpened():
+                            logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
+                            continue
+                        continue
+                    retries = 0  # Reset on success
+                    if not buffer.empty():
+                        try:
+                            buffer.get_nowait()  # Discard the old frame
+                        except queue.Empty:
+                            pass
+                    buffer.put(frame)
+                except cv2.error as e:
+                    logging.error(f"OpenCV error for camera {camera_id}: {e}")
                     cap.release()
                     time.sleep(reconnect_interval)
                     retries += 1
-                    if retries > max_retries:
-                        logging.error(f"Max retries reached for camera: {camera_id}")
+                    if retries > max_retries and max_retries != -1:
+                        logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}")
                         break
                     # Re-open the VideoCapture
                     cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
                     if not cap.isOpened():
-                        logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
+                        logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
                         continue
-                    continue
-                retries = 0  # Reset on success
-                if not buffer.empty():
-                    try:
-                        buffer.get_nowait()  # Discard the old frame
-                    except queue.Empty:
-                        pass
-                buffer.put(frame)
-            except cv2.error as e:
-                logging.error(f"OpenCV error for camera {camera_id}: {e}")
-                cap.release()
-                time.sleep(reconnect_interval)
-                retries += 1
-                if retries > max_retries and max_retries != -1:
-                    logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}")
+                except Exception as e:
+                    logging.error(f"Unexpected error for camera {camera_id}: {e}")
+                    cap.release()
                     break
-                # Re-open the VideoCapture
-                cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
-                if not cap.isOpened():
-                    logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
-                    continue
-            except Exception as e:
-                logging.error(f"Unexpected error for camera {camera_id}: {e}")
-                cap.release()
-                break
+        except Exception as e:
+            logging.error(f"Error in frame_reader thread for camera {camera_id}: {e}")
 
     async def process_streams():
         global models
@@ -141,11 +152,14 @@ async def detect(websocket: WebSocket):
         while True:
             start_time = time.time()
             # Round-robin processing
-            for camera_id, stream in list(streams.items()):
+            with streams_lock:
+                current_streams = list(streams.items())
+            for camera_id, stream in current_streams:
                 buffer = stream['buffer']
                 if not buffer.empty():
                     frame = buffer.get()
-                    model = models.get(camera_id, {}).get(stream['modelId'])
+                    with models_lock:
+                        model = models.get(camera_id, {}).get(stream['modelId'])
                     key = (camera_id, stream['modelId'])
                     persistent_data = persistent_data_dict.get(key, {})
                     updated_persistent_data = await handle_detection(camera_id, stream, frame, websocket, model, persistent_data)
@@ -198,185 +212,120 @@ async def detect(websocket: WebSocket):
 
     async def on_message():
         global models
-        while True:
-            msg = await websocket.receive_text()
-            logging.debug(f"Received message: {msg}")
-            data = json.loads(msg)
-            msg_type = data.get("type")
-
-            if msg_type == "subscribe":
-                payload = data.get("payload", {})
-                camera_id = payload.get("cameraIdentifier")
-                rtsp_url = payload.get("rtspUrl")
-                model_url = payload.get("modelUrl")
-                modelId = payload.get("modelId")
-                modelName = payload.get("modelName")
-
-                if model_url:
-                    if camera_id not in models:
-                        models[camera_id] = {}
-                    if modelId not in models[camera_id]:
-                        print(f"Downloading model from {model_url}")
-                        parsed_url = urlparse(model_url)
-                        filename = os.path.basename(parsed_url.path)
-                        model_filename = os.path.join("models", filename)
-                        # Download the model
-                        response = requests.get(model_url, stream=True)
-                        if response.status_code == 200:
-                            with open(model_filename, 'wb') as f:
-                                for chunk in response.iter_content(chunk_size=8192):
-                                    f.write(chunk)
-                            logging.info(f"Downloaded model from {model_url} to {model_filename}")
-                            model = YOLO(model_filename)
-                            if torch.cuda.is_available():
-                                model.to('cuda')
-                            models[camera_id][modelId] = model
-                            logging.info(f"Loaded model {modelId} for camera {camera_id}")
-                        else:
-                            logging.error(f"Failed to download model from {model_url}")
-                            continue
-                if camera_id and rtsp_url:
-                    if camera_id not in streams and len(streams) < max_streams:
-                        cap = cv2.VideoCapture(rtsp_url)
-                        if not cap.isOpened():
-                            logging.error(f"Failed to open RTSP stream for camera {camera_id}")
-                            continue
-                        buffer = queue.Queue(maxsize=1)
-                        stop_event = threading.Event()
-                        thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
-                        thread.daemon = True
-                        thread.start()
-                        streams[camera_id] = {
-                            'cap': cap,
-                            'buffer': buffer,
-                            'thread': thread,
-                            'rtsp_url': rtsp_url,
-                            'stop_event': stop_event,
-                            'modelId': modelId,
-                            'modelName': modelName
-                        }
-                        logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
-                    elif camera_id and camera_id in streams:
-                        stream = streams.pop(camera_id)
-                        stream['cap'].release()
-                        logging.info(f"Unsubscribed from camera {camera_id}")
-                        if camera_id in models and modelId in models[camera_id]:
-                            del models[camera_id][modelId]
-                            if not models[camera_id]:
-                                del models[camera_id]
-            elif msg_type == "unsubscribe":
-                payload = data.get("payload", {})
-                camera_id = payload.get("cameraIdentifier")
-                if camera_id and camera_id in streams:
-                    stream = streams.pop(camera_id)
-                    stream['cap'].release()
-                    logging.info(f"Unsubscribed from camera {camera_id}")
-                    if camera_id in models and modelId in models[camera_id]:
-                        del models[camera_id][modelId]
-                        if not models[camera_id]:
-                            del models[camera_id]
-            elif msg_type == "requestState":
-                # Handle state request
-                cpu_usage = psutil.cpu_percent()
-                memory_usage = psutil.virtual_memory().percent
-                if torch.cuda.is_available():
-                    gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2)  # Convert to MB
-                    gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)  # Convert to MB
-                else:
-                    gpu_usage = None
-                    gpu_memory_usage = None
-
-                camera_connections = [
-                    {
-                        "cameraIdentifier": camera_id,
-                        "modelId": stream['modelId'],
-                        "modelName": stream['modelName'],
-                        "online": True
-                    }
-                    for camera_id, stream in streams.items()
-                ]
-
-                state_report = {
-                    "type": "stateReport",
-                    "cpuUsage": cpu_usage,
-                    "memoryUsage": memory_usage,
-                    "gpuUsage": gpu_usage,
-                    "gpuMemoryUsage": gpu_memory_usage,
-                    "cameraConnections": camera_connections
-                }
-                await websocket.send_text(json.dumps(state_report))
-            else:
-                logging.error(f"Unknown message type: {msg_type}")
-
-    await websocket.accept()
-    task = asyncio.create_task(process_streams())
-    heartbeat_task = asyncio.create_task(send_heartbeat())
-    message_task = asyncio.create_task(on_message())
-
-    await asyncio.gather(heartbeat_task, message_task)
-
-    model = None
-    model_path = None
-
-    try:
         while True:
             try:
                 msg = await websocket.receive_text()
                 logging.debug(f"Received message: {msg}")
                 data = json.loads(msg)
-                camera_id = data.get("cameraIdentifier")
-                rtsp_url = data.get("rtspUrl")
-                model_url = data.get("modelUrl")
-                modelId = data.get("modelId")
-                modelName = data.get("modelName")
+                msg_type = data.get("type")
+
+                if msg_type == "subscribe":
+                    payload = data.get("payload", {})
+                    camera_id = payload.get("cameraIdentifier")
+                    rtsp_url = payload.get("rtspUrl")
+                    model_url = payload.get("modelUrl")
+                    modelId = payload.get("modelId")
+                    modelName = payload.get("modelName")
 
-                if model_url:
-                    print(f"Downloading model from {model_url}")
-                    parsed_url = urlparse(model_url)
-                    filename = os.path.basename(parsed_url.path)
-                    model_filename = os.path.join("models", filename)
-                    # Download the model
-                    response = requests.get(model_url, stream=True)
-                    if response.status_code == 200:
-                        with open(model_filename, 'wb') as f:
-                            for chunk in response.iter_content(chunk_size=8192):
-                                f.write(chunk)
-                        logging.info(f"Downloaded model from {model_url} to {model_filename}")
-                        model = YOLO(model_filename)
-                        if torch.cuda.is_available():
-                            model.to('cuda')
-                        class_names = model.names
+                    if model_url:
+                        with models_lock:
+                            if camera_id not in models:
+                                models[camera_id] = {}
+                            if modelId not in models[camera_id]:
+                                print(f"Downloading model from {model_url}")
+                                parsed_url = urlparse(model_url)
+                                filename = os.path.basename(parsed_url.path)
+                                model_filename = os.path.join("models", filename)
+                                # Download the model
+                                response = requests.get(model_url, stream=True)
+                                if response.status_code == 200:
+                                    with open(model_filename, 'wb') as f:
+                                        for chunk in response.iter_content(chunk_size=8192):
+                                            f.write(chunk)
+                                    logging.info(f"Downloaded model from {model_url} to {model_filename}")
+                                    model = YOLO(model_filename)
+                                    if torch.cuda.is_available():
+                                        model.to('cuda')
+                                    models[camera_id][modelId] = model
+                                    logging.info(f"Loaded model {modelId} for camera {camera_id}")
+                                else:
+                                    logging.error(f"Failed to download model from {model_url}")
+                                    continue
+                    if camera_id and rtsp_url:
+                        with streams_lock:
+                            if camera_id not in streams and len(streams) < max_streams:
+                                cap = cv2.VideoCapture(rtsp_url)
+                                if not cap.isOpened():
+                                    logging.error(f"Failed to open RTSP stream for camera {camera_id}")
+                                    continue
+                                buffer = queue.Queue(maxsize=1)
+                                stop_event = threading.Event()
+                                thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
+                                thread.daemon = True
+                                thread.start()
+                                streams[camera_id] = {
+                                    'cap': cap,
+                                    'buffer': buffer,
+                                    'thread': thread,
+                                    'rtsp_url': rtsp_url,
+                                    'stop_event': stop_event,
+                                    'modelId': modelId,
+                                    'modelName': modelName
+                                }
+                                logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
+                            elif camera_id and camera_id in streams:
+                                stream = streams.pop(camera_id)
+                                stream['cap'].release()
+                                logging.info(f"Unsubscribed from camera {camera_id}")
+                                if camera_id in models and modelId in models[camera_id]:
+                                    del models[camera_id][modelId]
+                                    if not models[camera_id]:
+                                        del models[camera_id]
+                elif msg_type == "unsubscribe":
+                    payload = data.get("payload", {})
+                    camera_id = payload.get("cameraIdentifier")
+                    logging.debug(f"Unsubscribing from camera {camera_id}")
+                    with streams_lock:
+                        if camera_id and camera_id in streams:
+                            stream = streams.pop(camera_id)
+                            stream['cap'].release()
+                            logging.info(f"Unsubscribed from camera {camera_id}")
+                            if camera_id in models and modelId in models[camera_id]:
+                                del models[camera_id][modelId]
+                                if not models[camera_id]:
+                                    del models[camera_id]
+                elif msg_type == "requestState":
+                    # Handle state request
+                    cpu_usage = psutil.cpu_percent()
+                    memory_usage = psutil.virtual_memory().percent
+                    if torch.cuda.is_available():
+                        gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2)  # Convert to MB
+                        gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)  # Convert to MB
                     else:
-                        logging.error(f"Failed to download model from {model_url}")
-                        continue
-                if camera_id and rtsp_url:
-                    if camera_id not in streams and len(streams) < max_streams:
-                        cap = cv2.VideoCapture(rtsp_url)
-                        if not cap.isOpened():
-                            logging.error(f"Failed to open RTSP stream for camera {camera_id}")
-                            continue
-                        buffer = queue.Queue(maxsize=1)
-                        stop_event = threading.Event()
-                        thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
-                        thread.daemon = True
-                        thread.start()
-                        streams[camera_id] = {
-                            'cap': cap,
-                            'buffer': buffer,
-                            'thread': thread,
-                            'rtsp_url': rtsp_url,
-                            'stop_event': stop_event,
-                            'modelId': modelId,
-                            'modelName': modelName
+                        gpu_usage = None
+                        gpu_memory_usage = None
+
+                    camera_connections = [
+                        {
+                            "cameraIdentifier": camera_id,
+                            "modelId": stream['modelId'],
+                            "modelName": stream['modelName'],
+                            "online": True
                         }
-                        logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
-                    elif camera_id and camera_id in streams:
-                        stream = streams.pop(camera_id)
-                        stream['cap'].release()
-                        logging.info(f"Unsubscribed from camera {camera_id}")
-                elif data.get("command") == "stop":
-                    logging.info("Received stop command")
-                    break
+                        for camera_id, stream in streams.items()
+                    ]
+
+                    state_report = {
+                        "type": "stateReport",
+                        "cpuUsage": cpu_usage,
+                        "memoryUsage": memory_usage,
+                        "gpuUsage": gpu_usage,
+                        "gpuMemoryUsage": gpu_memory_usage,
+                        "cameraConnections": camera_connections
+                    }
+                    await websocket.send_text(json.dumps(state_report))
+                else:
+                    logging.error(f"Unknown message type: {msg_type}")
             except json.JSONDecodeError:
                 logging.error("Received invalid JSON message")
             except (WebSocketDisconnect, ConnectionClosedError) as e:
@@ -385,17 +334,27 @@ async def detect(websocket: WebSocket):
             except Exception as e:
                 logging.error(f"Error handling message: {e}")
                 break
+
+    try:
+        await websocket.accept()
+        task = asyncio.create_task(process_streams())
+        heartbeat_task = asyncio.create_task(send_heartbeat())
+        message_task = asyncio.create_task(on_message())
+
+        await asyncio.gather(heartbeat_task, message_task)
     except Exception as e:
-        logging.error(f"Unexpected error in WebSocket connection: {e}")
+        logging.error(f"Error in detect websocket: {e}")
     finally:
         task.cancel()
        await task
-        for camera_id, stream in streams.items():
-            stream['stop_event'].set()
-            stream['thread'].join()
-            stream['cap'].release()
-            stream['buffer'].queue.clear()
-            logging.info(f"Released camera {camera_id} and cleaned up resources")
-        streams.clear()
-        models.clear()
+        with streams_lock:
+            for camera_id, stream in streams.items():
+                stream['stop_event'].set()
+                stream['thread'].join()
+                stream['cap'].release()
+                stream['buffer'].queue.clear()
+                logging.info(f"Released camera {camera_id} and cleaned up resources")
+            streams.clear()
+        with models_lock:
+            models.clear()
         logging.info("WebSocket connection closed")