Compare commits


1 commit

Author SHA1 Message Date
Pongsatorn 78cb5f53c9 update req 2025-08-11 14:28:19 +07:00
7 changed files with 182 additions and 3386 deletions

.gitignore (vendored), 1 change

@@ -12,4 +12,3 @@ detector_worker.log
 no_frame_debug.log
 feeder/
-.venv/

app.py, 375 changes

@@ -13,13 +13,7 @@ import requests
 import asyncio
 import psutil
 import zipfile
-import ssl
-import urllib3
-import subprocess
-import tempfile
 from urllib.parse import urlparse
-from requests.adapters import HTTPAdapter
-from urllib3.util.ssl_ import create_urllib3_context
 from fastapi import FastAPI, WebSocket, HTTPException
 from fastapi.websockets import WebSocketDisconnect
 from fastapi.responses import Response
@@ -246,14 +240,16 @@ async def detect(websocket: WebSocket):
 logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
 start_time = time.time()
-# Extract display identifier for pipeline context
+# Extract display identifier for session ID lookup
 subscription_parts = stream["subscriptionIdentifier"].split(';')
 display_identifier = subscription_parts[0] if subscription_parts else None
+session_id = session_ids.get(display_identifier) if display_identifier else None
-# Create context for pipeline execution (session_id will be generated by pipeline)
+# Create context for pipeline execution
 pipeline_context = {
     "camera_id": camera_id,
-    "display_id": display_identifier
+    "display_id": display_identifier,
+    "session_id": session_id
 }
 detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)
@@ -263,63 +259,57 @@ async def detect(websocket: WebSocket):
 # Log the raw detection result for debugging
 logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}")
-# Extract session_id from pipeline result (generated during database record creation)
-session_id = None
-if detection_result and isinstance(detection_result, dict):
-    # Check if pipeline generated a session_id (happens when Car+Frontal detected together)
-    if "session_id" in detection_result:
-        session_id = detection_result["session_id"]
-        logger.debug(f"Extracted session_id from pipeline result: {session_id}")
-# Process detection result - run_pipeline returns the primary detection directly
-if detection_result and isinstance(detection_result, dict) and "class" in detection_result:
-    highest_confidence_detection = detection_result
-else:
-    # No detection found
-    highest_confidence_detection = {
-        "class": "none",
-        "confidence": 1.0,
-        "bbox": [0, 0, 0, 0],
-        "branch_results": {}
-    }
+# Direct class result (no detections/classifications structure)
+if detection_result and isinstance(detection_result, dict) and "class" in detection_result and "confidence" in detection_result:
+    highest_confidence_detection = {
+        "class": detection_result.get("class", "none"),
+        "confidence": detection_result.get("confidence", 1.0),
+        "box": [0, 0, 0, 0]  # Empty bounding box for classifications
+    }
+# Handle case when no detections found or result is empty
+elif not detection_result or not detection_result.get("detections"):
+    # Check if we have classification results
+    if detection_result and detection_result.get("classifications"):
+        # Get the highest confidence classification
+        classifications = detection_result.get("classifications", [])
+        highest_confidence_class = max(classifications, key=lambda x: x.get("confidence", 0)) if classifications else None
+        if highest_confidence_class:
+            highest_confidence_detection = {
+                "class": highest_confidence_class.get("class", "none"),
+                "confidence": highest_confidence_class.get("confidence", 1.0),
+                "box": [0, 0, 0, 0]  # Empty bounding box for classifications
+            }
+        else:
+            highest_confidence_detection = {
+                "class": "none",
+                "confidence": 1.0,
+                "box": [0, 0, 0, 0]
+            }
+    else:
+        highest_confidence_detection = {
+            "class": "none",
+            "confidence": 1.0,
+            "box": [0, 0, 0, 0]
+        }
+else:
+    # Find detection with highest confidence
+    detections = detection_result.get("detections", [])
+    highest_confidence_detection = max(detections, key=lambda x: x.get("confidence", 0)) if detections else {
+        "class": "none",
+        "confidence": 1.0,
+        "box": [0, 0, 0, 0]
+    }
-# Convert detection format to match backend expectations exactly as in worker.md section 4.2
-detection_dict = {
-    "carModel": None,
-    "carBrand": None,
-    "carYear": None,
-    "bodyType": None,
-    "licensePlateText": None,
-    "licensePlateConfidence": None
-}
+# Convert detection format to match protocol - flatten detection attributes
+detection_dict = {}
-# Extract and process branch results from parallel classification
-branch_results = highest_confidence_detection.get("branch_results", {})
-if branch_results:
-    logger.debug(f"Processing branch results: {branch_results}")
-    # Transform branch results into backend-expected detection attributes
-    for branch_id, branch_data in branch_results.items():
-        if isinstance(branch_data, dict):
-            logger.debug(f"Processing branch {branch_id}: {branch_data}")
-            # Map common classification fields to backend-expected names
-            if "brand" in branch_data:
-                detection_dict["carBrand"] = branch_data["brand"]
-            if "body_type" in branch_data:
-                detection_dict["bodyType"] = branch_data["body_type"]
-            if "class" in branch_data:
-                class_name = branch_data["class"]
-                # Map based on branch/model type
-                if "brand" in branch_id.lower():
-                    detection_dict["carBrand"] = class_name
-                elif "bodytype" in branch_id.lower() or "body" in branch_id.lower():
-                    detection_dict["bodyType"] = class_name
-    logger.info(f"Detection payload after branch processing: {detection_dict}")
-else:
-    logger.debug("No branch results found in detection result")
+# Handle different detection result formats
+if isinstance(highest_confidence_detection, dict):
+    # Copy all fields from the detection result
+    for key, value in highest_confidence_detection.items():
+        if key not in ["box", "id"]:  # Skip internal fields
+            detection_dict[key] = value
 detection_data = {
     "type": "imageDetection",
@@ -332,14 +322,12 @@ async def detect(websocket: WebSocket):
     }
 }
-# Add session ID if available (generated by pipeline when Car+Frontal detected)
+# Add session ID if available
 if session_id is not None:
     detection_data["sessionId"] = session_id
-    logger.debug(f"Added session_id to WebSocket response: {session_id}")
-if highest_confidence_detection.get("class") != "none":
-    confidence = highest_confidence_detection.get("confidence", 0.0)
-    logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
+if highest_confidence_detection["class"] != "none":
+    logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}")
 # Log session ID if available
 if session_id:
@@ -347,7 +335,6 @@ async def detect(websocket: WebSocket):
 await websocket.send_json(detection_data)
 logger.debug(f"Sent detection data to client for camera {camera_id}")
-logger.debug(f"Sent this detection data: {detection_data}")
 return persistent_data
 except Exception as e:
     logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
@@ -513,199 +500,6 @@ async def detect(websocket: WebSocket):
 finally:
     logger.info(f"Snapshot reader thread for camera {camera_id} is exiting")
-async def reconcile_subscriptions(desired_subscriptions, websocket):
-    """
-    Declarative reconciliation: Compare desired vs current subscriptions and make changes
-    """
-    logger.info(f"Reconciling subscriptions: {len(desired_subscriptions)} desired")
-    with streams_lock:
-        # Get current subscriptions
-        current_subscription_ids = set(streams.keys())
-        desired_subscription_ids = set(sub["subscriptionIdentifier"] for sub in desired_subscriptions)
-        # Find what to add and remove
-        to_add = desired_subscription_ids - current_subscription_ids
-        to_remove = current_subscription_ids - desired_subscription_ids
-        to_check_for_changes = current_subscription_ids & desired_subscription_ids
-        logger.info(f"Reconciliation: {len(to_add)} to add, {len(to_remove)} to remove, {len(to_check_for_changes)} to check for changes")
-        # Remove subscriptions that are no longer wanted
-        for subscription_id in to_remove:
-            await unsubscribe_internal(subscription_id)
-        # Check existing subscriptions for parameter changes
-        for subscription_id in to_check_for_changes:
-            desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
-            current_stream = streams[subscription_id]
-            # Check if parameters changed
-            if has_subscription_changed(desired_sub, current_stream):
-                logger.info(f"Parameters changed for {subscription_id}, resubscribing")
-                await unsubscribe_internal(subscription_id)
-                await subscribe_internal(desired_sub, websocket)
-        # Add new subscriptions
-        for subscription_id in to_add:
-            desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
-            await subscribe_internal(desired_sub, websocket)
-def has_subscription_changed(desired_sub, current_stream):
-    """Check if subscription parameters have changed"""
-    return (
-        desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
-        desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or
-        desired_sub.get("snapshotInterval") != current_stream.get("snapshot_interval") or
-        desired_sub.get("cropX1") != current_stream.get("cropX1") or
-        desired_sub.get("cropY1") != current_stream.get("cropY1") or
-        desired_sub.get("cropX2") != current_stream.get("cropX2") or
-        desired_sub.get("cropY2") != current_stream.get("cropY2") or
-        desired_sub.get("modelId") != current_stream.get("modelId") or
-        desired_sub.get("modelName") != current_stream.get("modelName")
-    )
-async def subscribe_internal(subscription, websocket):
-    """Internal subscription logic extracted from original subscribe handler"""
-    subscriptionIdentifier = subscription.get("subscriptionIdentifier")
-    rtsp_url = subscription.get("rtspUrl")
-    snapshot_url = subscription.get("snapshotUrl")
-    snapshot_interval = subscription.get("snapshotInterval")
-    model_url = subscription.get("modelUrl")
-    modelId = subscription.get("modelId")
-    modelName = subscription.get("modelName")
-    cropX1 = subscription.get("cropX1")
-    cropY1 = subscription.get("cropY1")
-    cropX2 = subscription.get("cropX2")
-    cropY2 = subscription.get("cropY2")
-    # Extract camera_id from subscriptionIdentifier
-    parts = subscriptionIdentifier.split(';')
-    if len(parts) != 2:
-        logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}")
-        return
-    display_identifier, camera_identifier = parts
-    camera_id = subscriptionIdentifier
-    # Load model if needed
-    if model_url:
-        with models_lock:
-            if (camera_id not in models) or (modelId not in models[camera_id]):
-                logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}")
-                extraction_dir = os.path.join("models", camera_identifier, str(modelId))
-                os.makedirs(extraction_dir, exist_ok=True)
-                # Handle model loading (same as original)
-                parsed = urlparse(model_url)
-                if parsed.scheme in ("http", "https"):
-                    filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta"
-                    local_mpta = os.path.join(extraction_dir, filename)
-                    local_path = download_mpta(model_url, local_mpta)
-                    if not local_path:
-                        logger.error(f"Failed to download model from {model_url}")
-                        return
-                    model_tree = load_pipeline_from_zip(local_path, extraction_dir)
-                else:
-                    if not os.path.exists(model_url):
-                        logger.error(f"Model file not found: {model_url}")
-                        return
-                    model_tree = load_pipeline_from_zip(model_url, extraction_dir)
-                if model_tree is None:
-                    logger.error(f"Failed to load model {modelId}")
-                    return
-                if camera_id not in models:
-                    models[camera_id] = {}
-                models[camera_id][modelId] = model_tree
-    # Create stream (same logic as original)
-    if camera_id and (rtsp_url or snapshot_url) and len(streams) < max_streams:
-        camera_url = snapshot_url if snapshot_url else rtsp_url
-        # Check if we already have a stream for this camera URL
-        shared_stream = camera_streams.get(camera_url)
-        if shared_stream:
-            # Reuse existing stream
-            buffer = shared_stream["buffer"]
-            stop_event = shared_stream["stop_event"]
-            thread = shared_stream["thread"]
-            mode = shared_stream["mode"]
-            shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1
-        else:
-            # Create new stream
-            buffer = queue.Queue(maxsize=1)
-            stop_event = threading.Event()
-            if snapshot_url and snapshot_interval:
-                thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
-                thread.daemon = True
-                thread.start()
-                mode = "snapshot"
-                shared_stream = {
-                    "buffer": buffer, "thread": thread, "stop_event": stop_event,
-                    "mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1
-                }
-                camera_streams[camera_url] = shared_stream
-            elif rtsp_url:
-                cap = cv2.VideoCapture(rtsp_url)
-                if not cap.isOpened():
-                    logger.error(f"Failed to open RTSP stream for camera {camera_id}")
-                    return
-                thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
-                thread.daemon = True
-                thread.start()
-                mode = "rtsp"
-                shared_stream = {
-                    "buffer": buffer, "thread": thread, "stop_event": stop_event,
-                    "mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1
-                }
-                camera_streams[camera_url] = shared_stream
-            else:
-                logger.error(f"No valid URL provided for camera {camera_id}")
-                return
-        # Create stream info
-        stream_info = {
-            "buffer": buffer, "thread": thread, "stop_event": stop_event,
-            "modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
-            "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
-            "mode": mode, "camera_url": camera_url, "modelUrl": model_url
-        }
-        if mode == "snapshot":
-            stream_info["snapshot_url"] = snapshot_url
-            stream_info["snapshot_interval"] = snapshot_interval
-        elif mode == "rtsp":
-            stream_info["rtsp_url"] = rtsp_url
-            stream_info["cap"] = shared_stream["cap"]
-        streams[camera_id] = stream_info
-        subscription_to_camera[camera_id] = camera_url
-        logger.info(f"Subscribed to camera {camera_id}")
-async def unsubscribe_internal(subscription_id):
-    """Internal unsubscription logic"""
-    if subscription_id in streams:
-        stream = streams.pop(subscription_id)
-        camera_url = subscription_to_camera.pop(subscription_id, None)
-        if camera_url and camera_url in camera_streams:
-            shared_stream = camera_streams[camera_url]
-            shared_stream["ref_count"] -= 1
-            if shared_stream["ref_count"] <= 0:
-                shared_stream["stop_event"].set()
-                shared_stream["thread"].join()
-                if "cap" in shared_stream:
-                    shared_stream["cap"].release()
-                del camera_streams[camera_url]
-        latest_frames.pop(subscription_id, None)
-        logger.info(f"Unsubscribed from camera {subscription_id}")
 async def process_streams():
     logger.info("Started processing streams")
     try:
@@ -773,10 +567,6 @@ async def detect(websocket: WebSocket):
 "modelId": stream["modelId"],
 "modelName": stream["modelName"],
 "online": True,
-# Include all subscription parameters for proper change detection
-"rtspUrl": stream.get("rtsp_url"),
-"snapshotUrl": stream.get("snapshot_url"),
-"snapshotInterval": stream.get("snapshot_interval"),
 **{k: v for k, v in get_crop_coords(stream).items() if v is not None}
 }
 for camera_id, stream in streams.items()
@@ -805,44 +595,29 @@ async def detect(websocket: WebSocket):
 data = json.loads(msg)
 msg_type = data.get("type")
-if msg_type == "setSubscriptionList":
-    # Declarative approach: Backend sends list of subscriptions this worker should have
-    desired_subscriptions = data.get("subscriptions", [])
-    logger.info(f"Received subscription list with {len(desired_subscriptions)} subscriptions")
-    await reconcile_subscriptions(desired_subscriptions, websocket)
-elif msg_type == "subscribe":
-    # Legacy support - convert single subscription to list
-    payload = data.get("payload", {})
-    await reconcile_subscriptions([payload], websocket)
-elif msg_type == "unsubscribe":
-    # Legacy support - remove subscription
-    payload = data.get("payload", {})
-    subscriptionIdentifier = payload.get("subscriptionIdentifier")
-    # Remove from current subscriptions and reconcile
-    current_subs = []
-    with streams_lock:
-        for camera_id, stream in streams.items():
-            if stream["subscriptionIdentifier"] != subscriptionIdentifier:
-                # Convert stream back to subscription format
-                current_subs.append({
-                    "subscriptionIdentifier": stream["subscriptionIdentifier"],
-                    "rtspUrl": stream.get("rtsp_url"),
-                    "snapshotUrl": stream.get("snapshot_url"),
-                    "snapshotInterval": stream.get("snapshot_interval"),
-                    "modelId": stream["modelId"],
-                    "modelName": stream["modelName"],
-                    "modelUrl": stream.get("modelUrl", ""),
-                    "cropX1": stream.get("cropX1"),
-                    "cropY1": stream.get("cropY1"),
-                    "cropX2": stream.get("cropX2"),
-                    "cropY2": stream.get("cropY2")
-                })
-    await reconcile_subscriptions(current_subs, websocket)
-elif msg_type == "old_subscribe_logic_removed":
-    camera_id = subscriptionIdentifier
+if msg_type == "subscribe":
+    payload = data.get("payload", {})
+    subscriptionIdentifier = payload.get("subscriptionIdentifier")
+    rtsp_url = payload.get("rtspUrl")
+    snapshot_url = payload.get("snapshotUrl")
+    snapshot_interval = payload.get("snapshotInterval")
+    model_url = payload.get("modelUrl")
+    modelId = payload.get("modelId")
+    modelName = payload.get("modelName")
+    cropX1 = payload.get("cropX1")
+    cropY1 = payload.get("cropY1")
+    cropX2 = payload.get("cropX2")
+    cropY2 = payload.get("cropY2")
+    # Extract camera_id from subscriptionIdentifier (format: displayIdentifier;cameraIdentifier)
+    parts = subscriptionIdentifier.split(';')
+    if len(parts) != 2:
+        logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}")
+        continue
+    display_identifier, camera_identifier = parts
+    camera_id = subscriptionIdentifier  # Use full subscriptionIdentifier as camera_id for mapping
 if model_url:
     with models_lock:
         if (camera_id not in models) or (modelId not in models[camera_id]):
@@ -1038,10 +813,6 @@ async def detect(websocket: WebSocket):
 "modelId": stream["modelId"],
 "modelName": stream["modelName"],
 "online": True,
-# Include all subscription parameters for proper change detection
-"rtspUrl": stream.get("rtsp_url"),
-"snapshotUrl": stream.get("snapshot_url"),
-"snapshotInterval": stream.get("snapshot_interval"),
 **{k: v for k, v in get_crop_coords(stream).items() if v is not None}
 }
 for camera_id, stream in streams.items()
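Taken together, the app.py changes replace pipeline-generated session IDs with a lookup keyed by display identifier. A minimal runnable sketch of that lookup, assuming a `session_ids` map maintained elsewhere from backend messages (the map contents here are illustrative):

```python
# session_ids maps display identifiers to active session IDs
# (hypothetical sample data; the real map is populated at runtime).
session_ids = {"display-001": 12345}

def resolve_session_id(subscription_identifier: str):
    """Resolve a session ID from a 'displayIdentifier;cameraIdentifier' string."""
    parts = subscription_identifier.split(';')
    display_identifier = parts[0] if parts else None
    return session_ids.get(display_identifier) if display_identifier else None

print(resolve_session_id("display-001;cam-001"))  # -> 12345
print(resolve_session_id("display-002;cam-009"))  # -> None (no active session)
```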

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,7 +1,7 @@
-torch
-torchvision
-ultralytics
-opencv-python
-scipy
-filterpy
-psycopg2-binary
+torch>=1.12.0,<2.1.0
+torchvision>=0.13.0,<0.16.0
+ultralytics>=8.0.0,<8.1.0
+opencv-python>=4.6.0,<4.9.0
+scipy>=1.9.0,<1.12.0
+filterpy>=1.4.0,<1.5.0
+psycopg2-binary>=2.9.0,<2.10.0


@@ -514,65 +514,6 @@ def resolve_field_mapping(value_template, branch_results, action_context):
         logger.error(f"Error resolving field mapping '{value_template}': {e}")
         return None
-def validate_pipeline_execution(node, regions_dict):
-    """
-    Pre-validate that all required branches will execute successfully before
-    committing to Redis actions and database records.
-    Returns:
-    - (True, []) if pipeline can execute completely
-    - (False, missing_branches) if some required branches won't execute
-    """
-    # Get all branches that parallel actions are waiting for
-    required_branches = set()
-    for action in node.get("parallelActions", []):
-        if action.get("type") == "postgresql_update_combined":
-            wait_for_branches = action.get("waitForBranches", [])
-            required_branches.update(wait_for_branches)
-    if not required_branches:
-        # No parallel actions requiring specific branches
-        logger.debug("No parallel actions with waitForBranches - validation passes")
-        return True, []
-    logger.debug(f"Pre-validation: checking if required branches {list(required_branches)} will execute")
-    # Check each required branch
-    missing_branches = []
-    for branch in node.get("branches", []):
-        branch_id = branch["modelId"]
-        if branch_id not in required_branches:
-            continue  # This branch is not required by parallel actions
-        # Check if this branch would be triggered
-        trigger_classes = branch.get("triggerClasses", [])
-        min_conf = branch.get("minConfidence", 0)
-        branch_triggered = False
-        for det_class in regions_dict:
-            det_confidence = regions_dict[det_class]["confidence"]
-            if (det_class in trigger_classes and det_confidence >= min_conf):
-                branch_triggered = True
-                logger.debug(f"Pre-validation: branch {branch_id} WILL be triggered by {det_class} (conf={det_confidence:.3f} >= {min_conf})")
-                break
-        if not branch_triggered:
-            missing_branches.append(branch_id)
-            logger.warning(f"Pre-validation: branch {branch_id} will NOT be triggered - no matching classes or insufficient confidence")
-            logger.debug(f"  Required: {trigger_classes} with min_conf={min_conf}")
-            logger.debug(f"  Available: {[(cls, regions_dict[cls]['confidence']) for cls in regions_dict]}")
-    if missing_branches:
-        logger.error(f"Pipeline pre-validation FAILED: required branches {missing_branches} will not execute")
-        return False, missing_branches
-    else:
-        logger.info(f"Pipeline pre-validation PASSED: all required branches {list(required_branches)} will execute")
-        return True, []
 def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
     """
     Enhanced pipeline that supports:
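The removed `validate_pipeline_execution` boils down to one rule: a branch required by `waitForBranches` will execute only if some detected class appears in its `triggerClasses` with confidence at or above `minConfidence`. A runnable condensation of that rule (node and detection data are illustrative):

```python
# Sample pipeline node and detected regions (illustrative values).
node = {
    "parallelActions": [
        {"type": "postgresql_update_combined", "waitForBranches": ["car_brand_cls_v1"]}
    ],
    "branches": [
        {"modelId": "car_brand_cls_v1", "triggerClasses": ["Frontal"], "minConfidence": 0.5}
    ],
}
regions_dict = {"Car": {"confidence": 0.92}, "Frontal": {"confidence": 0.40}}

# Branches that parallel actions are waiting for.
required = {b for a in node["parallelActions"] for b in a.get("waitForBranches", [])}
# A required branch is "missing" if no detected class triggers it.
missing = [
    b["modelId"] for b in node["branches"]
    if b["modelId"] in required
    and not any(c in b["triggerClasses"] and regions_dict[c]["confidence"] >= b["minConfidence"]
                for c in regions_dict)
]
print(missing)  # ['car_brand_cls_v1'] -> the old code would have aborted the pipeline
```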
@@ -705,14 +646,6 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
 else:
     logger.debug("No multi-class validation - proceeding with all detections")
-# ─── Pre-validate pipeline execution ────────────────────────
-pipeline_valid, missing_branches = validate_pipeline_execution(node, regions_dict)
-if not pipeline_valid:
-    logger.error(f"Pipeline execution validation FAILED - required branches {missing_branches} cannot execute")
-    logger.error("Aborting pipeline: no Redis actions or database records will be created")
-    return (None, None) if return_bbox else None
 # ─── Execute actions with region information ────────────────
 detection_result = {
     "detections": all_detections,
@@ -853,11 +786,9 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
 primary_detection = max(all_detections, key=lambda x: x["confidence"])
 primary_bbox = primary_detection["bbox"]
-# Add branch results and session_id to primary detection for compatibility
+# Add branch results to primary detection for compatibility
 if "branch_results" in detection_result:
     primary_detection["branch_results"] = detection_result["branch_results"]
-if "session_id" in detection_result:
-    primary_detection["session_id"] = detection_result["session_id"]
 return (primary_detection, primary_bbox) if return_bbox else primary_detection
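The tail of `run_pipeline` above picks the highest-confidence detection and attaches branch results before returning. A runnable miniature of that selection (the detection data is illustrative):

```python
# Sample detections and branch results (illustrative values).
all_detections = [
    {"class": "Car", "confidence": 0.92, "bbox": [10, 20, 200, 180]},
    {"class": "Frontal", "confidence": 0.85, "bbox": [40, 60, 160, 170]},
]
detection_result = {"branch_results": {"car_brand_cls_v1": {"brand": "Honda"}}}

# Pick the primary detection by confidence and attach branch results.
primary_detection = max(all_detections, key=lambda x: x["confidence"])
primary_bbox = primary_detection["bbox"]
if "branch_results" in detection_result:
    primary_detection["branch_results"] = detection_result["branch_results"]
print(primary_detection["class"], primary_bbox)  # Car [10, 20, 200, 180]
```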

worker.md, 160 changes

@@ -2,6 +2,12 @@
 This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
+The current Python Detector Worker implementation supports advanced computer vision pipelines with:
+
+- Multi-class YOLO detection with parallel processing
+- PostgreSQL database integration with automatic schema management
+- Redis integration for image storage and pub/sub messaging
+- Hierarchical pipeline execution with detection → classification branching
 ## 1. Connection
 The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
@@ -25,14 +31,34 @@ To enable modularity and dynamic configuration, the backend will send you a URL
 2. Extracting its contents.
 3. Interpreting the contents to configure its internal pipeline.
-**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:
-- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
-- Configuration Files: A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
-- Scripts: Custom Python scripts for pre-processing or post-processing.
-- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.
-Essentially, the `.mpta` file is a self-contained package that tells your worker _how_ to process the video stream for a given subscription.
+**The current implementation supports comprehensive pipeline configurations including:**
+- **AI/ML Models**: YOLO models (.pt files) for detection and classification
+- **Pipeline Configuration**: `pipeline.json` defining hierarchical detection→classification workflows
+- **Multi-class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
+- **Parallel Processing**: Concurrent execution of classification branches with ThreadPoolExecutor
+- **Database Integration**: PostgreSQL configuration for automatic table creation and updates
+- **Redis Actions**: Image storage with region cropping and pub/sub messaging
+- **Dynamic Field Mapping**: Template-based field resolution for database operations
+
+**Enhanced MPTA Structure:**
+```
+pipeline.mpta/
+├── pipeline.json              # Main configuration with redis/postgresql settings
+├── car_detection.pt           # Primary YOLO detection model
+├── brand_classifier.pt        # Classification model for car brands
+├── bodytype_classifier.pt     # Classification model for body types
+└── ...
+```
+
+The `pipeline.json` now supports advanced features like:
+- Multi-class detection with `expectedClasses` validation
+- Parallel branch processing with `parallel: true`
+- Database actions with `postgresql_update_combined`
+- Redis actions with region-specific image cropping
+- Branch synchronization with `waitForBranches`
+
+Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription, including complex multi-stage AI pipelines with database persistence.
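For orientation, a minimal sketch of a `pipeline.json` built from the fields named above (`expectedClasses`, `parallel`, `triggerClasses`, `minConfidence`, `postgresql_update_combined`, `waitForBranches`); the overall nesting and the exact schema are assumptions, not taken from the repository:

```json
{
  "modelId": "car_frontal_detection_v1",
  "expectedClasses": ["Car", "Frontal"],
  "branches": [
    {
      "modelId": "car_brand_cls_v1",
      "parallel": true,
      "triggerClasses": ["Frontal"],
      "minConfidence": 0.5
    },
    {
      "modelId": "car_bodytype_cls_v1",
      "parallel": true,
      "triggerClasses": ["Car"],
      "minConfidence": 0.5
    }
  ],
  "parallelActions": [
    {
      "type": "postgresql_update_combined",
      "waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"]
    }
  ]
}
```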
 ## 4. Messages from Worker to Backend
@@ -79,6 +105,15 @@ Sent when the worker detects a relevant object. The `detection` object should be
 - **Type:** `imageDetection`
+**Enhanced Detection Capabilities:**
+
+The current implementation supports multi-class detection with parallel classification processing. When a vehicle is detected, the system:
+
+1. **Multi-Class Detection**: Simultaneously detects "Car" and "Frontal" classes
+2. **Parallel Processing**: Runs brand and body type classification concurrently
+3. **Database Integration**: Automatically creates and updates PostgreSQL records
+4. **Redis Storage**: Saves cropped frontal images with expiration
 **Payload Example:**
 ```json
@@ -88,19 +123,38 @@ Sent when the worker detects a relevant object. The `detection` object should be
 "timestamp": "2025-07-14T12:34:56.789Z",
 "data": {
   "detection": {
-    "carModel": "Civic",
+    "class": "Car",
+    "confidence": 0.92,
     "carBrand": "Honda",
-    "carYear": 2023,
+    "carModel": "Civic",
     "bodyType": "Sedan",
-    "licensePlateText": "ABCD1234",
-    "licensePlateConfidence": 0.95
+    "branch_results": {
+      "car_brand_cls_v1": {
+        "class": "Honda",
+        "confidence": 0.89,
+        "brand": "Honda"
+      },
+      "car_bodytype_cls_v1": {
+        "class": "Sedan",
+        "confidence": 0.85,
+        "body_type": "Sedan"
+      }
+    }
   },
   "modelId": 101,
-  "modelName": "US-LPR-and-Vehicle-ID"
+  "modelName": "Car Frontal Detection V1"
 }
 }
 ```
+
+**Database Integration:**
+
+Each detection automatically:
+
+- Creates a record in `gas_station_1.car_frontal_info` table
+- Generates a unique `session_id` for tracking
+- Updates the record with classification results after parallel processing completes
+- Stores cropped frontal images in Redis with the session_id as key
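A minimal sketch of that create-then-update flow with psycopg2 (pinned in the requirements diff above); the table name comes from this document, while the column names, UUID session IDs, and connection details are assumptions:

```python
import uuid
import psycopg2

# Connection parameters are placeholders, not the worker's real configuration.
conn = psycopg2.connect(host="localhost", dbname="cms", user="worker", password="...")
session_id = str(uuid.uuid4())  # assumed format for the unique session_id

with conn, conn.cursor() as cur:
    # 1) Create the record when the detection fires.
    cur.execute(
        "INSERT INTO gas_station_1.car_frontal_info (session_id) VALUES (%s)",
        (session_id,),
    )
    # 2) Update it once the parallel classification branches complete
    #    (car_brand/body_type column names are illustrative).
    cur.execute(
        "UPDATE gas_station_1.car_frontal_info "
        "SET car_brand = %s, body_type = %s WHERE session_id = %s",
        ("Honda", "Sedan", session_id),
    )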
 ### 4.3. Patch Session
 > **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
@@ -117,9 +171,9 @@ Allows the worker to request a modification to an active session's data. The `da
 "sessionId": 12345,
 "data": {
   "currentCar": {
-    "carModel": "Civic",
-    "carBrand": "Honda",
-    "licensePlateText": "ABCD1234"
+      "carModel": "Civic",
+      "carBrand": "Honda",
+      "licensePlateText": "ABCD1234"
   }
 }
 }
@@ -133,33 +187,24 @@ The `data` object in the `patchSession` message is merged with the existing `Dis
 ```typescript
 interface DisplayPersistentData {
-  progressionStage:
-    | 'welcome'
-    | 'car_fueling'
-    | 'car_waitpayment'
-    | 'car_postpayment'
-    | null;
-  qrCode: string | null;
-  adsPlayback: {
-    playlistSlotOrder: number; // The 'order' of the current slot
-    adsId: number | null;
-    adsUrl: string | null;
-  } | null;
-  currentCar: {
-    carModel?: string;
-    carBrand?: string;
-    carYear?: number;
-    bodyType?: string;
-    licensePlateText?: string;
-    licensePlateType?: string;
-  } | null;
-  fuelPump: {
-    /* FuelPumpData structure */
-  } | null;
-  weatherData: {
-    /* WeatherResponse structure */
-  } | null;
-  sessionId: number | null;
+    progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null;
+    qrCode: string | null;
+    adsPlayback: {
+        playlistSlotOrder: number; // The 'order' of the current slot
+        adsId: number | null;
+        adsUrl: string | null;
+    } | null;
+    currentCar: {
+        carModel?: string;
+        carBrand?: string;
+        carYear?: number;
+        bodyType?: string;
+        licensePlateText?: string;
+        licensePlateType?: string;
+    } | null;
+    fuelPump: { /* FuelPumpData structure */ } | null;
+    weatherData: { /* WeatherResponse structure */ } | null;
+    sessionId: number | null;
 }
 ```
@@ -212,7 +257,7 @@ Instructs the worker to process a camera's RTSP stream using the configuration f
 > - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera.
 > - Capture each frame/image only once per cycle.
 > - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.
->   This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
+> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
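A runnable sketch of the once-per-cycle capture rule in the note above (the subscription identifiers and the fetch stub are illustrative, not the worker's real implementation):

```python
subscriptions = ["display-001;cam-001", "display-002;cam-001", "display-003;cam-002"]

def fetch_snapshot(camera_id: str) -> bytes:
    """Stand-in for the HTTP snapshot fetch."""
    return f"jpeg-bytes-from-{camera_id}".encode()

snapshot_cache = {}  # one entry per camera per cycle
for sub in subscriptions:
    display_id, camera_id = sub.split(';')
    if camera_id not in snapshot_cache:           # capture once per cycle
        snapshot_cache[camera_id] = fetch_snapshot(camera_id)
    frame = snapshot_cache[camera_id]             # reuse for every display sharing the camera
    print(display_id, camera_id, len(frame))
snapshot_cache.clear()                            # next cycle starts fresh
```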
 ### 5.2. Unsubscribe from Camera
@@ -324,7 +369,7 @@ This section shows a typical sequence of messages between the backend and the wo
 > **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
 1. **Connection Established** & **Heartbeat**
-   - **Worker -> Backend**
+   * **Worker -> Backend**
 ```json
 {
 "type": "stateReport",
@@ -336,7 +381,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 ```
 2. **Backend Subscribes Camera**
-   - **Backend -> Worker**
+   * **Backend -> Worker**
 ```json
 {
 "type": "subscribe",
@@ -350,7 +395,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 ```
 3. **Worker Acknowledges in Heartbeat**
-   - **Worker -> Backend**
+   * **Worker -> Backend**
 ```json
 {
 "type": "stateReport",
@@ -369,7 +414,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 ```
 4. **Worker Detects a Car**
-   - **Worker -> Backend**
+   * **Worker -> Backend**
 ```json
 {
 "type": "imageDetection",
@@ -388,7 +433,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 }
 ```
-   - **Worker -> Backend**
+   * **Worker -> Backend**
 ```json
 {
 "type": "imageDetection",
@@ -407,7 +452,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 }
 ```
-   - **Worker -> Backend**
+   * **Worker -> Backend**
 ```json
 {
 "type": "imageDetection",
@@ -427,7 +472,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 ```
 5. **Backend Unsubscribes Camera**
-   - **Backend -> Worker**
+   * **Backend -> Worker**
 ```json
 {
 "type": "unsubscribe",
@@ -437,7 +482,7 @@ This section shows a typical sequence of messages between the backend and the wo
 }
 }
 ```
 6. **Worker Acknowledges Unsubscription**
-   - **Worker -> Backend**
+   * **Worker -> Backend**
 ```json
 {
 "type": "stateReport",
@@ -448,7 +493,6 @@ This section shows a typical sequence of messages between the backend and the wo
 "cameraConnections": []
 }
 ```
-
 ## 7. HTTP API: Image Retrieval
 In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
@@ -464,13 +508,11 @@ GET /camera/{camera_id}/image
 ### Response
 - **Success (200):** Returns the latest JPEG image from the camera stream.
-
   - `Content-Type: image/jpeg`
   - Binary JPEG data.
-
 - **Error (404):** If the camera is not found or no frame is available.
   - JSON error response.
 - **Error (500):** Internal server error.
@@ -483,9 +525,9 @@ GET /camera/display-001;cam-001/image
 ### Example Response
 - **Headers:**
-  ```
-  Content-Type: image/jpeg
-  ```
+    ```
+    Content-Type: image/jpeg
+    ```
 - **Body:** Binary JPEG image.
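A client-side sketch for this endpoint (host and port are assumed; the path, status codes, and content type come from this section):

```python
import requests

# Port 8000 is the worker's preferred port per section 1; host is assumed.
resp = requests.get("http://localhost:8000/camera/display-001;cam-001/image", timeout=5)
if resp.status_code == 200 and resp.headers.get("Content-Type") == "image/jpeg":
    with open("latest_frame.jpg", "wb") as f:
        f.write(resp.content)   # binary JPEG data
elif resp.status_code == 404:
    print(resp.json())          # JSON error: camera not found or no frame available
else:
    resp.raise_for_status()     # e.g. 500 internal server error
```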
 ### Notes