import mqtt, { type MqttClient } from "mqtt"; import type { QoS } from "mqtt-packet"; export interface MQTTConfig { host: string; port: number; username?: string; password?: string; } interface MQTTSubscription { topic: string; callback: (topic: string, message: string) => void; } export interface MQTTMessage { topic: string; payload: string; qos?: QoS; retain?: boolean; } class MQTT { private client: MqttClient | null = null; private config: MQTTConfig | null = null; private readonly subscriptions: MQTTSubscription[] = []; private isConnected: boolean = false; private keepAliveTimer: NodeJS.Timeout | null = null; async keepAlive(): Promise { if (this.isConnected) { return; } console.log("MQTT client is not connected, attempting to reconnect..."); await this.connectMQTT(); } async initialize(config: MQTTConfig): Promise { this.config = config; this.connectMQTT(); // Start keep-alive timer this.keepAliveTimer = setInterval(() => { this.keepAlive(); }, 5000); // Run every 5 seconds } async connectMQTT(): Promise { if (!this.config) { throw new Error("MQTT configuration is not set."); } try { this.client = mqtt.connect(`mqtt://${this.config.host}:${this.config.port}`, { username: this.config?.username, password: this.config?.password }); await this.setupHandler(); } catch (error) { console.error("Failed to connect to MQTT broker:", error); } } async publish(message: MQTTMessage): Promise { if (!this.client || !this.isConnected) { console.error("MQTT client is not connected, cannot publish message."); return; } const { topic, payload, qos = 0, retain = false } = message; try { await this.client.publishAsync(topic, payload, { qos, retain }); } catch (error) { console.error(`Failed to publish message to ${topic}:`, error); } } async subscribe(topic: string, callback: (topic: string, message: string) => void): Promise { const subscription: MQTTSubscription = { topic, callback }; this.subscriptions.push(subscription); if (this.client && this.isConnected) { try { await this.client.subscribeAsync(topic); } catch (error) { console.error(`Failed to subscribe to ${topic}:`, error); } } } async setupHandler(): Promise { if (!this.client) { console.error("MQTT client is not initialized."); return; } this.client.on("connect", this.onConnect.bind(this)); this.client.on("message", this.onMessage.bind(this)); this.client.on("disconnect", this.onDisconnect.bind(this)); } async disconnect(): Promise { if (this.keepAliveTimer) { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } if (this.client) { await this.client.endAsync(); this.client = null; } this.isConnected = false; } private async onConnect(): Promise { console.log("MQTT connected successfully."); for (const sub of this.subscriptions) { if (this.client) { try { await this.client.subscribeAsync(sub.topic); } catch (error) { console.error(`Failed to subscribe to ${sub.topic}:`, error); } } } this.isConnected = true; } private onDisconnect(): void { console.log("MQTT disconnected."); this.isConnected = false; } private onMessage(topic: string, message: Buffer): void { const msg = message.toString(); console.log(`Received message on topic ${topic}:`, msg); this.subscriptions.forEach(sub => { if (this.matchTopic(sub.topic, topic)) { sub.callback(topic, msg); } }); } private matchTopic(subscriptionTopic: string, publishTopic: string): boolean { // Exact match if (subscriptionTopic === publishTopic) { return true; } // Convert MQTT wildcards to regex const regexPattern = subscriptionTopic .replace(/\+/g, '[^/]+') // + matches single level .replace(/#$/, '.*'); // # matches multi-level (only at end) const regex = new RegExp(`^${regexPattern}$`); return regex.test(publishTopic); } } export default MQTT;