feat: Refactor BedHardware to support both Serial and MQTT implementations

- Added MQTT support to BedHardware, allowing for connection to MQTT brokers.
- Created BedHardwareMQTT and BedHardwareSerial classes to handle respective connections.
- Introduced a unified BedHardwareConfig interface for configuration management.
- Implemented event forwarding from the underlying implementations to the BedHardware class.
- Added MQTT adapter for handling MQTT connections and message subscriptions.
- Updated package.json to include the mqtt library as a dependency.
- Created a singleton MQTTService for managing MQTT client instances.
- Enhanced error handling and logging throughout the BedHardware and MQTT classes.
This commit is contained in:
Siwat Sirichai 2025-06-21 14:55:10 +07:00
parent 0c5c7bcb5f
commit fb87e74ec9
8 changed files with 753 additions and 158 deletions

View file

@ -1,181 +1,126 @@
import { SerialPort } from 'serialport';
import { ReadlineParser } from '@serialport/parser-readline';
import { EventEmitter } from 'events';
import { IBedHardware, PinState, PinChange, BedHardwareConfig } from '../types/bedhardware';
import { BedHardwareSerial } from './BedHardwareSerial';
import { BedHardwareMQTT } from './BedHardwareMQTT';
export interface PinState {
pin: number;
state: number;
name: string;
timestamp: Date;
}
/**
* BedHardware - Factory class for creating bed hardware implementations
*
* Usage:
* // MQTT (connects to broker.hivemq.com with base topic /Jtkcp2N/pressurebed/)
* const hardware = BedHardware.createSimpleMQTT();
*
* // Serial
* const hardware = BedHardware.createSerial('COM3', 9600);
*
* // With custom topics
* const hardware = BedHardware.createMQTT({
* topics: {
* pinState: '/custom/pin/state',
* pinChange: '/custom/pin/change',
* initialization: '/custom/init'
* }
* });
*/
export class BedHardware extends EventEmitter implements IBedHardware {
private implementation: IBedHardware;
export interface PinChange {
pin: number;
previousState: number;
currentState: number;
timestamp: Date;
}
export class BedHardware extends EventEmitter {
private serialPort: SerialPort | null = null;
private parser: ReadlineParser | null = null;
private pinStates: Map<number, PinState> = new Map();
private isConnected: boolean = false;
constructor(private portPath: string, private baudRate: number = 9600) {
constructor(config: BedHardwareConfig) {
super();
if (config.type === 'serial') {
if (!config.serial?.portPath) {
throw new Error('Serial port path is required for serial connection');
}
this.implementation = new BedHardwareSerial(
config.serial.portPath,
config.serial.baudRate || 9600 );
} else if (config.type === 'mqtt') {
this.implementation = new BedHardwareMQTT({
topics: config.mqtt?.topics
});
} else {
throw new Error(`Unsupported hardware type: ${config.type}`);
}
// Forward all events from the implementation
this.forwardEvents();
}
private forwardEvents(): void {
const events = [
'connected',
'disconnected',
'error',
'initialized',
'pinInitialized',
'pinChanged'
];
events.forEach(event => {
this.implementation.on(event, (...args: unknown[]) => {
this.emit(event, ...args);
});
});
// Forward dynamic pin events (pin{number}Changed)
this.implementation.on('pinChanged', (pinChange: PinChange) => {
this.emit(`pin${pinChange.pin}Changed`, pinChange);
});
}
async connect(): Promise<void> {
try {
this.serialPort = new SerialPort({
path: this.portPath,
baudRate: this.baudRate,
autoOpen: false
});
this.parser = new ReadlineParser({ delimiter: '\n' });
this.serialPort.pipe(this.parser);
// Setup event handlers
this.serialPort.on('open', () => {
this.isConnected = true;
this.emit('connected');
console.log('Serial port opened');
});
this.serialPort.on('error', (error) => {
this.emit('error', error);
console.error('Serial port error:', error);
});
this.serialPort.on('close', () => {
this.isConnected = false;
this.emit('disconnected');
console.log('Serial port closed');
});
this.parser.on('data', (data: string) => {
this.handleSerialData(data.trim());
});
// Open the port
await new Promise<void>((resolve, reject) => {
this.serialPort!.open((error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
} catch (error) {
throw new Error(`Failed to connect to ${this.portPath}: ${error}`);
}
return this.implementation.connect();
}
async disconnect(): Promise<void> {
if (this.serialPort && this.serialPort.isOpen) {
await new Promise<void>((resolve) => {
this.serialPort!.close(() => {
resolve();
});
});
}
this.serialPort = null;
this.parser = null;
this.isConnected = false;
}
private handleSerialData(data: string): void {
const parts = data.split(':');
if (parts[0] === 'INIT') {
if (parts[1] === 'START') {
this.emit('initialized');
console.log('Arduino initialization started');
} else if (parts.length >= 3) {
// INIT:PIN:STATE format
const pin = parseInt(parts[1]);
const state = parseInt(parts[2]);
if (!isNaN(pin) && !isNaN(state)) {
const pinState: PinState = {
pin,
state,
name: `PIN${pin}`,
timestamp: new Date()
};
this.pinStates.set(pin, pinState);
this.emit('pinInitialized', pinState);
}
}
} else if (parts[0] === 'CHANGE' && parts.length >= 4) {
// CHANGE:PIN:PREVIOUS_STATE:CURRENT_STATE format
const pin = parseInt(parts[1]);
const previousState = parseInt(parts[2]);
const currentState = parseInt(parts[3]);
if (!isNaN(pin) && !isNaN(previousState) && !isNaN(currentState)) {
const pinChange: PinChange = {
pin,
previousState,
currentState,
timestamp: new Date()
};
// Update stored pin state
const pinState: PinState = {
pin,
state: currentState,
name: `PIN${pin}`,
timestamp: new Date()
};
this.pinStates.set(pin, pinState);
this.emit('pinChanged', pinChange);
this.emit(`pin${pin}Changed`, pinChange);
}
}
return this.implementation.disconnect();
}
getPinState(pin: number): PinState | undefined {
return this.pinStates.get(pin);
return this.implementation.getPinState(pin);
}
getAllPinStates(): PinState[] {
return Array.from(this.pinStates.values());
return this.implementation.getAllPinStates();
}
isPortConnected(): boolean {
return this.isConnected && this.serialPort?.isOpen === true;
isConnected(): boolean {
return this.implementation.isConnected();
}
// Static method to list available serial ports
static async listPorts(): Promise<string[]> {
const ports = await SerialPort.list();
return ports.map(port => port.path);
// Static factory methods for convenience
static createSerial(portPath: string, baudRate: number = 9600): BedHardware {
return new BedHardware({
type: 'serial',
serial: { portPath, baudRate }
});
} static createMQTT(config?: {
topics?: {
pinState: string;
pinChange: string;
initialization: string;
};
}): BedHardware {
return new BedHardware({
type: 'mqtt',
mqtt: config
});
}
static createSimpleMQTT(): BedHardware {
return new BedHardware({
type: 'mqtt',
mqtt: {}
});
}
// Static method to list available serial ports (for serial implementation)
static async listSerialPorts(): Promise<string[]> {
return BedHardwareSerial.listPorts();
}
}
// Example usage:
/*
const bedHardware = new BedHardware('/dev/ttyUSB0', 9600);
bedHardware.on('connected', () => {
console.log('Connected to bed hardware');
});
bedHardware.on('pinChanged', (change: PinChange) => {
console.log(`Pin ${change.pin} changed from ${change.previousState} to ${change.currentState}`);
});
bedHardware.on('error', (error) => {
console.error('Hardware error:', error);
});
await bedHardware.connect();
*/
// Export all classes for direct access if needed
export { BedHardwareSerial, BedHardwareMQTT };
export * from '../types/bedhardware';

140
services/BedHardwareMQTT.ts Normal file
View file

@ -0,0 +1,140 @@
import { EventEmitter } from 'events';
import { IBedHardware, PinState, PinChange } from '../types/bedhardware';
import MQTT from '../adapter/mqtt';
import { getMQTTClient, BASE_TOPIC } from './mqttService';
export interface MqttConfig {
topics?: {
pinState: string;
pinChange: string;
initialization: string;
};
}
export class BedHardwareMQTT extends EventEmitter implements IBedHardware {
private client: MQTT | null = null;
private pinStates: Map<number, PinState> = new Map();
private connectionState: boolean = false;
private topics: {
pinState: string;
pinChange: string;
initialization: string;
};
constructor(private config: MqttConfig = {}) {
super();
this.topics = config.topics || {
pinState: `${BASE_TOPIC}pin/state`,
pinChange: `${BASE_TOPIC}pin/change`,
initialization: `${BASE_TOPIC}init`
};
}
async connect(): Promise<void> {
try {
// Use the MQTT service to get a client connected to HiveMQ
this.client = await getMQTTClient();
// Subscribe to topics - adapter handles reconnection/resubscription
await this.client.subscribe(this.topics.initialization, (topic, message) => {
this.handleMqttMessage(topic, message);
});
await this.client.subscribe(this.topics.pinState, (topic, message) => {
this.handleMqttMessage(topic, message);
});
await this.client.subscribe(this.topics.pinChange, (topic, message) => {
this.handleMqttMessage(topic, message);
});
this.connectionState = true;
this.emit('connected');
console.log('BedHardware MQTT connected to HiveMQ');
} catch (error) {
const errorMsg = `Failed to connect to MQTT broker: ${error}`;
console.error(errorMsg);
this.emit('error', new Error(errorMsg));
throw new Error(errorMsg);
}
} async disconnect(): Promise<void> {
if (this.client) {
await this.client.disconnect();
}
this.client = null;
this.connectionState = false;
this.emit('disconnected');
}
private handleMqttMessage(topic: string, message: string): void {
try {
const data = JSON.parse(message);
if (topic === this.topics.initialization) {
if (data.type === 'START') {
this.emit('initialized');
console.log('MQTT initialization started');
} else if (data.type === 'PIN_INIT' && data.pin !== undefined && data.state !== undefined) {
const pinState: PinState = {
pin: data.pin,
state: data.state,
name: data.name || `PIN${data.pin}`,
timestamp: new Date(data.timestamp || Date.now())
};
this.pinStates.set(data.pin, pinState);
this.emit('pinInitialized', pinState);
}
} else if (topic === this.topics.pinChange) {
if (data.pin !== undefined && data.previousState !== undefined && data.currentState !== undefined) {
const pinChange: PinChange = {
pin: data.pin,
previousState: data.previousState,
currentState: data.currentState,
timestamp: new Date(data.timestamp || Date.now())
};
// Update stored pin state
const pinState: PinState = {
pin: data.pin,
state: data.currentState,
name: data.name || `PIN${data.pin}`,
timestamp: new Date(data.timestamp || Date.now())
};
this.pinStates.set(data.pin, pinState);
this.emit('pinChanged', pinChange);
this.emit(`pin${data.pin}Changed`, pinChange);
}
} else if (topic === this.topics.pinState) {
if (data.pin !== undefined && data.state !== undefined) {
const pinState: PinState = {
pin: data.pin,
state: data.state,
name: data.name || `PIN${data.pin}`,
timestamp: new Date(data.timestamp || Date.now())
};
this.pinStates.set(data.pin, pinState);
this.emit('pinInitialized', pinState);
}
}
} catch (error) {
console.error('Error parsing MQTT message:', error);
this.emit('error', new Error(`Failed to parse MQTT message: ${error}`));
}
}
getPinState(pin: number): PinState | undefined {
return this.pinStates.get(pin);
}
getAllPinStates(): PinState[] {
return Array.from(this.pinStates.values());
}
isConnected(): boolean {
return this.connectionState;
}
}

View file

@ -0,0 +1,149 @@
import { SerialPort } from 'serialport';
import { ReadlineParser } from '@serialport/parser-readline';
import { EventEmitter } from 'events';
import { IBedHardware, PinState, PinChange } from '../types/bedhardware';
export class BedHardwareSerial extends EventEmitter implements IBedHardware {
private serialPort: SerialPort | null = null;
private parser: ReadlineParser | null = null;
private pinStates: Map<number, PinState> = new Map();
private connectionState: boolean = false;
constructor(private portPath: string, private baudRate: number = 9600) {
super();
}
async connect(): Promise<void> {
try {
this.serialPort = new SerialPort({
path: this.portPath,
baudRate: this.baudRate,
autoOpen: false
});
this.parser = new ReadlineParser({ delimiter: '\n' });
this.serialPort.pipe(this.parser);
// Setup event handlers
this.serialPort.on('open', () => {
this.connectionState = true;
this.emit('connected');
console.log('Serial port opened');
});
this.serialPort.on('error', (error) => {
this.emit('error', error);
console.error('Serial port error:', error);
});
this.serialPort.on('close', () => {
this.connectionState = false;
this.emit('disconnected');
console.log('Serial port closed');
});
this.parser.on('data', (data: string) => {
this.handleSerialData(data.trim());
});
// Open the port
await new Promise<void>((resolve, reject) => {
this.serialPort!.open((error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
} catch (error) {
throw new Error(`Failed to connect to ${this.portPath}: ${error}`);
}
}
async disconnect(): Promise<void> {
if (this.serialPort && this.serialPort.isOpen) {
await new Promise<void>((resolve) => {
this.serialPort!.close(() => {
resolve();
});
});
}
this.serialPort = null;
this.parser = null;
this.connectionState = false;
}
private handleSerialData(data: string): void {
const parts = data.split(':');
if (parts[0] === 'INIT') {
if (parts[1] === 'START') {
this.emit('initialized');
console.log('Arduino initialization started');
} else if (parts.length >= 3) {
// INIT:PIN:STATE format
const pin = parseInt(parts[1]);
const state = parseInt(parts[2]);
if (!isNaN(pin) && !isNaN(state)) {
const pinState: PinState = {
pin,
state,
name: `PIN${pin}`,
timestamp: new Date()
};
this.pinStates.set(pin, pinState);
this.emit('pinInitialized', pinState);
}
}
} else if (parts[0] === 'CHANGE' && parts.length >= 4) {
// CHANGE:PIN:PREVIOUS_STATE:CURRENT_STATE format
const pin = parseInt(parts[1]);
const previousState = parseInt(parts[2]);
const currentState = parseInt(parts[3]);
if (!isNaN(pin) && !isNaN(previousState) && !isNaN(currentState)) {
const pinChange: PinChange = {
pin,
previousState,
currentState,
timestamp: new Date()
};
// Update stored pin state
const pinState: PinState = {
pin,
state: currentState,
name: `PIN${pin}`,
timestamp: new Date()
};
this.pinStates.set(pin, pinState);
this.emit('pinChanged', pinChange);
this.emit(`pin${pin}Changed`, pinChange);
}
}
}
getPinState(pin: number): PinState | undefined {
return this.pinStates.get(pin);
}
getAllPinStates(): PinState[] {
return Array.from(this.pinStates.values());
}
isConnected(): boolean {
return this.connectionState && this.serialPort?.isOpen === true;
}
// Static method to list available serial ports
static async listPorts(): Promise<string[]> {
const ports = await SerialPort.list();
return ports.map(port => port.path);
}
}

72
services/mqttService.ts Normal file
View file

@ -0,0 +1,72 @@
import MQTT, { MQTTConfig } from '../adapter/mqtt';
// Default MQTT configuration for HiveMQ broker
const defaultConfig: MQTTConfig = {
host: 'broker.hivemq.com',
port: 1883,
username: undefined,
password: undefined
};
export const BASE_TOPIC = '/Jtkcp2N/pressurebed/';
// Singleton MQTT client instance
let mqttInstance: MQTT | null = null;
export class MQTTService {
private static instance: MQTT | null = null;
static async initialize(config?: Partial<MQTTConfig>): Promise<MQTT> {
if (!MQTTService.instance) {
const finalConfig = { ...defaultConfig, ...config };
MQTTService.instance = new MQTT();
await MQTTService.instance.initialize(finalConfig);
}
return MQTTService.instance;
}
static getInstance(): MQTT | null {
return MQTTService.instance;
}
static async disconnect(): Promise<void> {
if (MQTTService.instance) {
await MQTTService.instance.disconnect();
MQTTService.instance = null;
}
}
}
// Factory function to get or create MQTT client
export async function getMQTTClient(config?: Partial<MQTTConfig>): Promise<MQTT> {
if (!mqttInstance) {
mqttInstance = new MQTT();
const finalConfig = { ...defaultConfig, ...config };
await mqttInstance.initialize(finalConfig);
}
return mqttInstance;
}
// Export the singleton instance getter
export function getMQTTInstance(): MQTT | null {
return mqttInstance;
}
// Cleanup function
export async function disconnectMQTT(): Promise<void> {
if (mqttInstance) {
await mqttInstance.disconnect();
mqttInstance = null;
}
}
// Export default configured client (lazy initialization)
const mqttService = {
async getClient(config?: Partial<MQTTConfig>): Promise<MQTT> {
return getMQTTClient(config);
},
getInstance: getMQTTInstance,
disconnect: disconnectMQTT
};
export default mqttService;