diff --git a/app.py b/app.py index 60beb27..d78b2bf 100644 --- a/app.py +++ b/app.py @@ -35,6 +35,8 @@ session_ids: Dict[str, int] = {} camera_streams: Dict[str, Dict[str, Any]] = {} # Map subscriptions to their camera URL subscription_to_camera: Dict[str, str] = {} +# Store latest frames for REST API access (separate from processing buffer) +latest_frames: Dict[str, Any] = {} with open("config.json", "r") as f: config = json.load(f) @@ -109,20 +111,60 @@ def download_mpta(url: str, dest_path: str) -> str: # Add helper to fetch snapshot image from HTTP/HTTPS URL def fetch_snapshot(url: str): try: - response = requests.get(url, timeout=10) + from requests.auth import HTTPBasicAuth, HTTPDigestAuth + + # Parse URL to extract credentials + parsed = urlparse(url) + + # Prepare headers - some cameras require User-Agent + headers = { + 'User-Agent': 'Mozilla/5.0 (compatible; DetectorWorker/1.0)' + } + + # Reconstruct URL without credentials + clean_url = f"{parsed.scheme}://{parsed.hostname}" + if parsed.port: + clean_url += f":{parsed.port}" + clean_url += parsed.path + if parsed.query: + clean_url += f"?{parsed.query}" + + auth = None + if parsed.username and parsed.password: + # Try HTTP Digest authentication first (common for IP cameras) + try: + auth = HTTPDigestAuth(parsed.username, parsed.password) + response = requests.get(clean_url, auth=auth, headers=headers, timeout=10) + if response.status_code == 200: + logger.debug(f"Successfully authenticated using HTTP Digest for {clean_url}") + elif response.status_code == 401: + # If Digest fails, try Basic auth + logger.debug(f"HTTP Digest failed, trying Basic auth for {clean_url}") + auth = HTTPBasicAuth(parsed.username, parsed.password) + response = requests.get(clean_url, auth=auth, headers=headers, timeout=10) + if response.status_code == 200: + logger.debug(f"Successfully authenticated using HTTP Basic for {clean_url}") + except Exception as auth_error: + logger.debug(f"Authentication setup error: {auth_error}") + # Fallback to original URL with embedded credentials + response = requests.get(url, headers=headers, timeout=10) + else: + # No credentials in URL, make request as-is + response = requests.get(url, headers=headers, timeout=10) + if response.status_code == 200: # Convert response content to numpy array nparr = np.frombuffer(response.content, np.uint8) # Decode image frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is not None: - logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}") + logger.debug(f"Successfully fetched snapshot from {clean_url}, shape: {frame.shape}") return frame else: - logger.error(f"Failed to decode image from snapshot URL: {url}") + logger.error(f"Failed to decode image from snapshot URL: {clean_url}") return None else: - logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {url}") + logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {clean_url}") return None except Exception as e: logger.error(f"Exception fetching snapshot from {url}: {str(e)}") @@ -146,26 +188,24 @@ async def get_camera_image(camera_id: str): Get the current frame from a camera as JPEG image """ try: + # URL decode the camera_id to handle encoded characters like %3B for semicolon + from urllib.parse import unquote + original_camera_id = camera_id + camera_id = unquote(camera_id) + logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'") + with streams_lock: if camera_id not in streams: logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") - stream = streams[camera_id] - buffer = stream["buffer"] - logger.debug(f"Camera '{camera_id}' buffer size: {buffer.qsize()}, buffer empty: {buffer.empty()}") - logger.debug(f"Buffer queue contents: {getattr(buffer, 'queue', None)}") - - if buffer.empty(): - logger.warning(f"No frame available for camera '{camera_id}'. Buffer is empty.") + # Check if we have a cached frame for this camera + if camera_id not in latest_frames: + logger.warning(f"No cached frame available for camera '{camera_id}'.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") - # Get the latest frame (non-blocking) - try: - frame = buffer.queue[-1] # Get the most recent frame without removing it - except IndexError: - logger.warning(f"Buffer queue is empty for camera '{camera_id}' when trying to access last frame.") - raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") + frame = latest_frames[camera_id] + logger.debug(f"Retrieved cached frame for camera '{camera_id}', frame shape: {frame.shape}") # Encode frame as JPEG success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) if not success: @@ -476,6 +516,10 @@ async def detect(websocket: WebSocket): logger.debug(f"Got frame from buffer for camera {camera_id}") frame = buffer.get() + # Cache the frame for REST API access + latest_frames[camera_id] = frame.copy() + logger.debug(f"Cached frame for REST API access for camera {camera_id}") + with models_lock: model_tree = models.get(camera_id, {}).get(stream["modelId"]) if not model_tree: @@ -647,7 +691,7 @@ async def detect(websocket: WebSocket): if snapshot_url and snapshot_interval: logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}") - thread = threading.Thread(target=snapshot_reader, args=(camera_identifier, snapshot_url, snapshot_interval, buffer, stop_event)) + thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() mode = "snapshot" @@ -670,7 +714,7 @@ async def detect(websocket: WebSocket): if not cap.isOpened(): logger.error(f"Failed to open RTSP stream for camera {camera_id}") continue - thread = threading.Thread(target=frame_reader, args=(camera_identifier, cap, buffer, stop_event)) + thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() mode = "rtsp" @@ -744,6 +788,8 @@ async def detect(websocket: WebSocket): else: logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references") + # Clean up cached frame + latest_frames.pop(camera_id, None) logger.info(f"Unsubscribed from camera {camera_id}") # Note: Keep models in memory for potential reuse elif msg_type == "requestState": @@ -847,5 +893,6 @@ async def detect(websocket: WebSocket): subscription_to_camera.clear() with models_lock: models.clear() + latest_frames.clear() session_ids.clear() logger.info("WebSocket connection closed")