feat: Refactor SMTP client to support session-based email processing, enhancing queue management and error handling

This commit is contained in:
Siwat Sirichai 2025-08-19 13:53:20 +07:00
parent c2aa9b6cf6
commit 069540e310
7 changed files with 951 additions and 480 deletions

2
.gitignore vendored
View file

@ -6,6 +6,8 @@ yarn-error.log*
# Build outputs # Build outputs
lib/ lib/
lib-esm/
dist/ dist/
*.tsbuildinfo *.tsbuildinfo

699
README.md
View file

@ -1,122 +1,232 @@
# @siwatsystem/mxrelay-consumer # @siwats/mxrelay-consumer
[![TypeScript](https://img.shields.io/badge/%3C%2F%3E-TypeScript-%230074c1.svg)](http://www.typescriptlang.org/) [![TypeScript](https://img.shields.io/badge/%3C%2F%3E-TypeScript-%230074c1.svg)](http://www.typescriptlang.org/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
A production-ready TypeScript client library for SMTP over WebSocket protocol with intelligent queue management, automatic reconnection, and comprehensive error handling. Includes full Nodemailer transport compatibility. A production-ready TypeScript client library for SMTP over WebSocket protocol with **session-based queue management**, automatic reconnection, and comprehensive error handling. Features proper SMTP session protocol implementation with mutex-based session locking.
## Features ## 🚀 Key Features
**Intelligent Queue Management** ### **Session-Based SMTP Protocol**
- Automatic WebSocket connection when messages are queued - **Proper SMTP Sessions**: One session = EHLO → AUTH → Multiple Emails → QUIT
- Priority-based message processing (CRITICAL > HIGH > NORMAL > LOW) - **Session Mutex**: Only one SMTP session active at a time (protocol compliant)
- Configurable queue limits and overflow protection - **Email Batching**: Send multiple emails efficiently within a single session
- Auto-disconnect when queue is empty - **Resource Optimization**: Connect only when needed, disconnect when idle
**Robust Connection Handling** ### **Intelligent Queue Management**
- Automatic reconnection with exponential backoff - **Session Queuing**: Queues complete email sessions, not individual commands
- Connection state management and lifecycle - **Priority-Based Processing**: CRITICAL > HIGH > NORMAL > LOW priority levels
- Heartbeat monitoring and timeout handling - **Configurable Batching**: Control emails per session with `maxEmailsPerSession`
- Graceful connection recovery - **Smart Resource Usage**: Auto-connect/disconnect based on queue state
**High Performance** ### **Production-Ready Reliability**
- Efficient SMTP channel cycling per message - **Comprehensive Error Handling**: Structured error classification and meaningful messages
- Minimal resource usage with smart connection management - **Automatic Reconnection**: Exponential backoff with configurable retry limits
- Concurrent message processing support - **Timeout Management**: Session, email, and connection-level timeouts
- Optimized WebSocket communication - **Connection State Management**: Full lifecycle state tracking
**Enterprise-Grade Reliability** ### **Nodemailer Integration**
- Comprehensive SMTP error handling with meaningful messages - **Full Nodemailer Compatibility**: Drop-in replacement for existing transports
- Timeout management for all operations - **Standard API**: Use familiar Nodemailer methods and options
- Retry logic with configurable attempts - **Advanced Features**: Attachments, HTML, multipart messages, custom headers
- Structured error classification
**Nodemailer Integration** ## 📊 Architecture Overview
- Full Nodemailer transport compatibility
- Transparent bridge for all email features
- Support for attachments, HTML, multipart messages
- Standard Nodemailer API compatibility
## Installation ```mermaid
graph TB
subgraph "Client Application"
App[Your Application]
NM[Nodemailer]
Transport[SMTPWSTransport]
end
```bash subgraph "Session Management"
npm install @siwatsystem/mxrelay-consumer Client[SMTPOverWSClient]
# or Queue[Session Queue]
bun add @siwatsystem/mxrelay-consumer SM[Session Manager]
end
subgraph "Protocol Layer"
WS[WebSocket Connection]
Auth[Authentication]
Channel[SMTP Channel]
end
subgraph "Remote Server"
Relay[SMTP WebSocket Relay]
SMTP[SMTP Server]
end
App --> NM
NM --> Transport
Transport --> Client
Client --> Queue
Queue --> SM
SM --> WS
WS --> Auth
Auth --> Channel
Channel --> Relay
Relay --> SMTP
style Client fill:#e1f5fe
style Queue fill:#f3e5f5
style SM fill:#fff3e0
style Channel fill:#e8f5e8
``` ```
## Quick Start ## 🔄 Session Processing Flow
### Direct Client Usage ```mermaid
sequenceDiagram
participant App as Application
participant Client as SMTPOverWSClient
participant Queue as Session Queue
participant WS as WebSocket
participant Server as SMTP Server
Note over App,Server: Email Submission
App->>Client: queueEmail(from, to[], data)
Client->>Queue: Add to session or create new
Note over App,Server: Session Processing (Mutex)
Queue->>Client: Process next session
Client->>WS: Connect (if needed)
WS->>Server: WebSocket connection
Server-->>WS: Connection established
Client->>Server: AUTHENTICATE
Server-->>Client: AUTH_SUCCESS
Client->>Server: SMTP_CHANNEL_OPEN
Server-->>Client: SMTP_CHANNEL_READY
Server-->>Client: 220 SMTP ready
Client->>Server: EHLO client
Server-->>Client: 250 EHLO OK
Client->>Server: AUTH PLAIN <credentials>
Server-->>Client: 235 AUTH OK
loop For each email in session
Client->>Server: MAIL FROM:<sender>
Server-->>Client: 250 Sender OK
Client->>Server: RCPT TO:<recipient>
Server-->>Client: 250 Recipient OK
Client->>Server: DATA
Server-->>Client: 354 Start data
Client->>Server: <email content>\r\n.
Server-->>Client: 250 Message accepted
Client->>App: Email resolved
end
Client->>Server: QUIT
Server-->>Client: 221 Goodbye
Client->>Server: SMTP_CHANNEL_CLOSE
WS->>WS: Disconnect (if queue empty)
Client->>App: Session completed
```
## 📦 Installation
```bash
npm install @siwats/mxrelay-consumer
# or
bun add @siwats/mxrelay-consumer
```
## 🚀 Quick Start
### Direct Client Usage (New Session-Based API)
```typescript ```typescript
import { SMTPOverWSClient } from '@siwatsystem/mxrelay-consumer'; import { SMTPOverWSClient } from '@siwats/mxrelay-consumer';
const client = new SMTPOverWSClient({ const client = new SMTPOverWSClient({
url: 'wss://api.siwatsystem.com/smtp', url: 'wss://api.siwatsystem.com/smtp',
apiKey: 'your-api-key', apiKey: 'your-api-key',
debug: true debug: true,
maxEmailsPerSession: 5, // Batch up to 5 emails per session
sessionTimeout: 300000 // 5 minute session timeout
}); });
// Send SMTP commands directly // Queue individual emails - they'll be batched into sessions automatically
try { try {
const response = await client.sendSMTPCommand(` // These emails will be sent in the same SMTP session
MAIL FROM: <sender@example.com> const emailId1 = await client.queueEmail(
RCPT TO: <recipient@example.com> 'sender@example.com',
DATA ['recipient1@example.com'],
Subject: Test Email 'Subject: Email 1\r\n\r\nFirst email content'
);
Hello from SMTP over WebSocket! const emailId2 = await client.queueEmail(
. 'sender@example.com',
QUIT ['recipient2@example.com'],
`); 'Subject: Email 2\r\n\r\nSecond email content'
console.log('Email sent:', response); );
console.log('Emails queued:', emailId1, emailId2);
} catch (error) { } catch (error) {
console.error('SMTP error:', error.message); console.error('Email error:', error.message);
} finally { } finally {
await client.shutdown(); await client.shutdown();
} }
``` ```
### Nodemailer Transport ### Nodemailer Transport (Recommended)
```typescript ```typescript
import nodemailer from 'nodemailer'; import nodemailer from 'nodemailer';
import { createTransport } from '@siwatsystem/mxrelay-consumer'; import { createTransport } from '@siwats/mxrelay-consumer';
// Create transport (uses defaults: api.siwatsystem.com:443 secure) // Create transport with session batching
const transport = createTransport('your-api-key');
// Or with custom options
const transport = createTransport('your-api-key', { const transport = createTransport('your-api-key', {
host: 'custom.server.com', host: 'api.siwatsystem.com',
port: 80, port: 443,
secure: false, secure: true,
debug: true debug: true,
maxEmailsPerSession: 10, // Batch up to 10 emails per session
sessionBatchTimeout: 2000 // Wait 2 seconds to batch emails
}); });
const transporter = nodemailer.createTransporter(transport); const transporter = nodemailer.createTransporter(transport);
// Send email using standard Nodemailer API // Send multiple emails - automatically batched into sessions
const info = await transporter.sendMail({ const emails = [
from: 'sender@example.com',
to: 'recipient@example.com',
subject: 'Test Email via SMTP WebSocket',
text: 'Plain text version',
html: '<h1>HTML version</h1>',
attachments: [
{ {
filename: 'document.pdf', from: 'sender@example.com',
path: './document.pdf' to: 'user1@example.com',
subject: 'Welcome Email',
html: '<h1>Welcome to our service!</h1>'
},
{
from: 'sender@example.com',
to: 'user2@example.com',
subject: 'Newsletter',
html: '<h1>Monthly Newsletter</h1>',
attachments: [{ filename: 'report.pdf', path: './report.pdf' }]
}
];
// Send all emails - they'll be efficiently batched into SMTP sessions
for (const email of emails) {
try {
const info = await transporter.sendMail(email);
console.log('Email sent:', info.messageId);
} catch (error) {
console.error('Failed to send email:', error.message);
}
} }
]
});
console.log('Message sent:', info.messageId);
await transport.close(); await transport.close();
``` ```
## Configuration ## ⚙️ Configuration
### Client Configuration ### Client Configuration
@ -124,172 +234,348 @@ await transport.close();
interface SMTPClientConfig { interface SMTPClientConfig {
url: string; // WebSocket server URL url: string; // WebSocket server URL
apiKey: string; // Authentication API key apiKey: string; // Authentication API key
// Session Management
sessionTimeout?: number; // Session timeout (default: 300000ms)
maxEmailsPerSession?: number; // Emails per session (default: 10)
sessionBatchTimeout?: number; // Batch wait time (default: 1000ms)
// Connection Management
debug?: boolean; // Enable debug logging (default: false) debug?: boolean; // Enable debug logging (default: false)
maxQueueSize?: number; // Queue capacity limit (default: 1000) maxQueueSize?: number; // Session queue limit (default: 100)
reconnectInterval?: number; // Reconnect delay (default: 5000ms) reconnectInterval?: number; // Reconnect delay (default: 5000ms)
maxReconnectAttempts?: number; // Max retry attempts (default: 10) maxReconnectAttempts?: number; // Max retry attempts (default: 10)
authTimeout?: number; // Auth timeout (default: 30000ms) authTimeout?: number; // Auth timeout (default: 30000ms)
channelTimeout?: number; // Channel timeout (default: 10000ms) channelTimeout?: number; // Channel timeout (default: 10000ms)
messageTimeout?: number; // Message timeout (default: 60000ms) messageTimeout?: number; // SMTP command timeout (default: 60000ms)
heartbeatInterval?: number; // Heartbeat interval (default: 30000ms) heartbeatInterval?: number; // Heartbeat interval (default: 30000ms)
maxConcurrentMessages?: number; // Concurrent limit (default: 1)
} }
``` ```
### Transport Configuration ### Transport Configuration
```typescript ```typescript
interface TransportOptions { interface TransportOptions extends Omit<SMTPClientConfig, 'url' | 'apiKey'> {
host?: string; // Server host (default: 'api.siwatsystem.com') host?: string; // Server host (default: 'api.siwatsystem.com')
port?: number; // Server port (default: 443) port?: number; // Server port (default: 443)
secure?: boolean; // Use wss:// (default: true) secure?: boolean; // Use wss:// (default: true)
debug?: boolean; // Enable debug mode (default: false) apiKey?: string; // API key for authentication
// ... other SMTPClientConfig options
} }
``` ```
## Advanced Usage ## 🎯 Advanced Usage
### Priority-Based Messaging ### Session Priority and Options
```typescript ```typescript
import { MessagePriority } from '@siwatsystem/mxrelay-consumer'; import { MessagePriority, SessionSendOptions } from '@siwats/mxrelay-consumer';
// High priority (processed first) // High priority email (processed first)
await client.sendSMTPCommand('URGENT EMAIL DATA', { await client.queueEmail(
'urgent@company.com',
['admin@company.com'],
'Subject: URGENT ALERT\r\n\r\nSystem down!',
{
priority: MessagePriority.HIGH, priority: MessagePriority.HIGH,
timeout: 30000 timeout: 30000,
}); retries: 5
}
);
// Critical priority (highest) // Critical priority email (highest priority)
await client.sendSMTPCommand('CRITICAL ALERT EMAIL', { await client.queueEmail(
priority: MessagePriority.CRITICAL 'security@company.com',
}); ['security-team@company.com'],
'Subject: SECURITY BREACH\r\n\r\nImmediate action required!',
{
priority: MessagePriority.CRITICAL,
immediate: true // Skip batching, send immediately
}
);
``` ```
### Event Monitoring ### Session Event Monitoring
```typescript ```typescript
// Session-level events
client.on('sessionQueued', (sessionId, queueSize) => {
console.log(`Session ${sessionId} queued. Queue size: ${queueSize}`);
});
client.on('sessionStarted', (sessionId, emailCount) => {
console.log(`Processing session ${sessionId} with ${emailCount} emails`);
});
client.on('sessionCompleted', (sessionId, results) => {
console.log(`Session ${sessionId} completed with ${results.length} emails`);
});
// Email-level events within sessions
client.on('emailProcessed', (sessionId, emailId, responseTime) => {
console.log(`Email ${emailId} in session ${sessionId} processed in ${responseTime}ms`);
});
client.on('emailFailed', (sessionId, emailId, error) => {
console.error(`Email ${emailId} in session ${sessionId} failed:`, error.message);
});
// Connection events // Connection events
client.on('connected', () => console.log('WebSocket connected')); client.on('connected', () => console.log('WebSocket connected'));
client.on('authenticated', () => console.log('Authentication successful')); client.on('authenticated', () => console.log('Authentication successful'));
client.on('disconnected', () => console.log('Connection lost')); client.on('disconnected', () => console.log('Connection lost'));
// Queue events
client.on('messageQueued', (messageId, queueSize) => {
console.log(`Message ${messageId} queued. Queue size: ${queueSize}`);
});
client.on('messageProcessed', (messageId, responseTime) => {
console.log(`Message ${messageId} processed in ${responseTime}ms`);
});
// Error events
client.on('error', (error) => console.error('Client error:', error));
``` ```
### Statistics and Monitoring ### Session Statistics
```typescript ```typescript
const stats = client.getStats(); const stats = client.getStats();
console.log('Client Statistics:', { console.log('Session Statistics:', {
messagesQueued: stats.messagesQueued, sessionsQueued: stats.sessionsQueued,
messagesProcessed: stats.messagesProcessed, sessionsProcessed: stats.sessionsProcessed,
messagesFailed: stats.messagesFailed, sessionsFailed: stats.sessionsFailed,
averageResponseTime: stats.averageResponseTime, emailsProcessed: stats.emailsProcessed,
emailsFailed: stats.emailsFailed,
averageSessionTime: stats.averageSessionTime,
averageEmailTime: stats.averageEmailTime,
queueSize: stats.queueSize queueSize: stats.queueSize
}); });
console.log('Session State:', client.getSessionState()); // IDLE, PROCESSING, FAILED
console.log('Connection State:', client.getConnectionState()); // DISCONNECTED, CONNECTED, etc.
``` ```
## Error Handling ## 🛡️ Error Handling
### SMTP Error Detection ### SMTP Session Errors
The transport properly detects and categorizes SMTP errors:
```typescript ```typescript
try { try {
await transporter.sendMail({ await client.queueEmail(
from: 'unauthorized@domain.com', // Invalid sender 'invalid-sender@domain.com',
to: 'recipient@example.com', ['recipient@example.com'],
subject: 'Test' 'Subject: Test\r\n\r\nTest message'
}); );
} catch (error) { } catch (error) {
if (error instanceof MessageError) {
console.error('SMTP Error:', error.message); console.error('SMTP Error:', error.message);
// Output: "Sender not authorized: Sender domain not authorized for your IP or subnet" console.log('Email ID:', error.messageId);
console.log('Retry Count:', error.retryCount);
console.log('Error details:', { }
smtpCode: error.context.smtpCode, // "550"
rejectedRecipients: error.context.rejectedRecipients
});
} }
``` ```
### Error Classification ### Session State Management
The client manages session states automatically:
```mermaid
stateDiagram-v2
[*] --> IDLE
IDLE --> PROCESSING: Session queued
PROCESSING --> IDLE: Session completed
PROCESSING --> FAILED: Session error
FAILED --> PROCESSING: Retry session
FAILED --> IDLE: Max retries reached
IDLE --> [*]: Client shutdown
```
- `IDLE` - No active session, ready to process
- `PROCESSING` - Session currently being processed (mutex locked)
- `FAILED` - Session failed, will retry or reject
- `COMPLETED` - Session successfully completed
### Connection States
```mermaid
stateDiagram-v2
[*] --> DISCONNECTED
DISCONNECTED --> CONNECTING: Connect requested
CONNECTING --> CONNECTED: WebSocket established
CONNECTED --> AUTHENTICATING: Send credentials
AUTHENTICATING --> AUTHENTICATED: Auth successful
AUTHENTICATED --> CHANNEL_OPENING: Open SMTP channel
CHANNEL_OPENING --> CHANNEL_READY: Channel established
CHANNEL_READY --> CHANNEL_CLOSED: Session complete
CHANNEL_CLOSED --> AUTHENTICATED: Ready for next session
AUTHENTICATED --> DISCONNECTED: Queue empty
CONNECTING --> RECONNECTING: Connection failed
AUTHENTICATING --> RECONNECTING: Auth failed
CHANNEL_OPENING --> RECONNECTING: Channel failed
RECONNECTING --> CONNECTING: Retry attempt
RECONNECTING --> FAILED: Max retries reached
FAILED --> [*]: Shutdown
DISCONNECTED --> [*]: Shutdown
```
## 🔧 Session Queue Architecture
```mermaid
flowchart TD
subgraph Emails ["📧 Email Submission"]
E1[Email 1 - HIGH]
E2[Email 2 - NORMAL]
E3[Email 3 - HIGH]
E4[Email 4 - CRITICAL]
end
subgraph Queue ["📋 Session Creation"]
SQ[Session Queue]
S1[Session 1 - CRITICAL]
S2[Session 2 - HIGH]
S3[Session 3 - NORMAL]
end
subgraph Processing ["⚡ Session Processing"]
SP[Session Processor]
SC[SMTP Connection]
SM[Session Manager]
end
E1 --> SQ
E2 --> SQ
E3 --> SQ
E4 --> SQ
SQ --> S1
SQ --> S2
SQ --> S3
S1 --> SP
S2 --> SP
S3 --> SP
SP --> SC
SC --> SM
style S1 fill:#ffcdd2
style S2 fill:#fff3e0
style S3 fill:#e8f5e8
style SP fill:#e1f5fe
style SC fill:#f3e5f5
```
## 🏭 Production Configuration
### Optimal Settings
```typescript ```typescript
import { const client = new SMTPOverWSClient({
ConnectionError, url: 'wss://api.siwatsystem.com/smtp',
AuthenticationError, apiKey: process.env.MXRELAY_API_KEY,
MessageError,
TimeoutError
} from '@siwatsystem/mxrelay-consumer';
try { // Production optimizations
await client.sendSMTPCommand('MAIL FROM: <test@example.com>'); debug: false,
} catch (error) { maxEmailsPerSession: 20, // Batch more emails per session
if (error instanceof ConnectionError) { sessionBatchTimeout: 5000, // Wait longer to accumulate emails
console.error('Connection failed:', error.message); sessionTimeout: 600000, // 10 minute session timeout
} else if (error instanceof AuthenticationError) { maxQueueSize: 1000, // Higher queue capacity
console.error('Authentication failed:', error.message); reconnectInterval: 15000, // Longer reconnect delay
} else if (error instanceof MessageError) { maxReconnectAttempts: 3, // Fewer retries in production
console.error('SMTP error:', error.message, 'Code:', error.context.smtpCode); messageTimeout: 120000 // 2 minute SMTP timeout
} else if (error instanceof TimeoutError) { });
console.error('Operation timed out:', error.message);
}
}
``` ```
## Connection States ### Graceful Shutdown
The client manages connection states automatically: ```typescript
process.on('SIGTERM', async () => {
console.log('Shutting down SMTP client...');
try {
// Wait for current sessions to complete
await client.shutdown(60000); // 60 second timeout
console.log('SMTP client shutdown complete');
} catch (error) {
console.error('Forced SMTP client shutdown');
}
process.exit(0);
});
```
- `DISCONNECTED` - No connection ## 📚 API Reference
- `CONNECTING` - Establishing WebSocket connection
- `CONNECTED` - WebSocket connected, authentication pending
- `AUTHENTICATING` - Sending credentials
- `AUTHENTICATED` - Ready for SMTP operations
- `CHANNEL_OPENING` - Opening SMTP channel
- `CHANNEL_READY` - SMTP channel active
- `CHANNEL_CLOSED` - SMTP channel closed
- `RECONNECTING` - Attempting reconnection
- `FAILED` - Connection failed, max retries reached
## Development ### SMTPOverWSClient
This project uses Bun as the primary runtime. TypeScript files can be run directly without building. #### Methods
- `queueEmail(from, to[], data, options?)` - Queue email for session processing
- `getStats()` - Get session and email statistics
- `getConnectionState()` - Get current connection state
- `getSessionState()` - Get current session processing state
- `getQueueSize()` - Get session queue size
- `shutdown(timeout?)` - Graceful shutdown with session completion
#### Events
- **Session Events**: `sessionQueued`, `sessionStarted`, `sessionCompleted`, `sessionFailed`
- **Email Events**: `emailProcessed`, `emailFailed`
- **Connection Events**: `connecting`, `connected`, `authenticated`, `disconnected`, `reconnecting`
- **Queue Events**: `queueProcessingStarted`, `queueProcessingCompleted`
- **State Events**: `stateChanged`, `error`
### createTransport(apiKey, options?)
Creates a Nodemailer-compatible transport with session batching.
- `apiKey` - Your API key for authentication
- `options` - Optional transport and session configuration
## 🔍 Debugging
### Enable Debug Logging
```typescript
const client = new SMTPOverWSClient({
url: 'wss://api.siwatsystem.com/smtp',
apiKey: 'your-api-key',
debug: true // Enable comprehensive logging
});
// Debug output includes:
// - Session creation and batching
// - SMTP protocol commands and responses
// - Connection state transitions
// - Queue processing details
// - Error stack traces
```
### Monitor Session Processing
```typescript
client.on('sessionStarted', (sessionId, emailCount) => {
console.log(`🚀 Starting session ${sessionId} with ${emailCount} emails`);
});
client.on('emailProcessed', (sessionId, emailId, responseTime) => {
console.log(`✅ Email ${emailId} processed in ${responseTime}ms`);
});
client.on('sessionCompleted', (sessionId, results) => {
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
console.log(`🎉 Session ${sessionId} complete: ${successful} sent, ${failed} failed`);
});
```
## 🧪 Development
### Setup ### Setup
```bash ```bash
# Clone repository # Clone repository
git clone https://git.siwatsystem.com/siwat/mxrelay-consumer.git git clone https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer.git
cd mxrelay-consumer cd mxrelay-consumer
# Install dependencies # Install dependencies
bun install bun install
# Run examples directly # Run examples directly (TypeScript)
bun run examples/nodemailer-transport.ts bun run examples/nodemailer-transport.ts
# Run with environment variable
MXRELAY_API_KEY=your-key bun run examples/nodemailer-transport.ts
``` ```
### Build & Test ### Build & Test
```bash ```bash
# Build (optional - bun runs TypeScript directly) # Build all formats
bun run build bun run build
# Run tests # Run tests
@ -302,90 +588,15 @@ bun run lint
bun run format bun run format
``` ```
## Protocol Implementation ## 📄 License
### WebSocket Message Types
- `AUTHENTICATE` / `AUTHENTICATE_RESPONSE` - Authentication flow
- `SMTP_CHANNEL_OPEN` / `SMTP_CHANNEL_READY` - Channel management
- `SMTP_CHANNEL_CLOSED` / `SMTP_CHANNEL_ERROR` - Channel lifecycle
- `SMTP_TO_SERVER` / `SMTP_FROM_SERVER` - SMTP data exchange
### Connection Flow
1. **WebSocket Connection** - Connect to relay server
2. **Authentication** - Authenticate using API key
3. **Channel Management** - Open SMTP channel per message
4. **Data Transfer** - Exchange SMTP commands and responses
5. **Cleanup** - Close channel and disconnect when queue empty
## Best Practices
### Production Configuration
```typescript
const client = new SMTPOverWSClient({
url: 'wss://api.siwatsystem.com/smtp',
apiKey: process.env.MXRELAY_API_KEY,
// Production settings
debug: false,
maxQueueSize: 5000,
reconnectInterval: 10000,
maxReconnectAttempts: 5,
messageTimeout: 120000
});
```
### Graceful Shutdown
```typescript
process.on('SIGTERM', async () => {
console.log('Shutting down...');
try {
await client.shutdown(30000); // 30 second timeout
console.log('Shutdown complete');
} catch (error) {
console.error('Forced shutdown');
}
process.exit(0);
});
```
## API Reference
### createTransport(apiKey, options?)
Creates a Nodemailer-compatible transport.
- `apiKey` - Your API key for authentication
- `options` - Optional transport configuration
### SMTPOverWSClient
Main client class for direct SMTP operations.
#### Methods
- `sendSMTPCommand(data, options?)` - Send SMTP command
- `getStats()` - Get client statistics
- `getConnectionState()` - Get current state
- `getQueueSize()` - Get queue size
- `shutdown(timeout?)` - Graceful shutdown
#### Events
- Connection: `connecting`, `connected`, `authenticated`, `disconnected`
- Queue: `messageQueued`, `messageProcessed`, `messageFailed`
- State: `stateChanged`, `error`
## License
MIT License - see [LICENSE](LICENSE) file for details. MIT License - see [LICENSE](LICENSE) file for details.
## Support ## 🆘 Support
- Issues: [Git Repository Issues](https://git.siwatsystem.com/siwat/mxrelay-consumer/issues) - **Issues**: [Git Repository Issues](https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer/issues)
- Documentation: [Project Repository](https://git.siwatsystem.com/siwat/mxrelay-consumer) - **Documentation**: [Project Repository](https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer)
--- ---
Built by SiwatSystem **Built by SiwatSystem** | *Session-based SMTP over WebSocket*

View file

@ -1,6 +1,6 @@
{ {
"name": "@siwats/mxrelay-consumer", "name": "@siwats/mxrelay-consumer",
"version": "1.0.0", "version": "1.1.0",
"description": "An internal TypeScript client library for transporting SMTP messages", "description": "An internal TypeScript client library for transporting SMTP messages",
"main": "lib/index.js", "main": "lib/index.js",
"module": "lib/index.esm.js", "module": "lib/index.esm.js",
@ -27,9 +27,9 @@
"postinstall": "npm run build", "postinstall": "npm run build",
"build": "bun run build:cjs && bun run build:esm && bun run build:types", "build": "bun run build:cjs && bun run build:esm && bun run build:types",
"build:cjs": "tsc -p tsconfig.cjs.json", "build:cjs": "tsc -p tsconfig.cjs.json",
"build:esm": "echo 'ESM build disabled - use CommonJS for now'", "build:esm": "tsc -p tsconfig.esm.json && cp lib-esm/index.js lib/index.esm.js && cp lib-esm/types.js lib/types.esm.js",
"build:types": "tsc -p tsconfig.types.json", "build:types": "tsc -p tsconfig.types.json",
"clean": "rimraf lib", "clean": "rimraf lib lib-esm",
"prepublishOnly": "npm run clean && npm run build", "prepublishOnly": "npm run clean && npm run build",
"lint": "eslint src/**/*.ts && tsc --noEmit", "lint": "eslint src/**/*.ts && tsc --noEmit",
"lint:tsc": "tsc --noEmit", "lint:tsc": "tsc --noEmit",
@ -61,12 +61,12 @@
"license": "MIT", "license": "MIT",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git+https://github.com/siwats/smtp-ws-relay-client.git" "url": "git+https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer.git"
}, },
"bugs": { "bugs": {
"url": "https://github.com/siwats/smtp-ws-relay-client/issues" "url": "https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer/issues"
}, },
"homepage": "https://github.com/siwats/smtp-ws-relay-client#readme", "homepage": "https://git.siwatsystem.com/siwatsystem-public/mxrelay-consumer#readme",
"engines": { "engines": {
"node": ">=16.0.0" "node": ">=16.0.0"
}, },

View file

@ -8,13 +8,16 @@ import {
SMTPOverWsMessage, SMTPOverWsMessage,
SMTPOverWsMessageType, SMTPOverWsMessageType,
ConnectionState, ConnectionState,
QueuedMessage, SMTPSessionRequest,
SMTPEmail,
SessionResult,
SessionState,
MessagePriority, MessagePriority,
SMTPClientConfig, SMTPClientConfig,
Logger, Logger,
ClientStats, ClientStats,
ClientEvents, ClientEvents,
SendOptions, SessionSendOptions,
AuthenticateMessage, AuthenticateMessage,
SMTPChannelOpenMessage, SMTPChannelOpenMessage,
SMTPToServerMessage, SMTPToServerMessage,
@ -72,20 +75,22 @@ export class SMTPOverWSClient extends EventEmitter {
private config: Required<SMTPClientConfig>; private config: Required<SMTPClientConfig>;
private ws: WebSocket | null = null; private ws: WebSocket | null = null;
private state: ConnectionState = ConnectionState.DISCONNECTED; private state: ConnectionState = ConnectionState.DISCONNECTED;
private messageQueue: QueuedMessage[] = []; private sessionQueue: SMTPSessionRequest[] = [];
private currentMessage: QueuedMessage | null = null; private currentSession: SMTPSessionRequest | null = null;
private sessionState: SessionState = SessionState.IDLE;
private reconnectAttempts = 0; private reconnectAttempts = 0;
private reconnectTimer: NodeJS.Timeout | null = null; private reconnectTimer: NodeJS.Timeout | null = null;
private authTimer: NodeJS.Timeout | null = null; private authTimer: NodeJS.Timeout | null = null;
private channelTimer: NodeJS.Timeout | null = null; private channelTimer: NodeJS.Timeout | null = null;
private heartbeatTimer: NodeJS.Timeout | null = null; private heartbeatTimer: NodeJS.Timeout | null = null;
private messageTimer: NodeJS.Timeout | null = null; private sessionTimer: NodeJS.Timeout | null = null;
private isProcessingQueue = false; private isProcessingQueue = false;
private isShuttingDown = false; private isShuttingDown = false;
private logger: Logger; private logger: Logger;
private stats: ClientStats; private stats: ClientStats;
private connectionStartTime: number = 0; private connectionStartTime: number = 0;
private messageIdCounter = 0; private sessionIdCounter = 0;
private emailIdCounter = 0;
constructor(config: SMTPClientConfig) { constructor(config: SMTPClientConfig) {
super(); super();
@ -100,10 +105,12 @@ export class SMTPOverWSClient extends EventEmitter {
authTimeout: 30000, authTimeout: 30000,
channelTimeout: 10000, channelTimeout: 10000,
messageTimeout: 60000, messageTimeout: 60000,
maxConcurrentMessages: 1, sessionTimeout: 300000, // 5 minutes
maxEmailsPerSession: 10,
sessionBatchTimeout: 1000,
debug: false, debug: false,
heartbeatInterval: 30000, heartbeatInterval: 30000,
maxQueueSize: 1000, maxQueueSize: 100, // sessions, not individual messages
...config, ...config,
logger: config.logger || new DefaultLogger(config.debug ?? false) logger: config.logger || new DefaultLogger(config.debug ?? false)
}; };
@ -118,61 +125,50 @@ export class SMTPOverWSClient extends EventEmitter {
} }
/** /**
* Send SMTP command with automatic queue management * Queue an email for sending in a session
*/ */
public async sendSMTPCommand(data: string, options: SendOptions = {}): Promise<string> { public async queueEmail(from: string, to: string[], data: string, options: SessionSendOptions = {}): Promise<string> {
if (this.isShuttingDown) { if (this.isShuttingDown) {
throw new ShutdownError('Client is shutting down'); throw new ShutdownError('Client is shutting down');
} }
// Check queue size limit // Check queue size limit
if (this.messageQueue.length >= this.config.maxQueueSize) { if (this.sessionQueue.length >= this.config.maxQueueSize) {
throw ErrorFactory.queueError( throw ErrorFactory.queueError(
`Queue is full (${this.config.maxQueueSize} messages)`, `Session queue is full (${this.config.maxQueueSize} sessions)`,
this.messageQueue.length this.sessionQueue.length
); );
} }
const messageId = this.generateMessageId(); const emailId = this.generateEmailId();
const priority = options.priority ?? MessagePriority.NORMAL; const priority = options.priority ?? MessagePriority.NORMAL;
const timeout = options.timeout ?? this.config.messageTimeout; const timeout = options.timeout ?? this.config.sessionTimeout;
const retries = options.retries ?? 3; const retries = options.retries ?? 3;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const queuedMessage: QueuedMessage = { const email: SMTPEmail = {
id: messageId, id: emailId,
from,
to,
data, data,
resolve, resolve,
reject, reject
timestamp: Date.now(),
retries,
priority
}; };
// Insert message based on priority // Try to add to existing session or create new one
this.insertMessageByPriority(queuedMessage); this.addEmailToSession(email, priority, timeout, retries, options);
this.stats.messagesQueued++; this.logger.debug('Email queued', {
this.emit('messageQueued', messageId, this.messageQueue.length); emailId,
from,
this.logger.debug('Message queued', { to: to.length,
messageId,
priority, priority,
queueSize: this.messageQueue.length, queueSize: this.sessionQueue.length,
data: data.substring(0, 100) + (data.length > 100 ? '...' : '') dataSize: data.length
}); });
// Set message timeout
setTimeout(() => {
if (this.messageQueue.includes(queuedMessage) || this.currentMessage === queuedMessage) {
const error = ErrorFactory.timeout('Message', timeout, { messageId });
this.removeMessageFromQueue(messageId);
reject(error);
}
}, timeout);
// Start processing if not already running // Start processing if not already running
if (!this.isProcessingQueue) { if (!this.isProcessingQueue && this.sessionState === SessionState.IDLE) {
this.processQueue().catch(error => { this.processQueue().catch(error => {
this.logger.error('Queue processing failed', error); this.logger.error('Queue processing failed', error);
}); });
@ -180,13 +176,44 @@ export class SMTPOverWSClient extends EventEmitter {
}); });
} }
/**
* Legacy method for backward compatibility
* @deprecated Use queueEmail instead
*/
public async sendSMTPCommand(data: string, options: any = {}): Promise<string> {
// Try to parse basic SMTP transaction
const lines = data.split('\r\n').filter(line => line.trim());
let from = '';
let to: string[] = [];
let messageData = '';
for (const line of lines) {
if (line.startsWith('MAIL FROM:')) {
from = line.match(/<([^>]+)>/)?.[1] || '';
} else if (line.startsWith('RCPT TO:')) {
const recipient = line.match(/<([^>]+)>/)?.[1];
if (recipient) to.push(recipient);
} else if (line === 'DATA') {
// Everything after DATA is message content
messageData = lines.slice(lines.indexOf(line) + 1, lines.indexOf('QUIT')).join('\r\n');
break;
}
}
if (!from || to.length === 0) {
throw new MessageError('Invalid SMTP transaction format', 'parse-error', 0);
}
return this.queueEmail(from, to, messageData, options);
}
/** /**
* Get current client statistics * Get current client statistics
*/ */
public getStats(): ClientStats { public getStats(): ClientStats {
return { return {
...this.stats, ...this.stats,
queueSize: this.messageQueue.length, queueSize: this.sessionQueue.length,
connectionUptime: this.connectionStartTime > 0 ? Date.now() - this.connectionStartTime : 0 connectionUptime: this.connectionStartTime > 0 ? Date.now() - this.connectionStartTime : 0
}; };
} }
@ -202,22 +229,32 @@ export class SMTPOverWSClient extends EventEmitter {
* Get current queue size * Get current queue size
*/ */
public getQueueSize(): number { public getQueueSize(): number {
return this.messageQueue.length; return this.sessionQueue.length;
} }
/** /**
* Clear all queued messages * Get current session state
*/ */
public clearQueue(): void { public getSessionState(): SessionState {
const clearedCount = this.messageQueue.length; return this.sessionState;
// Reject all queued messages
for (const message of this.messageQueue) {
message.reject(new QueueError('Queue cleared'));
} }
this.messageQueue = []; /**
this.logger.info('Queue cleared', { clearedCount }); * Clear all queued sessions
*/
public clearQueue(): void {
const clearedCount = this.sessionQueue.length;
// Reject all queued sessions
for (const session of this.sessionQueue) {
for (const email of session.emails) {
email.reject(new QueueError('Queue cleared'));
}
session.reject(new QueueError('Queue cleared'));
}
this.sessionQueue = [];
this.logger.info('Session queue cleared', { clearedCount });
} }
/** /**
@ -229,7 +266,7 @@ export class SMTPOverWSClient extends EventEmitter {
} }
this.isShuttingDown = true; this.isShuttingDown = true;
this.logger.info('Initiating client shutdown', { queueSize: this.messageQueue.length }); this.logger.info('Initiating client shutdown', { queueSize: this.sessionQueue.length });
// Stop accepting new messages and clear timers // Stop accepting new messages and clear timers
this.clearTimers(); this.clearTimers();
@ -248,8 +285,8 @@ export class SMTPOverWSClient extends EventEmitter {
this.logger.warn('Shutdown timeout reached, forcing shutdown', error); this.logger.warn('Shutdown timeout reached, forcing shutdown', error);
} }
// Reject any remaining messages // Reject any remaining sessions
this.rejectAllQueuedMessages(new ShutdownError('Client shutting down')); this.rejectAllQueuedSessions(new ShutdownError('Client shutting down'));
// Close connection // Close connection
await this.disconnect(); await this.disconnect();
@ -261,17 +298,18 @@ export class SMTPOverWSClient extends EventEmitter {
} }
/** /**
* Process the message queue * Process the session queue
*/ */
private async processQueue(): Promise<void> { private async processQueue(): Promise<void> {
if (this.isProcessingQueue || this.messageQueue.length === 0 || this.isShuttingDown) { if (this.isProcessingQueue || this.sessionQueue.length === 0 || this.isShuttingDown || this.sessionState !== SessionState.IDLE) {
return; return;
} }
this.isProcessingQueue = true; this.isProcessingQueue = true;
this.emit('queueProcessingStarted', this.messageQueue.length); this.sessionState = SessionState.PROCESSING;
this.emit('queueProcessingStarted', this.sessionQueue.length);
this.logger.info('Queue processing started', { queueSize: this.messageQueue.length }); this.logger.info('Session queue processing started', { queueSize: this.sessionQueue.length });
let processed = 0; let processed = 0;
let failed = 0; let failed = 0;
@ -282,109 +320,218 @@ export class SMTPOverWSClient extends EventEmitter {
await this.connect(); await this.connect();
} }
// Process messages sequentially one at a time // Process sessions sequentially one at a time (SMTP sessions are mutex)
while (this.messageQueue.length > 0 && !this.isShuttingDown) { while (this.sessionQueue.length > 0 && !this.isShuttingDown) {
const message = this.messageQueue.shift()!; const session = this.sessionQueue.shift()!;
try { try {
this.currentMessage = message; this.currentSession = session;
const startTime = Date.now(); const startTime = Date.now();
await this.processMessage(message); await this.processSession(session);
const responseTime = Date.now() - startTime; const sessionTime = Date.now() - startTime;
this.stats.messagesProcessed++; this.stats.sessionsProcessed++;
this.stats.averageResponseTime = this.stats.averageSessionTime =
(this.stats.averageResponseTime * (this.stats.messagesProcessed - 1) + responseTime) / (this.stats.averageSessionTime * (this.stats.sessionsProcessed - 1) + sessionTime) /
this.stats.messagesProcessed; this.stats.sessionsProcessed;
this.emit('messageProcessed', message.id, responseTime); this.emit('sessionCompleted', session.id, []);
this.logger.debug('Message processed successfully', { this.logger.debug('Session processed successfully', {
messageId: message.id, sessionId: session.id,
responseTime emailCount: session.emails.length,
sessionTime
}); });
processed++; processed++;
} catch (error) { } catch (error) {
this.stats.messagesFailed++; this.stats.sessionsFailed++;
this.stats.lastError = (error as Error).message; this.stats.lastError = (error as Error).message;
this.stats.lastErrorTime = new Date(); this.stats.lastErrorTime = new Date();
this.emit('messageFailed', message.id, error as Error); this.emit('sessionFailed', session.id, error as Error);
this.logger.error('Message processing failed', { this.logger.error('Session processing failed', {
messageId: message.id, sessionId: session.id,
error: (error as Error).message error: (error as Error).message
}); });
failed++; failed++;
message.reject(error as Error); session.reject(error as Error);
} finally { } finally {
this.currentMessage = null; this.currentSession = null;
} }
} }
} catch (error) { } catch (error) {
this.logger.error('Queue processing error', error); this.logger.error('Queue processing error', error);
this.rejectAllQueuedMessages(error as Error); this.rejectAllQueuedSessions(error as Error);
failed += this.messageQueue.length; failed += this.sessionQueue.length;
} finally { } finally {
this.isProcessingQueue = false; this.isProcessingQueue = false;
this.sessionState = SessionState.IDLE;
// Disconnect if queue is empty and not shutting down // Disconnect if queue is empty and not shutting down
if (this.messageQueue.length === 0 && !this.isShuttingDown) { if (this.sessionQueue.length === 0 && !this.isShuttingDown) {
await this.disconnect(); await this.disconnect();
} }
this.emit('queueProcessingCompleted', processed, failed); this.emit('queueProcessingCompleted', processed, failed);
this.logger.info('Queue processing completed', { processed, failed }); this.logger.info('Session queue processing completed', { processed, failed });
} }
} }
/** /**
* Process a single message * Process a single SMTP session with multiple emails
*/ */
private async processMessage(message: QueuedMessage): Promise<void> { private async processSession(session: SMTPSessionRequest): Promise<void> {
return new Promise(async (resolve, reject) => { this.logger.info('Processing SMTP session', {
sessionId: session.id,
emailCount: session.emails.length
});
this.emit('sessionStarted', session.id, session.emails.length);
try { try {
// Open SMTP channel // Open SMTP channel once for the entire session
await this.openSMTPChannel(); await this.openSMTPChannel();
// Wait for SMTP response const results: SessionResult[] = [];
const responsePromise = this.waitForSMTPResponse(message.id);
// Send SMTP data // Process each email in the session
this.sendSMTPData(message.data); for (const email of session.emails) {
try {
const startTime = Date.now();
// Wait for response // Send MAIL FROM
const response = await responsePromise; await this.sendSMTPCommandAndWait(`MAIL FROM: <${email.from}>\r\n`);
message.resolve(response);
// Send RCPT TO for each recipient
for (const recipient of email.to) {
await this.sendSMTPCommandAndWait(`RCPT TO: <${recipient}>\r\n`);
}
// Send DATA command
await this.sendSMTPCommandAndWait('DATA\r\n');
// Send message content with proper escaping
const escapedData = this.escapeMessageData(email.data);
await this.sendSMTPCommandAndWait(escapedData + '\r\n.\r\n');
const emailTime = Date.now() - startTime;
this.stats.emailsProcessed++;
this.stats.averageEmailTime =
(this.stats.averageEmailTime * (this.stats.emailsProcessed - 1) + emailTime) /
this.stats.emailsProcessed;
// Resolve individual email
email.resolve('250 OK');
const result: SessionResult = {
emailId: email.id,
success: true,
response: '250 OK'
};
results.push(result);
this.emit('emailProcessed', session.id, email.id, emailTime);
this.logger.debug('Email processed successfully', {
sessionId: session.id,
emailId: email.id,
emailTime
});
} catch (error) {
this.stats.emailsFailed++;
// Reject individual email
email.reject(error as Error);
const result: SessionResult = {
emailId: email.id,
success: false,
response: (error as Error).message,
error: error as Error
};
results.push(result);
this.emit('emailFailed', session.id, email.id, error as Error);
this.logger.error('Email processing failed', {
sessionId: session.id,
emailId: email.id,
error: (error as Error).message
});
}
}
// Send QUIT to end session
await this.sendSMTPCommandAndWait('QUIT\r\n');
// Close SMTP channel // Close SMTP channel
await this.closeSMTPChannel(); await this.closeSMTPChannel();
resolve(); // Resolve session
session.resolve(results);
} catch (error) { } catch (error) {
// Retry logic // If session-level error, retry the entire session
if (message.retries > 0) { if (session.retries > 0) {
message.retries--; session.retries--;
this.logger.debug('Retrying message', { this.logger.debug('Retrying session', {
messageId: message.id, sessionId: session.id,
retriesLeft: message.retries retriesLeft: session.retries
}); });
// Re-queue message // Re-queue session
this.insertMessageByPriority(message); this.insertSessionByPriority(session);
resolve(); return;
} else { } else {
reject(ErrorFactory.messageError( throw ErrorFactory.messageError(
(error as Error).message, (error as Error).message,
message.id, session.id,
message.retries session.retries
)); );
} }
} }
}
/**
* Send SMTP command and wait for response
*/
private async sendSMTPCommandAndWait(command: string): Promise<string> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(ErrorFactory.timeout('SMTP command', this.config.messageTimeout));
}, this.config.messageTimeout);
const onResponse = (message: SMTPFromServerMessage) => {
clearTimeout(timeout);
this.logger.debug('RX SMTP response', {
command: command.trim(),
response: message.data.trim()
}); });
// Check if response indicates success (2xx or 3xx)
const responseCode = parseInt(message.data.substring(0, 3));
if (responseCode >= 400) {
reject(new MessageError(`SMTP error: ${message.data.trim()}`, 'smtp-response', 0));
} else {
resolve(message.data);
}
};
this.once('smtp_from_server', onResponse);
// Send the command
this.sendSMTPData(command);
});
}
/**
* Escape message data for SMTP transmission
*/
private escapeMessageData(data: string): string {
// Escape lines that start with a dot
return data.replace(/\n\./g, '\n..');
} }
/** /**
@ -733,7 +880,7 @@ export class SMTPOverWSClient extends EventEmitter {
} else if (this.isProcessingQueue && !this.isShuttingDown) { } else if (this.isProcessingQueue && !this.isShuttingDown) {
this.logger.error('Max reconnection attempts reached'); this.logger.error('Max reconnection attempts reached');
this.setState(ConnectionState.FAILED); this.setState(ConnectionState.FAILED);
this.rejectAllQueuedMessages(new ConnectionError('Max reconnection attempts reached')); this.rejectAllQueuedSessions(new ConnectionError('Max reconnection attempts reached'));
this.isProcessingQueue = false; this.isProcessingQueue = false;
} }
} }
@ -809,9 +956,9 @@ export class SMTPOverWSClient extends EventEmitter {
this.channelTimer = null; this.channelTimer = null;
} }
if (this.messageTimer) { if (this.sessionTimer) {
clearTimeout(this.messageTimer); clearTimeout(this.sessionTimer);
this.messageTimer = null; this.sessionTimer = null;
} }
this.stopHeartbeat(); this.stopHeartbeat();
@ -830,54 +977,92 @@ export class SMTPOverWSClient extends EventEmitter {
} }
/** /**
* Reject all queued messages * Reject all queued sessions
*/ */
private rejectAllQueuedMessages(error: Error): void { private rejectAllQueuedSessions(error: Error): void {
if (this.currentMessage) { if (this.currentSession) {
this.currentMessage.reject(error); this.currentSession.reject(error);
this.currentMessage = null; this.currentSession = null;
} }
while (this.messageQueue.length > 0) { while (this.sessionQueue.length > 0) {
const message = this.messageQueue.shift()!; const session = this.sessionQueue.shift()!;
message.reject(error); session.reject(error);
} }
} }
/**
* Generate unique session ID
*/
private generateSessionId(): string {
return `session_${Date.now()}_${++this.sessionIdCounter}`;
}
/** /**
* Insert message into queue based on priority * Generate unique email ID
*/ */
private insertMessageByPriority(message: QueuedMessage): void { private generateEmailId(): string {
let insertIndex = this.messageQueue.length; return `email_${Date.now()}_${++this.emailIdCounter}`;
}
/**
* Add email to existing session or create new session
*/
private addEmailToSession(email: SMTPEmail, priority: MessagePriority, timeout: number, retries: number, options: SessionSendOptions): void {
// Look for existing session with same priority and available space
let targetSession = this.sessionQueue.find(session =>
session.priority === priority &&
session.emails.length < this.config.maxEmailsPerSession
);
if (!targetSession) {
// Create new session
const sessionId = this.generateSessionId();
targetSession = {
id: sessionId,
emails: [],
timestamp: Date.now(),
retries,
priority,
resolve: (results: SessionResult[]) => {
// Session-level resolve doesn't need to do anything
// Individual emails handle their own resolution
},
reject: (error: Error) => {
// Reject all emails in the session
for (const email of targetSession!.emails) {
email.reject(error);
}
}
};
// Insert session based on priority
this.insertSessionByPriority(targetSession);
this.stats.sessionsQueued++;
this.emit('sessionQueued', sessionId, this.sessionQueue.length);
}
// Add email to session
targetSession.emails.push(email);
}
/**
* Insert session into queue based on priority
*/
private insertSessionByPriority(session: SMTPSessionRequest): void {
let insertIndex = this.sessionQueue.length;
// Find insertion point based on priority // Find insertion point based on priority
for (let i = 0; i < this.messageQueue.length; i++) { for (let i = 0; i < this.sessionQueue.length; i++) {
if (message.priority > this.messageQueue[i]!.priority) { if (session.priority > this.sessionQueue[i]!.priority) {
insertIndex = i; insertIndex = i;
break; break;
} }
} }
this.messageQueue.splice(insertIndex, 0, message); this.sessionQueue.splice(insertIndex, 0, session);
}
/**
* Remove message from queue by ID
*/
private removeMessageFromQueue(messageId: string): boolean {
const index = this.messageQueue.findIndex(msg => msg.id === messageId);
if (index !== -1) {
this.messageQueue.splice(index, 1);
return true;
}
return false;
}
/**
* Generate unique message ID
*/
private generateMessageId(): string {
return `msg_${Date.now()}_${++this.messageIdCounter}`;
} }
/** /**
@ -910,12 +1095,15 @@ export class SMTPOverWSClient extends EventEmitter {
*/ */
private initializeStats(): ClientStats { private initializeStats(): ClientStats {
return { return {
messagesQueued: 0, sessionsQueued: 0,
messagesProcessed: 0, sessionsProcessed: 0,
messagesFailed: 0, sessionsFailed: 0,
emailsProcessed: 0,
emailsFailed: 0,
reconnectionAttempts: 0, reconnectionAttempts: 0,
totalConnections: 0, totalConnections: 0,
averageResponseTime: 0, averageSessionTime: 0,
averageEmailTime: 0,
queueSize: 0, queueSize: 0,
connectionUptime: 0 connectionUptime: 0
}; };

View file

@ -10,6 +10,7 @@ export {
SMTPOverWsMessageType, SMTPOverWsMessageType,
ConnectionState, ConnectionState,
MessagePriority, MessagePriority,
SessionState,
type SMTPOverWsMessageBase, type SMTPOverWsMessageBase,
type AuthenticateMessage, type AuthenticateMessage,
type AuthenticateResponseMessage, type AuthenticateResponseMessage,
@ -20,12 +21,14 @@ export {
type SMTPToServerMessage, type SMTPToServerMessage,
type SMTPFromServerMessage, type SMTPFromServerMessage,
type SMTPOverWsMessage, type SMTPOverWsMessage,
type QueuedMessage, type SMTPEmail,
type SMTPSessionRequest,
type SessionResult,
type SMTPClientConfig, type SMTPClientConfig,
type Logger, type Logger,
type ClientStats, type ClientStats,
type ClientEvents, type ClientEvents,
type SendOptions type SessionSendOptions
} from './types'; } from './types';
// Export all error classes // Export all error classes

View file

@ -111,7 +111,6 @@ export class SMTPWSTransport extends EventEmitter {
...(this.options.authTimeout !== undefined && { authTimeout: this.options.authTimeout }), ...(this.options.authTimeout !== undefined && { authTimeout: this.options.authTimeout }),
...(this.options.channelTimeout !== undefined && { channelTimeout: this.options.channelTimeout }), ...(this.options.channelTimeout !== undefined && { channelTimeout: this.options.channelTimeout }),
...(this.options.messageTimeout !== undefined && { messageTimeout: this.options.messageTimeout }), ...(this.options.messageTimeout !== undefined && { messageTimeout: this.options.messageTimeout }),
...(this.options.maxConcurrentMessages !== undefined && { maxConcurrentMessages: this.options.maxConcurrentMessages }),
...(this.options.logger !== undefined && { logger: this.options.logger }), ...(this.options.logger !== undefined && { logger: this.options.logger }),
...(this.options.heartbeatInterval !== undefined && { heartbeatInterval: this.options.heartbeatInterval }) ...(this.options.heartbeatInterval !== undefined && { heartbeatInterval: this.options.heartbeatInterval })
}; };
@ -169,8 +168,8 @@ export class SMTPWSTransport extends EventEmitter {
*/ */
public async verify(): Promise<boolean> { public async verify(): Promise<boolean> {
try { try {
// Test full connection cycle: connect -> authenticate -> open SMTP -> close SMTP -> disconnect // Test connection by sending a verification email with minimal data
await this.client.sendSMTPCommand('EHLO transport-verify\r\n'); await this.client.queueEmail('verify@test.local', ['verify@test.local'], 'Subject: Verification\r\n\r\nTest');
return true; return true;
} catch (error) { } catch (error) {
const err = error as any; const err = error as any;
@ -216,49 +215,46 @@ export class SMTPWSTransport extends EventEmitter {
// Get the raw message content from Nodemailer's stream // Get the raw message content from Nodemailer's stream
const rawMessage = await this.getRawMessage(mail); const rawMessage = await this.getRawMessage(mail);
// Build complete SMTP transaction // Prepare message data for SMTP transmission
let smtpTransaction = '';
// MAIL FROM
smtpTransaction += `MAIL FROM: <${envelope.from}>\r\n`;
// RCPT TO for each recipient
for (const recipient of envelope.to) {
smtpTransaction += `RCPT TO: <${recipient}>\r\n`;
}
// DATA command
smtpTransaction += 'DATA\r\n';
// Message content
const messageData = this.prepareMessageData(rawMessage); const messageData = this.prepareMessageData(rawMessage);
smtpTransaction += messageData;
// QUIT try {
smtpTransaction += 'QUIT\r\n'; // Queue email using the new session-based client
const response = await this.client.queueEmail(envelope.from, envelope.to, messageData);
// Send complete SMTP transaction in one session // Create successful result
const response = await this.client.sendSMTPCommand(smtpTransaction); const result: SendResult = {
envelope,
messageId,
accepted: envelope.to.slice(), // All recipients accepted
rejected: [],
pending: [],
response
};
// Parse SMTP response for success/failure return result;
const result = this.parseSmtpResponse(response, envelope, messageId);
} catch (error) {
// Handle SMTP errors
const result: SendResult = {
envelope,
messageId,
accepted: [],
rejected: envelope.to.slice(), // All recipients rejected on error
pending: [],
response: (error as Error).message
};
// If there were SMTP errors, throw an appropriate error
if (result.rejected.length > 0 || !this.isSuccessfulResponse(response)) {
const errorDetails = this.extractSmtpError(response);
throw new MessageError( throw new MessageError(
errorDetails.message, (error as Error).message,
messageId, messageId,
0, 0,
{ {
smtpCode: errorDetails.code, smtpResponse: (error as Error).message,
smtpResponse: response, rejectedRecipients: envelope.to
rejectedRecipients: result.rejected
} }
); );
} }
return result;
} }
/** /**
@ -329,7 +325,7 @@ export class SMTPWSTransport extends EventEmitter {
this.emit('close'); this.emit('close');
}); });
this.client.on('messageProcessed', () => { this.client.on('emailProcessed', () => {
this.emit('idle', this._isIdle); this.emit('idle', this._isIdle);
}); });
} }

View file

@ -125,18 +125,68 @@ export enum ConnectionState {
} }
/** /**
* Queued message structure * Individual email within a session
*/ */
export interface QueuedMessage { export interface SMTPEmail {
/** Unique email ID */
id: string; id: string;
/** Sender address */
from: string;
/** Recipient addresses */
to: string[];
/** Raw email message data */
data: string; data: string;
/** Email-specific resolve callback */
resolve: (response: string) => void; resolve: (response: string) => void;
/** Email-specific reject callback */
reject: (error: Error) => void; reject: (error: Error) => void;
timestamp: number;
retries: number;
priority: MessagePriority;
} }
/**
* SMTP session request containing multiple emails
*/
export interface SMTPSessionRequest {
/** Unique session ID */
id: string;
/** List of emails to send in this session */
emails: SMTPEmail[];
/** Session timestamp */
timestamp: number;
/** Number of retry attempts for the entire session */
retries: number;
/** Session priority */
priority: MessagePriority;
/** Session-level resolve callback */
resolve: (results: SessionResult[]) => void;
/** Session-level reject callback */
reject: (error: Error) => void;
}
/**
* Result of processing a single email in a session
*/
export interface SessionResult {
/** Email ID */
emailId: string;
/** Whether email was successfully sent */
success: boolean;
/** SMTP response for this email */
response: string;
/** Error if email failed */
error?: Error;
}
/**
* Session state enum
*/
export enum SessionState {
IDLE = 'idle',
PROCESSING = 'processing',
FAILED = 'failed',
COMPLETED = 'completed'
}
/** /**
* Message priority levels * Message priority levels
*/ */
@ -172,8 +222,16 @@ export interface SMTPClientConfig {
/** Message timeout in milliseconds */ /** Message timeout in milliseconds */
messageTimeout?: number; messageTimeout?: number;
/** Maximum number of concurrent messages */ /** Session timeout in milliseconds (default: 300000 = 5 minutes) */
maxConcurrentMessages?: number; sessionTimeout?: number;
/** Maximum emails per session (default: 10) */
maxEmailsPerSession?: number;
/** Session batch timeout in milliseconds (default: 1000) */
sessionBatchTimeout?: number;
/** Enable debug logging */ /** Enable debug logging */
debug?: boolean; debug?: boolean;
@ -184,7 +242,7 @@ export interface SMTPClientConfig {
/** Connection heartbeat interval in milliseconds */ /** Connection heartbeat interval in milliseconds */
heartbeatInterval?: number; heartbeatInterval?: number;
/** Maximum queue size */ /** Maximum queue size (number of sessions, not individual messages) */
maxQueueSize?: number; maxQueueSize?: number;
} }
@ -202,12 +260,15 @@ export interface Logger {
* Client statistics * Client statistics
*/ */
export interface ClientStats { export interface ClientStats {
messagesQueued: number; sessionsQueued: number;
messagesProcessed: number; sessionsProcessed: number;
messagesFailed: number; sessionsFailed: number;
emailsProcessed: number;
emailsFailed: number;
reconnectionAttempts: number; reconnectionAttempts: number;
totalConnections: number; totalConnections: number;
averageResponseTime: number; averageSessionTime: number;
averageEmailTime: number;
queueSize: number; queueSize: number;
connectionUptime: number; connectionUptime: number;
lastError?: string; lastError?: string;
@ -225,9 +286,12 @@ export interface ClientEvents {
reconnecting: (attempt: number, maxAttempts: number) => void; reconnecting: (attempt: number, maxAttempts: number) => void;
reconnected: () => void; reconnected: () => void;
error: (error: Error) => void; error: (error: Error) => void;
messageQueued: (messageId: string, queueSize: number) => void; sessionQueued: (sessionId: string, queueSize: number) => void;
messageProcessed: (messageId: string, responseTime: number) => void; sessionStarted: (sessionId: string, emailCount: number) => void;
messageFailed: (messageId: string, error: Error) => void; sessionCompleted: (sessionId: string, results: SessionResult[]) => void;
sessionFailed: (sessionId: string, error: Error) => void;
emailProcessed: (sessionId: string, emailId: string, responseTime: number) => void;
emailFailed: (sessionId: string, emailId: string, error: Error) => void;
queueProcessingStarted: (queueSize: number) => void; queueProcessingStarted: (queueSize: number) => void;
queueProcessingCompleted: (processed: number, failed: number) => void; queueProcessingCompleted: (processed: number, failed: number) => void;
channelOpened: () => void; channelOpened: () => void;
@ -246,18 +310,25 @@ export interface ClientEvents {
} }
/** /**
* Message send options * Session send options
*/ */
export interface SendOptions { export interface SessionSendOptions {
/** Message priority */ /** Session priority */
priority?: MessagePriority; priority?: MessagePriority;
/** Message timeout in milliseconds */ /** Session timeout in milliseconds */
timeout?: number; timeout?: number;
/** Number of retry attempts */ /** Number of retry attempts for the entire session */
retries?: number; retries?: number;
/** Whether to skip queue and send immediately */ /** Whether to skip queue and send immediately */
immediate?: boolean; immediate?: boolean;
/** Maximum emails per session (default: 10) */
maxEmailsPerSession?: number;
/** Session batch timeout in milliseconds (default: 1000) */
batchTimeout?: number;
} }