feat: Add Swagger documentation support and restructure routes

- Added @elysiajs/swagger dependency to package.json for API documentation.
- Removed the old bed router and replaced it with a new history router.
- Created a new state router to manage WebSocket connections and state updates.
- Implemented a comprehensive state management system with the StateManager service.
- Introduced AlarmManagement and BedService services for handling alarms and sensor readings.
- Established a new MQTT service for managing MQTT connections and subscriptions.
- Created an AlarmStateStore to manage volatile alerts and their states.
- Defined FrontendState types for structured state management and WebSocket messaging.
This commit is contained in:
Siwat Sirichai 2025-06-21 18:56:34 +07:00
parent a767dc3635
commit 4ae5196ef1
12 changed files with 1189 additions and 1 deletions

148
services/AlarmManagement.ts Normal file
View file

@ -0,0 +1,148 @@
import { EventEmitter } from 'events';
import { AlarmStateStore, VolatileAlert } from '../store/AlarmStateStore';
import { MeasurementPointState } from '../types/FrontendState';
export class AlarmManagement extends EventEmitter {
private alarmStore: AlarmStateStore;
private measurementPoints: Map<string, MeasurementPointState> = new Map();
constructor() {
super();
this.alarmStore = new AlarmStateStore();
this.setupEventListeners();
}
private setupEventListeners(): void {
// Forward alarm store events
this.alarmStore.on('alertCreated', (alert: VolatileAlert) => {
this.emit('alertCreated', alert);
});
this.alarmStore.on('alertUpdated', (alert: VolatileAlert) => {
this.emit('alertUpdated', alert);
});
this.alarmStore.on('alertRemoved', (alert: VolatileAlert) => {
this.emit('alertRemoved', alert);
});
}
// Update measurement points for reference
updateMeasurementPoints(measurementPoints: Record<string, MeasurementPointState>): void {
this.measurementPoints.clear();
Object.values(measurementPoints).forEach(mp => {
this.measurementPoints.set(mp.id, mp);
});
}
// Process sensor reading and check for alerts
processSensorReading(sensorId: string, value: number, timestamp: Date): void {
// Find measurement point by sensorId
const measurementPoint = Array.from(this.measurementPoints.values())
.find(mp => mp.sensorId === sensorId);
if (!measurementPoint) {
console.warn(`No measurement point found for sensor: ${sensorId}`);
return;
}
this.checkAlerts(measurementPoint, value);
}
private checkAlerts(measurementPoint: MeasurementPointState, value: number): void {
const { id: pointId, warningThreshold, alarmThreshold, warningDelayMs, label, zone } = measurementPoint;
// Check if value exceeds alarm threshold (immediate alarm)
if (value >= alarmThreshold) {
this.alarmStore.clearWarningTimer(pointId);
this.alarmStore.createAlert(
pointId,
measurementPoint.sensorId,
'ALARM',
value,
alarmThreshold,
label,
zone
);
return;
}
// Check if value exceeds warning threshold
if (value >= warningThreshold) {
const existingAlert = this.alarmStore.getAlertByMeasurementPointId(pointId);
if (!existingAlert) {
// Create warning alert
this.alarmStore.createAlert(
pointId,
measurementPoint.sensorId,
'WARNING',
value,
warningThreshold,
label,
zone
);
// Set timer for warning to escalate to alarm
const timer = setTimeout(() => {
this.alarmStore.createAlert(
pointId,
measurementPoint.sensorId,
'ALARM',
value,
warningThreshold,
label,
zone
);
}, warningDelayMs);
this.alarmStore.setWarningTimer(pointId, timer);
}
} else {
// Value is below warning threshold, clear any alerts for this point
this.alarmStore.clearWarningTimer(pointId);
this.alarmStore.removeAlertsByMeasurementPointId(pointId);
}
}
// Get all active alerts
getActiveAlerts(): VolatileAlert[] {
return this.alarmStore.getAllAlerts();
}
// Get alerts in frontend state format
getAlertStates(): Record<string, import('../types/FrontendState').AlertState> {
return this.alarmStore.toAlertStates();
}
// Acknowledge alert
acknowledgeAlert(alertId: string): boolean {
return this.alarmStore.acknowledgeAlert(alertId);
}
// Silence alert
silenceAlert(alertId: string): boolean {
return this.alarmStore.silenceAlert(alertId);
}
// Get alert by ID
getAlert(alertId: string): VolatileAlert | undefined {
return this.alarmStore.getAlert(alertId);
}
// Get statistics
getStats() {
return this.alarmStore.getStats();
}
// Clear all alerts (for testing/reset)
clearAllAlerts(): void {
this.alarmStore.clearAll();
}
// Cleanup
cleanup(): void {
this.alarmStore.clearAll();
this.removeAllListeners();
}
}

228
services/BedService.ts Normal file
View file

@ -0,0 +1,228 @@
import { PrismaClient, type MeasurementPoint } from '../generated/prisma';
import { EventEmitter } from 'events';
import MQTT from '../adapter/mqtt';
import { getMQTTClient, BASE_TOPIC } from './mqttService';
import { AlarmManagement } from './AlarmManagement';
import { VolatileAlert } from '../store/AlarmStateStore';
export interface SensorReading {
sensorId: string;
value: number;
timestamp: Date;
}
export interface AlertConfig {
warningThreshold: number;
alarmThreshold: number;
warningDelayMs: number;
}
export class BedService extends EventEmitter {
private prisma: PrismaClient;
private mqtt: MQTT | null = null;
private alarmManagement: AlarmManagement;
private baseTopic = `${BASE_TOPIC}pressure`;
constructor() {
super();
this.prisma = new PrismaClient();
this.alarmManagement = new AlarmManagement();
this.setupAlarmEventListeners();
}
private setupAlarmEventListeners(): void {
// Forward alarm events
this.alarmManagement.on('alertCreated', (alert: VolatileAlert) => {
this.emit('alert', alert);
});
this.alarmManagement.on('alertUpdated', (alert: VolatileAlert) => {
this.emit('alert', alert);
});
this.alarmManagement.on('alertRemoved', (alert: VolatileAlert) => {
this.emit('alertRemoved', alert);
});
}
async initialize(mqttConfig?: { host?: string; port?: number; username?: string; password?: string }): Promise<void> {
try {
// Use mqttService to get initialized MQTT client
this.mqtt = await getMQTTClient(mqttConfig);
// Subscribe to sensor data topic
await this.mqtt.subscribe(`${this.baseTopic}/+/data`, (topic, message) => {
this.handleSensorData(topic, message);
});
console.log('BedService initialized successfully');
this.emit('initialized');
} catch (error) {
console.error('Failed to initialize BedService:', error);
throw error;
}
}
private handleSensorData(topic: string, message: string): void {
try {
// Extract sensor ID from topic: bed/pressure/{sensorId}/data
const sensorId = topic.split('/')[2];
const data = JSON.parse(message);
const reading: SensorReading = {
sensorId,
value: data.value,
timestamp: new Date(data.timestamp || Date.now())
};
this.processSensorReading(reading);
} catch (error) {
console.error('Error processing MQTT sensor data:', error);
this.emit('error', error);
}
}
private async processSensorReading(reading: SensorReading): Promise<void> {
try {
// Find measurement point
const measurementPoint = await this.prisma.measurementPoint.findUnique({
where: { sensorId: reading.sensorId }
});
if (!measurementPoint) {
console.warn(`Unknown sensor ID: ${reading.sensorId}`);
return;
}
// Store sensor data
await this.prisma.measurementPointData.create({
data: {
measurementPointId: measurementPoint.id,
value: reading.value,
timestamp: reading.timestamp,
time: reading.timestamp.toISOString()
}
});
// Let alarm management handle alerts
this.alarmManagement.processSensorReading(reading.sensorId, reading.value, reading.timestamp);
this.emit('sensorReading', reading);
} catch (error) {
console.error('Error processing sensor reading:', error);
this.emit('error', error);
}
}
// Public API methods
async createMeasurementPoint(data: {
sensorId: string;
label: string;
zone: string;
x: number;
y: number;
pin: number;
warningThreshold: number;
alarmThreshold: number;
warningDelayMs: number;
}): Promise<MeasurementPoint> {
return this.prisma.measurementPoint.create({ data });
}
async getMeasurementPoints(): Promise<MeasurementPoint[]> {
const points = await this.prisma.measurementPoint.findMany({
orderBy: { zone: 'asc' }
});
// Update alarm management with current measurement points
const pointsRecord: Record<string, {
id: string;
sensorId: string;
label: string;
zone: string;
x: number;
y: number;
pin: number;
warningThreshold: number;
alarmThreshold: number;
warningDelayMs: number;
currentValue: number;
lastUpdateTime: Date;
status: 'offline';
}> = {};
points.forEach(point => {
pointsRecord[point.id] = {
id: point.id,
sensorId: point.sensorId,
label: point.label,
zone: point.zone, x: point.x,
y: point.y,
pin: point.pin,
warningThreshold: point.warningThreshold,
alarmThreshold: point.alarmThreshold,
warningDelayMs: point.warningDelayMs,
currentValue: 0,
lastUpdateTime: new Date(),
status: 'offline' as const
};
});
this.alarmManagement.updateMeasurementPoints(pointsRecord);
return points;
}
async getMeasurementPointData(sensorId: string, limit = 100) {
const measurementPoint = await this.prisma.measurementPoint.findUnique({
where: { sensorId }
});
if (!measurementPoint) {
throw new Error(`Measurement point not found for sensor: ${sensorId}`);
}
return this.prisma.measurementPointData.findMany({
where: { measurementPointId: measurementPoint.id },
orderBy: { timestamp: 'desc' },
take: limit
});
}
// Get active alerts from alarm management (volatile)
getActiveAlerts(): VolatileAlert[] {
return this.alarmManagement.getActiveAlerts();
}
// Acknowledge alert in volatile store
acknowledgeAlert(alertId: string): boolean {
return this.alarmManagement.acknowledgeAlert(alertId);
}
// Silence alert in volatile store
silenceAlert(alertId: string): boolean {
return this.alarmManagement.silenceAlert(alertId);
}
async updateAlertConfig(sensorId: string, config: AlertConfig): Promise<MeasurementPoint> {
return this.prisma.measurementPoint.update({
where: { sensorId },
data: {
warningThreshold: config.warningThreshold,
alarmThreshold: config.alarmThreshold,
warningDelayMs: config.warningDelayMs
}
});
}
async disconnect(): Promise<void> {
// Cleanup alarm management
this.alarmManagement.cleanup();
// Disconnect MQTT
if (this.mqtt) {
await this.mqtt.disconnect();
}
// Disconnect Prisma
await this.prisma.$disconnect();
this.emit('disconnected');
}
}

232
services/StateManager.ts Normal file
View file

@ -0,0 +1,232 @@
import { EventEmitter } from 'events';
import { FrontendState, MeasurementPointState, AlertState, SystemStatus, StateUpdateEvent } from '../types/FrontendState';
import { BedService } from './BedService';
import { VolatileAlert } from '../store/AlarmStateStore';
import { PrismaClient } from '../generated/prisma';
export class StateManager extends EventEmitter {
private state: FrontendState;
private prisma: PrismaClient;
constructor(private bedService: BedService) {
super();
this.prisma = new PrismaClient(); // Initialize empty state
this.state = {
measurementPoints: {},
alerts: {},
system: {
mqttConnected: false,
databaseConnected: false,
lastHeartbeat: new Date(),
activeConnections: 0,
totalMeasurementPoints: 0,
activeSensors: 0
}
};
this.setupEventListeners();
}
private setupEventListeners(): void {
// Listen to BedService events
this.bedService.on('sensorReading', (reading) => {
this.updateSensorReading(reading.sensorId, reading.value, reading.timestamp);
});
this.bedService.on('alert', (alert) => {
this.updateAlert(alert);
});
this.bedService.on('initialized', () => {
this.updateSystemStatus({ mqttConnected: true });
});
this.bedService.on('disconnected', () => {
this.updateSystemStatus({ mqttConnected: false });
});
}
// Get current state (read-only)
getState(): Readonly<FrontendState> {
return { ...this.state };
}
// Initialize state from database
async initializeState(): Promise<void> {
try { // Load measurement points
const measurementPoints = await this.bedService.getMeasurementPoints();
const measurementPointStates: Record<string, MeasurementPointState> = {};
for (const mp of measurementPoints) { measurementPointStates[mp.id] = {
id: mp.id,
sensorId: mp.sensorId,
label: mp.label,
zone: mp.zone,
x: mp.x ?? 0,
y: mp.y ?? 0,
pin: mp.pin ?? 0,
currentValue: 0,
lastUpdateTime: new Date(),
warningThreshold: mp.warningThreshold,
alarmThreshold: mp.alarmThreshold,
warningDelayMs: mp.warningDelayMs,
status: 'offline'
};
} // Load active alerts
const alerts = await this.bedService.getActiveAlerts();
const alertStates: Record<string, AlertState> = {};
for (const alert of alerts) {
const measurementPoint = measurementPointStates[alert.measurementPointId];
alertStates[alert.id] = {
id: alert.id,
measurementPointId: alert.measurementPointId,
type: alert.type,
value: alert.value,
threshold: alert.threshold,
acknowledged: alert.acknowledged,
silenced: alert.silenced,
startTime: alert.startTime,
endTime: alert.endTime ?? undefined,
sensorLabel: measurementPoint?.label || 'Unknown',
zone: measurementPoint?.zone || 'Unknown'
};
} // Update state
this.state.measurementPoints = measurementPointStates;
this.state.alerts = alertStates;
this.state.system.totalMeasurementPoints = measurementPoints.length;
this.state.system.databaseConnected = true;
this.emitStateUpdate('FULL_STATE', this.state);
} catch (error) {
console.error('Failed to initialize state:', error);
this.state.system.databaseConnected = false;
}
}
// Update sensor reading
updateSensorReading(sensorId: string, value: number, timestamp: Date): void {
// Find measurement point by sensorId
const measurementPoint = Object.values(this.state.measurementPoints)
.find(mp => mp.sensorId === sensorId);
if (!measurementPoint) return;
// Determine status based on thresholds
let status: 'normal' | 'warning' | 'alarm' | 'offline' = 'normal';
if (value >= measurementPoint.alarmThreshold) {
status = 'alarm';
} else if (value >= measurementPoint.warningThreshold) {
status = 'warning';
} // Update measurement point state
this.state.measurementPoints[measurementPoint.id] = {
...measurementPoint,
currentValue: value,
lastUpdateTime: timestamp,
status
};
// Update system stats
this.updateActiveSensors();
this.emitStateUpdate('SENSOR_UPDATE', this.state.measurementPoints[measurementPoint.id]);
}
// Update alert
updateAlert(alert: Alert & { measurementPoint: MeasurementPoint }): void {
const alertState: AlertState = {
id: alert.id,
measurementPointId: alert.measurementPointId,
type: alert.type,
value: alert.value,
threshold: alert.threshold,
acknowledged: alert.acknowledged,
silenced: alert.silenced,
startTime: alert.startTime,
endTime: alert.endTime || undefined,
sensorLabel: alert.measurementPoint.label,
zone: alert.measurementPoint.zone
}; this.state.alerts[alert.id] = alertState;
this.emitStateUpdate('ALERT_UPDATE', alertState);
}
// Remove alert (when closed)
removeAlert(alertId: string): void {
if (this.state.alerts[alertId]) {
delete this.state.alerts[alertId];
this.emitStateUpdate('PARTIAL_UPDATE', { alerts: this.state.alerts });
}
}
// Update system status
updateSystemStatus(updates: Partial<SystemStatus>): void {
this.state.system = {
...this.state.system,
...updates,
lastHeartbeat: new Date()
};
this.emitStateUpdate('SYSTEM_UPDATE', this.state.system);
}
// Update active sensors count
private updateActiveSensors(): void {
const now = new Date();
const fiveMinutesAgo = new Date(now.getTime() - 5 * 60 * 1000);
const activeSensors = Object.values(this.state.measurementPoints)
.filter(mp => mp.lastUpdateTime > fiveMinutesAgo).length;
this.state.system.activeSensors = activeSensors;
}
// Update connection count (for WebSocket clients)
updateConnectionCount(count: number): void {
this.state.system.activeConnections = count;
this.emitStateUpdate('SYSTEM_UPDATE', this.state.system);
}
// Acknowledge alert
async acknowledgeAlert(alertId: string): Promise<void> {
try {
await this.bedService.acknowledgeAlert(alertId);
if (this.state.alerts[alertId]) {
this.state.alerts[alertId].acknowledged = true;
this.emitStateUpdate('ALERT_UPDATE', this.state.alerts[alertId]);
}
} catch (error) {
console.error('Failed to acknowledge alert:', error);
}
}
// Silence alert
async silenceAlert(alertId: string): Promise<void> {
try {
await this.bedService.silenceAlert(alertId);
if (this.state.alerts[alertId]) {
this.state.alerts[alertId].silenced = true;
this.emitStateUpdate('ALERT_UPDATE', this.state.alerts[alertId]);
}
} catch (error) {
console.error('Failed to silence alert:', error);
}
}
// Emit state update event
private emitStateUpdate(type: StateUpdateEvent['type'], data: StateUpdateEvent['data']): void {
const event: StateUpdateEvent = {
type,
timestamp: new Date(),
data
};
this.emit('stateUpdate', event);
}
// Cleanup
async disconnect(): Promise<void> {
await this.prisma.$disconnect();
this.removeAllListeners();
}
}

72
services/mqttService.ts Normal file
View file

@ -0,0 +1,72 @@
import MQTT, { MQTTConfig } from '../adapter/mqtt';
// Default MQTT configuration for HiveMQ broker
const defaultConfig: MQTTConfig = {
host: 'broker.hivemq.com',
port: 1883,
username: undefined,
password: undefined
};
export const BASE_TOPIC = '/Jtkcp2N/pressurebed/';
// Singleton MQTT client instance
let mqttInstance: MQTT | null = null;
export class MQTTService {
private static instance: MQTT | null = null;
static async initialize(config?: Partial<MQTTConfig>): Promise<MQTT> {
if (!MQTTService.instance) {
const finalConfig = { ...defaultConfig, ...config };
MQTTService.instance = new MQTT();
await MQTTService.instance.initialize(finalConfig);
}
return MQTTService.instance;
}
static getInstance(): MQTT | null {
return MQTTService.instance;
}
static async disconnect(): Promise<void> {
if (MQTTService.instance) {
await MQTTService.instance.disconnect();
MQTTService.instance = null;
}
}
}
// Factory function to get or create MQTT client
export async function getMQTTClient(config?: Partial<MQTTConfig>): Promise<MQTT> {
if (!mqttInstance) {
mqttInstance = new MQTT();
const finalConfig = { ...defaultConfig, ...config };
await mqttInstance.initialize(finalConfig);
}
return mqttInstance;
}
// Export the singleton instance getter
export function getMQTTInstance(): MQTT | null {
return mqttInstance;
}
// Cleanup function
export async function disconnectMQTT(): Promise<void> {
if (mqttInstance) {
await mqttInstance.disconnect();
mqttInstance = null;
}
}
// Export default configured client (lazy initialization)
const mqttService = {
async getClient(config?: Partial<MQTTConfig>): Promise<MQTT> {
return getMQTTClient(config);
},
getInstance: getMQTTInstance,
disconnect: disconnectMQTT
};
export default mqttService;