thread safety
This commit is contained in:
		
							parent
							
								
									e52efabbb7
								
							
						
					
					
						commit
						ffe9c90747
					
				
					 1 changed files with 199 additions and 240 deletions
				
			
		
							
								
								
									
										439
									
								
								app.py
									
										
									
									
									
								
							
							
						
						
									
										439
									
								
								app.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -34,7 +34,7 @@ max_retries = config.get("max_retries", 3)
 | 
			
		|||
 | 
			
		||||
# Configure logging
 | 
			
		||||
logging.basicConfig(
 | 
			
		||||
    level=logging.INFO,
 | 
			
		||||
    level=logging.DEBUG,
 | 
			
		||||
    format="%(asctime)s [%(levelname)s] %(message)s",
 | 
			
		||||
    handlers=[
 | 
			
		||||
        logging.FileHandler("app.log"),
 | 
			
		||||
| 
						 | 
				
			
			@ -49,6 +49,10 @@ os.makedirs("models", exist_ok=True)
 | 
			
		|||
HEARTBEAT_INTERVAL = 2  # seconds
 | 
			
		||||
WORKER_TIMEOUT_MS = 10000
 | 
			
		||||
 | 
			
		||||
# Add a lock for thread-safe operations on shared resources
 | 
			
		||||
streams_lock = threading.Lock()
 | 
			
		||||
models_lock = threading.Lock()
 | 
			
		||||
 | 
			
		||||
@app.websocket("/")
 | 
			
		||||
async def detect(websocket: WebSocket):
 | 
			
		||||
    import asyncio
 | 
			
		||||
| 
						 | 
				
			
			@ -61,77 +65,84 @@ async def detect(websocket: WebSocket):
 | 
			
		|||
    # This function is user-modifiable
 | 
			
		||||
    # Save data you want to persist across frames in the persistent_data dictionary
 | 
			
		||||
    async def handle_detection(camera_id, stream, frame, websocket, model: YOLO, persistent_data):
 | 
			
		||||
        boxes = []
 | 
			
		||||
        for r in model.track(frame, stream=False, persist=True):
 | 
			
		||||
            for box in r.boxes:
 | 
			
		||||
                track_id = None
 | 
			
		||||
                if hasattr(box, "id") and box.id is not None:
 | 
			
		||||
                    track_id = box.id.item()
 | 
			
		||||
                box_cpu = box.cpu()
 | 
			
		||||
                boxes.append({
 | 
			
		||||
                    "class": model.names[int(box_cpu.cls[0])],
 | 
			
		||||
                    "confidence": float(box_cpu.conf[0]),
 | 
			
		||||
                    "id": track_id,
 | 
			
		||||
                })
 | 
			
		||||
        # Broadcast to all subscribers of this URL
 | 
			
		||||
        detection_data = {
 | 
			
		||||
            "type": "imageDetection",
 | 
			
		||||
            "cameraIdentifier": camera_id,
 | 
			
		||||
            "timestamp": time.time(),
 | 
			
		||||
            "data": {
 | 
			
		||||
                "detections": boxes,
 | 
			
		||||
                "modelId": stream['modelId'],
 | 
			
		||||
                "modelName": stream['modelName']
 | 
			
		||||
        try:
 | 
			
		||||
            boxes = []
 | 
			
		||||
            for r in model.track(frame, stream=False, persist=True):
 | 
			
		||||
                for box in r.boxes:
 | 
			
		||||
                    track_id = None
 | 
			
		||||
                    if hasattr(box, "id") and box.id is not None:
 | 
			
		||||
                        track_id = box.id.item()
 | 
			
		||||
                    box_cpu = box.cpu()
 | 
			
		||||
                    boxes.append({
 | 
			
		||||
                        "class": model.names[int(box_cpu.cls[0])],
 | 
			
		||||
                        "confidence": float(box_cpu.conf[0]),
 | 
			
		||||
                        "id": track_id,
 | 
			
		||||
                    })
 | 
			
		||||
            # Broadcast to all subscribers of this URL
 | 
			
		||||
            detection_data = {
 | 
			
		||||
                "type": "imageDetection",
 | 
			
		||||
                "cameraIdentifier": camera_id,
 | 
			
		||||
                "timestamp": time.time(),
 | 
			
		||||
                "data": {
 | 
			
		||||
                    "detections": boxes,
 | 
			
		||||
                    "modelId": stream['modelId'],
 | 
			
		||||
                    "modelName": stream['modelName']
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
 | 
			
		||||
        await websocket.send_json(detection_data)
 | 
			
		||||
        return persistent_data
 | 
			
		||||
            logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
 | 
			
		||||
            await websocket.send_json(detection_data)
 | 
			
		||||
            return persistent_data
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logging.error(f"Error in handle_detection for camera {camera_id}: {e}")
 | 
			
		||||
            return persistent_data
 | 
			
		||||
 | 
			
		||||
    def frame_reader(camera_id, cap, buffer, stop_event):
 | 
			
		||||
        import time
 | 
			
		||||
        retries = 0
 | 
			
		||||
        while not stop_event.is_set():
 | 
			
		||||
            try:
 | 
			
		||||
                ret, frame = cap.read()
 | 
			
		||||
                if not ret:
 | 
			
		||||
                    logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
 | 
			
		||||
        try:
 | 
			
		||||
            while not stop_event.is_set():
 | 
			
		||||
                try:
 | 
			
		||||
                    ret, frame = cap.read()
 | 
			
		||||
                    if not ret:
 | 
			
		||||
                        logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
 | 
			
		||||
                        cap.release()
 | 
			
		||||
                        time.sleep(reconnect_interval)
 | 
			
		||||
                        retries += 1
 | 
			
		||||
                        if retries > max_retries:
 | 
			
		||||
                            logging.error(f"Max retries reached for camera: {camera_id}")
 | 
			
		||||
                            break
 | 
			
		||||
                        # Re-open the VideoCapture
 | 
			
		||||
                        cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
 | 
			
		||||
                        if not cap.isOpened():
 | 
			
		||||
                            logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
 | 
			
		||||
                            continue
 | 
			
		||||
                        continue
 | 
			
		||||
                    retries = 0  # Reset on success
 | 
			
		||||
                    if not buffer.empty():
 | 
			
		||||
                        try:
 | 
			
		||||
                            buffer.get_nowait()  # Discard the old frame
 | 
			
		||||
                        except queue.Empty:
 | 
			
		||||
                            pass
 | 
			
		||||
                    buffer.put(frame)
 | 
			
		||||
                except cv2.error as e:
 | 
			
		||||
                    logging.error(f"OpenCV error for camera {camera_id}: {e}")
 | 
			
		||||
                    cap.release()
 | 
			
		||||
                    time.sleep(reconnect_interval)
 | 
			
		||||
                    retries += 1
 | 
			
		||||
                    if retries > max_retries:
 | 
			
		||||
                        logging.error(f"Max retries reached for camera: {camera_id}")
 | 
			
		||||
                    if retries > max_retries and max_retries != -1:
 | 
			
		||||
                        logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}")
 | 
			
		||||
                        break
 | 
			
		||||
                    # Re-open the VideoCapture
 | 
			
		||||
                    cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
 | 
			
		||||
                    if not cap.isOpened():
 | 
			
		||||
                        logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
 | 
			
		||||
                        logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
 | 
			
		||||
                        continue
 | 
			
		||||
                    continue
 | 
			
		||||
                retries = 0  # Reset on success
 | 
			
		||||
                if not buffer.empty():
 | 
			
		||||
                    try:
 | 
			
		||||
                        buffer.get_nowait()  # Discard the old frame
 | 
			
		||||
                    except queue.Empty:
 | 
			
		||||
                        pass
 | 
			
		||||
                buffer.put(frame)
 | 
			
		||||
            except cv2.error as e:
 | 
			
		||||
                logging.error(f"OpenCV error for camera {camera_id}: {e}")
 | 
			
		||||
                cap.release()
 | 
			
		||||
                time.sleep(reconnect_interval)
 | 
			
		||||
                retries += 1
 | 
			
		||||
                if retries > max_retries and max_retries != -1:
 | 
			
		||||
                    logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}")
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    logging.error(f"Unexpected error for camera {camera_id}: {e}")
 | 
			
		||||
                    cap.release()
 | 
			
		||||
                    break
 | 
			
		||||
                # Re-open the VideoCapture
 | 
			
		||||
                cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
 | 
			
		||||
                if not cap.isOpened():
 | 
			
		||||
                    logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
 | 
			
		||||
                    continue
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logging.error(f"Unexpected error for camera {camera_id}: {e}")
 | 
			
		||||
                cap.release()
 | 
			
		||||
                break
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logging.error(f"Error in frame_reader thread for camera {camera_id}: {e}")
 | 
			
		||||
 | 
			
		||||
    async def process_streams():
 | 
			
		||||
        global models
 | 
			
		||||
| 
						 | 
				
			
			@ -141,11 +152,14 @@ async def detect(websocket: WebSocket):
 | 
			
		|||
            while True:
 | 
			
		||||
                start_time = time.time()
 | 
			
		||||
                # Round-robin processing
 | 
			
		||||
                for camera_id, stream in list(streams.items()):
 | 
			
		||||
                with streams_lock:
 | 
			
		||||
                    current_streams = list(streams.items())
 | 
			
		||||
                for camera_id, stream in current_streams:
 | 
			
		||||
                    buffer = stream['buffer']
 | 
			
		||||
                    if not buffer.empty():
 | 
			
		||||
                        frame = buffer.get()
 | 
			
		||||
                        model = models.get(camera_id, {}).get(stream['modelId'])
 | 
			
		||||
                        with models_lock:
 | 
			
		||||
                            model = models.get(camera_id, {}).get(stream['modelId'])
 | 
			
		||||
                        key = (camera_id, stream['modelId'])
 | 
			
		||||
                        persistent_data = persistent_data_dict.get(key, {})
 | 
			
		||||
                        updated_persistent_data = await handle_detection(camera_id, stream, frame, websocket, model, persistent_data)
 | 
			
		||||
| 
						 | 
				
			
			@ -198,185 +212,120 @@ async def detect(websocket: WebSocket):
 | 
			
		|||
 | 
			
		||||
    async def on_message():
 | 
			
		||||
        global models
 | 
			
		||||
        while True:
 | 
			
		||||
            msg = await websocket.receive_text()
 | 
			
		||||
            logging.debug(f"Received message: {msg}")
 | 
			
		||||
            data = json.loads(msg)
 | 
			
		||||
            msg_type = data.get("type")
 | 
			
		||||
 | 
			
		||||
            if msg_type == "subscribe":
 | 
			
		||||
                payload = data.get("payload", {})
 | 
			
		||||
                camera_id = payload.get("cameraIdentifier")
 | 
			
		||||
                rtsp_url = payload.get("rtspUrl")
 | 
			
		||||
                model_url = payload.get("modelUrl")
 | 
			
		||||
                modelId = payload.get("modelId")
 | 
			
		||||
                modelName = payload.get("modelName")
 | 
			
		||||
    
 | 
			
		||||
                if model_url:
 | 
			
		||||
                    if camera_id not in models:
 | 
			
		||||
                        models[camera_id] = {}
 | 
			
		||||
                    if modelId not in models[camera_id]:
 | 
			
		||||
                        print(f"Downloading model from {model_url}")
 | 
			
		||||
                        parsed_url = urlparse(model_url)
 | 
			
		||||
                        filename = os.path.basename(parsed_url.path)    
 | 
			
		||||
                        model_filename = os.path.join("models", filename)
 | 
			
		||||
                        # Download the model
 | 
			
		||||
                        response = requests.get(model_url, stream=True)
 | 
			
		||||
                        if response.status_code == 200:
 | 
			
		||||
                            with open(model_filename, 'wb') as f:
 | 
			
		||||
                                for chunk in response.iter_content(chunk_size=8192):
 | 
			
		||||
                                    f.write(chunk)
 | 
			
		||||
                            logging.info(f"Downloaded model from {model_url} to {model_filename}")
 | 
			
		||||
                            model = YOLO(model_filename)
 | 
			
		||||
                            if torch.cuda.is_available():
 | 
			
		||||
                                model.to('cuda')
 | 
			
		||||
                            models[camera_id][modelId] = model
 | 
			
		||||
                            logging.info(f"Loaded model {modelId} for camera {camera_id}")
 | 
			
		||||
                        else:
 | 
			
		||||
                            logging.error(f"Failed to download model from {model_url}")
 | 
			
		||||
                            continue
 | 
			
		||||
                if camera_id and rtsp_url:
 | 
			
		||||
                    if camera_id not in streams and len(streams) < max_streams:
 | 
			
		||||
                        cap = cv2.VideoCapture(rtsp_url)
 | 
			
		||||
                        if not cap.isOpened():
 | 
			
		||||
                            logging.error(f"Failed to open RTSP stream for camera {camera_id}")
 | 
			
		||||
                            continue
 | 
			
		||||
                        buffer = queue.Queue(maxsize=1)
 | 
			
		||||
                        stop_event = threading.Event()
 | 
			
		||||
                        thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
 | 
			
		||||
                        thread.daemon = True
 | 
			
		||||
                        thread.start()
 | 
			
		||||
                        streams[camera_id] = {
 | 
			
		||||
                            'cap': cap,
 | 
			
		||||
                            'buffer': buffer,
 | 
			
		||||
                            'thread': thread,
 | 
			
		||||
                            'rtsp_url': rtsp_url,
 | 
			
		||||
                            'stop_event': stop_event,
 | 
			
		||||
                            'modelId': modelId,
 | 
			
		||||
                            'modelName': modelName
 | 
			
		||||
                        }
 | 
			
		||||
                        logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
 | 
			
		||||
                    elif camera_id and camera_id in streams:
 | 
			
		||||
                        stream = streams.pop(camera_id)
 | 
			
		||||
                        stream['cap'].release()
 | 
			
		||||
                        logging.info(f"Unsubscribed from camera {camera_id}")
 | 
			
		||||
                        if camera_id in models and modelId in models[camera_id]:
 | 
			
		||||
                            del models[camera_id][modelId]
 | 
			
		||||
                            if not models[camera_id]:
 | 
			
		||||
                                del models[camera_id]
 | 
			
		||||
            elif msg_type == "unsubscribe":
 | 
			
		||||
                payload = data.get("payload", {})
 | 
			
		||||
                camera_id = payload.get("cameraIdentifier")
 | 
			
		||||
                if camera_id and camera_id in streams:
 | 
			
		||||
                    stream = streams.pop(camera_id)
 | 
			
		||||
                    stream['cap'].release()
 | 
			
		||||
                    logging.info(f"Unsubscribed from camera {camera_id}")
 | 
			
		||||
                    if camera_id in models and modelId in models[camera_id]:
 | 
			
		||||
                        del models[camera_id][modelId]
 | 
			
		||||
                        if not models[camera_id]:
 | 
			
		||||
                            del models[camera_id]
 | 
			
		||||
            elif msg_type == "requestState":
 | 
			
		||||
                # Handle state request
 | 
			
		||||
                cpu_usage = psutil.cpu_percent()
 | 
			
		||||
                memory_usage = psutil.virtual_memory().percent
 | 
			
		||||
                if torch.cuda.is_available():
 | 
			
		||||
                    gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2)  # Convert to MB
 | 
			
		||||
                    gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)  # Convert to MB
 | 
			
		||||
                else:
 | 
			
		||||
                    gpu_usage = None
 | 
			
		||||
                    gpu_memory_usage = None
 | 
			
		||||
                
 | 
			
		||||
                camera_connections = [
 | 
			
		||||
                    {
 | 
			
		||||
                        "cameraIdentifier": camera_id,
 | 
			
		||||
                        "modelId": stream['modelId'],
 | 
			
		||||
                        "modelName": stream['modelName'],
 | 
			
		||||
                        "online": True
 | 
			
		||||
                    }
 | 
			
		||||
                    for camera_id, stream in streams.items()
 | 
			
		||||
                ]
 | 
			
		||||
                
 | 
			
		||||
                state_report = {
 | 
			
		||||
                    "type": "stateReport",
 | 
			
		||||
                    "cpuUsage": cpu_usage,
 | 
			
		||||
                    "memoryUsage": memory_usage,
 | 
			
		||||
                    "gpuUsage": gpu_usage,
 | 
			
		||||
                    "gpuMemoryUsage": gpu_memory_usage,
 | 
			
		||||
                    "cameraConnections": camera_connections
 | 
			
		||||
                }
 | 
			
		||||
                await websocket.send_text(json.dumps(state_report))
 | 
			
		||||
            else:
 | 
			
		||||
                logging.error(f"Unknown message type: {msg_type}")
 | 
			
		||||
 | 
			
		||||
    await websocket.accept()
 | 
			
		||||
    task = asyncio.create_task(process_streams())
 | 
			
		||||
    heartbeat_task = asyncio.create_task(send_heartbeat())
 | 
			
		||||
    message_task = asyncio.create_task(on_message())
 | 
			
		||||
 | 
			
		||||
    await asyncio.gather(heartbeat_task, message_task)
 | 
			
		||||
 | 
			
		||||
    model = None
 | 
			
		||||
    model_path = None
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                msg = await websocket.receive_text()
 | 
			
		||||
                logging.debug(f"Received message: {msg}")
 | 
			
		||||
                data = json.loads(msg)
 | 
			
		||||
                camera_id = data.get("cameraIdentifier")
 | 
			
		||||
                rtsp_url = data.get("rtspUrl")
 | 
			
		||||
                model_url = data.get("modelUrl")
 | 
			
		||||
                modelId = data.get("modelId")
 | 
			
		||||
                modelName = data.get("modelName")
 | 
			
		||||
                msg_type = data.get("type")
 | 
			
		||||
 | 
			
		||||
                if msg_type == "subscribe":
 | 
			
		||||
                    payload = data.get("payload", {})
 | 
			
		||||
                    camera_id = payload.get("cameraIdentifier")
 | 
			
		||||
                    rtsp_url = payload.get("rtspUrl")
 | 
			
		||||
                    model_url = payload.get("modelUrl")
 | 
			
		||||
                    modelId = payload.get("modelId")
 | 
			
		||||
                    modelName = payload.get("modelName")
 | 
			
		||||
    
 | 
			
		||||
                if model_url:
 | 
			
		||||
                    print(f"Downloading model from {model_url}")
 | 
			
		||||
                    parsed_url = urlparse(model_url)
 | 
			
		||||
                    filename = os.path.basename(parsed_url.path)    
 | 
			
		||||
                    model_filename = os.path.join("models", filename)
 | 
			
		||||
                    # Download the model
 | 
			
		||||
                    response = requests.get(model_url, stream=True)
 | 
			
		||||
                    if response.status_code == 200:
 | 
			
		||||
                        with open(model_filename, 'wb') as f:
 | 
			
		||||
                            for chunk in response.iter_content(chunk_size=8192):
 | 
			
		||||
                                f.write(chunk)
 | 
			
		||||
                        logging.info(f"Downloaded model from {model_url} to {model_filename}")
 | 
			
		||||
                        model = YOLO(model_filename)
 | 
			
		||||
                        if torch.cuda.is_available():
 | 
			
		||||
                            model.to('cuda')
 | 
			
		||||
                        class_names = model.names
 | 
			
		||||
                    if model_url:
 | 
			
		||||
                        with models_lock:
 | 
			
		||||
                            if camera_id not in models:
 | 
			
		||||
                                models[camera_id] = {}
 | 
			
		||||
                            if modelId not in models[camera_id]:
 | 
			
		||||
                                print(f"Downloading model from {model_url}")
 | 
			
		||||
                                parsed_url = urlparse(model_url)
 | 
			
		||||
                                filename = os.path.basename(parsed_url.path)    
 | 
			
		||||
                                model_filename = os.path.join("models", filename)
 | 
			
		||||
                                # Download the model
 | 
			
		||||
                                response = requests.get(model_url, stream=True)
 | 
			
		||||
                                if response.status_code == 200:
 | 
			
		||||
                                    with open(model_filename, 'wb') as f:
 | 
			
		||||
                                        for chunk in response.iter_content(chunk_size=8192):
 | 
			
		||||
                                            f.write(chunk)
 | 
			
		||||
                                    logging.info(f"Downloaded model from {model_url} to {model_filename}")
 | 
			
		||||
                                    model = YOLO(model_filename)
 | 
			
		||||
                                    if torch.cuda.is_available():
 | 
			
		||||
                                        model.to('cuda')
 | 
			
		||||
                                    models[camera_id][modelId] = model
 | 
			
		||||
                                    logging.info(f"Loaded model {modelId} for camera {camera_id}")
 | 
			
		||||
                                else:
 | 
			
		||||
                                    logging.error(f"Failed to download model from {model_url}")
 | 
			
		||||
                                    continue
 | 
			
		||||
                    if camera_id and rtsp_url:
 | 
			
		||||
                        with streams_lock:
 | 
			
		||||
                            if camera_id not in streams and len(streams) < max_streams:
 | 
			
		||||
                                cap = cv2.VideoCapture(rtsp_url)
 | 
			
		||||
                                if not cap.isOpened():
 | 
			
		||||
                                    logging.error(f"Failed to open RTSP stream for camera {camera_id}")
 | 
			
		||||
                                    continue
 | 
			
		||||
                                buffer = queue.Queue(maxsize=1)
 | 
			
		||||
                                stop_event = threading.Event()
 | 
			
		||||
                                thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
 | 
			
		||||
                                thread.daemon = True
 | 
			
		||||
                                thread.start()
 | 
			
		||||
                                streams[camera_id] = {
 | 
			
		||||
                                    'cap': cap,
 | 
			
		||||
                                    'buffer': buffer,
 | 
			
		||||
                                    'thread': thread,
 | 
			
		||||
                                    'rtsp_url': rtsp_url,
 | 
			
		||||
                                    'stop_event': stop_event,
 | 
			
		||||
                                    'modelId': modelId,
 | 
			
		||||
                                    'modelName': modelName
 | 
			
		||||
                                }
 | 
			
		||||
                                logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
 | 
			
		||||
                            elif camera_id and camera_id in streams:
 | 
			
		||||
                                stream = streams.pop(camera_id)
 | 
			
		||||
                                stream['cap'].release()
 | 
			
		||||
                                logging.info(f"Unsubscribed from camera {camera_id}")
 | 
			
		||||
                                if camera_id in models and modelId in models[camera_id]:
 | 
			
		||||
                                    del models[camera_id][modelId]
 | 
			
		||||
                                    if not models[camera_id]:
 | 
			
		||||
                                        del models[camera_id]
 | 
			
		||||
                elif msg_type == "unsubscribe":
 | 
			
		||||
                    payload = data.get("payload", {})
 | 
			
		||||
                    camera_id = payload.get("cameraIdentifier")
 | 
			
		||||
                    logging.debug(f"Unsubscribing from camera {camera_id}")
 | 
			
		||||
                    with streams_lock:
 | 
			
		||||
                        if camera_id and camera_id in streams:
 | 
			
		||||
                            stream = streams.pop(camera_id)
 | 
			
		||||
                            stream['cap'].release()
 | 
			
		||||
                            logging.info(f"Unsubscribed from camera {camera_id}")
 | 
			
		||||
                            if camera_id in models and modelId in models[camera_id]:
 | 
			
		||||
                                del models[camera_id][modelId]
 | 
			
		||||
                                if not models[camera_id]:
 | 
			
		||||
                                    del models[camera_id]
 | 
			
		||||
                elif msg_type == "requestState":
 | 
			
		||||
                    # Handle state request
 | 
			
		||||
                    cpu_usage = psutil.cpu_percent()
 | 
			
		||||
                    memory_usage = psutil.virtual_memory().percent
 | 
			
		||||
                    if torch.cuda.is_available():
 | 
			
		||||
                        gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2)  # Convert to MB
 | 
			
		||||
                        gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)  # Convert to MB
 | 
			
		||||
                    else:
 | 
			
		||||
                        logging.error(f"Failed to download model from {model_url}")
 | 
			
		||||
                        continue
 | 
			
		||||
                if camera_id and rtsp_url:
 | 
			
		||||
                    if camera_id not in streams and len(streams) < max_streams:
 | 
			
		||||
                        cap = cv2.VideoCapture(rtsp_url)
 | 
			
		||||
                        if not cap.isOpened():
 | 
			
		||||
                            logging.error(f"Failed to open RTSP stream for camera {camera_id}")
 | 
			
		||||
                            continue
 | 
			
		||||
                        buffer = queue.Queue(maxsize=1)
 | 
			
		||||
                        stop_event = threading.Event()
 | 
			
		||||
                        thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
 | 
			
		||||
                        thread.daemon = True
 | 
			
		||||
                        thread.start()
 | 
			
		||||
                        streams[camera_id] = {
 | 
			
		||||
                            'cap': cap,
 | 
			
		||||
                            'buffer': buffer,
 | 
			
		||||
                            'thread': thread,
 | 
			
		||||
                            'rtsp_url': rtsp_url,
 | 
			
		||||
                            'stop_event': stop_event,
 | 
			
		||||
                            'modelId': modelId,
 | 
			
		||||
                            'modelName': modelName
 | 
			
		||||
                        gpu_usage = None
 | 
			
		||||
                        gpu_memory_usage = None
 | 
			
		||||
                    
 | 
			
		||||
                    camera_connections = [
 | 
			
		||||
                        {
 | 
			
		||||
                            "cameraIdentifier": camera_id,
 | 
			
		||||
                            "modelId": stream['modelId'],
 | 
			
		||||
                            "modelName": stream['modelName'],
 | 
			
		||||
                            "online": True
 | 
			
		||||
                        }
 | 
			
		||||
                        logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
 | 
			
		||||
                    elif camera_id and camera_id in streams:
 | 
			
		||||
                        stream = streams.pop(camera_id)
 | 
			
		||||
                        stream['cap'].release()
 | 
			
		||||
                        logging.info(f"Unsubscribed from camera {camera_id}")
 | 
			
		||||
                elif data.get("command") == "stop":
 | 
			
		||||
                    logging.info("Received stop command")
 | 
			
		||||
                    break
 | 
			
		||||
                        for camera_id, stream in streams.items()
 | 
			
		||||
                    ]
 | 
			
		||||
                    
 | 
			
		||||
                    state_report = {
 | 
			
		||||
                        "type": "stateReport",
 | 
			
		||||
                        "cpuUsage": cpu_usage,
 | 
			
		||||
                        "memoryUsage": memory_usage,
 | 
			
		||||
                        "gpuUsage": gpu_usage,
 | 
			
		||||
                        "gpuMemoryUsage": gpu_memory_usage,
 | 
			
		||||
                        "cameraConnections": camera_connections
 | 
			
		||||
                    }
 | 
			
		||||
                    await websocket.send_text(json.dumps(state_report))
 | 
			
		||||
                else:
 | 
			
		||||
                    logging.error(f"Unknown message type: {msg_type}")
 | 
			
		||||
            except json.JSONDecodeError:
 | 
			
		||||
                logging.error("Received invalid JSON message")
 | 
			
		||||
            except (WebSocketDisconnect, ConnectionClosedError) as e:
 | 
			
		||||
| 
						 | 
				
			
			@ -385,17 +334,27 @@ async def detect(websocket: WebSocket):
 | 
			
		|||
            except Exception as e:
 | 
			
		||||
                logging.error(f"Error handling message: {e}")
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        await websocket.accept()
 | 
			
		||||
        task = asyncio.create_task(process_streams())
 | 
			
		||||
        heartbeat_task = asyncio.create_task(send_heartbeat())
 | 
			
		||||
        message_task = asyncio.create_task(on_message())
 | 
			
		||||
 | 
			
		||||
        await asyncio.gather(heartbeat_task, message_task)
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logging.error(f"Unexpected error in WebSocket connection: {e}")
 | 
			
		||||
        logging.error(f"Error in detect websocket: {e}")
 | 
			
		||||
    finally:
 | 
			
		||||
        task.cancel()
 | 
			
		||||
        await task
 | 
			
		||||
        for camera_id, stream in streams.items():
 | 
			
		||||
            stream['stop_event'].set()
 | 
			
		||||
            stream['thread'].join()
 | 
			
		||||
            stream['cap'].release()
 | 
			
		||||
            stream['buffer'].queue.clear()
 | 
			
		||||
            logging.info(f"Released camera {camera_id} and cleaned up resources")
 | 
			
		||||
        streams.clear()
 | 
			
		||||
        models.clear()
 | 
			
		||||
        with streams_lock:
 | 
			
		||||
            for camera_id, stream in streams.items():
 | 
			
		||||
                stream['stop_event'].set()
 | 
			
		||||
                stream['thread'].join()
 | 
			
		||||
                stream['cap'].release()
 | 
			
		||||
                stream['buffer'].queue.clear()
 | 
			
		||||
                logging.info(f"Released camera {camera_id} and cleaned up resources")
 | 
			
		||||
            streams.clear()
 | 
			
		||||
        with models_lock:
 | 
			
		||||
            models.clear()
 | 
			
		||||
        logging.info("WebSocket connection closed")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue