feat: Update SMTPOverWSClient to improve error handling and reconnection logic for better session management

This commit is contained in:
Siwat Sirichai 2025-08-19 18:43:45 +07:00
parent b035b27da7
commit 6d1070b7e2
2 changed files with 138 additions and 16 deletions

View file

@ -1,6 +1,6 @@
{ {
"name": "@siwats/mxrelay-consumer", "name": "@siwats/mxrelay-consumer",
"version": "1.2.0", "version": "1.2.1",
"description": "An internal TypeScript client library for transporting SMTP messages", "description": "An internal TypeScript client library for transporting SMTP messages",
"main": "lib/index.js", "main": "lib/index.js",
"module": "lib/index.esm.js", "module": "lib/index.esm.js",

View file

@ -363,8 +363,24 @@ export class SMTPOverWSClient extends EventEmitter {
} catch (error) { } catch (error) {
this.logger.error('Queue processing error', error); this.logger.error('Queue processing error', error);
this.rejectAllQueuedSessions(error as Error);
failed += this.sessionQueue.length; // If it's a connection error, re-queue current session and schedule reconnect
if (this.isConnectionError(error as Error)) {
this.requeueCurrentSession();
this.handleDisconnection('Queue processing connection error');
} else {
// For other errors, only reject during shutdown
if (this.isShuttingDown) {
this.rejectAllQueuedSessions(error as Error);
failed += this.sessionQueue.length;
} else {
// Log error but keep sessions queued for retry
this.logger.warn('Non-connection error in queue processing, will retry', {
error: (error as Error).message,
queueSize: this.sessionQueue.length
});
}
}
} finally { } finally {
this.isProcessingQueue = false; this.isProcessingQueue = false;
this.sessionState = SessionState.IDLE; this.sessionState = SessionState.IDLE;
@ -448,7 +464,20 @@ export class SMTPOverWSClient extends EventEmitter {
} catch (error) { } catch (error) {
this.stats.emailsFailed++; this.stats.emailsFailed++;
// Reject individual email // Check if it's a connection-related error
if (this.isConnectionError(error as Error)) {
this.logger.warn('Connection error during email processing, re-queuing session', {
sessionId: session.id,
emailId: email.id,
error: (error as Error).message
});
// Don't reject the email, re-queue the session for retry
this.insertSessionByPriority(session);
return;
}
// For non-connection errors, reject individual email
email.reject(error as Error); email.reject(error as Error);
this.emit('emailFailed', session.id, email.id, error as Error); this.emit('emailFailed', session.id, email.id, error as Error);
@ -506,8 +535,14 @@ export class SMTPOverWSClient extends EventEmitter {
this.once('smtp_from_server', onResponse); this.once('smtp_from_server', onResponse);
// Send the command try {
this.sendSMTPData(command); // Send the command - check if connection is still valid
this.sendSMTPData(command);
} catch (error) {
clearTimeout(timeout);
this.removeListener('smtp_from_server', onResponse);
reject(error);
}
}); });
} }
@ -786,7 +821,13 @@ export class SMTPOverWSClient extends EventEmitter {
command: data.trim(), command: data.trim(),
size: data.length size: data.length
}); });
this.sendMessage(message);
try {
this.sendMessage(message);
} catch (error) {
// Re-throw with connection error context
throw new ConnectionError(`Failed to send SMTP command: ${(error as Error).message}`);
}
} }
@ -837,14 +878,23 @@ export class SMTPOverWSClient extends EventEmitter {
this.stopHeartbeat(); this.stopHeartbeat();
this.clearTimers(); this.clearTimers();
if (this.isProcessingQueue && !this.isShuttingDown && // Always try to reconnect if we have queued sessions and not shutting down
this.reconnectAttempts < this.config.maxReconnectAttempts) { // NEVER reject emails - keep trying with periodic retries
this.scheduleReconnect(); if (this.isProcessingQueue && !this.isShuttingDown) {
} else if (this.isProcessingQueue && !this.isShuttingDown) { if (this.reconnectAttempts < this.config.maxReconnectAttempts) {
this.logger.error('Max reconnection attempts reached'); this.scheduleReconnect();
this.setState(ConnectionState.FAILED); } else {
this.rejectAllQueuedSessions(new ConnectionError('Max reconnection attempts reached')); // Reset reconnect attempts and continue with longer delays
this.isProcessingQueue = false; this.logger.warn('Max reconnect attempts reached, switching to periodic retry mode');
this.reconnectAttempts = 0;
this.schedulePeriodicReconnect();
}
} else if (this.sessionQueue.length > 0 && !this.isShuttingDown) {
// If we have queued sessions but not currently processing, start periodic retries
this.logger.info('Queued sessions detected, starting periodic reconnection');
this.isProcessingQueue = true;
this.reconnectAttempts = 0;
this.schedulePeriodicReconnect();
} }
} }
@ -877,6 +927,41 @@ export class SMTPOverWSClient extends EventEmitter {
}, this.config.reconnectInterval * Math.min(this.reconnectAttempts, 5)); // Exponential backoff }, this.config.reconnectInterval * Math.min(this.reconnectAttempts, 5)); // Exponential backoff
} }
/**
* Schedule periodic reconnection attempts (never give up)
*/
private schedulePeriodicReconnect(): void {
this.setState(ConnectionState.RECONNECTING);
// Use longer intervals for periodic retries (5-30 minutes)
const periodicInterval = Math.min(300000 + (this.reconnectAttempts * 60000), 1800000); // 5min to 30min
this.emit('reconnecting', this.reconnectAttempts + 1, Infinity);
this.logger.info('Scheduling periodic reconnection', {
attempt: this.reconnectAttempts + 1,
delay: periodicInterval,
queueSize: this.sessionQueue.length
});
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.reconnectAttempts++;
this.stats.reconnectionAttempts++;
this.connect()
.then(() => {
this.emit('reconnected');
this.logger.info('Periodic reconnection successful');
this.reconnectAttempts = 0; // Reset on success
})
.catch((error) => {
this.logger.error('Periodic reconnection failed', error);
// Continue periodic retries
this.schedulePeriodicReconnect();
});
}, periodicInterval);
}
/** /**
* Start heartbeat timer * Start heartbeat timer
*/ */
@ -940,7 +1025,7 @@ export class SMTPOverWSClient extends EventEmitter {
} }
/** /**
* Reject all queued sessions * Reject all queued sessions (only used during shutdown)
*/ */
private rejectAllQueuedSessions(error: Error): void { private rejectAllQueuedSessions(error: Error): void {
if (this.currentSession) { if (this.currentSession) {
@ -954,6 +1039,22 @@ export class SMTPOverWSClient extends EventEmitter {
} }
} }
/**
* Re-queue current session for retry (never reject emails due to connection issues)
*/
private requeueCurrentSession(): void {
if (this.currentSession) {
this.logger.info('Re-queuing current session due to connection failure', {
sessionId: this.currentSession.id,
retriesLeft: this.currentSession.retries
});
// Put the current session back at the front of the queue
this.insertSessionByPriority(this.currentSession);
this.currentSession = null;
}
}
/** /**
* Generate unique session ID * Generate unique session ID
@ -1129,6 +1230,27 @@ export class SMTPOverWSClient extends EventEmitter {
}); });
} }
/**
* Check if an error is connection-related
*/
private isConnectionError(error: Error): boolean {
const connectionErrorMessages = [
'WebSocket is not connected',
'Connection closed',
'Connection lost',
'WebSocket connection failed',
'Network error',
'ENOTFOUND',
'ECONNREFUSED',
'ETIMEDOUT',
'ECONNRESET'
];
return connectionErrorMessages.some(msg =>
error.message.toLowerCase().includes(msg.toLowerCase())
) || error.name === 'ConnectionError' || error.name === 'NetworkError';
}
// Type-safe event emitter methods // Type-safe event emitter methods
public on<K extends keyof ClientEvents>(event: K, listener: ClientEvents[K]): this { public on<K extends keyof ClientEvents>(event: K, listener: ClientEvents[K]): this {
return super.on(event, listener); return super.on(event, listener);