1498 lines
No EOL
59 KiB
Markdown
# Worker Connection Architecture Specification - Pure Declarative State Management

## Overview

The Camera Module implements a purely declarative architecture, modeled on VMware DRS, for managing connections to Python ML workers. The database is the single source of truth for desired subscription state. The system regenerates that state from the database and automatically reconciles it against what the workers are actually running. On this foundation the module provides camera management, real-time object detection, and AI-powered content selection, with automatic load balancing across workers.

**Key Architectural Principle**: Database mutations trigger a complete regeneration of desired state rather than incremental updates. This keeps state consistent and eliminates complex state synchronization issues.

## Architecture Components

### Two-Cluster System

The system consists of two distinct but coordinated clusters:

1. **Backend Process Cluster**: Multiple CMS backend processes with leader election
2. **Worker Cluster**: Python ML workers for object detection processing

### Master-Slave WebSocket Architecture

- **Master Process**: The single elected backend process that maintains WebSocket connections to the Python workers
- **Slave Processes**: All other backend processes; they receive routed worker messages and process them
- **Message Routing**: The master forwards worker messages to the assigned slave via Redis pub/sub channels
- **MasterElection Integration**: Automated master/slave role management with event-driven transitions
- **Seamless Scaling**: Slave processes can be added or removed without affecting worker WebSocket connections; only a master failover requires the new master to reconnect to the workers

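
The routing step can be sketched as a pure function that builds the channel name and the message envelope. This is a minimal sketch. It assumes `worker:assignments` has been loaded into a `Map` from worker URL to slave ID and that messages are JSON-encoded. `routeDetection` is a hypothetical name. The envelope fields follow the `detection` row of the channel specification.

```typescript
// Envelope published on worker:slave:{slaveId}; fields follow the channel specification.
interface RoutedDetection {
  type: 'detection';
  workerUrl: string;
  data: unknown; // the worker's ImageDetectionResponse, passed through unchanged
  timestamp: string;
}

// Hypothetical helper: pick the slave channel for a message from `workerUrl`.
// `assignments` mirrors the worker:assignments hash (workerUrl -> slaveId).
// Returns null when the worker has no assigned slave.
function routeDetection(
  assignments: Map<string, string>,
  workerUrl: string,
  data: unknown,
  now: Date = new Date(),
): { channel: string; payload: string } | null {
  const slaveId = assignments.get(workerUrl);
  if (slaveId === undefined) return null;
  const message: RoutedDetection = {
    type: 'detection',
    workerUrl,
    data,
    timestamp: now.toISOString(),
  };
  return { channel: `worker:slave:${slaveId}`, payload: JSON.stringify(message) };
}
```

The master would then publish `payload` on `channel` with its Redis client. This sketch does not decide how to handle a worker with no assigned slave: the message could be dropped, buffered, or used to trigger reassignment.
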
## Core Components

### DetectorCluster

`cms-backend/modules/camera/services/DetectorCluster.ts`

Primary interface for camera operations. It abstracts the underlying distributed architecture.

**Key Responsibilities:**

- Routes camera subscription requests through the cluster
- Manages detection callback registration and event emission
- Bridges CameraService with the underlying MasterSlaveWorkerCluster
- Provides a unified API regardless of master/slave status

### MasterSlaveWorkerCluster

`cms-backend/modules/camera/services/MasterSlaveWorkerCluster.ts`

Core distributed cluster implementation. It handles declarative state management and the reconciliation of worker assignments.

**Master Mode Responsibilities:**

- Maintains WebSocket connections to all Python workers
- Keeps desired and actual subscription state separate
- Implements the VMware DRS-like global rebalancing algorithm
- Runs automatic reconciliation every 30 seconds
- Responds to slave join/leave events from MasterElection
- Generates fresh pre-signed model URLs for worker assignments

**Slave Mode Responsibilities:**

- Notifies the master of database changes so it can regenerate the desired subscription state
- Processes detection results routed from the master
- Event-driven role transitions managed by MasterElection
- No direct worker management (delegated to the master)

### DetectorConnection

`cms-backend/modules/camera/services/DetectorConnection.ts`

Individual WebSocket connection handler for a Python worker.

**Key Features:**

- Connection lifecycle management (connect, disconnect, reconnect)
- Automatic reconnection, scheduled at 10-second intervals
- Subscription state management and restoration after reconnection
- Real-time heartbeat monitoring with a 10-second timeout
- Resource usage tracking (CPU, memory, GPU)

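
The following is a minimal sketch of the heartbeat timeout check. It assumes `lastHeartbeat` holds an ISO-8601 timestamp, as in `WorkerConnectionState`. The 10-second value comes from the feature list above; `isHeartbeatFresh` is a hypothetical helper name.

```typescript
// 10-second heartbeat timeout from the DetectorConnection feature list.
const HEARTBEAT_TIMEOUT_MS = 10_000;

// Hypothetical helper: true when the last heartbeat is within the timeout window.
// An unparseable timestamp is treated as stale so the worker is not kept online by mistake.
function isHeartbeatFresh(lastHeartbeat: string, now: number = Date.now()): boolean {
  const last = Date.parse(lastHeartbeat);
  if (Number.isNaN(last)) return false;
  return now - last <= HEARTBEAT_TIMEOUT_MS;
}
```

The connection state machine shows a 2-second heartbeat interval. With that interval, this window tolerates several consecutive missed heartbeats before a worker is treated as offline.
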
## Data Structures

### WorkerConnectionState

```typescript
interface WorkerConnectionState {
  url: string;                   // Worker WebSocket URL
  processId: string;             // Backend process managing this worker
  online: boolean;               // Connection status
  cpuUsage: number | null;       // Worker CPU utilization
  memoryUsage: number | null;    // Worker memory usage
  gpuUsage: number | null;       // Worker GPU utilization
  gpuMemoryUsage: number | null; // Worker GPU memory usage
  subscriptionCount: number;     // Number of active camera subscriptions
  subscriptions: string[];       // List of subscription identifiers
  lastHeartbeat: string;         // Last heartbeat timestamp
  connectedAt: string;           // Connection established timestamp
}
```

### DesiredCameraSubscription

```typescript
interface DesiredCameraSubscription {
  subscriptionIdentifier: string; // Format: ${displayId};${cameraId}
  rtspUrl: string;                // Camera RTSP stream URL
  modelId: number;                // AI model database ID
  modelName: string;              // AI model identifier
  createdAt: string;              // Subscription creation timestamp

  // Snapshot configuration
  snapshotUrl?: string;           // Optional snapshot endpoint URL
  snapshotInterval?: number;      // Snapshot interval in milliseconds

  // Image cropping parameters
  cropX1?: number;                // Crop region top-left X
  cropY1?: number;                // Crop region top-left Y
  cropX2?: number;                // Crop region bottom-right X
  cropY2?: number;                // Crop region bottom-right Y
}
```

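
Because `subscriptionIdentifier` packs two IDs into one string, helpers that build and split it keep the format in one place. The sketch below assumes neither ID may contain `;`. Both function names are hypothetical.

```typescript
// Hypothetical helper: build an identifier in the `${displayId};${cameraId}` format.
function toSubscriptionIdentifier(displayId: string, cameraId: string): string {
  if (displayId.includes(';') || cameraId.includes(';')) {
    throw new Error('displayId and cameraId must not contain ";"');
  }
  return `${displayId};${cameraId}`;
}

// Hypothetical helper: split an identifier back into its parts; null if malformed.
function parseSubscriptionIdentifier(
  id: string,
): { displayId: string; cameraId: string } | null {
  const parts = id.split(';');
  if (parts.length !== 2) return null;
  const [displayId = '', cameraId = ''] = parts;
  if (displayId === '' || cameraId === '') return null;
  return { displayId, cameraId };
}
```
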
### ActualCameraSubscription

```typescript
interface ActualCameraSubscription {
  subscriptionIdentifier: string; // Format: ${displayId};${cameraId}
  assignedWorkerUrl: string;      // Worker handling this subscription
  modelUrl: string;               // AI model pre-signed URL (1-hour TTL)
  status: 'active' | 'pending' | 'failed' | 'recovering';
  assignedAt: string;             // Worker assignment timestamp
  lastSeen: string;               // Last activity timestamp
}
```

### SlaveState

```typescript
interface SlaveState {
  slaveId: string;                    // Unique slave identifier (process ID)
  processId: string;                  // Backend process ID (same as slaveId)
  online: boolean;                    // Always true (maintained by MasterElection)
  workload: number;                   // Number of assigned workers (calculated)
  lastSeen: string;                   // Last heartbeat from MasterElection
  capabilities?: Record<string, any>; // Metadata from MasterElection
}
```

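
`workload` is marked as calculated rather than stored. The sketch below assumes it is derived by counting entries in the `worker:assignments` hash (worker URL → slave ID). `computeSlaveWorkloads` is a hypothetical name.

```typescript
// Hypothetical helper: count assigned workers per slave.
// `assignments` mirrors the worker:assignments hash (workerUrl -> slaveId).
function computeSlaveWorkloads(
  assignments: Record<string, string>,
  slaveIds: string[],
): Map<string, number> {
  // Start every known slave at zero so idle slaves show up with workload 0.
  const workloads = new Map<string, number>(slaveIds.map((id): [string, number] => [id, 0]));
  for (const slaveId of Object.values(assignments)) {
    // Assignments that point at a slave not in `slaveIds` are still counted.
    workloads.set(slaveId, (workloads.get(slaveId) ?? 0) + 1);
  }
  return workloads;
}
```
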
### DetectorWorkerCommand

```typescript
interface DetectorWorkerCommand {
  type: DetectorWorkerCommandType;
  payload?: {
    subscriptionIdentifier: string;
    rtspUrl: string;
    snapshotUrl?: string;
    snapshotInterval?: number;
    modelUrl: string;
    modelName: string;
    modelId: number;
    cropX1?: number;
    cropY1?: number;
    cropX2?: number;
    cropY2?: number;
  };
}

enum DetectorWorkerCommandType {
  SUBSCRIBE = "subscribe",
  UNSUBSCRIBE = "unsubscribe",
  REQUEST_STATE = "requestState",
  PATCH_SESSION_RESULT = "patchSessionResult",
  SET_SESSION_ID = "setSessionId"
}
```

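
To show how these structures relate, the master could build a `SUBSCRIBE` command from a `DesiredCameraSubscription` plus a freshly generated pre-signed model URL. This is a sketch. `buildSubscribeCommand` is hypothetical, and the types are trimmed copies of the definitions above so the block compiles on its own.

```typescript
// Trimmed copies of the types above so this block stands alone.
enum DetectorWorkerCommandType {
  SUBSCRIBE = 'subscribe',
  UNSUBSCRIBE = 'unsubscribe',
}

interface DesiredCameraSubscription {
  subscriptionIdentifier: string;
  rtspUrl: string;
  modelId: number;
  modelName: string;
  createdAt: string;
  snapshotUrl?: string;
  snapshotInterval?: number;
  cropX1?: number;
  cropY1?: number;
  cropX2?: number;
  cropY2?: number;
}

// Hypothetical helper: desired state + fresh pre-signed model URL -> SUBSCRIBE command.
// `createdAt` is not part of the command payload, so it is not copied.
// Optional fields that are undefined disappear when the command is JSON-encoded.
function buildSubscribeCommand(desired: DesiredCameraSubscription, modelUrl: string) {
  return {
    type: DetectorWorkerCommandType.SUBSCRIBE,
    payload: {
      subscriptionIdentifier: desired.subscriptionIdentifier,
      rtspUrl: desired.rtspUrl,
      snapshotUrl: desired.snapshotUrl,
      snapshotInterval: desired.snapshotInterval,
      modelUrl,
      modelName: desired.modelName,
      modelId: desired.modelId,
      cropX1: desired.cropX1,
      cropY1: desired.cropY1,
      cropX2: desired.cropX2,
      cropY2: desired.cropY2,
    },
  };
}
```
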
### ImageDetectionResponse

```typescript
interface ImageDetectionResponse {
  subscriptionIdentifier: string;
  timestamp: Date;
  data: {
    detection: {
      carModel?: string;
      carBrand?: string;
      carYear?: number;
      bodyType?: string;
      licensePlateText?: string;
      licensePlateType?: string;
    };
    modelId: number;
    modelName: string;
  };
}
```

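
JSON has no `Date` type, so `timestamp` presumably arrives over the WebSocket as a string and must be converted. A defensive parser might look like the following. It assumes an ISO-8601 timestamp. `parseImageDetection` is hypothetical and validates only the fields it relies on.

```typescript
interface ImageDetectionResponse {
  subscriptionIdentifier: string;
  timestamp: Date;
  data: {
    detection: {
      carModel?: string;
      carBrand?: string;
      carYear?: number;
      bodyType?: string;
      licensePlateText?: string;
      licensePlateType?: string;
    };
    modelId: number;
    modelName: string;
  };
}

// Hypothetical parser: returns null for anything that is not a well-formed detection.
// Individual detection fields are passed through without per-field validation.
function parseImageDetection(raw: string): ImageDetectionResponse | null {
  let msg: any;
  try {
    msg = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  const d = msg?.data;
  if (typeof msg?.subscriptionIdentifier !== 'string' || d === null || typeof d !== 'object') {
    return null;
  }
  if (typeof d.modelId !== 'number' || typeof d.modelName !== 'string') return null;
  const timestamp = new Date(msg.timestamp); // assumes an ISO-8601 string
  if (Number.isNaN(timestamp.getTime())) return null; // missing or invalid timestamp
  return {
    subscriptionIdentifier: msg.subscriptionIdentifier,
    timestamp,
    data: { detection: d.detection ?? {}, modelId: d.modelId, modelName: d.modelName },
  };
}
```
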
## Redis Data Architecture

### Persistent Storage Keys

- `worker:connections` - Worker connection states and health metrics
- `worker:assignments` - Worker-to-slave assignment mappings
- `worker:desired_subscriptions` - Desired camera subscription state (user intent)
- `worker:actual_subscriptions` - Actual worker subscription assignments (system state)
- `master-election:slaves` - Slave registration and heartbeats (managed by MasterElection)

### Communication Channels

- `worker:slave:{slaveId}` - Per-slave message routing channel
- `worker:messages:upstream` - Worker-to-master communication channel (currently unused)
- `worker:assignments:changed` - Assignment change broadcast notifications
- `worker:master:commands` - Database change notification channel (any process → master)

### Data Persistence Strategy

All persistent Redis keys use **manual cleanup only** (no TTL) to ensure:

- Reliable state recovery after process restarts
- Consistent subscription persistence across failovers
- Predictable cleanup during planned maintenance
- Debug visibility into system state history

## Pure Declarative Architecture (VMware DRS-like)

### Concept Overview

The system implements a purely declarative approach similar to VMware Distributed Resource Scheduler (DRS), in which:

- **Database**: Single source of truth for desired state (Display+Camera+Playlist combinations)
- **Actual State**: The subscriptions currently running on workers (stored in `worker:actual_subscriptions`)
- **Regeneration**: The master regenerates the complete desired state from the database on every change notification
- **Reconciliation**: The master continuously reconciles desired and actual state via global rebalancing

### Pure Declarative Benefits

- **Database as Truth**: Desired state is always derived fresh from the database, eliminating state synchronization issues
- **Zero Incremental Updates**: No complex state management; the rule is simply "regenerate everything on change"
- **Automatic Recovery**: The system heals itself by comparing database state against actual worker state
- **Load Balancing**: Global optimization across all workers and subscriptions
- **Fault Tolerance**: Desired state survives backend process failures and loss of Redis data, because it can always be re-derived from the database
- **Simplicity**: Database mutations only trigger regeneration; no complex command protocols are needed

### Pure Declarative Flow

```typescript
// `db`, `triggerSubscriptionUpdate`, `storeDesiredSubscriptions`, `generateFromDisplays`,
// `getDesiredSubscriptions`, `getActualSubscriptions` and `reconcileDifferences`
// are provided by the backend; only the control flow is shown here.

// 1. Any process (master or slave) that changes the database notifies the master
async function handleDatabaseChange(changeType: string, entityId: string): Promise<void> {
  await triggerSubscriptionUpdate(changeType, entityId);
}

// 2. The master receives the regeneration request on `worker:master:commands`
async function handleMasterCommand(message: string): Promise<void> {
  const data = JSON.parse(message);
  if (data.type === 'regenerate_subscriptions') {
    await regenerateDesiredStateFromDatabase();
  }
}

// 3. The master regenerates the complete desired state from the database
async function regenerateDesiredStateFromDatabase(): Promise<void> {
  const activeDisplays = await db.display.findMany({
    where: {
      AND: [
        { cameraIdentifier: { not: null } },
        { playlistId: { not: null } },
      ],
    },
    include: { camera: true, playlist: { include: { model: true } } },
  });

  // Replace the stored desired subscriptions with a freshly generated set
  await storeDesiredSubscriptions(generateFromDisplays(activeDisplays));

  // Trigger reconciliation
  await rebalanceCameraSubscriptions();
}

// 4. Reconciliation (VMware DRS-like)
async function rebalanceCameraSubscriptions(): Promise<void> {
  const desired = await getDesiredSubscriptions(); // regenerated from the database
  const actual = await getActualSubscriptions();   // current worker state

  // Find and fix differences using load balancing
  await reconcileDifferences(desired, actual);
}

// VMware DRS-like worker selection: the lowest weighted score wins.
// Null metrics and workers without a recorded load count as 0.
function findBestWorkerVMwareDRS(
  workers: WorkerConnectionState[],
  currentLoads: Map<string, number>,
): WorkerConnectionState | undefined {
  if (workers.length === 0) return undefined; // nothing to choose from
  return workers
    .map(worker => ({
      worker,
      score:
        (currentLoads.get(worker.url) ?? 0) * 0.4 + // 40% load balance
        (worker.cpuUsage ?? 0) * 0.35 +             // 35% CPU usage
        (worker.memoryUsage ?? 0) * 0.25,           // 25% memory usage
    }))
    .sort((a, b) => a.score - b.score)[0].worker;  // lower score = better
}
```

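
`findBestWorkerVMwareDRS` scores workers for a single placement. The sketch below shows the "global" part of rebalancing. It places a batch of subscriptions one at a time and bumps the chosen worker's load before the next placement, so work spreads across workers. The `WorkerMetrics` shape and the `assignAll` and `score` names are illustrative assumptions. Only the 40/35/25 weights come from the code above.

```typescript
interface WorkerMetrics {
  url: string;
  online: boolean;
  cpuUsage: number | null;
  memoryUsage: number | null;
}

// Same 40/35/25 weighting as findBestWorkerVMwareDRS; lower is better.
function score(worker: WorkerMetrics, load: number): number {
  return load * 0.4 + (worker.cpuUsage ?? 0) * 0.35 + (worker.memoryUsage ?? 0) * 0.25;
}

// Greedy global placement: each subscription goes to the best-scoring online worker,
// and that worker's load is incremented before the next subscription is placed.
// Returns subscriptionIdentifier -> workerUrl; nothing is placed if no worker is online.
function assignAll(
  subscriptionIds: string[],
  workers: WorkerMetrics[],
  currentLoads: Map<string, number>,
): Map<string, string> {
  const loads = new Map<string, number>(); // copy so the caller's map is not mutated
  currentLoads.forEach((load, url) => loads.set(url, load));
  const online = workers.filter(w => w.online);
  const result = new Map<string, string>();
  if (online.length === 0) return result;

  const loadOf = (w: WorkerMetrics): number => loads.get(w.url) ?? 0;
  for (const id of subscriptionIds) {
    // reduce keeps the earlier worker on ties, so equal workers fill in list order
    const best = online.reduce((a, b) => (score(b, loadOf(b)) < score(a, loadOf(a)) ? b : a));
    result.set(id, best.url);
    loads.set(best.url, loadOf(best) + 1);
  }
  return result;
}
```

The score adds a subscription count to utilization figures. The weights therefore only balance sensibly if the inputs are on comparable scales. This document does not specify how the implementation normalizes them.
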
### Simplified Reconciliation Flow

1. **Database Change**: Any process modifies the database (Display, Camera, Playlist, Model)
2. **Trigger Notification**: The process sends `regenerate_subscriptions` to `worker:master:commands`
3. **Complete Regeneration**: The master queries the database for all active Display+Camera+Playlist combinations
4. **Desired State Creation**: The master generates fresh desired subscriptions from the query results
5. **Diff Analysis**: The master compares the fresh desired state against the current actual state on workers
6. **Global Reconciliation**: The master applies the VMware DRS-like algorithm to reconcile the differences
7. **Worker Commands**: The master sends subscribe/unsubscribe commands to workers
8. **State Update**: The master updates the actual subscription state in Redis

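
Step 5 (diff analysis) can be sketched as a pure function over the desired and actual subscription hashes. The sketch assumes both are loaded into maps keyed by `subscriptionIdentifier` and that the master knows which worker URLs are online. `diffSubscriptions` and its result shape are hypothetical. It does not detect changed settings for an existing identifier, such as a new RTSP URL.

```typescript
interface DesiredSub { subscriptionIdentifier: string; rtspUrl: string; modelId: number }
interface ActualSub {
  subscriptionIdentifier: string;
  assignedWorkerUrl: string;
  status: 'active' | 'pending' | 'failed' | 'recovering';
}

interface SubscriptionDiff {
  toSubscribe: string[];   // desired but not running anywhere
  toUnsubscribe: string[]; // running but no longer desired
  toReassign: string[];    // desired, but on an offline worker or marked failed
}

// Hypothetical diff: compares identifiers only, not subscription settings.
function diffSubscriptions(
  desired: Map<string, DesiredSub>,
  actual: Map<string, ActualSub>,
  onlineWorkers: Set<string>,
): SubscriptionDiff {
  const diff: SubscriptionDiff = { toSubscribe: [], toUnsubscribe: [], toReassign: [] };
  desired.forEach((_sub, id) => {
    const running = actual.get(id);
    if (running === undefined) {
      diff.toSubscribe.push(id);
    } else if (!onlineWorkers.has(running.assignedWorkerUrl) || running.status === 'failed') {
      diff.toReassign.push(id);
    }
  });
  actual.forEach((_sub, id) => {
    if (!desired.has(id)) diff.toUnsubscribe.push(id);
  });
  return diff;
}
```
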
### Key Simplifications vs Previous Architecture

- **No Incremental State Management**: No complex tracking of individual subscription changes
- **No State Synchronization Issues**: Desired state is always freshly derived from the database
- **No Complex Command Protocols**: Only one command type: `regenerate_subscriptions`
- **No Partial Update Bugs**: Complete regeneration eliminates the edge cases and race conditions of partial updates
- **Zero Database-Redis Divergence**: The database is always the authoritative source
- **Simpler Service Layer**: Services just update the database and trigger regeneration; they contain no subscription logic

## Class Responsibilities Overview

### Core Class Functions

| Class | Primary Responsibility | Key Functions | Process Type |
|-------|----------------------|---------------|--------------|
| **DetectorCluster** | Public API facade and event management | • `subscribeToCamera()` - Legacy interface (triggers regeneration)<br/>• `addDetectionListener()` - Callback registration<br/>• `getState()` - Cluster monitoring<br/>• Event emission to external services | Both Master & Slave |
| **MasterSlaveWorkerCluster** | Pure declarative cluster coordination | **Master**: `regenerateDesiredStateFromDatabase()`, `rebalanceCameraSubscriptions()`, `connectToAllWorkers()`<br/>**Slave**: Minimal role - just routes detection messages<br/>**Both**: `handleDetectionMessage()` for callbacks | Both (different roles) |
| **DetectorConnection** | Individual worker WebSocket management | • `initialize()` - WebSocket connection setup<br/>• `subscribeToCamera()` - Send subscription to worker<br/>• `handleImageDetectionResponse()` - Process AI results<br/>• `resubscribeAll()` - Restore subscriptions after reconnect | Master Only |
| **CameraService** | Database operations + trigger notifications | • `addCamera()` - Database create + trigger regeneration<br/>• `updateCamera()` - Database update + trigger regeneration<br/>• `removeCamera()` - Database delete + trigger regeneration | Both Master & Slave |
| **DisplayService** | Database operations + trigger notifications | • `registerDisplay()` - Database create + trigger regeneration<br/>• `updateDisplay()` - Database update + trigger regeneration<br/>• `deleteDisplay()` - Database delete + trigger regeneration | Both Master & Slave |
| **SubscriptionTrigger** | Simple notification system | • `triggerSubscriptionUpdate()` - Send regeneration request to master | Both Master & Slave |

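
The service-layer pattern in the table (write to the database, then notify the master) can be sketched as follows. The message fields match the `regenerate_subscriptions` row of the channel specification. The `Publisher` interface stands in for the backend's Redis client, and the database write is abstracted as a callback. The extra parameters and the `reason` format are assumptions made so the block is self-contained.

```typescript
// Minimal stand-in for a Redis client's publish method (an assumption, not a specific library API).
interface Publisher {
  publish(channel: string, message: string): Promise<unknown>;
}

const MASTER_COMMANDS_CHANNEL = 'worker:master:commands';

// Hypothetical signature: the documented helper takes (changeType, entityId); the client
// and process ID are passed explicitly here so the sketch is self-contained.
async function triggerSubscriptionUpdate(
  redis: Publisher,
  processId: string,
  changeType: string,
  entityId: string,
): Promise<void> {
  const message = {
    type: 'regenerate_subscriptions',
    reason: `${changeType} ${entityId}`, // assumed format for the free-text reason
    triggeredBy: processId,
    timestamp: new Date().toISOString(),
  };
  await redis.publish(MASTER_COMMANDS_CHANNEL, JSON.stringify(message));
}

// Service pattern: write to the database first, then notify. No subscription logic here.
async function addCamera(
  createInDb: () => Promise<{ id: string }>, // e.g. wraps db.cameraEntity.create(...)
  redis: Publisher,
  processId: string,
): Promise<{ id: string }> {
  const camera = await createInDb();
  await triggerSubscriptionUpdate(redis, processId, 'camera.created', camera.id);
  return camera;
}
```

The service does not care whether it runs on the master or a slave. In both cases the master alone reacts to the notification.
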
## Object Relationship Diagrams

### Core Class Structure and Methods

```mermaid
classDiagram
    class CameraService {
        +addCamera(identifier, rtspUrl)
        +removeCamera(identifier)
        +resubscribeCamera(identifier)
        +getCameras()
        +updateCamera(...)
        -processDetection(data)
    }

    class DetectorCluster {
        +initialize()
        +subscribeToCamera(...)
        +unsubscribeFromCamera(subscriptionId)
        +unsubscribeFromAllWithCameraID(cameraId)
        +getState()
        +addDetectionListener(subscriptionId, callback)
        +addGlobalDetectionListener(callback)
        -handleWorkerDetection(data)
    }

    class MasterSlaveWorkerCluster {
        +initialize()
        +subscribeToCamera(...)
        +storeCameraSubscription(subscription)
        +getClusterState()
        +shutdown()
        -connectToAllWorkers() [MASTER]
        -rebalanceCameraSubscriptions() [MASTER]
        -triggerRebalancing() [MASTER]
        -becomeMaster()
        -becomeSlave()
        -setupMasterElectionListeners()
    }

    class DetectorConnection {
        +initialize()
        +subscribeToCamera(...)
        +unsubscribeFromCamera(subscriptionId)
        +getCameraImage(cameraId)
        +setSessionId(displayId, sessionId)
        +getState()
        -connect()
        -resubscribeAll()
        -handleImageDetectionResponse(data)
        -scheduleReconnect()
    }

    CameraService --> DetectorCluster : "subscribeToCamera()\ngetState()"
    DetectorCluster --> MasterSlaveWorkerCluster : "initialize()\nstoreCameraSubscription()"
    MasterSlaveWorkerCluster --> DetectorConnection : "[MASTER] creates connections"
```

### Direct Function Call Relationships

```mermaid
graph TD
    API[API Routes] --> CS[CameraService]
    CS --> |subscribeToCamera<br/>getState<br/>unsubscribeFromAllWithCameraID| DC[DetectorCluster]
    DC --> |initialize<br/>storeCameraSubscription<br/>getClusterState<br/>subscribeToCamera| MSC[MasterSlaveWorkerCluster]

    subgraph "Master Process Only"
        MSC --> |connectToAllWorkers<br/>creates connections| CONN[DetectorConnection]
        CONN --> |WebSocket calls| PW[Python ML Worker]
    end

    ME[MasterElection] --> |getIsMaster<br/>getNodeId<br/>getSlaves| MSC
    WL[WorkerLogger] --> |attachToDetectorCluster| DC

    classDef masterOnly fill:#ffcccc
    classDef external fill:#ffffcc

    class CONN masterOnly
    class PW external
    class API external
```

### Event-Driven Communication

```mermaid
graph LR
    subgraph "Internal Events"
        MSC[MasterSlaveWorkerCluster] -.-> |emit detection| DC[DetectorCluster]
        MSC -.-> |emit worker:online<br/>emit worker:offline| DC
        DC -.-> |emit worker:detection_result<br/>emit worker:online<br/>emit worker:offline| CS[CameraService]
        DC -.-> |emit events| WL[WorkerLogger]
        ME[MasterElection] -.-> |master-acquired<br/>master-lost<br/>slave-registered<br/>slave-removed| MSC
    end

    subgraph "Callback System"
        CS -.-> |callback registration| DC
        DC -.-> |detection callbacks| CS
    end

    subgraph "WebSocket Events (Master Only)"
        CONN[DetectorConnection] -.-> |handleWorkerMessage<br/>handleWorkerOnline<br/>handleWorkerOffline| MSC
        PW[Python ML Worker] -.-> |IMAGE_DETECTION<br/>STATE_REPORT| CONN
    end

    classDef events fill:#e6f3ff
    classDef callbacks fill:#fff2e6
    classDef websocket fill:#ffe6e6

    class MSC,DC,CS,WL events
    class CONN,PW websocket
```

### Redis Communication Patterns

```mermaid
graph TB
    subgraph "Master Process"
        M[Master MasterSlaveWorkerCluster]
    end

    subgraph "Slave Processes"
        S1[Slave Process 1]
        S2[Slave Process 2]
    end

    subgraph "Redis Channels"
        SC1[worker:slave:slave1]
        SC2[worker:slave:slave2]
        MC[worker:master:commands]
        AC[worker:assignments:changed]
    end

    subgraph "Redis Storage"
        WC[worker:connections]
        WA[worker:assignments]
        WS[worker:slaves]
        CS[worker:camera_subscriptions]
    end

    M --> |publish detection routing| SC1
    M --> |publish detection routing| SC2
    M --> |publish assignments| AC
    M --> |hSet/hGet state| WC
    M --> |hSet/hGet assignments| WA
    M --> |hSet/hGet subscriptions| CS

    S1 --> |publish commands| MC
    S2 --> |publish commands| MC
    S1 --> |hSet registration| WS
    S2 --> |hSet registration| WS

    SC1 --> |subscribe| S1
    SC2 --> |subscribe| S2
    MC --> |subscribe| M
    AC --> |subscribe all| S1
    AC --> |subscribe all| S2
```

## Method Call Flow Analysis

### Camera Subscription Flow (External Request → Worker)

```mermaid
sequenceDiagram
    participant API as API Routes
    participant CS as CameraService
    participant DB as Database
    participant ST as SubscriptionTrigger
    participant R as Redis
    participant MSC as MasterSlaveCluster
    participant CONN as DetectorConnection
    participant W as Python Worker

    Note over API,W: Pure Declarative Flow
    API->>+CS: POST /api/camera
    CS->>+DB: db.cameraEntity.create({...})
    DB-->>-CS: Camera created
    CS->>ST: triggerSubscriptionUpdate('camera.created', id)
    ST->>R: publish(worker:master:commands, {type: 'regenerate_subscriptions', ...})

    Note over R,MSC: Only Master Processes Commands
    R->>MSC: Master receives regeneration request
    MSC->>MSC: regenerateDesiredStateFromDatabase()
    MSC->>+DB: Query all Display+Camera+Playlist combinations
    DB-->>-MSC: Active display configurations
    MSC->>MSC: Generate fresh desired subscriptions
    MSC->>R: Store desired state in Redis
    MSC->>MSC: rebalanceCameraSubscriptions()
    MSC->>MSC: findBestWorkerForSubscription()
    MSC->>CONN: subscribeToCamera(subscriptionId, rtspUrl, ...)
    CONN->>+W: WebSocket: {type: "subscribe", payload: {...}}
    W-->>-CONN: WebSocket: {type: "stateReport", ...}
    CONN->>MSC: handleWorkerOnline(workerUrl)
    MSC->>R: Update actual subscription state

    Note over W,CS: Detection Processing
    W->>CONN: Detection results
    CONN->>MSC: Route to assigned slave
    MSC->>CS: Detection callback
    CS-->>-API: Camera added successfully
```

### Detection Processing Flow (Worker → External Callback)

```mermaid
sequenceDiagram
    participant W as Python Worker
    participant CONN as DetectorConnection
    participant MSC as MasterSlaveCluster
    participant R as Redis
    participant DC as DetectorCluster
    participant CS as CameraService

    Note over W,CS: AI Detection Result Processing
    W->>CONN: WebSocket: {type: "imageDetection", subscriptionIdentifier, data}
    CONN->>MSC: handleWorkerMessage(ImageDetectionResponse)

    Note over MSC: Master finds assigned slave
    MSC->>MSC: findWorkerForSubscription(subscriptionId)
    MSC->>R: hGet(worker:assignments, workerUrl)
    MSC->>R: publish(worker:slave:{slaveId}, {type: 'detection', ...})

    Note over R: Redis routes to assigned slave
    R-->>MSC: Slave receives detection message
    MSC->>MSC: handleDetectionMessage(message)
    MSC->>DC: emit('detection', detectionData)

    Note over DC: Process detection and trigger callbacks
    DC->>DC: handleWorkerDetection(data)
    DC->>DC: detectionListeners.get(subscriptionId).forEach(callback)
    DC->>CS: callback(detectionData)
    DC->>DC: emit('worker:detection_result', {url, cameraId, detections})

    Note over CS: External service processes detection
    CS->>CS: processDetection(data)
    CS-->>CS: updateAnalytics(), triggerDecisionTrees()
```

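
On the receiving slave, `DetectorCluster` fans each detection out to per-subscription and global callbacks. Below is a minimal sketch of such a registry, loosely modeled on `addDetectionListener()` and `addGlobalDetectionListener()`. The class name, the returned unsubscribe function, and the choice to isolate a throwing callback from the others are assumptions.

```typescript
interface Detection {
  subscriptionIdentifier: string;
}

type DetectionCallback = (detection: Detection) => void;

// Hypothetical registry: per-subscription listeners first, then global listeners.
class DetectionListenerRegistry {
  private readonly bySubscription = new Map<string, Set<DetectionCallback>>();
  private readonly global = new Set<DetectionCallback>();

  // Register a callback for one subscription; returns a function that removes it.
  addDetectionListener(subscriptionId: string, cb: DetectionCallback): () => void {
    const listeners = this.bySubscription.get(subscriptionId) ?? new Set<DetectionCallback>();
    listeners.add(cb);
    this.bySubscription.set(subscriptionId, listeners);
    return () => {
      listeners.delete(cb);
    };
  }

  addGlobalDetectionListener(cb: DetectionCallback): void {
    this.global.add(cb);
  }

  // Deliver one detection; a throwing callback is logged and does not stop the others.
  // Returns the number of callbacks that completed without throwing.
  dispatch(detection: Detection): number {
    let delivered = 0;
    const deliver = (cb: DetectionCallback): void => {
      try {
        cb(detection);
        delivered++;
      } catch (err) {
        console.error('detection listener failed', err);
      }
    };
    this.bySubscription.get(detection.subscriptionIdentifier)?.forEach(deliver);
    this.global.forEach(deliver);
    return delivered;
  }
}
```
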
### Master Election and Failover Flow

```mermaid
sequenceDiagram
    participant ME as MasterElection
    participant MSC1 as MasterSlaveCluster (Process 1)
    participant MSC2 as MasterSlaveCluster (Process 2)
    participant R as Redis
    participant W1 as Python Worker 1
    participant W2 as Python Worker 2

    Note over ME,W2: Master Failover Scenario

    %% Initial state
    ME->>MSC1: emit('master-acquired')
    MSC1->>MSC1: becomeMaster()
    ME->>MSC2: emit('master-lost')
    MSC2->>MSC2: becomeSlave()

    ME->>R: Automatic slave registration
    MSC1->>W1: WebSocket connection (Master)
    MSC1->>W2: WebSocket connection (Master)

    Note over MSC1: Original master fails
    MSC1--xMSC1: Process crash/network failure

    %% MasterElection detects failure and triggers new election
    ME->>ME: Detect failed master, trigger election
    ME->>MSC2: emit('master-acquired')
    MSC2->>MSC2: becomeMaster()

    %% Master recovery process
    MSC2->>MSC2: connectToAllWorkers()
    MSC2->>W1: WebSocket reconnection
    MSC2->>W2: WebSocket reconnection

    MSC2->>MSC2: healClusterAssignments()
    MSC2->>R: hGetAll(worker:camera_subscriptions)
    MSC2->>MSC2: rebalanceCameraSubscriptions()

    %% Restore subscriptions
    MSC2->>W1: WebSocket: SUBSCRIBE commands
    MSC2->>W2: WebSocket: SUBSCRIBE commands

    Note over MSC2,W2: New master operational - slave registration handled by MasterElection
```

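
The role transitions above can be wired to MasterElection events roughly as follows. This sketch uses Node's `EventEmitter` as a stand-in for MasterElection and handles only the `master-acquired` and `master-lost` events from the diagrams. The `RoleHooks` interface (including `disconnectAllWorkers`) and the serialization of transitions are assumptions. This is not the actual `setupMasterElectionListeners()` code.

```typescript
import { EventEmitter } from 'events';

// Hypothetical hooks standing in for MasterSlaveWorkerCluster internals.
interface RoleHooks {
  connectToAllWorkers(): Promise<void>;
  rebalanceCameraSubscriptions(): Promise<void>;
  disconnectAllWorkers(): Promise<void>;
}

type Role = 'master' | 'slave';

// Returns a getter for the current role.
function setupRoleTransitions(election: EventEmitter, hooks: RoleHooks): () => Role {
  let role: Role = 'slave';
  // Run transitions one after another so a fast acquire/lose sequence cannot interleave.
  let chain: Promise<void> = Promise.resolve();
  const enqueue = (step: () => Promise<void>): void => {
    chain = chain.then(step).catch(err => console.error('role transition failed', err));
  };

  election.on('master-acquired', () => enqueue(async () => {
    if (role === 'master') return; // ignore duplicate events
    role = 'master';
    await hooks.connectToAllWorkers();
    await hooks.rebalanceCameraSubscriptions();
  }));

  election.on('master-lost', () => enqueue(async () => {
    if (role === 'slave') return;
    role = 'slave';
    await hooks.disconnectAllWorkers(); // only the master may hold worker connections
  }));

  return () => role;
}
```
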
## System Architecture Diagrams

### Master-Slave Cluster Architecture

```mermaid
graph TB
    subgraph "Backend Process Cluster"
        M[Master Process<br/>NodeJS Backend]
        S1[Slave Process 1<br/>NodeJS Backend]
        S2[Slave Process 2<br/>NodeJS Backend]
        S3[Slave Process N<br/>NodeJS Backend]
    end

    subgraph "Python Worker Cluster"
        W1[Python ML Worker 1<br/>WebSocket Server]
        W2[Python ML Worker 2<br/>WebSocket Server]
        W3[Python ML Worker N<br/>WebSocket Server]
    end

    subgraph "Redis Coordination Layer"
        R[(Redis)]
        R --- C1[worker:slave:* channels]
        R --- C2[worker:connections state]
        R --- C3[worker:assignments mapping]
        R --- C4[worker:camera_subscriptions]
    end

    M ===|WebSocket Connections<br/>Only Master| W1
    M ===|WebSocket Connections<br/>Only Master| W2
    M ===|WebSocket Connections<br/>Only Master| W3

    M <-->|Pub/Sub Messages| R
    S1 <-->|Pub/Sub Messages| R
    S2 <-->|Pub/Sub Messages| R
    S3 <-->|Pub/Sub Messages| R

    M -.->|Route Messages| S1
    M -.->|Route Messages| S2
    M -.->|Route Messages| S3
```

### Data Flow Architecture

```mermaid
sequenceDiagram
    participant CS as CameraService
    participant DC as DetectorCluster
    participant MS as MasterSlaveCluster
    participant R as Redis
    participant W as Python Worker
    participant S as Slave Process

    Note over CS,S: Camera Subscription Flow

    CS->>DC: subscribeToCamera(cameraId, rtspUrl, modelUrl, ...)
    DC->>MS: storeCameraSubscription({...})

    alt Master Process
        MS->>MS: findBestWorkerForSubscription()
        MS->>R: hSet(camera_subscriptions, subscriptionId, {...})
        MS->>W: WebSocket: SUBSCRIBE command
        W->>MS: STATE_REPORT (subscription confirmed)
        MS->>R: publish(worker:slave:{slaveId}, detection_message)
    else Slave Process
        MS->>R: publish(worker:master:commands, subscribe_command)
        Note over MS: Routes to master for execution
    end

    Note over CS,S: Detection Processing Flow

    W->>MS: WebSocket: IMAGE_DETECTION response
    MS->>MS: findSlaveForWorker(workerUrl)
    MS->>R: publish(worker:slave:{slaveId}, detection_data)
    R->>S: Redis pub/sub delivery
    S->>DC: emit('detection', detectionData)
    DC->>CS: callback(detectionData)
```

### Subscription Lifecycle Management

```mermaid
stateDiagram-v2
    [*] --> Pending: Camera Subscription Request

    Pending --> Active: Worker accepts subscription
    Pending --> Failed: Worker rejects/unavailable
    Pending --> Recovering: Assignment change needed

    Active --> Recovering: Worker goes offline
    Active --> [*]: Unsubscribe request

    Recovering --> Active: Reassigned to online worker
    Recovering --> Failed: No workers available
    Recovering --> [*]: Subscription expired

    Failed --> Recovering: Worker becomes available
    Failed --> [*]: Max retries exceeded

    note right of Recovering
        Automatic rebalancing every 30s
        Master detects offline workers
        Reassigns to healthy workers
    end note
```

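
The lifecycle can also be expressed as an explicit transition table, which makes invalid status changes easy to reject. This sketch copies the edges from the diagram. Modeling the `[*]` end state as a `'removed'` pseudo-state is an assumption for illustration.

```typescript
type SubscriptionStatus = 'active' | 'pending' | 'failed' | 'recovering';
type LifecycleState = SubscriptionStatus | 'removed'; // 'removed' models the [*] end state

// Allowed transitions, copied from the state diagram.
const TRANSITIONS: Record<SubscriptionStatus, readonly LifecycleState[]> = {
  pending: ['active', 'failed', 'recovering'],
  active: ['recovering', 'removed'],
  recovering: ['active', 'failed', 'removed'],
  failed: ['recovering', 'removed'],
};

// True when the diagram permits moving from `from` to `to`.
function canTransition(from: SubscriptionStatus, to: LifecycleState): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1;
}
```

For example, the diagram has no direct `Active --> Failed` edge, so `canTransition('active', 'failed')` returns `false`. An active subscription whose worker disappears goes through `recovering` first.
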
### Worker Connection State Machine

```mermaid
stateDiagram-v2
    [*] --> Connecting: initialize()

    Connecting --> Online: WebSocket connected + STATE_REPORT received
    Connecting --> Reconnecting: Connection failed

    Online --> Offline: Heartbeat timeout (10s)
    Online --> Reconnecting: WebSocket error/close
    Online --> [*]: close() called

    Offline --> Reconnecting: Scheduled reconnect (10s)
    Offline --> [*]: close() called

    Reconnecting --> Online: Reconnection successful
    Reconnecting --> Reconnecting: Reconnection failed (retry)
    Reconnecting --> [*]: close() called

    note right of Online
        - Sends heartbeat every 2s
        - Processes subscriptions
        - Reports resource usage
        - Handles detection results
    end note
```

### Redis Channel Communication Flow

```mermaid
graph LR
    subgraph "Master Process"
        M[Master]
        WS1[WebSocket to Worker 1]
        WS2[WebSocket to Worker 2]
    end

    subgraph "Slave Processes"
        S1[Slave 1]
        S2[Slave 2]
    end

    subgraph "Redis Channels"
        CH1[worker:slave:slave1]
        CH2[worker:slave:slave2]
        CH3[worker:messages:upstream]
        CH4[worker:assignments:changed]
    end

    WS1 -->|Detection Data| M
    WS2 -->|Detection Data| M

    M -->|Route by Assignment| CH1
    M -->|Route by Assignment| CH2

    CH1 -->|Subscribed| S1
    CH2 -->|Subscribed| S2

    S1 -->|Commands/Responses| CH3
    S2 -->|Commands/Responses| CH3
    CH3 -->|Subscribed| M

    M -->|Assignment Updates| CH4
    CH4 -->|Subscribed| S1
    CH4 -->|Subscribed| S2
```

### Detailed Message Flow by Channel
|
|
|
|
```mermaid
|
|
graph TB
|
|
subgraph "Python ML Workers"
|
|
W1[Worker 1<br/>ws://worker1:8000]
|
|
W2[Worker 2<br/>ws://worker2:8000]
|
|
W3[Worker N<br/>ws://workerN:8000]
|
|
end
|
|
|
|
subgraph "Master Process (Only One)"
|
|
M[Master Backend Process]
|
|
subgraph "Master Managed Data"
|
|
WC1[WebSocket Connection Pool]
|
|
AS[Assignment State]
|
|
SUB[Subscription Manager]
|
|
end
|
|
end
|
|
|
|
subgraph "Redis Channels & Storage"
|
|
subgraph "Individual Slave Channels"
|
|
SC1["worker:slave:slave-uuid-1"]
|
|
SC2["worker:slave:slave-uuid-2"]
|
|
SC3["worker:slave:slave-uuid-N"]
|
|
end
|
|
|
|
subgraph "Master Coordination Channels"
|
|
MC["worker:master:commands"]
|
|
ACH["worker:assignments:changed"]
|
|
UPC["worker:messages:upstream"]
|
|
SEC["worker:subscription:events"]
|
|
end
|
|
|
|
subgraph "Persistent Storage"
|
|
WCS["worker:connections<br/>(Worker Health States)"]
|
|
WAS["worker:assignments<br/>(Worker→Slave Mapping)"]
|
|
WSS["worker:slaves<br/>(Slave Registration)"]
|
|
CSS["worker:camera_subscriptions<br/>(Subscription Persistence)"]
|
|
end
|
|
end
|
|
|
|
subgraph "Slave Processes"
|
|
S1[Slave Process 1<br/>slave-uuid-1]
|
|
S2[Slave Process 2<br/>slave-uuid-2]
|
|
S3[Slave Process N<br/>slave-uuid-N]
|
|
end
|
|
|
|
%% WebSocket Communications (Master Only)
|
|
W1 -.->|"WebSocket Messages:<br/>• IMAGE_DETECTION<br/>• STATE_REPORT<br/>• PATCH_SESSION"| WC1
|
|
W2 -.->|"WebSocket Messages:<br/>• IMAGE_DETECTION<br/>• STATE_REPORT<br/>• PATCH_SESSION"| WC1
|
|
W3 -.->|"WebSocket Messages:<br/>• IMAGE_DETECTION<br/>• STATE_REPORT<br/>• PATCH_SESSION"| WC1
|
|
|
|
WC1 -.->|"WebSocket Commands:<br/>• SUBSCRIBE<br/>• UNSUBSCRIBE<br/>• REQUEST_STATE<br/>• SET_SESSION_ID"| W1
|
|
WC1 -.->|"WebSocket Commands:<br/>• SUBSCRIBE<br/>• UNSUBSCRIBE<br/>• REQUEST_STATE<br/>• SET_SESSION_ID"| W2
|
|
WC1 -.->|"WebSocket Commands:<br/>• SUBSCRIBE<br/>• UNSUBSCRIBE<br/>• REQUEST_STATE<br/>• SET_SESSION_ID"| W3
|
|
|
|
%% Master Redis Operations
|
|
M -->|"hSet() operations:<br/>• Worker states<br/>• Assignments<br/>• Subscriptions"| WCS
|
|
M -->|"hSet() operations:<br/>• Worker→Slave mapping<br/>• Load balancing data"| WAS
|
|
M -->|"hSet() operations:<br/>• Subscription details<br/>• Assignment tracking"| CSS
|
|
|
|
%% Master to Slave Routing
|
|
M -->|"Detection Routing:<br/>{type: 'detection',<br/>workerUrl: string,<br/>data: ImageDetectionResponse,<br/>timestamp: string}"| SC1
|
|
M -->|"Detection Routing:<br/>{type: 'detection',<br/>workerUrl: string,<br/>data: ImageDetectionResponse,<br/>timestamp: string}"| SC2
|
|
M -->|"Detection Routing:<br/>{type: 'detection',<br/>workerUrl: string,<br/>data: ImageDetectionResponse,<br/>timestamp: string}"| SC3
|
|
|
|
M -->|"Assignment Updates:<br/>{type: 'assignments_updated',<br/>assignments: Record<string,string>,<br/>timestamp: string}"| ACH
|
|
|
|

    %% Slave to Master Communication
    S1 -->|"Regeneration Trigger:<br/>{type: 'regenerate_subscriptions',<br/>reason: string,<br/>triggeredBy: string,<br/>timestamp: string}"| MC
    S2 -->|"Regeneration Trigger:<br/>{type: 'regenerate_subscriptions',<br/>reason: string,<br/>triggeredBy: string,<br/>timestamp: string}"| MC
    S3 -->|"Regeneration Trigger:<br/>{type: 'regenerate_subscriptions',<br/>reason: string,<br/>triggeredBy: string,<br/>timestamp: string}"| MC

    %% Slave Registration and Heartbeats
    S1 -->|"hSet() Slave Registration:<br/>{slaveId: string,<br/>processId: string,<br/>online: boolean,<br/>workload: number,<br/>lastSeen: string,<br/>capabilities: {...}}"| WSS
    S2 -->|"hSet() Slave Registration:<br/>{slaveId: string,<br/>processId: string,<br/>online: boolean,<br/>workload: number,<br/>lastSeen: string,<br/>capabilities: {...}}"| WSS
    S3 -->|"hSet() Slave Registration:<br/>{slaveId: string,<br/>processId: string,<br/>online: boolean,<br/>workload: number,<br/>lastSeen: string,<br/>capabilities: {...}}"| WSS

    %% Channel Subscriptions
    SC1 -->|"Subscribed"| S1
    SC2 -->|"Subscribed"| S2
    SC3 -->|"Subscribed"| S3

    MC -->|"Subscribed"| M
    ACH -->|"Subscribed (All Slaves)"| S1
    ACH -->|"Subscribed (All Slaves)"| S2
    ACH -->|"Subscribed (All Slaves)"| S3

    style M fill:#ff9999
    style WC1 fill:#ffcc99
    style AS fill:#ffcc99
    style SUB fill:#ffcc99
    style S1 fill:#99ccff
    style S2 fill:#99ccff
    style S3 fill:#99ccff
```

### Channel Message Specification

| Channel Name | Direction | Message Type | Sender | Receiver | Payload Structure | Purpose |
|--------------|-----------|--------------|---------|-----------|-------------------|---------|
| `worker:slave:{slaveId}` | Master→Slave | `detection` | Master Process | Assigned Slave | `{type: 'detection', workerUrl: string, data: ImageDetectionResponse, timestamp: string}` | Route AI detection results from workers to processing slaves |
| `worker:master:commands` | Any Process→Master | `regenerate_subscriptions` | Any Process | Master Process | `{type: 'regenerate_subscriptions', reason: string, triggeredBy: string, timestamp: string}` | Notify master that database changed and subscriptions need regeneration |
| `worker:assignments:changed` | Master→All Slaves | `assignments_updated` | Master Process | All Slave Processes | `{type: 'assignments_updated', assignments: Record<string,string>, timestamp: string}` | Broadcast worker-to-slave assignment changes for rebalancing |
| `worker:messages:upstream` | Slave→Master | Various | Any Slave Process | Master Process | `{type: string, slaveId: string, data: any, timestamp: string}` | General slave-to-master communication (currently unused) |
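
The payload structures in the table map naturally onto a TypeScript discriminated union keyed on `type`. The sketch below is illustrative only: the type names and `parseChannelMessage()` are hypothetical, the validation rules are assumptions, and `ImageDetectionResponse` is reduced to `unknown`.

```typescript
// Hypothetical type names; payload fields follow the channel table.
type DetectionRouteMessage = {
  type: 'detection';
  workerUrl: string;
  data: unknown; // ImageDetectionResponse in the real module
  timestamp: string;
};

type RegenerateSubscriptionsMessage = {
  type: 'regenerate_subscriptions';
  reason: string;
  triggeredBy: string;
  timestamp: string;
};

type AssignmentsUpdatedMessage = {
  type: 'assignments_updated';
  assignments: Record<string, string>; // workerUrl -> slaveId
  timestamp: string;
};

type ChannelMessage =
  | DetectionRouteMessage
  | RegenerateSubscriptionsMessage
  | AssignmentsUpdatedMessage;

// Parse a raw pub/sub payload; returns null for malformed or unknown messages
function parseChannelMessage(raw: string): ChannelMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // Not valid JSON
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const msg = parsed as Record<string, unknown>;
  if (typeof msg.timestamp !== 'string') return null;

  switch (msg.type) {
    case 'detection':
      return typeof msg.workerUrl === 'string' ? (msg as DetectionRouteMessage) : null;
    case 'regenerate_subscriptions':
      return typeof msg.reason === 'string' && typeof msg.triggeredBy === 'string'
        ? (msg as RegenerateSubscriptionsMessage)
        : null;
    case 'assignments_updated':
      return typeof msg.assignments === 'object' && msg.assignments !== null
        ? (msg as AssignmentsUpdatedMessage)
        : null;
    default:
      return null; // Unknown message type
  }
}
```

Returning `null` instead of throwing lets a channel subscriber log and skip a bad message without tearing down its handler.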

### Redis Hash Storage Specification

| Redis Key | Data Type | Content | Update Pattern | Cleanup Strategy |
|-----------|-----------|---------|----------------|-------------------|
| `worker:connections` | Hash Map | `{[workerUrl]: JSON.stringify(WorkerConnectionState)}` | Master updates every 2s | Manual cleanup only |
| `worker:assignments` | Hash Map | `{[workerUrl]: slaveId}` | Master updates on rebalancing | Manual cleanup only |
| `worker:camera_subscriptions` | Hash Map | `{[subscriptionId]: JSON.stringify(CameraSubscription)}` | Master on subscription changes | Manual cleanup only |
| `master-election:slaves` | Hash Map | `{[nodeId]: JSON.stringify(SlaveNode)}` | MasterElection service manages | TTL-based cleanup |
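
Each hash above stores one JSON string per field. The sketch below shows that encoding independently of any Redis client; the `WorkerConnectionState` fields are a hypothetical subset. With a client such as node-redis v4, the resulting object could be passed to `hSet(key, fields)` and read back with `hGetAll(key)`, but that wiring is an assumption and is not shown.

```typescript
// Hypothetical subset of WorkerConnectionState; field names are illustrative.
interface WorkerConnectionState {
  url: string;
  online: boolean;
  cpuUsage: number;
  memoryUsage: number;
  lastSeen: string; // ISO-8601 timestamp
}

// Serialize per-worker state into hash fields: { [workerUrl]: JSON string }
function toHashFields(states: WorkerConnectionState[]): Record<string, string> {
  const fields: Record<string, string> = {};
  for (const state of states) {
    fields[state.url] = JSON.stringify(state);
  }
  return fields;
}

// Deserialize hash fields back into state objects, skipping corrupt entries
function fromHashFields(fields: Record<string, string>): Map<string, WorkerConnectionState> {
  const states = new Map<string, WorkerConnectionState>();
  for (const [workerUrl, json] of Object.entries(fields)) {
    try {
      states.set(workerUrl, JSON.parse(json) as WorkerConnectionState);
    } catch {
      // Ignore values that are not valid JSON instead of failing the whole read
    }
  }
  return states;
}
```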

### WebSocket Message Protocol

| Direction | Message Type | JSON Structure | Trigger | Response Expected |
|-----------|--------------|----------------|---------|-------------------|
| Backend→Worker | `SUBSCRIBE` | `{type: "subscribe", payload: {subscriptionIdentifier, rtspUrl, snapshotUrl?, snapshotInterval?, modelUrl, modelName, modelId, cropX1?, cropY1?, cropX2?, cropY2?}}` | Camera subscription request | STATE_REPORT confirmation |
| Backend→Worker | `UNSUBSCRIBE` | `{type: "unsubscribe", payload: {subscriptionIdentifier}}` | Camera unsubscription | STATE_REPORT confirmation |
| Backend→Worker | `REQUEST_STATE` | `{type: "requestState"}` | Health check or monitoring | STATE_REPORT response |
| Backend→Worker | `SET_SESSION_ID` | `{type: "setSessionId", payload: {displayIdentifier, sessionId}}` | Associate session with display | None |
| Backend→Worker | `PATCH_SESSION_RESULT` | `{type: "patchSessionResult", payload: {sessionId, success, message?}}` | Session update response | None |
| Worker→Backend | `IMAGE_DETECTION` | `{type: "imageDetection", subscriptionIdentifier, timestamp, data: {detection: {carModel?, carBrand?, carYear?, bodyType?, licensePlateText?, licensePlateType?}, modelId, modelName}}` | AI detection result | None |
| Worker→Backend | `STATE_REPORT` | `{type: "stateReport", cpuUsage, memoryUsage, gpuUsage?, gpuMemoryUsage?, cameraConnections: [{subscriptionIdentifier, modelId, modelName, online, cropX?, cropY?}]}` | Periodic health report (every 2s) | None |
| Worker→Backend | `PATCH_SESSION` | `{type: "patchSession", sessionId, data: any}` | Session data update from ML processing | PATCH_SESSION_RESULT |
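
The backend-to-worker rows can be modelled as a typed union so that malformed commands fail at compile time. This is a sketch: the JSON shapes come from the table, while the TypeScript names are hypothetical, `displayIdentifier` is assumed to be a string, and `sessionId` is typed `number | null` to match the `setSessionId()` method in the Public Interface Specification.

```typescript
// Hypothetical helper types; JSON shapes follow the protocol table.
interface SubscribePayload {
  subscriptionIdentifier: string;
  rtspUrl: string;
  snapshotUrl?: string;
  snapshotInterval?: number;
  modelUrl: string;
  modelName: string;
  modelId: number;
  cropX1?: number;
  cropY1?: number;
  cropX2?: number;
  cropY2?: number;
}

type WorkerCommand =
  | { type: 'subscribe'; payload: SubscribePayload }
  | { type: 'unsubscribe'; payload: { subscriptionIdentifier: string } }
  | { type: 'requestState' }
  | { type: 'setSessionId'; payload: { displayIdentifier: string; sessionId: number | null } };

// Serialize a command for sending over the WebSocket.
// JSON.stringify omits optional fields whose value is undefined.
function encodeCommand(command: WorkerCommand): string {
  return JSON.stringify(command);
}

// Commands whose table entry expects a STATE_REPORT in response
function expectsStateReport(command: WorkerCommand): boolean {
  return command.type === 'subscribe' || command.type === 'unsubscribe' || command.type === 'requestState';
}
```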

## Event System Architecture

### Event Flow Hierarchy

```mermaid
graph TD
    subgraph "Service Layer"
        CS[CameraService]
    end

    subgraph "Cluster Layer"
        DC[DetectorCluster]
        DC --> DCE[Detection Events]
        DC --> WOE[Worker Online Events]
        DC --> WOFE[Worker Offline Events]
    end

    subgraph "Worker Management Layer"
        MS[MasterSlaveWorkerCluster]
        MS --> DE[detection]
        MS --> WC[worker:connected]
        MS --> WD[worker:disconnected]
        MS --> WSE[worker:websocket_error]
        MS --> WON[worker:online]
        MS --> WOFF[worker:offline]
        MS --> WSR[worker:state_report]
    end

    subgraph "Connection Layer"
        DConn[DetectorConnection]
        DConn --> IMG[IMAGE_DETECTION]
        DConn --> STATE[STATE_REPORT]
        DConn --> PATCH[PATCH_SESSION]
    end

    DConn --> MS
    MS --> DC
    DC --> CS

    IMG -.-> DE
    STATE -.-> WSR
    WC -.-> WOE
    WD -.-> WOFE
```
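
The layered flow in the diagram can be sketched with Node's `EventEmitter`, each layer re-emitting under its own event names. This is a simplified, hypothetical model: the `*Sketch` classes are not the real `DetectorConnection`, `MasterSlaveWorkerCluster`, or `DetectorCluster`, and the Redis master-to-slave hop between the worker-management and cluster layers is collapsed into in-process forwarding.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical message shape for parsed worker WebSocket messages
interface WorkerMessage {
  type: string;
  subscriptionIdentifier?: string;
  data?: { detection?: Record<string, unknown> };
}

// Connection layer: maps raw worker messages to protocol-level events
class DetectorConnectionSketch extends EventEmitter {
  handleMessage(message: WorkerMessage): void {
    if (message.type === 'imageDetection') this.emit('IMAGE_DETECTION', message);
    else if (message.type === 'stateReport') this.emit('STATE_REPORT', message);
  }
}

// Worker management layer: tags events with the originating worker URL
class WorkerClusterSketch extends EventEmitter {
  attach(connection: DetectorConnectionSketch, workerUrl: string): void {
    connection.on('IMAGE_DETECTION', (msg: WorkerMessage) => this.emit('detection', { workerUrl, message: msg }));
    connection.on('STATE_REPORT', (msg: WorkerMessage) => this.emit('worker:state_report', { workerUrl, report: msg }));
  }
}

// Cluster layer: exposes the public 'worker:detection_result' event
class DetectorClusterSketch extends EventEmitter {
  attach(cluster: WorkerClusterSketch): void {
    cluster.on('detection', (event: { workerUrl: string; message: WorkerMessage }) => {
      const detection = event.message.data?.detection;
      this.emit('worker:detection_result', {
        url: 'cluster-managed',
        cameraId: event.message.subscriptionIdentifier ?? '',
        detections: detection && Object.keys(detection).length > 0 ? 1 : 0,
      });
    });
  }
}
```

Because `EventEmitter.emit()` calls listeners synchronously, a detection travels through all three layers within a single `handleMessage()` call in this sketch.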

### Message Types and Routing

#### WebSocket Message Types (Python Worker → Backend)

- `IMAGE_DETECTION`: AI detection results from camera streams
- `STATE_REPORT`: Worker health, resource usage, and subscription status
- `PATCH_SESSION`: Session data updates from worker processing

#### Redis Channel Message Types

- `detection`: Detection results routed from master to assigned slave
- `command_response`: Command acknowledgment and status updates
- `heartbeat`: Worker and slave health monitoring messages
- `assignments_updated`: Worker-to-slave assignment change notifications

#### Internal Event Types

- `worker:online`: Worker connection established and ready
- `worker:offline`: Worker connection lost or health check failed
- `worker:connected`: WebSocket connection opened (not necessarily ready)
- `worker:disconnected`: WebSocket connection closed
- `worker:websocket_error`: WebSocket communication errors
- `worker:detection_result`: Processed detection with metadata
- `worker:state_report`: Worker resource and subscription status

## Subscription Management

### Camera Subscription Flow

1. **Registration Phase**
   - `CameraService.subscribeToCamera()` → `DetectorCluster.subscribeToCamera()`
   - Master process finds optimal worker using load balancing algorithm
   - Subscription stored in Redis with full configuration including crop parameters
   - Master sends WebSocket SUBSCRIBE command to assigned worker

2. **Processing Phase**
   - Python worker establishes RTSP connection to camera
   - Worker performs AI inference on video stream frames
   - Detection results sent back via WebSocket with subscription identifier
   - Master routes results to appropriate slave based on worker assignments

3. **Rebalancing Phase**
   - Master monitors worker health every 30 seconds
   - Orphaned subscriptions (offline workers) automatically detected
   - Load balancing algorithm reassigns cameras to healthy workers
   - Fresh model URLs generated to handle S3 presigned URL expiration

### Load Balancing Algorithm

```typescript
// Simplified load balancing logic: fewest subscriptions first, then lowest CPU
interface WorkerInfo { url: string; cpuUsage?: number }
interface SubscriptionInfo { assignedWorkerUrl?: string } // field name assumed

function findBestWorkerForSubscription(
  onlineWorkers: WorkerInfo[],
  allSubscriptions: SubscriptionInfo[],
): WorkerInfo | undefined {
  const getSubscriptionCount = (url: string): number =>
    allSubscriptions.filter((s) => s.assignedWorkerUrl === url).length;

  // Sort a copy so the caller's array is not mutated
  return [...onlineWorkers].sort((a, b) => {
    const loadA = getSubscriptionCount(a.url);
    const loadB = getSubscriptionCount(b.url);
    if (loadA !== loadB) {
      return loadA - loadB; // Prefer lower load
    }
    return (a.cpuUsage ?? 0) - (b.cpuUsage ?? 0); // Then prefer lower CPU
  })[0];
}
```

### Automatic Failover Process

1. **Detection**: Master detects that a worker is offline via missed heartbeats (10s timeout)
2. **Identification**: System identifies all camera subscriptions assigned to the offline worker
3. **Reassignment**: Load balancer selects the optimal replacement worker
4. **Migration**: Subscription updated in Redis with the new worker assignment
5. **Resubscription**: Master sends a SUBSCRIBE command to the new worker with a fresh model URL
6. **Verification**: New worker confirms subscription and begins processing
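
Steps 2 to 4 can be sketched as a pure function that reassigns orphaned subscriptions using the same "fewest subscriptions, then lowest CPU" rule. The shapes and names below are hypothetical; writing the result to Redis and sending SUBSCRIBE with a fresh model URL (steps 4 and 5) are left to the caller.

```typescript
// Hypothetical shapes; the real module's types may differ.
interface WorkerNode {
  url: string;
  online: boolean;
  cpuUsage: number;
}

interface SubscriptionRecord {
  subscriptionIdentifier: string; // "${displayId};${cameraId}"
  assignedWorkerUrl: string | null;
}

// Reassign every subscription whose worker is offline or missing to the online
// worker with the fewest subscriptions, breaking ties on CPU usage. Load counts
// are updated as each orphan is placed, so orphans spread across workers.
function reassignOrphanedSubscriptions(workers: WorkerNode[], subscriptions: SubscriptionRecord[]): SubscriptionRecord[] {
  const online = workers.filter((w) => w.online);
  const load = new Map<string, number>(online.map((w) => [w.url, 0]));
  for (const sub of subscriptions) {
    if (sub.assignedWorkerUrl !== null && load.has(sub.assignedWorkerUrl)) {
      load.set(sub.assignedWorkerUrl, (load.get(sub.assignedWorkerUrl) ?? 0) + 1);
    }
  }

  const moved: SubscriptionRecord[] = [];
  for (const sub of subscriptions) {
    if (sub.assignedWorkerUrl !== null && load.has(sub.assignedWorkerUrl)) continue; // Healthy assignment
    const target = [...online].sort(
      (a, b) => (load.get(a.url) ?? 0) - (load.get(b.url) ?? 0) || a.cpuUsage - b.cpuUsage,
    )[0];
    if (!target) break; // No online workers: leave subscriptions pending
    load.set(target.url, (load.get(target.url) ?? 0) + 1);
    moved.push({ ...sub, assignedWorkerUrl: target.url });
  }
  return moved;
}
```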

## Resource Management

### Connection Pooling

- Master maintains persistent WebSocket connections to all configured workers
- Connection sharing across all backend processes reduces resource overhead
- Automatic reconnection at a fixed 10-second interval (`RECONNECT_DELAY`) keeps reconnect attempts from flooding failed workers

### Memory Management

- Redis data uses manual cleanup to prevent accidental state loss
- Subscription callbacks stored in local memory with automatic cleanup on unsubscribe
- Worker resource usage tracked in real time to prevent overload

### CPU and GPU Monitoring

- Workers report resource usage every 2 seconds via STATE_REPORT messages
- Load balancing uses CPU usage as a tie-breaker between workers with equal subscription counts
- GPU utilization tracked for ML model optimization and capacity planning

## Error Handling

### Connection Error Recovery

- **Fixed-Interval Retry**: Reconnection attempts every 10 seconds (`RECONNECT_DELAY`)
- **Circuit Breaker**: Automatic failover prevents overwhelming failed workers
- **Graceful Degradation**: System continues operating with available workers

### Master Election Failover

- **Leadership Transfer**: New master elected via Redis-based coordination
- **State Recovery**: Worker connections and subscriptions restored from Redis persistence
- **Seamless Transition**: No subscription loss during master failover process

### Monitoring and Observability

#### Structured Logging Topics

- `detector-cluster`: High-level cluster operations and state changes
- `master-slave-worker-cluster`: Worker assignment and rebalancing operations
- `DetectorConnection`: WebSocket connection events and message processing

#### Monitoring Information

- Subscription identifier format: `${displayId};${cameraId}` for traceability
- Worker assignment tracking with process ID and timestamp correlation
- Redis pub/sub message routing with structured logging
- Heartbeat and health check timing with millisecond precision

## Configuration Parameters

### Timing Configuration

```typescript
// All values are in milliseconds
const WORKER_TIMEOUT_MS = 10000; // Worker heartbeat timeout
const SLAVE_HEARTBEAT_INTERVAL = 5000; // Slave heartbeat frequency
const SLAVE_TIMEOUT = 15000; // Slave registration timeout
const REBALANCE_INTERVAL = 30000; // Automatic rebalancing frequency
const STATE_UPDATE_INTERVAL = 2000; // Worker state update frequency
const RECONNECT_DELAY = 10000; // WebSocket reconnection delay
```
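
One plausible way these timeouts feed into liveness decisions is sketched below. The helper names are hypothetical and the real cluster may track liveness differently; the defaults mirror `WORKER_TIMEOUT_MS` and `SLAVE_TIMEOUT`.

```typescript
// Hypothetical liveness helpers; defaults mirror the constants above.
const DEFAULT_WORKER_TIMEOUT_MS = 10000;
const DEFAULT_SLAVE_TIMEOUT_MS = 15000;

// A worker counts as online if its last STATE_REPORT (epoch ms) is within the timeout
function isWorkerAlive(lastStateReportAt: number, now: number, timeoutMs = DEFAULT_WORKER_TIMEOUT_MS): boolean {
  return now - lastStateReportAt <= timeoutMs;
}

// A slave counts as registered if its lastSeen heartbeat (ISO string) is within the timeout
function isSlaveAlive(lastSeenIso: string, now: number, timeoutMs = DEFAULT_SLAVE_TIMEOUT_MS): boolean {
  const lastSeen = Date.parse(lastSeenIso);
  return !Number.isNaN(lastSeen) && now - lastSeen <= timeoutMs;
}
```

With these values, the 10 s worker timeout spans five 2 s `STATE_REPORT` intervals, and the 15 s slave timeout spans three 5 s heartbeat intervals, so a single delayed message does not mark a node offline.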

### Environment Variables

```bash
DETECTOR_WORKERS=ws://worker1:8000,ws://worker2:8000 # Python worker URLs
REDIS_HOST=localhost # Redis coordination server
REDIS_PORT=6379 # Redis server port
REDIS_PASSWORD=secure_password # Redis authentication
DETECT_DEBUG=true # Enable detailed structured logging
```
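
A minimal sketch of how these variables could be parsed and validated at startup. The function name, the `ws://`/`wss://` check, and the defaults (`localhost`, `6379`, debug off) are assumptions for illustration, not the module's actual loader.

```typescript
// Hypothetical config loader; validation rules and defaults are assumptions.
interface ClusterEnvConfig {
  workerUrls: string[];
  redisHost: string;
  redisPort: number;
  redisPassword?: string;
  debug: boolean;
}

function loadClusterConfig(env: Record<string, string | undefined>): ClusterEnvConfig {
  // DETECTOR_WORKERS is a comma-separated list of WebSocket URLs
  const workerUrls = (env.DETECTOR_WORKERS ?? '')
    .split(',')
    .map((url) => url.trim())
    .filter((url) => url.length > 0);

  for (const url of workerUrls) {
    if (!/^wss?:\/\//.test(url)) {
      throw new Error(`Invalid worker URL in DETECTOR_WORKERS: ${url}`);
    }
  }

  const redisPort = Number(env.REDIS_PORT ?? '6379');
  if (!Number.isInteger(redisPort) || redisPort <= 0 || redisPort > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }

  return {
    workerUrls,
    redisHost: env.REDIS_HOST ?? 'localhost',
    redisPort,
    redisPassword: env.REDIS_PASSWORD || undefined,
    debug: env.DETECT_DEBUG === 'true',
  };
}
```

In Node.js this would typically be called as `loadClusterConfig(process.env)`; taking the environment as a parameter keeps the function easy to test.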

## Performance Characteristics

### Scalability Metrics

- **Horizontal Scaling**: Add backend processes without WebSocket connection changes
- **Worker Scaling**: Python ML workers scale independently of backend processes
- **Redis Optimization**: Efficient pub/sub routing with minimal memory overhead

### Throughput Capabilities

- **Camera Subscriptions**: Support for 100+ simultaneous camera streams per worker
- **Detection Processing**: Sub-second AI inference with real-time result delivery
- **Message Routing**: Sub-millisecond Redis pub/sub message delivery

### Resource Efficiency

- **Connection Multiplexing**: Single WebSocket per worker shared across all processes
- **Memory Usage**: Lightweight subscription state with callback cleanup
- **Network Optimization**: Binary WebSocket frames with JSON payload compression
## Public Interface Specification

The distributed worker cluster exposes a clean, simplified interface to external services like CameraService, hiding the complexity of the underlying master-slave architecture. All interactions go through the `DetectorCluster` class, which serves as the primary facade.

### Primary Interface: DetectorCluster

The `DetectorCluster` class in `/services/DetectorCluster.ts` provides the main public interface that external services interact with. It abstracts away the distributed architecture complexity and provides consistent behavior regardless of whether the current process is a master or slave.

#### Core Interface Methods

##### Camera Subscription Management

```typescript
/**
 * Subscribe to a camera stream for AI detection processing
 * @param subscriptionIdentifier - Unique identifier format: "${displayId};${cameraId}"
 * @param rtspUrl - RTSP stream URL for the camera
 * @param modelUrl - Pre-signed S3 URL for AI model (1hr TTL)
 * @param modelId - Database ID of the AI model
 * @param modelName - Human-readable model identifier
 * @param callback - Function called when detection results are received
 * @param snapshotUrl - Optional HTTP endpoint for camera snapshots
 * @param snapshotInterval - Optional snapshot capture interval in milliseconds
 * @param cropX1, cropY1, cropX2, cropY2 - Optional image crop coordinates
 * @returns Promise<boolean> - Always returns true (errors thrown as exceptions)
 */
public async subscribeToCamera(
  subscriptionIdentifier: string,
  rtspUrl: string,
  modelUrl: string,
  modelId: number,
  modelName: string,
  callback: Function,
  snapshotUrl?: string,
  snapshotInterval?: number,
  cropX1?: number,
  cropY1?: number,
  cropX2?: number,
  cropY2?: number
): Promise<boolean>
```

**Behavior:**

- **Master Process**: Stores subscription in Redis, assigns it to the optimal worker, sends WebSocket command
- **Slave Process**: Routes subscription request to master via Redis pub/sub
- **Callback Registration**: Stores callback locally for detection result processing
- **Persistence**: All subscription details stored in Redis for failover recovery
- **Load Balancing**: Automatically selects the best available worker by subscription count, using CPU usage as a tie-breaker

```typescript
/**
 * Unsubscribe from a specific camera stream
 * @param subscriptionIdentifier - The subscription to remove
 * @returns Promise<boolean> - Success status
 */
public async unsubscribeFromCamera(subscriptionIdentifier: string): Promise<boolean>
```

**Behavior:**

- Removes local callback listeners immediately
- Subscription cleanup handled automatically by cluster rebalancing
- Safe to call multiple times (idempotent operation)

```typescript
/**
 * Remove all subscriptions for a specific camera across all displays
 * @param cameraIdentifier - The camera ID to unsubscribe from all displays
 * @returns Promise<void>
 */
public async unsubscribeFromAllWithCameraID(cameraIdentifier: string): Promise<void>
```

**Behavior:**

- Finds all subscription identifiers matching pattern `*;${cameraIdentifier}`
- Removes all local callbacks for matched subscriptions
- Cluster automatically handles worker-side cleanup
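
The `${displayId};${cameraId}` format and the `*;${cameraIdentifier}` match can be sketched with a few helpers. These are hypothetical, not the module's own code; note that matching compares the camera part exactly, so `camera-4` does not also match `camera-45`.

```typescript
// Hypothetical helpers for the "${displayId};${cameraId}" identifier format.
function buildSubscriptionIdentifier(displayId: string, cameraId: string): string {
  if (displayId.includes(';') || cameraId.includes(';')) {
    throw new Error('Display and camera IDs must not contain ";"');
  }
  return `${displayId};${cameraId}`;
}

function parseSubscriptionIdentifier(identifier: string): { displayId: string; cameraId: string } {
  const parts = identifier.split(';');
  if (parts.length !== 2 || parts[0] === '' || parts[1] === '') {
    throw new Error(`Invalid subscription identifier format: ${identifier}`);
  }
  return { displayId: parts[0], cameraId: parts[1] };
}

// Equivalent of matching "*;${cameraIdentifier}" with an exact comparison
// of the camera part rather than a suffix match
function findSubscriptionsForCamera(identifiers: readonly string[], cameraIdentifier: string): string[] {
  return identifiers.filter((id) => {
    const separator = id.lastIndexOf(';');
    return separator !== -1 && id.slice(separator + 1) === cameraIdentifier;
  });
}
```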

##### Event Registration and Callbacks

```typescript
/**
 * Register a callback for detection results from a specific subscription
 * @param subscriptionIdentifier - Target subscription
 * @param callback - Function to call with detection data
 */
public addDetectionListener(subscriptionIdentifier: string, callback: Function): void

/**
 * Register a global callback for all detection results
 * @param callback - Function to call with any detection data
 */
public addGlobalDetectionListener(callback: Function): void
```

**Detection Callback Signature:**

```typescript
type DetectionCallback = (data: {
  subscriptionIdentifier: string;
  timestamp: Date;
  data: {
    detection: {
      carModel?: string;
      carBrand?: string;
      carYear?: number;
      bodyType?: string;
      licensePlateText?: string;
      licensePlateType?: string;
    };
    modelId: number;
    modelName: string;
  };
}) => void;
```
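
The local callback storage behind `addDetectionListener()` and `addGlobalDetectionListener()` could look roughly like the registry below. This is a hypothetical sketch: the class and method names are illustrative, and isolating listener failures with `try`/`catch` is a design assumption, not documented behavior.

```typescript
// Hypothetical local registry; DetectionEvent mirrors the callback signature.
interface DetectionEvent {
  subscriptionIdentifier: string;
  timestamp: Date;
  data: {
    detection: Record<string, string | number | undefined>;
    modelId: number;
    modelName: string;
  };
}

type DetectionListener = (event: DetectionEvent) => void;

class DetectionListenerRegistry {
  private readonly bySubscription = new Map<string, Set<DetectionListener>>();
  private readonly global = new Set<DetectionListener>();

  add(subscriptionIdentifier: string, listener: DetectionListener): void {
    const listeners = this.bySubscription.get(subscriptionIdentifier) ?? new Set<DetectionListener>();
    listeners.add(listener);
    this.bySubscription.set(subscriptionIdentifier, listeners);
  }

  addGlobal(listener: DetectionListener): void {
    this.global.add(listener);
  }

  // Drop all listeners for one subscription; safe to call repeatedly
  removeAll(subscriptionIdentifier: string): void {
    this.bySubscription.delete(subscriptionIdentifier);
  }

  // Call subscription-specific listeners, then global ones. A throwing
  // listener is logged and does not stop the remaining listeners.
  dispatch(event: DetectionEvent): number {
    const specific = this.bySubscription.get(event.subscriptionIdentifier);
    const targets = Array.from(specific ?? []).concat(Array.from(this.global));
    for (const listener of targets) {
      try {
        listener(event);
      } catch (error) {
        console.error(`Detection listener failed for ${event.subscriptionIdentifier}`, error);
      }
    }
    return targets.length;
  }
}
```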

##### Cluster State Management

```typescript
/**
 * Get comprehensive cluster state for monitoring and status reporting
 * @returns Promise<DetectorClusterState>
 */
public async getState(): Promise<DetectorClusterState>

/**
 * Legacy method - rebalancing now happens automatically
 * @returns Promise<boolean> - Always returns true
 */
public async rebalanceWorkers(): Promise<boolean>
```

**DetectorClusterState Interface:**

```typescript
interface DetectorClusterState {
  processId: string;                   // Current process identifier
  isMaster: boolean;                   // Whether this process is the master
  slaveId: string;                     // This process's slave identifier
  totalWorkers: number;                // Number of Python ML workers
  totalSlaves: number;                 // Number of backend slave processes
  workers: WorkerState[];              // Detailed worker health and status
  slaves: SlaveInfo[];                 // Slave process information
  assignments: Record<string, string>; // workerUrl -> slaveId mapping
}
```
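
For monitoring, it is often useful to view `assignments` from the slave side and to spot workers assigned to slaves that are no longer registered. The two helpers below are a hypothetical sketch over the `Record<string, string>` shape; the list of known slave IDs would come from `slaves` in the state object.

```typescript
// Hypothetical monitoring helpers over DetectorClusterState.assignments.

// Invert workerUrl -> slaveId into slaveId -> sorted workerUrls
function groupWorkersBySlave(assignments: Record<string, string>): Map<string, string[]> {
  const bySlave = new Map<string, string[]>();
  for (const [workerUrl, slaveId] of Object.entries(assignments)) {
    const workers = bySlave.get(slaveId) ?? [];
    workers.push(workerUrl);
    bySlave.set(slaveId, workers);
  }
  bySlave.forEach((workers) => workers.sort()); // Stable output for logs
  return bySlave;
}

// Workers whose assigned slave is not among the currently known slaves
function findStaleAssignments(assignments: Record<string, string>, knownSlaveIds: readonly string[]): string[] {
  const known = new Set(knownSlaveIds);
  return Object.keys(assignments)
    .filter((workerUrl) => !known.has(assignments[workerUrl]))
    .sort();
}
```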

##### Session Management (Future Implementation)

```typescript
/**
 * Associate a session ID with a camera subscription for tracking
 * @param subscriptionIdentifier - Target subscription
 * @param sessionId - Session ID to associate (null to clear)
 * @returns Promise<boolean> - Success status
 */
public async setSessionId(subscriptionIdentifier: string, sessionId: number | null): Promise<boolean>

/**
 * Get current camera image via worker REST API
 * @param cameraIdentifier - Camera to capture from
 * @returns Promise<Buffer> - JPEG image data
 */
public async getCameraImage(cameraIdentifier: string): Promise<Buffer>
```

**Note:** These methods are currently not fully implemented in master-slave mode.

### Event System Interface

The cluster emits events that external services can listen to for system monitoring and integration:

#### Emitted Events

```typescript
// Detection result processed
detectorCluster.on('worker:detection_result', (event: {
  url: string;        // Always the placeholder 'cluster-managed', not an actual worker URL
  cameraId: string;   // Subscription identifier
  detections: number; // Number of objects detected (0 or 1)
}) => { /* handle detection */ });

// Worker status changes
detectorCluster.on('worker:online', (event: { url: string }) => { /* ... */ });
detectorCluster.on('worker:offline', (event: { url: string }) => { /* ... */ });

// Connection events
detectorCluster.on('worker:connecting', (event: { url: string }) => { /* ... */ });
detectorCluster.on('worker:disconnected', (event: { url: string; reason: string }) => { /* ... */ });
detectorCluster.on('worker:websocket_error', (event: { url: string; error: string }) => { /* ... */ });
```

### Usage Examples

#### Basic Camera Subscription (CameraService Integration)

```typescript
import { detectorCluster } from '~/modules/camera/services/CameraService';

// Subscribe to camera with AI detection
const success = await detectorCluster.subscribeToCamera(
  'display-123;camera-456',            // subscriptionIdentifier
  'rtsp://192.168.1.100:554/stream1',  // rtspUrl
  'https://s3.bucket.com/model.onnx',  // modelUrl (pre-signed)
  42,                                  // modelId
  'vehicle-detection-v2',              // modelName
  (detectionData) => {                 // callback
    console.log('Detection:', detectionData.data.detection);
    // Process car model, license plate, etc.
  },
  'http://192.168.1.100/snapshot.jpg', // snapshotUrl (optional)
  5000,                                // snapshotInterval in ms (optional)
  100, 50, 800, 600                    // cropX1, cropY1, cropX2, cropY2 (optional)
);
```

#### Event Monitoring Integration

```typescript
// Monitor worker health
detectorCluster.on('worker:online', (event) => {
  console.log(`Worker ${event.url} came online`);
  // Update dashboard, send notifications, etc.
});

detectorCluster.on('worker:offline', (event) => {
  console.log(`Worker ${event.url} went offline`);
  // Alert administrators, trigger failover procedures
});

// Monitor detection activity
detectorCluster.on('worker:detection_result', (event) => {
  if (event.detections > 0) {
    console.log(`Camera ${event.cameraId} detected objects`);
    // Trigger content changes, log analytics, etc.
  }
});
```

#### Cluster State Monitoring

```typescript
// Get comprehensive cluster status
const state = await detectorCluster.getState();

console.log(`Process ${state.processId} is ${state.isMaster ? 'MASTER' : 'SLAVE'}`);
console.log(`Cluster: ${state.totalWorkers} workers, ${state.totalSlaves} slaves`);

// Monitor worker health
state.workers.forEach(worker => {
  console.log(`Worker ${worker.url}: ${worker.online ? 'ONLINE' : 'OFFLINE'}`);
  console.log(`  CPU: ${worker.cpuUsage}%, Memory: ${worker.memoryUsage}%`);
  console.log(`  Subscriptions: ${worker.subscriptionCount}`);
});

// Check assignments
Object.entries(state.assignments).forEach(([workerUrl, slaveId]) => {
  console.log(`Worker ${workerUrl} assigned to slave ${slaveId}`);
});
```

#### Bulk Camera Management

```typescript
// Remove all subscriptions for a camera being deleted
await detectorCluster.unsubscribeFromAllWithCameraID('camera-456');

// Re-subscribe camera to all displays after configuration change.
// camera, freshModelUrl, modelId, modelName, getDisplaysForCamera and
// createDetectionHandler are assumed to be provided by the caller.
const displays = await getDisplaysForCamera('camera-456');
for (const display of displays) {
  await detectorCluster.subscribeToCamera(
    `${display.id};camera-456`,
    camera.rtspUrl,
    freshModelUrl,
    modelId,
    modelName,
    createDetectionHandler(display.id, camera.id),
    camera.snapshotUrl,
    camera.snapshotInterval,
    display.cropX1, display.cropY1,
    display.cropX2, display.cropY2
  );
}
```

### Error Handling Interface

The cluster interface follows consistent error handling patterns:

#### Exception Types

```typescript
// Subscription errors
try {
  await detectorCluster.subscribeToCamera(...);
} catch (error) {
  // Possible errors:
  // - "No workers available for assignment"
  // - "Invalid subscription identifier format"
  // - "Model URL expired or inaccessible"
  // - Redis connection errors
}

// State retrieval errors
try {
  const state = await detectorCluster.getState();
} catch (error) {
  // getState() logs detailed error information and returns a safe default
  // state instead of throwing, so this handler is purely defensive
}
```

#### Graceful Degradation

- **No Workers Available**: Subscriptions stored in Redis, will activate when workers come online
- **Master Process Failure**: New master elected, all subscriptions restored from Redis
- **Redis Connection Issues**: Local callbacks continue working, subscriptions restored when connection recovers
- **Invalid Parameters**: Clear error messages with parameter validation
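
Parameter validation of the kind described above might look like the sketch below. The function and the rules it enforces (identifier shape, positive snapshot interval, crop box given all-or-none with `x1 < x2` and `y1 < y2`) are assumptions for illustration; the real cluster's checks and messages may differ.

```typescript
// Hypothetical validation; the rules are assumptions, not the module's checks.
interface SubscriptionParams {
  subscriptionIdentifier: string;
  snapshotInterval?: number;
  cropX1?: number;
  cropY1?: number;
  cropX2?: number;
  cropY2?: number;
}

// Returns human-readable problems; an empty array means the parameters are valid
function validateSubscriptionParams(p: SubscriptionParams): string[] {
  const errors: string[] = [];
  if (!/^[^;]+;[^;]+$/.test(p.subscriptionIdentifier)) {
    errors.push('Invalid subscription identifier format (expected "displayId;cameraId")');
  }
  if (p.snapshotInterval !== undefined && !(p.snapshotInterval > 0)) {
    errors.push('snapshotInterval must be a positive number of milliseconds');
  }
  const crop = [p.cropX1, p.cropY1, p.cropX2, p.cropY2];
  const defined = crop.filter((v) => v !== undefined).length;
  if (defined !== 0 && defined !== 4) {
    errors.push('Crop coordinates must be given all together or not at all');
  } else if (defined === 4) {
    const [x1, y1, x2, y2] = crop as number[];
    if (x1 >= x2 || y1 >= y2) errors.push('Crop box must satisfy x1 < x2 and y1 < y2');
  }
  return errors;
}
```

Collecting all problems instead of stopping at the first lets the caller report every invalid field in a single error message.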

### Integration Patterns

#### Service Layer Integration

```typescript
// CameraService.ts example
export class CameraService {
  constructor() {
    // Initialize cluster connection
    detectorCluster.initialize();

    // Set up global detection processing
    detectorCluster.addGlobalDetectionListener(this.processDetection.bind(this));
  }

  async subscribeCamera(displayId: string, camera: CameraEntity) {
    const subscriptionId = `${displayId};${camera.cameraIdentifier}`;

    return await detectorCluster.subscribeToCamera(
      subscriptionId,
      camera.rtspUrl,
      await this.getModelUrl(camera.modelId),
      camera.modelId,
      camera.modelName,
      (data) => this.handleDetection(displayId, camera.id, data),
      camera.snapshotUrl,
      camera.snapshotInterval,
      camera.cropX1, camera.cropY1,
      camera.cropX2, camera.cropY2
    );
  }

  private processDetection(data: ImageDetectionResponse) {
    // Global detection processing logic
    this.updateAnalytics(data);
    this.triggerDecisionTrees(data);
  }
}
```

### Interface Guarantees and Contracts

#### Reliability Guarantees

- **Best-Effort Detection Delivery**: Detection results are routed from master to slaves over Redis pub/sub, which has at-most-once delivery semantics; a detection published while the assigned slave is not subscribed (for example, during reassignment) is not redelivered
- **Subscription Persistence**: Subscriptions survive process restarts and master failovers
- **Automatic Reconnection**: The master reconnects to workers automatically at a fixed 10-second interval (`RECONNECT_DELAY`)
- **Load Balancing**: New subscriptions automatically assigned to least loaded workers

#### Performance Characteristics

- **Subscription Latency**: < 100ms for new camera subscriptions
- **Detection Latency**: < 50ms from worker to callback (excluding AI processing time)
- **State Query Performance**: < 10ms for cluster state retrieval
- **Memory Usage**: O(n) where n = number of active subscriptions

#### Thread Safety

- **Callback Execution**: All callbacks executed on the main event loop (Node.js is single-threaded)
- **Concurrent Subscriptions**: Multiple simultaneous subscriptions handled safely
- **State Consistency**: Redis operations use atomic transactions where needed

This interface specification provides external services with a clear understanding of how to integrate with the distributed worker cluster while maintaining abstraction from the underlying complexity.

## Architecture Evolution: From Complex to Pure Declarative

### Previous Architecture Limitations (Addressed)

- **Complex State Synchronization**: Incremental updates between database, Redis desired state, and worker actual state created synchronization complexity
- **Command Protocol Complexity**: Multiple command types (`subscribe_camera`, `unsubscribe_camera`) with complex payloads and error handling
- **State Divergence**: Database and Redis desired state could diverge, causing inconsistent behavior
- **Partial Update Complexity**: Complex logic for handling individual subscription changes led to edge cases and race conditions
- **Service Layer Complexity**: Camera/Display services contained complex subscription management logic

### Current Pure Declarative Architecture Benefits

- **Single Source of Truth**: Database is the only source for desired state - no secondary state stores to synchronize
- **Zero State Divergence**: Desired state is always freshly derived from database queries, eliminating synchronization complexity
- **Simplified Protocol**: Only one command type (`regenerate_subscriptions`) with minimal payload
- **Consistent State Management**: Complete regeneration eliminates all edge cases and partial update complexity
- **Service Layer Simplicity**: Services just update database + trigger regeneration - no subscription logic
- **Operational Resilience**: System is self-healing and predictable - any database change triggers complete reconciliation
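
The regenerate-then-reconcile cycle can be sketched as a diff between the desired subscriptions (freshly derived from database rows) and the subscriptions a worker reports in `STATE_REPORT.cameraConnections`. The shapes and names are hypothetical; only `modelId` is compared for drift because it is the configuration field `STATE_REPORT` includes, and unchanged subscriptions produce no commands.

```typescript
// Hypothetical shapes for desired (database-derived) and reported subscriptions.
interface DesiredSubscription {
  subscriptionIdentifier: string; // "${displayId};${cameraId}"
  rtspUrl: string;
  modelId: number;
}

interface ReportedConnection {
  subscriptionIdentifier: string;
  modelId: number;
}

interface ReconcilePlan {
  toSubscribe: DesiredSubscription[];
  toUnsubscribe: string[];
}

// Emit only the changes needed to move actual state to desired state.
// A subscription whose model changed is unsubscribed and re-subscribed.
function reconcile(desired: DesiredSubscription[], actual: ReportedConnection[]): ReconcilePlan {
  const desiredById = new Map(desired.map((d) => [d.subscriptionIdentifier, d] as const));
  const actualById = new Map(actual.map((a) => [a.subscriptionIdentifier, a] as const));

  const toUnsubscribe: string[] = [];
  actualById.forEach((current, id) => {
    const wanted = desiredById.get(id);
    if (!wanted || wanted.modelId !== current.modelId) toUnsubscribe.push(id);
  });

  const toSubscribe = desired.filter((d) => {
    const current = actualById.get(d.subscriptionIdentifier);
    return !current || current.modelId !== d.modelId;
  });

  return { toSubscribe, toUnsubscribe };
}
```

Because the plan is computed from full desired and actual snapshots, running it twice in a row yields an empty plan the second time, which is what makes repeated regeneration safe.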

### VMware DRS-like Benefits

- **Global Optimization**: Every regeneration considers all subscriptions globally for optimal load balancing
- **Automatic Recovery**: System automatically heals from any inconsistent state by regenerating from database
- **Resource Efficiency**: Workers assigned based on real-time subscription counts, with CPU usage from STATE_REPORT messages as a tie-breaker
- **Fault Tolerance**: Complete state recovery from database after any failure (process crashes, network interruptions, etc.)

### Performance Characteristics

- **Regeneration Speed**: Database queries are fast (~10ms) even with hundreds of displays
- **Reconciliation Efficiency**: Only changed subscriptions are actually modified on workers
- **Memory Efficiency**: No persistent state storage outside of database and current worker assignments
- **Network Efficiency**: Minimal command protocol reduces Redis pub/sub overhead

This pure declarative architecture provides the reliability and simplicity of Kubernetes-style declarative resource management while maintaining the performance and scalability needed for real-time camera processing systems.