# Master Election Service Specification - Distributed Process Coordination

## Overview

The MasterElection service implements a Redis-based distributed leadership election and process coordination system for the CMS backend cluster. It provides master-slave coordination with automatic failover, process registration, and TTL-based cleanup for multi-process backend deployments.

**Key Architectural Principle**: Redis-based coordination with atomic Lua scripts keeps lock state consistent and guards against split-brain scenarios, while per-entry TTL expiration provides automatic cleanup.

## Architecture Components

### Two-Tier Process Coordination

The system manages two distinct coordination layers:

1. **Master Election Layer**: Single leader election across all backend processes
2. **Process Registry Layer**: Individual process registration and heartbeat management

### Leadership Election Pattern

- **Single Master**: Only one backend process holds the master lock at any time
- **Automatic Failover**: Slaves start a new election as soon as they detect that the master lock is gone (the lock has a 30-second TTL and slaves check every 5 seconds)
- **Heartbeat-Based**: The master renews its lock every 10 seconds; if renewals stop, the lock expires after its 30-second TTL and leadership is lost
- **Lua Script Atomicity**: Master lock operations use atomic Lua scripts to prevent race conditions
- **Event-Driven Transitions**: Role changes emit events so that dependent services can react

## Core Components

### MasterElection Class
`cms-backend/services/MasterElection.ts`

Primary coordination service that handles distributed leadership election and process lifecycle management.

**Key Responsibilities:**
- Manages master lock acquisition and renewal using atomic Redis operations
- Provides process registration with automatic TTL-based expiration (45 seconds)
- Emits role transition events for dependent service coordination
- Handles slave registration and heartbeat management
- Maintains process-to-channel mapping for message routing

### Process Management System

**Process Registration:**
- Each backend process registers with a unique UUID-based identifier
- Process metadata includes role, channel name, and capabilities
- TTL-based expiration (45 seconds) with heartbeat renewal
- Automatic cleanup of stale process entries without manual intervention

**Channel Assignment:**
- Each process is assigned a unique Redis pub/sub channel
- Channel mapping stored persistently for message routing
- Master process maintains channel-to-process mapping

## Data Structures

### MasterElectionEvents
```typescript
interface MasterElectionEvents {
  'master-acquired': () => void;                      // This process became master
  'master-lost': () => void;                          // This process lost master status
  'election-started': () => void;                     // Election process initiated
  'election-completed': (isMaster: boolean) => void;  // Election finished
  'slave-registered': (slave: SlaveNode) => void;     // New slave joined
  'slave-removed': (nodeId: string) => void;          // Slave left/expired
  'error': (error: Error) => void;                    // Election/coordination errors
}
```
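
To illustrate how an event map like this can give listeners compile-time checking, here is a minimal sketch. `TypedEmitter` and the trimmed `ElectionEvents` map are hypothetical helpers written for this example; the actual service may simply extend Node's `EventEmitter`.

```typescript
// Hypothetical, trimmed copy of the event map so the sketch is self-contained.
interface ElectionEvents {
  'master-acquired': () => void;
  'master-lost': () => void;
  'election-completed': (isMaster: boolean) => void;
  'slave-removed': (nodeId: string) => void;
}

// Minimal typed emitter (an assumption for illustration, not the service's class).
class TypedEmitter<E extends { [K in keyof E]: (...args: any[]) => void }> {
  private listeners = new Map<keyof E, Array<(...args: any[]) => void>>();

  // Register a listener; its signature is checked against the event map.
  on<K extends keyof E>(event: K, listener: E[K]): this {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
    return this;
  }

  // Call every listener for the event; returns true if any listener existed.
  emit<K extends keyof E>(event: K, ...args: Parameters<E[K]>): boolean {
    const list = this.listeners.get(event) ?? [];
    list.forEach((listener) => listener(...args));
    return list.length > 0;
  }
}

const events = new TypedEmitter<ElectionEvents>();
const seen: string[] = [];
events.on('election-completed', (isMaster) => seen.push(`completed:${isMaster}`));
events.on('slave-removed', (nodeId) => seen.push(`removed:${nodeId}`));
events.emit('election-completed', false);
events.emit('slave-removed', 'node-1');
```

With this shape, a call such as `events.emit('slave-removed', 42)` is rejected by the compiler because the map declares a `string` argument.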

### ProcessInfo
```typescript
interface ProcessInfo {
  processId: string;                  // Unique process identifier (UUID)
  nodeId: string;                     // Node identifier (same as processId)
  role: 'master' | 'slave';           // Current process role
  lastSeen: string;                   // Last heartbeat timestamp (ISO string)
  capabilities: ProcessCapabilities;  // Process feature capabilities
}

// Channel name derived as: `worker:slave:${processInfo.processId}`
```
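
The derivation rule in the comment above can be captured in a pair of small helpers. This is a sketch only: the prefix comes from the spec, while the function and constant names are hypothetical.

```typescript
// Prefix taken from the spec comment above; the helper names are hypothetical.
const SLAVE_CHANNEL_PREFIX = 'worker:slave:';

// Build the pub/sub channel name for a process.
function channelNameFor(processId: string): string {
  return `${SLAVE_CHANNEL_PREFIX}${processId}`;
}

// Recover the process ID from a channel name, or null if the prefix is missing.
function processIdFromChannel(channel: string): string | null {
  return channel.startsWith(SLAVE_CHANNEL_PREFIX)
    ? channel.slice(SLAVE_CHANNEL_PREFIX.length)
    : null;
}
```

Because the mapping is a pure function of the process ID, any process can compute another process's channel without a lookup.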

### ProcessCapabilities
```typescript
interface ProcessCapabilities {
  canProcessDetections: boolean;  // Can handle AI detection processing
  maxSubscriptions: number;       // Maximum camera subscriptions supported
  preferredWorkload: number;      // Preferred subscription load (0-100)
}
```

### SlaveNode
```typescript
interface SlaveNode {
  nodeId: string;                  // Unique slave node identifier
  identifier: string;              // Human-readable process identifier
  registeredAt: string;            // Initial registration timestamp
  lastSeen: string;                // Last heartbeat timestamp
  metadata?: Record<string, any>;  // Optional process metadata
}
```

## Redis Data Architecture

### Master Election Keys
- `master-election:master` - Current master process identifier with TTL lock
- `master-election:heartbeat` - Master heartbeat timestamp for liveness detection
- `master-election:master_process` - Detailed master process information (JSON)

### Process Registry Keys (TTL-Enabled)
- `master-election:processes` - Hash map of all active processes with per-entry TTL (45s)
- Channel names derived directly from process ID: `worker:slave:{processId}` - no separate mapping needed

### TTL Configuration
```typescript
// Per-entry TTL using hSetEx for automatic cleanup (all values in seconds)
const PROCESS_TTL = 45;                 // Process registration expires after 45 seconds
const HEARTBEAT_RENEWAL_INTERVAL = 10;  // Process heartbeats renew TTL every 10 seconds
const MASTER_LOCK_TTL = 30;             // Master lock expires after 30 seconds
```
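
A quick way to read these numbers is to ask how many heartbeat ticks fit between the last successful renewal and expiry. The sketch below works that out; `renewalChancesBeforeExpiry` is a hypothetical helper, and it assumes heartbeats fire at exact multiples of the interval.

```typescript
// Number of heartbeat ticks that fall strictly before the TTL runs out,
// counted from the last successful renewal. Hypothetical helper for illustration.
function renewalChancesBeforeExpiry(ttlSeconds: number, intervalSeconds: number): number {
  if (ttlSeconds <= 0 || intervalSeconds <= 0) {
    throw new RangeError('ttlSeconds and intervalSeconds must be positive');
  }
  return Math.ceil(ttlSeconds / intervalSeconds) - 1;
}

// Process entry: ticks at 10s, 20s, 30s and 40s all land before the 45s expiry.
const processChances = renewalChancesBeforeExpiry(45, 10);
// Master lock: ticks at 10s and 20s land before expiry; the 30s tick coincides with it.
const lockChances = renewalChancesBeforeExpiry(30, 10);
```

Under these assumptions a process entry survives three consecutive missed heartbeats, while the master lock survives only one.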

### Data Persistence Strategy
Uses **per-entry TTL with hSetEx** (the Redis `HSETEX` command) for automatic cleanup:
- Process entries automatically expire if heartbeats stop
- No manual cleanup processes required
- Prevents memory leaks from crashed processes
- Self-healing system that maintains only active processes
- Slave information derived from processes with role='slave' - no separate storage needed
- Channel names derived directly from process ID - no mapping table required
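
The expiry behaviour described in this list can be modelled without a Redis server. The following in-memory sketch only imitates the idea of a hash whose fields carry their own TTL; `ExpiringHash` and its methods are hypothetical names, and the clock is injected so the example stays deterministic.

```typescript
// In-memory stand-in for a hash with per-field TTL (not a Redis client).
class ExpiringHash {
  private fields = new Map<string, { value: string; expiresAtMs: number }>();

  constructor(private now: () => number) {}

  // Imitates HSETEX: write the field and (re)start its own TTL.
  setWithTtl(field: string, value: string, ttlSeconds: number): void {
    this.fields.set(field, { value, expiresAtMs: this.now() + ttlSeconds * 1000 });
  }

  // Imitates HGETALL: expired fields are simply absent from the result.
  getAll(): Record<string, string> {
    const live: Record<string, string> = {};
    this.fields.forEach((entry, field) => {
      if (entry.expiresAtMs > this.now()) live[field] = entry.value;
    });
    return live;
  }
}

let clockMs = 0;
const processes = new ExpiringHash(() => clockMs);
processes.setWithTtl('proc-a', '{"role":"master"}', 45);
processes.setWithTtl('proc-b', '{"role":"slave"}', 45);

clockMs = 40000; // only proc-a sends a heartbeat
processes.setWithTtl('proc-a', '{"role":"master"}', 45);

clockMs = 50000; // proc-b's entry expired at 45000
const alive = Object.keys(processes.getAll());
```

Here `proc-b` disappears from the listing without any explicit delete, which is the self-healing property the list refers to.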

## Master Election Algorithm

### Election Flow Diagram

```mermaid
graph TB
    subgraph "Election Process"
        START[Process Starts] --> ATTEMPT[attemptElection]
        ATTEMPT --> ACQUIRE{acquireMasterLock}

        ACQUIRE -->|Success| MASTER[becomeMaster]
        ACQUIRE -->|Failed| SLAVE[becomeSlave]

        MASTER --> HEARTBEAT[startHeartbeat]
        SLAVE --> REGISTER[registerAsSlave]

        HEARTBEAT --> RENEW{renewMasterLock}
        RENEW -->|Success| CONTINUE[Continue as Master]
        RENEW -->|Failed| STEPDOWN[Step Down → SLAVE]

        REGISTER --> MONITOR[Monitor Master]
        MONITOR --> CHECK{Master Exists?}
        CHECK -->|Yes| WAIT[Wait and Monitor]
        CHECK -->|No| ATTEMPT

        STEPDOWN --> SLAVE
        WAIT --> MONITOR
        CONTINUE --> RENEW
    end

    subgraph "Atomic Operations"
        ACQUIRE --> LUA1[Lua Script: SET master NX + SET heartbeat]
        RENEW --> LUA2[Lua Script: Check owner + PEXPIRE + SET heartbeat]
        STEPDOWN --> LUA3[Lua Script: Check owner + DEL master + DEL heartbeat]
    end
```

### Atomic Lock Operations

#### Master Lock Acquisition
```lua
-- Atomic master lock acquisition with heartbeat
if redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) then
    redis.call("SET", KEYS[2], ARGV[3], "PX", ARGV[2])
    return 1
else
    return 0
end
```
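
The script reads two keys and three arguments. A caller might assemble them as in the sketch below. The mapping (`ARGV[1]` = node ID, `ARGV[2]` = lock TTL in milliseconds, `ARGV[3]` = heartbeat timestamp) is inferred from the script and the key descriptions, and `buildAcquireOptions` is a hypothetical helper; the `{ keys, arguments }` shape mirrors what node-redis accepts for `eval`.

```typescript
// Shape of the options object passed alongside the script text (assumed from node-redis).
interface EvalOptions {
  keys: string[];
  arguments: string[];
}

// Hypothetical helper: builds KEYS and ARGV for the acquisition script.
function buildAcquireOptions(
  lockName: string,
  nodeId: string,
  lockTtlMs: number,
  nowMs: number
): EvalOptions {
  return {
    // KEYS[1] = master lock, KEYS[2] = heartbeat timestamp
    keys: [`${lockName}:master`, `${lockName}:heartbeat`],
    // Script arguments are sent as strings, so numbers are converted explicitly.
    arguments: [nodeId, String(lockTtlMs), String(nowMs)],
  };
}

const acquireOptions = buildAcquireOptions('master-election', 'node-1', 30000, 1700000000000);
```

Passing the TTL in milliseconds matches the script's use of `PX` rather than `EX`.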

#### Master Lock Renewal
```lua
-- Atomic master lock renewal with heartbeat update
if redis.call("GET", KEYS[1]) == ARGV[1] then
    redis.call("PEXPIRE", KEYS[1], ARGV[2])
    redis.call("SET", KEYS[2], ARGV[3], "PX", ARGV[2])
    return 1
else
    return 0
end
```

#### Master Lock Release
```lua
-- Atomic master lock release
if redis.call("GET", KEYS[1]) == ARGV[1] then
    redis.call("DEL", KEYS[1], KEYS[2])
    return 1
else
    return 0
end
```
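
The three scripts share one rule: only the current owner may renew or release, and anyone may acquire once the lock is absent or expired. The in-memory sketch below mimics those semantics so they can be exercised without Redis. `FakeMasterLock` is a hypothetical test double, not part of the service.

```typescript
// In-memory model of the acquire / renew / release semantics (not a Redis client).
class FakeMasterLock {
  private owner: string | null = null;
  private expiresAtMs = 0;

  constructor(private now: () => number) {}

  // The owner is only visible while the TTL has not elapsed.
  private currentOwner(): string | null {
    return this.owner !== null && this.expiresAtMs > this.now() ? this.owner : null;
  }

  // Like SET ... NX PX: succeeds only when no live lock exists.
  acquire(nodeId: string, ttlMs: number): boolean {
    if (this.currentOwner() !== null) return false;
    this.owner = nodeId;
    this.expiresAtMs = this.now() + ttlMs;
    return true;
  }

  // Like GET == owner, then PEXPIRE: only the owner can extend the TTL.
  renew(nodeId: string, ttlMs: number): boolean {
    if (this.currentOwner() !== nodeId) return false;
    this.expiresAtMs = this.now() + ttlMs;
    return true;
  }

  // Like GET == owner, then DEL: only the owner can release.
  release(nodeId: string): boolean {
    if (this.currentOwner() !== nodeId) return false;
    this.owner = null;
    return true;
  }
}

let nowMs = 0;
const lock = new FakeMasterLock(() => nowMs);
const results: boolean[] = [];
results.push(lock.acquire('node-a', 30000)); // lock was free
results.push(lock.acquire('node-b', 30000)); // node-a already holds it
results.push(lock.release('node-b'));        // node-b is not the owner
nowMs = 10000;
results.push(lock.renew('node-a', 30000));   // expiry moves to 40000
nowMs = 45000;
results.push(lock.renew('node-a', 30000));   // lock expired at 40000
results.push(lock.acquire('node-b', 30000)); // node-b takes over
```

The owner check in `renew` is what forces a stalled master to step down: once its lock has expired and another node holds it, renewal fails.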

## Process Lifecycle Management

### Process Registration Flow

```mermaid
sequenceDiagram
    participant P as Process
    participant R as Redis
    participant M as Master Process

    Note over P,M: Process Registration with TTL

    P->>+P: Generate UUID processId
    P->>+P: Determine role (master/slave)
    P->>+P: Assign channel name

    P->>+R: hSetEx(processes, processId, processInfo, {EX: 45})
    R-->>-P: Registration confirmed

    P->>+R: hSet(channels, processId, channelName)
    R-->>-P: Channel mapping stored

    alt Process becomes master
        P->>+R: set(master_process, processInfo)
        R-->>-P: Master process registered
        P->>+M: emit('master-acquired')
    else Process becomes slave
        P->>+R: hSet(slaves, nodeId, slaveInfo)
        R-->>-P: Slave registered
        P->>+M: emit('slave-registered', slaveInfo)
    end

    Note over P,M: Heartbeat Loop (Every 10s)

    loop Every 10 seconds
        P->>+P: updateProcessHeartbeat(processId)
        P->>+R: hSetEx(processes, processId, updatedInfo, {EX: 45})
        Note over R: TTL renewed for 45 seconds
        R-->>-P: Heartbeat recorded
    end

    Note over P,M: Automatic Expiration (No heartbeat)

    R->>R: 45 seconds pass without heartbeat
    R->>R: Process entry automatically expires
    Note over R: No manual cleanup needed
```

### Master Election Scenarios

#### Scenario 1: Initial Startup
```mermaid
sequenceDiagram
    participant P1 as Process 1
    participant P2 as Process 2
    participant R as Redis

    Note over P1,R: First Process Startup

    P1->>+P1: attemptElection()
    P1->>+R: Lua Script: SET master NX
    R-->>-P1: Success (no existing master)

    P1->>+P1: becomeMaster()
    P1->>+P1: emit('master-acquired')
    P1->>+P1: startHeartbeat() every 10s

    Note over P1,R: Second Process Startup

    P2->>+P2: attemptElection()
    P2->>+R: Lua Script: SET master NX
    R-->>-P2: Failed (master exists)

    P2->>+P2: becomeSlave()
    P2->>+R: hSet(slaves, nodeId, slaveInfo)
    P2->>+P2: emit('election-completed', false)
```

#### Scenario 2: Master Failure and Failover
```mermaid
sequenceDiagram
    participant P1 as Master Process
    participant P2 as Slave Process 1
    participant P3 as Slave Process 2
    participant R as Redis

    Note over P1,R: Normal Operation

    P1->>+R: Heartbeat renewal every 10s
    P2->>+P2: Monitor master existence every 5s
    P3->>+P3: Monitor master existence every 5s

    Note over P1,R: Master Failure

    P1--XP1: Process crashes/network failure

    Note over R: Master lock expires after 30s

    R->>R: Master lock TTL expires

    Note over P2,R: Slave Detects Missing Master

    P2->>+R: checkMasterExists() Lua Script
    R-->>-P2: Master not found or stale

    P2->>+P2: Random delay (0-2s) to reduce collisions
    P2->>+R: attemptElection() - Lua Script: SET master NX
    R-->>-P2: Success - became new master

    P2->>+P2: becomeMaster()
    P2->>+P2: emit('master-acquired')

    Note over P3,R: Other Slave Detects New Master

    P3->>+R: checkMasterExists()
    R-->>-P3: New master found
    P3->>+P3: Continue as slave - no election needed
```
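
The random delay in this scenario spreads out election attempts so that slaves which notice the missing master at the same moment do not all hit Redis together. A minimal sketch, assuming a uniform delay in the range 0 to 2 seconds; the function name and the injectable `random` parameter are hypothetical and exist only to make the example testable.

```typescript
// Upper bound taken from the "0-2s" range in the diagram above.
const ELECTION_RANDOM_DELAY_MAX_MS = 2000;

// Returns a delay in [0, maxMs). The random source is injectable for tests.
function electionDelayMs(
  random: () => number = Math.random,
  maxMs: number = ELECTION_RANDOM_DELAY_MAX_MS
): number {
  return Math.floor(random() * maxMs);
}

const halfDelay = electionDelayMs(() => 0.5);     // deterministic source
const quarterDelay = electionDelayMs(() => 0.25); // deterministic source
const realDelay = electionDelayMs();              // uses Math.random
```

The lock itself still decides the winner; the jitter only reduces how many candidates race for it at once.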

## TTL-Based Cleanup System

### Per-Entry TTL Implementation

```typescript
// Process registration with automatic TTL expiration
public async registerProcess(processInfo: ProcessInfo): Promise<void> {
  // Set process registration with 45 second TTL per entry
  await redisClient.hSetEx(
    this.processesKey,
    {
      [processInfo.processId]: JSON.stringify(processInfo)
    },
    {
      expiration: {
        type: 'EX',
        value: 45 // 45 second TTL per process entry
      }
    }
  );

  // Map process to channel (no TTL - cleaned up manually).
  // ProcessInfo has no channelName field, so the name is derived from the
  // process ID as documented in the ProcessInfo section.
  await redisClient.hSet(
    this.processChannelsKey,
    processInfo.processId,
    `worker:slave:${processInfo.processId}`
  );
}

// Heartbeat renewal extends TTL automatically
public async updateProcessHeartbeat(processId: string): Promise<void> {
  const processData = await redisClient.hGet(this.processesKey, processId);
  if (processData) {
    const processInfo: ProcessInfo = JSON.parse(processData);
    processInfo.lastSeen = new Date().toISOString();

    // Update process and renew TTL on heartbeat (per-entry TTL)
    await redisClient.hSetEx(
      this.processesKey,
      {
        [processId]: JSON.stringify(processInfo)
      },
      {
        expiration: {
          type: 'EX',
          value: 45 // Renew 45 second TTL for this specific process entry
        }
      }
    );
  }
}
```

### Cleanup Behavior

```mermaid
graph TB
    subgraph "TTL Cleanup Process"
        REG[Process Registration] --> TTL[45s TTL Set]
        TTL --> HB{Heartbeat Within 45s?}

        HB -->|Yes| RENEW[TTL Renewed to 45s]
        HB -->|No| EXPIRE[Entry Automatically Expires]

        RENEW --> HB
        EXPIRE --> GONE[Process Removed from Redis]

        GONE --> DETECT[Other Processes Detect Absence]
        DETECT --> REBALANCE[Automatic Rebalancing]
    end

    subgraph "Manual vs TTL Cleanup"
        MANUAL[Manual Cleanup Process]
        AUTOMATIC[TTL-Based Cleanup]

        MANUAL -.->|"❌ Complex"| ISSUES[Race Conditions<br/>Memory Leaks<br/>Stale Data]
        AUTOMATIC -.->|"✅ Simple"| BENEFITS[Self-Healing<br/>No Race Conditions<br/>Guaranteed Cleanup]
    end
```

## Event System Architecture

### Event Emission Flow

```mermaid
graph TD
    subgraph "Election Events"
        START[Election Started] --> ATTEMPT[Attempt Lock Acquisition]
        ATTEMPT --> SUCCESS{Lock Acquired?}

        SUCCESS -->|Yes| MASTER[Become Master]
        SUCCESS -->|No| SLAVE[Become Slave]

        MASTER --> MASTER_EVENT["emit('master-acquired')"]
        SLAVE --> SLAVE_EVENT["emit('election-completed', false)"]

        MASTER_EVENT --> HEARTBEAT[Start Heartbeat Loop]
        SLAVE_EVENT --> MONITOR[Start Master Monitoring]
    end

    subgraph "Heartbeat Events"
        HEARTBEAT --> RENEW{Renew Lock?}
        RENEW -->|Success| CONTINUE[Continue as Master]
        RENEW -->|Failed| LOST["emit('master-lost')"]

        LOST --> STEPDOWN[Step Down to Slave]
        STEPDOWN --> TRIGGER[Trigger New Election]
        CONTINUE --> HEARTBEAT
    end

    subgraph "Slave Management Events"
        SLAVE_JOIN[New Slave Joins] --> SLAVE_REG["emit('slave-registered')"]
        SLAVE_TIMEOUT[Slave Heartbeat Timeout] --> SLAVE_REM["emit('slave-removed')"]

        SLAVE_REG --> NOTIFY[Notify Dependent Services]
        SLAVE_REM --> CLEANUP[Cleanup Assignments]
    end
```

### Event Handler Integration

```typescript
// Example: Camera module integration with MasterElection events
const masterElection = getMasterElection();

masterElection.on('master-acquired', () => {
  // This process became master - start managing workers
  masterSlaveWorkerCluster.becomeMaster();
  logger.info('Camera cluster: Became master, connecting to workers');
});

masterElection.on('master-lost', () => {
  // This process lost master status - become slave
  masterSlaveWorkerCluster.becomeSlave();
  logger.info('Camera cluster: Became slave, disconnecting workers');
});

masterElection.on('slave-registered', (slave: SlaveNode) => {
  // New backend process joined - rebalance workload
  masterSlaveWorkerCluster.handleSlaveJoined(slave);
  logger.info(`Camera cluster: Slave ${slave.nodeId} joined`);
});

masterElection.on('slave-removed', (nodeId: string) => {
  // Backend process left - reassign its workload
  masterSlaveWorkerCluster.handleSlaveLeft(nodeId);
  logger.info(`Camera cluster: Slave ${nodeId} removed`);
});
```

## Process Coordination Patterns

### Master Role Responsibilities

```mermaid
graph TB
    subgraph "Master Process Duties"
        LOCK[Maintain Master Lock] --> HEARTBEAT[Send Heartbeats Every 10s]
        HEARTBEAT --> MONITOR[Monitor All Slave Processes]

        MONITOR --> CLEANUP[Cleanup Stale Slave Entries]
        CLEANUP --> BALANCE[Coordinate Resource Balancing]

        BALANCE --> WORKERS[Manage Worker Connections]
        WORKERS --> ROUTE[Route Messages to Slaves]

        ROUTE --> STATUS[Provide Cluster Status]
        STATUS --> LOCK
    end

    subgraph "Master Failure Scenarios"
        NETWORK[Network Partition] --> TIMEOUT[Lock Renewal Timeout]
        CRASH[Process Crash] --> TIMEOUT
        OVERLOAD[Resource Overload] --> TIMEOUT

        TIMEOUT --> EXPIRE[Master Lock Expires]
        EXPIRE --> ELECTION[New Election Triggered]
        ELECTION --> RECOVER[New Master Elected]
    end
```

### Slave Role Responsibilities

```mermaid
graph TB
    subgraph "Slave Process Duties"
        REGISTER[Register with Master Election] --> HEARTBEAT[Send Heartbeats Every 5s]
        HEARTBEAT --> MONITOR[Monitor Master Existence]

        MONITOR --> PROCESS[Process Assigned Messages]
        PROCESS --> REPORT[Report Status to Master]

        REPORT --> DETECT{Master Missing?}
        DETECT -->|No| MONITOR
        DETECT -->|Yes| ELECTION[Trigger Election]

        ELECTION --> ATTEMPT{Win Election?}
        ATTEMPT -->|Yes| PROMOTE[Become Master]
        ATTEMPT -->|No| CONTINUE[Continue as Slave]

        PROMOTE --> MASTER[Master Role Duties]
        CONTINUE --> REGISTER
    end
```

## Class Responsibilities Overview

### Core Class Functions

| Class | Primary Responsibility | Key Methods | Process Type |
|-------|----------------------|-------------|--------------|
| **MasterElection** | Distributed coordination and leadership election | • `start()` - Initialize election process<br/>• `attemptElection()` - Try to acquire master lock<br/>• `becomeMaster()` - Transition to master role<br/>• `becomeSlave()` - Transition to slave role<br/>• `waitForElectionComplete()` - Wait for the election result (returns a Promise) | Both Master & Slave |
| **Process Registry** | Process lifecycle management | • `registerProcess()` - Register with TTL<br/>• `updateProcessHeartbeat()` - Renew TTL<br/>• `getAllProcesses()` - Get active processes<br/>• `getProcessesByRole()` - Filter by master/slave<br/>• `unregisterProcess()` - Manual cleanup | Both Master & Slave |
| **Master Lock Manager** | Atomic lock operations | • `acquireMasterLock()` - Lua script lock acquisition<br/>• `renewMasterLock()` - Lua script lock renewal<br/>• `releaseMasterLock()` - Lua script lock release<br/>• `checkMasterExists()` - Lua script master validation | Both Master & Slave |
| **Slave Management** | Slave registration and monitoring | • `registerAsSlave()` - Register as slave node<br/>• `updateSlaveHeartbeat()` - Update slave status<br/>• `cleanupStaleSlaves()` - Remove expired slaves<br/>• `getSlaves()` - Get all registered slaves | Both Master & Slave |
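
As an illustration of what a method like `cleanupStaleSlaves()` has to decide, the sketch below picks out slaves whose last heartbeat is older than a timeout. The 15-second default matches the `SLAVE_TIMEOUT_MS` value in the timing configuration; `findStaleSlaves` and `SlaveNodeLike` are hypothetical names, and the real method may work differently.

```typescript
// Only the fields needed for the staleness check (trimmed, hypothetical type).
interface SlaveNodeLike {
  nodeId: string;
  lastSeen: string; // ISO timestamp of the last heartbeat
}

const SLAVE_TIMEOUT_MS = 15000;

// Returns the IDs of slaves whose last heartbeat is older than the timeout.
function findStaleSlaves(
  slaves: SlaveNodeLike[],
  nowMs: number,
  timeoutMs: number = SLAVE_TIMEOUT_MS
): string[] {
  return slaves
    .filter((slave) => nowMs - Date.parse(slave.lastSeen) > timeoutMs)
    .map((slave) => slave.nodeId);
}

const checkTimeMs = Date.parse('2024-01-01T00:00:20.000Z');
const staleIds = findStaleSlaves(
  [
    { nodeId: 'slave-a', lastSeen: '2024-01-01T00:00:00.000Z' }, // 20s old
    { nodeId: 'slave-b', lastSeen: '2024-01-01T00:00:10.000Z' }, // 10s old
  ],
  checkTimeMs
);
```

Each ID returned here would be a candidate for removal and a `slave-removed` event.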

## Object Relationship Diagrams

### Core Class Structure and Dependencies

```mermaid
classDiagram
    class MasterElection {
        -nodeId: string
        -identifier: string
        -isMaster: boolean
        -lockTtl: number
        -heartbeatInterval: number
        +start()
        +stop()
        +getIsMaster(): boolean
        +getNodeId(): string
        +waitForElectionComplete(): Promise~boolean~
        -attemptElection()
        -acquireMasterLock(): Promise~boolean~
        -renewMasterLock(): Promise~boolean~
        -releaseMasterLock()
        -becomeMaster()
        -becomeSlave()
        -checkMasterExists(): Promise~boolean~
    }

    class ProcessRegistry {
        +registerProcess(processInfo)
        +updateProcessHeartbeat(processId)
        +getAllProcesses(): Promise~ProcessInfo[]~
        +getMasterProcess(): Promise~ProcessInfo~
        +getProcessesByRole(role): Promise~ProcessInfo[]~
        +unregisterProcess(processId)
        +getProcessChannel(processId): Promise~string~
    }

    class SlaveManagement {
        +registerAsSlave()
        +unregisterFromSlaves()
        +updateSlaveHeartbeat()
        +getSlaves(): Promise~SlaveNode[]~
        +getSlave(nodeId): Promise~SlaveNode~
        +getSlaveCount(): Promise~number~
        -cleanupStaleSlaves()
        -startSlaveManagement()
        -stopSlaveManagement()
    }

    class EventEmitter {
        +on(event, listener)
        +emit(event, ...args)
        +once(event, listener)
        +off(event, listener)
    }

    MasterElection --|> EventEmitter : extends
    MasterElection *-- ProcessRegistry : contains
    MasterElection *-- SlaveManagement : contains

    MasterElection --> Redis : uses for coordination
    ProcessRegistry --> Redis : uses hSetEx for TTL
    SlaveManagement --> Redis : uses for slave state
```

### Redis Operations and Key Management

```mermaid
graph TB
    subgraph "Redis Key Structure"
        MASTER[master-election:master<br/>String - Current master ID with TTL]
        HEARTBEAT[master-election:heartbeat<br/>String - Master heartbeat timestamp]
        MASTER_PROC[master-election:master_process<br/>String - Master ProcessInfo JSON]

        PROCESSES[master-election:processes<br/>Hash - ProcessInfo with per-entry TTL]
        CHANNELS[master-election:channels<br/>Hash - ProcessID → Channel mapping]
        SLAVES[master-election:slaves<br/>Hash - SlaveNode data]
    end

    subgraph "Atomic Operations"
        LUA1[Master Acquisition<br/>SET master NX + SET heartbeat]
        LUA2[Master Renewal<br/>Check owner + PEXPIRE + SET heartbeat]
        LUA3[Master Release<br/>Check owner + DEL master + heartbeat]
        LUA4[Master Check<br/>GET master + GET heartbeat + validate TTL]
    end

    subgraph "TTL Operations"
        HSETEX1[Process Registration<br/>hSetEx with 45s TTL per entry]
        HSETEX2[Heartbeat Renewal<br/>hSetEx renews TTL to 45s]
        AUTO[Automatic Expiration<br/>Redis removes expired entries]
    end

    MASTER --> LUA1
    MASTER --> LUA2
    MASTER --> LUA3
    HEARTBEAT --> LUA1
    HEARTBEAT --> LUA2
    HEARTBEAT --> LUA4

    PROCESSES --> HSETEX1
    PROCESSES --> HSETEX2
    PROCESSES --> AUTO
```

## Method Call Flow Analysis

### Election and Role Transition Flow

```mermaid
sequenceDiagram
    participant App as Application
    participant ME as MasterElection
    participant R as Redis
    participant Dep as Dependent Services

    Note over App,Dep: Election Initialization

    App->>+ME: start()
    ME->>+ME: attemptElection()
    ME->>+ME: emit('election-started')

    ME->>R: Lua Script: acquireMasterLock()

    alt Lock acquired successfully
        R-->>ME: Success (1)
        ME->>+ME: becomeMaster()
        ME->>+ME: startHeartbeat() - every 10s
        ME->>+ME: startSlaveManagement()
        ME->>+Dep: emit('master-acquired')
        ME->>+ME: emit('election-completed', true)
    else Lock acquisition failed
        R-->>ME: Failed (0)
        ME->>+ME: becomeSlave()
        ME->>+R: hSet(slaves, nodeId, slaveInfo)
        ME->>+ME: startPeriodicCheck() - every 5s
        ME->>+Dep: emit('election-completed', false)
    end

    Note over App,Dep: Heartbeat and Monitoring Loop

    loop Every 10 seconds (Master) / 5 seconds (Slave)
        alt Process is Master
            ME->>R: Lua Script: renewMasterLock()
            alt Renewal successful
                R-->>ME: Success (1)
                ME->>+ME: Continue as master
            else Renewal failed
                R-->>ME: Failed (0)
                ME->>+ME: becomeSlave()
                ME->>+Dep: emit('master-lost')
                ME->>+ME: attemptElection() after delay
            end
        else Process is Slave
            ME->>R: Lua Script: checkMasterExists()
            alt Master exists and healthy
                R-->>ME: Master found (1)
                ME->>+ME: Continue monitoring
            else No master or stale
                R-->>ME: No master (0)
                ME->>+ME: attemptElection() with random delay
            end
        end
    end
```

### Process Registration and TTL Management Flow

```mermaid
sequenceDiagram
    participant P as Process
    participant ME as MasterElection
    participant R as Redis
    participant Auto as Redis TTL

    Note over P,Auto: Process Registration with TTL

    P->>+ME: registerProcess(processInfo)

    ME->>+R: hSetEx(processes, processId, processInfo, {EX: 45})
    Note over R: Entry set with 45 second TTL
    R-->>-ME: Registration confirmed

    ME->>+R: hSet(channels, processId, channelName)
    R-->>-ME: Channel mapping stored

    alt Process is master
        ME->>+R: set(master_process, processInfo)
        R-->>-ME: Master process info stored
    end

    ME-->>-P: Registration complete

    Note over P,Auto: Heartbeat Loop (Every 10s)

    loop Every 10 seconds
        P->>+ME: updateProcessHeartbeat(processId)

        ME->>+R: hGet(processes, processId)
        R-->>-ME: Current process data

        ME->>+ME: Update lastSeen timestamp

        ME->>+R: hSetEx(processes, processId, updatedInfo, {EX: 45})
        Note over R: TTL renewed to 45 seconds
        R-->>-ME: Heartbeat recorded

        ME-->>-P: Heartbeat updated
    end

    Note over P,Auto: Automatic TTL Expiration (No heartbeat)

    Note over Auto: 45 seconds pass without heartbeat
    Auto->>Auto: Process entry automatically expires
    Auto->>R: Remove expired entry from hash

    Note over P,Auto: Other processes detect absence

    P->>+ME: getAllProcesses()
    ME->>+R: hGetAll(processes)
    R-->>-ME: Only active processes returned
    Note over ME: Expired process not included
    ME-->>-P: Updated process list
```
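
The last step of the diagram returns the hash contents as a field-to-JSON map. The sketch below shows one way such a result could be turned into typed records and filtered by role, which is what a method like `getProcessesByRole()` needs to do. The helper name and the trimmed `ProcessInfoLike` type are hypothetical, and the input is a plain object standing in for the `hGetAll` reply.

```typescript
type Role = 'master' | 'slave';

// Trimmed, hypothetical version of ProcessInfo with only the fields used here.
interface ProcessInfoLike {
  processId: string;
  role: Role;
  lastSeen: string;
}

// Parse each JSON value of the hash reply and keep the entries with the given role.
function processesByRole(raw: Record<string, string>, role: Role): ProcessInfoLike[] {
  return Object.keys(raw)
    .map((processId) => JSON.parse(raw[processId]) as ProcessInfoLike)
    .filter((info) => info.role === role);
}

// Stand-in for what hGetAll(processes) might return.
const rawProcesses: Record<string, string> = {
  'p-1': JSON.stringify({ processId: 'p-1', role: 'master', lastSeen: '2024-01-01T00:00:00.000Z' }),
  'p-2': JSON.stringify({ processId: 'p-2', role: 'slave', lastSeen: '2024-01-01T00:00:05.000Z' }),
  'p-3': JSON.stringify({ processId: 'p-3', role: 'slave', lastSeen: '2024-01-01T00:00:07.000Z' }),
};

const slaveIds = processesByRole(rawProcesses, 'slave').map((info) => info.processId);
```

Because expired entries never appear in the reply, no extra liveness check is needed at this point.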

## System Architecture Diagrams

### Master Election Cluster Architecture

```mermaid
graph TB
    subgraph "Backend Process Cluster"
        M[Master Process<br/>Elected Leader<br/>🏆]
        S1[Slave Process 1<br/>Follower]
        S2[Slave Process 2<br/>Follower]
        S3[Slave Process N<br/>Follower]
    end

    subgraph "Redis Coordination Layer"
        R[(Redis Server)]
        subgraph "Election Keys"
            MK[master-election:master<br/>Lock with TTL]
            HK[master-election:heartbeat<br/>Timestamp]
        end
        subgraph "Process Registry (TTL)"
            PK[master-election:processes<br/>Hash with per-entry TTL]
            CK[master-election:channels<br/>Process→Channel mapping]
        end
        subgraph "Slave Management"
            SK[master-election:slaves<br/>Slave registration data]
        end
    end

    subgraph "Dependent Services"
        CAM[Camera Module<br/>MasterSlaveWorkerCluster]
        DS[Display Service<br/>WebSocket Cluster]
        OTHER[Other Services<br/>...]
    end

    M ===|Master Lock<br/>Heartbeat Every 10s| MK
    M ===|Timestamp Update| HK
    M ===|TTL Registration<br/>Heartbeat Renewal| PK

    S1 <-->|Monitor Master<br/>Every 5s| R
    S2 <-->|Monitor Master<br/>Every 5s| R
    S3 <-->|Monitor Master<br/>Every 5s| R

    S1 ===|Slave Registration<br/>Heartbeat Every 5s| SK
    S2 ===|Slave Registration<br/>Heartbeat Every 5s| SK
    S3 ===|Slave Registration<br/>Heartbeat Every 5s| SK

    M -.->|master-acquired<br/>slave-registered<br/>slave-removed| CAM
    M -.->|Role transition events| DS
    M -.->|Coordination events| OTHER

    S1 -.->|election-completed<br/>master-lost| CAM
    S2 -.->|Election events| DS
    S3 -.->|Status events| OTHER
```

### TTL-Based Cleanup Architecture

```mermaid
graph TB
    subgraph "Process Lifecycle with TTL"
        START[Process Starts] --> REG[Register with 45s TTL]
        REG --> ACTIVE[Process Active]

        ACTIVE --> HB{Heartbeat?}
        HB -->|Every 10s| RENEW[Renew TTL to 45s]
        HB -->|Missed| COUNT[Count down TTL]

        RENEW --> ACTIVE
        COUNT --> EXPIRE{TTL = 0?}
        EXPIRE -->|No| COUNT
        EXPIRE -->|Yes| CLEANUP[Redis Auto-Remove]

        CLEANUP --> DETECT[Other Processes Detect]
        DETECT --> REBALANCE[Trigger Rebalancing]
    end

    subgraph "Traditional Manual Cleanup vs TTL"
        subgraph "❌ Manual Cleanup Problems"
            RACE[Race Conditions]
            LEAK[Memory Leaks]
            STALE[Stale Data]
            COMPLEX[Complex Logic]
        end

        subgraph "✅ TTL-Based Benefits"
            AUTO[Automatic Cleanup]
            RELIABLE[Reliable Expiration]
            SIMPLE[Simple Implementation]
            SELF[Self-Healing]
        end
    end

    subgraph "TTL Management Operations"
        HSETEX["hSetEx(key, field, value, {EX: 45})"]
        RENEWAL[Heartbeat renews TTL automatically]
        EXPIRY[Redis removes expired entries]

        HSETEX --> RENEWAL
        RENEWAL --> EXPIRY
        EXPIRY --> HSETEX
    end
```

### Election Timing and Coordination

```mermaid
gantt
    title Master Election Timeline
    dateFormat X
    axisFormat %s

    section Master Lock
    Master Lock TTL (30s) :milestone, m1, 0, 0s
    Lock Renewal (10s) :10, 20s
    Lock Renewal (10s) :20, 30s
    Lock Expires :milestone, m2, 30, 30s

    section Process TTL
    Process Registration (45s) :milestone, p1, 0, 0s
    Heartbeat Renewal (10s) :10, 20s
    Heartbeat Renewal (10s) :20, 30s
    Heartbeat Renewal (10s) :30, 40s
    Process Expires :milestone, p2, 45, 45s

    section Election Events
    Initial Election :milestone, e1, 0, 0s
    Slave Monitoring (5s) :5, 10s
    Slave Monitoring (5s) :10, 15s
    Master Failure Detected :milestone, e2, 30, 30s
    New Election Started :32, 35s
    New Master Elected :milestone, e3, 35, 35s
```

## Event System Architecture

### Event Flow and Dependencies

```mermaid
graph TD
    subgraph "MasterElection Events"
        ES[election-started] --> EA{Election Attempt}
        EA -->|Success| MA[master-acquired]
        EA -->|Failed| EC["election-completed(false)"]

        MA --> HB[Start Heartbeat Loop]
        EC --> MON[Start Master Monitoring]

        HB --> RENEW{Heartbeat Success?}
        RENEW -->|Success| CONT[Continue as Master]
        RENEW -->|Failed| ML[master-lost]

        ML --> STEP[Step Down to Slave]
        STEP --> MON

        CONT --> HB
        MON --> CHECK{Master Missing?}
        CHECK -->|Yes| ES
        CHECK -->|No| MON
    end

    subgraph "Slave Management Events"
        SR[slave-registered] --> UP[Update Assignments]
        SREM[slave-removed] --> CLEAN[Cleanup Assignments]

        UP --> NOTIFY[Notify Services]
        CLEAN --> REBAL[Rebalance Load]
    end

    subgraph "Error Handling Events"
        ERR[error] --> LOG[Log Error Details]
        LOG --> RECOVER[Attempt Recovery]
        RECOVER --> ES
    end

    subgraph "External Service Integration"
        MA -.->|"becomeMaster()"| CAMERA[Camera Module]
        ML -.->|"becomeSlave()"| CAMERA
        SR -.->|"slaveJoined()"| CAMERA
        SREM -.->|"slaveLeft()"| CAMERA

        MA -.->|Master role| DISPLAY[Display Service]
        ML -.->|Slave role| DISPLAY

        MA -.->|Coordinate| OTHER[Other Services]
        ML -.->|Follow| OTHER
    end
```

### Event Sequence Patterns

#### Master Failure and Recovery Pattern

```mermaid
sequenceDiagram
    participant M as Master Process
    participant S1 as Slave 1
    participant S2 as Slave 2
    participant R as Redis
    participant Svc as Dependent Services

    Note over M,Svc: Normal Operation
    M->>R: Heartbeat renewal every 10s
    S1->>R: Monitor master every 5s
    S2->>R: Monitor master every 5s

    Note over M,Svc: Master Failure
    M--XM: Process crashes

    Note over R: Master lock expires (30s)
    R->>R: Lock TTL expires

    Note over S1,S2: Slaves detect master failure
    S1->>R: checkMasterExists() → false
    S2->>R: checkMasterExists() → false

    Note over S1,S2: Election race with random delay
    S1->>S1: Random delay 1.2s
    S2->>S2: Random delay 0.8s

    S2->>R: attemptElection() first
    R->>S2: Success - became master
    S2->>S2: emit('master-acquired')
    S2->>Svc: becomeMaster() event

    S1->>R: attemptElection() second
    R->>S1: Failed - master exists
    S1->>S1: Continue as slave

    Note over S2,Svc: New master operational
    S2->>R: Start heartbeat renewal
    Svc->>S2: Acknowledge new master
```

## Configuration and Tuning

### Timing Configuration

```typescript
// MasterElection constructor parameters
interface MasterElectionConfig {
  lockName: string;           // Redis key prefix
  lockTtl: number;            // Master lock TTL in milliseconds
  heartbeatInterval: number;  // Master heartbeat interval in milliseconds
  checkInterval: number;      // Slave monitoring interval in milliseconds
  identifier: string;         // Human-readable process identifier
}

// Default values. TypeScript interfaces cannot carry initializers, so the
// defaults are listed in a separate object (the constant name is illustrative).
const DEFAULT_CONFIG: MasterElectionConfig = {
  lockName: 'master-election',
  lockTtl: 30000,             // 30 seconds
  heartbeatInterval: 10000,   // 10 seconds
  checkInterval: 5000,        // 5 seconds
  identifier: 'cms-backend',
};

// TTL Configuration
const PROCESS_TTL_SECONDS = 45;          // Process registration TTL
const SLAVE_TIMEOUT_MS = 15000;          // Slave cleanup threshold (3x the 5-second slave heartbeat)
const ELECTION_RANDOM_DELAY_MAX = 2000;  // Max random delay (ms) to prevent collisions
```
|
|
|
|
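
The timing values above interact, and a small calculation makes the trade-off concrete. The sketch below is not part of the service: `TimingConfig`, `worstCaseFailoverMs`, and `missedRenewalsTolerated` are hypothetical helpers. It assumes a crashed master's lock is only released by TTL expiry, that slaves notice the missing lock at their next check, and that the election attempt follows the random delay.

```typescript
// Timing values in milliseconds, mirroring the configuration above.
interface TimingConfig {
  lockTtl: number;
  heartbeatInterval: number;
  checkInterval: number;
  electionRandomDelayMax: number;
}

// Upper bound on the time between a master crash and a new lock holder,
// assuming the crash happens right after a renewal and slaves poll the lock.
function worstCaseFailoverMs(config: TimingConfig): number {
  return config.lockTtl + config.checkInterval + config.electionRandomDelayMax;
}

// Number of consecutive renewals the master can miss before the lock expires.
function missedRenewalsTolerated(config: TimingConfig): number {
  return Math.ceil(config.lockTtl / config.heartbeatInterval) - 1;
}

const defaults: TimingConfig = {
  lockTtl: 30000,
  heartbeatInterval: 10000,
  checkInterval: 5000,
  electionRandomDelayMax: 2000,
};

console.log(worstCaseFailoverMs(defaults));     // 37000
console.log(missedRenewalsTolerated(defaults)); // 2
```

Under these assumptions, lowering `lockTtl` shortens crash failover but leaves less room for a slow or briefly partitioned master to renew its lock.
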
### Redis Key Structure

```typescript
// Election and coordination keys
const REDIS_KEYS = {
  // Master election coordination
  master: `${lockName}:master`,                // Current master ID with TTL
  heartbeat: `${lockName}:heartbeat`,          // Master heartbeat timestamp
  masterProcess: `${lockName}:master_process`, // Master ProcessInfo JSON

  // Process registry with TTL
  processes: `${lockName}:processes`,          // Hash: processId → ProcessInfo (TTL per entry)
  channels: `${lockName}:channels`,            // Hash: processId → channelName

  // Slave management
  slaves: `${lockName}:slaves`,                // Hash: nodeId → SlaveNode
};

// TTL settings
const TTL_CONFIG = {
  masterLock: 30,       // seconds - Master lock expiration
  processEntry: 45,     // seconds - Process registration TTL
  heartbeatRenewal: 10, // seconds - How often to renew heartbeats
  slaveMonitoring: 5,   // seconds - How often slaves check master
};
```

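
Because every key derives from `lockName`, two clusters sharing one Redis instance stay isolated as long as their prefixes differ. The sketch below shows one way to centralise that derivation. `ElectionKeys` and `buildElectionKeys` are hypothetical names introduced for illustration, and the empty-prefix check is an added assumption rather than documented behaviour.

```typescript
// Shape of the key set listed above.
interface ElectionKeys {
  master: string;
  heartbeat: string;
  masterProcess: string;
  processes: string;
  channels: string;
  slaves: string;
}

// Derives every coordination key from a single prefix.
function buildElectionKeys(lockName: string): ElectionKeys {
  if (lockName.length === 0) {
    // Assumption: an empty prefix is treated as a configuration error.
    throw new Error('lockName must not be empty');
  }
  return {
    master: `${lockName}:master`,
    heartbeat: `${lockName}:heartbeat`,
    masterProcess: `${lockName}:master_process`,
    processes: `${lockName}:processes`,
    channels: `${lockName}:channels`,
    slaves: `${lockName}:slaves`,
  };
}

const keys = buildElectionKeys('cms-cluster');
console.log(keys.master);        // cms-cluster:master
console.log(keys.masterProcess); // cms-cluster:master_process
```
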
### Performance Characteristics

#### Scalability Metrics
- **Election Speed**: < 100ms for uncontested election
- **Failover Time**: < 5 seconds from master lock release or expiry to new election (after a crash, the lock must first expire, which can take up to the 30-second lock TTL)
- **Process Registration**: < 10ms per process registration
- **TTL Cleanup**: Automatic, no performance impact on application

#### Resource Usage
- **Memory**: O(n) where n = number of backend processes
- **Redis Operations**: Atomic Lua scripts prevent race conditions
- **Network**: Minimal - only heartbeats and election attempts
- **CPU**: Negligible overhead for coordination operations

#### Reliability Guarantees
- **Split-Brain Prevention**: Atomic Lua scripts ensure single master
- **Automatic Recovery**: TTL-based cleanup removes crashed or unreachable processes without manual intervention
- **Event Consistency**: All role transitions emit events for service coordination
- **State Persistence**: Process registry is rebuilt from active heartbeats after a Redis restart

## Public Interface Specification

The MasterElection service provides a clean, event-driven interface for distributed coordination across backend processes.

### Primary Interface: MasterElection Class

#### Core Lifecycle Methods

```typescript
/**
 * Initialize and start the master election process
 * @returns Promise<void> - Resolves when election completes
 */
public async start(): Promise<void>

/**
 * Stop master election and cleanup resources
 * @returns Promise<void> - Resolves when cleanup completes
 */
public async stop(): Promise<void>

/**
 * Wait for election to complete with timeout
 * @param timeoutMs - Maximum time to wait (default: 30000)
 * @returns Promise<boolean> - true if became master, false if slave
 */
public async waitForElectionComplete(timeoutMs: number = 30000): Promise<boolean>
```

#### Status and Information Methods

```typescript
/**
 * Check if this process is currently the master
 * @returns boolean - true if master, false if slave
 */
public getIsMaster(): boolean

/**
 * Get this process's unique node identifier
 * @returns string - UUID-based node identifier
 */
public getNodeId(): string

/**
 * Get this process's human-readable identifier
 * @returns string - Process identifier (e.g., 'cms-backend')
 */
public getIdentifier(): string

/**
 * Set process metadata for coordination
 * @param metadata - Metadata to set
 */
public setMetadata(metadata: Record<string, any>): void

/**
 * Get process metadata for coordination
 * @returns Record<string, any> - Current metadata
 */
public getMetadata(): Record<string, any>
```

#### Process Registry Methods

```typescript
/**
 * Register a process in the distributed registry with TTL
 * @param processInfo - Process information including role and capabilities
 * @returns Promise<void>
 */
public async registerProcess(processInfo: ProcessInfo): Promise<void>

/**
 * Update process heartbeat to renew TTL (45 seconds)
 * @param processId - Process identifier to update
 * @returns Promise<void>
 */
public async updateProcessHeartbeat(processId: string): Promise<void>

/**
 * Get all currently registered processes (auto-filtered by TTL)
 * @returns Promise<ProcessInfo[]> - Array of active processes
 */
public async getAllProcesses(): Promise<ProcessInfo[]>

/**
 * Get current master process information
 * @returns Promise<ProcessInfo | null> - Master process or null if none
 */
public async getMasterProcess(): Promise<ProcessInfo | null>

/**
 * Get processes filtered by role
 * @param role - 'master' or 'slave'
 * @returns Promise<ProcessInfo[]> - Processes with specified role
 */
public async getProcessesByRole(role: 'master' | 'slave'): Promise<ProcessInfo[]>
```

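
The "auto-filtered by TTL" behaviour of `getAllProcesses()` can be pictured as a freshness check on each entry's `lastSeen` timestamp. The sketch below is a client-side model only: how the service actually expires entries is not shown in this specification, and `ProcessEntry`, `filterActive`, and `filterByRole` are hypothetical names. The 45-second default mirrors the process registration TTL.

```typescript
// Minimal subset of the process record used for this illustration.
interface ProcessEntry {
  processId: string;
  role: 'master' | 'slave';
  lastSeen: string; // ISO 8601 timestamp of the last heartbeat
}

// Keeps only entries whose last heartbeat is within the TTL window.
function filterActive(entries: ProcessEntry[], nowMs: number, ttlSeconds: number = 45): ProcessEntry[] {
  return entries.filter((entry) => nowMs - Date.parse(entry.lastSeen) <= ttlSeconds * 1000);
}

// Narrows a list of entries to a single role.
function filterByRole(entries: ProcessEntry[], role: 'master' | 'slave'): ProcessEntry[] {
  return entries.filter((entry) => entry.role === role);
}

const nowMs = Date.parse('2024-01-01T00:01:00.000Z');
const registry: ProcessEntry[] = [
  { processId: 'a', role: 'master', lastSeen: '2024-01-01T00:00:30.000Z' }, // 30s ago
  { processId: 'b', role: 'slave', lastSeen: '2024-01-01T00:00:00.000Z' },  // 60s ago
  { processId: 'c', role: 'slave', lastSeen: '2024-01-01T00:00:50.000Z' },  // 10s ago
];

const active = filterActive(registry, nowMs);
console.log(active.map((entry) => entry.processId));                        // [ 'a', 'c' ]
console.log(filterByRole(active, 'slave').map((entry) => entry.processId)); // [ 'c' ]
```
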
#### Slave Management Methods

```typescript
/**
 * Get all registered slave nodes
 * @returns Promise<SlaveNode[]> - Array of active slaves
 */
public async getSlaves(): Promise<SlaveNode[]>

/**
 * Get specific slave node information
 * @param nodeId - Slave node identifier
 * @returns Promise<SlaveNode | null> - Slave info or null if not found
 */
public async getSlave(nodeId: string): Promise<SlaveNode | null>

/**
 * Get count of registered slave nodes
 * @returns Promise<number> - Number of active slaves
 */
public async getSlaveCount(): Promise<number>
```

### Event System Interface

#### Event Registration

```typescript
// Type-safe event registration
masterElection.on('master-acquired', () => {
  // This process became the master
  console.log('Became master - start coordinating resources');
});

masterElection.on('master-lost', () => {
  // This process lost master status
  console.log('Lost master status - step down to slave role');
});

masterElection.on('election-completed', (isMaster: boolean) => {
  // Election finished - role determined
  console.log(`Election completed - role: ${isMaster ? 'MASTER' : 'SLAVE'}`);
});

masterElection.on('slave-registered', (slave: SlaveNode) => {
  // New backend process joined cluster
  console.log(`New slave joined: ${slave.nodeId}`);
});

masterElection.on('slave-removed', (nodeId: string) => {
  // Backend process left cluster (TTL expired)
  console.log(`Slave removed: ${nodeId}`);
});

masterElection.on('error', (error: Error) => {
  // Election or coordination error occurred
  console.error('Master election error:', error);
});
```

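
One way to get the type safety shown in these registrations is an event map that ties each event name to its handler arguments. The sketch below is an assumption about how such typing could be arranged, not the service's actual implementation: `ElectionEvents`, `TypedEmitter`, and `SlaveNodeLike` are hypothetical names, and the emitter is a small stand-alone class rather than Node's `EventEmitter`.

```typescript
// Hypothetical stand-in for the SlaveNode type.
interface SlaveNodeLike {
  nodeId: string;
  lastSeen: string;
}

// Maps each event name to the tuple of arguments its handlers receive.
type ElectionEvents = {
  'master-acquired': [];
  'master-lost': [];
  'election-completed': [isMaster: boolean];
  'slave-registered': [slave: SlaveNodeLike];
  'slave-removed': [nodeId: string];
  'error': [error: Error];
};

// Minimal emitter whose on/emit signatures are checked against the event map.
class TypedEmitter<E extends Record<string, unknown[]>> {
  private handlers = new Map<keyof E, unknown[]>();

  on<K extends keyof E>(event: K, handler: (...args: E[K]) => void): this {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
    return this;
  }

  // Returns true if at least one handler was called.
  emit<K extends keyof E>(event: K, ...args: E[K]): boolean {
    const list = this.handlers.get(event);
    if (list === undefined || list.length === 0) {
      return false;
    }
    for (const handler of list) {
      (handler as (...handlerArgs: E[K]) => void)(...args);
    }
    return true;
  }
}

const events = new TypedEmitter<ElectionEvents>();
let lastRole = 'UNKNOWN';
events.on('election-completed', (isMaster) => {
  lastRole = isMaster ? 'MASTER' : 'SLAVE';
});
events.emit('election-completed', true);
console.log(lastRole); // MASTER
```

With this arrangement, registering a handler with the wrong parameter type, or emitting `'slave-removed'` without a `nodeId`, is rejected at compile time.
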
#### Event Timing Guarantees

- **master-acquired**: Emitted immediately after successful lock acquisition
- **master-lost**: Emitted immediately after failed lock renewal
- **election-completed**: Emitted after initial election resolves (master or slave)
- **slave-registered**: Emitted when new slave joins (master only)
- **slave-removed**: Emitted when slave TTL expires (master only)
- **error**: Emitted on Redis connection issues or election failures

### Usage Patterns

#### Basic Initialization and Coordination

```typescript
import { initialize, getMasterElection } from '~/services/MasterElection';

// Initialize master election with custom settings
await initialize(
  'cms-cluster',      // lockName - Redis key prefix
  30000,              // lockTtl - Master lock TTL (30s)
  10000,              // heartbeatInterval - Master heartbeat (10s)
  5000,               // checkInterval - Slave monitoring (5s)
  'cms-backend-prod'  // identifier - Human-readable name
);

// Get election instance for event handling
const masterElection = getMasterElection();

// Wait for initial election to complete
const isMaster = await masterElection.waitForElectionComplete();
console.log(`Process started as: ${isMaster ? 'MASTER' : 'SLAVE'}`);
```

#### Service Integration Pattern

```typescript
// Camera module integration example
class CameraClusterService {
  private masterElection: MasterElection;

  constructor() {
    this.masterElection = getMasterElection();
    this.setupElectionHandlers();
  }

  private setupElectionHandlers() {
    // Handle master role transitions
    this.masterElection.on('master-acquired', () => {
      this.becomeMaster();
    });

    this.masterElection.on('master-lost', () => {
      this.becomeSlave();
    });

    // Handle cluster membership changes
    this.masterElection.on('slave-registered', (slave) => {
      this.handleSlaveJoined(slave);
    });

    this.masterElection.on('slave-removed', (nodeId) => {
      this.handleSlaveLeft(nodeId);
    });
  }

  private async becomeMaster() {
    console.log('Camera service: Becoming master');

    // Connect to all Python ML workers
    await this.connectToAllWorkers();

    // Start managing cluster assignments
    this.startClusterManagement();

    // Begin rebalancing subscriptions
    this.startRebalancing();
  }

  private async becomeSlave() {
    console.log('Camera service: Becoming slave');

    // Disconnect from Python workers (master-only)
    await this.disconnectFromWorkers();

    // Stop cluster management
    this.stopClusterManagement();

    // Start listening for routed messages
    this.startSlaveMessageHandling();
  }
}
```

#### Process Registration with Custom Capabilities

```typescript
// Register this process with specific capabilities
await masterElection.registerProcess({
  processId: masterElection.getNodeId(),
  nodeId: masterElection.getNodeId(),
  role: masterElection.getIsMaster() ? 'master' : 'slave',
  channelName: `worker:slave:${masterElection.getNodeId()}`,
  lastSeen: new Date().toISOString(),
  capabilities: {
    canProcessDetections: true, // Can handle AI detection callbacks
    maxSubscriptions: 100,      // Maximum camera subscriptions
    preferredWorkload: 80       // Preferred load percentage (0-100)
  }
});

// Start heartbeat loop to maintain registration
setInterval(async () => {
  try {
    await masterElection.updateProcessHeartbeat(masterElection.getNodeId());
  } catch (error) {
    // Log and keep the loop alive; the next tick retries before the 45-second TTL expires
    console.error('Process heartbeat failed:', error);
  }
}, 10000); // Every 10 seconds
```

#### Cluster Monitoring and Status

```typescript
// Monitor cluster status and health
async function monitorClusterHealth() {
  // Get all active processes (TTL-filtered automatically)
  const allProcesses = await masterElection.getAllProcesses();
  console.log(`Active processes: ${allProcesses.length}`);

  // Get current master
  const masterProcess = await masterElection.getMasterProcess();
  if (masterProcess) {
    console.log(`Master: ${masterProcess.processId} (${masterProcess.capabilities.maxSubscriptions} max subscriptions)`);
  }

  // Get all slaves
  const slaves = await masterElection.getSlaves();
  console.log(`Slaves: ${slaves.length}`);
  slaves.forEach(slave => {
    console.log(`  Slave ${slave.nodeId}: last seen ${slave.lastSeen}`);
  });

  // Check if this process is master
  if (masterElection.getIsMaster()) {
    console.log('This process is the master - coordinating cluster');
  } else {
    console.log('This process is a slave - following master');
  }
}

// Run monitoring every 30 seconds
setInterval(monitorClusterHealth, 30000);
```

#### Graceful Shutdown Pattern

```typescript
// Graceful shutdown with proper cleanup
process.on('SIGTERM', async () => {
  console.log('Shutting down master election...');

  try {
    // Stop election and cleanup resources
    await masterElection.stop();

    // Master automatically releases lock
    // Process TTL will expire naturally
    // Slaves will detect and trigger new election

    console.log('Master election shutdown complete');
  } catch (error) {
    console.error('Error during election shutdown:', error);
  }

  process.exit(0);
});
```

### Error Handling and Recovery

#### Election Failure Scenarios

```typescript
// Handle various failure modes
masterElection.on('error', (error) => {
  console.error('Master election error:', error.message);

  // Common error types:
  if (error.message.includes('Redis')) {
    // Redis connection issues
    console.log('Redis connectivity problem - will retry automatically');
  } else if (error.message.includes('timeout')) {
    // Election timeout
    console.log('Election timeout - may indicate network issues');
  } else if (error.message.includes('lock')) {
    // Lock acquisition issues
    console.log('Lock contention - normal during elections');
  }

  // Service continues running - election will retry automatically
});

// Handle network partitions
masterElection.on('master-lost', () => {
  console.log('Lost master status - likely network partition or overload');

  // Dependent services should gracefully step down
  // New election will start automatically after random delay
});
```

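
The `master-lost` event follows a failed lock renewal, so it helps to see what a renewal has to check. The sketch below pairs a commonly used "extend only if I still own the key" Lua pattern with an in-memory model of the same logic. Both are illustrative assumptions: the service's real scripts are not reproduced here, and `RENEW_SCRIPT`, `LockState`, and `renewIfOwner` are hypothetical names.

```typescript
// Lua sketch: extend the TTL only if the caller still owns the lock.
// Assumed layout: KEYS[1] = master key, ARGV[1] = node id, ARGV[2] = TTL in ms.
const RENEW_SCRIPT = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('PEXPIRE', KEYS[1], ARGV[2])
else
  return 0
end`;

// In-memory model of the lock, used to exercise the same decision logic.
type LockState = { owner: string; expiresAt: number } | null;

interface RenewResult {
  lock: LockState;
  renewed: boolean; // false corresponds to the condition that triggers 'master-lost'
}

function renewIfOwner(lock: LockState, nodeId: string, ttlMs: number, now: number): RenewResult {
  // An expired lock behaves like a missing key.
  const live: LockState = lock !== null && lock.expiresAt > now ? lock : null;
  if (live === null || live.owner !== nodeId) {
    return { lock: live, renewed: false };
  }
  return { lock: { owner: nodeId, expiresAt: now + ttlMs }, renewed: true };
}

const held: LockState = { owner: 'node-a', expiresAt: 30000 };
console.log(renewIfOwner(held, 'node-a', 30000, 10000).renewed); // true
console.log(renewIfOwner(held, 'node-b', 30000, 10000).renewed); // false
console.log(renewIfOwner(held, 'node-a', 30000, 31000).renewed); // false
```

Checking ownership and extending the TTL in a single atomic step is what keeps a late renewal from a former master from extending a lock that another process has since acquired.
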
#### Recovery Guarantees

- **Split-Brain Prevention**: Atomic Lua scripts ensure only one master exists
- **Automatic Failover**: New elections are triggered as soon as slaves detect that the master lock has been released or has expired
- **TTL-Based Cleanup**: Processes automatically removed when heartbeats stop
- **State Recovery**: Process registry rebuilds automatically from active heartbeats
- **Event Consistency**: All role changes emit events for service coordination

### Integration with Dependent Services

The MasterElection service is designed to coordinate multiple backend services that need distributed leadership:

#### Camera Module Integration
- Master: Connects to Python ML workers, manages subscriptions
- Slaves: Process routed detection messages, forward commands

#### Display WebSocket Cluster
- Master: Manages WebSocket connection assignments across processes
- Slaves: Handle assigned display connections, route messages

#### Database Migration Coordination
- Master: Executes database migrations and schema changes
- Slaves: Wait for master to complete before proceeding

This specification provides a comprehensive understanding of the MasterElection service's distributed coordination capabilities and integration patterns for multi-process backend systems.