diff --git a/app.py b/app.py index 5386496..4e9be15 100644 --- a/app.py +++ b/app.py @@ -122,6 +122,15 @@ def fetch_snapshot(url: str): logger.error(f"Exception fetching snapshot from {url}: {str(e)}") return None +# Helper to get crop coordinates from stream +def get_crop_coords(stream): + return { + "cropX1": stream.get("cropX1"), + "cropY1": stream.get("cropY1"), + "cropX2": stream.get("cropX2"), + "cropY2": stream.get("cropY2") + } + #################################################### # REST API endpoint for image retrieval #################################################### @@ -133,20 +142,24 @@ async def get_camera_image(camera_id: str): try: with streams_lock: if camera_id not in streams: + logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") stream = streams[camera_id] buffer = stream["buffer"] + logger.debug(f"Camera '{camera_id}' buffer size: {buffer.qsize()}, buffer empty: {buffer.empty()}") + logger.debug(f"Buffer queue contents: {getattr(buffer, 'queue', None)}") if buffer.empty(): + logger.warning(f"No frame available for camera '{camera_id}'. Buffer is empty.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") # Get the latest frame (non-blocking) try: frame = buffer.queue[-1] # Get the most recent frame without removing it except IndexError: + logger.warning(f"Buffer queue is empty for camera '{camera_id}' when trying to access last frame.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") - # Encode frame as JPEG success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) if not success: @@ -224,8 +237,8 @@ async def detect(websocket: WebSocket): detection_data = { "type": "imageDetection", - "cameraIdentifier": camera_id, - "timestamp": time.time(), + "subscriptionIdentifier": stream["subscriptionIdentifier"], + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime()), "data": { "detection": highest_confidence_detection, # Send only the highest confidence detection "modelId": stream["modelId"], @@ -304,12 +317,11 @@ async def detect(websocket: WebSocket): if not buffer.empty(): try: buffer.get_nowait() - logger.debug(f"Removed old frame from buffer for camera {camera_id}") + logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}") except queue.Empty: pass - buffer.put(frame) - logger.debug(f"Added new frame to buffer for camera {camera_id}") + logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Short sleep to avoid CPU overuse time.sleep(0.01) @@ -380,12 +392,11 @@ async def detect(websocket: WebSocket): if not buffer.empty(): try: buffer.get_nowait() - logger.debug(f"Removed old snapshot from buffer for camera {camera_id}") + logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}") except queue.Empty: pass - buffer.put(frame) - logger.debug(f"Added new snapshot to buffer for camera {camera_id}") + logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Wait for the specified interval elapsed = time.time() - start_time @@ -456,18 +467,19 @@ async def detect(websocket: WebSocket): cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): - gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # MB - gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # MB + gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None + gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { - "cameraIdentifier": camera_id, + "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], - "online": True + "online": True, + **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] @@ -497,13 +509,19 @@ async def detect(websocket: WebSocket): if msg_type == "subscribe": payload = data.get("payload", {}) - camera_id = payload.get("cameraIdentifier") + subscriptionIdentifier = payload.get("subscriptionIdentifier") rtsp_url = payload.get("rtspUrl") snapshot_url = payload.get("snapshotUrl") - snapshot_interval = payload.get("snapshotInterval") # in milliseconds - model_url = payload.get("modelUrl") # may be remote or local + snapshot_interval = payload.get("snapshotInterval") + model_url = payload.get("modelUrl") modelId = payload.get("modelId") modelName = payload.get("modelName") + cropX1 = payload.get("cropX1") + cropY1 = payload.get("cropY1") + cropX2 = payload.get("cropX2") + cropY2 = payload.get("cropY2") + + camera_id = subscriptionIdentifier # Use subscriptionIdentifier as camera_id for mapping if model_url: with models_lock: @@ -566,24 +584,30 @@ async def detect(websocket: WebSocket): if camera_id not in streams and len(streams) < max_streams: buffer = queue.Queue(maxsize=1) stop_event = threading.Event() - - # Choose between snapshot and RTSP based on availability + stream_info = { + "buffer": buffer, + "thread": None, + "stop_event": stop_event, + "modelId": modelId, + "modelName": modelName, + "subscriptionIdentifier": subscriptionIdentifier, + "cropX1": cropX1, + "cropY1": cropY1, + "cropX2": cropX2, + "cropY2": cropY2 + } if snapshot_url and snapshot_interval: logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() - streams[camera_id] = { - "buffer": buffer, - "thread": thread, + stream_info.update({ "snapshot_url": snapshot_url, "snapshot_interval": snapshot_interval, - "stop_event": stop_event, - "modelId": modelId, - "modelName": modelName, "mode": "snapshot" - } - logger.info(f"Subscribed to camera {camera_id} (snapshot mode) with modelId {modelId}, modelName {modelName}, URL {snapshot_url}, interval {snapshot_interval}ms") + }) + stream_info["thread"] = thread + streams[camera_id] = stream_info elif rtsp_url: logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") cap = cv2.VideoCapture(rtsp_url) @@ -593,17 +617,13 @@ async def detect(websocket: WebSocket): thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() - streams[camera_id] = { + stream_info.update({ "cap": cap, - "buffer": buffer, - "thread": thread, "rtsp_url": rtsp_url, - "stop_event": stop_event, - "modelId": modelId, - "modelName": modelName, "mode": "rtsp" - } - logger.info(f"Subscribed to camera {camera_id} (RTSP mode) with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + }) + stream_info["thread"] = thread + streams[camera_id] = stream_info else: logger.error(f"No valid URL provided for camera {camera_id}") continue @@ -622,20 +642,15 @@ async def detect(websocket: WebSocket): del models[camera_id] elif msg_type == "unsubscribe": payload = data.get("payload", {}) - camera_id = payload.get("cameraIdentifier") - logger.debug(f"Unsubscribing from camera {camera_id}") + subscriptionIdentifier = payload.get("subscriptionIdentifier") + camera_id = subscriptionIdentifier with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) stream["stop_event"].set() stream["thread"].join() - # Only release cap if it exists (RTSP mode) if "cap" in stream: stream["cap"].release() - logger.info(f"Released RTSP capture for camera {camera_id}") - else: - logger.info(f"Released snapshot reader for camera {camera_id}") - logger.info(f"Unsubscribed from camera {camera_id}") with models_lock: if camera_id in models: del models[camera_id] @@ -643,7 +658,7 @@ async def detect(websocket: WebSocket): cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): - gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) + gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None @@ -651,10 +666,11 @@ async def detect(websocket: WebSocket): camera_connections = [ { - "cameraIdentifier": camera_id, + "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], - "online": True + "online": True, + **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] @@ -678,7 +694,6 @@ async def detect(websocket: WebSocket): except Exception as e: logger.error(f"Error handling message: {e}") break - try: await websocket.accept() stream_task = asyncio.create_task(process_streams())