email-unlimit/backend/saveEmail.js

108 lines
4.4 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { simpleParser } = require('mailparser');
const db = require('./db');
const emitter = require('./eventEmitter');
const { isRateLimited } = require('./rateLimiter');
const logger = require('./logger'); // 引入 logger
/**
 * Collects everything a readable stream emits into a single Buffer.
 *
 * Chunks are accumulated from the stream's 'data' events and concatenated
 * once 'end' fires. String chunks (emitted when the stream has an encoding
 * set or runs in object mode) are converted to Buffers as they arrive,
 * because `Buffer.concat` throws on strings and that throw would happen
 * inside the 'end' listener, where it cannot reject this promise.
 *
 * @param {import('stream').Readable} stream - Readable stream to drain.
 * @returns {Promise<Buffer>} Resolves with the concatenated content (an
 *   empty Buffer when the stream emits no data).
 *   Rejects with whatever the stream emits on its 'error' event.
 */
function streamToBuffer(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => {
      // NOTE(review): strings are re-encoded as UTF-8 (Buffer.from default);
      // confirm that is acceptable if a caller ever sets another encoding.
      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
    });
    stream.on('error', reject);
    stream.on('end', () => resolve(Buffer.concat(chunks)));
  });
}
/**
 * Reads a raw email from a stream, parses it, and persists it.
 *
 * Steps, in order:
 *   1. Buffer the whole stream and parse it with `simpleParser`.
 *   2. Reject the message if `isRateLimited(sender)` is true.
 *   3. Insert the email row, then one row per attachment.
 *   4. Re-read the stored row and emit it as a 'newEmail' event.
 *
 * @param {import('stream').Readable} stream - Stream carrying the raw email.
 * @returns {Promise<void>}
 * @throws {Error} With `responseCode = 421` when the sender is rate limited;
 *   any other parsing/database error is logged and re-thrown unchanged.
 */
async function saveEmail(stream) {
  // Bind parameters must not be `undefined` (mysql2-style drivers reject it),
  // so optional parsed fields are mapped to SQL NULL instead.
  // NOTE(review): assumes `db.execute` follows mysql2 semantics — confirm.
  const toBindValue = (value) => (value === undefined ? null : value);

  try {
    // Buffer the entire stream first, then parse the buffered content.
    const emailBuffer = await streamToBuffer(stream);
    const parsed = await simpleParser(emailBuffer);

    const recipient = parsed.to ? parsed.to.text : 'undisclosed-recipients';
    const sender = parsed.from ? parsed.from.text : 'unknown-sender';
    // The raw message is intentionally not stored in the database for now.

    // Rate-limit check happens before anything is written.
    if (isRateLimited(sender)) {
      // Record the rejection here; the catch block below skips errors that
      // carry a responseCode so this is not logged twice.
      logger.warn(`Email from <${sender}> rejected due to rate limiting.`, {
        sender,
        recipient,
        action: 'rate-limit-reject'
      });
      // Throwing lets the caller (per the original note, the upstream
      // SMTPServer) handle the error and reject the message.
      const error = new Error(`4.7.1 Domain of <${sender}> has been temporarily blocked due to rate limiting. Please try again later.`);
      error.responseCode = 421; // "Service not available, closing transmission channel"
      throw error;
    }

    const subject = parsed.subject || 'No Subject';
    const body = parsed.text || (parsed.html || '');

    // Build the timestamp manually in the 'Asia/Shanghai' timezone; the
    // 'sv-SE' locale renders it as `YYYY-MM-DD HH:mm:ss`.
    const received_at = new Date().toLocaleString('sv-SE', {
      timeZone: 'Asia/Shanghai'
    });

    const [result] = await db.execute(
      'INSERT INTO emails (recipient, sender, subject, body, received_at) VALUES (?, ?, ?, ?, ?)',
      [recipient, sender, subject, body, received_at]
    );
    const newEmailId = result.insertId;

    logger.info(`Email from <${sender}> to <${recipient}> saved with ID: ${newEmailId}`, {
      sender,
      recipient,
      subject,
      emailId: newEmailId,
      action: 'email-saved'
    });

    if (parsed.attachments && parsed.attachments.length > 0) {
      // Attachments are inserted one at a time, in message order.
      for (const attachment of parsed.attachments) {
        await db.execute(
          'INSERT INTO email_attachments (email_id, filename, content_type, content) VALUES (?, ?, ?, ?)',
          [
            newEmailId,
            // Attachments without a filename would otherwise bind `undefined`.
            toBindValue(attachment.filename),
            toBindValue(attachment.contentType),
            attachment.content
          ]
        );
        logger.info(`Attachment ${attachment.filename} saved for email ID: ${newEmailId}`, {
          filename: attachment.filename,
          emailId: newEmailId,
          action: 'attachment-saved'
        });
      }
    }

    // Emit an event carrying the stored email's main fields, as read back
    // from the database.
    const [rows] = await db.execute('SELECT id, sender, recipient, subject, body, received_at FROM emails WHERE id = ?', [newEmailId]);
    if (rows.length > 0) {
      emitter.emit('newEmail', rows[0]);
      logger.info(`Event 'newEmail' emitted for email ID: ${newEmailId}`, {
        emailId: newEmailId,
        action: 'event-emitted'
      });
    }
  } catch (error) {
    // Errors that carry a response code (e.g. the rate-limit error above)
    // were already logged; only unexpected errors are logged here.
    if (!error.responseCode) {
      logger.error('Failed to save email due to an unexpected error:', {
        errorMessage: error.message,
        errorStack: error.stack
      });
    }
    // Re-throw so the caller can handle the failure.
    throw error;
  }
}
// Public API of this module: only saveEmail is exported.
module.exports = { saveEmail };