From d059b806829ecee76f22eef682bf1a0e5f27f97b Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 19 Aug 2025 00:52:18 +0700 Subject: [PATCH] feat: Implement SMTP over WebSocket client with error handling and Nodemailer transport - Added error classes for various SMTP-related issues (ConnectionError, AuthenticationError, etc.) in `errors.ts`. - Created main entry point for the SMTP over WebSocket client library in `index.ts`, exporting client, types, errors, and transport. - Developed Nodemailer transport adapter for SMTP over WebSocket in `transport.ts`, including methods for sending mail and verifying transport configuration. - Defined type definitions for the SMTP over WebSocket protocol in `types.ts`, including message types, connection states, and client configuration options. --- .gitignore | 1 - src/client.ts | 951 +++++++++++++++++++++++++++++++++++++++++++++++ src/errors.ts | 207 +++++++++++ src/index.ts | 63 ++++ src/transport.ts | 352 ++++++++++++++++++ src/types.ts | 263 +++++++++++++ 6 files changed, 1836 insertions(+), 1 deletion(-) create mode 100644 src/client.ts create mode 100644 src/errors.ts create mode 100644 src/index.ts create mode 100644 src/transport.ts create mode 100644 src/types.ts diff --git a/.gitignore b/.gitignore index 0b96fe4..d0c3c16 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,6 @@ yarn-error.log* lib/ dist/ *.tsbuildinfo -src/ # Coverage directory used by tools like istanbul coverage/ diff --git a/src/client.ts b/src/client.ts new file mode 100644 index 0000000..ef574b6 --- /dev/null +++ b/src/client.ts @@ -0,0 +1,951 @@ +/** + * @fileoverview SMTP over WebSocket client with intelligent queue management + */ + +import WebSocket from 'ws'; +import { EventEmitter } from 'events'; +import { + SMTPOverWsMessage, + SMTPOverWsMessageType, + ConnectionState, + QueuedMessage, + MessagePriority, + SMTPClientConfig, + Logger, + ClientStats, + ClientEvents, + SendOptions, + AuthenticateMessage, + SMTPChannelOpenMessage, + SMTPToServerMessage, + AuthenticateResponseMessage, + SMTPChannelErrorMessage, + SMTPFromServerMessage +} from './types'; +import { + SMTPWSError, + ConnectionError, + AuthenticationError, + ChannelError, + TimeoutError, + QueueError, + MessageError, + ShutdownError, + ErrorFactory +} from './errors'; + +/** + * Default logger implementation + */ +class DefaultLogger implements Logger { + constructor(private enableDebug: boolean = false) {} + + debug(message: string, ...args: any[]): void { + if (this.enableDebug) { + console.debug(`[SMTP-WS-DEBUG] ${message}`, ...args); + } + } + + info(message: string, ...args: any[]): void { + if (this.enableDebug) { + console.info(`[SMTP-WS-INFO] ${message}`, ...args); + } + } + + warn(message: string, ...args: any[]): void { + if (this.enableDebug) { + console.warn(`[SMTP-WS-WARN] ${message}`, ...args); + } + } + + error(message: string, ...args: any[]): void { + if (this.enableDebug) { + console.error(`[SMTP-WS-ERROR] ${message}`, ...args); + } + } +} + +/** + * SMTP over WebSocket client with intelligent queue management and automatic reconnection + */ +export class SMTPOverWSClient extends EventEmitter { + private config: Required; + private ws: WebSocket | null = null; + private state: ConnectionState = ConnectionState.DISCONNECTED; + private messageQueue: QueuedMessage[] = []; + private currentMessage: QueuedMessage | null = null; + private reconnectAttempts = 0; + private reconnectTimer: NodeJS.Timeout | null = null; + private authTimer: NodeJS.Timeout | null = null; + private channelTimer: NodeJS.Timeout | null = null; + private heartbeatTimer: NodeJS.Timeout | null = null; + private messageTimer: NodeJS.Timeout | null = null; + private isProcessingQueue = false; + private isShuttingDown = false; + private logger: Logger; + private stats: ClientStats; + private connectionStartTime: number = 0; + private messageIdCounter = 0; + + constructor(config: SMTPClientConfig) { + super(); + + // Validate configuration + this.validateConfig(config); + + // Set defaults + this.config = { + reconnectInterval: 5000, + maxReconnectAttempts: 10, + authTimeout: 30000, + channelTimeout: 10000, + messageTimeout: 60000, + maxConcurrentMessages: 1, + debug: false, + heartbeatInterval: 30000, + maxQueueSize: 1000, + ...config, + logger: config.logger || new DefaultLogger(config.debug ?? false) + }; + + this.logger = this.config.logger; + this.stats = this.initializeStats(); + + this.logger.debug('SMTP WebSocket client initialized', { + url: this.config.url, + maxQueueSize: this.config.maxQueueSize + }); + } + + /** + * Send SMTP command with automatic queue management + */ + public async sendSMTPCommand(data: string, options: SendOptions = {}): Promise { + if (this.isShuttingDown) { + throw new ShutdownError('Client is shutting down'); + } + + // Check queue size limit + if (this.messageQueue.length >= this.config.maxQueueSize) { + throw ErrorFactory.queueError( + `Queue is full (${this.config.maxQueueSize} messages)`, + this.messageQueue.length + ); + } + + const messageId = this.generateMessageId(); + const priority = options.priority ?? MessagePriority.NORMAL; + const timeout = options.timeout ?? this.config.messageTimeout; + const retries = options.retries ?? 3; + + return new Promise((resolve, reject) => { + const queuedMessage: QueuedMessage = { + id: messageId, + data, + resolve, + reject, + timestamp: Date.now(), + retries, + priority + }; + + // Insert message based on priority + this.insertMessageByPriority(queuedMessage); + + this.stats.messagesQueued++; + this.emit('messageQueued', messageId, this.messageQueue.length); + + this.logger.debug('Message queued', { + messageId, + priority, + queueSize: this.messageQueue.length, + data: data.substring(0, 100) + (data.length > 100 ? '...' : '') + }); + + // Set message timeout + setTimeout(() => { + if (this.messageQueue.includes(queuedMessage) || this.currentMessage === queuedMessage) { + const error = ErrorFactory.timeout('Message', timeout, { messageId }); + this.removeMessageFromQueue(messageId); + reject(error); + } + }, timeout); + + // Start processing if not already running + if (!this.isProcessingQueue) { + this.processQueue().catch(error => { + this.logger.error('Queue processing failed', error); + }); + } + }); + } + + /** + * Get current client statistics + */ + public getStats(): ClientStats { + return { + ...this.stats, + queueSize: this.messageQueue.length, + connectionUptime: this.connectionStartTime > 0 ? Date.now() - this.connectionStartTime : 0 + }; + } + + /** + * Get current connection state + */ + public getConnectionState(): ConnectionState { + return this.state; + } + + /** + * Get current queue size + */ + public getQueueSize(): number { + return this.messageQueue.length; + } + + /** + * Clear all queued messages + */ + public clearQueue(): void { + const clearedCount = this.messageQueue.length; + + // Reject all queued messages + for (const message of this.messageQueue) { + message.reject(new QueueError('Queue cleared')); + } + + this.messageQueue = []; + this.logger.info('Queue cleared', { clearedCount }); + } + + /** + * Graceful shutdown of the client + */ + public async shutdown(timeout: number = 30000): Promise { + if (this.isShuttingDown) { + return; + } + + this.isShuttingDown = true; + this.logger.info('Initiating client shutdown', { queueSize: this.messageQueue.length }); + + // Stop accepting new messages and clear timers + this.clearTimers(); + + try { + // Wait for current queue processing to complete or timeout + if (this.isProcessingQueue) { + await Promise.race([ + this.waitForQueueCompletion(), + new Promise((_, reject) => + setTimeout(() => reject(new TimeoutError('Shutdown timeout', timeout)), timeout) + ) + ]); + } + } catch (error) { + this.logger.warn('Shutdown timeout reached, forcing shutdown', error); + } + + // Reject any remaining messages + this.rejectAllQueuedMessages(new ShutdownError('Client shutting down')); + + // Close connection + await this.disconnect(); + + // Remove all listeners + this.removeAllListeners(); + + this.logger.info('Client shutdown completed'); + } + + /** + * Process the message queue + */ + private async processQueue(): Promise { + if (this.isProcessingQueue || this.messageQueue.length === 0 || this.isShuttingDown) { + return; + } + + this.isProcessingQueue = true; + this.emit('queueProcessingStarted', this.messageQueue.length); + + this.logger.info('Queue processing started', { queueSize: this.messageQueue.length }); + + let processed = 0; + let failed = 0; + + try { + // Connect if not connected + if (this.state === ConnectionState.DISCONNECTED) { + await this.connect(); + } + + // Process messages sequentially one at a time + while (this.messageQueue.length > 0 && !this.isShuttingDown) { + const message = this.messageQueue.shift()!; + + try { + this.currentMessage = message; + const startTime = Date.now(); + + await this.processMessage(message); + + const responseTime = Date.now() - startTime; + this.stats.messagesProcessed++; + this.stats.averageResponseTime = + (this.stats.averageResponseTime * (this.stats.messagesProcessed - 1) + responseTime) / + this.stats.messagesProcessed; + + this.emit('messageProcessed', message.id, responseTime); + this.logger.debug('Message processed successfully', { + messageId: message.id, + responseTime + }); + + processed++; + } catch (error) { + this.stats.messagesFailed++; + this.stats.lastError = (error as Error).message; + this.stats.lastErrorTime = new Date(); + + this.emit('messageFailed', message.id, error as Error); + this.logger.error('Message processing failed', { + messageId: message.id, + error: (error as Error).message + }); + + failed++; + message.reject(error as Error); + } finally { + this.currentMessage = null; + } + } + + } catch (error) { + this.logger.error('Queue processing error', error); + this.rejectAllQueuedMessages(error as Error); + failed += this.messageQueue.length; + } finally { + this.isProcessingQueue = false; + + // Disconnect if queue is empty and not shutting down + if (this.messageQueue.length === 0 && !this.isShuttingDown) { + await this.disconnect(); + } + + this.emit('queueProcessingCompleted', processed, failed); + this.logger.info('Queue processing completed', { processed, failed }); + } + } + + /** + * Process a single message + */ + private async processMessage(message: QueuedMessage): Promise { + return new Promise(async (resolve, reject) => { + try { + // Open SMTP channel + await this.openSMTPChannel(); + + // Wait for SMTP response + const responsePromise = this.waitForSMTPResponse(message.id); + + // Send SMTP data + this.sendSMTPData(message.data); + + // Wait for response + const response = await responsePromise; + message.resolve(response); + + // Close SMTP channel + await this.closeSMTPChannel(); + + resolve(); + + } catch (error) { + // Retry logic + if (message.retries > 0) { + message.retries--; + this.logger.debug('Retrying message', { + messageId: message.id, + retriesLeft: message.retries + }); + + // Re-queue message + this.insertMessageByPriority(message); + resolve(); + } else { + reject(ErrorFactory.messageError( + (error as Error).message, + message.id, + message.retries + )); + } + } + }); + } + + /** + * Connect to WebSocket server + */ + private async connect(): Promise { + if (this.state !== ConnectionState.DISCONNECTED && this.state !== ConnectionState.FAILED) { + return; + } + + this.setState(ConnectionState.CONNECTING); + this.logger.debug('Connecting to WebSocket server', { url: this.config.url }); + + return new Promise((resolve, reject) => { + try { + this.ws = new WebSocket(this.config.url); + + const connectionTimeout = setTimeout(() => { + if (this.ws) { + this.ws.terminate(); + } + reject(ErrorFactory.timeout('Connection', this.config.authTimeout)); + }, this.config.authTimeout); + + this.ws.on('open', () => { + clearTimeout(connectionTimeout); + this.connectionStartTime = Date.now(); + this.stats.totalConnections++; + this.reconnectAttempts = 0; + + this.setState(ConnectionState.CONNECTED); + this.emit('connected'); + this.logger.info('WebSocket connected'); + + // Start heartbeat + this.startHeartbeat(); + + // Authenticate + this.authenticate().then(resolve).catch(reject); + }); + + this.ws.on('message', (data) => { + this.handleMessage(data.toString()); + }); + + this.ws.on('close', (code, reason) => { + clearTimeout(connectionTimeout); + this.logger.info('WebSocket closed', { code, reason: reason.toString() }); + this.handleDisconnection(`Connection closed: ${code} ${reason}`); + }); + + this.ws.on('error', (error) => { + clearTimeout(connectionTimeout); + this.logger.error('WebSocket error', error); + const wsError = ErrorFactory.fromWebSocketError(error); + this.emit('error', wsError); + reject(wsError); + }); + + } catch (error) { + reject(ErrorFactory.fromWebSocketError(error as Error)); + } + }); + } + + /** + * Authenticate with the server + */ + private async authenticate(): Promise { + return new Promise((resolve, reject) => { + this.setState(ConnectionState.AUTHENTICATING); + this.logger.debug('Authenticating with server'); + + this.authTimer = setTimeout(() => { + reject(ErrorFactory.timeout('Authentication', this.config.authTimeout)); + }, this.config.authTimeout); + + const authMessage: AuthenticateMessage = { + type: SMTPOverWsMessageType.AUTHENTICATE, + data: { apiKey: this.config.apiKey } + }; + + this.sendMessage(authMessage); + + const onAuthResponse = (message: AuthenticateResponseMessage) => { + if (this.authTimer) { + clearTimeout(this.authTimer); + this.authTimer = null; + } + + if (message.data.success) { + this.setState(ConnectionState.AUTHENTICATED); + this.emit('authenticated'); + this.logger.info('Authentication successful'); + resolve(); + } else { + const error = ErrorFactory.fromAuthenticationFailure(message.data.error); + this.emit('error', error); + this.logger.error('Authentication failed', { error: message.data.error }); + reject(error); + } + }; + + this.once('authenticate_response', onAuthResponse); + }); + } + + /** + * Open SMTP channel + */ + private async openSMTPChannel(): Promise { + if (this.state !== ConnectionState.AUTHENTICATED) { + throw new ChannelError('Cannot open SMTP channel: not authenticated'); + } + + return new Promise((resolve, reject) => { + this.setState(ConnectionState.CHANNEL_OPENING); + this.logger.debug('Opening SMTP channel'); + + this.channelTimer = setTimeout(() => { + reject(ErrorFactory.timeout('Channel open', this.config.channelTimeout)); + }, this.config.channelTimeout); + + const openMessage: SMTPChannelOpenMessage = { + type: SMTPOverWsMessageType.SMTP_CHANNEL_OPEN + }; + + this.sendMessage(openMessage); + + const onChannelReady = () => { + // Channel is ready, now wait for SMTP 220 greeting + this.logger.debug('Channel ready, waiting for SMTP greeting'); + + const onGreeting = async (message: SMTPFromServerMessage) => { + this.logger.debug('RX SMTP greeting', { + response: message.data.trim(), + size: message.data.length + }); + + // Check if it's a 220 greeting + if (message.data.startsWith('220')) { + try { + // Automatically send EHLO after greeting + this.logger.debug('Auto-sending EHLO after greeting'); + this.sendSMTPData('EHLO client\r\n'); + + // Wait for EHLO response + const onEhloResponse = (ehloMessage: SMTPFromServerMessage) => { + if (this.channelTimer) { + clearTimeout(this.channelTimer); + this.channelTimer = null; + } + + this.logger.debug('RX SMTP EHLO response', { + response: ehloMessage.data.trim(), + size: ehloMessage.data.length + }); + + if (ehloMessage.data.startsWith('250')) { + this.setState(ConnectionState.CHANNEL_READY); + this.emit('channelOpened'); + this.logger.debug('SMTP channel ready after EHLO'); + resolve(); + } else { + const error = ErrorFactory.fromChannelFailure(`EHLO rejected: ${ehloMessage.data.trim()}`); + this.emit('channelError', error); + reject(error); + } + }; + + this.once('smtp_from_server', onEhloResponse); + + } catch (error) { + if (this.channelTimer) { + clearTimeout(this.channelTimer); + this.channelTimer = null; + } + const channelError = ErrorFactory.fromChannelFailure(`Failed to send EHLO: ${(error as Error).message}`); + this.emit('channelError', channelError); + reject(channelError); + } + } else { + if (this.channelTimer) { + clearTimeout(this.channelTimer); + this.channelTimer = null; + } + const error = ErrorFactory.fromChannelFailure(`Expected 220 greeting, got: ${message.data.trim()}`); + this.emit('channelError', error); + reject(error); + } + }; + + this.once('smtp_from_server', onGreeting); + }; + + const onChannelError = (message: SMTPChannelErrorMessage) => { + if (this.channelTimer) { + clearTimeout(this.channelTimer); + this.channelTimer = null; + } + this.setState(ConnectionState.CHANNEL_ERROR); + const error = ErrorFactory.fromChannelFailure(message.data.error); + this.emit('channelError', error); + this.logger.error('SMTP channel error', { error: message.data.error }); + reject(error); + }; + + this.once('smtp_channel_ready', onChannelReady); + this.once('smtp_channel_error', onChannelError); + }); + } + + /** + * Close SMTP channel + */ + private async closeSMTPChannel(): Promise { + return new Promise((resolve) => { + if (this.state === ConnectionState.CHANNEL_READY) { + const onChannelClosed = () => { + this.setState(ConnectionState.AUTHENTICATED); + this.emit('channelClosed'); + this.logger.debug('SMTP channel closed'); + resolve(); + }; + + this.once('smtp_channel_closed', onChannelClosed); + + // Fallback timeout + setTimeout(() => { + this.removeListener('smtp_channel_closed', onChannelClosed); + this.setState(ConnectionState.AUTHENTICATED); + resolve(); + }, 5000); + } else { + resolve(); + } + }); + } + + /** + * Send SMTP data to server + */ + private sendSMTPData(data: string): void { + const message: SMTPToServerMessage = { + type: SMTPOverWsMessageType.SMTP_TO_SERVER, + data + }; + this.logger.debug('TX SMTP command', { + command: data.trim(), + size: data.length + }); + this.sendMessage(message); + } + + /** + * Wait for SMTP response + */ + private waitForSMTPResponse(messageId: string): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(ErrorFactory.timeout('SMTP response', this.config.messageTimeout, { messageId })); + }, this.config.messageTimeout); + + const onResponse = (message: SMTPFromServerMessage) => { + clearTimeout(timeout); + this.logger.debug('RX SMTP response', { + messageId, + response: message.data.trim(), + size: message.data.length + }); + resolve(message.data); + }; + + this.once('smtp_from_server', onResponse); + }); + } + + /** + * Send WebSocket message + */ + private sendMessage(message: SMTPOverWsMessage): void { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + const messageStr = JSON.stringify(message); + this.logger.debug('TX WebSocket message', { + type: message.type, + data: message.type === 'authenticate' ? '[REDACTED]' : (message as any).data, + raw: messageStr.length > 200 ? messageStr.substring(0, 200) + '...' : messageStr + }); + this.ws.send(messageStr); + } else { + throw new ConnectionError('WebSocket is not connected'); + } + } + + /** + * Handle incoming WebSocket message + */ + private handleMessage(data: string): void { + try { + const message: SMTPOverWsMessage = JSON.parse(data); + this.logger.debug('RX WebSocket message', { + type: message.type, + data: message.type === 'authenticate_response' ? '[REDACTED]' : (message as any).data, + raw: data.length > 200 ? data.substring(0, 200) + '...' : data + }); + // Use setImmediate to avoid catching errors from event handlers + setImmediate(() => { + (this as any).emit(message.type, message); + }); + } catch (error) { + this.logger.error('Failed to parse WebSocket message', { data, error }); + this.emit('error', ErrorFactory.protocolError('Invalid message format', undefined, { data })); + } + } + + /** + * Handle WebSocket disconnection + */ + private handleDisconnection(reason?: string): void { + this.setState(ConnectionState.DISCONNECTED); + this.emit('disconnected', reason); + this.stopHeartbeat(); + this.clearTimers(); + + if (this.isProcessingQueue && !this.isShuttingDown && + this.reconnectAttempts < this.config.maxReconnectAttempts) { + this.scheduleReconnect(); + } else if (this.isProcessingQueue && !this.isShuttingDown) { + this.logger.error('Max reconnection attempts reached'); + this.setState(ConnectionState.FAILED); + this.rejectAllQueuedMessages(new ConnectionError('Max reconnection attempts reached')); + this.isProcessingQueue = false; + } + } + + /** + * Schedule reconnection attempt + */ + private scheduleReconnect(): void { + this.reconnectAttempts++; + this.stats.reconnectionAttempts++; + this.setState(ConnectionState.RECONNECTING); + + this.emit('reconnecting', this.reconnectAttempts, this.config.maxReconnectAttempts); + this.logger.info('Scheduling reconnection', { + attempt: this.reconnectAttempts, + maxAttempts: this.config.maxReconnectAttempts, + delay: this.config.reconnectInterval + }); + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.connect() + .then(() => { + this.emit('reconnected'); + this.logger.info('Reconnection successful'); + }) + .catch((error) => { + this.logger.error('Reconnection failed', error); + this.handleDisconnection('Reconnection failed'); + }); + }, this.config.reconnectInterval * Math.min(this.reconnectAttempts, 5)); // Exponential backoff + } + + /** + * Start heartbeat timer + */ + private startHeartbeat(): void { + if (this.config.heartbeatInterval > 0) { + this.heartbeatTimer = setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.ping(); + } + }, this.config.heartbeatInterval); + } + } + + /** + * Stop heartbeat timer + */ + private stopHeartbeat(): void { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } + + /** + * Clear all timers + */ + private clearTimers(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + if (this.authTimer) { + clearTimeout(this.authTimer); + this.authTimer = null; + } + + if (this.channelTimer) { + clearTimeout(this.channelTimer); + this.channelTimer = null; + } + + if (this.messageTimer) { + clearTimeout(this.messageTimer); + this.messageTimer = null; + } + + this.stopHeartbeat(); + } + + /** + * Set connection state and emit event + */ + private setState(newState: ConnectionState): void { + if (this.state !== newState) { + const oldState = this.state; + this.state = newState; + this.emit('stateChanged', oldState, newState); + this.logger.debug('State changed', { from: oldState, to: newState }); + } + } + + /** + * Reject all queued messages + */ + private rejectAllQueuedMessages(error: Error): void { + if (this.currentMessage) { + this.currentMessage.reject(error); + this.currentMessage = null; + } + + while (this.messageQueue.length > 0) { + const message = this.messageQueue.shift()!; + message.reject(error); + } + } + + /** + * Insert message into queue based on priority + */ + private insertMessageByPriority(message: QueuedMessage): void { + let insertIndex = this.messageQueue.length; + + // Find insertion point based on priority + for (let i = 0; i < this.messageQueue.length; i++) { + if (message.priority > this.messageQueue[i]!.priority) { + insertIndex = i; + break; + } + } + + this.messageQueue.splice(insertIndex, 0, message); + } + + /** + * Remove message from queue by ID + */ + private removeMessageFromQueue(messageId: string): boolean { + const index = this.messageQueue.findIndex(msg => msg.id === messageId); + if (index !== -1) { + this.messageQueue.splice(index, 1); + return true; + } + return false; + } + + /** + * Generate unique message ID + */ + private generateMessageId(): string { + return `msg_${Date.now()}_${++this.messageIdCounter}`; + } + + /** + * Validate client configuration + */ + private validateConfig(config: SMTPClientConfig): void { + if (!config.url) { + throw ErrorFactory.configurationError('URL is required', 'url'); + } + + if (!config.apiKey) { + throw ErrorFactory.configurationError('API key is required', 'apiKey'); + } + + if (config.reconnectInterval && config.reconnectInterval < 1000) { + throw ErrorFactory.configurationError('Reconnect interval must be at least 1000ms', 'reconnectInterval'); + } + + if (config.maxReconnectAttempts && config.maxReconnectAttempts < 1) { + throw ErrorFactory.configurationError('Max reconnect attempts must be at least 1', 'maxReconnectAttempts'); + } + + if (config.maxQueueSize && config.maxQueueSize < 1) { + throw ErrorFactory.configurationError('Max queue size must be at least 1', 'maxQueueSize'); + } + } + + /** + * Initialize statistics + */ + private initializeStats(): ClientStats { + return { + messagesQueued: 0, + messagesProcessed: 0, + messagesFailed: 0, + reconnectionAttempts: 0, + totalConnections: 0, + averageResponseTime: 0, + queueSize: 0, + connectionUptime: 0 + }; + } + + /** + * Wait for queue processing to complete + */ + private waitForQueueCompletion(): Promise { + return new Promise((resolve) => { + if (!this.isProcessingQueue) { + resolve(); + return; + } + + const onComplete = () => { + resolve(); + }; + + this.once('queueProcessingCompleted', onComplete); + }); + } + + /** + * Disconnect from WebSocket server + */ + private async disconnect(): Promise { + this.clearTimers(); + + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.close(); + } + + this.ws = null; + this.setState(ConnectionState.DISCONNECTED); + this.connectionStartTime = 0; + } + + // Type-safe event emitter methods + public on(event: K, listener: ClientEvents[K]): this { + return super.on(event, listener); + } + + public once(event: K, listener: ClientEvents[K]): this { + return super.once(event, listener); + } + + public emit(event: K, ...args: Parameters): boolean { + return super.emit(event, ...args); + } +} \ No newline at end of file diff --git a/src/errors.ts b/src/errors.ts new file mode 100644 index 0000000..4eb96c7 --- /dev/null +++ b/src/errors.ts @@ -0,0 +1,207 @@ +/** + * @fileoverview Error classes for SMTP over WebSocket client + */ + +/** + * Base error class for all SMTP WebSocket client errors + */ +export abstract class SMTPWSError extends Error { + public readonly code: string; + public readonly timestamp: Date; + public readonly context?: Record; + + constructor(message: string, code: string, context?: Record) { + super(message); + this.name = this.constructor.name; + this.code = code; + this.timestamp = new Date(); + this.context = context ?? {}; + + // Maintains proper stack trace for where our error was thrown (only available on V8) + if (Error.captureStackTrace) { + Error.captureStackTrace(this, this.constructor); + } + } + + /** + * Convert error to JSON for logging + */ + toJSON(): Record { + return { + name: this.name, + message: this.message, + code: this.code, + timestamp: this.timestamp.toISOString(), + context: this.context, + stack: this.stack + }; + } +} + +/** + * Connection-related errors + */ +export class ConnectionError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'CONNECTION_ERROR', context); + } +} + +/** + * Authentication-related errors + */ +export class AuthenticationError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'AUTHENTICATION_ERROR', context); + } +} + +/** + * SMTP channel-related errors + */ +export class ChannelError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'CHANNEL_ERROR', context); + } +} + +/** + * Message timeout errors + */ +export class TimeoutError extends SMTPWSError { + constructor(message: string, timeout: number, context?: Record) { + super(message, 'TIMEOUT_ERROR', { ...context, timeout }); + } +} + +/** + * Queue-related errors + */ +export class QueueError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'QUEUE_ERROR', context); + } +} + +/** + * Protocol-related errors + */ +export class ProtocolError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'PROTOCOL_ERROR', context); + } +} + +/** + * Configuration-related errors + */ +export class ConfigurationError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'CONFIGURATION_ERROR', context); + } +} + +/** + * Message processing errors + */ +export class MessageError extends SMTPWSError { + public readonly messageId: string; + public readonly retryCount: number; + + constructor(message: string, messageId: string, retryCount: number = 0, context?: Record) { + super(message, 'MESSAGE_ERROR', { ...context, messageId, retryCount }); + this.messageId = messageId; + this.retryCount = retryCount; + } +} + +/** + * Client shutdown errors + */ +export class ShutdownError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'SHUTDOWN_ERROR', context); + } +} + +/** + * Network-related errors + */ +export class NetworkError extends SMTPWSError { + constructor(message: string, context?: Record) { + super(message, 'NETWORK_ERROR', context); + } +} + +/** + * Error factory for creating appropriate error types + */ +export class ErrorFactory { + /** + * Create an error from WebSocket error events + */ + static fromWebSocketError(error: Error, context?: Record): SMTPWSError { + if (error.message.includes('timeout')) { + return new TimeoutError(error.message, 0, context); + } + + if (error.message.includes('connection')) { + return new ConnectionError(error.message, context); + } + + if (error.message.includes('network') || error.message.includes('ENOTFOUND') || error.message.includes('ECONNREFUSED')) { + return new NetworkError(error.message, context); + } + + return new ConnectionError(error.message, context); + } + + /** + * Create an error from authentication failure + */ + static fromAuthenticationFailure(errorMessage?: string, context?: Record): AuthenticationError { + return new AuthenticationError(errorMessage || 'Authentication failed', context); + } + + /** + * Create an error from channel failure + */ + static fromChannelFailure(errorMessage: string, context?: Record): ChannelError { + return new ChannelError(errorMessage, context); + } + + /** + * Create a timeout error + */ + static timeout(operation: string, timeout: number, context?: Record): TimeoutError { + return new TimeoutError(`${operation} timed out after ${timeout}ms`, timeout, context); + } + + /** + * Create a queue error + */ + static queueError(message: string, queueSize: number, context?: Record): QueueError { + return new QueueError(message, { ...context, queueSize }); + } + + /** + * Create a message error + */ + static messageError(message: string, messageId: string, retryCount: number = 0, context?: Record): MessageError { + return new MessageError(message, messageId, retryCount, context); + } + + /** + * Create a configuration error + */ + static configurationError(message: string, field?: string, context?: Record): ConfigurationError { + return new ConfigurationError(message, { ...context, field }); + } + + /** + * Create a protocol error + */ + static protocolError(message: string, messageType?: string, context?: Record): ProtocolError { + return new ProtocolError(message, { ...context, messageType }); + } +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..4296522 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,63 @@ +/** + * @fileoverview Main entry point for SMTP over WebSocket client library + */ + +// Export main client class +export { SMTPOverWSClient } from './client'; + +// Export all types +export { + SMTPOverWsMessageType, + ConnectionState, + MessagePriority, + type SMTPOverWsMessageBase, + type AuthenticateMessage, + type AuthenticateResponseMessage, + type SMTPChannelOpenMessage, + type SMTPChannelReadyMessage, + type SMTPChannelClosedMessage, + type SMTPChannelErrorMessage, + type SMTPToServerMessage, + type SMTPFromServerMessage, + type SMTPOverWsMessage, + type QueuedMessage, + type SMTPClientConfig, + type Logger, + type ClientStats, + type ClientEvents, + type SendOptions +} from './types'; + +// Export all error classes +export { + SMTPWSError, + ConnectionError, + AuthenticationError, + ChannelError, + TimeoutError, + QueueError, + MessageError, + ShutdownError, + NetworkError, + ProtocolError, + ConfigurationError, + ErrorFactory +} from './errors'; + +// Export Nodemailer transport +export { + SMTPWSTransport, + createTransport, + type TransportOptions, + type Envelope, + type MailMessage, + type SendResult, + type TransportInfo +} from './transport'; + +// Version information (will be updated by build process) +export const VERSION = '1.0.0'; + +// Re-export for convenience +import { SMTPOverWSClient } from './client'; +export default SMTPOverWSClient; \ No newline at end of file diff --git a/src/transport.ts b/src/transport.ts new file mode 100644 index 0000000..b070881 --- /dev/null +++ b/src/transport.ts @@ -0,0 +1,352 @@ +/** + * @fileoverview Nodemailer transport adapter for SMTP over WebSocket + */ + +import { EventEmitter } from 'events'; +import { SMTPOverWSClient } from './client'; +import { SMTPClientConfig, ConnectionState } from './types'; +import { ConnectionError, MessageError, TimeoutError } from './errors'; + +/** + * Nodemailer transport interface compatibility + */ +export interface TransportOptions extends Omit { + /** WebSocket server URL */ + host: string; + + /** WebSocket server port */ + port?: number; + + /** Use secure WebSocket (wss) */ + secure?: boolean; + + /** API key for authentication */ + auth: { + user: string; // API key + pass?: string; // Optional, for future use + }; + + /** Transport name */ + name?: string; + + /** Transport version */ + version?: string; +} + +/** + * Mail envelope information + */ +export interface Envelope { + from: string; + to: string[]; +} + +/** + * Mail data structure (nodemailer format) + */ +export interface MailMessage { + data: any; + message: { + _envelope: Envelope; + _raw: string | Buffer; + }; + mailer: any; +} + +/** + * Transport send result + */ +export interface SendResult { + envelope: Envelope; + messageId: string; + accepted: string[]; + rejected: string[]; + pending: string[]; + response: string; +} + +/** + * Transport info for Nodemailer compatibility + */ +export interface TransportInfo { + name: string; + version: string; + [key: string]: any; +} + +/** + * SMTP over WebSocket Nodemailer Transport + */ +export class SMTPWSTransport extends EventEmitter { + public name: string = 'SMTPWS'; + public version: string = '1.0.0'; + + private client: SMTPOverWSClient; + private options: TransportOptions; + private _isIdle: boolean = true; + + constructor(options: TransportOptions) { + super(); + + this.options = options; + + // Convert transport options to client config + const protocol = options.secure ? 'wss' : 'ws'; + const port = options.port || (options.secure ? 443 : 3000); + const url = `${protocol}://${options.host}:${port}/smtp`; + + const clientConfig: SMTPClientConfig = { + url, + apiKey: options.auth.user, + ...options + }; + + this.client = new SMTPOverWSClient(clientConfig); + this.setupClientEvents(); + } + + /** + * Get transport info + */ + public getTransportInfo(): TransportInfo { + return { + name: this.name, + version: this.version, + host: this.options.host, + port: this.options.port || 3000, + secure: this.options.secure || false + }; + } + + /** + * Send mail using the transport + */ + public async send(mail: MailMessage, callback?: (err: Error | null, info?: SendResult) => void): Promise { + try { + this._isIdle = false; + this.emit('idle', false); + + const result = await this.sendMail(mail); + + this._isIdle = true; + this.emit('idle', true); + + if (callback) { + callback(null, result); + } + + return result; + + } catch (error) { + this._isIdle = true; + this.emit('idle', true); + + if (callback) { + callback(error as Error); + } + + throw error; + } + } + + /** + * Verify transport configuration + */ + public async verify(): Promise { + try { + // Test full connection cycle: connect -> authenticate -> open SMTP -> close SMTP -> disconnect + await this.client.sendSMTPCommand('EHLO transport-verify\r\n'); + return true; + } catch (error) { + const err = error as any; + let message = 'Transport verification failed'; + + // Provide more specific error messages + if (err.code === 'AUTHENTICATION_ERROR') { + message = `Authentication failed: ${err.message}. Check your API key.`; + } else if (err.code === 'CONNECTION_ERROR') { + message = `Connection failed: ${err.message}. Check your host and port.`; + } else if (err.code === 'TIMEOUT_ERROR') { + message = `Connection timeout: ${err.message}. Server may be unreachable.`; + } else { + message = `${message}: ${err.message}`; + } + + throw new ConnectionError(message); + } + } + + /** + * Close the transport + */ + public async close(): Promise { + await this.client.shutdown(); + this.removeAllListeners(); + } + + /** + * Check if transport is idle + */ + public isIdle(): boolean { + return this._isIdle && this.client.getQueueSize() === 0; + } + + /** + * Internal method to send mail + */ + private async sendMail(mail: MailMessage): Promise { + const envelope = mail.message._envelope; + const raw = mail.message._raw; + const messageId = this.generateMessageId(); + const accepted: string[] = []; + const rejected: string[] = []; + const responses: string[] = []; + + try { + // Debug envelope + console.log('DEBUG envelope:', { + from: envelope.from, + to: envelope.to, + messageId, + envelopeKeys: Object.keys(envelope || {}), + envelope: envelope + }); + + // EHLO is now sent automatically when channel opens + // Send MAIL FROM + const mailFromResponse = await this.client.sendSMTPCommand(`MAIL FROM: <${envelope.from}>\r\n`); + responses.push(mailFromResponse); + + if (!this.isSuccessResponse(mailFromResponse)) { + throw new MessageError(`MAIL FROM rejected: ${mailFromResponse}`, messageId); + } + + // Send RCPT TO for each recipient + for (const recipient of envelope.to) { + try { + const rcptResponse = await this.client.sendSMTPCommand(`RCPT TO: <${recipient}>\r\n`); + responses.push(rcptResponse); + + if (this.isSuccessResponse(rcptResponse)) { + accepted.push(recipient); + } else { + rejected.push(recipient); + } + } catch (error) { + rejected.push(recipient); + responses.push(`Error for ${recipient}: ${(error as Error).message}`); + } + } + + // If no recipients were accepted, fail + if (accepted.length === 0) { + throw new MessageError('All recipients were rejected', messageId); + } + + // Send DATA command + const dataResponse = await this.client.sendSMTPCommand('DATA\r\n'); + responses.push(dataResponse); + + if (!this.isSuccessResponse(dataResponse)) { + throw new MessageError(`DATA command rejected: ${dataResponse}`, messageId); + } + + // Send message content + const messageData = this.prepareMessageData(raw); + const contentResponse = await this.client.sendSMTPCommand(messageData); + responses.push(contentResponse); + + if (!this.isSuccessResponse(contentResponse)) { + throw new MessageError(`Message data rejected: ${contentResponse}`, messageId); + } + + // Send QUIT + try { + const quitResponse = await this.client.sendSMTPCommand('QUIT\r\n'); + responses.push(quitResponse); + } catch (error) { + // QUIT failure is not critical + } + + return { + envelope, + messageId, + accepted, + rejected, + pending: [], + response: responses.join('\n') + }; + + } catch (error) { + // Try to send RSET to clean up + try { + await this.client.sendSMTPCommand('RSET\r\n'); + } catch { + // Ignore RSET failures + } + + throw error; + } + } + + /** + * Setup client event forwarding + */ + private setupClientEvents(): void { + this.client.on('error', (error) => { + this.emit('error', error); + }); + + this.client.on('connected', () => { + this.emit('connect'); + }); + + this.client.on('disconnected', () => { + this.emit('close'); + }); + + this.client.on('messageProcessed', () => { + this.emit('idle', this._isIdle); + }); + } + + /** + * Check if SMTP response indicates success + */ + private isSuccessResponse(response: string): boolean { + const statusCode = response.substring(0, 3); + return statusCode.startsWith('2') || statusCode.startsWith('3'); + } + + /** + * Prepare message data for transmission + */ + private prepareMessageData(raw: string | Buffer): string { + let messageData = raw.toString(); + + // Ensure message ends with CRLF.CRLF + if (!messageData.endsWith('\r\n')) { + messageData += '\r\n'; + } + messageData += '.\r\n'; + + // Escape lines that start with a dot + messageData = messageData.replace(/\n\./g, '\n..'); + + return messageData; + } + + /** + * Generate unique message ID + */ + private generateMessageId(): string { + return `smtp-ws-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + } +} + +/** + * Create transport instance + */ +export function createTransport(options: TransportOptions): SMTPWSTransport { + return new SMTPWSTransport(options); +} \ No newline at end of file diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..eea7f79 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,263 @@ +/** + * @fileoverview Type definitions for SMTP over WebSocket protocol + */ + +/** + * Message types supported by the SMTP over WebSocket protocol + */ +export enum SMTPOverWsMessageType { + AUTHENTICATE = 'authenticate', + AUTHENTICATE_RESPONSE = 'authenticate_response', + SMTP_TO_SERVER = 'smtp_to_server', + SMTP_FROM_SERVER = 'smtp_from_server', + SMTP_CHANNEL_OPEN = 'smtp_channel_open', + SMTP_CHANNEL_CLOSED = 'smtp_channel_closed', + SMTP_CHANNEL_ERROR = 'smtp_channel_error', + SMTP_CHANNEL_READY = 'smtp_channel_ready' +} + +/** + * Base interface for all WebSocket messages + */ +export interface SMTPOverWsMessageBase { + type: SMTPOverWsMessageType; +} + +/** + * Authentication request message + */ +export interface AuthenticateMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.AUTHENTICATE; + data: { + apiKey: string; + }; +} + +/** + * Authentication response message + */ +export interface AuthenticateResponseMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.AUTHENTICATE_RESPONSE; + data: { + success: boolean; + error?: string; + }; +} + +/** + * SMTP channel open request message + */ +export interface SMTPChannelOpenMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.SMTP_CHANNEL_OPEN; + data?: null; +} + +/** + * SMTP channel ready notification message + */ +export interface SMTPChannelReadyMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.SMTP_CHANNEL_READY; + data?: null; +} + +/** + * SMTP channel closed notification message + */ +export interface SMTPChannelClosedMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.SMTP_CHANNEL_CLOSED; + data?: null; +} + +/** + * SMTP channel error notification message + */ +export interface SMTPChannelErrorMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.SMTP_CHANNEL_ERROR; + data: { + error: string; + }; +} + +/** + * SMTP data to server message + */ +export interface SMTPToServerMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.SMTP_TO_SERVER; + data: string; +} + +/** + * SMTP data from server message + */ +export interface SMTPFromServerMessage extends SMTPOverWsMessageBase { + type: SMTPOverWsMessageType.SMTP_FROM_SERVER; + data: string; +} + +/** + * Union type for all possible WebSocket messages + */ +export type SMTPOverWsMessage = + | AuthenticateMessage + | AuthenticateResponseMessage + | SMTPChannelOpenMessage + | SMTPChannelReadyMessage + | SMTPChannelClosedMessage + | SMTPChannelErrorMessage + | SMTPToServerMessage + | SMTPFromServerMessage; + +/** + * Connection state enum + */ +export enum ConnectionState { + DISCONNECTED = 'disconnected', + CONNECTING = 'connecting', + CONNECTED = 'connected', + AUTHENTICATING = 'authenticating', + AUTHENTICATED = 'authenticated', + CHANNEL_OPENING = 'channel_opening', + CHANNEL_READY = 'channel_ready', + CHANNEL_ERROR = 'channel_error', + CHANNEL_CLOSED = 'channel_closed', + RECONNECTING = 'reconnecting', + FAILED = 'failed' +} + +/** + * Queued message structure + */ +export interface QueuedMessage { + id: string; + data: string; + resolve: (response: string) => void; + reject: (error: Error) => void; + timestamp: number; + retries: number; + priority: MessagePriority; +} + +/** + * Message priority levels + */ +export enum MessagePriority { + LOW = 0, + NORMAL = 1, + HIGH = 2, + CRITICAL = 3 +} + +/** + * Client configuration options + */ +export interface SMTPClientConfig { + /** WebSocket server URL */ + url: string; + + /** API key for authentication */ + apiKey: string; + + /** Interval between reconnection attempts in milliseconds */ + reconnectInterval?: number; + + /** Maximum number of reconnection attempts */ + maxReconnectAttempts?: number; + + /** Authentication timeout in milliseconds */ + authTimeout?: number; + + /** Channel open/close timeout in milliseconds */ + channelTimeout?: number; + + /** Message timeout in milliseconds */ + messageTimeout?: number; + + /** Maximum number of concurrent messages */ + maxConcurrentMessages?: number; + + /** Enable debug logging */ + debug?: boolean; + + /** Custom logger function */ + logger?: Logger; + + /** Connection heartbeat interval in milliseconds */ + heartbeatInterval?: number; + + /** Maximum queue size */ + maxQueueSize?: number; +} + +/** + * Logger interface + */ +export interface Logger { + debug(message: string, ...args: any[]): void; + info(message: string, ...args: any[]): void; + warn(message: string, ...args: any[]): void; + error(message: string, ...args: any[]): void; +} + +/** + * Client statistics + */ +export interface ClientStats { + messagesQueued: number; + messagesProcessed: number; + messagesFailed: number; + reconnectionAttempts: number; + totalConnections: number; + averageResponseTime: number; + queueSize: number; + connectionUptime: number; + lastError?: string; + lastErrorTime?: Date; +} + +/** + * Event types emitted by the client + */ +export interface ClientEvents { + connecting: () => void; + connected: () => void; + authenticated: () => void; + disconnected: (reason?: string) => void; + reconnecting: (attempt: number, maxAttempts: number) => void; + reconnected: () => void; + error: (error: Error) => void; + messageQueued: (messageId: string, queueSize: number) => void; + messageProcessed: (messageId: string, responseTime: number) => void; + messageFailed: (messageId: string, error: Error) => void; + queueProcessingStarted: (queueSize: number) => void; + queueProcessingCompleted: (processed: number, failed: number) => void; + channelOpened: () => void; + channelClosed: () => void; + channelError: (error: Error) => void; + stateChanged: (oldState: ConnectionState, newState: ConnectionState) => void; + // WebSocket message events + authenticate: (message: AuthenticateMessage) => void; + authenticate_response: (message: AuthenticateResponseMessage) => void; + smtp_channel_open: (message: SMTPChannelOpenMessage) => void; + smtp_channel_ready: (message: SMTPChannelReadyMessage) => void; + smtp_channel_closed: (message: SMTPChannelClosedMessage) => void; + smtp_channel_error: (message: SMTPChannelErrorMessage) => void; + smtp_to_server: (message: SMTPToServerMessage) => void; + smtp_from_server: (message: SMTPFromServerMessage) => void; +} + +/** + * Message send options + */ +export interface SendOptions { + /** Message priority */ + priority?: MessagePriority; + + /** Message timeout in milliseconds */ + timeout?: number; + + /** Number of retry attempts */ + retries?: number; + + /** Whether to skip queue and send immediately */ + immediate?: boolean; +} \ No newline at end of file