Compare commits

..

1 commit

Author SHA1 Message Date
Pongsatorn
aa4e0463d4 new logic 2025-05-12 19:19:40 +07:00
29 changed files with 811 additions and 10038 deletions

View file

@ -1,112 +0,0 @@
name: Build Worker Base and Application Images
on:
push:
branches:
- main
- dev
workflow_dispatch:
inputs:
force_base_build:
description: 'Force base image build regardless of changes'
required: false
default: 'false'
type: boolean
jobs:
check-base-changes:
runs-on: ubuntu-latest
outputs:
base-changed: ${{ steps.changes.outputs.base-changed }}
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 2
- name: Check for base changes
id: changes
run: |
if git diff HEAD^ HEAD --name-only | grep -E "(Dockerfile\.base|requirements\.base\.txt)" > /dev/null; then
echo "base-changed=true" >> $GITHUB_OUTPUT
else
echo "base-changed=false" >> $GITHUB_OUTPUT
fi
build-base:
needs: check-base-changes
if: needs.check-base-changes.outputs.base-changed == 'true' || (github.event_name == 'workflow_dispatch' && github.event.inputs.force_base_build == 'true')
runs-on: ubuntu-latest
permissions:
packages: write
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: git.siwatsystem.com
username: ${{ github.actor }}
password: ${{ secrets.RUNNER_TOKEN }}
- name: Build and push base Docker image
uses: docker/build-push-action@v4
with:
context: .
file: ./Dockerfile.base
push: true
tags: git.siwatsystem.com/adsist-cms/worker-base:latest
build-docker:
needs: [check-base-changes, build-base]
if: always() && (needs.build-base.result == 'success' || needs.build-base.result == 'skipped')
runs-on: ubuntu-latest
permissions:
packages: write
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: git.siwatsystem.com
username: ${{ github.actor }}
password: ${{ secrets.RUNNER_TOKEN }}
- name: Build and push Docker image
uses: docker/build-push-action@v4
with:
context: .
file: ./Dockerfile
push: true
tags: git.siwatsystem.com/adsist-cms/worker:${{ github.ref_name == 'main' && 'latest' || 'dev' }}
deploy-stack:
needs: build-docker
runs-on: adsist
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up SSH connection
run: |
mkdir -p ~/.ssh
echo "${{ secrets.DEPLOY_KEY_CMS }}" > ~/.ssh/id_rsa
chmod 600 ~/.ssh/id_rsa
ssh-keyscan -H ${{ vars.DEPLOY_HOST_CMS }} >> ~/.ssh/known_hosts
- name: Deploy stack
run: |
echo "Pulling and starting containers on server..."
if [ "${{ github.ref_name }}" = "main" ]; then
echo "Deploying production stack..."
ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.production.yml pull && docker compose -f docker-compose.production.yml up -d"
else
echo "Deploying staging stack..."
ssh -i ~/.ssh/id_rsa ${{ vars.DEPLOY_USER_CMS }}@${{ vars.DEPLOY_HOST_CMS }} "cd ~/cms-system-k8s && docker compose -f docker-compose.staging.yml pull && docker compose -f docker-compose.staging.yml up -d"
fi

12
.gitignore vendored
View file

@ -6,14 +6,4 @@ app.log
__pycache__/
.mptacache
mptas
detector_worker.log
.gitignore
no_frame_debug.log
feeder/
.venv/
.vscode/
dist/
websocket_comm.log
temp_debug/
mptas

277
CLAUDE.md
View file

@ -1,277 +0,0 @@
# Python Detector Worker - CLAUDE.md
## Project Overview
This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs advanced YOLO-based machine learning pipelines for multi-class object detection and parallel classification. The system features comprehensive database integration, Redis support, and hierarchical pipeline execution designed to work within a larger CMS (Content Management System) architecture.
### Key Features
- **Multi-Class Detection**: Simultaneous detection of multiple object classes (e.g., Car + Frontal)
- **Parallel Processing**: Concurrent execution of classification branches using ThreadPoolExecutor
- **Database Integration**: Automatic PostgreSQL schema management and record updates
- **Redis Actions**: Image storage with region cropping and pub/sub messaging
- **Pipeline Synchronization**: Branch coordination with `waitForBranches` functionality
- **Dynamic Field Mapping**: Template-based field resolution for database operations
## Architecture & Technology Stack
- **Framework**: FastAPI with WebSocket support
- **ML/CV**: PyTorch, Ultralytics YOLO, OpenCV
- **Containerization**: Docker (Python 3.13-bookworm base)
- **Data Storage**: Redis integration for action handling + PostgreSQL for persistent storage
- **Database**: Automatic schema management with gas_station_1 database
- **Parallel Processing**: ThreadPoolExecutor for concurrent classification
- **Communication**: WebSocket-based real-time protocol
## Core Components
### Main Application (`app.py`)
- **FastAPI WebSocket server** for real-time communication
- **Multi-camera stream management** with shared stream optimization
- **HTTP REST endpoint** for image retrieval (`/camera/{camera_id}/image`)
- **Threading-based frame readers** for RTSP streams and HTTP snapshots
- **Model loading and inference** using MPTA (Machine Learning Pipeline Archive) format
- **Session management** with display identifier mapping
- **Resource monitoring** (CPU, memory, GPU usage via psutil)
### Pipeline System (`siwatsystem/pympta.py`)
- **MPTA file handling** - ZIP archives containing model configurations
- **Hierarchical pipeline execution** with detection → classification branching
- **Multi-class detection** - Simultaneous detection of multiple classes (Car + Frontal)
- **Parallel processing** - Concurrent classification branches with ThreadPoolExecutor
- **Redis action system** - Image saving with region cropping and message publishing
- **PostgreSQL integration** - Automatic table creation and combined updates
- **Dynamic model loading** with GPU optimization
- **Configurable trigger classes and confidence thresholds**
- **Branch synchronization** - waitForBranches coordination for database updates
### Database System (`siwatsystem/database.py`)
- **DatabaseManager class** for PostgreSQL operations
- **Automatic table creation** with gas_station_1.car_frontal_info schema
- **Combined update operations** with field mapping from branch results
- **Session management** with UUID generation
- **Error handling** and connection management
### Testing & Debugging
- **Protocol test script** (`test_protocol.py`) for WebSocket communication validation
- **Pipeline webcam utility** (`pipeline_webcam.py`) for local testing with visual output
- **RTSP streaming debug tool** (`debug/rtsp_webcam.py`) using GStreamer
## Code Conventions & Patterns
### Logging
- **Structured logging** using Python's logging module
- **File + console output** to `detector_worker.log`
- **Debug level separation** for detailed troubleshooting
- **Context-aware messages** with camera IDs and model information
### Error Handling
- **Graceful failure handling** with retry mechanisms (configurable max_retries)
- **Thread-safe operations** using locks for streams and models
- **WebSocket disconnect handling** with proper cleanup
- **Model loading validation** with detailed error reporting
### Configuration
- **JSON configuration** (`config.json`) for runtime parameters:
- `poll_interval_ms`: Frame processing interval
- `max_streams`: Concurrent stream limit
- `target_fps`: Target frame rate
- `reconnect_interval_sec`: Stream reconnection delay
- `max_retries`: Maximum retry attempts (-1 for unlimited)
### Threading Model
- **Frame reader threads** for each camera stream (RTSP/HTTP)
- **Shared stream optimization** - multiple subscriptions can reuse the same camera stream
- **Async WebSocket handling** with concurrent task management
- **Thread-safe data structures** with proper locking mechanisms
## WebSocket Protocol
### Message Types
- **subscribe**: Start camera stream with model pipeline
- **unsubscribe**: Stop camera stream processing
- **requestState**: Request current worker status
- **setSessionId**: Associate display with session identifier
- **patchSession**: Update session data
- **stateReport**: Periodic heartbeat with system metrics
- **imageDetection**: Detection results with timestamp and model info
### Subscription Format
```json
{
"type": "subscribe",
"payload": {
"subscriptionIdentifier": "display-001;cam-001",
"rtspUrl": "rtsp://...", // OR snapshotUrl
"snapshotUrl": "http://...",
"snapshotInterval": 5000,
"modelUrl": "http://...model.mpta",
"modelId": 101,
"modelName": "Vehicle Detection",
"cropX1": 100, "cropY1": 200,
"cropX2": 300, "cropY2": 400
}
}
```
## Model Pipeline (MPTA) Format
### Enhanced Structure
- **ZIP archive** containing models and configuration
- **pipeline.json** - Main configuration file with Redis + PostgreSQL settings
- **Model files** - YOLO .pt files for detection/classification
- **Multi-model support** - Detection + multiple classification models
### Advanced Pipeline Flow
1. **Multi-class detection stage** - YOLO detection of Car + Frontal simultaneously
2. **Validation stage** - Check for expected classes (flexible matching)
3. **Database initialization** - Create initial record with session_id
4. **Redis actions** - Save cropped frontal images with expiration
5. **Parallel classification** - Concurrent brand and body type classification
6. **Branch synchronization** - Wait for all classification branches to complete
7. **Database update** - Combined update with all classification results
### Enhanced Branch Configuration
```json
{
"modelId": "car_frontal_detection_v1",
"modelFile": "car_frontal_detection_v1.pt",
"multiClass": true,
"expectedClasses": ["Car", "Frontal"],
"triggerClasses": ["Car", "Frontal"],
"minConfidence": 0.8,
"actions": [
{
"type": "redis_save_image",
"region": "Frontal",
"key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
"expire_seconds": 600
}
],
"branches": [
{
"modelId": "car_brand_cls_v1",
"modelFile": "car_brand_cls_v1.pt",
"parallel": true,
"crop": true,
"cropClass": "Frontal",
"triggerClasses": ["Frontal"],
"minConfidence": 0.85
}
],
"parallelActions": [
{
"type": "postgresql_update_combined",
"table": "car_frontal_info",
"key_field": "session_id",
"waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
"fields": {
"car_brand": "{car_brand_cls_v1.brand}",
"car_body_type": "{car_bodytype_cls_v1.body_type}"
}
}
]
}
```
## Stream Management
### Shared Streams
- Multiple subscriptions can share the same camera URL
- Reference counting prevents premature stream termination
- Automatic cleanup when last subscription ends
### Frame Processing
- **Queue-based buffering** with single frame capacity (latest frame only)
- **Configurable polling interval** based on target FPS
- **Automatic reconnection** with exponential backoff
## Development & Testing
### Local Development
```bash
# Install dependencies
pip install -r requirements.txt
# Run the worker
python app.py
# Test protocol compliance
python test_protocol.py
# Test pipeline with webcam
python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0
```
### Docker Deployment
```bash
# Build container
docker build -t detector-worker .
# Run with volume mounts for models
docker run -p 8000:8000 -v ./models:/app/models detector-worker
```
### Testing Commands
- **Protocol testing**: `python test_protocol.py`
- **Pipeline validation**: `python pipeline_webcam.py --mpta-file <path> --video 0`
- **RTSP debugging**: `python debug/rtsp_webcam.py`
## Dependencies
- **fastapi[standard]**: Web framework with WebSocket support
- **uvicorn**: ASGI server
- **torch, torchvision**: PyTorch for ML inference
- **ultralytics**: YOLO implementation
- **opencv-python**: Computer vision operations
- **websockets**: WebSocket client/server
- **redis**: Redis client for action execution
- **psycopg2-binary**: PostgreSQL database adapter
- **scipy**: Scientific computing for advanced algorithms
- **filterpy**: Kalman filtering and state estimation
## Security Considerations
- Model files are loaded from trusted sources only
- Redis connections use authentication when configured
- WebSocket connections handle disconnects gracefully
- Resource usage is monitored to prevent DoS
## Database Integration
### Schema Management
The system automatically creates and manages PostgreSQL tables:
```sql
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
display_id VARCHAR(255),
captured_timestamp VARCHAR(255),
session_id VARCHAR(255) PRIMARY KEY,
license_character VARCHAR(255) DEFAULT NULL,
license_type VARCHAR(255) DEFAULT 'No model available',
car_brand VARCHAR(255) DEFAULT NULL,
car_model VARCHAR(255) DEFAULT NULL,
car_body_type VARCHAR(255) DEFAULT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
```
### Workflow
1. **Detection**: When both "Car" and "Frontal" are detected, create initial database record with UUID session_id
2. **Redis Storage**: Save cropped frontal image to Redis with session_id in key
3. **Parallel Processing**: Run brand and body type classification concurrently
4. **Synchronization**: Wait for all branches to complete using `waitForBranches`
5. **Database Update**: Update record with combined classification results using field mapping
### Field Mapping
Templates like `{car_brand_cls_v1.brand}` are resolved to actual classification results:
- `car_brand_cls_v1.brand` → "Honda"
- `car_bodytype_cls_v1.body_type` → "Sedan"
## Performance Optimizations
- GPU acceleration when CUDA is available
- Shared camera streams reduce resource usage
- Frame queue optimization (single latest frame)
- Model caching across subscriptions
- Trigger class filtering for faster inference
- Parallel processing with ThreadPoolExecutor for classification branches
- Multi-class detection reduces inference passes
- Region-based cropping minimizes processing overhead
- Database connection pooling and prepared statements
- Redis image storage with automatic expiration

View file

@ -1,11 +1,19 @@
# Use our pre-built base image with ML dependencies
FROM git.siwatsystem.com/adsist-cms/worker-base:latest
# Use the official Python image from the Docker Hub
FROM python:3.13-bookworm
# Copy and install application requirements (frequently changing dependencies)
# Set the working directory in the container
WORKDIR /app
# Copy the requirements file into the container at /app
COPY requirements.txt .
# Update apt, install libgl1, and clear apt cache
RUN apt update && apt install -y libgl1 && rm -rf /var/lib/apt/lists/*
# Install any dependencies specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
# Copy the rest of the application code into the container at /app
COPY . .
# Run the application

View file

@ -1,24 +0,0 @@
# Base image with all ML dependencies
FROM pytorch/pytorch:2.8.0-cuda12.6-cudnn9-runtime
# Install system dependencies
RUN apt update && apt install -y \
libgl1 \
libglib2.0-0 \
libgstreamer1.0-0 \
libgtk-3-0 \
libavcodec58 \
libavformat58 \
libswscale5 \
libgomp1 \
&& rm -rf /var/lib/apt/lists/*
# Copy and install base requirements (ML dependencies that rarely change)
COPY requirements.base.txt .
RUN pip install --no-cache-dir -r requirements.base.txt
# Set working directory
WORKDIR /app
# This base image will be reused for all worker builds
CMD ["python3", "-m", "fastapi", "run", "--host", "0.0.0.0", "--port", "8000"]

2241
app.py

File diff suppressed because it is too large Load diff

366
app_single.py Normal file
View file

@ -0,0 +1,366 @@
from typing import List
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
from websockets.exceptions import ConnectionClosedError
from ultralytics import YOLO
import torch
import cv2
import base64
import numpy as np
import json
import logging
import threading
import queue
import os
import requests
from urllib.parse import urlparse
import asyncio
import psutil
app = FastAPI()
models = {}
with open("config.json", "r") as f:
config = json.load(f)
poll_interval = config.get("poll_interval_ms", 100)
reconnect_interval = config.get("reconnect_interval_sec", 5)
TARGET_FPS = config.get("target_fps", 10)
poll_interval = 1000 / TARGET_FPS
logging.info(f"Poll interval: {poll_interval}ms")
max_streams = config.get("max_streams", 5)
max_retries = config.get("max_retries", 3)
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler()
]
)
# Ensure the models directory exists
os.makedirs("models", exist_ok=True)
# Add constants for heartbeat
HEARTBEAT_INTERVAL = 2 # seconds
WORKER_TIMEOUT_MS = 10000
# Add a lock for thread-safe operations on shared resources
streams_lock = threading.Lock()
models_lock = threading.Lock()
@app.websocket("/")
async def detect(websocket: WebSocket):
import asyncio
import time
logging.info("WebSocket connection accepted")
streams = {}
# This function is user-modifiable
# Save data you want to persist across frames in the persistent_data dictionary
async def handle_detection(camera_id, stream, frame, websocket, model: YOLO, persistent_data):
try:
highest_conf_box = None
max_conf = -1
for r in model.track(frame, stream=False, persist=True):
for box in r.boxes:
box_cpu = box.cpu()
conf = float(box_cpu.conf[0])
if conf > max_conf and hasattr(box, "id") and box.id is not None:
max_conf = conf
highest_conf_box = {
"class": model.names[int(box_cpu.cls[0])],
"confidence": conf,
"id": box.id.item(),
}
# Broadcast to all subscribers of this URL
detection_data = {
"type": "imageDetection",
"cameraIdentifier": camera_id,
"timestamp": time.time(),
"data": {
"detections": highest_conf_box if highest_conf_box else None,
"modelId": stream['modelId'],
"modelName": stream['modelName']
}
}
logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}")
await websocket.send_json(detection_data)
return persistent_data
except Exception as e:
logging.error(f"Error in handle_detection for camera {camera_id}: {e}")
return persistent_data
def frame_reader(camera_id, cap, buffer, stop_event):
import time
retries = 0
try:
while not stop_event.is_set():
try:
ret, frame = cap.read()
if not ret:
logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
cap.release()
time.sleep(reconnect_interval)
retries += 1
if retries > max_retries and max_retries != -1:
logging.error(f"Max retries reached for camera: {camera_id}")
break
# Re-open the VideoCapture
cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
if not cap.isOpened():
logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
continue
continue
retries = 0 # Reset on success
if not buffer.empty():
try:
buffer.get_nowait() # Discard the old frame
except queue.Empty:
pass
buffer.put(frame)
except cv2.error as e:
logging.error(f"OpenCV error for camera {camera_id}: {e}")
cap.release()
time.sleep(reconnect_interval)
retries += 1
if retries > max_retries and max_retries != -1:
logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}")
break
# Re-open the VideoCapture
cap = cv2.VideoCapture(streams[camera_id]['rtsp_url'])
if not cap.isOpened():
logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
continue
except Exception as e:
logging.error(f"Unexpected error for camera {camera_id}: {e}")
cap.release()
break
except Exception as e:
logging.error(f"Error in frame_reader thread for camera {camera_id}: {e}")
async def process_streams():
global models
logging.info("Started processing streams")
persistent_data_dict = {}
try:
while True:
start_time = time.time()
# Round-robin processing
with streams_lock:
current_streams = list(streams.items())
for camera_id, stream in current_streams:
buffer = stream['buffer']
if not buffer.empty():
frame = buffer.get()
with models_lock:
model = models.get(camera_id, {}).get(stream['modelId'])
key = (camera_id, stream['modelId'])
persistent_data = persistent_data_dict.get(key, {})
updated_persistent_data = await handle_detection(camera_id, stream, frame, websocket, model, persistent_data)
persistent_data_dict[key] = updated_persistent_data
elapsed_time = (time.time() - start_time) * 1000 # in ms
sleep_time = max(poll_interval - elapsed_time, 0)
logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms")
await asyncio.sleep(sleep_time / 1000.0)
except asyncio.CancelledError:
logging.info("Stream processing task cancelled")
except Exception as e:
logging.error(f"Error in process_streams: {e}")
async def send_heartbeat():
while True:
try:
cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent
if torch.cuda.is_available():
gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # Convert to MB
gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # Convert to MB
else:
gpu_usage = None
gpu_memory_usage = None
camera_connections = [
{
"cameraIdentifier": camera_id,
"modelId": stream['modelId'],
"modelName": stream['modelName'],
"online": True
}
for camera_id, stream in streams.items()
]
state_report = {
"type": "stateReport",
"cpuUsage": cpu_usage,
"memoryUsage": memory_usage,
"gpuUsage": gpu_usage,
"gpuMemoryUsage": gpu_memory_usage,
"cameraConnections": camera_connections
}
await websocket.send_text(json.dumps(state_report))
logging.debug("Sent stateReport as heartbeat")
await asyncio.sleep(HEARTBEAT_INTERVAL)
except Exception as e:
logging.error(f"Error sending stateReport heartbeat: {e}")
break
async def on_message():
global models
while True:
try:
msg = await websocket.receive_text()
logging.debug(f"Received message: {msg}")
print(f"Received message: {msg}")
data = json.loads(msg)
msg_type = data.get("type")
if msg_type == "subscribe":
payload = data.get("payload", {})
camera_id = payload.get("cameraIdentifier")
rtsp_url = payload.get("rtspUrl")
model_url = payload.get("modelUrl")
modelId = payload.get("modelId")
modelName = payload.get("modelName")
if model_url:
with models_lock:
if camera_id not in models:
models[camera_id] = {}
if modelId not in models[camera_id]:
print(f"Downloading model from {model_url}")
parsed_url = urlparse(model_url)
filename = os.path.basename(parsed_url.path)
model_filename = os.path.join("models", filename)
# Download the model
response = requests.get(model_url, stream=True)
if response.status_code == 200:
with open(model_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logging.info(f"Downloaded model from {model_url} to {model_filename}")
model = YOLO(model_filename)
if torch.cuda.is_available():
model.to('cuda')
models[camera_id][modelId] = model
logging.info(f"Loaded model {modelId} for camera {camera_id}")
else:
logging.error(f"Failed to download model from {model_url}")
continue
if camera_id and rtsp_url:
with streams_lock:
if camera_id not in streams and len(streams) < max_streams:
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
logging.error(f"Failed to open RTSP stream for camera {camera_id}")
continue
buffer = queue.Queue(maxsize=1)
stop_event = threading.Event()
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
thread.daemon = True
thread.start()
streams[camera_id] = {
'cap': cap,
'buffer': buffer,
'thread': thread,
'rtsp_url': rtsp_url,
'stop_event': stop_event,
'modelId': modelId,
'modelName': modelName
}
logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}")
elif camera_id and camera_id in streams:
stream = streams.pop(camera_id)
stream['cap'].release()
logging.info(f"Unsubscribed from camera {camera_id}")
if camera_id in models and modelId in models[camera_id]:
del models[camera_id][modelId]
if not models[camera_id]:
del models[camera_id]
elif msg_type == "unsubscribe":
payload = data.get("payload", {})
camera_id = payload.get("cameraIdentifier")
logging.debug(f"Unsubscribing from camera {camera_id}")
with streams_lock:
if camera_id and camera_id in streams:
stream = streams.pop(camera_id)
stream['stop_event'].set()
stream['thread'].join()
stream['cap'].release()
logging.info(f"Unsubscribed from camera {camera_id}")
if camera_id in models and modelId in models[camera_id]:
del models[camera_id][modelId]
if not models[camera_id]:
del models[camera_id]
elif msg_type == "requestState":
# Handle state request
cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent
if torch.cuda.is_available():
gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # Convert to MB
gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # Convert to MB
else:
gpu_usage = None
gpu_memory_usage = None
camera_connections = [
{
"cameraIdentifier": camera_id,
"modelId": stream['modelId'],
"modelName": stream['modelName'],
"online": True
}
for camera_id, stream in streams.items()
]
state_report = {
"type": "stateReport",
"cpuUsage": cpu_usage,
"memoryUsage": memory_usage,
"gpuUsage": gpu_usage,
"gpuMemoryUsage": gpu_memory_usage,
"cameraConnections": camera_connections
}
await websocket.send_text(json.dumps(state_report))
else:
logging.error(f"Unknown message type: {msg_type}")
except json.JSONDecodeError:
logging.error("Received invalid JSON message")
except (WebSocketDisconnect, ConnectionClosedError) as e:
logging.warning(f"WebSocket disconnected: {e}")
break
except Exception as e:
logging.error(f"Error handling message: {e}")
break
try:
await websocket.accept()
task = asyncio.create_task(process_streams())
heartbeat_task = asyncio.create_task(send_heartbeat())
message_task = asyncio.create_task(on_message())
await asyncio.gather(heartbeat_task, message_task)
except Exception as e:
logging.error(f"Error in detect websocket: {e}")
finally:
task.cancel()
await task
with streams_lock:
for camera_id, stream in streams.items():
stream['stop_event'].set()
stream['thread'].join()
stream['cap'].release()
stream['buffer'].queue.clear()
logging.info(f"Released camera {camera_id} and cleaned up resources")
streams.clear()
with models_lock:
models.clear()
logging.info("WebSocket connection closed")

View file

@ -1,6 +1,6 @@
{
"poll_interval_ms": 100,
"max_streams": 999,
"max_streams": 5,
"target_fps": 2,
"reconnect_interval_sec": 5,
"max_retries": -1

143
debug.py Normal file
View file

@ -0,0 +1,143 @@
import argparse
import os
import cv2
import time
import logging
import shutil
import threading # added threading
import yaml # for silencing YOLO
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Silence YOLO logging
os.environ["YOLO_VERBOSE"] = "False"
for logger_name in ["ultralytics", "ultralytics.hub", "ultralytics.yolo.utils"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
# Global variables for frame sharing
global_frame = None
global_ret = False
capture_running = False
def video_capture_loop(cap):
global global_frame, global_ret, capture_running
while capture_running:
global_ret, global_frame = cap.read()
time.sleep(0.01) # slight delay to reduce CPU usage
def clear_cache(cache_dir: str):
if os.path.exists(cache_dir):
shutil.rmtree(cache_dir)
def log_pipeline_flow(frame, model_tree, level=0):
"""
Wrapper around run_pipeline that logs the model flow and detection results.
Returns the same output as the original run_pipeline function.
"""
indent = " " * level
model_id = model_tree.get("modelId", "unknown")
logging.info(f"{indent}→ Running model: {model_id}")
detection, bbox = run_pipeline(frame, model_tree, return_bbox=True)
if detection:
confidence = detection.get("confidence", 0) * 100
class_name = detection.get("class", "unknown")
object_id = detection.get("id", "N/A")
logging.info(f"{indent}✓ Detected: {class_name} (ID: {object_id}, confidence: {confidence:.1f}%)")
# Check if any branches were triggered
triggered = False
for branch in model_tree.get("branches", []):
trigger_classes = branch.get("triggerClasses", [])
min_conf = branch.get("minConfidence", 0)
if class_name in trigger_classes and detection.get("confidence", 0) >= min_conf:
triggered = True
if branch.get("crop", False) and bbox:
x1, y1, x2, y2 = bbox
cropped_frame = frame[y1:y2, x1:x2]
logging.info(f"{indent} ⌊ Triggering branch with cropped region {x1},{y1} to {x2},{y2}")
branch_result = log_pipeline_flow(cropped_frame, branch, level + 1)
else:
logging.info(f"{indent} ⌊ Triggering branch with full frame")
branch_result = log_pipeline_flow(frame, branch, level + 1)
if branch_result[0]: # If branch detection successful, return it
return branch_result
if not triggered and model_tree.get("branches"):
logging.info(f"{indent} ⌊ No branches triggered")
else:
logging.info(f"{indent}✗ No detection for {model_id}")
return detection, bbox
def main(mpta_file: str, video_source: str):
global capture_running
CACHE_DIR = os.path.join(".", ".mptacache")
clear_cache(CACHE_DIR)
logging.info(f"Loading pipeline from local file: {mpta_file}")
model_tree = load_pipeline_from_zip(mpta_file, CACHE_DIR)
if model_tree is None:
logging.error("Failed to load pipeline.")
return
cap = cv2.VideoCapture(video_source)
if not cap.isOpened():
logging.error(f"Cannot open video source {video_source}")
return
# Start video capture in a separate thread
capture_running = True
capture_thread = threading.Thread(target=video_capture_loop, args=(cap,))
capture_thread.start()
logging.info("Press 'q' to exit.")
try:
while True:
# Use the global frame and ret updated by the thread
if not global_ret or global_frame is None:
continue # wait until a frame is available
frame = global_frame.copy() # local copy to work with
# Replace run_pipeline with our logging version
detection, bbox = log_pipeline_flow(frame, model_tree)
# Stop if "honda" is detected
if detection and detection.get("class", "").lower() == "toyota":
logging.info("Detected 'toyota'. Stopping pipeline.")
break
if bbox:
x1, y1, x2, y2 = bbox
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = detection["class"] if detection else "Detection"
cv2.putText(frame, label, (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
cv2.imshow("Pipeline Webcam", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
# Stop capture thread and cleanup
capture_running = False
capture_thread.join()
cap.release()
cv2.destroyAllWindows()
clear_cache(CACHE_DIR)
logging.info("Cleaned up .mptacache directory on shutdown.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run pipeline webcam utility.")
parser.add_argument("--mpta-file", type=str, required=True, help="Path to the local pipeline mpta (ZIP) file.")
parser.add_argument("--video", type=str, default="0", help="Video source (default webcam index 0).")
args = parser.parse_args()
video_source = int(args.video) if args.video.isdigit() else args.video
main(args.mpta_file, video_source)

View file

@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""
Test script to check available camera indices
"""
import cv2
import logging
import sys
import subprocess
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("camera_index_test")
def test_camera_index(index):
"""Test if a camera index is available"""
try:
cap = cv2.VideoCapture(index)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
return True, f"{width}x{height} @ {fps}fps"
else:
cap.release()
return False, "Can open but cannot read frames"
else:
cap.release()
return False, "Cannot open camera"
except Exception as e:
return False, f"Error: {str(e)}"
def get_windows_cameras_ffmpeg():
"""Get available cameras on Windows using FFmpeg"""
try:
result = subprocess.run(['ffmpeg', '-f', 'dshow', '-list_devices', 'true', '-i', 'dummy'],
capture_output=True, text=True, timeout=10, encoding='utf-8', errors='ignore')
output = result.stderr
lines = output.split('\n')
video_devices = []
# Parse the output - look for lines with (video) that contain device names in quotes
for line in lines:
if '[dshow @' in line and '(video)' in line and '"' in line:
# Extract device name between first pair of quotes
start = line.find('"') + 1
end = line.find('"', start)
if start > 0 and end > start:
device_name = line[start:end]
video_devices.append(device_name)
logger.info(f"FFmpeg detected video devices: {video_devices}")
return video_devices
except Exception as e:
logger.error(f"Failed to get Windows camera names: {e}")
return []
def main():
logger.info("=== Camera Index Test ===")
# Check FFmpeg availability for Windows device detection
ffmpeg_available = False
try:
result = subprocess.run(['ffmpeg', '-version'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
ffmpeg_available = True
logger.info("FFmpeg is available")
except:
logger.info("FFmpeg not available")
# Get Windows camera names if possible
if sys.platform.startswith('win') and ffmpeg_available:
logger.info("\n=== Windows Camera Devices (FFmpeg) ===")
cameras = get_windows_cameras_ffmpeg()
if cameras:
for i, camera in enumerate(cameras):
logger.info(f"Device {i}: {camera}")
else:
logger.info("No cameras detected via FFmpeg")
# Test camera indices 0-9
logger.info("\n=== Testing Camera Indices ===")
available_cameras = []
for index in range(10):
logger.info(f"Testing camera index {index}...")
is_available, info = test_camera_index(index)
if is_available:
logger.info(f"✓ Camera {index}: AVAILABLE - {info}")
available_cameras.append(index)
else:
logger.info(f"✗ Camera {index}: NOT AVAILABLE - {info}")
# Summary
logger.info("\n=== Summary ===")
if available_cameras:
logger.info(f"Available camera indices: {available_cameras}")
logger.info(f"Default camera index to use: {available_cameras[0]}")
# Test the first available camera more thoroughly
logger.info(f"\n=== Detailed Test for Camera {available_cameras[0]} ===")
cap = cv2.VideoCapture(available_cameras[0])
if cap.isOpened():
# Get properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
backend = cap.getBackendName()
logger.info(f"Resolution: {width}x{height}")
logger.info(f"FPS: {fps}")
logger.info(f"Backend: {backend}")
# Test frame capture
ret, frame = cap.read()
if ret and frame is not None:
logger.info(f"Frame capture: SUCCESS")
logger.info(f"Frame shape: {frame.shape}")
logger.info(f"Frame dtype: {frame.dtype}")
else:
logger.info(f"Frame capture: FAILED")
cap.release()
else:
logger.error("No cameras available!")
logger.info("Possible solutions:")
logger.info("1. Check if camera is connected and not used by another application")
logger.info("2. Check camera permissions")
logger.info("3. Try different camera indices")
logger.info("4. Install camera drivers")
if __name__ == "__main__":
main()

BIN
demoa.mpta Normal file

Binary file not shown.

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

23
pipeline.log Normal file
View file

@ -0,0 +1,23 @@
2025-05-12 18:10:04,590 [INFO] Loading pipeline from local file: demoa.mpta
2025-05-12 18:10:04,610 [INFO] Copied local .mpta file from demoa.mpta to .\.mptacache\pipeline.mpta
2025-05-12 18:10:04,901 [INFO] Extracted .mpta file to .\.mptacache
2025-05-12 18:10:04,905 [INFO] Loading model for node DetectionDraft from .\.mptacache\demoa\DetectionDraft.pt
2025-05-12 18:10:05,083 [INFO] Loading model for node ClassificationDraft from .\.mptacache\demoa\ClassificationDraft.pt
2025-05-12 18:10:08,035 [INFO] Press 'q' to exit.
2025-05-12 18:10:12,217 [INFO] Cleaned up .mptacache directory on shutdown.
2025-05-12 18:13:08,465 [INFO] Loading pipeline from local file: demoa.mpta
2025-05-12 18:13:08,512 [INFO] Copied local .mpta file from demoa.mpta to .\.mptacache\pipeline.mpta
2025-05-12 18:13:08,769 [INFO] Extracted .mpta file to .\.mptacache
2025-05-12 18:13:08,773 [INFO] Loading model for node DetectionDraft from .\.mptacache\demoa\DetectionDraft.pt
2025-05-12 18:13:09,083 [INFO] Loading model for node ClassificationDraft from .\.mptacache\demoa\ClassificationDraft.pt
2025-05-12 18:13:12,187 [INFO] Press 'q' to exit.
2025-05-12 18:13:14,146 [INFO] → Running model: DetectionDraft
2025-05-12 18:13:17,119 [INFO] Cleaned up .mptacache directory on shutdown.
2025-05-12 18:14:25,665 [INFO] Loading pipeline from local file: demoa.mpta
2025-05-12 18:14:25,687 [INFO] Copied local .mpta file from demoa.mpta to .\.mptacache\pipeline.mpta
2025-05-12 18:14:25,953 [INFO] Extracted .mpta file to .\.mptacache
2025-05-12 18:14:25,957 [INFO] Loading model for node DetectionDraft from .\.mptacache\demoa\DetectionDraft.pt
2025-05-12 18:14:26,138 [INFO] Loading model for node ClassificationDraft from .\.mptacache\demoa\ClassificationDraft.pt
2025-05-12 18:14:29,171 [INFO] Press 'q' to exit.
2025-05-12 18:14:30,146 [INFO] → Running model: DetectionDraft
2025-05-12 18:14:32,080 [INFO] Cleaned up .mptacache directory on shutdown.

327
pympta.md
View file

@ -1,327 +0,0 @@
# pympta: Modular Pipeline Task Executor
`pympta` is a Python module designed to load and execute modular, multi-stage AI pipelines defined in a special package format (`.mpta`). It is primarily used within the detector worker to run complex computer vision tasks where the output of one model can trigger a subsequent model on a specific region of interest.
## Core Concepts
### 1. MPTA Package (`.mpta`)
An `.mpta` file is a standard `.zip` archive with a different extension. It bundles all the necessary components for a pipeline to run.
A typical `.mpta` file has the following structure:
```
my_pipeline.mpta/
├── pipeline.json
├── model1.pt
├── model2.pt
└── ...
```
- **`pipeline.json`**: (Required) The manifest file that defines the structure of the pipeline, the models to use, and the logic connecting them.
- **Model Files (`.pt`, etc.)**: The actual pre-trained model files (e.g., PyTorch, ONNX). The pipeline currently uses `ultralytics.YOLO` models.
### 2. Pipeline Structure
A pipeline is a tree-like structure of "nodes," defined in `pipeline.json`.
- **Root Node**: The entry point of the pipeline. It processes the initial, full-frame image.
- **Branch Nodes**: Child nodes that are triggered by specific detection results from their parent. For example, a root node might detect a "vehicle," which then triggers a branch node to detect a "license plate" within the vehicle's bounding box.
This modular structure allows for creating complex and efficient inference logic, avoiding the need to run every model on every frame.
## `pipeline.json` Specification
This file defines the entire pipeline logic. The root object contains a `pipeline` key for the pipeline definition, optional `redis` key for Redis configuration, and optional `postgresql` key for database integration.
### Top-Level Object Structure
| Key | Type | Required | Description |
| ------------ | ------ | -------- | ------------------------------------------------------- |
| `pipeline` | Object | Yes | The root node object of the pipeline. |
| `redis` | Object | No | Configuration for connecting to a Redis server. |
| `postgresql` | Object | No | Configuration for connecting to a PostgreSQL database. |
### Redis Configuration (`redis`)
| Key | Type | Required | Description |
| ---------- | ------ | -------- | ------------------------------------------------------- |
| `host` | String | Yes | The hostname or IP address of the Redis server. |
| `port` | Number | Yes | The port number of the Redis server. |
| `password` | String | No | The password for Redis authentication. |
| `db` | Number | No | The Redis database number to use. Defaults to `0`. |
### PostgreSQL Configuration (`postgresql`)
| Key | Type | Required | Description |
| ---------- | ------ | -------- | ------------------------------------------------------- |
| `host` | String | Yes | The hostname or IP address of the PostgreSQL server. |
| `port` | Number | Yes | The port number of the PostgreSQL server. |
| `database` | String | Yes | The database name to connect to. |
| `username` | String | Yes | The username for database authentication. |
| `password` | String | Yes | The password for database authentication. |
### Node Object Structure
| Key | Type | Required | Description |
| ------------------- | ------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| `modelId` | String | Yes | A unique identifier for this model node (e.g., "vehicle-detector"). |
| `modelFile` | String | Yes | The path to the model file within the `.mpta` archive (e.g., "yolov8n.pt"). |
| `minConfidence` | Float | Yes | The minimum confidence score (0.0 to 1.0) required for a detection to be considered valid and potentially trigger a branch. |
| `triggerClasses` | Array<String> | Yes | A list of class names that, when detected by the parent, can trigger this node. For the root node, this lists all classes of interest. |
| `crop` | Boolean | No | If `true`, the image is cropped to the parent's detection bounding box before being passed to this node's model. Defaults to `false`. |
| `cropClass` | String | No | The specific class to use for cropping (e.g., "Frontal" for frontal view cropping). |
| `multiClass` | Boolean | No | If `true`, enables multi-class detection mode where multiple classes can be detected simultaneously. |
| `expectedClasses` | Array<String> | No | When `multiClass` is true, defines which classes are expected. At least one must be detected for processing to continue. |
| `parallel` | Boolean | No | If `true`, this branch will be processed in parallel with other parallel branches. |
| `branches` | Array<Node> | No | A list of child node objects that can be triggered by this node's detections. |
| `actions` | Array<Action> | No | A list of actions to execute upon a successful detection in this node. |
| `parallelActions` | Array<Action> | No | A list of actions to execute after all specified branches have completed. |
### Action Object Structure
Actions allow the pipeline to interact with Redis and PostgreSQL databases. They are executed sequentially for a given detection.
#### Action Context & Dynamic Keys
All actions have access to a dynamic context for formatting keys and messages. The context is created for each detection event and includes:
- All key-value pairs from the detection result (e.g., `class`, `confidence`, `id`).
- `{timestamp_ms}`: The current Unix timestamp in milliseconds.
- `{timestamp}`: Formatted timestamp string (YYYY-MM-DDTHH-MM-SS).
- `{uuid}`: A unique identifier (UUID4) for the detection event.
- `{filename}`: Generated filename with UUID.
- `{camera_id}`: Full camera subscription identifier.
- `{display_id}`: Display identifier extracted from subscription.
- `{session_id}`: Session ID for database operations.
- `{image_key}`: If a `redis_save_image` action has already been executed for this event, this placeholder will be replaced with the key where the image was stored.
#### `redis_save_image`
Saves the current image frame (or cropped sub-image) to a Redis key.
| Key | Type | Required | Description |
| ---------------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
| `type` | String | Yes | Must be `"redis_save_image"`. |
| `key` | String | Yes | The Redis key to save the image to. Can contain any of the dynamic placeholders. |
| `region` | String | No | Specific detected region to crop and save (e.g., "Frontal"). |
| `format` | String | No | Image format: "jpeg" or "png". Defaults to "jpeg". |
| `quality` | Number | No | JPEG quality (1-100). Defaults to 90. |
| `expire_seconds` | Number | No | If provided, sets an expiration time (in seconds) for the Redis key. |
#### `redis_publish`
Publishes a message to a Redis channel.
| Key | Type | Required | Description |
| --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- |
| `type` | String | Yes | Must be `"redis_publish"`. |
| `channel` | String | Yes | The Redis channel to publish the message to. |
| `message` | String | Yes | The message to publish. Can contain any of the dynamic placeholders, including `{image_key}`. |
#### `postgresql_update_combined`
Updates PostgreSQL database with results from multiple branches after they complete.
| Key | Type | Required | Description |
| ------------------ | ------------- | -------- | ------------------------------------------------------------------------------------------------------- |
| `type` | String | Yes | Must be `"postgresql_update_combined"`. |
| `table` | String | Yes | The database table name (will be prefixed with `gas_station_1.` schema). |
| `key_field` | String | Yes | The field to use as the update key (typically "session_id"). |
| `key_value` | String | Yes | Template for the key value (e.g., "{session_id}"). |
| `waitForBranches` | Array<String> | Yes | List of branch model IDs to wait for completion before executing update. |
| `fields` | Object | Yes | Field mapping object where keys are database columns and values are templates (e.g., "{branch.field}").|
### Complete Example `pipeline.json`
This example demonstrates a comprehensive pipeline for vehicle detection with parallel classification and database integration:
```json
{
"redis": {
"host": "10.100.1.3",
"port": 6379,
"password": "your-redis-password",
"db": 0
},
"postgresql": {
"host": "10.100.1.3",
"port": 5432,
"database": "inference",
"username": "root",
"password": "your-db-password"
},
"pipeline": {
"modelId": "car_frontal_detection_v1",
"modelFile": "car_frontal_detection_v1.pt",
"crop": false,
"triggerClasses": ["Car", "Frontal"],
"minConfidence": 0.8,
"multiClass": true,
"expectedClasses": ["Car", "Frontal"],
"actions": [
{
"type": "redis_save_image",
"region": "Frontal",
"key": "inference:{display_id}:{timestamp}:{session_id}:{filename}",
"expire_seconds": 600,
"format": "jpeg",
"quality": 90
},
{
"type": "redis_publish",
"channel": "car_detections",
"message": "{\"event\":\"frontal_detected\"}"
}
],
"branches": [
{
"modelId": "car_brand_cls_v1",
"modelFile": "car_brand_cls_v1.pt",
"crop": true,
"cropClass": "Frontal",
"resizeTarget": [224, 224],
"triggerClasses": ["Frontal"],
"minConfidence": 0.85,
"parallel": true,
"branches": []
},
{
"modelId": "car_bodytype_cls_v1",
"modelFile": "car_bodytype_cls_v1.pt",
"crop": true,
"cropClass": "Car",
"resizeTarget": [224, 224],
"triggerClasses": ["Car"],
"minConfidence": 0.85,
"parallel": true,
"branches": []
}
],
"parallelActions": [
{
"type": "postgresql_update_combined",
"table": "car_frontal_info",
"key_field": "session_id",
"key_value": "{session_id}",
"waitForBranches": ["car_brand_cls_v1", "car_bodytype_cls_v1"],
"fields": {
"car_brand": "{car_brand_cls_v1.brand}",
"car_body_type": "{car_bodytype_cls_v1.body_type}"
}
}
]
}
}
```
## API Reference
The `pympta` module exposes two main functions.
### `load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict`
Loads, extracts, and parses an `.mpta` file to build a pipeline tree in memory. It also establishes Redis and PostgreSQL connections if configured in `pipeline.json`.
- **Parameters:**
- `zip_source` (str): The file path to the local `.mpta` zip archive.
- `target_dir` (str): A directory path where the archive's contents will be extracted.
- **Returns:**
- A dictionary representing the root node of the pipeline, ready to be used with `run_pipeline`. Returns `None` if loading fails.
### `run_pipeline(frame, node: dict, return_bbox: bool = False, context: dict = None)`
Executes the inference pipeline on a single image frame.
- **Parameters:**
- `frame`: The input image frame (e.g., a NumPy array from OpenCV).
- `node` (dict): The pipeline node to execute (typically the root node returned by `load_pipeline_from_zip`).
- `return_bbox` (bool): If `True`, the function returns a tuple `(detection, bounding_box)`. Otherwise, it returns only the `detection`.
- `context` (dict): Optional context dictionary containing camera_id, display_id, session_id for action formatting.
- **Returns:**
- The final detection result from the last executed node in the chain. A detection is a dictionary like `{'class': 'car', 'confidence': 0.95, 'id': 1}`. If no detection meets the criteria, it returns `None` (or `(None, None)` if `return_bbox` is `True`).
## Database Integration
The pipeline system includes automatic PostgreSQL database management:
### Table Schema (`gas_station_1.car_frontal_info`)
The system automatically creates and manages the following table structure:
```sql
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
display_id VARCHAR(255),
captured_timestamp VARCHAR(255),
session_id VARCHAR(255) PRIMARY KEY,
license_character VARCHAR(255) DEFAULT NULL,
license_type VARCHAR(255) DEFAULT 'No model available',
car_brand VARCHAR(255) DEFAULT NULL,
car_model VARCHAR(255) DEFAULT NULL,
car_body_type VARCHAR(255) DEFAULT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
```
### Workflow
1. **Initial Record Creation**: When both "Car" and "Frontal" are detected, an initial database record is created with a UUID session_id.
2. **Redis Storage**: Vehicle images are stored in Redis with keys containing the session_id.
3. **Parallel Classification**: Brand and body type classification run concurrently.
4. **Database Update**: After all branches complete, the database record is updated with classification results.
## Usage Example
This snippet shows how to use `pympta` with the enhanced features:
```python
import cv2
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline
# 1. Define paths
MPTA_FILE = "path/to/your/pipeline.mpta"
CACHE_DIR = ".mptacache"
# 2. Load the pipeline from the .mpta file
# This reads pipeline.json and loads the YOLO models into memory.
model_tree = load_pipeline_from_zip(MPTA_FILE, CACHE_DIR)
if not model_tree:
print("Failed to load pipeline.")
exit()
# 3. Open a video source
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
# 4. Run the pipeline on the current frame with context
context = {
"camera_id": "display-001;cam-001",
"display_id": "display-001",
"session_id": None # Will be generated automatically
}
detection_result, bounding_box = run_pipeline(frame, model_tree, return_bbox=True, context=context)
# 5. Display the results
if detection_result:
print(f"Detected: {detection_result['class']} with confidence {detection_result['confidence']:.2f}")
if bounding_box:
x1, y1, x2, y2 = bounding_box
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, detection_result['class'], (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
cv2.imshow("Pipeline Output", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
```

View file

@ -1,12 +0,0 @@
ultralytics>=8.3.0
opencv-python>=4.6.0
scipy>=1.9.0
filterpy>=1.4.0
psycopg2-binary>=2.9.0
easydict
loguru
pyzmq
gitpython
gdown
lap
pynvml

View file

@ -1,5 +1,8 @@
fastapi[standard]
fastapi
uvicorn
torch
torchvision
ultralytics
opencv-python
websockets
redis
urllib3<2.0.0
fastapi[standard]

View file

@ -1,224 +0,0 @@
import psycopg2
import psycopg2.extras
from typing import Optional, Dict, Any
import logging
import uuid
logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.connection: Optional[psycopg2.extensions.connection] = None
def connect(self) -> bool:
try:
self.connection = psycopg2.connect(
host=self.config['host'],
port=self.config['port'],
database=self.config['database'],
user=self.config['username'],
password=self.config['password']
)
logger.info("PostgreSQL connection established successfully")
return True
except Exception as e:
logger.error(f"Failed to connect to PostgreSQL: {e}")
return False
def disconnect(self):
if self.connection:
self.connection.close()
self.connection = None
logger.info("PostgreSQL connection closed")
def is_connected(self) -> bool:
try:
if self.connection and not self.connection.closed:
cur = self.connection.cursor()
cur.execute("SELECT 1")
cur.fetchone()
cur.close()
return True
except:
pass
return False
def update_car_info(self, session_id: str, brand: str, model: str, body_type: str) -> bool:
if not self.is_connected():
if not self.connect():
return False
try:
cur = self.connection.cursor()
query = """
INSERT INTO car_frontal_info (session_id, car_brand, car_model, car_body_type, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (session_id)
DO UPDATE SET
car_brand = EXCLUDED.car_brand,
car_model = EXCLUDED.car_model,
car_body_type = EXCLUDED.car_body_type,
updated_at = NOW()
"""
cur.execute(query, (session_id, brand, model, body_type))
self.connection.commit()
cur.close()
logger.info(f"Updated car info for session {session_id}: {brand} {model} ({body_type})")
return True
except Exception as e:
logger.error(f"Failed to update car info: {e}")
if self.connection:
self.connection.rollback()
return False
def execute_update(self, table: str, key_field: str, key_value: str, fields: Dict[str, str]) -> bool:
if not self.is_connected():
if not self.connect():
return False
try:
cur = self.connection.cursor()
# Build the INSERT and UPDATE query dynamically
insert_placeholders = []
insert_values = [key_value] # Start with key_value
set_clauses = []
update_values = []
for field, value in fields.items():
if value == "NOW()":
# Special handling for NOW()
insert_placeholders.append("NOW()")
set_clauses.append(f"{field} = NOW()")
else:
insert_placeholders.append("%s")
insert_values.append(value)
set_clauses.append(f"{field} = %s")
update_values.append(value)
# Add schema prefix if table doesn't already have it
full_table_name = table if '.' in table else f"gas_station_1.{table}"
# Build the complete query
query = f"""
INSERT INTO {full_table_name} ({key_field}, {', '.join(fields.keys())})
VALUES (%s, {', '.join(insert_placeholders)})
ON CONFLICT ({key_field})
DO UPDATE SET {', '.join(set_clauses)}
"""
# Combine values for the query: insert_values + update_values
all_values = insert_values + update_values
logger.debug(f"SQL Query: {query}")
logger.debug(f"Values: {all_values}")
cur.execute(query, all_values)
self.connection.commit()
cur.close()
logger.info(f"✅ Updated {table} for {key_field}={key_value} with fields: {fields}")
return True
except Exception as e:
logger.error(f"❌ Failed to execute update on {table}: {e}")
logger.debug(f"Query: {query if 'query' in locals() else 'Query not built'}")
logger.debug(f"Values: {all_values if 'all_values' in locals() else 'Values not prepared'}")
if self.connection:
self.connection.rollback()
return False
def create_car_frontal_info_table(self) -> bool:
"""Create the car_frontal_info table in gas_station_1 schema if it doesn't exist."""
if not self.is_connected():
if not self.connect():
return False
try:
cur = self.connection.cursor()
# Create schema if it doesn't exist
cur.execute("CREATE SCHEMA IF NOT EXISTS gas_station_1")
# Create table if it doesn't exist
create_table_query = """
CREATE TABLE IF NOT EXISTS gas_station_1.car_frontal_info (
display_id VARCHAR(255),
captured_timestamp VARCHAR(255),
session_id VARCHAR(255) PRIMARY KEY,
license_character VARCHAR(255) DEFAULT NULL,
license_type VARCHAR(255) DEFAULT 'No model available',
car_brand VARCHAR(255) DEFAULT NULL,
car_model VARCHAR(255) DEFAULT NULL,
car_body_type VARCHAR(255) DEFAULT NULL,
updated_at TIMESTAMP DEFAULT NOW()
)
"""
cur.execute(create_table_query)
# Add columns if they don't exist (for existing tables)
alter_queries = [
"ALTER TABLE gas_station_1.car_frontal_info ADD COLUMN IF NOT EXISTS car_brand VARCHAR(255) DEFAULT NULL",
"ALTER TABLE gas_station_1.car_frontal_info ADD COLUMN IF NOT EXISTS car_model VARCHAR(255) DEFAULT NULL",
"ALTER TABLE gas_station_1.car_frontal_info ADD COLUMN IF NOT EXISTS car_body_type VARCHAR(255) DEFAULT NULL",
"ALTER TABLE gas_station_1.car_frontal_info ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP DEFAULT NOW()"
]
for alter_query in alter_queries:
try:
cur.execute(alter_query)
logger.debug(f"Executed: {alter_query}")
except Exception as e:
# Ignore errors if column already exists (for older PostgreSQL versions)
if "already exists" in str(e).lower():
logger.debug(f"Column already exists, skipping: {alter_query}")
else:
logger.warning(f"Error in ALTER TABLE: {e}")
self.connection.commit()
cur.close()
logger.info("Successfully created/verified car_frontal_info table with all required columns")
return True
except Exception as e:
logger.error(f"Failed to create car_frontal_info table: {e}")
if self.connection:
self.connection.rollback()
return False
def insert_initial_detection(self, display_id: str, captured_timestamp: str, session_id: str = None) -> str:
"""Insert initial detection record and return the session_id."""
if not self.is_connected():
if not self.connect():
return None
# Generate session_id if not provided
if not session_id:
session_id = str(uuid.uuid4())
try:
# Ensure table exists
if not self.create_car_frontal_info_table():
logger.error("Failed to create/verify table before insertion")
return None
cur = self.connection.cursor()
insert_query = """
INSERT INTO gas_station_1.car_frontal_info
(display_id, captured_timestamp, session_id, license_character, license_type, car_brand, car_model, car_body_type)
VALUES (%s, %s, %s, NULL, 'No model available', NULL, NULL, NULL)
ON CONFLICT (session_id) DO NOTHING
"""
cur.execute(insert_query, (display_id, captured_timestamp, session_id))
self.connection.commit()
cur.close()
logger.info(f"Inserted initial detection record with session_id: {session_id}")
return session_id
except Exception as e:
logger.error(f"Failed to insert initial detection record: {e}")
if self.connection:
self.connection.rollback()
return None

View file

@ -1,242 +0,0 @@
"""
Shared Model Registry for Memory Optimization
This module implements a global shared model registry to prevent duplicate model loading
in memory when multiple cameras use the same model. This significantly reduces RAM and
GPU VRAM usage by ensuring only one instance of each unique model is loaded.
Key Features:
- Thread-safe model loading and access
- Reference counting for proper cleanup
- Automatic model lifecycle management
- Maintains compatibility with existing pipeline system
"""
import os
import threading
import logging
from typing import Dict, Any, Optional, Set
import torch
from ultralytics import YOLO
# Create a logger for this module
logger = logging.getLogger("detector_worker.model_registry")
class ModelRegistry:
"""
Singleton class for managing shared YOLO models across multiple cameras.
This registry ensures that each unique model is loaded only once in memory,
dramatically reducing RAM and GPU VRAM usage when multiple cameras use the
same model.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(ModelRegistry, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
# Thread-safe storage for loaded models
self._models: Dict[str, YOLO] = {} # modelId -> YOLO model instance
self._model_files: Dict[str, str] = {} # modelId -> file path
self._reference_counts: Dict[str, int] = {} # modelId -> reference count
self._model_lock = threading.RLock() # Reentrant lock for nested calls
logger.info("🏭 Shared Model Registry initialized - ready for memory-optimized model loading")
def get_model(self, model_id: str, model_file_path: str) -> YOLO:
"""
Get or load a YOLO model. Returns shared instance if already loaded.
Args:
model_id: Unique identifier for the model
model_file_path: Path to the model file
Returns:
YOLO model instance (shared across all callers)
"""
with self._model_lock:
if model_id in self._models:
# Model already loaded - increment reference count and return
self._reference_counts[model_id] += 1
logger.info(f"📖 Model '{model_id}' reused (ref_count: {self._reference_counts[model_id]}) - SAVED MEMORY!")
return self._models[model_id]
# Model not loaded yet - load it
logger.info(f"🔄 Loading NEW model '{model_id}' from {model_file_path}")
if not os.path.exists(model_file_path):
raise FileNotFoundError(f"Model file {model_file_path} not found")
try:
# Load the YOLO model
model = YOLO(model_file_path)
# Move to GPU if available
if torch.cuda.is_available():
logger.info(f"🚀 CUDA available. Moving model '{model_id}' to GPU VRAM")
model.to("cuda")
else:
logger.info(f"💻 CUDA not available. Using CPU for model '{model_id}'")
# Store in registry
self._models[model_id] = model
self._model_files[model_id] = model_file_path
self._reference_counts[model_id] = 1
logger.info(f"✅ Model '{model_id}' loaded and registered (ref_count: 1)")
self._log_registry_status()
return model
except Exception as e:
logger.error(f"❌ Failed to load model '{model_id}' from {model_file_path}: {e}")
raise
def release_model(self, model_id: str) -> None:
"""
Release a reference to a model. If reference count reaches zero,
the model may be unloaded to free memory.
Args:
model_id: Unique identifier for the model to release
"""
with self._model_lock:
if model_id not in self._reference_counts:
logger.warning(f"⚠️ Attempted to release unknown model '{model_id}'")
return
self._reference_counts[model_id] -= 1
logger.info(f"📉 Model '{model_id}' reference count decreased to {self._reference_counts[model_id]}")
# For now, keep models in memory even when ref count reaches 0
# This prevents reload overhead if the same model is needed again soon
# In the future, we could implement LRU eviction policy
# if self._reference_counts[model_id] <= 0:
# logger.info(f"💤 Model '{model_id}' has 0 references but keeping in memory for reuse")
# Optionally: self._unload_model(model_id)
def _unload_model(self, model_id: str) -> None:
"""
Internal method to unload a model from memory.
Currently not used to prevent reload overhead.
"""
with self._model_lock:
if model_id in self._models:
logger.info(f"🗑️ Unloading model '{model_id}' from memory")
# Clear GPU memory if model was on GPU
model = self._models[model_id]
if hasattr(model, 'model') and hasattr(model.model, 'cuda'):
try:
# Move model to CPU before deletion to free GPU memory
model.to('cpu')
except Exception as e:
logger.warning(f"⚠️ Failed to move model '{model_id}' to CPU: {e}")
# Remove from registry
del self._models[model_id]
del self._model_files[model_id]
del self._reference_counts[model_id]
# Force garbage collection
import gc
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.info(f"✅ Model '{model_id}' unloaded and memory freed")
self._log_registry_status()
def get_registry_status(self) -> Dict[str, Any]:
"""
Get current status of the model registry.
Returns:
Dictionary with registry statistics
"""
with self._model_lock:
return {
"total_models": len(self._models),
"models": {
model_id: {
"file_path": self._model_files[model_id],
"reference_count": self._reference_counts[model_id]
}
for model_id in self._models
},
"total_references": sum(self._reference_counts.values())
}
def _log_registry_status(self) -> None:
"""Log current registry status for debugging."""
status = self.get_registry_status()
logger.info(f"📊 Model Registry Status: {status['total_models']} unique models, {status['total_references']} total references")
for model_id, info in status['models'].items():
logger.debug(f" 📋 '{model_id}': refs={info['reference_count']}, file={os.path.basename(info['file_path'])}")
def cleanup_all(self) -> None:
"""
Clean up all models from the registry. Used during shutdown.
"""
with self._model_lock:
model_ids = list(self._models.keys())
logger.info(f"🧹 Cleaning up {len(model_ids)} models from registry")
for model_id in model_ids:
self._unload_model(model_id)
logger.info("✅ Model registry cleanup complete")
# Global singleton instance
_registry = ModelRegistry()
def get_shared_model(model_id: str, model_file_path: str) -> YOLO:
"""
Convenience function to get a shared model instance.
Args:
model_id: Unique identifier for the model
model_file_path: Path to the model file
Returns:
YOLO model instance (shared across all callers)
"""
return _registry.get_model(model_id, model_file_path)
def release_shared_model(model_id: str) -> None:
"""
Convenience function to release a shared model reference.
Args:
model_id: Unique identifier for the model to release
"""
_registry.release_model(model_id)
def get_registry_status() -> Dict[str, Any]:
"""
Convenience function to get registry status.
Returns:
Dictionary with registry statistics
"""
return _registry.get_registry_status()
def cleanup_registry() -> None:
"""
Convenience function to cleanup the entire registry.
"""
_registry.cleanup_all()

View file

@ -1,375 +0,0 @@
"""
Shared MPTA Manager for Disk Space Optimization
This module implements shared MPTA file management to prevent duplicate downloads
and extractions when multiple cameras use the same model. MPTA files are stored
in modelId-based directories and shared across all cameras using that model.
Key Features:
- Thread-safe MPTA downloading and extraction
- ModelId-based directory structure: models/{modelId}/
- Reference counting for proper cleanup
- Eliminates duplicate MPTA downloads
- Maintains compatibility with existing pipeline system
"""
import os
import threading
import logging
import shutil
import requests
from typing import Dict, Set, Optional
from urllib.parse import urlparse
from .pympta import load_pipeline_from_zip
# Create a logger for this module
logger = logging.getLogger("detector_worker.mpta_manager")
class MPTAManager:
"""
Singleton class for managing shared MPTA files across multiple cameras.
This manager ensures that each unique modelId is downloaded and extracted
only once, dramatically reducing disk usage and download time when multiple
cameras use the same model.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(MPTAManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
# Thread-safe storage for MPTA management
self._model_paths: Dict[int, str] = {} # modelId -> shared_extraction_path
self._mpta_file_paths: Dict[int, str] = {} # modelId -> local_mpta_file_path
self._reference_counts: Dict[int, int] = {} # modelId -> reference count
self._download_locks: Dict[int, threading.Lock] = {} # modelId -> download lock
self._cameras_using_model: Dict[int, Set[str]] = {} # modelId -> set of camera_ids
self._manager_lock = threading.RLock() # Reentrant lock for nested calls
logger.info("🏭 Shared MPTA Manager initialized - ready for disk-optimized MPTA management")
def get_or_download_mpta(self, model_id: int, model_url: str, camera_id: str) -> Optional[tuple[str, str]]:
"""
Get or download an MPTA file. Returns (extraction_path, mpta_file_path) if successful.
Args:
model_id: Unique identifier for the model
model_url: URL to download the MPTA file from
camera_id: Identifier for the requesting camera
Returns:
Tuple of (extraction_path, mpta_file_path), or None if failed
"""
with self._manager_lock:
# Track camera usage
if model_id not in self._cameras_using_model:
self._cameras_using_model[model_id] = set()
self._cameras_using_model[model_id].add(camera_id)
# Check if model directory already exists on disk (from previous sessions)
if model_id not in self._model_paths:
potential_path = f"models/{model_id}"
if os.path.exists(potential_path) and os.path.isdir(potential_path):
# Directory exists from previous session, find the MPTA file
mpta_files = [f for f in os.listdir(potential_path) if f.endswith('.mpta')]
if mpta_files:
# Use the first .mpta file found
mpta_file_path = os.path.join(potential_path, mpta_files[0])
self._model_paths[model_id] = potential_path
self._mpta_file_paths[model_id] = mpta_file_path
self._reference_counts[model_id] = 0 # Will be incremented below
logger.info(f"📂 Found existing MPTA modelId {model_id} from previous session")
# Check if already available
if model_id in self._model_paths:
shared_path = self._model_paths[model_id]
mpta_file_path = self._mpta_file_paths.get(model_id)
if os.path.exists(shared_path) and mpta_file_path and os.path.exists(mpta_file_path):
self._reference_counts[model_id] += 1
logger.info(f"📂 MPTA modelId {model_id} reused for camera {camera_id} (ref_count: {self._reference_counts[model_id]}) - SAVED DOWNLOAD!")
return (shared_path, mpta_file_path)
else:
# Path was deleted externally, clean up our records
logger.warning(f"⚠️ MPTA path for modelId {model_id} was deleted externally, will re-download")
del self._model_paths[model_id]
self._mpta_file_paths.pop(model_id, None)
self._reference_counts.pop(model_id, 0)
# Need to download - get or create download lock for this modelId
if model_id not in self._download_locks:
self._download_locks[model_id] = threading.Lock()
# Download with model-specific lock (released _manager_lock to allow other models)
download_lock = self._download_locks[model_id]
with download_lock:
# Double-check after acquiring download lock
with self._manager_lock:
if model_id in self._model_paths and os.path.exists(self._model_paths[model_id]):
mpta_file_path = self._mpta_file_paths.get(model_id)
if mpta_file_path and os.path.exists(mpta_file_path):
self._reference_counts[model_id] += 1
logger.info(f"📂 MPTA modelId {model_id} became available during wait (ref_count: {self._reference_counts[model_id]})")
return (self._model_paths[model_id], mpta_file_path)
# Actually download and extract
shared_path = f"models/{model_id}"
logger.info(f"🔄 Downloading NEW MPTA for modelId {model_id} from {model_url}")
try:
# Ensure directory exists
os.makedirs(shared_path, exist_ok=True)
# Download MPTA file
mpta_filename = self._extract_filename_from_url(model_url) or f"model_{model_id}.mpta"
local_mpta_path = os.path.join(shared_path, mpta_filename)
if not self._download_file(model_url, local_mpta_path):
logger.error(f"❌ Failed to download MPTA for modelId {model_id}")
return None
# Extract MPTA
pipeline_tree = load_pipeline_from_zip(local_mpta_path, shared_path)
if pipeline_tree is None:
logger.error(f"❌ Failed to extract MPTA for modelId {model_id}")
return None
# Success - register in manager
with self._manager_lock:
self._model_paths[model_id] = shared_path
self._mpta_file_paths[model_id] = local_mpta_path
self._reference_counts[model_id] = 1
logger.info(f"✅ MPTA modelId {model_id} downloaded and registered (ref_count: 1)")
self._log_manager_status()
return (shared_path, local_mpta_path)
except Exception as e:
logger.error(f"❌ Error downloading/extracting MPTA for modelId {model_id}: {e}")
# Clean up partial download
if os.path.exists(shared_path):
shutil.rmtree(shared_path, ignore_errors=True)
return None
def release_mpta(self, model_id: int, camera_id: str) -> None:
"""
Release a reference to an MPTA. If reference count reaches zero,
the MPTA directory may be cleaned up to free disk space.
Args:
model_id: Unique identifier for the model to release
camera_id: Identifier for the camera releasing the reference
"""
with self._manager_lock:
if model_id not in self._reference_counts:
logger.warning(f"⚠️ Attempted to release unknown MPTA modelId {model_id} for camera {camera_id}")
return
# Remove camera from usage tracking
if model_id in self._cameras_using_model:
self._cameras_using_model[model_id].discard(camera_id)
self._reference_counts[model_id] -= 1
logger.info(f"📉 MPTA modelId {model_id} reference count decreased to {self._reference_counts[model_id]} (released by {camera_id})")
# Clean up if no more references
# if self._reference_counts[model_id] <= 0:
# self._cleanup_mpta(model_id)
def _cleanup_mpta(self, model_id: int) -> None:
"""
Internal method to clean up an MPTA directory and free disk space.
"""
if model_id in self._model_paths:
shared_path = self._model_paths[model_id]
try:
if os.path.exists(shared_path):
shutil.rmtree(shared_path)
logger.info(f"🗑️ Cleaned up MPTA directory: {shared_path}")
# Remove from tracking
del self._model_paths[model_id]
self._mpta_file_paths.pop(model_id, None)
del self._reference_counts[model_id]
self._cameras_using_model.pop(model_id, None)
# Clean up download lock (optional, could keep for future use)
self._download_locks.pop(model_id, None)
logger.info(f"✅ MPTA modelId {model_id} fully cleaned up and disk space freed")
self._log_manager_status()
except Exception as e:
logger.error(f"❌ Error cleaning up MPTA modelId {model_id}: {e}")
def get_shared_path(self, model_id: int) -> Optional[str]:
"""
Get the shared extraction path for a modelId without downloading.
Args:
model_id: Model identifier to look up
Returns:
Shared path if available, None otherwise
"""
with self._manager_lock:
return self._model_paths.get(model_id)
def get_manager_status(self) -> Dict:
"""
Get current status of the MPTA manager.
Returns:
Dictionary with manager statistics
"""
with self._manager_lock:
return {
"total_mpta_models": len(self._model_paths),
"models": {
str(model_id): {
"shared_path": path,
"reference_count": self._reference_counts.get(model_id, 0),
"cameras_using": list(self._cameras_using_model.get(model_id, set()))
}
for model_id, path in self._model_paths.items()
},
"total_references": sum(self._reference_counts.values()),
"active_downloads": len(self._download_locks)
}
def _log_manager_status(self) -> None:
"""Log current manager status for debugging."""
status = self.get_manager_status()
logger.info(f"📊 MPTA Manager Status: {status['total_mpta_models']} unique models, {status['total_references']} total references")
for model_id, info in status['models'].items():
cameras_str = ','.join(info['cameras_using'][:3]) # Show first 3 cameras
if len(info['cameras_using']) > 3:
cameras_str += f"+{len(info['cameras_using'])-3} more"
logger.debug(f" 📋 ModelId {model_id}: refs={info['reference_count']}, cameras=[{cameras_str}]")
def cleanup_all(self) -> None:
"""
Clean up all MPTA directories. Used during shutdown.
"""
with self._manager_lock:
model_ids = list(self._model_paths.keys())
logger.info(f"🧹 Cleaning up {len(model_ids)} MPTA directories")
for model_id in model_ids:
self._cleanup_mpta(model_id)
# Clear all tracking data
self._download_locks.clear()
logger.info("✅ MPTA manager cleanup complete")
def _download_file(self, url: str, local_path: str) -> bool:
"""
Download a file from URL to local path with progress logging.
Args:
url: URL to download from
local_path: Local path to save to
Returns:
True if successful, False otherwise
"""
try:
logger.info(f"⬇️ Starting download from {url}")
response = requests.get(url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
if total_size > 0:
logger.info(f"📦 File size: {total_size / 1024 / 1024:.2f} MB")
downloaded = 0
last_logged_progress = 0
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
progress = int((downloaded / total_size) * 100)
# Log at 10% intervals (10%, 20%, 30%, etc.)
if progress >= last_logged_progress + 10 and progress <= 100:
logger.debug(f"Download progress: {progress}%")
last_logged_progress = progress
logger.info(f"✅ Successfully downloaded to {local_path}")
return True
except Exception as e:
logger.error(f"❌ Download failed: {e}")
# Clean up partial file
if os.path.exists(local_path):
os.remove(local_path)
return False
def _extract_filename_from_url(self, url: str) -> Optional[str]:
"""Extract filename from URL."""
try:
parsed = urlparse(url)
filename = os.path.basename(parsed.path)
return filename if filename else None
except Exception:
return None
# Global singleton instance
_mpta_manager = MPTAManager()
def get_or_download_mpta(model_id: int, model_url: str, camera_id: str) -> Optional[tuple[str, str]]:
"""
Convenience function to get or download a shared MPTA.
Args:
model_id: Unique identifier for the model
model_url: URL to download the MPTA file from
camera_id: Identifier for the requesting camera
Returns:
Tuple of (extraction_path, mpta_file_path), or None if failed
"""
return _mpta_manager.get_or_download_mpta(model_id, model_url, camera_id)
def release_mpta(model_id: int, camera_id: str) -> None:
"""
Convenience function to release a shared MPTA reference.
Args:
model_id: Unique identifier for the model to release
camera_id: Identifier for the camera releasing the reference
"""
_mpta_manager.release_mpta(model_id, camera_id)
def get_mpta_manager_status() -> Dict:
"""
Convenience function to get MPTA manager status.
Returns:
Dictionary with manager statistics
"""
return _mpta_manager.get_manager_status()
def cleanup_mpta_manager() -> None:
"""
Convenience function to cleanup the entire MPTA manager.
"""
_mpta_manager.cleanup_all()

File diff suppressed because it is too large Load diff

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.8 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.1 MiB

View file

@ -1,60 +0,0 @@
from ultralytics import YOLO
import cv2
import os
# Load the model
# model = YOLO('../models/webcam-local-01/4/bangchak_poc/yolo11n.pt')
model = YOLO('yolo11m.pt')
def test_image(image_path):
"""Test a single image with YOLO model"""
if not os.path.exists(image_path):
print(f"Image not found: {image_path}")
return
# Run inference - filter for car class only (class 2 in COCO)
results = model(image_path, classes=[2, 5, 7]) # 2, 5, 7 = car, bus, truck in COCO dataset
# Display results
for r in results:
im_array = r.plot() # plot a BGR numpy array of predictions
# Resize image for display (max width/height 800px)
height, width = im_array.shape[:2]
max_dimension = 800
if width > max_dimension or height > max_dimension:
if width > height:
new_width = max_dimension
new_height = int(height * (max_dimension / width))
else:
new_height = max_dimension
new_width = int(width * (max_dimension / height))
im_array = cv2.resize(im_array, (new_width, new_height))
# Show image with predictions
cv2.imshow('YOLO Test - Car Detection Only', im_array)
cv2.waitKey(0)
cv2.destroyAllWindows()
# Print detection info
print(f"\nDetections for {image_path}:")
if r.boxes is not None and len(r.boxes) > 0:
for i, box in enumerate(r.boxes):
cls = int(box.cls[0])
conf = float(box.conf[0])
original_class = model.names[cls] # Original class name (car/bus/truck)
# Get bounding box coordinates
x1, y1, x2, y2 = box.xyxy[0].tolist()
# Rename all vehicle types to "car"
print(f"Detection {i+1}: car (was: {original_class}) - Confidence: {conf:.3f} - BBox: ({x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f})")
print(f"Total cars detected: {len(r.boxes)}")
else:
print("No cars detected in the image")
if __name__ == "__main__":
# Test with an image file
image_path = input("Enter image path (or press Enter for default test): ")
if not image_path:
image_path = "sample.png" # Default test image
test_image(image_path)

View file

@ -1,352 +0,0 @@
import cv2
import torch
import numpy as np
import time
from collections import defaultdict
from ultralytics import YOLO
def point_in_polygon(point, polygon):
"""Check if a point is inside a polygon using ray casting algorithm"""
x, y = point
n = len(polygon)
inside = False
p1x, p1y = polygon[0]
for i in range(1, n + 1):
p2x, p2y = polygon[i % n]
if y > min(p1y, p2y):
if y <= max(p1y, p2y):
if x <= max(p1x, p2x):
if p1y != p2y:
xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x <= xinters:
inside = not inside
p1x, p1y = p2x, p2y
return inside
def draw_zone(frame, zone_polygon, color=(255, 0, 0), thickness=3):
"""Draw tracking zone on frame"""
pts = np.array(zone_polygon, np.int32)
pts = pts.reshape((-1, 1, 2))
cv2.polylines(frame, [pts], True, color, thickness)
# Add semi-transparent fill
overlay = frame.copy()
cv2.fillPoly(overlay, [pts], color)
cv2.addWeighted(overlay, 0.2, frame, 0.8, 0, frame)
def setup_video_writer(output_path, fps, width, height):
"""Setup video writer for output"""
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
return cv2.VideoWriter(output_path, fourcc, fps, (width, height))
def write_frame_to_video(video_writer, frame, repeat_count):
"""Write frame to video with specified repeat count"""
for _ in range(repeat_count):
video_writer.write(frame)
def finalize_video(video_writer):
"""Release video writer"""
video_writer.release()
def main():
video_path = "sample2.mp4"
yolo_model = "bangchakv2/yolov8n.pt"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
print("Loading YOLO model...")
model = YOLO(yolo_model)
print("Opening video...")
cap = cv2.VideoCapture(video_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video info: {width}x{height}, {fps} FPS, {total_frames} frames")
# Define tracking zone - Gas station floor area (trapezoidal shape)
# Based on the perspective of the gas station floor from your image
# width 2560, height 1440
tracking_zone = [
(423, 974), # Point 1
(1540, 1407), # Point 2
(1976, 806), # Point 3
(1364, 749) # Point 4
]
print(f"🎯 Tracking zone defined: {tracking_zone}")
# CONTINUOUS TRACKING: Process every 118 frames (~2.0s intervals)
frame_skip = 118
print(f"🎯 CONTINUOUS MODE: Processing every {frame_skip} frames ({frame_skip/fps:.2f}s intervals)")
print(f"🎬 Output video will have same duration as input (each processed frame shown for 2 seconds)")
print("🔥 ZONE-FIRST TRACKING: Only cars entering the zone will be tracked!")
print("Requires 5 consecutive detections IN ZONE for verification")
print("🕐 24/7 MODE: Memory reset every hour to prevent overflow")
print("Press 'q' to quit")
# Setup video writer for output (same fps as input for normal playback speed)
output_path = "tracking_output_botsort_zone_track.mp4"
output_fps = fps # Use same fps as input video
out = setup_video_writer(output_path, output_fps, width, height)
# Track car IDs and their consecutive detections
car_id_counts = defaultdict(int)
successful_cars = set()
last_positions = {}
processed_count = 0
# ID remapping for clean sequential zone IDs
tracker_to_zone_id = {} # Maps tracker IDs to clean zone IDs
next_zone_id = 1 # Next clean zone ID to assign
# Store previous frame detections to filter tracking inputs
previous_zone_cars = set()
# 24/7 operation: Reset every hour (1800 snapshots at 2-sec intervals = 1 hour)
RESET_INTERVAL = 1800 # Reset every 1800 processed frames (1 hour)
frame_idx = 0
while True:
# Skip frames to maintain interval
for _ in range(frame_skip):
ret, frame = cap.read()
if not ret:
print("\nNo more frames to read")
cap.release()
cv2.destroyAllWindows()
return
frame_idx += 1
processed_count += 1
current_time = frame_idx / fps
print(f"\n🎬 Frame {frame_idx} at {current_time:.2f}s (processed #{processed_count})")
# 24/7 Memory Management: Reset every hour
if processed_count % RESET_INTERVAL == 0:
print(f"🕐 HOURLY RESET: Clearing all tracking data (processed {processed_count} frames)")
print(f" 📊 Before reset: {len(tracker_to_zone_id)} tracked cars, next Zone ID was {next_zone_id}")
# Clear all tracking data
tracker_to_zone_id.clear()
car_id_counts.clear()
successful_cars.clear()
last_positions.clear()
next_zone_id = 1 # Reset to 1
# Reset BoT-SORT tracker state
try:
model.reset()
print(f" ✅ BoT-SORT tracker reset successfully")
except:
print(f" ⚠️ BoT-SORT reset not available (continuing without reset)")
print(f" 🆕 Zone IDs will start from 1 again")
# Draw tracking zone on frame
draw_zone(frame, tracking_zone, color=(0, 255, 255), thickness=3) # Yellow zone
# First run YOLO detection (without tracking) to find cars in zone
detection_results = model(frame, verbose=False, conf=0.7, classes=[2])
# Find cars currently in the tracking zone
current_zone_cars = []
total_detections = 0
if detection_results[0].boxes is not None:
boxes = detection_results[0].boxes.xyxy.cpu()
scores = detection_results[0].boxes.conf.cpu()
total_detections = len(boxes)
print(f" 🔍 Total car detections: {total_detections}")
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
conf = float(scores[i])
# Check if detection is in zone (using bottom center)
box_bottom = ((x1 + x2) / 2, y2)
if point_in_polygon(box_bottom, tracking_zone):
current_zone_cars.append({
'bbox': [float(x1), float(y1), float(x2), float(y2)],
'conf': conf,
'center': ((x1 + x2) / 2, (y1 + y2) / 2),
'bottom': box_bottom
})
print(f" 🎯 Cars in zone: {len(current_zone_cars)}")
# Only run tracking if there are cars in the zone
detected_car_ids = set()
if current_zone_cars:
# Run tracking on the full frame (let tracker handle associations)
# But we'll filter results to only zone cars afterward
results = model.track(
frame,
persist=True,
verbose=False,
conf=0.7,
classes=[2],
tracker="botsort_reid.yaml"
)
if results[0].boxes is not None and results[0].boxes.id is not None:
boxes = results[0].boxes.xyxy.cpu()
scores = results[0].boxes.conf.cpu()
track_ids = results[0].boxes.id.cpu().int()
print(f" 📊 Total tracked objects: {len(track_ids)}")
# Filter tracked objects to only those in zone
zone_tracks = []
for i, track_id in enumerate(track_ids):
x1, y1, x2, y2 = boxes[i]
conf = float(scores[i])
# Check if this tracked object is in our zone
box_bottom = ((x1 + x2) / 2, y2)
if point_in_polygon(box_bottom, tracking_zone):
zone_tracks.append({
'id': int(track_id),
'bbox': [int(x1), int(y1), int(x2), int(y2)],
'conf': conf,
'center': ((x1 + x2) / 2, (y1 + y2) / 2),
'bottom': box_bottom
})
print(f" ✅ Zone tracks: {len(zone_tracks)}")
# Process each zone track
for track in zone_tracks:
tracker_id = track['id'] # Original tracker ID
x1, y1, x2, y2 = track['bbox']
conf = track['conf']
box_center = track['center']
# Map tracker ID to clean zone ID
if tracker_id not in tracker_to_zone_id:
tracker_to_zone_id[tracker_id] = next_zone_id
print(f" 🆕 New car: Tracker ID {tracker_id} → Zone ID {next_zone_id}")
next_zone_id += 1
zone_id = tracker_to_zone_id[tracker_id] # Clean sequential ID
# Validate track continuity (use tracker_id for internal logic)
is_valid = True
# Check for suspicious jumps
if tracker_id in last_positions:
last_center = last_positions[tracker_id]
distance = np.sqrt((box_center[0] - last_center[0])**2 +
(box_center[1] - last_center[1])**2)
if distance > 400: # pixels in ~2.0s
is_valid = False
print(f" ⚠️ Zone ID {zone_id} (Tracker {tracker_id}): suspicious jump {distance:.0f}px")
# Skip already successful cars (use zone_id for user logic)
if zone_id in successful_cars:
is_valid = False
print(f" ✅ Zone ID {zone_id}: already successful, skipping")
# Only process valid, high-confidence zone tracks
if is_valid and conf > 0.7:
detected_car_ids.add(zone_id) # Use zone_id for display
car_id_counts[zone_id] += 1
last_positions[tracker_id] = box_center # Track by tracker_id internally
# Draw tracking results with clean zone ID
zone_color = (0, 255, 0) # Green for zone cars
cv2.rectangle(frame, (x1, y1), (x2, y2), zone_color, 2)
cv2.putText(frame, f'ZONE ID:{zone_id}',
(x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, zone_color, 2)
cv2.putText(frame, f'#{car_id_counts[zone_id]} {conf:.2f}',
(x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, zone_color, 2)
# Draw center point
cv2.circle(frame, (int(track['bottom'][0]), int(track['bottom'][1])), 5, zone_color, -1)
print(f" ✅ Zone ID {zone_id} (Tracker {tracker_id}): ZONE detection #{car_id_counts[zone_id]} (conf: {conf:.2f})")
# Check for success (5 consecutive detections IN ZONE)
if car_id_counts[zone_id] == 5:
print(f"🏆 SUCCESS: Zone ID {zone_id} achieved 5 continuous ZONE detections - TRIGGER NEXT MODEL!")
successful_cars.add(zone_id)
# Add success indicator to frame
cv2.putText(frame, f"SUCCESS: Zone Car {zone_id}!",
(50, height-50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 3)
else:
print(" 📋 No cars in zone - no tracking performed")
# Draw any cars outside the zone in red (for reference)
if detection_results[0].boxes is not None:
boxes = detection_results[0].boxes.xyxy.cpu()
scores = detection_results[0].boxes.conf.cpu()
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
conf = float(scores[i])
box_bottom = ((x1 + x2) / 2, y2)
if not point_in_polygon(box_bottom, tracking_zone):
# Draw cars outside zone in red (not tracked)
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 1)
cv2.putText(frame, f'OUT {conf:.2f}',
(x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
# Display results
if detected_car_ids:
print(f" 📋 Active Zone IDs: {sorted(detected_car_ids)} (Clean sequential IDs)")
# Show ID mapping for debugging
if tracker_to_zone_id:
mapping_str = ", ".join([f"Tracker{k}→Zone{v}" for k, v in tracker_to_zone_id.items()])
print(f" 🔄 ID Mapping: {mapping_str}")
# Add annotations to frame
cv2.putText(frame, f"BoT-SORT Zone-First Tracking | Frame: {frame_idx} | {current_time:.2f}s",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
cv2.putText(frame, f"Zone Cars: {len(current_zone_cars)} | Active Tracks: {len(detected_car_ids)}",
(10, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.putText(frame, f"Successful Cars: {len(successful_cars)}",
(10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, "TRACKING ZONE",
(tracking_zone[0][0], tracking_zone[0][1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
# Write annotated frame to output video (repeat for 2 seconds duration)
write_frame_to_video(out, frame, frame_skip)
# Show video with zone tracking info
display_frame = cv2.resize(frame, (960, 540))
cv2.imshow('BoT-SORT Zone-First Tracking', display_frame)
# Quick check for quit
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
# Small delay to see results
time.sleep(0.1)
cap.release()
finalize_video(out)
cv2.destroyAllWindows()
print(f"\n🎯 BoT-SORT zone-first tracking completed!")
print(f"📊 Processed {processed_count} frames with {frame_skip/fps:.2f}s intervals")
print(f"🏆 Successfully tracked {len(successful_cars)} unique cars IN ZONE")
print(f"💾 Annotated video saved to: {output_path}")
if __name__ == "__main__":
main()

View file

@ -1,125 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify the worker implementation follows the protocol
"""
import json
import asyncio
import websockets
import time
async def test_protocol():
"""Test the worker protocol implementation"""
uri = "ws://localhost:8000"
try:
async with websockets.connect(uri) as websocket:
print("✓ Connected to worker")
# Test 1: Check if we receive heartbeat (stateReport)
print("\n1. Testing heartbeat...")
try:
message = await asyncio.wait_for(websocket.recv(), timeout=5)
data = json.loads(message)
if data.get("type") == "stateReport":
print("✓ Received stateReport heartbeat")
print(f" - CPU Usage: {data.get('cpuUsage', 'N/A')}%")
print(f" - Memory Usage: {data.get('memoryUsage', 'N/A')}%")
print(f" - Camera Connections: {len(data.get('cameraConnections', []))}")
else:
print(f"✗ Expected stateReport, got {data.get('type')}")
except asyncio.TimeoutError:
print("✗ No heartbeat received within 5 seconds")
# Test 2: Request state
print("\n2. Testing requestState...")
await websocket.send(json.dumps({"type": "requestState"}))
try:
message = await asyncio.wait_for(websocket.recv(), timeout=5)
data = json.loads(message)
if data.get("type") == "stateReport":
print("✓ Received stateReport response")
else:
print(f"✗ Expected stateReport, got {data.get('type')}")
except asyncio.TimeoutError:
print("✗ No response to requestState within 5 seconds")
# Test 3: Set session ID
print("\n3. Testing setSessionId...")
session_message = {
"type": "setSessionId",
"payload": {
"displayIdentifier": "display-001",
"sessionId": 12345
}
}
await websocket.send(json.dumps(session_message))
print("✓ Sent setSessionId message")
# Test 4: Test patchSession
print("\n4. Testing patchSession...")
patch_message = {
"type": "patchSession",
"sessionId": 12345,
"data": {
"currentCar": {
"carModel": "Civic",
"carBrand": "Honda"
}
}
}
await websocket.send(json.dumps(patch_message))
# Wait for patchSessionResult
try:
message = await asyncio.wait_for(websocket.recv(), timeout=5)
data = json.loads(message)
if data.get("type") == "patchSessionResult":
print("✓ Received patchSessionResult")
print(f" - Success: {data.get('payload', {}).get('success')}")
print(f" - Message: {data.get('payload', {}).get('message')}")
else:
print(f"✗ Expected patchSessionResult, got {data.get('type')}")
except asyncio.TimeoutError:
print("✗ No patchSessionResult received within 5 seconds")
# Test 5: Test subscribe message format (without actual camera)
print("\n5. Testing subscribe message format...")
subscribe_message = {
"type": "subscribe",
"payload": {
"subscriptionIdentifier": "display-001;cam-001",
"snapshotUrl": "http://example.com/snapshot.jpg",
"snapshotInterval": 5000,
"modelUrl": "http://example.com/model.mpta",
"modelName": "Test Model",
"modelId": 101,
"cropX1": 100,
"cropY1": 200,
"cropX2": 300,
"cropY2": 400
}
}
await websocket.send(json.dumps(subscribe_message))
print("✓ Sent subscribe message (will fail without actual camera/model)")
# Listen for a few more messages to catch any errors
print("\n6. Listening for additional messages...")
for i in range(3):
try:
message = await asyncio.wait_for(websocket.recv(), timeout=2)
data = json.loads(message)
msg_type = data.get("type")
print(f" - Received {msg_type}")
if msg_type == "error":
print(f" Error: {data.get('error')}")
except asyncio.TimeoutError:
break
print("\n✓ Protocol test completed successfully!")
except Exception as e:
print(f"✗ Connection failed: {e}")
print("Make sure the worker is running on localhost:8000")
if __name__ == "__main__":
asyncio.run(test_protocol())

View file

@ -1,162 +0,0 @@
#!/usr/bin/env python3
"""
Script to view frontal images saved in Redis
"""
import redis
import cv2
import numpy as np
import sys
from datetime import datetime
# Redis connection config (from pipeline.json)
REDIS_CONFIG = {
"host": "10.100.1.3",
"port": 6379,
"password": "FBQgi0i5RevAAMO5Hh66",
"db": 0
}
def connect_redis():
"""Connect to Redis server."""
try:
client = redis.Redis(
host=REDIS_CONFIG["host"],
port=REDIS_CONFIG["port"],
password=REDIS_CONFIG["password"],
db=REDIS_CONFIG["db"],
decode_responses=False # Keep bytes for images
)
client.ping()
print(f"✅ Connected to Redis at {REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}")
return client
except redis.exceptions.ConnectionError as e:
print(f"❌ Failed to connect to Redis: {e}")
return None
def list_image_keys(client):
"""List all image keys in Redis."""
try:
# Look for keys matching the inference pattern
keys = client.keys("inference:*")
print(f"\n📋 Found {len(keys)} image keys:")
for i, key in enumerate(keys):
key_str = key.decode() if isinstance(key, bytes) else key
print(f"{i+1}. {key_str}")
return keys
except Exception as e:
print(f"❌ Error listing keys: {e}")
return []
def view_image(client, key):
"""View a specific image from Redis."""
try:
# Get image data from Redis
image_data = client.get(key)
if image_data is None:
print(f"❌ No data found for key: {key}")
return
print(f"📸 Image size: {len(image_data)} bytes")
# Convert bytes to numpy array
nparr = np.frombuffer(image_data, np.uint8)
# Decode image
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
print("❌ Failed to decode image data")
return
print(f"🖼️ Image dimensions: {img.shape[1]}x{img.shape[0]} pixels")
# Display image
key_str = key.decode() if isinstance(key, bytes) else key
cv2.imshow(f'Redis Image: {key_str}', img)
print("👁️ Image displayed. Press any key to close...")
cv2.waitKey(0)
cv2.destroyAllWindows()
# Ask if user wants to save the image
save = input("💾 Save image to file? (y/n): ").lower().strip()
if save == 'y':
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"redis_image_{timestamp}.jpg"
cv2.imwrite(filename, img)
print(f"💾 Image saved as: {filename}")
except Exception as e:
print(f"❌ Error viewing image: {e}")
def monitor_new_images(client):
"""Monitor for new images being added to Redis."""
print("👀 Monitoring for new images... (Press Ctrl+C to stop)")
try:
# Subscribe to Redis pub/sub for car detections
pubsub = client.pubsub()
pubsub.subscribe('car_detections')
for message in pubsub.listen():
if message['type'] == 'message':
data = message['data'].decode()
print(f"🚨 New detection: {data}")
# Try to extract image key from message
import json
try:
detection_data = json.loads(data)
image_key = detection_data.get('image_key')
if image_key:
print(f"🖼️ New image available: {image_key}")
view_choice = input("View this image now? (y/n): ").lower().strip()
if view_choice == 'y':
view_image(client, image_key)
except json.JSONDecodeError:
pass
except KeyboardInterrupt:
print("\n👋 Stopping monitor...")
except Exception as e:
print(f"❌ Monitor error: {e}")
def main():
"""Main function."""
print("🔍 Redis Image Viewer")
print("=" * 50)
# Connect to Redis
client = connect_redis()
if not client:
return
while True:
print("\n📋 Options:")
print("1. List all image keys")
print("2. View specific image")
print("3. Monitor for new images")
print("4. Exit")
choice = input("\nEnter choice (1-4): ").strip()
if choice == '1':
keys = list_image_keys(client)
elif choice == '2':
keys = list_image_keys(client)
if keys:
try:
idx = int(input(f"\nEnter image number (1-{len(keys)}): ")) - 1
if 0 <= idx < len(keys):
view_image(client, keys[idx])
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
elif choice == '3':
monitor_new_images(client)
elif choice == '4':
print("👋 Goodbye!")
break
else:
print("❌ Invalid choice")
if __name__ == "__main__":
main()

View file

@ -1,325 +0,0 @@
#!/usr/bin/env python3
"""
Enhanced webcam server that provides both RTSP streaming and HTTP snapshot endpoints
Compatible with CMS UI requirements for camera configuration
"""
import cv2
import threading
import time
import logging
import socket
from http.server import BaseHTTPRequestHandler, HTTPServer
import subprocess
import sys
import os
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("webcam_rtsp_server")
# Global webcam capture object
webcam_cap = None
rtsp_process = None
class WebcamHTTPHandler(BaseHTTPRequestHandler):
"""HTTP handler for snapshot requests"""
def do_GET(self):
if self.path == '/snapshot' or self.path == '/snapshot.jpg':
try:
# Capture fresh frame from webcam for each request
ret, frame = webcam_cap.read()
if ret and frame is not None:
# Encode as JPEG
success, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
if success:
self.send_response(200)
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', str(len(buffer)))
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
self.end_headers()
self.wfile.write(buffer.tobytes())
logger.debug(f"Served webcam snapshot, size: {len(buffer)} bytes")
return
else:
logger.error("Failed to encode frame as JPEG")
else:
logger.error("Failed to capture frame from webcam")
# Send error response
self.send_response(500)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(b'Failed to capture webcam frame')
except Exception as e:
logger.error(f"Error serving snapshot: {e}")
self.send_response(500)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(f'Error: {str(e)}'.encode())
elif self.path == '/status':
# Status endpoint for health checking
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
width = int(webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = webcam_cap.get(cv2.CAP_PROP_FPS)
status = f'{{"status": "online", "width": {width}, "height": {height}, "fps": {fps}}}'
self.wfile.write(status.encode())
else:
# 404 for other paths
self.send_response(404)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(b'Not Found - Available endpoints: /snapshot, /snapshot.jpg, /status')
def log_message(self, format, *args):
# Suppress default HTTP server logging to avoid spam
pass
def check_ffmpeg():
"""Check if FFmpeg is available for RTSP streaming"""
try:
result = subprocess.run(['ffmpeg', '-version'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
logger.info("FFmpeg found and working")
return True
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
pass
logger.warning("FFmpeg not found. RTSP streaming will not be available.")
logger.info("To enable RTSP streaming, install FFmpeg:")
logger.info(" Windows: Download from https://ffmpeg.org/download.html")
logger.info(" Linux: sudo apt install ffmpeg")
logger.info(" macOS: brew install ffmpeg")
return False
def get_windows_camera_name():
"""Get the actual camera device name on Windows"""
try:
# List video devices using FFmpeg with proper encoding handling
result = subprocess.run(['ffmpeg', '-f', 'dshow', '-list_devices', 'true', '-i', 'dummy'],
capture_output=True, text=True, timeout=10, encoding='utf-8', errors='ignore')
output = result.stderr # FFmpeg outputs device list to stderr
# Look for video devices in the output
lines = output.split('\n')
video_devices = []
# Parse the output - look for lines with (video) that contain device names in quotes
for line in lines:
if '[dshow @' in line and '(video)' in line and '"' in line:
# Extract device name between first pair of quotes
start = line.find('"') + 1
end = line.find('"', start)
if start > 0 and end > start:
device_name = line[start:end]
video_devices.append(device_name)
logger.info(f"Found Windows video devices: {video_devices}")
if video_devices:
# Force use the first device (index 0) which is the Logitech HD webcam
return video_devices[0] # This will be "罗技高清网络摄像机 C930c"
else:
logger.info("No devices found via FFmpeg detection, using fallback")
# Fall through to fallback names
except Exception as e:
logger.debug(f"Failed to get Windows camera name: {e}")
# Try common camera device names as fallback
# Prioritize Integrated Camera since that's what's working now
common_names = [
"Integrated Camera", # This is working for the current setup
"USB Video Device", # Common name for USB cameras
"USB2.0 Camera",
"C930c", # Direct model name
"HD Pro Webcam C930c", # Full Logitech name
"Logitech", # Brand name
"USB Camera",
"Webcam"
]
logger.info(f"Using fallback camera names: {common_names}")
return common_names[0] # Return "Integrated Camera" first
def start_rtsp_stream(webcam_index=0, rtsp_port=8554):
"""Start RTSP streaming using FFmpeg"""
global rtsp_process
if not check_ffmpeg():
return None
try:
# Get the actual camera device name for Windows
if sys.platform.startswith('win'):
camera_name = get_windows_camera_name()
logger.info(f"Using Windows camera device: {camera_name}")
# FFmpeg command to stream webcam via RTSP
if sys.platform.startswith('win'):
cmd = [
'ffmpeg',
'-f', 'dshow',
'-i', f'video={camera_name}', # Use detected camera name
'-c:v', 'libx264',
'-preset', 'veryfast',
'-tune', 'zerolatency',
'-r', '30',
'-s', '1280x720',
'-f', 'rtsp',
f'rtsp://localhost:{rtsp_port}/stream'
]
elif sys.platform.startswith('linux'):
cmd = [
'ffmpeg',
'-f', 'v4l2',
'-i', f'/dev/video{webcam_index}',
'-c:v', 'libx264',
'-preset', 'veryfast',
'-tune', 'zerolatency',
'-r', '30',
'-s', '1280x720',
'-f', 'rtsp',
f'rtsp://localhost:{rtsp_port}/stream'
]
else: # macOS
cmd = [
'ffmpeg',
'-f', 'avfoundation',
'-i', f'{webcam_index}:',
'-c:v', 'libx264',
'-preset', 'veryfast',
'-tune', 'zerolatency',
'-r', '30',
'-s', '1280x720',
'-f', 'rtsp',
f'rtsp://localhost:{rtsp_port}/stream'
]
logger.info(f"Starting RTSP stream on rtsp://localhost:{rtsp_port}/stream")
logger.info(f"FFmpeg command: {' '.join(cmd)}")
rtsp_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Give FFmpeg a moment to start
time.sleep(2)
# Check if process is still running
if rtsp_process.poll() is None:
logger.info("RTSP streaming started successfully")
return rtsp_process
else:
# Get error output if process failed
stdout, stderr = rtsp_process.communicate(timeout=2)
logger.error("RTSP streaming failed to start")
logger.error(f"FFmpeg stdout: {stdout}")
logger.error(f"FFmpeg stderr: {stderr}")
return None
except Exception as e:
logger.error(f"Failed to start RTSP stream: {e}")
return None
def get_local_ip():
"""Get the Wireguard IP address for external access"""
# Use Wireguard IP for external access
return "10.101.1.4"
def main():
global webcam_cap, rtsp_process
# Configuration - Force use index 0 for Logitech HD webcam
webcam_index = 0 # Logitech HD webcam C930c (1920x1080@30fps)
http_port = 8080
rtsp_port = 8554
logger.info("=== Webcam RTSP & HTTP Server ===")
# Initialize webcam
logger.info("Initializing webcam...")
webcam_cap = cv2.VideoCapture(webcam_index)
if not webcam_cap.isOpened():
logger.error(f"Failed to open webcam at index {webcam_index}")
logger.info("Try different webcam indices (0, 1, 2, etc.)")
return
# Set webcam properties - Use high resolution for Logitech HD webcam
webcam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
webcam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
webcam_cap.set(cv2.CAP_PROP_FPS, 30)
width = int(webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = webcam_cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Webcam initialized: {width}x{height} @ {fps}fps")
# Get local IP for CMS configuration
local_ip = get_local_ip()
# Start RTSP streaming (optional, requires FFmpeg)
rtsp_process = start_rtsp_stream(webcam_index, rtsp_port)
# Start HTTP server for snapshots
server_address = ('0.0.0.0', http_port) # Bind to all interfaces
http_server = HTTPServer(server_address, WebcamHTTPHandler)
logger.info("\n=== Server URLs for CMS Configuration ===")
logger.info(f"HTTP Snapshot URL: http://{local_ip}:{http_port}/snapshot")
if rtsp_process:
logger.info(f"RTSP Stream URL: rtsp://{local_ip}:{rtsp_port}/stream")
else:
logger.info("RTSP Stream: Not available (FFmpeg not found)")
logger.info("HTTP-only mode: Use Snapshot URL for camera input")
logger.info(f"Status URL: http://{local_ip}:{http_port}/status")
logger.info("\n=== CMS Configuration Suggestions ===")
logger.info(f"Camera Identifier: webcam-local-01")
logger.info(f"RTSP Stream URL: rtsp://{local_ip}:{rtsp_port}/stream")
logger.info(f"Snapshot URL: http://{local_ip}:{http_port}/snapshot")
logger.info(f"Snapshot Interval: 2000 (ms)")
logger.info("\nPress Ctrl+C to stop all servers")
try:
# Start HTTP server
http_server.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down servers...")
finally:
# Clean up
if webcam_cap:
webcam_cap.release()
if rtsp_process:
logger.info("Stopping RTSP stream...")
rtsp_process.terminate()
try:
rtsp_process.wait(timeout=5)
except subprocess.TimeoutExpired:
rtsp_process.kill()
http_server.server_close()
logger.info("All servers stopped")
if __name__ == "__main__":
main()

495
worker.md
View file

@ -1,495 +0,0 @@
# Worker Communication Protocol
This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol.
## 1. Connection
The worker must run a WebSocket server, preferably on port `8000`. The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker.
Upon a successful connection from the backend, you should begin sending `stateReport` messages as heartbeats.
## 2. Communication Overview
Communication is bidirectional and asynchronous. All messages are JSON objects with a `type` field that indicates the message's purpose, and an optional `payload` field containing the data.
- **Worker -> Backend:** You will send messages to the backend to report status, forward detection events, or request changes to session data.
- **Backend -> Worker:** The backend will send commands to you to manage camera subscriptions.
## 3. Dynamic Configuration via MPTA File
To enable modularity and dynamic configuration, the backend will send you a URL to a `.mpta` file when it issues a `subscribe` command. This file is a renamed `.zip` archive that contains everything your worker needs to perform its task.
**Your worker is responsible for:**
1. Fetching this file from the provided URL.
2. Extracting its contents.
3. Interpreting the contents to configure its internal pipeline.
**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain:
- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX.
- Configuration Files: A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds.
- Scripts: Custom Python scripts for pre-processing or post-processing.
- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services.
Essentially, the `.mpta` file is a self-contained package that tells your worker _how_ to process the video stream for a given subscription.
## 4. Messages from Worker to Backend
These are the messages your worker is expected to send to the backend.
### 4.1. State Report (Heartbeat)
This message is crucial for the backend to monitor your worker's health and status, including GPU usage.
- **Type:** `stateReport`
- **When to Send:** Periodically (e.g., every 2 seconds) after a connection is established.
**Payload:**
```json
{
"type": "stateReport",
"cpuUsage": 75.5,
"memoryUsage": 40.2,
"gpuUsage": 60.0,
"gpuMemoryUsage": 25.1,
"cameraConnections": [
{
"subscriptionIdentifier": "display-001;cam-001",
"modelId": 101,
"modelName": "General Object Detection",
"online": true,
"cropX1": 100,
"cropY1": 200,
"cropX2": 300,
"cropY2": 400
}
]
}
```
> **Note:**
>
> - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) should be included in each camera connection to indicate the crop coordinates for that subscription.
### 4.2. Image Detection
Sent when the worker detects a relevant object. The `detection` object should be flat and contain key-value pairs corresponding to the detected attributes.
- **Type:** `imageDetection`
**Payload Example:**
```json
{
"type": "imageDetection",
"subscriptionIdentifier": "display-001;cam-001",
"timestamp": "2025-07-14T12:34:56.789Z",
"data": {
"detection": {
"carModel": "Civic",
"carBrand": "Honda",
"carYear": 2023,
"bodyType": "Sedan",
"licensePlateText": "ABCD1234",
"licensePlateConfidence": 0.95
},
"modelId": 101,
"modelName": "US-LPR-and-Vehicle-ID"
}
}
```
### 4.3. Patch Session
> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. Use `patchSession` only to update session data after the fact.
Allows the worker to request a modification to an active session's data. The `data` payload must be a partial object of the `DisplayPersistentData` structure.
- **Type:** `patchSession`
**Payload Example:**
```json
{
"type": "patchSession",
"sessionId": 12345,
"data": {
"currentCar": {
"carModel": "Civic",
"carBrand": "Honda",
"licensePlateText": "ABCD1234"
}
}
}
```
The backend will respond with a `patchSessionResult` command.
#### `DisplayPersistentData` Structure
The `data` object in the `patchSession` message is merged with the existing `DisplayPersistentData` on the backend. Here is its structure:
```typescript
interface DisplayPersistentData {
progressionStage:
| 'welcome'
| 'car_fueling'
| 'car_waitpayment'
| 'car_postpayment'
| null;
qrCode: string | null;
adsPlayback: {
playlistSlotOrder: number; // The 'order' of the current slot
adsId: number | null;
adsUrl: string | null;
} | null;
currentCar: {
carModel?: string;
carBrand?: string;
carYear?: number;
bodyType?: string;
licensePlateText?: string;
licensePlateType?: string;
} | null;
fuelPump: {
/* FuelPumpData structure */
} | null;
weatherData: {
/* WeatherResponse structure */
} | null;
sessionId: number | null;
}
```
#### Patching Behavior
- The patch is a **deep merge**.
- **`undefined`** values are ignored.
- **`null`** values will set the corresponding field to `null`.
- Nested objects are merged recursively.
## 5. Commands from Backend to Worker
These are the commands your worker will receive from the backend.
### 5.1. Subscribe to Camera
Instructs the worker to process a camera's RTSP stream using the configuration from the specified `.mpta` file.
- **Type:** `subscribe`
**Payload:**
```json
{
"type": "subscribe",
"payload": {
"subscriptionIdentifier": "display-001;cam-002",
"rtspUrl": "rtsp://user:pass@host:port/stream",
"snapshotUrl": "http://go2rtc/snapshot/1",
"snapshotInterval": 5000,
"modelUrl": "http://storage/models/us-lpr.mpta",
"modelName": "US-LPR-and-Vehicle-ID",
"modelId": 102,
"cropX1": 100,
"cropY1": 200,
"cropX2": 300,
"cropY2": 400
}
}
```
> **Note:**
>
> - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) specify the crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame.
>
> **Important:**
> If multiple displays are bound to the same camera, your worker must ensure that only **one stream** is opened per camera. When you receive multiple subscriptions for the same camera (with different `subscriptionIdentifier` values), you should:
>
> - Open the RTSP stream **once** for that camera if using RTSP.
> - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera.
> - Capture each frame/image only once per cycle.
> - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed.
> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera.
### 5.2. Unsubscribe from Camera
Instructs the worker to stop processing a camera's stream.
- **Type:** `unsubscribe`
**Payload:**
```json
{
"type": "unsubscribe",
"payload": {
"subscriptionIdentifier": "display-001;cam-002"
}
}
```
### 5.3. Request State
Direct request for the worker's current state. Respond with a `stateReport` message.
- **Type:** `requestState`
**Payload:**
```json
{
"type": "requestState"
}
```
### 5.4. Patch Session Result
Backend's response to a `patchSession` message.
- **Type:** `patchSessionResult`
**Payload:**
```json
{
"type": "patchSessionResult",
"payload": {
"sessionId": 12345,
"success": true,
"message": "Session updated successfully."
}
}
```
### 5.5. Set Session ID
Allows the backend to instruct the worker to associate a session ID with a subscription. This is useful for linking detection events to a specific session. The session ID can be `null` to indicate no active session.
- **Type:** `setSessionId`
**Payload:**
```json
{
"type": "setSessionId",
"payload": {
"displayIdentifier": "display-001",
"sessionId": 12345
}
}
```
Or to clear the session:
```json
{
"type": "setSessionId",
"payload": {
"displayIdentifier": "display-001",
"sessionId": null
}
}
```
> **Note:**
>
> - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. If `sessionId` is `null`, the worker should treat the subscription as having no active session.
## Subscription Identifier Format
The `subscriptionIdentifier` used in all messages is constructed as:
```
displayIdentifier;cameraIdentifier
```
This uniquely identifies a camera subscription for a specific display.
### Session ID Association
When the backend sends a `setSessionId` command, it will only provide the `displayIdentifier` (not the full `subscriptionIdentifier`).
**Worker Responsibility:**
- The worker must match the `displayIdentifier` to all active subscriptions for that display (i.e., all `subscriptionIdentifier` values that start with `displayIdentifier;`).
- The worker should set or clear the session ID for all matching subscriptions.
## 6. Example Communication Log
This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up.
> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing.
1. **Connection Established** & **Heartbeat**
- **Worker -> Backend**
```json
{
"type": "stateReport",
"cpuUsage": 70.2,
"memoryUsage": 38.1,
"gpuUsage": 55.0,
"gpuMemoryUsage": 20.0,
"cameraConnections": []
}
```
2. **Backend Subscribes Camera**
- **Backend -> Worker**
```json
{
"type": "subscribe",
"payload": {
"subscriptionIdentifier": "display-001;entry-cam-01",
"rtspUrl": "rtsp://192.168.1.100/stream1",
"modelUrl": "http://storage/models/vehicle-id.mpta",
"modelName": "Vehicle Identification",
"modelId": 201
}
}
```
3. **Worker Acknowledges in Heartbeat**
- **Worker -> Backend**
```json
{
"type": "stateReport",
"cpuUsage": 72.5,
"memoryUsage": 39.0,
"gpuUsage": 57.0,
"gpuMemoryUsage": 21.0,
"cameraConnections": [
{
"subscriptionIdentifier": "display-001;entry-cam-01",
"modelId": 201,
"modelName": "Vehicle Identification",
"online": true
}
]
}
```
4. **Worker Detects a Car**
- **Worker -> Backend**
```json
{
"type": "imageDetection",
"subscriptionIdentifier": "display-001;entry-cam-01",
"timestamp": "2025-07-15T10:00:00.000Z",
"data": {
"detection": {
"carBrand": "Honda",
"carModel": "CR-V",
"bodyType": "SUV",
"licensePlateText": "GEMINI-AI",
"licensePlateConfidence": 0.98
},
"modelId": 201,
"modelName": "Vehicle Identification"
}
}
```
- **Worker -> Backend**
```json
{
"type": "imageDetection",
"subscriptionIdentifier": "display-001;entry-cam-01",
"timestamp": "2025-07-15T10:00:01.000Z",
"data": {
"detection": {
"carBrand": "Toyota",
"carModel": "Corolla",
"bodyType": "Sedan",
"licensePlateText": "CMS-1234",
"licensePlateConfidence": 0.97
},
"modelId": 201,
"modelName": "Vehicle Identification"
}
}
```
- **Worker -> Backend**
```json
{
"type": "imageDetection",
"subscriptionIdentifier": "display-001;entry-cam-01",
"timestamp": "2025-07-15T10:00:02.000Z",
"data": {
"detection": {
"carBrand": "Ford",
"carModel": "Focus",
"bodyType": "Hatchback",
"licensePlateText": "CMS-5678",
"licensePlateConfidence": 0.96
},
"modelId": 201,
"modelName": "Vehicle Identification"
}
}
```
5. **Backend Unsubscribes Camera**
- **Backend -> Worker**
```json
{
"type": "unsubscribe",
"payload": {
"subscriptionIdentifier": "display-001;entry-cam-01"
}
}
```
6. **Worker Acknowledges Unsubscription**
- **Worker -> Backend**
```json
{
"type": "stateReport",
"cpuUsage": 68.0,
"memoryUsage": 37.0,
"gpuUsage": 50.0,
"gpuMemoryUsage": 18.0,
"cameraConnections": []
}
```
## 7. HTTP API: Image Retrieval
In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera.
### Endpoint
```
GET /camera/{camera_id}/image
```
- **`camera_id`**: The full `subscriptionIdentifier` (e.g., `display-001;cam-001`).
### Response
- **Success (200):** Returns the latest JPEG image from the camera stream.
- `Content-Type: image/jpeg`
- Binary JPEG data.
- **Error (404):** If the camera is not found or no frame is available.
- JSON error response.
- **Error (500):** Internal server error.
### Example Request
```
GET /camera/display-001;cam-001/image
```
### Example Response
- **Headers:**
```
Content-Type: image/jpeg
```
- **Body:** Binary JPEG image.
### Notes
- The endpoint returns the most recent frame available for the specified camera subscription.
- If multiple displays share the same camera, each subscription has its own buffer; the endpoint uses the buffer for the given `camera_id`.
- This API is useful for debugging, monitoring, or integrating with external systems that require direct image access.