From b5ae2801c1edc62e42a2bcbed0792abfe9a6209c Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sat, 28 Jun 2025 17:44:25 +0700 Subject: [PATCH] add snapshot fetching functionality; implement snapshot reader for HTTP/HTTPS URLs with retry logic --- app.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 148 insertions(+), 24 deletions(-) diff --git a/app.py b/app.py index fcd2c10..fc492e5 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ import time import queue import torch import cv2 +import numpy as np import base64 import logging import threading @@ -98,6 +99,28 @@ def download_mpta(url: str, dest_path: str) -> str: logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) return None +# Add helper to fetch snapshot image from HTTP/HTTPS URL +def fetch_snapshot(url: str): + try: + response = requests.get(url, timeout=10) + if response.status_code == 200: + # Convert response content to numpy array + nparr = np.frombuffer(response.content, np.uint8) + # Decode image + frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + if frame is not None: + logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}") + return frame + else: + logger.error(f"Failed to decode image from snapshot URL: {url}") + return None + else: + logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {url}") + return None + except Exception as e: + logger.error(f"Exception fetching snapshot from {url}: {str(e)}") + return None + #################################################### # Detection and frame processing functions #################################################### @@ -276,6 +299,72 @@ async def detect(websocket: WebSocket): if cap and cap.isOpened(): cap.release() + def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event): + """Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals""" + retries = 0 + logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}") + frame_count = 0 + last_log_time = time.time() + + try: + interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds + logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s") + + while not stop_event.is_set(): + try: + start_time = time.time() + frame = fetch_snapshot(snapshot_url) + + if frame is None: + logger.warning(f"Failed to fetch snapshot for camera: {camera_id}, retry {retries+1}/{max_retries}") + retries += 1 + if retries > max_retries and max_retries != -1: + logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader") + break + time.sleep(min(interval_seconds, reconnect_interval)) + continue + + # Successfully fetched a frame + frame_count += 1 + current_time = time.time() + # Log frame stats every 5 seconds + if current_time - last_log_time > 5: + logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds") + frame_count = 0 + last_log_time = current_time + + logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}") + retries = 0 + + # Overwrite old frame if buffer is full + if not buffer.empty(): + try: + buffer.get_nowait() + logger.debug(f"Removed old snapshot from buffer for camera {camera_id}") + except queue.Empty: + pass + + buffer.put(frame) + logger.debug(f"Added new snapshot to buffer for camera {camera_id}") + + # Wait for the specified interval + elapsed = time.time() - start_time + sleep_time = max(interval_seconds - elapsed, 0) + if sleep_time > 0: + time.sleep(sleep_time) + + except Exception as e: + logger.error(f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}", exc_info=True) + retries += 1 + if retries > max_retries and max_retries != -1: + logger.error(f"Max retries reached after error for snapshot camera {camera_id}") + break + time.sleep(min(interval_seconds, reconnect_interval)) + except Exception as e: + logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True) + finally: + logger.info(f"Snapshot reader thread for camera {camera_id} is exiting") + async def process_streams(): logger.info("Started processing streams") try: @@ -370,6 +459,8 @@ async def detect(websocket: WebSocket): payload = data.get("payload", {}) camera_id = payload.get("cameraIdentifier") rtsp_url = payload.get("rtspUrl") + snapshot_url = payload.get("snapshotUrl") + snapshot_interval = payload.get("snapshotInterval") # in milliseconds model_url = payload.get("modelUrl") # may be remote or local modelId = payload.get("modelId") modelName = payload.get("modelName") @@ -430,34 +521,60 @@ async def detect(websocket: WebSocket): "modelId": modelId } await websocket.send_json(success_response) - - if camera_id and rtsp_url: + if camera_id and (rtsp_url or snapshot_url): with streams_lock: if camera_id not in streams and len(streams) < max_streams: - cap = cv2.VideoCapture(rtsp_url) - if not cap.isOpened(): - logger.error(f"Failed to open RTSP stream for camera {camera_id}") - continue buffer = queue.Queue(maxsize=1) stop_event = threading.Event() - thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) - thread.daemon = True - thread.start() - streams[camera_id] = { - "cap": cap, - "buffer": buffer, - "thread": thread, - "rtsp_url": rtsp_url, - "stop_event": stop_event, - "modelId": modelId, - "modelName": modelName - } - logger.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + + # Choose between snapshot and RTSP based on availability + if snapshot_url and snapshot_interval: + logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") + thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) + thread.daemon = True + thread.start() + streams[camera_id] = { + "buffer": buffer, + "thread": thread, + "snapshot_url": snapshot_url, + "snapshot_interval": snapshot_interval, + "stop_event": stop_event, + "modelId": modelId, + "modelName": modelName, + "mode": "snapshot" + } + logger.info(f"Subscribed to camera {camera_id} (snapshot mode) with modelId {modelId}, modelName {modelName}, URL {snapshot_url}, interval {snapshot_interval}ms") + elif rtsp_url: + logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") + cap = cv2.VideoCapture(rtsp_url) + if not cap.isOpened(): + logger.error(f"Failed to open RTSP stream for camera {camera_id}") + continue + thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) + thread.daemon = True + thread.start() + streams[camera_id] = { + "cap": cap, + "buffer": buffer, + "thread": thread, + "rtsp_url": rtsp_url, + "stop_event": stop_event, + "modelId": modelId, + "modelName": modelName, + "mode": "rtsp" + } + logger.info(f"Subscribed to camera {camera_id} (RTSP mode) with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + else: + logger.error(f"No valid URL provided for camera {camera_id}") + continue elif camera_id and camera_id in streams: - # If already subscribed, unsubscribe + # If already subscribed, unsubscribe first stream = streams.pop(camera_id) - stream["cap"].release() - logger.info(f"Unsubscribed from camera {camera_id}") + stream["stop_event"].set() + stream["thread"].join() + if "cap" in stream: + stream["cap"].release() + logger.info(f"Unsubscribed from camera {camera_id} for resubscription") with models_lock: if camera_id in models and modelId in models[camera_id]: del models[camera_id][modelId] @@ -472,7 +589,12 @@ async def detect(websocket: WebSocket): stream = streams.pop(camera_id) stream["stop_event"].set() stream["thread"].join() - stream["cap"].release() + # Only release cap if it exists (RTSP mode) + if "cap" in stream: + stream["cap"].release() + logger.info(f"Released RTSP capture for camera {camera_id}") + else: + logger.info(f"Released snapshot reader for camera {camera_id}") logger.info(f"Unsubscribed from camera {camera_id}") with models_lock: if camera_id in models: @@ -532,7 +654,9 @@ async def detect(websocket: WebSocket): for camera_id, stream in streams.items(): stream["stop_event"].set() stream["thread"].join() - stream["cap"].release() + # Only release cap if it exists (RTSP mode) + if "cap" in stream: + stream["cap"].release() while not stream["buffer"].empty(): try: stream["buffer"].get_nowait()