From 069540e3104be3d7e6ab12f493770bb1d0153408 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 19 Aug 2025 13:53:20 +0700 Subject: [PATCH] feat: Refactor SMTP client to support session-based email processing, enhancing queue management and error handling --- .gitignore | 2 + README.md | 719 ++++++++++++++++++++++++++++++----------------- package.json | 12 +- src/client.ts | 506 ++++++++++++++++++++++----------- src/index.ts | 7 +- src/transport.ts | 72 +++-- src/types.ts | 113 ++++++-- 7 files changed, 951 insertions(+), 480 deletions(-) diff --git a/.gitignore b/.gitignore index d0c3c16..98f520a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,8 @@ yarn-error.log* # Build outputs lib/ +lib-esm/ + dist/ *.tsbuildinfo diff --git a/README.md b/README.md index 3a58562..35173c1 100644 --- a/README.md +++ b/README.md @@ -1,122 +1,232 @@ -# @siwatsystem/mxrelay-consumer +# @siwats/mxrelay-consumer [![TypeScript](https://img.shields.io/badge/%3C%2F%3E-TypeScript-%230074c1.svg)](http://www.typescriptlang.org/) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) -A production-ready TypeScript client library for SMTP over WebSocket protocol with intelligent queue management, automatic reconnection, and comprehensive error handling. Includes full Nodemailer transport compatibility. +A production-ready TypeScript client library for SMTP over WebSocket protocol with **session-based queue management**, automatic reconnection, and comprehensive error handling. Features proper SMTP session protocol implementation with mutex-based session locking. -## Features +## ๐Ÿš€ Key Features -**Intelligent Queue Management** -- Automatic WebSocket connection when messages are queued -- Priority-based message processing (CRITICAL > HIGH > NORMAL > LOW) -- Configurable queue limits and overflow protection -- Auto-disconnect when queue is empty +### **Session-Based SMTP Protocol** +- **Proper SMTP Sessions**: One session = EHLO โ†’ AUTH โ†’ Multiple Emails โ†’ QUIT +- **Session Mutex**: Only one SMTP session active at a time (protocol compliant) +- **Email Batching**: Send multiple emails efficiently within a single session +- **Resource Optimization**: Connect only when needed, disconnect when idle -**Robust Connection Handling** -- Automatic reconnection with exponential backoff -- Connection state management and lifecycle -- Heartbeat monitoring and timeout handling -- Graceful connection recovery +### **Intelligent Queue Management** +- **Session Queuing**: Queues complete email sessions, not individual commands +- **Priority-Based Processing**: CRITICAL > HIGH > NORMAL > LOW priority levels +- **Configurable Batching**: Control emails per session with `maxEmailsPerSession` +- **Smart Resource Usage**: Auto-connect/disconnect based on queue state -**High Performance** -- Efficient SMTP channel cycling per message -- Minimal resource usage with smart connection management -- Concurrent message processing support -- Optimized WebSocket communication +### **Production-Ready Reliability** +- **Comprehensive Error Handling**: Structured error classification and meaningful messages +- **Automatic Reconnection**: Exponential backoff with configurable retry limits +- **Timeout Management**: Session, email, and connection-level timeouts +- **Connection State Management**: Full lifecycle state tracking -**Enterprise-Grade Reliability** -- Comprehensive SMTP error handling with meaningful messages -- Timeout management for all operations -- Retry logic with configurable attempts -- Structured error classification +### **Nodemailer Integration** +- **Full Nodemailer Compatibility**: Drop-in replacement for existing transports +- **Standard API**: Use familiar Nodemailer methods and options +- **Advanced Features**: Attachments, HTML, multipart messages, custom headers -**Nodemailer Integration** -- Full Nodemailer transport compatibility -- Transparent bridge for all email features -- Support for attachments, HTML, multipart messages -- Standard Nodemailer API compatibility +## ๐Ÿ“Š Architecture Overview -## Installation - -```bash -npm install @siwatsystem/mxrelay-consumer -# or -bun add @siwatsystem/mxrelay-consumer +```mermaid +graph TB + subgraph "Client Application" + App[Your Application] + NM[Nodemailer] + Transport[SMTPWSTransport] + end + + subgraph "Session Management" + Client[SMTPOverWSClient] + Queue[Session Queue] + SM[Session Manager] + end + + subgraph "Protocol Layer" + WS[WebSocket Connection] + Auth[Authentication] + Channel[SMTP Channel] + end + + subgraph "Remote Server" + Relay[SMTP WebSocket Relay] + SMTP[SMTP Server] + end + + App --> NM + NM --> Transport + Transport --> Client + Client --> Queue + Queue --> SM + SM --> WS + WS --> Auth + Auth --> Channel + Channel --> Relay + Relay --> SMTP + + style Client fill:#e1f5fe + style Queue fill:#f3e5f5 + style SM fill:#fff3e0 + style Channel fill:#e8f5e8 ``` -## Quick Start +## ๐Ÿ”„ Session Processing Flow -### Direct Client Usage +```mermaid +sequenceDiagram + participant App as Application + participant Client as SMTPOverWSClient + participant Queue as Session Queue + participant WS as WebSocket + participant Server as SMTP Server + + Note over App,Server: Email Submission + App->>Client: queueEmail(from, to[], data) + Client->>Queue: Add to session or create new + + Note over App,Server: Session Processing (Mutex) + Queue->>Client: Process next session + Client->>WS: Connect (if needed) + WS->>Server: WebSocket connection + Server-->>WS: Connection established + + Client->>Server: AUTHENTICATE + Server-->>Client: AUTH_SUCCESS + + Client->>Server: SMTP_CHANNEL_OPEN + Server-->>Client: SMTP_CHANNEL_READY + Server-->>Client: 220 SMTP ready + + Client->>Server: EHLO client + Server-->>Client: 250 EHLO OK + + Client->>Server: AUTH PLAIN + Server-->>Client: 235 AUTH OK + + loop For each email in session + Client->>Server: MAIL FROM: + Server-->>Client: 250 Sender OK + + Client->>Server: RCPT TO: + Server-->>Client: 250 Recipient OK + + Client->>Server: DATA + Server-->>Client: 354 Start data + + Client->>Server: \r\n. + Server-->>Client: 250 Message accepted + + Client->>App: Email resolved + end + + Client->>Server: QUIT + Server-->>Client: 221 Goodbye + + Client->>Server: SMTP_CHANNEL_CLOSE + WS->>WS: Disconnect (if queue empty) + + Client->>App: Session completed +``` + +## ๐Ÿ“ฆ Installation + +```bash +npm install @siwats/mxrelay-consumer +# or +bun add @siwats/mxrelay-consumer +``` + +## ๐Ÿš€ Quick Start + +### Direct Client Usage (New Session-Based API) ```typescript -import { SMTPOverWSClient } from '@siwatsystem/mxrelay-consumer'; +import { SMTPOverWSClient } from '@siwats/mxrelay-consumer'; const client = new SMTPOverWSClient({ url: 'wss://api.siwatsystem.com/smtp', apiKey: 'your-api-key', - debug: true + debug: true, + maxEmailsPerSession: 5, // Batch up to 5 emails per session + sessionTimeout: 300000 // 5 minute session timeout }); -// Send SMTP commands directly +// Queue individual emails - they'll be batched into sessions automatically try { - const response = await client.sendSMTPCommand(` - MAIL FROM: - RCPT TO: - DATA - Subject: Test Email - - Hello from SMTP over WebSocket! - . - QUIT - `); - console.log('Email sent:', response); + // These emails will be sent in the same SMTP session + const emailId1 = await client.queueEmail( + 'sender@example.com', + ['recipient1@example.com'], + 'Subject: Email 1\r\n\r\nFirst email content' + ); + + const emailId2 = await client.queueEmail( + 'sender@example.com', + ['recipient2@example.com'], + 'Subject: Email 2\r\n\r\nSecond email content' + ); + + console.log('Emails queued:', emailId1, emailId2); } catch (error) { - console.error('SMTP error:', error.message); + console.error('Email error:', error.message); } finally { await client.shutdown(); } ``` -### Nodemailer Transport +### Nodemailer Transport (Recommended) ```typescript import nodemailer from 'nodemailer'; -import { createTransport } from '@siwatsystem/mxrelay-consumer'; +import { createTransport } from '@siwats/mxrelay-consumer'; -// Create transport (uses defaults: api.siwatsystem.com:443 secure) -const transport = createTransport('your-api-key'); - -// Or with custom options +// Create transport with session batching const transport = createTransport('your-api-key', { - host: 'custom.server.com', - port: 80, - secure: false, - debug: true + host: 'api.siwatsystem.com', + port: 443, + secure: true, + debug: true, + maxEmailsPerSession: 10, // Batch up to 10 emails per session + sessionBatchTimeout: 2000 // Wait 2 seconds to batch emails }); const transporter = nodemailer.createTransporter(transport); -// Send email using standard Nodemailer API -const info = await transporter.sendMail({ - from: 'sender@example.com', - to: 'recipient@example.com', - subject: 'Test Email via SMTP WebSocket', - text: 'Plain text version', - html: '

HTML version

', - attachments: [ - { - filename: 'document.pdf', - path: './document.pdf' - } - ] -}); +// Send multiple emails - automatically batched into sessions +const emails = [ + { + from: 'sender@example.com', + to: 'user1@example.com', + subject: 'Welcome Email', + html: '

Welcome to our service!

' + }, + { + from: 'sender@example.com', + to: 'user2@example.com', + subject: 'Newsletter', + html: '

Monthly Newsletter

', + attachments: [{ filename: 'report.pdf', path: './report.pdf' }] + } +]; + +// Send all emails - they'll be efficiently batched into SMTP sessions +for (const email of emails) { + try { + const info = await transporter.sendMail(email); + console.log('Email sent:', info.messageId); + } catch (error) { + console.error('Failed to send email:', error.message); + } +} -console.log('Message sent:', info.messageId); await transport.close(); ``` -## Configuration +## โš™๏ธ Configuration ### Client Configuration @@ -124,172 +234,348 @@ await transport.close(); interface SMTPClientConfig { url: string; // WebSocket server URL apiKey: string; // Authentication API key + + // Session Management + sessionTimeout?: number; // Session timeout (default: 300000ms) + maxEmailsPerSession?: number; // Emails per session (default: 10) + sessionBatchTimeout?: number; // Batch wait time (default: 1000ms) + + // Connection Management debug?: boolean; // Enable debug logging (default: false) - maxQueueSize?: number; // Queue capacity limit (default: 1000) + maxQueueSize?: number; // Session queue limit (default: 100) reconnectInterval?: number; // Reconnect delay (default: 5000ms) maxReconnectAttempts?: number; // Max retry attempts (default: 10) authTimeout?: number; // Auth timeout (default: 30000ms) channelTimeout?: number; // Channel timeout (default: 10000ms) - messageTimeout?: number; // Message timeout (default: 60000ms) + messageTimeout?: number; // SMTP command timeout (default: 60000ms) heartbeatInterval?: number; // Heartbeat interval (default: 30000ms) - maxConcurrentMessages?: number; // Concurrent limit (default: 1) } ``` ### Transport Configuration ```typescript -interface TransportOptions { +interface TransportOptions extends Omit { host?: string; // Server host (default: 'api.siwatsystem.com') port?: number; // Server port (default: 443) secure?: boolean; // Use wss:// (default: true) - debug?: boolean; // Enable debug mode (default: false) - // ... other SMTPClientConfig options + apiKey?: string; // API key for authentication } ``` -## Advanced Usage +## ๐ŸŽฏ Advanced Usage -### Priority-Based Messaging +### Session Priority and Options ```typescript -import { MessagePriority } from '@siwatsystem/mxrelay-consumer'; +import { MessagePriority, SessionSendOptions } from '@siwats/mxrelay-consumer'; -// High priority (processed first) -await client.sendSMTPCommand('URGENT EMAIL DATA', { - priority: MessagePriority.HIGH, - timeout: 30000 -}); +// High priority email (processed first) +await client.queueEmail( + 'urgent@company.com', + ['admin@company.com'], + 'Subject: URGENT ALERT\r\n\r\nSystem down!', + { + priority: MessagePriority.HIGH, + timeout: 30000, + retries: 5 + } +); -// Critical priority (highest) -await client.sendSMTPCommand('CRITICAL ALERT EMAIL', { - priority: MessagePriority.CRITICAL -}); +// Critical priority email (highest priority) +await client.queueEmail( + 'security@company.com', + ['security-team@company.com'], + 'Subject: SECURITY BREACH\r\n\r\nImmediate action required!', + { + priority: MessagePriority.CRITICAL, + immediate: true // Skip batching, send immediately + } +); ``` -### Event Monitoring +### Session Event Monitoring ```typescript +// Session-level events +client.on('sessionQueued', (sessionId, queueSize) => { + console.log(`Session ${sessionId} queued. Queue size: ${queueSize}`); +}); + +client.on('sessionStarted', (sessionId, emailCount) => { + console.log(`Processing session ${sessionId} with ${emailCount} emails`); +}); + +client.on('sessionCompleted', (sessionId, results) => { + console.log(`Session ${sessionId} completed with ${results.length} emails`); +}); + +// Email-level events within sessions +client.on('emailProcessed', (sessionId, emailId, responseTime) => { + console.log(`Email ${emailId} in session ${sessionId} processed in ${responseTime}ms`); +}); + +client.on('emailFailed', (sessionId, emailId, error) => { + console.error(`Email ${emailId} in session ${sessionId} failed:`, error.message); +}); + // Connection events client.on('connected', () => console.log('WebSocket connected')); client.on('authenticated', () => console.log('Authentication successful')); client.on('disconnected', () => console.log('Connection lost')); - -// Queue events -client.on('messageQueued', (messageId, queueSize) => { - console.log(`Message ${messageId} queued. Queue size: ${queueSize}`); -}); - -client.on('messageProcessed', (messageId, responseTime) => { - console.log(`Message ${messageId} processed in ${responseTime}ms`); -}); - -// Error events -client.on('error', (error) => console.error('Client error:', error)); ``` -### Statistics and Monitoring +### Session Statistics ```typescript const stats = client.getStats(); -console.log('Client Statistics:', { - messagesQueued: stats.messagesQueued, - messagesProcessed: stats.messagesProcessed, - messagesFailed: stats.messagesFailed, - averageResponseTime: stats.averageResponseTime, +console.log('Session Statistics:', { + sessionsQueued: stats.sessionsQueued, + sessionsProcessed: stats.sessionsProcessed, + sessionsFailed: stats.sessionsFailed, + emailsProcessed: stats.emailsProcessed, + emailsFailed: stats.emailsFailed, + averageSessionTime: stats.averageSessionTime, + averageEmailTime: stats.averageEmailTime, queueSize: stats.queueSize }); + +console.log('Session State:', client.getSessionState()); // IDLE, PROCESSING, FAILED +console.log('Connection State:', client.getConnectionState()); // DISCONNECTED, CONNECTED, etc. ``` -## Error Handling +## ๐Ÿ›ก๏ธ Error Handling -### SMTP Error Detection - -The transport properly detects and categorizes SMTP errors: +### SMTP Session Errors ```typescript try { - await transporter.sendMail({ - from: 'unauthorized@domain.com', // Invalid sender - to: 'recipient@example.com', - subject: 'Test' - }); + await client.queueEmail( + 'invalid-sender@domain.com', + ['recipient@example.com'], + 'Subject: Test\r\n\r\nTest message' + ); } catch (error) { - console.error('SMTP Error:', error.message); - // Output: "Sender not authorized: Sender domain not authorized for your IP or subnet" - - console.log('Error details:', { - smtpCode: error.context.smtpCode, // "550" - rejectedRecipients: error.context.rejectedRecipients - }); -} -``` - -### Error Classification - -```typescript -import { - ConnectionError, - AuthenticationError, - MessageError, - TimeoutError -} from '@siwatsystem/mxrelay-consumer'; - -try { - await client.sendSMTPCommand('MAIL FROM: '); -} catch (error) { - if (error instanceof ConnectionError) { - console.error('Connection failed:', error.message); - } else if (error instanceof AuthenticationError) { - console.error('Authentication failed:', error.message); - } else if (error instanceof MessageError) { - console.error('SMTP error:', error.message, 'Code:', error.context.smtpCode); - } else if (error instanceof TimeoutError) { - console.error('Operation timed out:', error.message); + if (error instanceof MessageError) { + console.error('SMTP Error:', error.message); + console.log('Email ID:', error.messageId); + console.log('Retry Count:', error.retryCount); } } ``` -## Connection States +### Session State Management -The client manages connection states automatically: +The client manages session states automatically: -- `DISCONNECTED` - No connection -- `CONNECTING` - Establishing WebSocket connection -- `CONNECTED` - WebSocket connected, authentication pending -- `AUTHENTICATING` - Sending credentials -- `AUTHENTICATED` - Ready for SMTP operations -- `CHANNEL_OPENING` - Opening SMTP channel -- `CHANNEL_READY` - SMTP channel active -- `CHANNEL_CLOSED` - SMTP channel closed -- `RECONNECTING` - Attempting reconnection -- `FAILED` - Connection failed, max retries reached +```mermaid +stateDiagram-v2 + [*] --> IDLE + IDLE --> PROCESSING: Session queued + PROCESSING --> IDLE: Session completed + PROCESSING --> FAILED: Session error + FAILED --> PROCESSING: Retry session + FAILED --> IDLE: Max retries reached + IDLE --> [*]: Client shutdown +``` -## Development +- `IDLE` - No active session, ready to process +- `PROCESSING` - Session currently being processed (mutex locked) +- `FAILED` - Session failed, will retry or reject +- `COMPLETED` - Session successfully completed -This project uses Bun as the primary runtime. TypeScript files can be run directly without building. +### Connection States + +```mermaid +stateDiagram-v2 + [*] --> DISCONNECTED + DISCONNECTED --> CONNECTING: Connect requested + CONNECTING --> CONNECTED: WebSocket established + CONNECTED --> AUTHENTICATING: Send credentials + AUTHENTICATING --> AUTHENTICATED: Auth successful + AUTHENTICATED --> CHANNEL_OPENING: Open SMTP channel + CHANNEL_OPENING --> CHANNEL_READY: Channel established + CHANNEL_READY --> CHANNEL_CLOSED: Session complete + CHANNEL_CLOSED --> AUTHENTICATED: Ready for next session + AUTHENTICATED --> DISCONNECTED: Queue empty + + CONNECTING --> RECONNECTING: Connection failed + AUTHENTICATING --> RECONNECTING: Auth failed + CHANNEL_OPENING --> RECONNECTING: Channel failed + RECONNECTING --> CONNECTING: Retry attempt + RECONNECTING --> FAILED: Max retries reached + + FAILED --> [*]: Shutdown + DISCONNECTED --> [*]: Shutdown +``` + +## ๐Ÿ”ง Session Queue Architecture + +```mermaid +flowchart TD + subgraph Emails ["๐Ÿ“ง Email Submission"] + E1[Email 1 - HIGH] + E2[Email 2 - NORMAL] + E3[Email 3 - HIGH] + E4[Email 4 - CRITICAL] + end + + subgraph Queue ["๐Ÿ“‹ Session Creation"] + SQ[Session Queue] + S1[Session 1 - CRITICAL] + S2[Session 2 - HIGH] + S3[Session 3 - NORMAL] + end + + subgraph Processing ["โšก Session Processing"] + SP[Session Processor] + SC[SMTP Connection] + SM[Session Manager] + end + + E1 --> SQ + E2 --> SQ + E3 --> SQ + E4 --> SQ + + SQ --> S1 + SQ --> S2 + SQ --> S3 + + S1 --> SP + S2 --> SP + S3 --> SP + + SP --> SC + SC --> SM + + style S1 fill:#ffcdd2 + style S2 fill:#fff3e0 + style S3 fill:#e8f5e8 + style SP fill:#e1f5fe + style SC fill:#f3e5f5 +``` + +## ๐Ÿญ Production Configuration + +### Optimal Settings + +```typescript +const client = new SMTPOverWSClient({ + url: 'wss://api.siwatsystem.com/smtp', + apiKey: process.env.MXRELAY_API_KEY, + + // Production optimizations + debug: false, + maxEmailsPerSession: 20, // Batch more emails per session + sessionBatchTimeout: 5000, // Wait longer to accumulate emails + sessionTimeout: 600000, // 10 minute session timeout + maxQueueSize: 1000, // Higher queue capacity + reconnectInterval: 15000, // Longer reconnect delay + maxReconnectAttempts: 3, // Fewer retries in production + messageTimeout: 120000 // 2 minute SMTP timeout +}); +``` + +### Graceful Shutdown + +```typescript +process.on('SIGTERM', async () => { + console.log('Shutting down SMTP client...'); + try { + // Wait for current sessions to complete + await client.shutdown(60000); // 60 second timeout + console.log('SMTP client shutdown complete'); + } catch (error) { + console.error('Forced SMTP client shutdown'); + } + process.exit(0); +}); +``` + +## ๐Ÿ“š API Reference + +### SMTPOverWSClient + +#### Methods +- `queueEmail(from, to[], data, options?)` - Queue email for session processing +- `getStats()` - Get session and email statistics +- `getConnectionState()` - Get current connection state +- `getSessionState()` - Get current session processing state +- `getQueueSize()` - Get session queue size +- `shutdown(timeout?)` - Graceful shutdown with session completion + +#### Events +- **Session Events**: `sessionQueued`, `sessionStarted`, `sessionCompleted`, `sessionFailed` +- **Email Events**: `emailProcessed`, `emailFailed` +- **Connection Events**: `connecting`, `connected`, `authenticated`, `disconnected`, `reconnecting` +- **Queue Events**: `queueProcessingStarted`, `queueProcessingCompleted` +- **State Events**: `stateChanged`, `error` + +### createTransport(apiKey, options?) + +Creates a Nodemailer-compatible transport with session batching. + +- `apiKey` - Your API key for authentication +- `options` - Optional transport and session configuration + +## ๐Ÿ” Debugging + +### Enable Debug Logging + +```typescript +const client = new SMTPOverWSClient({ + url: 'wss://api.siwatsystem.com/smtp', + apiKey: 'your-api-key', + debug: true // Enable comprehensive logging +}); + +// Debug output includes: +// - Session creation and batching +// - SMTP protocol commands and responses +// - Connection state transitions +// - Queue processing details +// - Error stack traces +``` + +### Monitor Session Processing + +```typescript +client.on('sessionStarted', (sessionId, emailCount) => { + console.log(`๐Ÿš€ Starting session ${sessionId} with ${emailCount} emails`); +}); + +client.on('emailProcessed', (sessionId, emailId, responseTime) => { + console.log(`โœ… Email ${emailId} processed in ${responseTime}ms`); +}); + +client.on('sessionCompleted', (sessionId, results) => { + const successful = results.filter(r => r.success).length; + const failed = results.filter(r => !r.success).length; + console.log(`๐ŸŽ‰ Session ${sessionId} complete: ${successful} sent, ${failed} failed`); +}); +``` + +## ๐Ÿงช Development ### Setup ```bash # Clone repository -git clone https://git.siwatsystem.com/siwat/mxrelay-consumer.git +git clone https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer.git cd mxrelay-consumer # Install dependencies bun install -# Run examples directly +# Run examples directly (TypeScript) bun run examples/nodemailer-transport.ts - -# Run with environment variable -MXRELAY_API_KEY=your-key bun run examples/nodemailer-transport.ts ``` ### Build & Test ```bash -# Build (optional - bun runs TypeScript directly) +# Build all formats bun run build # Run tests @@ -302,90 +588,15 @@ bun run lint bun run format ``` -## Protocol Implementation - -### WebSocket Message Types - -- `AUTHENTICATE` / `AUTHENTICATE_RESPONSE` - Authentication flow -- `SMTP_CHANNEL_OPEN` / `SMTP_CHANNEL_READY` - Channel management -- `SMTP_CHANNEL_CLOSED` / `SMTP_CHANNEL_ERROR` - Channel lifecycle -- `SMTP_TO_SERVER` / `SMTP_FROM_SERVER` - SMTP data exchange - -### Connection Flow - -1. **WebSocket Connection** - Connect to relay server -2. **Authentication** - Authenticate using API key -3. **Channel Management** - Open SMTP channel per message -4. **Data Transfer** - Exchange SMTP commands and responses -5. **Cleanup** - Close channel and disconnect when queue empty - -## Best Practices - -### Production Configuration - -```typescript -const client = new SMTPOverWSClient({ - url: 'wss://api.siwatsystem.com/smtp', - apiKey: process.env.MXRELAY_API_KEY, - - // Production settings - debug: false, - maxQueueSize: 5000, - reconnectInterval: 10000, - maxReconnectAttempts: 5, - messageTimeout: 120000 -}); -``` - -### Graceful Shutdown - -```typescript -process.on('SIGTERM', async () => { - console.log('Shutting down...'); - try { - await client.shutdown(30000); // 30 second timeout - console.log('Shutdown complete'); - } catch (error) { - console.error('Forced shutdown'); - } - process.exit(0); -}); -``` - -## API Reference - -### createTransport(apiKey, options?) - -Creates a Nodemailer-compatible transport. - -- `apiKey` - Your API key for authentication -- `options` - Optional transport configuration - -### SMTPOverWSClient - -Main client class for direct SMTP operations. - -#### Methods -- `sendSMTPCommand(data, options?)` - Send SMTP command -- `getStats()` - Get client statistics -- `getConnectionState()` - Get current state -- `getQueueSize()` - Get queue size -- `shutdown(timeout?)` - Graceful shutdown - -#### Events -- Connection: `connecting`, `connected`, `authenticated`, `disconnected` -- Queue: `messageQueued`, `messageProcessed`, `messageFailed` -- State: `stateChanged`, `error` - -## License +## ๐Ÿ“„ License MIT License - see [LICENSE](LICENSE) file for details. -## Support +## ๐Ÿ†˜ Support -- Issues: [Git Repository Issues](https://git.siwatsystem.com/siwat/mxrelay-consumer/issues) -- Documentation: [Project Repository](https://git.siwatsystem.com/siwat/mxrelay-consumer) +- **Issues**: [Git Repository Issues](https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer/issues) +- **Documentation**: [Project Repository](https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer) --- -Built by SiwatSystem \ No newline at end of file +**Built by SiwatSystem** | *Session-based SMTP over WebSocket* \ No newline at end of file diff --git a/package.json b/package.json index b96f87d..c1265e6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@siwats/mxrelay-consumer", - "version": "1.0.0", + "version": "1.1.0", "description": "An internal TypeScript client library for transporting SMTP messages", "main": "lib/index.js", "module": "lib/index.esm.js", @@ -27,9 +27,9 @@ "postinstall": "npm run build", "build": "bun run build:cjs && bun run build:esm && bun run build:types", "build:cjs": "tsc -p tsconfig.cjs.json", - "build:esm": "echo 'ESM build disabled - use CommonJS for now'", + "build:esm": "tsc -p tsconfig.esm.json && cp lib-esm/index.js lib/index.esm.js && cp lib-esm/types.js lib/types.esm.js", "build:types": "tsc -p tsconfig.types.json", - "clean": "rimraf lib", + "clean": "rimraf lib lib-esm", "prepublishOnly": "npm run clean && npm run build", "lint": "eslint src/**/*.ts && tsc --noEmit", "lint:tsc": "tsc --noEmit", @@ -61,12 +61,12 @@ "license": "MIT", "repository": { "type": "git", - "url": "git+https://github.com/siwats/smtp-ws-relay-client.git" + "url": "git+https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer.git" }, "bugs": { - "url": "https://github.com/siwats/smtp-ws-relay-client/issues" + "url": "https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer/issues" }, - "homepage": "https://github.com/siwats/smtp-ws-relay-client#readme", + "homepage": "https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer#readme", "engines": { "node": ">=16.0.0" }, diff --git a/src/client.ts b/src/client.ts index 806e214..c024429 100644 --- a/src/client.ts +++ b/src/client.ts @@ -8,13 +8,16 @@ import { SMTPOverWsMessage, SMTPOverWsMessageType, ConnectionState, - QueuedMessage, + SMTPSessionRequest, + SMTPEmail, + SessionResult, + SessionState, MessagePriority, SMTPClientConfig, Logger, ClientStats, ClientEvents, - SendOptions, + SessionSendOptions, AuthenticateMessage, SMTPChannelOpenMessage, SMTPToServerMessage, @@ -72,20 +75,22 @@ export class SMTPOverWSClient extends EventEmitter { private config: Required; private ws: WebSocket | null = null; private state: ConnectionState = ConnectionState.DISCONNECTED; - private messageQueue: QueuedMessage[] = []; - private currentMessage: QueuedMessage | null = null; + private sessionQueue: SMTPSessionRequest[] = []; + private currentSession: SMTPSessionRequest | null = null; + private sessionState: SessionState = SessionState.IDLE; private reconnectAttempts = 0; private reconnectTimer: NodeJS.Timeout | null = null; private authTimer: NodeJS.Timeout | null = null; private channelTimer: NodeJS.Timeout | null = null; private heartbeatTimer: NodeJS.Timeout | null = null; - private messageTimer: NodeJS.Timeout | null = null; + private sessionTimer: NodeJS.Timeout | null = null; private isProcessingQueue = false; private isShuttingDown = false; private logger: Logger; private stats: ClientStats; private connectionStartTime: number = 0; - private messageIdCounter = 0; + private sessionIdCounter = 0; + private emailIdCounter = 0; constructor(config: SMTPClientConfig) { super(); @@ -100,10 +105,12 @@ export class SMTPOverWSClient extends EventEmitter { authTimeout: 30000, channelTimeout: 10000, messageTimeout: 60000, - maxConcurrentMessages: 1, + sessionTimeout: 300000, // 5 minutes + maxEmailsPerSession: 10, + sessionBatchTimeout: 1000, debug: false, heartbeatInterval: 30000, - maxQueueSize: 1000, + maxQueueSize: 100, // sessions, not individual messages ...config, logger: config.logger || new DefaultLogger(config.debug ?? false) }; @@ -118,61 +125,50 @@ export class SMTPOverWSClient extends EventEmitter { } /** - * Send SMTP command with automatic queue management + * Queue an email for sending in a session */ - public async sendSMTPCommand(data: string, options: SendOptions = {}): Promise { + public async queueEmail(from: string, to: string[], data: string, options: SessionSendOptions = {}): Promise { if (this.isShuttingDown) { throw new ShutdownError('Client is shutting down'); } // Check queue size limit - if (this.messageQueue.length >= this.config.maxQueueSize) { + if (this.sessionQueue.length >= this.config.maxQueueSize) { throw ErrorFactory.queueError( - `Queue is full (${this.config.maxQueueSize} messages)`, - this.messageQueue.length + `Session queue is full (${this.config.maxQueueSize} sessions)`, + this.sessionQueue.length ); } - const messageId = this.generateMessageId(); + const emailId = this.generateEmailId(); const priority = options.priority ?? MessagePriority.NORMAL; - const timeout = options.timeout ?? this.config.messageTimeout; + const timeout = options.timeout ?? this.config.sessionTimeout; const retries = options.retries ?? 3; return new Promise((resolve, reject) => { - const queuedMessage: QueuedMessage = { - id: messageId, + const email: SMTPEmail = { + id: emailId, + from, + to, data, resolve, - reject, - timestamp: Date.now(), - retries, - priority + reject }; - // Insert message based on priority - this.insertMessageByPriority(queuedMessage); + // Try to add to existing session or create new one + this.addEmailToSession(email, priority, timeout, retries, options); - this.stats.messagesQueued++; - this.emit('messageQueued', messageId, this.messageQueue.length); - - this.logger.debug('Message queued', { - messageId, + this.logger.debug('Email queued', { + emailId, + from, + to: to.length, priority, - queueSize: this.messageQueue.length, - data: data.substring(0, 100) + (data.length > 100 ? '...' : '') + queueSize: this.sessionQueue.length, + dataSize: data.length }); - // Set message timeout - setTimeout(() => { - if (this.messageQueue.includes(queuedMessage) || this.currentMessage === queuedMessage) { - const error = ErrorFactory.timeout('Message', timeout, { messageId }); - this.removeMessageFromQueue(messageId); - reject(error); - } - }, timeout); - // Start processing if not already running - if (!this.isProcessingQueue) { + if (!this.isProcessingQueue && this.sessionState === SessionState.IDLE) { this.processQueue().catch(error => { this.logger.error('Queue processing failed', error); }); @@ -180,13 +176,44 @@ export class SMTPOverWSClient extends EventEmitter { }); } + /** + * Legacy method for backward compatibility + * @deprecated Use queueEmail instead + */ + public async sendSMTPCommand(data: string, options: any = {}): Promise { + // Try to parse basic SMTP transaction + const lines = data.split('\r\n').filter(line => line.trim()); + let from = ''; + let to: string[] = []; + let messageData = ''; + + for (const line of lines) { + if (line.startsWith('MAIL FROM:')) { + from = line.match(/<([^>]+)>/)?.[1] || ''; + } else if (line.startsWith('RCPT TO:')) { + const recipient = line.match(/<([^>]+)>/)?.[1]; + if (recipient) to.push(recipient); + } else if (line === 'DATA') { + // Everything after DATA is message content + messageData = lines.slice(lines.indexOf(line) + 1, lines.indexOf('QUIT')).join('\r\n'); + break; + } + } + + if (!from || to.length === 0) { + throw new MessageError('Invalid SMTP transaction format', 'parse-error', 0); + } + + return this.queueEmail(from, to, messageData, options); + } + /** * Get current client statistics */ public getStats(): ClientStats { return { ...this.stats, - queueSize: this.messageQueue.length, + queueSize: this.sessionQueue.length, connectionUptime: this.connectionStartTime > 0 ? Date.now() - this.connectionStartTime : 0 }; } @@ -202,22 +229,32 @@ export class SMTPOverWSClient extends EventEmitter { * Get current queue size */ public getQueueSize(): number { - return this.messageQueue.length; + return this.sessionQueue.length; + } + + /** + * Get current session state + */ + public getSessionState(): SessionState { + return this.sessionState; } /** - * Clear all queued messages + * Clear all queued sessions */ public clearQueue(): void { - const clearedCount = this.messageQueue.length; + const clearedCount = this.sessionQueue.length; - // Reject all queued messages - for (const message of this.messageQueue) { - message.reject(new QueueError('Queue cleared')); + // Reject all queued sessions + for (const session of this.sessionQueue) { + for (const email of session.emails) { + email.reject(new QueueError('Queue cleared')); + } + session.reject(new QueueError('Queue cleared')); } - this.messageQueue = []; - this.logger.info('Queue cleared', { clearedCount }); + this.sessionQueue = []; + this.logger.info('Session queue cleared', { clearedCount }); } /** @@ -229,7 +266,7 @@ export class SMTPOverWSClient extends EventEmitter { } this.isShuttingDown = true; - this.logger.info('Initiating client shutdown', { queueSize: this.messageQueue.length }); + this.logger.info('Initiating client shutdown', { queueSize: this.sessionQueue.length }); // Stop accepting new messages and clear timers this.clearTimers(); @@ -248,8 +285,8 @@ export class SMTPOverWSClient extends EventEmitter { this.logger.warn('Shutdown timeout reached, forcing shutdown', error); } - // Reject any remaining messages - this.rejectAllQueuedMessages(new ShutdownError('Client shutting down')); + // Reject any remaining sessions + this.rejectAllQueuedSessions(new ShutdownError('Client shutting down')); // Close connection await this.disconnect(); @@ -261,17 +298,18 @@ export class SMTPOverWSClient extends EventEmitter { } /** - * Process the message queue + * Process the session queue */ private async processQueue(): Promise { - if (this.isProcessingQueue || this.messageQueue.length === 0 || this.isShuttingDown) { + if (this.isProcessingQueue || this.sessionQueue.length === 0 || this.isShuttingDown || this.sessionState !== SessionState.IDLE) { return; } this.isProcessingQueue = true; - this.emit('queueProcessingStarted', this.messageQueue.length); + this.sessionState = SessionState.PROCESSING; + this.emit('queueProcessingStarted', this.sessionQueue.length); - this.logger.info('Queue processing started', { queueSize: this.messageQueue.length }); + this.logger.info('Session queue processing started', { queueSize: this.sessionQueue.length }); let processed = 0; let failed = 0; @@ -282,111 +320,220 @@ export class SMTPOverWSClient extends EventEmitter { await this.connect(); } - // Process messages sequentially one at a time - while (this.messageQueue.length > 0 && !this.isShuttingDown) { - const message = this.messageQueue.shift()!; + // Process sessions sequentially one at a time (SMTP sessions are mutex) + while (this.sessionQueue.length > 0 && !this.isShuttingDown) { + const session = this.sessionQueue.shift()!; try { - this.currentMessage = message; + this.currentSession = session; const startTime = Date.now(); - await this.processMessage(message); + await this.processSession(session); - const responseTime = Date.now() - startTime; - this.stats.messagesProcessed++; - this.stats.averageResponseTime = - (this.stats.averageResponseTime * (this.stats.messagesProcessed - 1) + responseTime) / - this.stats.messagesProcessed; + const sessionTime = Date.now() - startTime; + this.stats.sessionsProcessed++; + this.stats.averageSessionTime = + (this.stats.averageSessionTime * (this.stats.sessionsProcessed - 1) + sessionTime) / + this.stats.sessionsProcessed; - this.emit('messageProcessed', message.id, responseTime); - this.logger.debug('Message processed successfully', { - messageId: message.id, - responseTime + this.emit('sessionCompleted', session.id, []); + this.logger.debug('Session processed successfully', { + sessionId: session.id, + emailCount: session.emails.length, + sessionTime }); processed++; } catch (error) { - this.stats.messagesFailed++; + this.stats.sessionsFailed++; this.stats.lastError = (error as Error).message; this.stats.lastErrorTime = new Date(); - this.emit('messageFailed', message.id, error as Error); - this.logger.error('Message processing failed', { - messageId: message.id, + this.emit('sessionFailed', session.id, error as Error); + this.logger.error('Session processing failed', { + sessionId: session.id, error: (error as Error).message }); failed++; - message.reject(error as Error); + session.reject(error as Error); } finally { - this.currentMessage = null; + this.currentSession = null; } } } catch (error) { this.logger.error('Queue processing error', error); - this.rejectAllQueuedMessages(error as Error); - failed += this.messageQueue.length; + this.rejectAllQueuedSessions(error as Error); + failed += this.sessionQueue.length; } finally { this.isProcessingQueue = false; + this.sessionState = SessionState.IDLE; // Disconnect if queue is empty and not shutting down - if (this.messageQueue.length === 0 && !this.isShuttingDown) { + if (this.sessionQueue.length === 0 && !this.isShuttingDown) { await this.disconnect(); } this.emit('queueProcessingCompleted', processed, failed); - this.logger.info('Queue processing completed', { processed, failed }); + this.logger.info('Session queue processing completed', { processed, failed }); } } /** - * Process a single message + * Process a single SMTP session with multiple emails */ - private async processMessage(message: QueuedMessage): Promise { - return new Promise(async (resolve, reject) => { - try { - // Open SMTP channel - await this.openSMTPChannel(); - - // Wait for SMTP response - const responsePromise = this.waitForSMTPResponse(message.id); - - // Send SMTP data - this.sendSMTPData(message.data); - - // Wait for response - const response = await responsePromise; - message.resolve(response); - - // Close SMTP channel - await this.closeSMTPChannel(); - - resolve(); - - } catch (error) { - // Retry logic - if (message.retries > 0) { - message.retries--; - this.logger.debug('Retrying message', { - messageId: message.id, - retriesLeft: message.retries + private async processSession(session: SMTPSessionRequest): Promise { + this.logger.info('Processing SMTP session', { + sessionId: session.id, + emailCount: session.emails.length + }); + + this.emit('sessionStarted', session.id, session.emails.length); + + try { + // Open SMTP channel once for the entire session + await this.openSMTPChannel(); + + const results: SessionResult[] = []; + + // Process each email in the session + for (const email of session.emails) { + try { + const startTime = Date.now(); + + // Send MAIL FROM + await this.sendSMTPCommandAndWait(`MAIL FROM: <${email.from}>\r\n`); + + // Send RCPT TO for each recipient + for (const recipient of email.to) { + await this.sendSMTPCommandAndWait(`RCPT TO: <${recipient}>\r\n`); + } + + // Send DATA command + await this.sendSMTPCommandAndWait('DATA\r\n'); + + // Send message content with proper escaping + const escapedData = this.escapeMessageData(email.data); + await this.sendSMTPCommandAndWait(escapedData + '\r\n.\r\n'); + + const emailTime = Date.now() - startTime; + this.stats.emailsProcessed++; + this.stats.averageEmailTime = + (this.stats.averageEmailTime * (this.stats.emailsProcessed - 1) + emailTime) / + this.stats.emailsProcessed; + + // Resolve individual email + email.resolve('250 OK'); + + const result: SessionResult = { + emailId: email.id, + success: true, + response: '250 OK' + }; + results.push(result); + + this.emit('emailProcessed', session.id, email.id, emailTime); + this.logger.debug('Email processed successfully', { + sessionId: session.id, + emailId: email.id, + emailTime }); - // Re-queue message - this.insertMessageByPriority(message); - resolve(); - } else { - reject(ErrorFactory.messageError( - (error as Error).message, - message.id, - message.retries - )); + } catch (error) { + this.stats.emailsFailed++; + + // Reject individual email + email.reject(error as Error); + + const result: SessionResult = { + emailId: email.id, + success: false, + response: (error as Error).message, + error: error as Error + }; + results.push(result); + + this.emit('emailFailed', session.id, email.id, error as Error); + this.logger.error('Email processing failed', { + sessionId: session.id, + emailId: email.id, + error: (error as Error).message + }); } } + + // Send QUIT to end session + await this.sendSMTPCommandAndWait('QUIT\r\n'); + + // Close SMTP channel + await this.closeSMTPChannel(); + + // Resolve session + session.resolve(results); + + } catch (error) { + // If session-level error, retry the entire session + if (session.retries > 0) { + session.retries--; + this.logger.debug('Retrying session', { + sessionId: session.id, + retriesLeft: session.retries + }); + + // Re-queue session + this.insertSessionByPriority(session); + return; + } else { + throw ErrorFactory.messageError( + (error as Error).message, + session.id, + session.retries + ); + } + } + } + + /** + * Send SMTP command and wait for response + */ + private async sendSMTPCommandAndWait(command: string): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(ErrorFactory.timeout('SMTP command', this.config.messageTimeout)); + }, this.config.messageTimeout); + + const onResponse = (message: SMTPFromServerMessage) => { + clearTimeout(timeout); + this.logger.debug('RX SMTP response', { + command: command.trim(), + response: message.data.trim() + }); + + // Check if response indicates success (2xx or 3xx) + const responseCode = parseInt(message.data.substring(0, 3)); + if (responseCode >= 400) { + reject(new MessageError(`SMTP error: ${message.data.trim()}`, 'smtp-response', 0)); + } else { + resolve(message.data); + } + }; + + this.once('smtp_from_server', onResponse); + + // Send the command + this.sendSMTPData(command); }); } + /** + * Escape message data for SMTP transmission + */ + private escapeMessageData(data: string): string { + // Escape lines that start with a dot + return data.replace(/\n\./g, '\n..'); + } + /** * Connect to WebSocket server */ @@ -733,7 +880,7 @@ export class SMTPOverWSClient extends EventEmitter { } else if (this.isProcessingQueue && !this.isShuttingDown) { this.logger.error('Max reconnection attempts reached'); this.setState(ConnectionState.FAILED); - this.rejectAllQueuedMessages(new ConnectionError('Max reconnection attempts reached')); + this.rejectAllQueuedSessions(new ConnectionError('Max reconnection attempts reached')); this.isProcessingQueue = false; } } @@ -809,9 +956,9 @@ export class SMTPOverWSClient extends EventEmitter { this.channelTimer = null; } - if (this.messageTimer) { - clearTimeout(this.messageTimer); - this.messageTimer = null; + if (this.sessionTimer) { + clearTimeout(this.sessionTimer); + this.sessionTimer = null; } this.stopHeartbeat(); @@ -830,54 +977,92 @@ export class SMTPOverWSClient extends EventEmitter { } /** - * Reject all queued messages + * Reject all queued sessions */ - private rejectAllQueuedMessages(error: Error): void { - if (this.currentMessage) { - this.currentMessage.reject(error); - this.currentMessage = null; + private rejectAllQueuedSessions(error: Error): void { + if (this.currentSession) { + this.currentSession.reject(error); + this.currentSession = null; } - while (this.messageQueue.length > 0) { - const message = this.messageQueue.shift()!; - message.reject(error); + while (this.sessionQueue.length > 0) { + const session = this.sessionQueue.shift()!; + session.reject(error); } } + /** - * Insert message into queue based on priority + * Generate unique session ID */ - private insertMessageByPriority(message: QueuedMessage): void { - let insertIndex = this.messageQueue.length; + private generateSessionId(): string { + return `session_${Date.now()}_${++this.sessionIdCounter}`; + } + + /** + * Generate unique email ID + */ + private generateEmailId(): string { + return `email_${Date.now()}_${++this.emailIdCounter}`; + } + + /** + * Add email to existing session or create new session + */ + private addEmailToSession(email: SMTPEmail, priority: MessagePriority, timeout: number, retries: number, options: SessionSendOptions): void { + // Look for existing session with same priority and available space + let targetSession = this.sessionQueue.find(session => + session.priority === priority && + session.emails.length < this.config.maxEmailsPerSession + ); + + if (!targetSession) { + // Create new session + const sessionId = this.generateSessionId(); + targetSession = { + id: sessionId, + emails: [], + timestamp: Date.now(), + retries, + priority, + resolve: (results: SessionResult[]) => { + // Session-level resolve doesn't need to do anything + // Individual emails handle their own resolution + }, + reject: (error: Error) => { + // Reject all emails in the session + for (const email of targetSession!.emails) { + email.reject(error); + } + } + }; + + // Insert session based on priority + this.insertSessionByPriority(targetSession); + + this.stats.sessionsQueued++; + this.emit('sessionQueued', sessionId, this.sessionQueue.length); + } + + // Add email to session + targetSession.emails.push(email); + } + + /** + * Insert session into queue based on priority + */ + private insertSessionByPriority(session: SMTPSessionRequest): void { + let insertIndex = this.sessionQueue.length; // Find insertion point based on priority - for (let i = 0; i < this.messageQueue.length; i++) { - if (message.priority > this.messageQueue[i]!.priority) { + for (let i = 0; i < this.sessionQueue.length; i++) { + if (session.priority > this.sessionQueue[i]!.priority) { insertIndex = i; break; } } - this.messageQueue.splice(insertIndex, 0, message); - } - - /** - * Remove message from queue by ID - */ - private removeMessageFromQueue(messageId: string): boolean { - const index = this.messageQueue.findIndex(msg => msg.id === messageId); - if (index !== -1) { - this.messageQueue.splice(index, 1); - return true; - } - return false; - } - - /** - * Generate unique message ID - */ - private generateMessageId(): string { - return `msg_${Date.now()}_${++this.messageIdCounter}`; + this.sessionQueue.splice(insertIndex, 0, session); } /** @@ -910,12 +1095,15 @@ export class SMTPOverWSClient extends EventEmitter { */ private initializeStats(): ClientStats { return { - messagesQueued: 0, - messagesProcessed: 0, - messagesFailed: 0, + sessionsQueued: 0, + sessionsProcessed: 0, + sessionsFailed: 0, + emailsProcessed: 0, + emailsFailed: 0, reconnectionAttempts: 0, totalConnections: 0, - averageResponseTime: 0, + averageSessionTime: 0, + averageEmailTime: 0, queueSize: 0, connectionUptime: 0 }; diff --git a/src/index.ts b/src/index.ts index 4296522..c66f3de 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,6 +10,7 @@ export { SMTPOverWsMessageType, ConnectionState, MessagePriority, + SessionState, type SMTPOverWsMessageBase, type AuthenticateMessage, type AuthenticateResponseMessage, @@ -20,12 +21,14 @@ export { type SMTPToServerMessage, type SMTPFromServerMessage, type SMTPOverWsMessage, - type QueuedMessage, + type SMTPEmail, + type SMTPSessionRequest, + type SessionResult, type SMTPClientConfig, type Logger, type ClientStats, type ClientEvents, - type SendOptions + type SessionSendOptions } from './types'; // Export all error classes diff --git a/src/transport.ts b/src/transport.ts index 90f26e2..ac2a7b7 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -111,7 +111,6 @@ export class SMTPWSTransport extends EventEmitter { ...(this.options.authTimeout !== undefined && { authTimeout: this.options.authTimeout }), ...(this.options.channelTimeout !== undefined && { channelTimeout: this.options.channelTimeout }), ...(this.options.messageTimeout !== undefined && { messageTimeout: this.options.messageTimeout }), - ...(this.options.maxConcurrentMessages !== undefined && { maxConcurrentMessages: this.options.maxConcurrentMessages }), ...(this.options.logger !== undefined && { logger: this.options.logger }), ...(this.options.heartbeatInterval !== undefined && { heartbeatInterval: this.options.heartbeatInterval }) }; @@ -169,8 +168,8 @@ export class SMTPWSTransport extends EventEmitter { */ public async verify(): Promise { try { - // Test full connection cycle: connect -> authenticate -> open SMTP -> close SMTP -> disconnect - await this.client.sendSMTPCommand('EHLO transport-verify\r\n'); + // Test connection by sending a verification email with minimal data + await this.client.queueEmail('verify@test.local', ['verify@test.local'], 'Subject: Verification\r\n\r\nTest'); return true; } catch (error) { const err = error as any; @@ -216,49 +215,46 @@ export class SMTPWSTransport extends EventEmitter { // Get the raw message content from Nodemailer's stream const rawMessage = await this.getRawMessage(mail); - // Build complete SMTP transaction - let smtpTransaction = ''; - - // MAIL FROM - smtpTransaction += `MAIL FROM: <${envelope.from}>\r\n`; - - // RCPT TO for each recipient - for (const recipient of envelope.to) { - smtpTransaction += `RCPT TO: <${recipient}>\r\n`; - } - - // DATA command - smtpTransaction += 'DATA\r\n'; - - // Message content + // Prepare message data for SMTP transmission const messageData = this.prepareMessageData(rawMessage); - smtpTransaction += messageData; - - // QUIT - smtpTransaction += 'QUIT\r\n'; - // Send complete SMTP transaction in one session - const response = await this.client.sendSMTPCommand(smtpTransaction); - - // Parse SMTP response for success/failure - const result = this.parseSmtpResponse(response, envelope, messageId); - - // If there were SMTP errors, throw an appropriate error - if (result.rejected.length > 0 || !this.isSuccessfulResponse(response)) { - const errorDetails = this.extractSmtpError(response); + try { + // Queue email using the new session-based client + const response = await this.client.queueEmail(envelope.from, envelope.to, messageData); + + // Create successful result + const result: SendResult = { + envelope, + messageId, + accepted: envelope.to.slice(), // All recipients accepted + rejected: [], + pending: [], + response + }; + + return result; + + } catch (error) { + // Handle SMTP errors + const result: SendResult = { + envelope, + messageId, + accepted: [], + rejected: envelope.to.slice(), // All recipients rejected on error + pending: [], + response: (error as Error).message + }; + throw new MessageError( - errorDetails.message, + (error as Error).message, messageId, 0, { - smtpCode: errorDetails.code, - smtpResponse: response, - rejectedRecipients: result.rejected + smtpResponse: (error as Error).message, + rejectedRecipients: envelope.to } ); } - - return result; } /** @@ -329,7 +325,7 @@ export class SMTPWSTransport extends EventEmitter { this.emit('close'); }); - this.client.on('messageProcessed', () => { + this.client.on('emailProcessed', () => { this.emit('idle', this._isIdle); }); } diff --git a/src/types.ts b/src/types.ts index eea7f79..27358da 100644 --- a/src/types.ts +++ b/src/types.ts @@ -125,18 +125,68 @@ export enum ConnectionState { } /** - * Queued message structure + * Individual email within a session */ -export interface QueuedMessage { +export interface SMTPEmail { + /** Unique email ID */ id: string; + /** Sender address */ + from: string; + /** Recipient addresses */ + to: string[]; + /** Raw email message data */ data: string; + /** Email-specific resolve callback */ resolve: (response: string) => void; + /** Email-specific reject callback */ reject: (error: Error) => void; - timestamp: number; - retries: number; - priority: MessagePriority; } +/** + * SMTP session request containing multiple emails + */ +export interface SMTPSessionRequest { + /** Unique session ID */ + id: string; + /** List of emails to send in this session */ + emails: SMTPEmail[]; + /** Session timestamp */ + timestamp: number; + /** Number of retry attempts for the entire session */ + retries: number; + /** Session priority */ + priority: MessagePriority; + /** Session-level resolve callback */ + resolve: (results: SessionResult[]) => void; + /** Session-level reject callback */ + reject: (error: Error) => void; +} + +/** + * Result of processing a single email in a session + */ +export interface SessionResult { + /** Email ID */ + emailId: string; + /** Whether email was successfully sent */ + success: boolean; + /** SMTP response for this email */ + response: string; + /** Error if email failed */ + error?: Error; +} + +/** + * Session state enum + */ +export enum SessionState { + IDLE = 'idle', + PROCESSING = 'processing', + FAILED = 'failed', + COMPLETED = 'completed' +} + + /** * Message priority levels */ @@ -172,8 +222,16 @@ export interface SMTPClientConfig { /** Message timeout in milliseconds */ messageTimeout?: number; - /** Maximum number of concurrent messages */ - maxConcurrentMessages?: number; + /** Session timeout in milliseconds (default: 300000 = 5 minutes) */ + sessionTimeout?: number; + + + + /** Maximum emails per session (default: 10) */ + maxEmailsPerSession?: number; + + /** Session batch timeout in milliseconds (default: 1000) */ + sessionBatchTimeout?: number; /** Enable debug logging */ debug?: boolean; @@ -184,7 +242,7 @@ export interface SMTPClientConfig { /** Connection heartbeat interval in milliseconds */ heartbeatInterval?: number; - /** Maximum queue size */ + /** Maximum queue size (number of sessions, not individual messages) */ maxQueueSize?: number; } @@ -202,12 +260,15 @@ export interface Logger { * Client statistics */ export interface ClientStats { - messagesQueued: number; - messagesProcessed: number; - messagesFailed: number; + sessionsQueued: number; + sessionsProcessed: number; + sessionsFailed: number; + emailsProcessed: number; + emailsFailed: number; reconnectionAttempts: number; totalConnections: number; - averageResponseTime: number; + averageSessionTime: number; + averageEmailTime: number; queueSize: number; connectionUptime: number; lastError?: string; @@ -225,9 +286,12 @@ export interface ClientEvents { reconnecting: (attempt: number, maxAttempts: number) => void; reconnected: () => void; error: (error: Error) => void; - messageQueued: (messageId: string, queueSize: number) => void; - messageProcessed: (messageId: string, responseTime: number) => void; - messageFailed: (messageId: string, error: Error) => void; + sessionQueued: (sessionId: string, queueSize: number) => void; + sessionStarted: (sessionId: string, emailCount: number) => void; + sessionCompleted: (sessionId: string, results: SessionResult[]) => void; + sessionFailed: (sessionId: string, error: Error) => void; + emailProcessed: (sessionId: string, emailId: string, responseTime: number) => void; + emailFailed: (sessionId: string, emailId: string, error: Error) => void; queueProcessingStarted: (queueSize: number) => void; queueProcessingCompleted: (processed: number, failed: number) => void; channelOpened: () => void; @@ -246,18 +310,25 @@ export interface ClientEvents { } /** - * Message send options + * Session send options */ -export interface SendOptions { - /** Message priority */ +export interface SessionSendOptions { + /** Session priority */ priority?: MessagePriority; - /** Message timeout in milliseconds */ + /** Session timeout in milliseconds */ timeout?: number; - /** Number of retry attempts */ + /** Number of retry attempts for the entire session */ retries?: number; /** Whether to skip queue and send immediately */ immediate?: boolean; -} \ No newline at end of file + + /** Maximum emails per session (default: 10) */ + maxEmailsPerSession?: number; + + /** Session batch timeout in milliseconds (default: 1000) */ + batchTimeout?: number; +} +