const { simpleParser } = require('mailparser');
const db = require('./db');
const emitter = require('./eventEmitter');
const { isRateLimited } = require('./rateLimiter');
const logger = require('./logger');

/**
 * Collects every chunk emitted by a readable stream into one Buffer.
 *
 * @param {import('stream').Readable} stream - Stream carrying the raw message.
 * @returns {Promise<Buffer>} Resolves with the concatenated chunks on 'end';
 *   rejects with the stream's error on 'error'.
 */
function streamToBuffer(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('error', reject);
    stream.on('end', () => resolve(Buffer.concat(chunks)));
  });
}

/**
 * Builds the error thrown when a sender is rate limited.
 *
 * The message is deliberately kept on a single line: it is meant to be used
 * as SMTP reply text, and a raw line break inside it would end up inside the
 * reply.
 *
 * @param {string} sender - Sender text taken from the parsed "From" header.
 * @returns {Error} Error carrying `responseCode = 421`.
 */
function buildRateLimitError(sender) {
  const error = new Error(
    `4.7.1 Domain of <${sender}> has been temporarily blocked due to rate limiting. Please try again later.`
  );
  error.responseCode = 421; // "Service not available, closing transmission channel"
  return error;
}

/**
 * Inserts the attachments of a stored email one after another.
 *
 * @param {number} emailId - ID of the row in `emails` the attachments belong to.
 * @param {Array<object>} attachments - Attachment objects produced by mailparser.
 * @returns {Promise<void>}
 */
async function saveAttachments(emailId, attachments) {
  for (const attachment of attachments) {
    // mailparser only sets `filename` when the MIME part declares one.
    // NOTE(review): `db` looks like a mysql2-style client, which rejects
    // `undefined` bind parameters, so a missing name is stored as SQL NULL.
    const filename = attachment.filename == null ? null : attachment.filename;

    await db.execute(
      'INSERT INTO email_attachments (email_id, filename, content_type, content) VALUES (?, ?, ?, ?)',
      [emailId, filename, attachment.contentType, attachment.content]
    );
    logger.info(`Attachment ${attachment.filename} saved for email ID: ${emailId}`, {
      filename: attachment.filename,
      emailId,
      action: 'attachment-saved'
    });
  }
}

/**
 * Re-reads a stored email and emits it as a 'newEmail' event.
 *
 * This is best-effort: by the time it runs the email is already persisted, so
 * a failure here is logged instead of thrown. Throwing would make the caller
 * reject a message that was in fact stored, inviting the sender to retry and
 * create a duplicate.
 *
 * @param {number} emailId - ID of the freshly inserted `emails` row.
 * @returns {Promise<void>}
 */
async function notifyNewEmail(emailId) {
  try {
    const [rows] = await db.execute(
      'SELECT id, sender, recipient, subject, body, received_at FROM emails WHERE id = ?',
      [emailId]
    );
    if (rows.length > 0) {
      emitter.emit('newEmail', rows[0]);
      logger.info(`Event 'newEmail' emitted for email ID: ${emailId}`, {
        emailId,
        action: 'event-emitted'
      });
    }
  } catch (error) {
    logger.error(`Failed to emit 'newEmail' for email ID: ${emailId}`, {
      emailId,
      errorMessage: error.message,
      errorStack: error.stack
    });
  }
}

/**
 * Parses an incoming message stream and stores the email and its attachments.
 *
 * Steps: buffer the stream, parse it with mailparser, apply the per-sender
 * rate limit, insert the email row, insert the attachment rows, then emit a
 * 'newEmail' event with the stored row.
 *
 * @param {import('stream').Readable} stream - Raw message stream.
 * @returns {Promise<void>}
 * @throws {Error} An error with `responseCode = 421` when the sender is rate
 *   limited; any buffering, parsing or insert error is logged and rethrown
 *   unchanged so the caller can reject the message.
 */
async function saveEmail(stream) {
  try {
    // Buffer the whole message first, then parse the buffered content.
    const emailBuffer = await streamToBuffer(stream);
    const parsed = await simpleParser(emailBuffer);

    const recipient = parsed.to ? parsed.to.text : 'undisclosed-recipients';
    const sender = parsed.from ? parsed.from.text : 'unknown-sender';
    // The raw message (emailBuffer.toString()) is intentionally not persisted for now.

    // Rate-limit check: record the rejection, then throw so the caller can
    // refuse the message.
    if (isRateLimited(sender)) {
      logger.warn(`Email from <${sender}> rejected due to rate limiting.`, {
        sender: sender,
        recipient: recipient,
        action: 'rate-limit-reject'
      });
      throw buildRateLimitError(sender);
    }

    const subject = parsed.subject || 'No Subject';
    const body = parsed.text || (parsed.html || '');

    // The 'sv-SE' locale formats as "YYYY-MM-DD HH:mm:ss"; the timestamp is
    // rendered in the 'Asia/Shanghai' time zone.
    const received_at = new Date().toLocaleString('sv-SE', { timeZone: 'Asia/Shanghai' });

    const [result] = await db.execute(
      'INSERT INTO emails (recipient, sender, subject, body, received_at) VALUES (?, ?, ?, ?, ?)',
      [recipient, sender, subject, body, received_at]
    );
    const newEmailId = result.insertId;
    logger.info(`Email from <${sender}> to <${recipient}> saved with ID: ${newEmailId}`, {
      sender,
      recipient,
      subject,
      emailId: newEmailId,
      action: 'email-saved'
    });

    await saveAttachments(newEmailId, parsed.attachments || []);

    // Publish the stored row to listeners (failures are logged, not thrown).
    await notifyNewEmail(newEmailId);
  } catch (error) {
    // Errors that carry a response code (the rate-limit rejection) were
    // already logged above; only unexpected errors are logged here.
    if (!error.responseCode) {
      logger.error('Failed to save email due to an unexpected error:', {
        errorMessage: error.message,
        errorStack: error.stack
      });
    }
    // Rethrow so the caller (the SMTP server handler, per the original
    // author's note) can reject the message properly.
    throw error;
  }
}

module.exports = { saveEmail };