add crop coordinates handling in camera stream management; update logging and refactor subscription identifiers
All checks were successful
Build Backend Application and Docker Image / build-docker (push) Successful in 8m38s

This commit is contained in:
Siwat Sirichai 2025-07-14 01:46:22 +07:00
parent 39d49ba617
commit 3c67fa933c

105
app.py
View file

@ -122,6 +122,15 @@ def fetch_snapshot(url: str):
logger.error(f"Exception fetching snapshot from {url}: {str(e)}") logger.error(f"Exception fetching snapshot from {url}: {str(e)}")
return None return None
# Helper to get crop coordinates from stream
def get_crop_coords(stream):
return {
"cropX1": stream.get("cropX1"),
"cropY1": stream.get("cropY1"),
"cropX2": stream.get("cropX2"),
"cropY2": stream.get("cropY2")
}
#################################################### ####################################################
# REST API endpoint for image retrieval # REST API endpoint for image retrieval
#################################################### ####################################################
@ -133,20 +142,24 @@ async def get_camera_image(camera_id: str):
try: try:
with streams_lock: with streams_lock:
if camera_id not in streams: if camera_id not in streams:
logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}")
raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active")
stream = streams[camera_id] stream = streams[camera_id]
buffer = stream["buffer"] buffer = stream["buffer"]
logger.debug(f"Camera '{camera_id}' buffer size: {buffer.qsize()}, buffer empty: {buffer.empty()}")
logger.debug(f"Buffer queue contents: {getattr(buffer, 'queue', None)}")
if buffer.empty(): if buffer.empty():
logger.warning(f"No frame available for camera '{camera_id}'. Buffer is empty.")
raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
# Get the latest frame (non-blocking) # Get the latest frame (non-blocking)
try: try:
frame = buffer.queue[-1] # Get the most recent frame without removing it frame = buffer.queue[-1] # Get the most recent frame without removing it
except IndexError: except IndexError:
logger.warning(f"Buffer queue is empty for camera '{camera_id}' when trying to access last frame.")
raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
# Encode frame as JPEG # Encode frame as JPEG
success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
if not success: if not success:
@ -224,8 +237,8 @@ async def detect(websocket: WebSocket):
detection_data = { detection_data = {
"type": "imageDetection", "type": "imageDetection",
"cameraIdentifier": camera_id, "subscriptionIdentifier": stream["subscriptionIdentifier"],
"timestamp": time.time(), "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime()),
"data": { "data": {
"detection": highest_confidence_detection, # Send only the highest confidence detection "detection": highest_confidence_detection, # Send only the highest confidence detection
"modelId": stream["modelId"], "modelId": stream["modelId"],
@ -304,12 +317,11 @@ async def detect(websocket: WebSocket):
if not buffer.empty(): if not buffer.empty():
try: try:
buffer.get_nowait() buffer.get_nowait()
logger.debug(f"Removed old frame from buffer for camera {camera_id}") logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}")
except queue.Empty: except queue.Empty:
pass pass
buffer.put(frame) buffer.put(frame)
logger.debug(f"Added new frame to buffer for camera {camera_id}") logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}")
# Short sleep to avoid CPU overuse # Short sleep to avoid CPU overuse
time.sleep(0.01) time.sleep(0.01)
@ -380,12 +392,11 @@ async def detect(websocket: WebSocket):
if not buffer.empty(): if not buffer.empty():
try: try:
buffer.get_nowait() buffer.get_nowait()
logger.debug(f"Removed old snapshot from buffer for camera {camera_id}") logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}")
except queue.Empty: except queue.Empty:
pass pass
buffer.put(frame) buffer.put(frame)
logger.debug(f"Added new snapshot to buffer for camera {camera_id}") logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}")
# Wait for the specified interval # Wait for the specified interval
elapsed = time.time() - start_time elapsed = time.time() - start_time
@ -456,18 +467,19 @@ async def detect(websocket: WebSocket):
cpu_usage = psutil.cpu_percent() cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent memory_usage = psutil.virtual_memory().percent
if torch.cuda.is_available(): if torch.cuda.is_available():
gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # MB gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None
gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # MB gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)
else: else:
gpu_usage = None gpu_usage = None
gpu_memory_usage = None gpu_memory_usage = None
camera_connections = [ camera_connections = [
{ {
"cameraIdentifier": camera_id, "subscriptionIdentifier": stream["subscriptionIdentifier"],
"modelId": stream["modelId"], "modelId": stream["modelId"],
"modelName": stream["modelName"], "modelName": stream["modelName"],
"online": True "online": True,
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
} }
for camera_id, stream in streams.items() for camera_id, stream in streams.items()
] ]
@ -497,13 +509,19 @@ async def detect(websocket: WebSocket):
if msg_type == "subscribe": if msg_type == "subscribe":
payload = data.get("payload", {}) payload = data.get("payload", {})
camera_id = payload.get("cameraIdentifier") subscriptionIdentifier = payload.get("subscriptionIdentifier")
rtsp_url = payload.get("rtspUrl") rtsp_url = payload.get("rtspUrl")
snapshot_url = payload.get("snapshotUrl") snapshot_url = payload.get("snapshotUrl")
snapshot_interval = payload.get("snapshotInterval") # in milliseconds snapshot_interval = payload.get("snapshotInterval")
model_url = payload.get("modelUrl") # may be remote or local model_url = payload.get("modelUrl")
modelId = payload.get("modelId") modelId = payload.get("modelId")
modelName = payload.get("modelName") modelName = payload.get("modelName")
cropX1 = payload.get("cropX1")
cropY1 = payload.get("cropY1")
cropX2 = payload.get("cropX2")
cropY2 = payload.get("cropY2")
camera_id = subscriptionIdentifier # Use subscriptionIdentifier as camera_id for mapping
if model_url: if model_url:
with models_lock: with models_lock:
@ -566,24 +584,30 @@ async def detect(websocket: WebSocket):
if camera_id not in streams and len(streams) < max_streams: if camera_id not in streams and len(streams) < max_streams:
buffer = queue.Queue(maxsize=1) buffer = queue.Queue(maxsize=1)
stop_event = threading.Event() stop_event = threading.Event()
stream_info = {
# Choose between snapshot and RTSP based on availability "buffer": buffer,
"thread": None,
"stop_event": stop_event,
"modelId": modelId,
"modelName": modelName,
"subscriptionIdentifier": subscriptionIdentifier,
"cropX1": cropX1,
"cropY1": cropY1,
"cropX2": cropX2,
"cropY2": cropY2
}
if snapshot_url and snapshot_interval: if snapshot_url and snapshot_interval:
logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}")
thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
thread.daemon = True thread.daemon = True
thread.start() thread.start()
streams[camera_id] = { stream_info.update({
"buffer": buffer,
"thread": thread,
"snapshot_url": snapshot_url, "snapshot_url": snapshot_url,
"snapshot_interval": snapshot_interval, "snapshot_interval": snapshot_interval,
"stop_event": stop_event,
"modelId": modelId,
"modelName": modelName,
"mode": "snapshot" "mode": "snapshot"
} })
logger.info(f"Subscribed to camera {camera_id} (snapshot mode) with modelId {modelId}, modelName {modelName}, URL {snapshot_url}, interval {snapshot_interval}ms") stream_info["thread"] = thread
streams[camera_id] = stream_info
elif rtsp_url: elif rtsp_url:
logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}")
cap = cv2.VideoCapture(rtsp_url) cap = cv2.VideoCapture(rtsp_url)
@ -593,17 +617,13 @@ async def detect(websocket: WebSocket):
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
thread.daemon = True thread.daemon = True
thread.start() thread.start()
streams[camera_id] = { stream_info.update({
"cap": cap, "cap": cap,
"buffer": buffer,
"thread": thread,
"rtsp_url": rtsp_url, "rtsp_url": rtsp_url,
"stop_event": stop_event,
"modelId": modelId,
"modelName": modelName,
"mode": "rtsp" "mode": "rtsp"
} })
logger.info(f"Subscribed to camera {camera_id} (RTSP mode) with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") stream_info["thread"] = thread
streams[camera_id] = stream_info
else: else:
logger.error(f"No valid URL provided for camera {camera_id}") logger.error(f"No valid URL provided for camera {camera_id}")
continue continue
@ -622,20 +642,15 @@ async def detect(websocket: WebSocket):
del models[camera_id] del models[camera_id]
elif msg_type == "unsubscribe": elif msg_type == "unsubscribe":
payload = data.get("payload", {}) payload = data.get("payload", {})
camera_id = payload.get("cameraIdentifier") subscriptionIdentifier = payload.get("subscriptionIdentifier")
logger.debug(f"Unsubscribing from camera {camera_id}") camera_id = subscriptionIdentifier
with streams_lock: with streams_lock:
if camera_id and camera_id in streams: if camera_id and camera_id in streams:
stream = streams.pop(camera_id) stream = streams.pop(camera_id)
stream["stop_event"].set() stream["stop_event"].set()
stream["thread"].join() stream["thread"].join()
# Only release cap if it exists (RTSP mode)
if "cap" in stream: if "cap" in stream:
stream["cap"].release() stream["cap"].release()
logger.info(f"Released RTSP capture for camera {camera_id}")
else:
logger.info(f"Released snapshot reader for camera {camera_id}")
logger.info(f"Unsubscribed from camera {camera_id}")
with models_lock: with models_lock:
if camera_id in models: if camera_id in models:
del models[camera_id] del models[camera_id]
@ -643,7 +658,7 @@ async def detect(websocket: WebSocket):
cpu_usage = psutil.cpu_percent() cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent memory_usage = psutil.virtual_memory().percent
if torch.cuda.is_available(): if torch.cuda.is_available():
gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None
gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)
else: else:
gpu_usage = None gpu_usage = None
@ -651,10 +666,11 @@ async def detect(websocket: WebSocket):
camera_connections = [ camera_connections = [
{ {
"cameraIdentifier": camera_id, "subscriptionIdentifier": stream["subscriptionIdentifier"],
"modelId": stream["modelId"], "modelId": stream["modelId"],
"modelName": stream["modelName"], "modelName": stream["modelName"],
"online": True "online": True,
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
} }
for camera_id, stream in streams.items() for camera_id, stream in streams.items()
] ]
@ -678,7 +694,6 @@ async def detect(websocket: WebSocket):
except Exception as e: except Exception as e:
logger.error(f"Error handling message: {e}") logger.error(f"Error handling message: {e}")
break break
try: try:
await websocket.accept() await websocket.accept()
stream_task = asyncio.create_task(process_streams()) stream_task = asyncio.create_task(process_streams())