// m2-inno-bedpressure/adapter/mqtt.ts
// Viewer metadata: 159 lines · No EOL · 4.7 KiB · TypeScript

import mqtt, { type MqttClient } from "mqtt";
import type { QoS } from "mqtt-packet";
/**
 * Connection settings for the MQTT broker.
 *
 * `host` and `port` are combined into an `mqtt://host:port` URL when the
 * client connects; the credentials are passed through as connect options.
 */
export interface MQTTConfig {
  /** Broker host name or address. */
  host: string;

  /** Broker TCP port. */
  port: number;

  /** Optional user name sent with the connect request. */
  username?: string;

  /** Optional password sent with the connect request. */
  password?: string;
}
/** Signature of a handler invoked with the concrete topic and the decoded payload. */
type MQTTSubscriptionCallback = (topic: string, message: string) => void;

/**
 * A registered subscription: the topic filter that was subscribed to and the
 * handler to run for every incoming message matching that filter.
 */
interface MQTTSubscription {
  /** Topic filter as passed to `subscribe`. */
  topic: string;

  /** Handler called with the message's topic and its payload as a string. */
  callback: MQTTSubscriptionCallback;
}
/**
 * An outgoing message handed to `publish`.
 */
export interface MQTTMessage {
  /** Topic the payload is published to. */
  topic: string;

  /** Message body, sent as a string. */
  payload: string;

  /** Quality-of-service level; `publish` falls back to 0 when omitted. */
  qos?: QoS;

  /** Retain flag; `publish` falls back to `false` when omitted. */
  retain?: boolean;
}
/**
 * Small adapter around an mqtt.js client.
 *
 * It remembers every subscription so that they can be re-established on each
 * (re)connect, dispatches incoming messages to the matching callbacks, and runs
 * a periodic keep-alive check that creates a client when none exists.
 *
 * At most one underlying client is considered "current" at any time; events
 * from a client that has been replaced or disconnected are ignored.
 */
class MQTT {
  /** Interval between keep-alive checks, in milliseconds. */
  private static readonly KEEP_ALIVE_INTERVAL_MS = 5000;

  private client: MqttClient | null = null;
  private config: MQTTConfig | null = null;
  private readonly subscriptions: MQTTSubscription[] = [];
  private isConnected = false;
  private keepAliveTimer: ReturnType<typeof setInterval> | null = null;
  /** Clients that already have event handlers attached (prevents double wiring). */
  private readonly wiredClients = new WeakSet<object>();

  /**
   * Ensures a client exists.
   *
   * When a client is already present nothing is done, even if it is currently
   * offline: mqtt.js retries on its own (its `reconnectPeriod` option is left
   * at the default here), and stacking a second client on top of it would leak
   * connections and deliver every message twice once both are online.
   */
  async keepAlive(): Promise<void> {
    if (this.isConnected || this.client) {
      return;
    }
    console.log("MQTT client is not connected, attempting to reconnect...");
    await this.connectMQTT();
  }

  /**
   * Stores the configuration, opens the connection and starts the keep-alive timer.
   *
   * The returned promise resolves once the client has been created, not once
   * the broker has acknowledged the connection.
   *
   * @param config Broker address and optional credentials.
   */
  async initialize(config: MQTTConfig): Promise<void> {
    this.config = config;
    await this.connectMQTT();

    // A repeated initialize() must not leave an earlier interval running.
    if (this.keepAliveTimer) {
      clearInterval(this.keepAliveTimer);
    }
    this.keepAliveTimer = setInterval(() => {
      this.keepAlive().catch((error) => {
        console.error("MQTT keep-alive check failed:", error);
      });
    }, MQTT.KEEP_ALIVE_INTERVAL_MS);
  }

  /**
   * Creates a fresh client from the stored configuration and makes it the
   * current one. A previously created client is closed so that it cannot keep
   * reconnecting in the background.
   *
   * Failures while creating the client are logged, not thrown.
   *
   * @throws Error when no configuration has been set via `initialize`.
   */
  async connectMQTT(): Promise<void> {
    const config = this.config;
    if (!config) {
      throw new Error("MQTT configuration is not set.");
    }

    let previous: MqttClient | null = null;
    try {
      const next = mqtt.connect(`mqtt://${config.host}:${config.port}`, {
        username: config.username,
        password: config.password
      });
      // Swap synchronously so concurrent callers never orphan a client.
      previous = this.client;
      this.client = next;
      this.isConnected = false;
      await this.setupHandler();
    } catch (error) {
      console.error("Failed to connect to MQTT broker:", error);
    }

    if (previous) {
      try {
        // Force-close: the replaced client must stop its own reconnect attempts.
        await previous.endAsync(true);
      } catch (error) {
        console.error("Failed to close previous MQTT client:", error);
      }
    }
  }

  /**
   * Publishes a message. Logs and returns without throwing when the client is
   * offline or the publish fails.
   *
   * @param message Topic, payload and optional `qos` (default 0) / `retain` (default false).
   */
  async publish(message: MQTTMessage): Promise<void> {
    if (!this.client || !this.isConnected) {
      console.error("MQTT client is not connected, cannot publish message.");
      return;
    }
    const { topic, payload, qos = 0, retain = false } = message;
    try {
      await this.client.publishAsync(topic, payload, { qos, retain });
    } catch (error) {
      console.error(`Failed to publish message to ${topic}:`, error);
    }
  }

  /**
   * Registers a callback for a topic filter. The subscription is recorded
   * immediately; it is sent to the broker right away when connected, otherwise
   * on the next successful connect.
   *
   * @param topic Topic filter, may contain the `+` and `#` wildcards.
   * @param callback Invoked with the concrete topic and the payload as a string.
   */
  async subscribe(topic: string, callback: (topic: string, message: string) => void): Promise<void> {
    this.subscriptions.push({ topic, callback });
    if (this.client && this.isConnected) {
      try {
        await this.client.subscribeAsync(topic);
      } catch (error) {
        console.error(`Failed to subscribe to ${topic}:`, error);
      }
    }
  }

  /**
   * Attaches the event handlers to the current client. Safe to call more than
   * once for the same client; handlers are attached only the first time.
   */
  async setupHandler(): Promise<void> {
    const client = this.client;
    if (!client) {
      console.error("MQTT client is not initialized.");
      return;
    }
    if (this.wiredClients.has(client)) {
      return;
    }
    this.wiredClients.add(client);

    // Events from a client that is no longer the current one must not touch our state.
    const isCurrent = (): boolean => this.client === client;

    client.on("connect", () => {
      if (isCurrent()) {
        void this.onConnect();
      }
    });
    client.on("message", (topic: string, message: Buffer) => {
      if (isCurrent()) {
        this.onMessage(topic, message);
      }
    });
    // "disconnect" only fires for a broker-sent DISCONNECT packet; "close" covers
    // every other way the connection can end.
    client.on("disconnect", () => {
      if (isCurrent()) {
        this.onDisconnect();
      }
    });
    client.on("close", () => {
      if (isCurrent()) {
        this.onDisconnect();
      }
    });
    // Without a listener an emitted "error" would be thrown as an uncaught exception.
    client.on("error", (error: Error) => {
      console.error("MQTT client error:", error);
    });
  }

  /**
   * Stops the keep-alive timer and closes the connection. Local state is reset
   * before the client is closed, so it is clean even if closing fails; an
   * error from closing is propagated to the caller.
   */
  async disconnect(): Promise<void> {
    if (this.keepAliveTimer) {
      clearInterval(this.keepAliveTimer);
      this.keepAliveTimer = null;
    }
    const client = this.client;
    this.client = null;
    this.isConnected = false;
    if (client) {
      await client.endAsync();
    }
  }

  /** Marks the adapter as connected and re-sends every recorded subscription. */
  private async onConnect(): Promise<void> {
    const client = this.client;
    if (!client) {
      return;
    }
    console.log("MQTT connected successfully.");
    // Flip the flag first: publish()/subscribe() may run while we are still re-subscribing.
    this.isConnected = true;

    // Snapshot and de-duplicate; subscriptions added from now on subscribe themselves.
    const topics = [...new Set(this.subscriptions.map((sub) => sub.topic))];
    for (const topic of topics) {
      try {
        await client.subscribeAsync(topic);
      } catch (error) {
        console.error(`Failed to subscribe to ${topic}:`, error);
      }
    }
  }

  /** Marks the adapter as disconnected; logs only on an actual state change. */
  private onDisconnect(): void {
    if (!this.isConnected) {
      return;
    }
    console.log("MQTT disconnected.");
    this.isConnected = false;
  }

  /**
   * Decodes an incoming payload and hands it to every subscription whose
   * filter matches. A throwing callback is logged and does not prevent the
   * remaining callbacks from running.
   */
  private onMessage(topic: string, message: Buffer): void {
    const msg = message.toString();
    console.log(`Received message on topic ${topic}:`, msg);
    for (const sub of this.subscriptions) {
      if (!this.matchTopic(sub.topic, topic)) {
        continue;
      }
      try {
        sub.callback(topic, msg);
      } catch (error) {
        console.error(`Subscriber for ${sub.topic} failed on topic ${topic}:`, error);
      }
    }
  }

  /**
   * Tests a concrete topic against a subscription filter, level by level.
   *
   * - `+` matches exactly one level, which may be empty.
   * - `#` is honoured only as the last level and matches the parent level and
   *   everything below it.
   * - Topics starting with `$` are not matched by filters that start with a wildcard.
   * - All other levels are compared literally, so characters such as `.` or
   *   `(` in a filter have no special meaning.
   */
  private matchTopic(subscriptionTopic: string, publishTopic: string): boolean {
    if (subscriptionTopic === publishTopic) {
      return true;
    }

    const filterLevels = subscriptionTopic.split("/");
    const topicLevels = publishTopic.split("/");

    const firstLevel = filterLevels[0];
    if (publishTopic.startsWith("$") && (firstLevel === "#" || firstLevel === "+")) {
      return false;
    }

    for (let i = 0; i < filterLevels.length; i++) {
      const level = filterLevels[i];
      if (level === "#") {
        return i === filterLevels.length - 1;
      }
      if (i >= topicLevels.length) {
        return false;
      }
      if (level !== "+" && level !== topicLevels[i]) {
        return false;
      }
    }
    return filterLevels.length === topicLevels.length;
  }
}
export default MQTT;