diff --git a/package.json b/package.json index 4866fab..fa7cbf8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@siwats/mxrelay-consumer", - "version": "1.2.0", + "version": "1.2.1", "description": "An internal TypeScript client library for transporting SMTP messages", "main": "lib/index.js", "module": "lib/index.esm.js", diff --git a/src/client.ts b/src/client.ts index 83684d9..4350f67 100644 --- a/src/client.ts +++ b/src/client.ts @@ -363,8 +363,24 @@ export class SMTPOverWSClient extends EventEmitter { } catch (error) { this.logger.error('Queue processing error', error); - this.rejectAllQueuedSessions(error as Error); - failed += this.sessionQueue.length; + + // If it's a connection error, re-queue current session and schedule reconnect + if (this.isConnectionError(error as Error)) { + this.requeueCurrentSession(); + this.handleDisconnection('Queue processing connection error'); + } else { + // For other errors, only reject during shutdown + if (this.isShuttingDown) { + this.rejectAllQueuedSessions(error as Error); + failed += this.sessionQueue.length; + } else { + // Log error but keep sessions queued for retry + this.logger.warn('Non-connection error in queue processing, will retry', { + error: (error as Error).message, + queueSize: this.sessionQueue.length + }); + } + } } finally { this.isProcessingQueue = false; this.sessionState = SessionState.IDLE; @@ -448,7 +464,20 @@ export class SMTPOverWSClient extends EventEmitter { } catch (error) { this.stats.emailsFailed++; - // Reject individual email + // Check if it's a connection-related error + if (this.isConnectionError(error as Error)) { + this.logger.warn('Connection error during email processing, re-queuing session', { + sessionId: session.id, + emailId: email.id, + error: (error as Error).message + }); + + // Don't reject the email, re-queue the session for retry + this.insertSessionByPriority(session); + return; + } + + // For non-connection errors, reject individual email email.reject(error as Error); this.emit('emailFailed', session.id, email.id, error as Error); @@ -506,8 +535,14 @@ export class SMTPOverWSClient extends EventEmitter { this.once('smtp_from_server', onResponse); - // Send the command - this.sendSMTPData(command); + try { + // Send the command - check if connection is still valid + this.sendSMTPData(command); + } catch (error) { + clearTimeout(timeout); + this.removeListener('smtp_from_server', onResponse); + reject(error); + } }); } @@ -786,7 +821,13 @@ export class SMTPOverWSClient extends EventEmitter { command: data.trim(), size: data.length }); - this.sendMessage(message); + + try { + this.sendMessage(message); + } catch (error) { + // Re-throw with connection error context + throw new ConnectionError(`Failed to send SMTP command: ${(error as Error).message}`); + } } @@ -837,14 +878,23 @@ export class SMTPOverWSClient extends EventEmitter { this.stopHeartbeat(); this.clearTimers(); - if (this.isProcessingQueue && !this.isShuttingDown && - this.reconnectAttempts < this.config.maxReconnectAttempts) { - this.scheduleReconnect(); - } else if (this.isProcessingQueue && !this.isShuttingDown) { - this.logger.error('Max reconnection attempts reached'); - this.setState(ConnectionState.FAILED); - this.rejectAllQueuedSessions(new ConnectionError('Max reconnection attempts reached')); - this.isProcessingQueue = false; + // Always try to reconnect if we have queued sessions and not shutting down + // NEVER reject emails - keep trying with periodic retries + if (this.isProcessingQueue && !this.isShuttingDown) { + if (this.reconnectAttempts < this.config.maxReconnectAttempts) { + this.scheduleReconnect(); + } else { + // Reset reconnect attempts and continue with longer delays + this.logger.warn('Max reconnect attempts reached, switching to periodic retry mode'); + this.reconnectAttempts = 0; + this.schedulePeriodicReconnect(); + } + } else if (this.sessionQueue.length > 0 && !this.isShuttingDown) { + // If we have queued sessions but not currently processing, start periodic retries + this.logger.info('Queued sessions detected, starting periodic reconnection'); + this.isProcessingQueue = true; + this.reconnectAttempts = 0; + this.schedulePeriodicReconnect(); } } @@ -877,6 +927,41 @@ export class SMTPOverWSClient extends EventEmitter { }, this.config.reconnectInterval * Math.min(this.reconnectAttempts, 5)); // Exponential backoff } + /** + * Schedule periodic reconnection attempts (never give up) + */ + private schedulePeriodicReconnect(): void { + this.setState(ConnectionState.RECONNECTING); + + // Use longer intervals for periodic retries (5-30 minutes) + const periodicInterval = Math.min(300000 + (this.reconnectAttempts * 60000), 1800000); // 5min to 30min + + this.emit('reconnecting', this.reconnectAttempts + 1, Infinity); + this.logger.info('Scheduling periodic reconnection', { + attempt: this.reconnectAttempts + 1, + delay: periodicInterval, + queueSize: this.sessionQueue.length + }); + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.reconnectAttempts++; + this.stats.reconnectionAttempts++; + + this.connect() + .then(() => { + this.emit('reconnected'); + this.logger.info('Periodic reconnection successful'); + this.reconnectAttempts = 0; // Reset on success + }) + .catch((error) => { + this.logger.error('Periodic reconnection failed', error); + // Continue periodic retries + this.schedulePeriodicReconnect(); + }); + }, periodicInterval); + } + /** * Start heartbeat timer */ @@ -940,7 +1025,7 @@ export class SMTPOverWSClient extends EventEmitter { } /** - * Reject all queued sessions + * Reject all queued sessions (only used during shutdown) */ private rejectAllQueuedSessions(error: Error): void { if (this.currentSession) { @@ -954,6 +1039,22 @@ export class SMTPOverWSClient extends EventEmitter { } } + /** + * Re-queue current session for retry (never reject emails due to connection issues) + */ + private requeueCurrentSession(): void { + if (this.currentSession) { + this.logger.info('Re-queuing current session due to connection failure', { + sessionId: this.currentSession.id, + retriesLeft: this.currentSession.retries + }); + + // Put the current session back at the front of the queue + this.insertSessionByPriority(this.currentSession); + this.currentSession = null; + } + } + /** * Generate unique session ID @@ -1129,6 +1230,27 @@ export class SMTPOverWSClient extends EventEmitter { }); } + /** + * Check if an error is connection-related + */ + private isConnectionError(error: Error): boolean { + const connectionErrorMessages = [ + 'WebSocket is not connected', + 'Connection closed', + 'Connection lost', + 'WebSocket connection failed', + 'Network error', + 'ENOTFOUND', + 'ECONNREFUSED', + 'ETIMEDOUT', + 'ECONNRESET' + ]; + + return connectionErrorMessages.some(msg => + error.message.toLowerCase().includes(msg.toLowerCase()) + ) || error.name === 'ConnectionError' || error.name === 'NetworkError'; + } + // Type-safe event emitter methods public on(event: K, listener: ClientEvents[K]): this { return super.on(event, listener);