Fix: 401 and buffer 404
This commit is contained in:
parent
e6716bbe73
commit
7f9cc3de8d
1 changed files with 66 additions and 19 deletions
85
app.py
85
app.py
|
@ -35,6 +35,8 @@ session_ids: Dict[str, int] = {}
|
||||||
camera_streams: Dict[str, Dict[str, Any]] = {}
|
camera_streams: Dict[str, Dict[str, Any]] = {}
|
||||||
# Map subscriptions to their camera URL
|
# Map subscriptions to their camera URL
|
||||||
subscription_to_camera: Dict[str, str] = {}
|
subscription_to_camera: Dict[str, str] = {}
|
||||||
|
# Store latest frames for REST API access (separate from processing buffer)
|
||||||
|
latest_frames: Dict[str, Any] = {}
|
||||||
|
|
||||||
with open("config.json", "r") as f:
|
with open("config.json", "r") as f:
|
||||||
config = json.load(f)
|
config = json.load(f)
|
||||||
|
@ -109,20 +111,60 @@ def download_mpta(url: str, dest_path: str) -> str:
|
||||||
# Add helper to fetch snapshot image from HTTP/HTTPS URL
|
# Add helper to fetch snapshot image from HTTP/HTTPS URL
|
||||||
def fetch_snapshot(url: str):
|
def fetch_snapshot(url: str):
|
||||||
try:
|
try:
|
||||||
response = requests.get(url, timeout=10)
|
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
|
||||||
|
|
||||||
|
# Parse URL to extract credentials
|
||||||
|
parsed = urlparse(url)
|
||||||
|
|
||||||
|
# Prepare headers - some cameras require User-Agent
|
||||||
|
headers = {
|
||||||
|
'User-Agent': 'Mozilla/5.0 (compatible; DetectorWorker/1.0)'
|
||||||
|
}
|
||||||
|
|
||||||
|
# Reconstruct URL without credentials
|
||||||
|
clean_url = f"{parsed.scheme}://{parsed.hostname}"
|
||||||
|
if parsed.port:
|
||||||
|
clean_url += f":{parsed.port}"
|
||||||
|
clean_url += parsed.path
|
||||||
|
if parsed.query:
|
||||||
|
clean_url += f"?{parsed.query}"
|
||||||
|
|
||||||
|
auth = None
|
||||||
|
if parsed.username and parsed.password:
|
||||||
|
# Try HTTP Digest authentication first (common for IP cameras)
|
||||||
|
try:
|
||||||
|
auth = HTTPDigestAuth(parsed.username, parsed.password)
|
||||||
|
response = requests.get(clean_url, auth=auth, headers=headers, timeout=10)
|
||||||
|
if response.status_code == 200:
|
||||||
|
logger.debug(f"Successfully authenticated using HTTP Digest for {clean_url}")
|
||||||
|
elif response.status_code == 401:
|
||||||
|
# If Digest fails, try Basic auth
|
||||||
|
logger.debug(f"HTTP Digest failed, trying Basic auth for {clean_url}")
|
||||||
|
auth = HTTPBasicAuth(parsed.username, parsed.password)
|
||||||
|
response = requests.get(clean_url, auth=auth, headers=headers, timeout=10)
|
||||||
|
if response.status_code == 200:
|
||||||
|
logger.debug(f"Successfully authenticated using HTTP Basic for {clean_url}")
|
||||||
|
except Exception as auth_error:
|
||||||
|
logger.debug(f"Authentication setup error: {auth_error}")
|
||||||
|
# Fallback to original URL with embedded credentials
|
||||||
|
response = requests.get(url, headers=headers, timeout=10)
|
||||||
|
else:
|
||||||
|
# No credentials in URL, make request as-is
|
||||||
|
response = requests.get(url, headers=headers, timeout=10)
|
||||||
|
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
# Convert response content to numpy array
|
# Convert response content to numpy array
|
||||||
nparr = np.frombuffer(response.content, np.uint8)
|
nparr = np.frombuffer(response.content, np.uint8)
|
||||||
# Decode image
|
# Decode image
|
||||||
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||||
if frame is not None:
|
if frame is not None:
|
||||||
logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}")
|
logger.debug(f"Successfully fetched snapshot from {clean_url}, shape: {frame.shape}")
|
||||||
return frame
|
return frame
|
||||||
else:
|
else:
|
||||||
logger.error(f"Failed to decode image from snapshot URL: {url}")
|
logger.error(f"Failed to decode image from snapshot URL: {clean_url}")
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {url}")
|
logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {clean_url}")
|
||||||
return None
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Exception fetching snapshot from {url}: {str(e)}")
|
logger.error(f"Exception fetching snapshot from {url}: {str(e)}")
|
||||||
|
@ -146,26 +188,24 @@ async def get_camera_image(camera_id: str):
|
||||||
Get the current frame from a camera as JPEG image
|
Get the current frame from a camera as JPEG image
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
# URL decode the camera_id to handle encoded characters like %3B for semicolon
|
||||||
|
from urllib.parse import unquote
|
||||||
|
original_camera_id = camera_id
|
||||||
|
camera_id = unquote(camera_id)
|
||||||
|
logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'")
|
||||||
|
|
||||||
with streams_lock:
|
with streams_lock:
|
||||||
if camera_id not in streams:
|
if camera_id not in streams:
|
||||||
logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}")
|
logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}")
|
||||||
raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active")
|
raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active")
|
||||||
|
|
||||||
stream = streams[camera_id]
|
# Check if we have a cached frame for this camera
|
||||||
buffer = stream["buffer"]
|
if camera_id not in latest_frames:
|
||||||
logger.debug(f"Camera '{camera_id}' buffer size: {buffer.qsize()}, buffer empty: {buffer.empty()}")
|
logger.warning(f"No cached frame available for camera '{camera_id}'.")
|
||||||
logger.debug(f"Buffer queue contents: {getattr(buffer, 'queue', None)}")
|
|
||||||
|
|
||||||
if buffer.empty():
|
|
||||||
logger.warning(f"No frame available for camera '{camera_id}'. Buffer is empty.")
|
|
||||||
raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
|
raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
|
||||||
|
|
||||||
# Get the latest frame (non-blocking)
|
frame = latest_frames[camera_id]
|
||||||
try:
|
logger.debug(f"Retrieved cached frame for camera '{camera_id}', frame shape: {frame.shape}")
|
||||||
frame = buffer.queue[-1] # Get the most recent frame without removing it
|
|
||||||
except IndexError:
|
|
||||||
logger.warning(f"Buffer queue is empty for camera '{camera_id}' when trying to access last frame.")
|
|
||||||
raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
|
|
||||||
# Encode frame as JPEG
|
# Encode frame as JPEG
|
||||||
success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
||||||
if not success:
|
if not success:
|
||||||
|
@ -476,6 +516,10 @@ async def detect(websocket: WebSocket):
|
||||||
logger.debug(f"Got frame from buffer for camera {camera_id}")
|
logger.debug(f"Got frame from buffer for camera {camera_id}")
|
||||||
frame = buffer.get()
|
frame = buffer.get()
|
||||||
|
|
||||||
|
# Cache the frame for REST API access
|
||||||
|
latest_frames[camera_id] = frame.copy()
|
||||||
|
logger.debug(f"Cached frame for REST API access for camera {camera_id}")
|
||||||
|
|
||||||
with models_lock:
|
with models_lock:
|
||||||
model_tree = models.get(camera_id, {}).get(stream["modelId"])
|
model_tree = models.get(camera_id, {}).get(stream["modelId"])
|
||||||
if not model_tree:
|
if not model_tree:
|
||||||
|
@ -647,7 +691,7 @@ async def detect(websocket: WebSocket):
|
||||||
|
|
||||||
if snapshot_url and snapshot_interval:
|
if snapshot_url and snapshot_interval:
|
||||||
logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}")
|
logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}")
|
||||||
thread = threading.Thread(target=snapshot_reader, args=(camera_identifier, snapshot_url, snapshot_interval, buffer, stop_event))
|
thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
|
||||||
thread.daemon = True
|
thread.daemon = True
|
||||||
thread.start()
|
thread.start()
|
||||||
mode = "snapshot"
|
mode = "snapshot"
|
||||||
|
@ -670,7 +714,7 @@ async def detect(websocket: WebSocket):
|
||||||
if not cap.isOpened():
|
if not cap.isOpened():
|
||||||
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
|
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
|
||||||
continue
|
continue
|
||||||
thread = threading.Thread(target=frame_reader, args=(camera_identifier, cap, buffer, stop_event))
|
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
|
||||||
thread.daemon = True
|
thread.daemon = True
|
||||||
thread.start()
|
thread.start()
|
||||||
mode = "rtsp"
|
mode = "rtsp"
|
||||||
|
@ -744,6 +788,8 @@ async def detect(websocket: WebSocket):
|
||||||
else:
|
else:
|
||||||
logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references")
|
logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references")
|
||||||
|
|
||||||
|
# Clean up cached frame
|
||||||
|
latest_frames.pop(camera_id, None)
|
||||||
logger.info(f"Unsubscribed from camera {camera_id}")
|
logger.info(f"Unsubscribed from camera {camera_id}")
|
||||||
# Note: Keep models in memory for potential reuse
|
# Note: Keep models in memory for potential reuse
|
||||||
elif msg_type == "requestState":
|
elif msg_type == "requestState":
|
||||||
|
@ -847,5 +893,6 @@ async def detect(websocket: WebSocket):
|
||||||
subscription_to_camera.clear()
|
subscription_to_camera.clear()
|
||||||
with models_lock:
|
with models_lock:
|
||||||
models.clear()
|
models.clear()
|
||||||
|
latest_frames.clear()
|
||||||
session_ids.clear()
|
session_ids.clear()
|
||||||
logger.info("WebSocket connection closed")
|
logger.info("WebSocket connection closed")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue