feat: Implement SMTP over WebSocket client with error handling and Nodemailer transport

- Added error classes for various SMTP-related issues (ConnectionError, AuthenticationError, etc.) in `errors.ts`.
- Created main entry point for the SMTP over WebSocket client library in `index.ts`, exporting client, types, errors, and transport.
- Developed Nodemailer transport adapter for SMTP over WebSocket in `transport.ts`, including methods for sending mail and verifying transport configuration.
- Defined type definitions for the SMTP over WebSocket protocol in `types.ts`, including message types, connection states, and client configuration options.
This commit is contained in:
Siwat Sirichai 2025-08-19 00:52:18 +07:00
parent 619cb97fa3
commit d059b80682
6 changed files with 1836 additions and 1 deletions

1
.gitignore vendored
View file

@ -8,7 +8,6 @@ yarn-error.log*
lib/ lib/
dist/ dist/
*.tsbuildinfo *.tsbuildinfo
src/
# Coverage directory used by tools like istanbul # Coverage directory used by tools like istanbul
coverage/ coverage/

951
src/client.ts Normal file
View file

@ -0,0 +1,951 @@
/**
* @fileoverview SMTP over WebSocket client with intelligent queue management
*/
import WebSocket from 'ws';
import { EventEmitter } from 'events';
import {
SMTPOverWsMessage,
SMTPOverWsMessageType,
ConnectionState,
QueuedMessage,
MessagePriority,
SMTPClientConfig,
Logger,
ClientStats,
ClientEvents,
SendOptions,
AuthenticateMessage,
SMTPChannelOpenMessage,
SMTPToServerMessage,
AuthenticateResponseMessage,
SMTPChannelErrorMessage,
SMTPFromServerMessage
} from './types';
import {
SMTPWSError,
ConnectionError,
AuthenticationError,
ChannelError,
TimeoutError,
QueueError,
MessageError,
ShutdownError,
ErrorFactory
} from './errors';
/**
* Default logger implementation
*/
class DefaultLogger implements Logger {
constructor(private enableDebug: boolean = false) {}
debug(message: string, ...args: any[]): void {
if (this.enableDebug) {
console.debug(`[SMTP-WS-DEBUG] ${message}`, ...args);
}
}
info(message: string, ...args: any[]): void {
if (this.enableDebug) {
console.info(`[SMTP-WS-INFO] ${message}`, ...args);
}
}
warn(message: string, ...args: any[]): void {
if (this.enableDebug) {
console.warn(`[SMTP-WS-WARN] ${message}`, ...args);
}
}
error(message: string, ...args: any[]): void {
if (this.enableDebug) {
console.error(`[SMTP-WS-ERROR] ${message}`, ...args);
}
}
}
/**
* SMTP over WebSocket client with intelligent queue management and automatic reconnection
*/
export class SMTPOverWSClient extends EventEmitter {
private config: Required<SMTPClientConfig>;
private ws: WebSocket | null = null;
private state: ConnectionState = ConnectionState.DISCONNECTED;
private messageQueue: QueuedMessage[] = [];
private currentMessage: QueuedMessage | null = null;
private reconnectAttempts = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private authTimer: NodeJS.Timeout | null = null;
private channelTimer: NodeJS.Timeout | null = null;
private heartbeatTimer: NodeJS.Timeout | null = null;
private messageTimer: NodeJS.Timeout | null = null;
private isProcessingQueue = false;
private isShuttingDown = false;
private logger: Logger;
private stats: ClientStats;
private connectionStartTime: number = 0;
private messageIdCounter = 0;
constructor(config: SMTPClientConfig) {
super();
// Validate configuration
this.validateConfig(config);
// Set defaults
this.config = {
reconnectInterval: 5000,
maxReconnectAttempts: 10,
authTimeout: 30000,
channelTimeout: 10000,
messageTimeout: 60000,
maxConcurrentMessages: 1,
debug: false,
heartbeatInterval: 30000,
maxQueueSize: 1000,
...config,
logger: config.logger || new DefaultLogger(config.debug ?? false)
};
this.logger = this.config.logger;
this.stats = this.initializeStats();
this.logger.debug('SMTP WebSocket client initialized', {
url: this.config.url,
maxQueueSize: this.config.maxQueueSize
});
}
/**
* Send SMTP command with automatic queue management
*/
public async sendSMTPCommand(data: string, options: SendOptions = {}): Promise<string> {
if (this.isShuttingDown) {
throw new ShutdownError('Client is shutting down');
}
// Check queue size limit
if (this.messageQueue.length >= this.config.maxQueueSize) {
throw ErrorFactory.queueError(
`Queue is full (${this.config.maxQueueSize} messages)`,
this.messageQueue.length
);
}
const messageId = this.generateMessageId();
const priority = options.priority ?? MessagePriority.NORMAL;
const timeout = options.timeout ?? this.config.messageTimeout;
const retries = options.retries ?? 3;
return new Promise((resolve, reject) => {
const queuedMessage: QueuedMessage = {
id: messageId,
data,
resolve,
reject,
timestamp: Date.now(),
retries,
priority
};
// Insert message based on priority
this.insertMessageByPriority(queuedMessage);
this.stats.messagesQueued++;
this.emit('messageQueued', messageId, this.messageQueue.length);
this.logger.debug('Message queued', {
messageId,
priority,
queueSize: this.messageQueue.length,
data: data.substring(0, 100) + (data.length > 100 ? '...' : '')
});
// Set message timeout
setTimeout(() => {
if (this.messageQueue.includes(queuedMessage) || this.currentMessage === queuedMessage) {
const error = ErrorFactory.timeout('Message', timeout, { messageId });
this.removeMessageFromQueue(messageId);
reject(error);
}
}, timeout);
// Start processing if not already running
if (!this.isProcessingQueue) {
this.processQueue().catch(error => {
this.logger.error('Queue processing failed', error);
});
}
});
}
/**
* Get current client statistics
*/
public getStats(): ClientStats {
return {
...this.stats,
queueSize: this.messageQueue.length,
connectionUptime: this.connectionStartTime > 0 ? Date.now() - this.connectionStartTime : 0
};
}
/**
* Get current connection state
*/
public getConnectionState(): ConnectionState {
return this.state;
}
/**
* Get current queue size
*/
public getQueueSize(): number {
return this.messageQueue.length;
}
/**
* Clear all queued messages
*/
public clearQueue(): void {
const clearedCount = this.messageQueue.length;
// Reject all queued messages
for (const message of this.messageQueue) {
message.reject(new QueueError('Queue cleared'));
}
this.messageQueue = [];
this.logger.info('Queue cleared', { clearedCount });
}
/**
* Graceful shutdown of the client
*/
public async shutdown(timeout: number = 30000): Promise<void> {
if (this.isShuttingDown) {
return;
}
this.isShuttingDown = true;
this.logger.info('Initiating client shutdown', { queueSize: this.messageQueue.length });
// Stop accepting new messages and clear timers
this.clearTimers();
try {
// Wait for current queue processing to complete or timeout
if (this.isProcessingQueue) {
await Promise.race([
this.waitForQueueCompletion(),
new Promise((_, reject) =>
setTimeout(() => reject(new TimeoutError('Shutdown timeout', timeout)), timeout)
)
]);
}
} catch (error) {
this.logger.warn('Shutdown timeout reached, forcing shutdown', error);
}
// Reject any remaining messages
this.rejectAllQueuedMessages(new ShutdownError('Client shutting down'));
// Close connection
await this.disconnect();
// Remove all listeners
this.removeAllListeners();
this.logger.info('Client shutdown completed');
}
/**
* Process the message queue
*/
private async processQueue(): Promise<void> {
if (this.isProcessingQueue || this.messageQueue.length === 0 || this.isShuttingDown) {
return;
}
this.isProcessingQueue = true;
this.emit('queueProcessingStarted', this.messageQueue.length);
this.logger.info('Queue processing started', { queueSize: this.messageQueue.length });
let processed = 0;
let failed = 0;
try {
// Connect if not connected
if (this.state === ConnectionState.DISCONNECTED) {
await this.connect();
}
// Process messages sequentially one at a time
while (this.messageQueue.length > 0 && !this.isShuttingDown) {
const message = this.messageQueue.shift()!;
try {
this.currentMessage = message;
const startTime = Date.now();
await this.processMessage(message);
const responseTime = Date.now() - startTime;
this.stats.messagesProcessed++;
this.stats.averageResponseTime =
(this.stats.averageResponseTime * (this.stats.messagesProcessed - 1) + responseTime) /
this.stats.messagesProcessed;
this.emit('messageProcessed', message.id, responseTime);
this.logger.debug('Message processed successfully', {
messageId: message.id,
responseTime
});
processed++;
} catch (error) {
this.stats.messagesFailed++;
this.stats.lastError = (error as Error).message;
this.stats.lastErrorTime = new Date();
this.emit('messageFailed', message.id, error as Error);
this.logger.error('Message processing failed', {
messageId: message.id,
error: (error as Error).message
});
failed++;
message.reject(error as Error);
} finally {
this.currentMessage = null;
}
}
} catch (error) {
this.logger.error('Queue processing error', error);
this.rejectAllQueuedMessages(error as Error);
failed += this.messageQueue.length;
} finally {
this.isProcessingQueue = false;
// Disconnect if queue is empty and not shutting down
if (this.messageQueue.length === 0 && !this.isShuttingDown) {
await this.disconnect();
}
this.emit('queueProcessingCompleted', processed, failed);
this.logger.info('Queue processing completed', { processed, failed });
}
}
/**
* Process a single message
*/
private async processMessage(message: QueuedMessage): Promise<void> {
return new Promise(async (resolve, reject) => {
try {
// Open SMTP channel
await this.openSMTPChannel();
// Wait for SMTP response
const responsePromise = this.waitForSMTPResponse(message.id);
// Send SMTP data
this.sendSMTPData(message.data);
// Wait for response
const response = await responsePromise;
message.resolve(response);
// Close SMTP channel
await this.closeSMTPChannel();
resolve();
} catch (error) {
// Retry logic
if (message.retries > 0) {
message.retries--;
this.logger.debug('Retrying message', {
messageId: message.id,
retriesLeft: message.retries
});
// Re-queue message
this.insertMessageByPriority(message);
resolve();
} else {
reject(ErrorFactory.messageError(
(error as Error).message,
message.id,
message.retries
));
}
}
});
}
/**
* Connect to WebSocket server
*/
private async connect(): Promise<void> {
if (this.state !== ConnectionState.DISCONNECTED && this.state !== ConnectionState.FAILED) {
return;
}
this.setState(ConnectionState.CONNECTING);
this.logger.debug('Connecting to WebSocket server', { url: this.config.url });
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(this.config.url);
const connectionTimeout = setTimeout(() => {
if (this.ws) {
this.ws.terminate();
}
reject(ErrorFactory.timeout('Connection', this.config.authTimeout));
}, this.config.authTimeout);
this.ws.on('open', () => {
clearTimeout(connectionTimeout);
this.connectionStartTime = Date.now();
this.stats.totalConnections++;
this.reconnectAttempts = 0;
this.setState(ConnectionState.CONNECTED);
this.emit('connected');
this.logger.info('WebSocket connected');
// Start heartbeat
this.startHeartbeat();
// Authenticate
this.authenticate().then(resolve).catch(reject);
});
this.ws.on('message', (data) => {
this.handleMessage(data.toString());
});
this.ws.on('close', (code, reason) => {
clearTimeout(connectionTimeout);
this.logger.info('WebSocket closed', { code, reason: reason.toString() });
this.handleDisconnection(`Connection closed: ${code} ${reason}`);
});
this.ws.on('error', (error) => {
clearTimeout(connectionTimeout);
this.logger.error('WebSocket error', error);
const wsError = ErrorFactory.fromWebSocketError(error);
this.emit('error', wsError);
reject(wsError);
});
} catch (error) {
reject(ErrorFactory.fromWebSocketError(error as Error));
}
});
}
/**
* Authenticate with the server
*/
private async authenticate(): Promise<void> {
return new Promise((resolve, reject) => {
this.setState(ConnectionState.AUTHENTICATING);
this.logger.debug('Authenticating with server');
this.authTimer = setTimeout(() => {
reject(ErrorFactory.timeout('Authentication', this.config.authTimeout));
}, this.config.authTimeout);
const authMessage: AuthenticateMessage = {
type: SMTPOverWsMessageType.AUTHENTICATE,
data: { apiKey: this.config.apiKey }
};
this.sendMessage(authMessage);
const onAuthResponse = (message: AuthenticateResponseMessage) => {
if (this.authTimer) {
clearTimeout(this.authTimer);
this.authTimer = null;
}
if (message.data.success) {
this.setState(ConnectionState.AUTHENTICATED);
this.emit('authenticated');
this.logger.info('Authentication successful');
resolve();
} else {
const error = ErrorFactory.fromAuthenticationFailure(message.data.error);
this.emit('error', error);
this.logger.error('Authentication failed', { error: message.data.error });
reject(error);
}
};
this.once('authenticate_response', onAuthResponse);
});
}
/**
* Open SMTP channel
*/
private async openSMTPChannel(): Promise<void> {
if (this.state !== ConnectionState.AUTHENTICATED) {
throw new ChannelError('Cannot open SMTP channel: not authenticated');
}
return new Promise((resolve, reject) => {
this.setState(ConnectionState.CHANNEL_OPENING);
this.logger.debug('Opening SMTP channel');
this.channelTimer = setTimeout(() => {
reject(ErrorFactory.timeout('Channel open', this.config.channelTimeout));
}, this.config.channelTimeout);
const openMessage: SMTPChannelOpenMessage = {
type: SMTPOverWsMessageType.SMTP_CHANNEL_OPEN
};
this.sendMessage(openMessage);
const onChannelReady = () => {
// Channel is ready, now wait for SMTP 220 greeting
this.logger.debug('Channel ready, waiting for SMTP greeting');
const onGreeting = async (message: SMTPFromServerMessage) => {
this.logger.debug('RX SMTP greeting', {
response: message.data.trim(),
size: message.data.length
});
// Check if it's a 220 greeting
if (message.data.startsWith('220')) {
try {
// Automatically send EHLO after greeting
this.logger.debug('Auto-sending EHLO after greeting');
this.sendSMTPData('EHLO client\r\n');
// Wait for EHLO response
const onEhloResponse = (ehloMessage: SMTPFromServerMessage) => {
if (this.channelTimer) {
clearTimeout(this.channelTimer);
this.channelTimer = null;
}
this.logger.debug('RX SMTP EHLO response', {
response: ehloMessage.data.trim(),
size: ehloMessage.data.length
});
if (ehloMessage.data.startsWith('250')) {
this.setState(ConnectionState.CHANNEL_READY);
this.emit('channelOpened');
this.logger.debug('SMTP channel ready after EHLO');
resolve();
} else {
const error = ErrorFactory.fromChannelFailure(`EHLO rejected: ${ehloMessage.data.trim()}`);
this.emit('channelError', error);
reject(error);
}
};
this.once('smtp_from_server', onEhloResponse);
} catch (error) {
if (this.channelTimer) {
clearTimeout(this.channelTimer);
this.channelTimer = null;
}
const channelError = ErrorFactory.fromChannelFailure(`Failed to send EHLO: ${(error as Error).message}`);
this.emit('channelError', channelError);
reject(channelError);
}
} else {
if (this.channelTimer) {
clearTimeout(this.channelTimer);
this.channelTimer = null;
}
const error = ErrorFactory.fromChannelFailure(`Expected 220 greeting, got: ${message.data.trim()}`);
this.emit('channelError', error);
reject(error);
}
};
this.once('smtp_from_server', onGreeting);
};
const onChannelError = (message: SMTPChannelErrorMessage) => {
if (this.channelTimer) {
clearTimeout(this.channelTimer);
this.channelTimer = null;
}
this.setState(ConnectionState.CHANNEL_ERROR);
const error = ErrorFactory.fromChannelFailure(message.data.error);
this.emit('channelError', error);
this.logger.error('SMTP channel error', { error: message.data.error });
reject(error);
};
this.once('smtp_channel_ready', onChannelReady);
this.once('smtp_channel_error', onChannelError);
});
}
/**
* Close SMTP channel
*/
private async closeSMTPChannel(): Promise<void> {
return new Promise((resolve) => {
if (this.state === ConnectionState.CHANNEL_READY) {
const onChannelClosed = () => {
this.setState(ConnectionState.AUTHENTICATED);
this.emit('channelClosed');
this.logger.debug('SMTP channel closed');
resolve();
};
this.once('smtp_channel_closed', onChannelClosed);
// Fallback timeout
setTimeout(() => {
this.removeListener('smtp_channel_closed', onChannelClosed);
this.setState(ConnectionState.AUTHENTICATED);
resolve();
}, 5000);
} else {
resolve();
}
});
}
/**
* Send SMTP data to server
*/
private sendSMTPData(data: string): void {
const message: SMTPToServerMessage = {
type: SMTPOverWsMessageType.SMTP_TO_SERVER,
data
};
this.logger.debug('TX SMTP command', {
command: data.trim(),
size: data.length
});
this.sendMessage(message);
}
/**
* Wait for SMTP response
*/
private waitForSMTPResponse(messageId: string): Promise<string> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(ErrorFactory.timeout('SMTP response', this.config.messageTimeout, { messageId }));
}, this.config.messageTimeout);
const onResponse = (message: SMTPFromServerMessage) => {
clearTimeout(timeout);
this.logger.debug('RX SMTP response', {
messageId,
response: message.data.trim(),
size: message.data.length
});
resolve(message.data);
};
this.once('smtp_from_server', onResponse);
});
}
/**
* Send WebSocket message
*/
private sendMessage(message: SMTPOverWsMessage): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
const messageStr = JSON.stringify(message);
this.logger.debug('TX WebSocket message', {
type: message.type,
data: message.type === 'authenticate' ? '[REDACTED]' : (message as any).data,
raw: messageStr.length > 200 ? messageStr.substring(0, 200) + '...' : messageStr
});
this.ws.send(messageStr);
} else {
throw new ConnectionError('WebSocket is not connected');
}
}
/**
* Handle incoming WebSocket message
*/
private handleMessage(data: string): void {
try {
const message: SMTPOverWsMessage = JSON.parse(data);
this.logger.debug('RX WebSocket message', {
type: message.type,
data: message.type === 'authenticate_response' ? '[REDACTED]' : (message as any).data,
raw: data.length > 200 ? data.substring(0, 200) + '...' : data
});
// Use setImmediate to avoid catching errors from event handlers
setImmediate(() => {
(this as any).emit(message.type, message);
});
} catch (error) {
this.logger.error('Failed to parse WebSocket message', { data, error });
this.emit('error', ErrorFactory.protocolError('Invalid message format', undefined, { data }));
}
}
/**
* Handle WebSocket disconnection
*/
private handleDisconnection(reason?: string): void {
this.setState(ConnectionState.DISCONNECTED);
this.emit('disconnected', reason);
this.stopHeartbeat();
this.clearTimers();
if (this.isProcessingQueue && !this.isShuttingDown &&
this.reconnectAttempts < this.config.maxReconnectAttempts) {
this.scheduleReconnect();
} else if (this.isProcessingQueue && !this.isShuttingDown) {
this.logger.error('Max reconnection attempts reached');
this.setState(ConnectionState.FAILED);
this.rejectAllQueuedMessages(new ConnectionError('Max reconnection attempts reached'));
this.isProcessingQueue = false;
}
}
/**
* Schedule reconnection attempt
*/
private scheduleReconnect(): void {
this.reconnectAttempts++;
this.stats.reconnectionAttempts++;
this.setState(ConnectionState.RECONNECTING);
this.emit('reconnecting', this.reconnectAttempts, this.config.maxReconnectAttempts);
this.logger.info('Scheduling reconnection', {
attempt: this.reconnectAttempts,
maxAttempts: this.config.maxReconnectAttempts,
delay: this.config.reconnectInterval
});
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect()
.then(() => {
this.emit('reconnected');
this.logger.info('Reconnection successful');
})
.catch((error) => {
this.logger.error('Reconnection failed', error);
this.handleDisconnection('Reconnection failed');
});
}, this.config.reconnectInterval * Math.min(this.reconnectAttempts, 5)); // Exponential backoff
}
/**
* Start heartbeat timer
*/
private startHeartbeat(): void {
if (this.config.heartbeatInterval > 0) {
this.heartbeatTimer = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.ping();
}
}, this.config.heartbeatInterval);
}
}
/**
* Stop heartbeat timer
*/
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
/**
* Clear all timers
*/
private clearTimers(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.authTimer) {
clearTimeout(this.authTimer);
this.authTimer = null;
}
if (this.channelTimer) {
clearTimeout(this.channelTimer);
this.channelTimer = null;
}
if (this.messageTimer) {
clearTimeout(this.messageTimer);
this.messageTimer = null;
}
this.stopHeartbeat();
}
/**
* Set connection state and emit event
*/
private setState(newState: ConnectionState): void {
if (this.state !== newState) {
const oldState = this.state;
this.state = newState;
this.emit('stateChanged', oldState, newState);
this.logger.debug('State changed', { from: oldState, to: newState });
}
}
/**
* Reject all queued messages
*/
private rejectAllQueuedMessages(error: Error): void {
if (this.currentMessage) {
this.currentMessage.reject(error);
this.currentMessage = null;
}
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift()!;
message.reject(error);
}
}
/**
* Insert message into queue based on priority
*/
private insertMessageByPriority(message: QueuedMessage): void {
let insertIndex = this.messageQueue.length;
// Find insertion point based on priority
for (let i = 0; i < this.messageQueue.length; i++) {
if (message.priority > this.messageQueue[i]!.priority) {
insertIndex = i;
break;
}
}
this.messageQueue.splice(insertIndex, 0, message);
}
/**
* Remove message from queue by ID
*/
private removeMessageFromQueue(messageId: string): boolean {
const index = this.messageQueue.findIndex(msg => msg.id === messageId);
if (index !== -1) {
this.messageQueue.splice(index, 1);
return true;
}
return false;
}
/**
* Generate unique message ID
*/
private generateMessageId(): string {
return `msg_${Date.now()}_${++this.messageIdCounter}`;
}
/**
* Validate client configuration
*/
private validateConfig(config: SMTPClientConfig): void {
if (!config.url) {
throw ErrorFactory.configurationError('URL is required', 'url');
}
if (!config.apiKey) {
throw ErrorFactory.configurationError('API key is required', 'apiKey');
}
if (config.reconnectInterval && config.reconnectInterval < 1000) {
throw ErrorFactory.configurationError('Reconnect interval must be at least 1000ms', 'reconnectInterval');
}
if (config.maxReconnectAttempts && config.maxReconnectAttempts < 1) {
throw ErrorFactory.configurationError('Max reconnect attempts must be at least 1', 'maxReconnectAttempts');
}
if (config.maxQueueSize && config.maxQueueSize < 1) {
throw ErrorFactory.configurationError('Max queue size must be at least 1', 'maxQueueSize');
}
}
/**
* Initialize statistics
*/
private initializeStats(): ClientStats {
return {
messagesQueued: 0,
messagesProcessed: 0,
messagesFailed: 0,
reconnectionAttempts: 0,
totalConnections: 0,
averageResponseTime: 0,
queueSize: 0,
connectionUptime: 0
};
}
/**
* Wait for queue processing to complete
*/
private waitForQueueCompletion(): Promise<void> {
return new Promise((resolve) => {
if (!this.isProcessingQueue) {
resolve();
return;
}
const onComplete = () => {
resolve();
};
this.once('queueProcessingCompleted', onComplete);
});
}
/**
* Disconnect from WebSocket server
*/
private async disconnect(): Promise<void> {
this.clearTimers();
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.close();
}
this.ws = null;
this.setState(ConnectionState.DISCONNECTED);
this.connectionStartTime = 0;
}
// Type-safe event emitter methods
public on<K extends keyof ClientEvents>(event: K, listener: ClientEvents[K]): this {
return super.on(event, listener);
}
public once<K extends keyof ClientEvents>(event: K, listener: ClientEvents[K]): this {
return super.once(event, listener);
}
public emit<K extends keyof ClientEvents>(event: K, ...args: Parameters<ClientEvents[K]>): boolean {
return super.emit(event, ...args);
}
}

207
src/errors.ts Normal file
View file

@ -0,0 +1,207 @@
/**
* @fileoverview Error classes for SMTP over WebSocket client
*/
/**
* Base error class for all SMTP WebSocket client errors
*/
export abstract class SMTPWSError extends Error {
public readonly code: string;
public readonly timestamp: Date;
public readonly context?: Record<string, any>;
constructor(message: string, code: string, context?: Record<string, any>) {
super(message);
this.name = this.constructor.name;
this.code = code;
this.timestamp = new Date();
this.context = context ?? {};
// Maintains proper stack trace for where our error was thrown (only available on V8)
if (Error.captureStackTrace) {
Error.captureStackTrace(this, this.constructor);
}
}
/**
* Convert error to JSON for logging
*/
toJSON(): Record<string, any> {
return {
name: this.name,
message: this.message,
code: this.code,
timestamp: this.timestamp.toISOString(),
context: this.context,
stack: this.stack
};
}
}
/**
* Connection-related errors
*/
export class ConnectionError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'CONNECTION_ERROR', context);
}
}
/**
* Authentication-related errors
*/
export class AuthenticationError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'AUTHENTICATION_ERROR', context);
}
}
/**
* SMTP channel-related errors
*/
export class ChannelError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'CHANNEL_ERROR', context);
}
}
/**
* Message timeout errors
*/
export class TimeoutError extends SMTPWSError {
constructor(message: string, timeout: number, context?: Record<string, any>) {
super(message, 'TIMEOUT_ERROR', { ...context, timeout });
}
}
/**
* Queue-related errors
*/
export class QueueError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'QUEUE_ERROR', context);
}
}
/**
* Protocol-related errors
*/
export class ProtocolError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'PROTOCOL_ERROR', context);
}
}
/**
* Configuration-related errors
*/
export class ConfigurationError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'CONFIGURATION_ERROR', context);
}
}
/**
* Message processing errors
*/
export class MessageError extends SMTPWSError {
public readonly messageId: string;
public readonly retryCount: number;
constructor(message: string, messageId: string, retryCount: number = 0, context?: Record<string, any>) {
super(message, 'MESSAGE_ERROR', { ...context, messageId, retryCount });
this.messageId = messageId;
this.retryCount = retryCount;
}
}
/**
* Client shutdown errors
*/
export class ShutdownError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'SHUTDOWN_ERROR', context);
}
}
/**
* Network-related errors
*/
export class NetworkError extends SMTPWSError {
constructor(message: string, context?: Record<string, any>) {
super(message, 'NETWORK_ERROR', context);
}
}
/**
* Error factory for creating appropriate error types
*/
export class ErrorFactory {
/**
* Create an error from WebSocket error events
*/
static fromWebSocketError(error: Error, context?: Record<string, any>): SMTPWSError {
if (error.message.includes('timeout')) {
return new TimeoutError(error.message, 0, context);
}
if (error.message.includes('connection')) {
return new ConnectionError(error.message, context);
}
if (error.message.includes('network') || error.message.includes('ENOTFOUND') || error.message.includes('ECONNREFUSED')) {
return new NetworkError(error.message, context);
}
return new ConnectionError(error.message, context);
}
/**
* Create an error from authentication failure
*/
static fromAuthenticationFailure(errorMessage?: string, context?: Record<string, any>): AuthenticationError {
return new AuthenticationError(errorMessage || 'Authentication failed', context);
}
/**
* Create an error from channel failure
*/
static fromChannelFailure(errorMessage: string, context?: Record<string, any>): ChannelError {
return new ChannelError(errorMessage, context);
}
/**
* Create a timeout error
*/
static timeout(operation: string, timeout: number, context?: Record<string, any>): TimeoutError {
return new TimeoutError(`${operation} timed out after ${timeout}ms`, timeout, context);
}
/**
* Create a queue error
*/
static queueError(message: string, queueSize: number, context?: Record<string, any>): QueueError {
return new QueueError(message, { ...context, queueSize });
}
/**
* Create a message error
*/
static messageError(message: string, messageId: string, retryCount: number = 0, context?: Record<string, any>): MessageError {
return new MessageError(message, messageId, retryCount, context);
}
/**
* Create a configuration error
*/
static configurationError(message: string, field?: string, context?: Record<string, any>): ConfigurationError {
return new ConfigurationError(message, { ...context, field });
}
/**
* Create a protocol error
*/
static protocolError(message: string, messageType?: string, context?: Record<string, any>): ProtocolError {
return new ProtocolError(message, { ...context, messageType });
}
}

63
src/index.ts Normal file
View file

@ -0,0 +1,63 @@
/**
* @fileoverview Main entry point for SMTP over WebSocket client library
*/
// Export main client class
export { SMTPOverWSClient } from './client';
// Export all types
export {
SMTPOverWsMessageType,
ConnectionState,
MessagePriority,
type SMTPOverWsMessageBase,
type AuthenticateMessage,
type AuthenticateResponseMessage,
type SMTPChannelOpenMessage,
type SMTPChannelReadyMessage,
type SMTPChannelClosedMessage,
type SMTPChannelErrorMessage,
type SMTPToServerMessage,
type SMTPFromServerMessage,
type SMTPOverWsMessage,
type QueuedMessage,
type SMTPClientConfig,
type Logger,
type ClientStats,
type ClientEvents,
type SendOptions
} from './types';
// Export all error classes
export {
SMTPWSError,
ConnectionError,
AuthenticationError,
ChannelError,
TimeoutError,
QueueError,
MessageError,
ShutdownError,
NetworkError,
ProtocolError,
ConfigurationError,
ErrorFactory
} from './errors';
// Export Nodemailer transport
export {
SMTPWSTransport,
createTransport,
type TransportOptions,
type Envelope,
type MailMessage,
type SendResult,
type TransportInfo
} from './transport';
// Version information (will be updated by build process)
export const VERSION = '1.0.0';
// Re-export for convenience
import { SMTPOverWSClient } from './client';
export default SMTPOverWSClient;

352
src/transport.ts Normal file
View file

@ -0,0 +1,352 @@
/**
* @fileoverview Nodemailer transport adapter for SMTP over WebSocket
*/
import { EventEmitter } from 'events';
import { SMTPOverWSClient } from './client';
import { SMTPClientConfig, ConnectionState } from './types';
import { ConnectionError, MessageError, TimeoutError } from './errors';
/**
* Nodemailer transport interface compatibility
*/
export interface TransportOptions extends Omit<SMTPClientConfig, 'url' | 'apiKey'> {
/** WebSocket server URL */
host: string;
/** WebSocket server port */
port?: number;
/** Use secure WebSocket (wss) */
secure?: boolean;
/** API key for authentication */
auth: {
user: string; // API key
pass?: string; // Optional, for future use
};
/** Transport name */
name?: string;
/** Transport version */
version?: string;
}
/**
* Mail envelope information
*/
export interface Envelope {
from: string;
to: string[];
}
/**
* Mail data structure (nodemailer format)
*/
export interface MailMessage {
data: any;
message: {
_envelope: Envelope;
_raw: string | Buffer;
};
mailer: any;
}
/**
* Transport send result
*/
export interface SendResult {
envelope: Envelope;
messageId: string;
accepted: string[];
rejected: string[];
pending: string[];
response: string;
}
/**
* Transport info for Nodemailer compatibility
*/
export interface TransportInfo {
name: string;
version: string;
[key: string]: any;
}
/**
* SMTP over WebSocket Nodemailer Transport
*/
export class SMTPWSTransport extends EventEmitter {
public name: string = 'SMTPWS';
public version: string = '1.0.0';
private client: SMTPOverWSClient;
private options: TransportOptions;
private _isIdle: boolean = true;
constructor(options: TransportOptions) {
super();
this.options = options;
// Convert transport options to client config
const protocol = options.secure ? 'wss' : 'ws';
const port = options.port || (options.secure ? 443 : 3000);
const url = `${protocol}://${options.host}:${port}/smtp`;
const clientConfig: SMTPClientConfig = {
url,
apiKey: options.auth.user,
...options
};
this.client = new SMTPOverWSClient(clientConfig);
this.setupClientEvents();
}
/**
* Get transport info
*/
public getTransportInfo(): TransportInfo {
return {
name: this.name,
version: this.version,
host: this.options.host,
port: this.options.port || 3000,
secure: this.options.secure || false
};
}
/**
* Send mail using the transport
*/
public async send(mail: MailMessage, callback?: (err: Error | null, info?: SendResult) => void): Promise<SendResult> {
try {
this._isIdle = false;
this.emit('idle', false);
const result = await this.sendMail(mail);
this._isIdle = true;
this.emit('idle', true);
if (callback) {
callback(null, result);
}
return result;
} catch (error) {
this._isIdle = true;
this.emit('idle', true);
if (callback) {
callback(error as Error);
}
throw error;
}
}
/**
* Verify transport configuration
*/
public async verify(): Promise<boolean> {
try {
// Test full connection cycle: connect -> authenticate -> open SMTP -> close SMTP -> disconnect
await this.client.sendSMTPCommand('EHLO transport-verify\r\n');
return true;
} catch (error) {
const err = error as any;
let message = 'Transport verification failed';
// Provide more specific error messages
if (err.code === 'AUTHENTICATION_ERROR') {
message = `Authentication failed: ${err.message}. Check your API key.`;
} else if (err.code === 'CONNECTION_ERROR') {
message = `Connection failed: ${err.message}. Check your host and port.`;
} else if (err.code === 'TIMEOUT_ERROR') {
message = `Connection timeout: ${err.message}. Server may be unreachable.`;
} else {
message = `${message}: ${err.message}`;
}
throw new ConnectionError(message);
}
}
/**
* Close the transport
*/
public async close(): Promise<void> {
await this.client.shutdown();
this.removeAllListeners();
}
/**
* Check if transport is idle
*/
public isIdle(): boolean {
return this._isIdle && this.client.getQueueSize() === 0;
}
/**
* Internal method to send mail
*/
private async sendMail(mail: MailMessage): Promise<SendResult> {
const envelope = mail.message._envelope;
const raw = mail.message._raw;
const messageId = this.generateMessageId();
const accepted: string[] = [];
const rejected: string[] = [];
const responses: string[] = [];
try {
// Debug envelope
console.log('DEBUG envelope:', {
from: envelope.from,
to: envelope.to,
messageId,
envelopeKeys: Object.keys(envelope || {}),
envelope: envelope
});
// EHLO is now sent automatically when channel opens
// Send MAIL FROM
const mailFromResponse = await this.client.sendSMTPCommand(`MAIL FROM: <${envelope.from}>\r\n`);
responses.push(mailFromResponse);
if (!this.isSuccessResponse(mailFromResponse)) {
throw new MessageError(`MAIL FROM rejected: ${mailFromResponse}`, messageId);
}
// Send RCPT TO for each recipient
for (const recipient of envelope.to) {
try {
const rcptResponse = await this.client.sendSMTPCommand(`RCPT TO: <${recipient}>\r\n`);
responses.push(rcptResponse);
if (this.isSuccessResponse(rcptResponse)) {
accepted.push(recipient);
} else {
rejected.push(recipient);
}
} catch (error) {
rejected.push(recipient);
responses.push(`Error for ${recipient}: ${(error as Error).message}`);
}
}
// If no recipients were accepted, fail
if (accepted.length === 0) {
throw new MessageError('All recipients were rejected', messageId);
}
// Send DATA command
const dataResponse = await this.client.sendSMTPCommand('DATA\r\n');
responses.push(dataResponse);
if (!this.isSuccessResponse(dataResponse)) {
throw new MessageError(`DATA command rejected: ${dataResponse}`, messageId);
}
// Send message content
const messageData = this.prepareMessageData(raw);
const contentResponse = await this.client.sendSMTPCommand(messageData);
responses.push(contentResponse);
if (!this.isSuccessResponse(contentResponse)) {
throw new MessageError(`Message data rejected: ${contentResponse}`, messageId);
}
// Send QUIT
try {
const quitResponse = await this.client.sendSMTPCommand('QUIT\r\n');
responses.push(quitResponse);
} catch (error) {
// QUIT failure is not critical
}
return {
envelope,
messageId,
accepted,
rejected,
pending: [],
response: responses.join('\n')
};
} catch (error) {
// Try to send RSET to clean up
try {
await this.client.sendSMTPCommand('RSET\r\n');
} catch {
// Ignore RSET failures
}
throw error;
}
}
/**
* Setup client event forwarding
*/
private setupClientEvents(): void {
this.client.on('error', (error) => {
this.emit('error', error);
});
this.client.on('connected', () => {
this.emit('connect');
});
this.client.on('disconnected', () => {
this.emit('close');
});
this.client.on('messageProcessed', () => {
this.emit('idle', this._isIdle);
});
}
/**
* Check if SMTP response indicates success
*/
private isSuccessResponse(response: string): boolean {
const statusCode = response.substring(0, 3);
return statusCode.startsWith('2') || statusCode.startsWith('3');
}
/**
* Prepare message data for transmission
*/
private prepareMessageData(raw: string | Buffer): string {
let messageData = raw.toString();
// Ensure message ends with CRLF.CRLF
if (!messageData.endsWith('\r\n')) {
messageData += '\r\n';
}
messageData += '.\r\n';
// Escape lines that start with a dot
messageData = messageData.replace(/\n\./g, '\n..');
return messageData;
}
/**
* Generate unique message ID
*/
private generateMessageId(): string {
return `smtp-ws-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
/**
* Create transport instance
*/
export function createTransport(options: TransportOptions): SMTPWSTransport {
return new SMTPWSTransport(options);
}

263
src/types.ts Normal file
View file

@ -0,0 +1,263 @@
/**
* @fileoverview Type definitions for SMTP over WebSocket protocol
*/
/**
* Message types supported by the SMTP over WebSocket protocol
*/
export enum SMTPOverWsMessageType {
AUTHENTICATE = 'authenticate',
AUTHENTICATE_RESPONSE = 'authenticate_response',
SMTP_TO_SERVER = 'smtp_to_server',
SMTP_FROM_SERVER = 'smtp_from_server',
SMTP_CHANNEL_OPEN = 'smtp_channel_open',
SMTP_CHANNEL_CLOSED = 'smtp_channel_closed',
SMTP_CHANNEL_ERROR = 'smtp_channel_error',
SMTP_CHANNEL_READY = 'smtp_channel_ready'
}
/**
* Base interface for all WebSocket messages
*/
export interface SMTPOverWsMessageBase {
type: SMTPOverWsMessageType;
}
/**
* Authentication request message
*/
export interface AuthenticateMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.AUTHENTICATE;
data: {
apiKey: string;
};
}
/**
* Authentication response message
*/
export interface AuthenticateResponseMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.AUTHENTICATE_RESPONSE;
data: {
success: boolean;
error?: string;
};
}
/**
* SMTP channel open request message
*/
export interface SMTPChannelOpenMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.SMTP_CHANNEL_OPEN;
data?: null;
}
/**
* SMTP channel ready notification message
*/
export interface SMTPChannelReadyMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.SMTP_CHANNEL_READY;
data?: null;
}
/**
* SMTP channel closed notification message
*/
export interface SMTPChannelClosedMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.SMTP_CHANNEL_CLOSED;
data?: null;
}
/**
* SMTP channel error notification message
*/
export interface SMTPChannelErrorMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.SMTP_CHANNEL_ERROR;
data: {
error: string;
};
}
/**
* SMTP data to server message
*/
export interface SMTPToServerMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.SMTP_TO_SERVER;
data: string;
}
/**
* SMTP data from server message
*/
export interface SMTPFromServerMessage extends SMTPOverWsMessageBase {
type: SMTPOverWsMessageType.SMTP_FROM_SERVER;
data: string;
}
/**
* Union type for all possible WebSocket messages
*/
export type SMTPOverWsMessage =
| AuthenticateMessage
| AuthenticateResponseMessage
| SMTPChannelOpenMessage
| SMTPChannelReadyMessage
| SMTPChannelClosedMessage
| SMTPChannelErrorMessage
| SMTPToServerMessage
| SMTPFromServerMessage;
/**
* Connection state enum
*/
export enum ConnectionState {
DISCONNECTED = 'disconnected',
CONNECTING = 'connecting',
CONNECTED = 'connected',
AUTHENTICATING = 'authenticating',
AUTHENTICATED = 'authenticated',
CHANNEL_OPENING = 'channel_opening',
CHANNEL_READY = 'channel_ready',
CHANNEL_ERROR = 'channel_error',
CHANNEL_CLOSED = 'channel_closed',
RECONNECTING = 'reconnecting',
FAILED = 'failed'
}
/**
* Queued message structure
*/
export interface QueuedMessage {
id: string;
data: string;
resolve: (response: string) => void;
reject: (error: Error) => void;
timestamp: number;
retries: number;
priority: MessagePriority;
}
/**
* Message priority levels
*/
export enum MessagePriority {
LOW = 0,
NORMAL = 1,
HIGH = 2,
CRITICAL = 3
}
/**
* Client configuration options
*/
export interface SMTPClientConfig {
/** WebSocket server URL */
url: string;
/** API key for authentication */
apiKey: string;
/** Interval between reconnection attempts in milliseconds */
reconnectInterval?: number;
/** Maximum number of reconnection attempts */
maxReconnectAttempts?: number;
/** Authentication timeout in milliseconds */
authTimeout?: number;
/** Channel open/close timeout in milliseconds */
channelTimeout?: number;
/** Message timeout in milliseconds */
messageTimeout?: number;
/** Maximum number of concurrent messages */
maxConcurrentMessages?: number;
/** Enable debug logging */
debug?: boolean;
/** Custom logger function */
logger?: Logger;
/** Connection heartbeat interval in milliseconds */
heartbeatInterval?: number;
/** Maximum queue size */
maxQueueSize?: number;
}
/**
* Logger interface
*/
export interface Logger {
debug(message: string, ...args: any[]): void;
info(message: string, ...args: any[]): void;
warn(message: string, ...args: any[]): void;
error(message: string, ...args: any[]): void;
}
/**
* Client statistics
*/
export interface ClientStats {
messagesQueued: number;
messagesProcessed: number;
messagesFailed: number;
reconnectionAttempts: number;
totalConnections: number;
averageResponseTime: number;
queueSize: number;
connectionUptime: number;
lastError?: string;
lastErrorTime?: Date;
}
/**
* Event types emitted by the client
*/
export interface ClientEvents {
connecting: () => void;
connected: () => void;
authenticated: () => void;
disconnected: (reason?: string) => void;
reconnecting: (attempt: number, maxAttempts: number) => void;
reconnected: () => void;
error: (error: Error) => void;
messageQueued: (messageId: string, queueSize: number) => void;
messageProcessed: (messageId: string, responseTime: number) => void;
messageFailed: (messageId: string, error: Error) => void;
queueProcessingStarted: (queueSize: number) => void;
queueProcessingCompleted: (processed: number, failed: number) => void;
channelOpened: () => void;
channelClosed: () => void;
channelError: (error: Error) => void;
stateChanged: (oldState: ConnectionState, newState: ConnectionState) => void;
// WebSocket message events
authenticate: (message: AuthenticateMessage) => void;
authenticate_response: (message: AuthenticateResponseMessage) => void;
smtp_channel_open: (message: SMTPChannelOpenMessage) => void;
smtp_channel_ready: (message: SMTPChannelReadyMessage) => void;
smtp_channel_closed: (message: SMTPChannelClosedMessage) => void;
smtp_channel_error: (message: SMTPChannelErrorMessage) => void;
smtp_to_server: (message: SMTPToServerMessage) => void;
smtp_from_server: (message: SMTPFromServerMessage) => void;
}
/**
* Message send options
*/
export interface SendOptions {
/** Message priority */
priority?: MessagePriority;
/** Message timeout in milliseconds */
timeout?: number;
/** Number of retry attempts */
retries?: number;
/** Whether to skip queue and send immediately */
immediate?: boolean;
}