// m2-inno-bedpressure/services/BedHardwareMQTT.ts
//
// 139 lines
// No EOL
// 4.4 KiB
// TypeScript

import { EventEmitter } from 'events';
import { IBedHardware, PinState, PinChange } from '../types/bedhardware';
import MQTT from '../adapter/mqtt';
import { getMQTTClient, BASE_TOPIC } from './mqttService';
/**
 * Optional settings accepted by the `BedHardwareMQTT` constructor.
 */
export interface MqttConfig {
  /**
   * Topic names to subscribe to. When supplied, all three names are
   * required; when omitted, the consumer picks its own defaults.
   */
  topics?: Record<'pinState' | 'pinChange' | 'initialization', string>;
}
/**
 * Decoded JSON body of an incoming MQTT message. Fields are checked for
 * presence only; their value types are taken as sent by the publisher.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- wire format is not typed in this module
type MqttPayload = Record<string, any>;

/**
 * Bed hardware driver that receives pin data over MQTT.
 *
 * The instance subscribes to three topics (initialization, pin state and
 * pin change), keeps the latest {@link PinState} per pin in memory, and
 * re-publishes incoming data as events:
 *
 * - `connected` / `disconnected` — lifecycle of this driver.
 * - `initialized` — an initialization message with `type === 'START'` arrived.
 * - `pinInitialized` — a `PIN_INIT` initialization message or a pin-state
 *   message arrived; payload is the stored {@link PinState}.
 * - `pinChanged` and `pin<N>Changed` — a pin-change message arrived; payload
 *   is a {@link PinChange}.
 * - `error` — connecting failed or a message could not be processed. Emitted
 *   only when at least one `error` listener is registered, because an
 *   unhandled `error` event makes `EventEmitter` throw.
 */
export class BedHardwareMQTT extends EventEmitter implements IBedHardware {
  private client: MQTT | null = null;
  private pinStates: Map<number, PinState> = new Map();
  private connectionState: boolean = false;
  private topics: {
    pinState: string;
    pinChange: string;
    initialization: string;
  };

  /**
   * @param config Optional topic overrides. When `config.topics` is absent the
   *   topic names are derived from `BASE_TOPIC`.
   */
  constructor(private config: MqttConfig = {}) {
    super();
    this.topics = config.topics || {
      pinState: `${BASE_TOPIC}pin/state`,
      pinChange: `${BASE_TOPIC}pin/change`,
      initialization: `${BASE_TOPIC}init`
    };
  }

  /**
   * Obtains a client from the MQTT service and subscribes to the
   * initialization, pin-state and pin-change topics (in that order).
   * Emits `connected` once all three subscriptions have resolved.
   *
   * @throws Error with message `Failed to connect to MQTT broker: …` when
   *   obtaining the client or any subscription fails.
   */
  async connect(): Promise<void> {
    try {
      // Use the MQTT service to get a client connected to HiveMQ
      const client = await getMQTTClient();
      this.client = client;

      // Subscribe to topics - adapter handles reconnection/resubscription.
      // Each subscription gets its own callback instance, as before.
      const subscriptionOrder = [
        this.topics.initialization,
        this.topics.pinState,
        this.topics.pinChange
      ];
      for (const subscribedTopic of subscriptionOrder) {
        await client.subscribe(subscribedTopic, (topic, message) => {
          this.handleMqttMessage(topic, message);
        });
      }

      this.connectionState = true;
      this.emit('connected');
      console.log('BedHardware MQTT connected to HiveMQ');
    } catch (error) {
      const errorMsg = `Failed to connect to MQTT broker: ${error}`;
      console.error(errorMsg);
      this.reportError(new Error(errorMsg));
      throw new Error(errorMsg);
    }
  }

  /**
   * Disconnects the underlying client (if any), clears the connection state
   * and emits `disconnected`.
   *
   * The state is reset even when the client's `disconnect()` rejects, so the
   * driver never keeps reporting a connection it has given up on; the
   * rejection itself still propagates to the caller.
   */
  async disconnect(): Promise<void> {
    try {
      if (this.client) {
        await this.client.disconnect();
      }
    } finally {
      this.client = null;
      this.connectionState = false;
      this.emit('disconnected');
    }
  }

  /**
   * Parses one incoming message and routes it by topic. Any failure
   * (invalid JSON, non-object payload, or an exception raised while
   * dispatching) is logged and reported through {@link reportError}; it never
   * escapes into the MQTT adapter's callback.
   */
  private handleMqttMessage(topic: string, message: string): void {
    try {
      const data: MqttPayload = JSON.parse(message);
      // `null`, numbers, strings and booleans are valid JSON but cannot carry
      // the fields read below; reject them explicitly.
      if (data === null || typeof data !== 'object') {
        throw new TypeError('MQTT payload is not a JSON object');
      }

      if (topic === this.topics.initialization) {
        this.handleInitialization(data);
      } else if (topic === this.topics.pinChange) {
        this.handlePinChange(data);
      } else if (topic === this.topics.pinState) {
        this.handlePinState(data);
      }
    } catch (error) {
      console.error('Error parsing MQTT message:', error);
      this.reportError(new Error(`Failed to parse MQTT message: ${error}`));
    }
  }

  /** Handles a message from the initialization topic (`START` or `PIN_INIT`). */
  private handleInitialization(data: MqttPayload): void {
    if (data.type === 'START') {
      this.emit('initialized');
      console.log('MQTT initialization started');
    } else if (data.type === 'PIN_INIT' && data.pin !== undefined && data.state !== undefined) {
      const when = new Date(data.timestamp || Date.now());
      this.emit('pinInitialized', this.recordPinState(data, 'state', when));
    }
  }

  /** Handles a pin-change message: stores the new state, then emits both change events. */
  private handlePinChange(data: MqttPayload): void {
    if (data.pin === undefined || data.previousValue === undefined || data.currentValue === undefined) {
      return;
    }
    // Resolve the instant once so the change record and the stored state
    // agree even when the payload has no timestamp of its own.
    const instant = data.timestamp || Date.now();
    const pinChange: PinChange = {
      pin: data.pin,
      previousState: data.previousValue,
      currentState: data.currentValue,
      timestamp: new Date(instant)
    };

    // Update stored pin state
    this.recordPinState(data, 'currentValue', new Date(instant));

    this.emit('pinChanged', pinChange);
    this.emit(`pin${data.pin}Changed`, pinChange);
  }

  /** Handles a pin-state message: stores the state and emits `pinInitialized`. */
  private handlePinState(data: MqttPayload): void {
    if (data.pin !== undefined && data.state !== undefined) {
      const when = new Date(data.timestamp || Date.now());
      this.emit('pinInitialized', this.recordPinState(data, 'state', when));
    }
  }

  /**
   * Builds a {@link PinState} from a payload and stores it under `data.pin`.
   *
   * @param data Decoded payload; `pin` must be present.
   * @param stateField Name of the payload field holding the pin's state.
   * @param timestamp Timestamp to attach to the stored state.
   * @returns The stored state object.
   */
  private recordPinState(
    data: MqttPayload,
    stateField: 'state' | 'currentValue',
    timestamp: Date
  ): PinState {
    const pinState: PinState = {
      pin: data.pin,
      state: data[stateField],
      name: data.name || `PIN${data.pin}`,
      timestamp
    };
    this.pinStates.set(data.pin, pinState);
    return pinState;
  }

  /**
   * Emits `error` only when someone is listening. `EventEmitter` throws the
   * error object when `error` is emitted without listeners, which would turn
   * a single bad message into an exception inside the MQTT callback.
   */
  private reportError(error: Error): void {
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
  }

  /** Returns the last stored state for `pin`, or `undefined` if none was received. */
  getPinState(pin: number): PinState | undefined {
    return this.pinStates.get(pin);
  }

  /** Returns a new array with every stored pin state. */
  getAllPinStates(): PinState[] {
    return Array.from(this.pinStates.values());
  }

  /** Reports whether `connect()` has completed and `disconnect()` has not run since. */
  isConnected(): boolean {
    return this.connectionState;
  }
}