103 lines
4.2 KiB
JavaScript
103 lines
4.2 KiB
JavaScript
const { simpleParser } = require('mailparser');
|
||
const db = require('./db');
|
||
const emitter = require('./eventEmitter');
|
||
const { isRateLimited } = require('./rateLimiter');
|
||
const logger = require('./logger'); // 引入 logger
|
||
|
||
/**
 * Collects everything a readable stream emits into a single Buffer.
 *
 * @param {import('stream').Readable} stream - Stream to drain; it is consumed
 *   via its 'data', 'error' and 'end' events.
 * @returns {Promise<Buffer>} Resolves with the concatenated content once the
 *   stream emits 'end'; rejects with the stream's error if it emits 'error'.
 */
function streamToBuffer(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => {
      // Streams that had an encoding set (or object-mode streams) emit strings.
      // Buffer.concat() only accepts Buffers/Uint8Arrays and would throw inside
      // the 'end' listener, outside this promise, so normalise each chunk here.
      // NOTE(review): strings are re-encoded as UTF-8; confirm no caller sets a
      // different encoding on the stream before handing it over.
      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
    });
    stream.on('error', reject);
    stream.on('end', () => resolve(Buffer.concat(chunks)));
  });
}
|
||
|
||
/**
 * Returns the display text of a parsed address header.
 *
 * The header value may be a single object carrying a `text` property or an
 * array of such objects (one per repeated header); array entries are joined
 * with ", ".
 *
 * @param {{text?: string}|Array<{text?: string}>|undefined} field - Parsed header value.
 * @param {string} fallback - Value to use when no usable text is available.
 * @returns {string} The header text, or `fallback`.
 */
function addressHeaderText(field, fallback) {
  if (!field) {
    return fallback;
  }
  if (Array.isArray(field)) {
    const texts = field
      .map((entry) => (entry ? entry.text : undefined))
      .filter((text) => typeof text === 'string' && text !== '');
    return texts.length > 0 ? texts.join(', ') : fallback;
  }
  // Never hand `undefined` to the database layer as a bind parameter.
  return typeof field.text === 'string' ? field.text : fallback;
}

/**
 * Buffers an incoming message stream, parses it, and stores the message and
 * its attachments through `db`, then emits a 'newEmail' event carrying the
 * stored row.
 *
 * Steps, in order: buffer the stream, parse it with `simpleParser`, reject the
 * message if `isRateLimited(sender)` is true, insert into `emails`, insert each
 * attachment into `email_attachments`, re-read the row and emit it.
 *
 * NOTE(review): the inserts are not wrapped in a transaction, and a failure of
 * the final SELECT/emit rejects even though the message row is already stored.
 *
 * @param {import('stream').Readable} stream - Raw message stream.
 * @returns {Promise<void>} Resolves once the message is stored and the event
 *   has been emitted (if the row could be read back).
 * @throws {Error} With `responseCode = 421` when the sender is rate limited;
 *   any other error from parsing or the database is logged and rethrown.
 */
async function saveEmail(stream) {
  try {
    // Buffer the whole message first, then parse the buffered content.
    const emailBuffer = await streamToBuffer(stream);
    const parsed = await simpleParser(emailBuffer);

    const recipient = addressHeaderText(parsed.to, 'undisclosed-recipients');
    const sender = addressHeaderText(parsed.from, 'unknown-sender');
    // The raw message is intentionally not persisted for now.

    // Rate-limit check happens before anything is written to the database.
    if (isRateLimited(sender)) {
      // Record the rejection here; the catch block below skips errors that
      // carry a responseCode so it is not logged twice.
      logger.warn(`Email from <${sender}> rejected due to rate limiting.`, {
        sender: sender,
        recipient: recipient,
        action: 'rate-limit-reject'
      });
      // Throw so the caller (per the original note, the upstream SMTP server)
      // can reject the message with this response code.
      const error = new Error(`4.7.1 Domain of <${sender}> has been temporarily blocked due to rate limiting. Please try again later.`);
      error.responseCode = 421; // "Service not available, closing transmission channel"
      throw error;
    }

    const subject = parsed.subject || 'No Subject';
    const body = parsed.text || (parsed.html || '');

    const [result] = await db.execute(
      'INSERT INTO emails (recipient, sender, subject, body) VALUES (?, ?, ?, ?)',
      [recipient, sender, subject, body]
    );
    const newEmailId = result.insertId;

    logger.info(`Email from <${sender}> to <${recipient}> saved with ID: ${newEmailId}`, {
      sender,
      recipient,
      subject,
      emailId: newEmailId,
      action: 'email-saved'
    });

    if (parsed.attachments && parsed.attachments.length > 0) {
      for (const attachment of parsed.attachments) {
        // An attachment may have no filename; bind SQL NULL rather than
        // `undefined`, which prepared-statement drivers typically reject.
        const filename = attachment.filename === undefined ? null : attachment.filename;
        await db.execute(
          'INSERT INTO email_attachments (email_id, filename, content_type, content) VALUES (?, ?, ?, ?)',
          [newEmailId, filename, attachment.contentType, attachment.content]
        );
        logger.info(`Attachment ${attachment.filename} saved for email ID: ${newEmailId}`, {
          filename: attachment.filename,
          emailId: newEmailId,
          action: 'attachment-saved'
        });
      }
    }

    // Re-read the stored row so the event carries database-generated columns
    // (such as received_at) along with the main fields.
    const [rows] = await db.execute('SELECT id, sender, recipient, subject, body, received_at FROM emails WHERE id = ?', [newEmailId]);
    if (rows.length > 0) {
      emitter.emit('newEmail', rows[0]);
      logger.info(`Event 'newEmail' emitted for email ID: ${newEmailId}`, {
        emailId: newEmailId,
        action: 'event-emitted'
      });
    }
  } catch (error) {
    // Errors carrying a response code (e.g. the rate-limit rejection above)
    // were already logged; only unexpected errors are logged here.
    if (!error.responseCode) {
      logger.error('Failed to save email due to an unexpected error:', {
        errorMessage: error.message,
        errorStack: error.stack
      });
    }
    // Rethrow so the caller can handle the failure.
    throw error;
  }
}
|
||
|
||
module.exports = { saveEmail };
|
||
|