diff --git a/package.json b/package.json index 9d1fb9a..ec76db8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@siwats/mxrelay-consumer", - "version": "1.2.5", + "version": "1.2.7", "description": "An internal TypeScript client library for transporting SMTP messages", "main": "lib/index.js", "module": "lib/index.esm.js", diff --git a/src/client.ts b/src/client.ts index 44f08e2..dbc5854 100644 --- a/src/client.ts +++ b/src/client.ts @@ -82,10 +82,12 @@ export class SMTPOverWSClient extends EventEmitter { private reconnectTimer: NodeJS.Timeout | null = null; private authTimer: NodeJS.Timeout | null = null; private channelTimer: NodeJS.Timeout | null = null; + private channelCloseTimer: NodeJS.Timeout | null = null; private heartbeatTimer: NodeJS.Timeout | null = null; private sessionTimer: NodeJS.Timeout | null = null; private isProcessingQueue = false; private isShuttingDown = false; + private connectionPromise: Promise | null = null; private logger: Logger; private stats: ClientStats; private connectionStartTime: number = 0; @@ -314,9 +316,17 @@ export class SMTPOverWSClient extends EventEmitter { let failed = 0; try { - // Connect if not connected - if (this.state === ConnectionState.DISCONNECTED) { - await this.connect(); + // Connect if not connected - use connection promise to prevent race conditions + if (this.state === ConnectionState.DISCONNECTED || this.state === ConnectionState.FAILED) { + if (!this.connectionPromise) { + this.connectionPromise = this.connect().finally(() => { + this.connectionPromise = null; + }); + } + await this.connectionPromise; + } else if (this.connectionPromise) { + // Wait for ongoing connection attempt to complete + await this.connectionPromise; } // Process sessions sequentially one at a time (SMTP sessions are mutex) @@ -789,6 +799,11 @@ export class SMTPOverWSClient extends EventEmitter { return new Promise((resolve) => { if (this.state === ConnectionState.CHANNEL_READY) { const onChannelClosed = () => { + // Clear the fallback timer since channel closed properly + if (this.channelCloseTimer) { + clearTimeout(this.channelCloseTimer); + this.channelCloseTimer = null; + } this.setState(ConnectionState.AUTHENTICATED); this.emit('channelClosed'); this.logger.debug('SMTP channel closed'); @@ -798,9 +813,14 @@ export class SMTPOverWSClient extends EventEmitter { this.once('smtp_channel_closed', onChannelClosed); // Fallback timeout - setTimeout(() => { + this.channelCloseTimer = setTimeout(() => { this.removeListener('smtp_channel_closed', onChannelClosed); - this.setState(ConnectionState.AUTHENTICATED); + // Only set to authenticated if we're still connected + if (this.ws && this.ws.readyState === WebSocket.OPEN && + this.state !== ConnectionState.DISCONNECTED) { + this.setState(ConnectionState.AUTHENTICATED); + } + this.channelCloseTimer = null; resolve(); }, 5000); } else { @@ -883,6 +903,7 @@ export class SMTPOverWSClient extends EventEmitter { this.emit('disconnected', reason); this.stopHeartbeat(); this.clearTimers(); + this.connectionPromise = null; // Always try to reconnect if we have queued sessions and not shutting down // NEVER reject emails - keep trying with periodic retries @@ -1010,6 +1031,11 @@ export class SMTPOverWSClient extends EventEmitter { this.channelTimer = null; } + if (this.channelCloseTimer) { + clearTimeout(this.channelCloseTimer); + this.channelCloseTimer = null; + } + if (this.sessionTimer) { clearTimeout(this.sessionTimer); this.sessionTimer = null;