// File viewer metadata: 159 lines, No EOL, 4.7 KiB, TypeScript
import mqtt, { type MqttClient } from "mqtt";
|
|
import type { QoS } from "mqtt-packet";
|
|
|
|
/**
 * Connection settings for the MQTT broker.
 *
 * `host` and `port` are combined into an `mqtt://host:port` URL when the
 * client connects; `username` and `password` are forwarded as connect
 * options and may be omitted.
 */
export interface MQTTConfig {
  /** Broker host name or IP address. */
  host: string;
  /** Broker TCP port. */
  port: number;
  /** Optional user name passed to the broker on connect. */
  username?: string;
  /** Optional password passed to the broker on connect. */
  password?: string;
}
|
|
|
|
/**
 * A registered topic subscription together with the handler to run for
 * every incoming message whose topic matches it.
 */
interface MQTTSubscription {
  /** Topic filter; may contain the MQTT wildcards `+` and `#`. */
  topic: string;
  /**
   * Handler invoked with the concrete topic the message arrived on and the
   * payload converted to a string.
   */
  callback: (topic: string, message: string) => void;
}
|
|
|
|
/**
 * An outgoing message handed to `MQTT.publish`.
 */
export interface MQTTMessage {
  /** Topic the message is published to. */
  topic: string;
  /** Message body, sent as a string. */
  payload: string;
  /** Quality-of-service level; `publish` uses 0 when omitted. */
  qos?: QoS;
  /** Retain flag; `publish` uses `false` when omitted. */
  retain?: boolean;
}
|
|
|
|
/**
 * Small wrapper around an MQTT.js client.
 *
 * It keeps a list of topic subscriptions, re-subscribes them every time the
 * client (re)connects, dispatches incoming messages to the matching
 * subscription callbacks, and runs a periodic keep-alive check that
 * re-creates the client while the connection is down.
 */
class MQTT {
  /** Interval, in milliseconds, between keep-alive connection checks. */
  private static readonly KEEP_ALIVE_INTERVAL_MS = 5000;

  private client: MqttClient | null = null;
  private config: MQTTConfig | null = null;
  private readonly subscriptions: MQTTSubscription[] = [];
  private isConnected: boolean = false;
  private keepAliveTimer: NodeJS.Timeout | null = null;

  /**
   * Reconnects when the client is currently not connected; does nothing
   * otherwise. Called periodically by the timer started in `initialize`.
   *
   * @throws Error if no configuration has been set via `initialize`.
   */
  async keepAlive(): Promise<void> {
    if (this.isConnected) {
      return;
    }
    console.log("MQTT client is not connected, attempting to reconnect...");
    await this.connectMQTT();
  }

  /**
   * Stores the configuration, starts the first connection attempt and arms
   * the keep-alive timer.
   *
   * @param config Broker connection settings.
   */
  async initialize(config: MQTTConfig): Promise<void> {
    this.config = config;

    // A repeated initialize() must not leave an earlier interval running.
    if (this.keepAliveTimer) {
      clearInterval(this.keepAliveTimer);
      this.keepAliveTimer = null;
    }

    // Awaited so that a failure is not lost as a floating promise.
    await this.connectMQTT();

    // Start keep-alive timer
    this.keepAliveTimer = setInterval(() => {
      this.keepAlive().catch((error: unknown) => {
        console.error("MQTT keep-alive check failed:", error);
      });
    }, MQTT.KEEP_ALIVE_INTERVAL_MS);
  }

  /**
   * Creates a new client for the configured broker and wires its event
   * handlers. Any previously created client is shut down first so that
   * repeated reconnect attempts do not accumulate live clients.
   *
   * Connection errors are logged, not thrown.
   *
   * @throws Error if no configuration has been set via `initialize`.
   */
  async connectMQTT(): Promise<void> {
    if (!this.config) {
      throw new Error("MQTT configuration is not set.");
    }
    const { host, port, username, password } = this.config;
    try {
      this.releaseClient();
      this.client = mqtt.connect(`mqtt://${host}:${port}`, {
        username,
        password
      });
      await this.setupHandler();
    } catch (error) {
      console.error("Failed to connect to MQTT broker:", error);
    }
  }

  /**
   * Publishes a message. When the client is not connected the message is
   * dropped and an error is logged; publish failures are logged as well.
   *
   * @param message Topic, payload and optional QoS / retain settings.
   */
  async publish(message: MQTTMessage): Promise<void> {
    if (!this.client || !this.isConnected) {
      console.error("MQTT client is not connected, cannot publish message.");
      return;
    }

    const { topic, payload, qos = 0, retain = false } = message;
    try {
      await this.client.publishAsync(topic, payload, { qos, retain });
    } catch (error) {
      console.error(`Failed to publish message to ${topic}:`, error);
    }
  }

  /**
   * Registers a callback for a topic filter. The subscription is always
   * remembered so it can be (re)established on the next connect; if the
   * client is connected right now it is also sent to the broker immediately.
   *
   * @param topic Topic filter, optionally using `+` / `#` wildcards.
   * @param callback Handler receiving the concrete topic and the payload as string.
   */
  async subscribe(topic: string, callback: (topic: string, message: string) => void): Promise<void> {
    this.subscriptions.push({ topic, callback });

    if (this.client && this.isConnected) {
      try {
        await this.client.subscribeAsync(topic);
      } catch (error) {
        console.error(`Failed to subscribe to ${topic}:`, error);
      }
    }
  }

  /**
   * Attaches the event listeners to the current client. Logs an error and
   * returns when no client exists.
   *
   * Every listener ignores events from a client that is no longer the
   * current one, so a replaced client cannot alter the connection state.
   */
  async setupHandler(): Promise<void> {
    const client = this.client;
    if (!client) {
      console.error("MQTT client is not initialized.");
      return;
    }

    const isCurrent = (): boolean => this.client === client;

    client.on("connect", () => {
      if (!isCurrent()) {
        return;
      }
      this.onConnect().catch((error: unknown) => {
        console.error("MQTT connect handler failed:", error);
      });
    });
    client.on("message", (topic, message) => {
      if (isCurrent()) {
        this.onMessage(topic, message);
      }
    });
    // "disconnect" only reports a DISCONNECT packet sent by the broker;
    // a lost connection is reported through "close", so both are handled.
    client.on("disconnect", () => {
      if (isCurrent()) {
        this.onDisconnect();
      }
    });
    client.on("close", () => {
      if (isCurrent()) {
        this.onDisconnect();
      }
    });
    // Without an "error" listener an emitted error would be thrown as an
    // unhandled 'error' event.
    client.on("error", (error) => {
      if (isCurrent()) {
        console.error("MQTT client error:", error);
      }
    });
  }

  /**
   * Stops the keep-alive timer and closes the client connection.
   * The internal state is reset before the client is closed, so it is
   * consistent even if closing fails.
   */
  async disconnect(): Promise<void> {
    if (this.keepAliveTimer) {
      clearInterval(this.keepAliveTimer);
      this.keepAliveTimer = null;
    }

    const client = this.client;
    this.client = null;
    this.isConnected = false;

    if (client) {
      await client.endAsync();
    }
  }

  /** Force-closes and forgets the current client, if there is one. */
  private releaseClient(): void {
    const stale = this.client;
    if (!stale) {
      return;
    }
    this.client = null;
    this.isConnected = false;
    stale.end(true);
  }

  /**
   * Re-subscribes every registered topic on the freshly connected client and
   * then marks the wrapper as connected.
   */
  private async onConnect(): Promise<void> {
    console.log("MQTT connected successfully.");
    const client = this.client;
    if (!client) {
      return;
    }

    // Iterate the live array so subscriptions added while this loop is
    // awaiting are picked up too; skip topics that were already sent.
    const sentTopics = new Set<string>();
    for (const sub of this.subscriptions) {
      if (sentTopics.has(sub.topic)) {
        continue;
      }
      sentTopics.add(sub.topic);
      try {
        await client.subscribeAsync(sub.topic);
      } catch (error) {
        console.error(`Failed to subscribe to ${sub.topic}:`, error);
      }
    }

    // The connection may have dropped or the client may have been replaced
    // while the subscriptions were being sent.
    if (this.client === client && client.connected) {
      this.isConnected = true;
    }
  }

  /** Marks the wrapper as disconnected; logs only on an actual state change. */
  private onDisconnect(): void {
    if (!this.isConnected) {
      return;
    }
    console.log("MQTT disconnected.");
    this.isConnected = false;
  }

  /**
   * Converts the payload to a string and passes it to every subscription
   * whose topic filter matches the message topic.
   *
   * @param topic Topic the message was received on.
   * @param message Raw message payload.
   */
  private onMessage(topic: string, message: Buffer): void {
    const msg = message.toString();
    console.log(`Received message on topic ${topic}:`, msg);
    for (const sub of this.subscriptions) {
      if (!this.matchTopic(sub.topic, topic)) {
        continue;
      }
      try {
        sub.callback(topic, msg);
      } catch (error) {
        // One failing subscriber must not block delivery to the others.
        console.error(`MQTT subscription callback for ${sub.topic} failed:`, error);
      }
    }
  }

  /**
   * Checks whether a concrete topic matches a subscription topic filter.
   *
   * Matching is done level by level instead of through a generated regular
   * expression, so characters such as `$` or `.` in a filter are compared
   * literally.
   *
   * - `+` matches exactly one level (which may be empty).
   * - `#` is only a wildcard as the last level; it matches the parent level
   *   and any number of child levels.
   * - A filter starting with a wildcard does not match topics starting
   *   with `$`.
   *
   * @param subscriptionTopic Topic filter of the subscription.
   * @param publishTopic Topic of the received message.
   * @returns `true` when the topic matches the filter.
   */
  private matchTopic(subscriptionTopic: string, publishTopic: string): boolean {
    // Exact match
    if (subscriptionTopic === publishTopic) {
      return true;
    }

    const filterLevels = subscriptionTopic.split("/");
    const topicLevels = publishTopic.split("/");

    const firstLevel = filterLevels[0];
    if (publishTopic.startsWith("$") && (firstLevel === "#" || firstLevel === "+")) {
      return false;
    }

    for (let i = 0; i < filterLevels.length; i++) {
      const level = filterLevels[i];
      if (level === "#") {
        // Multi-level wildcard is only valid as the final level.
        return i === filterLevels.length - 1;
      }
      if (i >= topicLevels.length) {
        return false;
      }
      if (level !== "+" && level !== topicLevels[i]) {
        return false;
      }
    }

    return filterLevels.length === topicLevels.length;
  }
}
|
|
|
|
export default MQTT;