Compare commits
1 commit
dev
...
feat/optim
Author | SHA1 | Date | |
---|---|---|---|
|
78cb5f53c9 |
7 changed files with 182 additions and 3386 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -12,4 +12,3 @@ detector_worker.log
|
||||||
no_frame_debug.log
|
no_frame_debug.log
|
||||||
|
|
||||||
feeder/
|
feeder/
|
||||||
.venv/
|
|
||||||
|
|
375
app.py
375
app.py
|
@ -13,13 +13,7 @@ import requests
|
||||||
import asyncio
|
import asyncio
|
||||||
import psutil
|
import psutil
|
||||||
import zipfile
|
import zipfile
|
||||||
import ssl
|
|
||||||
import urllib3
|
|
||||||
import subprocess
|
|
||||||
import tempfile
|
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
from requests.adapters import HTTPAdapter
|
|
||||||
from urllib3.util.ssl_ import create_urllib3_context
|
|
||||||
from fastapi import FastAPI, WebSocket, HTTPException
|
from fastapi import FastAPI, WebSocket, HTTPException
|
||||||
from fastapi.websockets import WebSocketDisconnect
|
from fastapi.websockets import WebSocketDisconnect
|
||||||
from fastapi.responses import Response
|
from fastapi.responses import Response
|
||||||
|
@ -246,14 +240,16 @@ async def detect(websocket: WebSocket):
|
||||||
logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
|
logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
# Extract display identifier for pipeline context
|
# Extract display identifier for session ID lookup
|
||||||
subscription_parts = stream["subscriptionIdentifier"].split(';')
|
subscription_parts = stream["subscriptionIdentifier"].split(';')
|
||||||
display_identifier = subscription_parts[0] if subscription_parts else None
|
display_identifier = subscription_parts[0] if subscription_parts else None
|
||||||
|
session_id = session_ids.get(display_identifier) if display_identifier else None
|
||||||
|
|
||||||
# Create context for pipeline execution (session_id will be generated by pipeline)
|
# Create context for pipeline execution
|
||||||
pipeline_context = {
|
pipeline_context = {
|
||||||
"camera_id": camera_id,
|
"camera_id": camera_id,
|
||||||
"display_id": display_identifier
|
"display_id": display_identifier,
|
||||||
|
"session_id": session_id
|
||||||
}
|
}
|
||||||
|
|
||||||
detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)
|
detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)
|
||||||
|
@ -263,63 +259,57 @@ async def detect(websocket: WebSocket):
|
||||||
# Log the raw detection result for debugging
|
# Log the raw detection result for debugging
|
||||||
logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}")
|
logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}")
|
||||||
|
|
||||||
# Extract session_id from pipeline result (generated during database record creation)
|
# Direct class result (no detections/classifications structure)
|
||||||
session_id = None
|
if detection_result and isinstance(detection_result, dict) and "class" in detection_result and "confidence" in detection_result:
|
||||||
if detection_result and isinstance(detection_result, dict):
|
|
||||||
# Check if pipeline generated a session_id (happens when Car+Frontal detected together)
|
|
||||||
if "session_id" in detection_result:
|
|
||||||
session_id = detection_result["session_id"]
|
|
||||||
logger.debug(f"Extracted session_id from pipeline result: {session_id}")
|
|
||||||
|
|
||||||
# Process detection result - run_pipeline returns the primary detection directly
|
|
||||||
if detection_result and isinstance(detection_result, dict) and "class" in detection_result:
|
|
||||||
highest_confidence_detection = detection_result
|
|
||||||
else:
|
|
||||||
# No detection found
|
|
||||||
highest_confidence_detection = {
|
highest_confidence_detection = {
|
||||||
|
"class": detection_result.get("class", "none"),
|
||||||
|
"confidence": detection_result.get("confidence", 1.0),
|
||||||
|
"box": [0, 0, 0, 0] # Empty bounding box for classifications
|
||||||
|
}
|
||||||
|
# Handle case when no detections found or result is empty
|
||||||
|
elif not detection_result or not detection_result.get("detections"):
|
||||||
|
# Check if we have classification results
|
||||||
|
if detection_result and detection_result.get("classifications"):
|
||||||
|
# Get the highest confidence classification
|
||||||
|
classifications = detection_result.get("classifications", [])
|
||||||
|
highest_confidence_class = max(classifications, key=lambda x: x.get("confidence", 0)) if classifications else None
|
||||||
|
|
||||||
|
if highest_confidence_class:
|
||||||
|
highest_confidence_detection = {
|
||||||
|
"class": highest_confidence_class.get("class", "none"),
|
||||||
|
"confidence": highest_confidence_class.get("confidence", 1.0),
|
||||||
|
"box": [0, 0, 0, 0] # Empty bounding box for classifications
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
highest_confidence_detection = {
|
||||||
|
"class": "none",
|
||||||
|
"confidence": 1.0,
|
||||||
|
"box": [0, 0, 0, 0]
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
highest_confidence_detection = {
|
||||||
|
"class": "none",
|
||||||
|
"confidence": 1.0,
|
||||||
|
"box": [0, 0, 0, 0]
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# Find detection with highest confidence
|
||||||
|
detections = detection_result.get("detections", [])
|
||||||
|
highest_confidence_detection = max(detections, key=lambda x: x.get("confidence", 0)) if detections else {
|
||||||
"class": "none",
|
"class": "none",
|
||||||
"confidence": 1.0,
|
"confidence": 1.0,
|
||||||
"bbox": [0, 0, 0, 0],
|
"box": [0, 0, 0, 0]
|
||||||
"branch_results": {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Convert detection format to match backend expectations exactly as in worker.md section 4.2
|
# Convert detection format to match protocol - flatten detection attributes
|
||||||
detection_dict = {
|
detection_dict = {}
|
||||||
"carModel": None,
|
|
||||||
"carBrand": None,
|
|
||||||
"carYear": None,
|
|
||||||
"bodyType": None,
|
|
||||||
"licensePlateText": None,
|
|
||||||
"licensePlateConfidence": None
|
|
||||||
}
|
|
||||||
|
|
||||||
# Extract and process branch results from parallel classification
|
# Handle different detection result formats
|
||||||
branch_results = highest_confidence_detection.get("branch_results", {})
|
if isinstance(highest_confidence_detection, dict):
|
||||||
if branch_results:
|
# Copy all fields from the detection result
|
||||||
logger.debug(f"Processing branch results: {branch_results}")
|
for key, value in highest_confidence_detection.items():
|
||||||
|
if key not in ["box", "id"]: # Skip internal fields
|
||||||
# Transform branch results into backend-expected detection attributes
|
detection_dict[key] = value
|
||||||
for branch_id, branch_data in branch_results.items():
|
|
||||||
if isinstance(branch_data, dict):
|
|
||||||
logger.debug(f"Processing branch {branch_id}: {branch_data}")
|
|
||||||
|
|
||||||
# Map common classification fields to backend-expected names
|
|
||||||
if "brand" in branch_data:
|
|
||||||
detection_dict["carBrand"] = branch_data["brand"]
|
|
||||||
if "body_type" in branch_data:
|
|
||||||
detection_dict["bodyType"] = branch_data["body_type"]
|
|
||||||
if "class" in branch_data:
|
|
||||||
class_name = branch_data["class"]
|
|
||||||
|
|
||||||
# Map based on branch/model type
|
|
||||||
if "brand" in branch_id.lower():
|
|
||||||
detection_dict["carBrand"] = class_name
|
|
||||||
elif "bodytype" in branch_id.lower() or "body" in branch_id.lower():
|
|
||||||
detection_dict["bodyType"] = class_name
|
|
||||||
|
|
||||||
logger.info(f"Detection payload after branch processing: {detection_dict}")
|
|
||||||
else:
|
|
||||||
logger.debug("No branch results found in detection result")
|
|
||||||
|
|
||||||
detection_data = {
|
detection_data = {
|
||||||
"type": "imageDetection",
|
"type": "imageDetection",
|
||||||
|
@ -332,14 +322,12 @@ async def detect(websocket: WebSocket):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Add session ID if available (generated by pipeline when Car+Frontal detected)
|
# Add session ID if available
|
||||||
if session_id is not None:
|
if session_id is not None:
|
||||||
detection_data["sessionId"] = session_id
|
detection_data["sessionId"] = session_id
|
||||||
logger.debug(f"Added session_id to WebSocket response: {session_id}")
|
|
||||||
|
|
||||||
if highest_confidence_detection.get("class") != "none":
|
if highest_confidence_detection["class"] != "none":
|
||||||
confidence = highest_confidence_detection.get("confidence", 0.0)
|
logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}")
|
||||||
logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
|
|
||||||
|
|
||||||
# Log session ID if available
|
# Log session ID if available
|
||||||
if session_id:
|
if session_id:
|
||||||
|
@ -347,7 +335,6 @@ async def detect(websocket: WebSocket):
|
||||||
|
|
||||||
await websocket.send_json(detection_data)
|
await websocket.send_json(detection_data)
|
||||||
logger.debug(f"Sent detection data to client for camera {camera_id}")
|
logger.debug(f"Sent detection data to client for camera {camera_id}")
|
||||||
logger.debug(f"Sent this detection data: {detection_data}")
|
|
||||||
return persistent_data
|
return persistent_data
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
|
logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
|
||||||
|
@ -513,199 +500,6 @@ async def detect(websocket: WebSocket):
|
||||||
finally:
|
finally:
|
||||||
logger.info(f"Snapshot reader thread for camera {camera_id} is exiting")
|
logger.info(f"Snapshot reader thread for camera {camera_id} is exiting")
|
||||||
|
|
||||||
async def reconcile_subscriptions(desired_subscriptions, websocket):
|
|
||||||
"""
|
|
||||||
Declarative reconciliation: Compare desired vs current subscriptions and make changes
|
|
||||||
"""
|
|
||||||
logger.info(f"Reconciling subscriptions: {len(desired_subscriptions)} desired")
|
|
||||||
|
|
||||||
with streams_lock:
|
|
||||||
# Get current subscriptions
|
|
||||||
current_subscription_ids = set(streams.keys())
|
|
||||||
desired_subscription_ids = set(sub["subscriptionIdentifier"] for sub in desired_subscriptions)
|
|
||||||
|
|
||||||
# Find what to add and remove
|
|
||||||
to_add = desired_subscription_ids - current_subscription_ids
|
|
||||||
to_remove = current_subscription_ids - desired_subscription_ids
|
|
||||||
to_check_for_changes = current_subscription_ids & desired_subscription_ids
|
|
||||||
|
|
||||||
logger.info(f"Reconciliation: {len(to_add)} to add, {len(to_remove)} to remove, {len(to_check_for_changes)} to check for changes")
|
|
||||||
|
|
||||||
# Remove subscriptions that are no longer wanted
|
|
||||||
for subscription_id in to_remove:
|
|
||||||
await unsubscribe_internal(subscription_id)
|
|
||||||
|
|
||||||
# Check existing subscriptions for parameter changes
|
|
||||||
for subscription_id in to_check_for_changes:
|
|
||||||
desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
|
|
||||||
current_stream = streams[subscription_id]
|
|
||||||
|
|
||||||
# Check if parameters changed
|
|
||||||
if has_subscription_changed(desired_sub, current_stream):
|
|
||||||
logger.info(f"Parameters changed for {subscription_id}, resubscribing")
|
|
||||||
await unsubscribe_internal(subscription_id)
|
|
||||||
await subscribe_internal(desired_sub, websocket)
|
|
||||||
|
|
||||||
# Add new subscriptions
|
|
||||||
for subscription_id in to_add:
|
|
||||||
desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
|
|
||||||
await subscribe_internal(desired_sub, websocket)
|
|
||||||
|
|
||||||
def has_subscription_changed(desired_sub, current_stream):
|
|
||||||
"""Check if subscription parameters have changed"""
|
|
||||||
return (
|
|
||||||
desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
|
|
||||||
desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or
|
|
||||||
desired_sub.get("snapshotInterval") != current_stream.get("snapshot_interval") or
|
|
||||||
desired_sub.get("cropX1") != current_stream.get("cropX1") or
|
|
||||||
desired_sub.get("cropY1") != current_stream.get("cropY1") or
|
|
||||||
desired_sub.get("cropX2") != current_stream.get("cropX2") or
|
|
||||||
desired_sub.get("cropY2") != current_stream.get("cropY2") or
|
|
||||||
desired_sub.get("modelId") != current_stream.get("modelId") or
|
|
||||||
desired_sub.get("modelName") != current_stream.get("modelName")
|
|
||||||
)
|
|
||||||
|
|
||||||
async def subscribe_internal(subscription, websocket):
|
|
||||||
"""Internal subscription logic extracted from original subscribe handler"""
|
|
||||||
subscriptionIdentifier = subscription.get("subscriptionIdentifier")
|
|
||||||
rtsp_url = subscription.get("rtspUrl")
|
|
||||||
snapshot_url = subscription.get("snapshotUrl")
|
|
||||||
snapshot_interval = subscription.get("snapshotInterval")
|
|
||||||
model_url = subscription.get("modelUrl")
|
|
||||||
modelId = subscription.get("modelId")
|
|
||||||
modelName = subscription.get("modelName")
|
|
||||||
cropX1 = subscription.get("cropX1")
|
|
||||||
cropY1 = subscription.get("cropY1")
|
|
||||||
cropX2 = subscription.get("cropX2")
|
|
||||||
cropY2 = subscription.get("cropY2")
|
|
||||||
|
|
||||||
# Extract camera_id from subscriptionIdentifier
|
|
||||||
parts = subscriptionIdentifier.split(';')
|
|
||||||
if len(parts) != 2:
|
|
||||||
logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}")
|
|
||||||
return
|
|
||||||
|
|
||||||
display_identifier, camera_identifier = parts
|
|
||||||
camera_id = subscriptionIdentifier
|
|
||||||
|
|
||||||
# Load model if needed
|
|
||||||
if model_url:
|
|
||||||
with models_lock:
|
|
||||||
if (camera_id not in models) or (modelId not in models[camera_id]):
|
|
||||||
logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}")
|
|
||||||
extraction_dir = os.path.join("models", camera_identifier, str(modelId))
|
|
||||||
os.makedirs(extraction_dir, exist_ok=True)
|
|
||||||
|
|
||||||
# Handle model loading (same as original)
|
|
||||||
parsed = urlparse(model_url)
|
|
||||||
if parsed.scheme in ("http", "https"):
|
|
||||||
filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta"
|
|
||||||
local_mpta = os.path.join(extraction_dir, filename)
|
|
||||||
local_path = download_mpta(model_url, local_mpta)
|
|
||||||
if not local_path:
|
|
||||||
logger.error(f"Failed to download model from {model_url}")
|
|
||||||
return
|
|
||||||
model_tree = load_pipeline_from_zip(local_path, extraction_dir)
|
|
||||||
else:
|
|
||||||
if not os.path.exists(model_url):
|
|
||||||
logger.error(f"Model file not found: {model_url}")
|
|
||||||
return
|
|
||||||
model_tree = load_pipeline_from_zip(model_url, extraction_dir)
|
|
||||||
|
|
||||||
if model_tree is None:
|
|
||||||
logger.error(f"Failed to load model {modelId}")
|
|
||||||
return
|
|
||||||
|
|
||||||
if camera_id not in models:
|
|
||||||
models[camera_id] = {}
|
|
||||||
models[camera_id][modelId] = model_tree
|
|
||||||
|
|
||||||
# Create stream (same logic as original)
|
|
||||||
if camera_id and (rtsp_url or snapshot_url) and len(streams) < max_streams:
|
|
||||||
camera_url = snapshot_url if snapshot_url else rtsp_url
|
|
||||||
|
|
||||||
# Check if we already have a stream for this camera URL
|
|
||||||
shared_stream = camera_streams.get(camera_url)
|
|
||||||
|
|
||||||
if shared_stream:
|
|
||||||
# Reuse existing stream
|
|
||||||
buffer = shared_stream["buffer"]
|
|
||||||
stop_event = shared_stream["stop_event"]
|
|
||||||
thread = shared_stream["thread"]
|
|
||||||
mode = shared_stream["mode"]
|
|
||||||
shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1
|
|
||||||
else:
|
|
||||||
# Create new stream
|
|
||||||
buffer = queue.Queue(maxsize=1)
|
|
||||||
stop_event = threading.Event()
|
|
||||||
|
|
||||||
if snapshot_url and snapshot_interval:
|
|
||||||
thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
|
|
||||||
thread.daemon = True
|
|
||||||
thread.start()
|
|
||||||
mode = "snapshot"
|
|
||||||
shared_stream = {
|
|
||||||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
|
||||||
"mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1
|
|
||||||
}
|
|
||||||
camera_streams[camera_url] = shared_stream
|
|
||||||
elif rtsp_url:
|
|
||||||
cap = cv2.VideoCapture(rtsp_url)
|
|
||||||
if not cap.isOpened():
|
|
||||||
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
|
|
||||||
return
|
|
||||||
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
|
|
||||||
thread.daemon = True
|
|
||||||
thread.start()
|
|
||||||
mode = "rtsp"
|
|
||||||
shared_stream = {
|
|
||||||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
|
||||||
"mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1
|
|
||||||
}
|
|
||||||
camera_streams[camera_url] = shared_stream
|
|
||||||
else:
|
|
||||||
logger.error(f"No valid URL provided for camera {camera_id}")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Create stream info
|
|
||||||
stream_info = {
|
|
||||||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
|
||||||
"modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
|
|
||||||
"cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
|
|
||||||
"mode": mode, "camera_url": camera_url, "modelUrl": model_url
|
|
||||||
}
|
|
||||||
|
|
||||||
if mode == "snapshot":
|
|
||||||
stream_info["snapshot_url"] = snapshot_url
|
|
||||||
stream_info["snapshot_interval"] = snapshot_interval
|
|
||||||
elif mode == "rtsp":
|
|
||||||
stream_info["rtsp_url"] = rtsp_url
|
|
||||||
stream_info["cap"] = shared_stream["cap"]
|
|
||||||
|
|
||||||
streams[camera_id] = stream_info
|
|
||||||
subscription_to_camera[camera_id] = camera_url
|
|
||||||
logger.info(f"Subscribed to camera {camera_id}")
|
|
||||||
|
|
||||||
async def unsubscribe_internal(subscription_id):
|
|
||||||
"""Internal unsubscription logic"""
|
|
||||||
if subscription_id in streams:
|
|
||||||
stream = streams.pop(subscription_id)
|
|
||||||
camera_url = subscription_to_camera.pop(subscription_id, None)
|
|
||||||
|
|
||||||
if camera_url and camera_url in camera_streams:
|
|
||||||
shared_stream = camera_streams[camera_url]
|
|
||||||
shared_stream["ref_count"] -= 1
|
|
||||||
|
|
||||||
if shared_stream["ref_count"] <= 0:
|
|
||||||
shared_stream["stop_event"].set()
|
|
||||||
shared_stream["thread"].join()
|
|
||||||
if "cap" in shared_stream:
|
|
||||||
shared_stream["cap"].release()
|
|
||||||
del camera_streams[camera_url]
|
|
||||||
|
|
||||||
latest_frames.pop(subscription_id, None)
|
|
||||||
logger.info(f"Unsubscribed from camera {subscription_id}")
|
|
||||||
|
|
||||||
async def process_streams():
|
async def process_streams():
|
||||||
logger.info("Started processing streams")
|
logger.info("Started processing streams")
|
||||||
try:
|
try:
|
||||||
|
@ -773,10 +567,6 @@ async def detect(websocket: WebSocket):
|
||||||
"modelId": stream["modelId"],
|
"modelId": stream["modelId"],
|
||||||
"modelName": stream["modelName"],
|
"modelName": stream["modelName"],
|
||||||
"online": True,
|
"online": True,
|
||||||
# Include all subscription parameters for proper change detection
|
|
||||||
"rtspUrl": stream.get("rtsp_url"),
|
|
||||||
"snapshotUrl": stream.get("snapshot_url"),
|
|
||||||
"snapshotInterval": stream.get("snapshot_interval"),
|
|
||||||
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
|
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
|
||||||
}
|
}
|
||||||
for camera_id, stream in streams.items()
|
for camera_id, stream in streams.items()
|
||||||
|
@ -805,44 +595,29 @@ async def detect(websocket: WebSocket):
|
||||||
data = json.loads(msg)
|
data = json.loads(msg)
|
||||||
msg_type = data.get("type")
|
msg_type = data.get("type")
|
||||||
|
|
||||||
if msg_type == "setSubscriptionList":
|
if msg_type == "subscribe":
|
||||||
# Declarative approach: Backend sends list of subscriptions this worker should have
|
|
||||||
desired_subscriptions = data.get("subscriptions", [])
|
|
||||||
logger.info(f"Received subscription list with {len(desired_subscriptions)} subscriptions")
|
|
||||||
|
|
||||||
await reconcile_subscriptions(desired_subscriptions, websocket)
|
|
||||||
|
|
||||||
elif msg_type == "subscribe":
|
|
||||||
# Legacy support - convert single subscription to list
|
|
||||||
payload = data.get("payload", {})
|
|
||||||
await reconcile_subscriptions([payload], websocket)
|
|
||||||
|
|
||||||
elif msg_type == "unsubscribe":
|
|
||||||
# Legacy support - remove subscription
|
|
||||||
payload = data.get("payload", {})
|
payload = data.get("payload", {})
|
||||||
subscriptionIdentifier = payload.get("subscriptionIdentifier")
|
subscriptionIdentifier = payload.get("subscriptionIdentifier")
|
||||||
# Remove from current subscriptions and reconcile
|
rtsp_url = payload.get("rtspUrl")
|
||||||
current_subs = []
|
snapshot_url = payload.get("snapshotUrl")
|
||||||
with streams_lock:
|
snapshot_interval = payload.get("snapshotInterval")
|
||||||
for camera_id, stream in streams.items():
|
model_url = payload.get("modelUrl")
|
||||||
if stream["subscriptionIdentifier"] != subscriptionIdentifier:
|
modelId = payload.get("modelId")
|
||||||
# Convert stream back to subscription format
|
modelName = payload.get("modelName")
|
||||||
current_subs.append({
|
cropX1 = payload.get("cropX1")
|
||||||
"subscriptionIdentifier": stream["subscriptionIdentifier"],
|
cropY1 = payload.get("cropY1")
|
||||||
"rtspUrl": stream.get("rtsp_url"),
|
cropX2 = payload.get("cropX2")
|
||||||
"snapshotUrl": stream.get("snapshot_url"),
|
cropY2 = payload.get("cropY2")
|
||||||
"snapshotInterval": stream.get("snapshot_interval"),
|
|
||||||
"modelId": stream["modelId"],
|
# Extract camera_id from subscriptionIdentifier (format: displayIdentifier;cameraIdentifier)
|
||||||
"modelName": stream["modelName"],
|
parts = subscriptionIdentifier.split(';')
|
||||||
"modelUrl": stream.get("modelUrl", ""),
|
if len(parts) != 2:
|
||||||
"cropX1": stream.get("cropX1"),
|
logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}")
|
||||||
"cropY1": stream.get("cropY1"),
|
continue
|
||||||
"cropX2": stream.get("cropX2"),
|
|
||||||
"cropY2": stream.get("cropY2")
|
|
||||||
})
|
|
||||||
await reconcile_subscriptions(current_subs, websocket)
|
|
||||||
|
|
||||||
elif msg_type == "old_subscribe_logic_removed":
|
display_identifier, camera_identifier = parts
|
||||||
|
camera_id = subscriptionIdentifier # Use full subscriptionIdentifier as camera_id for mapping
|
||||||
|
|
||||||
if model_url:
|
if model_url:
|
||||||
with models_lock:
|
with models_lock:
|
||||||
if (camera_id not in models) or (modelId not in models[camera_id]):
|
if (camera_id not in models) or (modelId not in models[camera_id]):
|
||||||
|
@ -1038,10 +813,6 @@ async def detect(websocket: WebSocket):
|
||||||
"modelId": stream["modelId"],
|
"modelId": stream["modelId"],
|
||||||
"modelName": stream["modelName"],
|
"modelName": stream["modelName"],
|
||||||
"online": True,
|
"online": True,
|
||||||
# Include all subscription parameters for proper change detection
|
|
||||||
"rtspUrl": stream.get("rtsp_url"),
|
|
||||||
"snapshotUrl": stream.get("snapshot_url"),
|
|
||||||
"snapshotInterval": stream.get("snapshot_interval"),
|
|
||||||
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
|
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
|
||||||
}
|
}
|
||||||
for camera_id, stream in streams.items()
|
for camera_id, stream in streams.items()
|
||||||
|
|
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
@ -1,7 +1,7 @@
|
||||||
torch
|
torch>=1.12.0,<2.1.0
|
||||||
torchvision
|
torchvision>=0.13.0,<0.16.0
|
||||||
ultralytics
|
ultralytics>=8.0.0,<8.1.0
|
||||||
opencv-python
|
opencv-python>=4.6.0,<4.9.0
|
||||||
scipy
|
scipy>=1.9.0,<1.12.0
|
||||||
filterpy
|
filterpy>=1.4.0,<1.5.0
|
||||||
psycopg2-binary
|
psycopg2-binary>=2.9.0,<2.10.0
|
|
@ -514,65 +514,6 @@ def resolve_field_mapping(value_template, branch_results, action_context):
|
||||||
logger.error(f"Error resolving field mapping '{value_template}': {e}")
|
logger.error(f"Error resolving field mapping '{value_template}': {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def validate_pipeline_execution(node, regions_dict):
|
|
||||||
"""
|
|
||||||
Pre-validate that all required branches will execute successfully before
|
|
||||||
committing to Redis actions and database records.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
- (True, []) if pipeline can execute completely
|
|
||||||
- (False, missing_branches) if some required branches won't execute
|
|
||||||
"""
|
|
||||||
# Get all branches that parallel actions are waiting for
|
|
||||||
required_branches = set()
|
|
||||||
|
|
||||||
for action in node.get("parallelActions", []):
|
|
||||||
if action.get("type") == "postgresql_update_combined":
|
|
||||||
wait_for_branches = action.get("waitForBranches", [])
|
|
||||||
required_branches.update(wait_for_branches)
|
|
||||||
|
|
||||||
if not required_branches:
|
|
||||||
# No parallel actions requiring specific branches
|
|
||||||
logger.debug("No parallel actions with waitForBranches - validation passes")
|
|
||||||
return True, []
|
|
||||||
|
|
||||||
logger.debug(f"Pre-validation: checking if required branches {list(required_branches)} will execute")
|
|
||||||
|
|
||||||
# Check each required branch
|
|
||||||
missing_branches = []
|
|
||||||
|
|
||||||
for branch in node.get("branches", []):
|
|
||||||
branch_id = branch["modelId"]
|
|
||||||
|
|
||||||
if branch_id not in required_branches:
|
|
||||||
continue # This branch is not required by parallel actions
|
|
||||||
|
|
||||||
# Check if this branch would be triggered
|
|
||||||
trigger_classes = branch.get("triggerClasses", [])
|
|
||||||
min_conf = branch.get("minConfidence", 0)
|
|
||||||
|
|
||||||
branch_triggered = False
|
|
||||||
for det_class in regions_dict:
|
|
||||||
det_confidence = regions_dict[det_class]["confidence"]
|
|
||||||
|
|
||||||
if (det_class in trigger_classes and det_confidence >= min_conf):
|
|
||||||
branch_triggered = True
|
|
||||||
logger.debug(f"Pre-validation: branch {branch_id} WILL be triggered by {det_class} (conf={det_confidence:.3f} >= {min_conf})")
|
|
||||||
break
|
|
||||||
|
|
||||||
if not branch_triggered:
|
|
||||||
missing_branches.append(branch_id)
|
|
||||||
logger.warning(f"Pre-validation: branch {branch_id} will NOT be triggered - no matching classes or insufficient confidence")
|
|
||||||
logger.debug(f" Required: {trigger_classes} with min_conf={min_conf}")
|
|
||||||
logger.debug(f" Available: {[(cls, regions_dict[cls]['confidence']) for cls in regions_dict]}")
|
|
||||||
|
|
||||||
if missing_branches:
|
|
||||||
logger.error(f"Pipeline pre-validation FAILED: required branches {missing_branches} will not execute")
|
|
||||||
return False, missing_branches
|
|
||||||
else:
|
|
||||||
logger.info(f"Pipeline pre-validation PASSED: all required branches {list(required_branches)} will execute")
|
|
||||||
return True, []
|
|
||||||
|
|
||||||
def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
|
def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
|
||||||
"""
|
"""
|
||||||
Enhanced pipeline that supports:
|
Enhanced pipeline that supports:
|
||||||
|
@ -705,14 +646,6 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
|
||||||
else:
|
else:
|
||||||
logger.debug("No multi-class validation - proceeding with all detections")
|
logger.debug("No multi-class validation - proceeding with all detections")
|
||||||
|
|
||||||
# ─── Pre-validate pipeline execution ────────────────────────
|
|
||||||
pipeline_valid, missing_branches = validate_pipeline_execution(node, regions_dict)
|
|
||||||
|
|
||||||
if not pipeline_valid:
|
|
||||||
logger.error(f"Pipeline execution validation FAILED - required branches {missing_branches} cannot execute")
|
|
||||||
logger.error("Aborting pipeline: no Redis actions or database records will be created")
|
|
||||||
return (None, None) if return_bbox else None
|
|
||||||
|
|
||||||
# ─── Execute actions with region information ────────────────
|
# ─── Execute actions with region information ────────────────
|
||||||
detection_result = {
|
detection_result = {
|
||||||
"detections": all_detections,
|
"detections": all_detections,
|
||||||
|
@ -853,11 +786,9 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
|
||||||
primary_detection = max(all_detections, key=lambda x: x["confidence"])
|
primary_detection = max(all_detections, key=lambda x: x["confidence"])
|
||||||
primary_bbox = primary_detection["bbox"]
|
primary_bbox = primary_detection["bbox"]
|
||||||
|
|
||||||
# Add branch results and session_id to primary detection for compatibility
|
# Add branch results to primary detection for compatibility
|
||||||
if "branch_results" in detection_result:
|
if "branch_results" in detection_result:
|
||||||
primary_detection["branch_results"] = detection_result["branch_results"]
|
primary_detection["branch_results"] = detection_result["branch_results"]
|
||||||
if "session_id" in detection_result:
|
|
||||||
primary_detection["session_id"] = detection_result["session_id"]
|
|
||||||
|
|
||||||
return (primary_detection, primary_bbox) if return_bbox else primary_detection
|
return (primary_detection, primary_bbox) if return_bbox else primary_detection
|
||||||
|
|
||||||
|
|
160
worker.md
160
worker.md
|
@ -2,6 +2,12 @@
|
||||||
|
|
||||||
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
|
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
|
||||||
|
|
||||||
|
The current Python Detector Worker implementation supports advanced computer vision pipelines with:
|
||||||
|
- Multi-class YOLO detection with parallel processing
|
||||||
|
- PostgreSQL database integration with automatic schema management
|
||||||
|
- Redis integration for image storage and pub/sub messaging
|
||||||
|
- Hierarchical pipeline execution with detection → classification branching
|
||||||
|
|
||||||
## 1. Connection
|
## 1. Connection
|
||||||
|
|
||||||
The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
|
The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
|
||||||
|
@ -25,14 +31,34 @@ To enable modularity and dynamic configuration, the backend will send you a URL
|
||||||
2. Extracting its contents.
|
2. Extracting its contents.
|
||||||
3. Interpreting the contents to configure its internal pipeline.
|
3. Interpreting the contents to configure its internal pipeline.
|
||||||
|
|
||||||
**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:
|
**The current implementation supports comprehensive pipeline configurations including:**
|
||||||
|
|
||||||
- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
|
- **AI/ML Models**: YOLO models (.pt files) for detection and classification
|
||||||
- Configuration Files: A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
|
- **Pipeline Configuration**: `pipeline.json` defining hierarchical detection→classification workflows
|
||||||
- Scripts: Custom Python scripts for pre-processing or post-processing.
|
- **Multi-class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
|
||||||
- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.
|
- **Parallel Processing**: Concurrent execution of classification branches with ThreadPoolExecutor
|
||||||
|
- **Database Integration**: PostgreSQL configuration for automatic table creation and updates
|
||||||
|
- **Redis Actions**: Image storage with region cropping and pub/sub messaging
|
||||||
|
- **Dynamic Field Mapping**: Template-based field resolution for database operations
|
||||||
|
|
||||||
Essentially, the `.mpta` file is a self-contained package that tells your worker _how_ to process the video stream for a given subscription.
|
**Enhanced MPTA Structure:**
|
||||||
|
```
|
||||||
|
pipeline.mpta/
|
||||||
|
├── pipeline.json # Main configuration with redis/postgresql settings
|
||||||
|
├── car_detection.pt # Primary YOLO detection model
|
||||||
|
├── brand_classifier.pt # Classification model for car brands
|
||||||
|
├── bodytype_classifier.pt # Classification model for body types
|
||||||
|
└── ...
|
||||||
|
```
|
||||||
|
|
||||||
|
The `pipeline.json` now supports advanced features like:
|
||||||
|
- Multi-class detection with `expectedClasses` validation
|
||||||
|
- Parallel branch processing with `parallel: true`
|
||||||
|
- Database actions with `postgresql_update_combined`
|
||||||
|
- Redis actions with region-specific image cropping
|
||||||
|
- Branch synchronization with `waitForBranches`
|
||||||
|
|
||||||
|
Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription, including complex multi-stage AI pipelines with database persistence.
|
||||||
|
|
||||||
## 4. Messages from Worker to Backend
|
## 4. Messages from Worker to Backend
|
||||||
|
|
||||||
|
@ -79,6 +105,15 @@ Sent when the worker detects a relevant object. The `detection` object should be
|
||||||
|
|
||||||
- **Type:** `imageDetection`
|
- **Type:** `imageDetection`
|
||||||
|
|
||||||
|
**Enhanced Detection Capabilities:**
|
||||||
|
|
||||||
|
The current implementation supports multi-class detection with parallel classification processing. When a vehicle is detected, the system:
|
||||||
|
|
||||||
|
1. **Multi-Class Detection**: Simultaneously detects "Car" and "Frontal" classes
|
||||||
|
2. **Parallel Processing**: Runs brand and body type classification concurrently
|
||||||
|
3. **Database Integration**: Automatically creates and updates PostgreSQL records
|
||||||
|
4. **Redis Storage**: Saves cropped frontal images with expiration
|
||||||
|
|
||||||
**Payload Example:**
|
**Payload Example:**
|
||||||
|
|
||||||
```json
|
```json
|
||||||
|
@ -88,19 +123,38 @@ Sent when the worker detects a relevant object. The `detection` object should be
|
||||||
"timestamp": "2025-07-14T12:34:56.789Z",
|
"timestamp": "2025-07-14T12:34:56.789Z",
|
||||||
"data": {
|
"data": {
|
||||||
"detection": {
|
"detection": {
|
||||||
"carModel": "Civic",
|
"class": "Car",
|
||||||
|
"confidence": 0.92,
|
||||||
"carBrand": "Honda",
|
"carBrand": "Honda",
|
||||||
"carYear": 2023,
|
"carModel": "Civic",
|
||||||
"bodyType": "Sedan",
|
"bodyType": "Sedan",
|
||||||
"licensePlateText": "ABCD1234",
|
"branch_results": {
|
||||||
"licensePlateConfidence": 0.95
|
"car_brand_cls_v1": {
|
||||||
|
"class": "Honda",
|
||||||
|
"confidence": 0.89,
|
||||||
|
"brand": "Honda"
|
||||||
|
},
|
||||||
|
"car_bodytype_cls_v1": {
|
||||||
|
"class": "Sedan",
|
||||||
|
"confidence": 0.85,
|
||||||
|
"body_type": "Sedan"
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
"modelId": 101,
|
"modelId": 101,
|
||||||
"modelName": "US-LPR-and-Vehicle-ID"
|
"modelName": "Car Frontal Detection V1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**Database Integration:**
|
||||||
|
|
||||||
|
Each detection automatically:
|
||||||
|
- Creates a record in `gas_station_1.car_frontal_info` table
|
||||||
|
- Generates a unique `session_id` for tracking
|
||||||
|
- Updates the record with classification results after parallel processing completes
|
||||||
|
- Stores cropped frontal images in Redis with the session_id as key
|
||||||
|
|
||||||
### 4.3. Patch Session
|
### 4.3. Patch Session
|
||||||
|
|
||||||
> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
|
> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
|
||||||
|
@ -117,9 +171,9 @@ Allows the worker to request a modification to an active session's data. The `da
|
||||||
"sessionId": 12345,
|
"sessionId": 12345,
|
||||||
"data": {
|
"data": {
|
||||||
"currentCar": {
|
"currentCar": {
|
||||||
"carModel": "Civic",
|
"carModel": "Civic",
|
||||||
"carBrand": "Honda",
|
"carBrand": "Honda",
|
||||||
"licensePlateText": "ABCD1234"
|
"licensePlateText": "ABCD1234"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,33 +187,24 @@ The `data` object in the `patchSession` message is merged with the existing `Dis
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
interface DisplayPersistentData {
|
interface DisplayPersistentData {
|
||||||
progressionStage:
|
progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
|
||||||
| 'welcome'
|
qrCode: string | null;
|
||||||
| 'car_fueling'
|
adsPlayback: {
|
||||||
| 'car_waitpayment'
|
playlistSlotOrder: number; // The 'order' of the current slot
|
||||||
| 'car_postpayment'
|
adsId: number | null;
|
||||||
| null;
|
adsUrl: string | null;
|
||||||
qrCode: string | null;
|
} | null;
|
||||||
adsPlayback: {
|
currentCar: {
|
||||||
playlistSlotOrder: number; // The 'order' of the current slot
|
carModel?: string;
|
||||||
adsId: number | null;
|
carBrand?: string;
|
||||||
adsUrl: string | null;
|
carYear?: number;
|
||||||
} | null;
|
bodyType?: string;
|
||||||
currentCar: {
|
licensePlateText?: string;
|
||||||
carModel?: string;
|
licensePlateType?: string;
|
||||||
carBrand?: string;
|
} | null;
|
||||||
carYear?: number;
|
fuelPump: { /* FuelPumpData structure */ } | null;
|
||||||
bodyType?: string;
|
weatherData: { /* WeatherResponse structure */ } | null;
|
||||||
licensePlateText?: string;
|
sessionId: number | null;
|
||||||
licensePlateType?: string;
|
|
||||||
} | null;
|
|
||||||
fuelPump: {
|
|
||||||
/* FuelPumpData structure */
|
|
||||||
} | null;
|
|
||||||
weatherData: {
|
|
||||||
/* WeatherResponse structure */
|
|
||||||
} | null;
|
|
||||||
sessionId: number | null;
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -212,7 +257,7 @@ Instructs the worker to process a camera's RTSP stream using the configuration f
|
||||||
> - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera.
|
> - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera.
|
||||||
> - Capture each frame/image only once per cycle.
|
> - Capture each frame/image only once per cycle.
|
||||||
> - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.
|
> - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.
|
||||||
> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
|
> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
|
||||||
|
|
||||||
### 5.2. Unsubscribe from Camera
|
### 5.2. Unsubscribe from Camera
|
||||||
|
|
||||||
|
@ -324,7 +369,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
|
> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
|
||||||
|
|
||||||
1. **Connection Established** & **Heartbeat**
|
1. **Connection Established** & **Heartbeat**
|
||||||
- **Worker -> Backend**
|
* **Worker -> Backend**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "stateReport",
|
"type": "stateReport",
|
||||||
|
@ -336,7 +381,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
2. **Backend Subscribes Camera**
|
2. **Backend Subscribes Camera**
|
||||||
- **Backend -> Worker**
|
* **Backend -> Worker**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "subscribe",
|
"type": "subscribe",
|
||||||
|
@ -350,7 +395,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
3. **Worker Acknowledges in Heartbeat**
|
3. **Worker Acknowledges in Heartbeat**
|
||||||
- **Worker -> Backend**
|
* **Worker -> Backend**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "stateReport",
|
"type": "stateReport",
|
||||||
|
@ -369,7 +414,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
4. **Worker Detects a Car**
|
4. **Worker Detects a Car**
|
||||||
- **Worker -> Backend**
|
* **Worker -> Backend**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "imageDetection",
|
"type": "imageDetection",
|
||||||
|
@ -388,7 +433,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
- **Worker -> Backend**
|
* **Worker -> Backend**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "imageDetection",
|
"type": "imageDetection",
|
||||||
|
@ -407,7 +452,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
- **Worker -> Backend**
|
* **Worker -> Backend**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "imageDetection",
|
"type": "imageDetection",
|
||||||
|
@ -427,7 +472,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
5. **Backend Unsubscribes Camera**
|
5. **Backend Unsubscribes Camera**
|
||||||
- **Backend -> Worker**
|
* **Backend -> Worker**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "unsubscribe",
|
"type": "unsubscribe",
|
||||||
|
@ -437,7 +482,7 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
6. **Worker Acknowledges Unsubscription**
|
6. **Worker Acknowledges Unsubscription**
|
||||||
- **Worker -> Backend**
|
* **Worker -> Backend**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "stateReport",
|
"type": "stateReport",
|
||||||
|
@ -448,7 +493,6 @@ This section shows a typical sequence of messages between the backend and the wo
|
||||||
"cameraConnections": []
|
"cameraConnections": []
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## 7. HTTP API: Image Retrieval
|
## 7. HTTP API: Image Retrieval
|
||||||
|
|
||||||
In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
|
In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
|
||||||
|
@ -464,13 +508,11 @@ GET /camera/{camera_id}/image
|
||||||
### Response
|
### Response
|
||||||
|
|
||||||
- **Success (200):** Returns the latest JPEG image from the camera stream.
|
- **Success (200):** Returns the latest JPEG image from the camera stream.
|
||||||
|
- `Content-Type: image/jpeg`
|
||||||
- `Content-Type: image/jpeg`
|
- Binary JPEG data.
|
||||||
- Binary JPEG data.
|
|
||||||
|
|
||||||
- **Error (404):** If the camera is not found or no frame is available.
|
- **Error (404):** If the camera is not found or no frame is available.
|
||||||
|
- JSON error response.
|
||||||
- JSON error response.
|
|
||||||
|
|
||||||
- **Error (500):** Internal server error.
|
- **Error (500):** Internal server error.
|
||||||
|
|
||||||
|
@ -483,9 +525,9 @@ GET /camera/display-001;cam-001/image
|
||||||
### Example Response
|
### Example Response
|
||||||
|
|
||||||
- **Headers:**
|
- **Headers:**
|
||||||
```
|
```
|
||||||
Content-Type: image/jpeg
|
Content-Type: image/jpeg
|
||||||
```
|
```
|
||||||
- **Body:** Binary JPEG image.
|
- **Body:** Binary JPEG image.
|
||||||
|
|
||||||
### Notes
|
### Notes
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue