add snapshot fetching functionality; implement snapshot reader for HTTP/HTTPS URLs with retry logic
All checks were successful
Build Backend Application and Docker Image / build-docker (push) Successful in 9m25s

This commit is contained in:
Siwat Sirichai 2025-06-28 17:44:25 +07:00
parent a6cf9c20c6
commit b5ae2801c1

172
app.py
View file

@ -5,6 +5,7 @@ import time
import queue
import torch
import cv2
import numpy as np
import base64
import logging
import threading
@ -98,6 +99,28 @@ def download_mpta(url: str, dest_path: str) -> str:
logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True)
return None
# Add helper to fetch snapshot image from HTTP/HTTPS URL
def fetch_snapshot(url: str):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
# Convert response content to numpy array
nparr = np.frombuffer(response.content, np.uint8)
# Decode image
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is not None:
logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}")
return frame
else:
logger.error(f"Failed to decode image from snapshot URL: {url}")
return None
else:
logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {url}")
return None
except Exception as e:
logger.error(f"Exception fetching snapshot from {url}: {str(e)}")
return None
####################################################
# Detection and frame processing functions
####################################################
@ -276,6 +299,72 @@ async def detect(websocket: WebSocket):
if cap and cap.isOpened():
cap.release()
def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event):
"""Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals"""
retries = 0
logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}")
frame_count = 0
last_log_time = time.time()
try:
interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds
logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s")
while not stop_event.is_set():
try:
start_time = time.time()
frame = fetch_snapshot(snapshot_url)
if frame is None:
logger.warning(f"Failed to fetch snapshot for camera: {camera_id}, retry {retries+1}/{max_retries}")
retries += 1
if retries > max_retries and max_retries != -1:
logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader")
break
time.sleep(min(interval_seconds, reconnect_interval))
continue
# Successfully fetched a frame
frame_count += 1
current_time = time.time()
# Log frame stats every 5 seconds
if current_time - last_log_time > 5:
logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds")
frame_count = 0
last_log_time = current_time
logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}")
retries = 0
# Overwrite old frame if buffer is full
if not buffer.empty():
try:
buffer.get_nowait()
logger.debug(f"Removed old snapshot from buffer for camera {camera_id}")
except queue.Empty:
pass
buffer.put(frame)
logger.debug(f"Added new snapshot to buffer for camera {camera_id}")
# Wait for the specified interval
elapsed = time.time() - start_time
sleep_time = max(interval_seconds - elapsed, 0)
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
logger.error(f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}", exc_info=True)
retries += 1
if retries > max_retries and max_retries != -1:
logger.error(f"Max retries reached after error for snapshot camera {camera_id}")
break
time.sleep(min(interval_seconds, reconnect_interval))
except Exception as e:
logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True)
finally:
logger.info(f"Snapshot reader thread for camera {camera_id} is exiting")
async def process_streams():
logger.info("Started processing streams")
try:
@ -370,6 +459,8 @@ async def detect(websocket: WebSocket):
payload = data.get("payload", {})
camera_id = payload.get("cameraIdentifier")
rtsp_url = payload.get("rtspUrl")
snapshot_url = payload.get("snapshotUrl")
snapshot_interval = payload.get("snapshotInterval") # in milliseconds
model_url = payload.get("modelUrl") # may be remote or local
modelId = payload.get("modelId")
modelName = payload.get("modelName")
@ -430,34 +521,60 @@ async def detect(websocket: WebSocket):
"modelId": modelId
}
await websocket.send_json(success_response)
if camera_id and rtsp_url:
if camera_id and (rtsp_url or snapshot_url):
with streams_lock:
if camera_id not in streams and len(streams) < max_streams:
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
continue
buffer = queue.Queue(maxsize=1)
stop_event = threading.Event()
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
thread.daemon = True
thread.start()
streams[camera_id] = {
"cap": cap,
"buffer": buffer,
"thread": thread,
"rtsp_url": rtsp_url,
"stop_event": stop_event,
"modelId": modelId,
"modelName": modelName
}
logger.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName}, URL {rtsp_url}")
# Choose between snapshot and RTSP based on availability
if snapshot_url and snapshot_interval:
logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}")
thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
thread.daemon = True
thread.start()
streams[camera_id] = {
"buffer": buffer,
"thread": thread,
"snapshot_url": snapshot_url,
"snapshot_interval": snapshot_interval,
"stop_event": stop_event,
"modelId": modelId,
"modelName": modelName,
"mode": "snapshot"
}
logger.info(f"Subscribed to camera {camera_id} (snapshot mode) with modelId {modelId}, modelName {modelName}, URL {snapshot_url}, interval {snapshot_interval}ms")
elif rtsp_url:
logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}")
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
continue
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
thread.daemon = True
thread.start()
streams[camera_id] = {
"cap": cap,
"buffer": buffer,
"thread": thread,
"rtsp_url": rtsp_url,
"stop_event": stop_event,
"modelId": modelId,
"modelName": modelName,
"mode": "rtsp"
}
logger.info(f"Subscribed to camera {camera_id} (RTSP mode) with modelId {modelId}, modelName {modelName}, URL {rtsp_url}")
else:
logger.error(f"No valid URL provided for camera {camera_id}")
continue
elif camera_id and camera_id in streams:
# If already subscribed, unsubscribe
# If already subscribed, unsubscribe first
stream = streams.pop(camera_id)
stream["cap"].release()
logger.info(f"Unsubscribed from camera {camera_id}")
stream["stop_event"].set()
stream["thread"].join()
if "cap" in stream:
stream["cap"].release()
logger.info(f"Unsubscribed from camera {camera_id} for resubscription")
with models_lock:
if camera_id in models and modelId in models[camera_id]:
del models[camera_id][modelId]
@ -472,7 +589,12 @@ async def detect(websocket: WebSocket):
stream = streams.pop(camera_id)
stream["stop_event"].set()
stream["thread"].join()
stream["cap"].release()
# Only release cap if it exists (RTSP mode)
if "cap" in stream:
stream["cap"].release()
logger.info(f"Released RTSP capture for camera {camera_id}")
else:
logger.info(f"Released snapshot reader for camera {camera_id}")
logger.info(f"Unsubscribed from camera {camera_id}")
with models_lock:
if camera_id in models:
@ -532,7 +654,9 @@ async def detect(websocket: WebSocket):
for camera_id, stream in streams.items():
stream["stop_event"].set()
stream["thread"].join()
stream["cap"].release()
# Only release cap if it exists (RTSP mode)
if "cap" in stream:
stream["cap"].release()
while not stream["buffer"].empty():
try:
stream["buffer"].get_nowait()