feat(backend): 实施连接与数据层速率限制

为了增强服务的稳定性和安全性,防止滥用行为,本次提交引入了两个核心的速率限制和验证机制:

1. **连接层验证 (`onConnect`)**
   - **IP 速率限制**:在 `connectionValidator.js` 中实现。限制单个 IP 地址每分钟最多 20 次连接,超过限制将临时封禁 5 分钟,以防御暴力连接攻击。
   - **反向 DNS (PTR) 检查**:要求所有公共网络连接必须具有有效的 PTR 记录,用于区分合法邮件服务器和僵尸网络。

2. **数据层验证 (`onData`)**
   - **发件人域速率限制**:在 `rateLimiter.js` 中实现。在邮件数据传输阶段,限制单个发件人域名每分钟最多发送 10 封邮件,超过限制的域名将被临时封禁 5 分钟,以防止单一来源的邮件泛滥。

**主要变更:**
- 新增 `backend/connectionValidator.js`:处理连接时的 IP 速率限制和 PTR 验证。
- 新增 `backend/rateLimiter.js`:在 `onData` 阶段对发件人域名进行速率限制。
- 在 `backend/app.js` 的 `SMTPServer` 配置中集成了 `onConnect` 验证钩子。
- 在 `backend/saveEmail.js` 中调用速率限制器,并在超限时拒绝邮件。
- 更新日志模块 (`logger.js`, `db.js`),输出更清晰,并记录安全相关事件。
- 新增 `backend/SECURITY_POLICIES.md` 文档,详细说明所有安全策略和可配置参数。
This commit is contained in:
shenjianZ 2025-08-01 22:59:40 +08:00
parent 3ad3849fde
commit be59fa85de
7 changed files with 338 additions and 11 deletions

View File

@ -0,0 +1,76 @@
# 邮件接收服务安全策略文档
本文档详细说明了为保护邮件接收服务而实施的各项安全与反滥用策略。这些策略旨在构建一个多层次的纵深防御体系,有效过滤垃圾邮件和恶意连接,同时保障服务的稳定性和安全性。
## 1. 连接层防御策略 (Connection-Level Policies)
这些策略在SMTP连接建立的最初阶段 (`onConnect`) 生效,以便在消耗最少资源的情况下,快速拒绝掉可疑的连接。
### 1.1 IP 地址频率限制 (IP-based Rate Limiting)
- **作用**: 防止单个IP地址在短时间内发起大量连接有效遏制暴力攻击和自动化脚本滥用。
- **策略**:
- **限制**: 每个IP地址每分钟最多允许 **20** 次连接。
- **惩罚**: 超过限制的IP地址将被封禁 **5** 分钟。
- **日志**:
- 当IP被封禁时会记录一条警告日志。
- 当连接因为IP被封禁或超速而被拒绝时会记录一条警告日志。
- **SMTP响应码**: `421` (服务不可用,请稍后重试)
- **配置文件**: `backend/connectionValidator.js`
### 1.2 反向DNS检查 (PTR Record Check)
- **作用**: 验证连接来源IP是否拥有一个有效的反向DNSPTR记录。这是区分正规邮件服务器和僵尸网络/垃圾邮件程序的有效手段。
- **策略**:
- 所有连接到本服务的公网IP地址都必须拥有一个可解析的PTR记录。
- 对于没有PTR记录的连接将直接拒绝。
- 本地和私有网络地址 (`127.0.0.1`, `10.x.x.x`, `192.168.x.x`, `::1`) 会被自动豁免。
- **日志**:
- 成功或失败的PTR查询都会被记录。
- **SMTP响应码**: `550` (请求的操作未执行,连接源不可信)
- **配置文件**: `backend/connectionValidator.js`
---
## 2. 数据层防御策略 (Data-Level Policies)
这些策略在SMTP连接建立后客户端开始发送邮件数据时 (`onData`) 生效,提供更精细的控制。
### 2.1 发件人域名频率限制 (Sender Domain Rate Limiting)
- **作用**: 在IP层验证通过后进一步限制来自**同一个发件人域名**的邮件接收频率。这可以防止某个合法来源(例如一个大型邮件服务提供商)下的单一账户被滥用。
- **策略**:
- **限制**: 每个发件人域名每分钟最多允许接收 **10** 封邮件。
- **惩罚**: 超过限制的域名将被封禁 **5** 分钟。
- **日志**:
- 当域名被封禁时,会记录一条警告日志。
- 当邮件因为域名被封禁或超速而被拒绝时,会记录一条警告日志。
- **SMTP响应码**: `421` (服务不可用,因策略限制请稍后重试)
- **配置文件**: `backend/rateLimiter.js`
---
## 3. 传输层安全 (Transport-Layer Security)
### 3.1 启用 STARTTLS
- **作用**: 允许客户端将一个普通的SMTP连接升级为安全的TLS加密连接。这可以保护邮件内容在传输过程中不被窃听或篡改。
- **策略**:
- 服务器在25端口上宣告支持 `STARTTLS`
- 优先使用加密连接。
- **配置文件**: `backend/app.js` (在 `SMTPServer` 的配置中)
---
## 4. 可配置参数详解
下表详细解释了各项策略中的可配置参数,您可以根据实际需求在对应的文件中进行调整。
| 参数 (Parameter) | 所在文件 (File Location) | 作用描述 | 默认值 | 单位 (Unit) |
| --------------------- | ------------------------------------ | ---------------------------------------------------------------------------------------------------- | ------------------ | -------------------- |
| `IP_RATE_LIMIT` | `backend/connectionValidator.js` | 在指定时间窗口内单个IP允许的最大**连接次数**。 | `20` | 次 (Connections) |
| `IP_TIME_WINDOW` | `backend/connectionValidator.js` | IP频率限制的时间窗口。 | `60 * 1000` | 毫秒 (1 分钟) |
| `IP_BAN_DURATION` | `backend/connectionValidator.js` | IP因超速被封禁的持续时间。 | `5 * 60 * 1000` | 毫秒 (5 分钟) |
| `RATE_LIMIT` | `backend/rateLimiter.js` | 在指定时间窗口内,单个**发件人域名**允许的最大**邮件数量**。 | `10` | 封 (Emails) |
| `TIME_WINDOW` | `backend/rateLimiter.js` | 域名频率限制的时间窗口。 | `60 * 1000` | 毫秒 (1 分钟) |
| `BAN_DURATION` | `backend/rateLimiter.js` | 域名因超速被封禁的持续时间。 | `5 * 60 * 1000` | 毫秒 (5 分钟) |

View File

@ -8,6 +8,7 @@ const { saveEmail } = require('./saveEmail');
const emitter = require('./eventEmitter');
const logger = require('./logger');
const morgan = require('morgan');
const { validateConnection } = require('./connectionValidator');
const app = express();
const apiPort = 5182;
@ -162,8 +163,13 @@ server.listen(apiPort, () => {
// Configure and start SMTP server
const smtpServer = new SMTPServer({
secure: false, // Enable STARTTLS
authOptional: true,
disabledCommands: ['AUTH'],
onConnect(session, callback) {
logger.info('Connection received from', { remoteAddress: session.remoteAddress });
return validateConnection(session, callback);
},
onData(stream, session, callback) {
logger.info('Receiving email...', { session });
saveEmail(stream)
@ -173,6 +179,10 @@ const smtpServer = new SMTPServer({
})
.catch(err => {
logger.error('Error processing email:', err);
// Check if the error is a rate limit error with a specific response code
if (err.responseCode) {
return callback(err);
}
callback(new Error('Failed to process email.'));
});
},

View File

@ -0,0 +1,100 @@
const dns = require('dns').promises;
const logger = require('./logger');
const IP_CONNECTION_COUNTS = new Map();
const BANNED_IPS = new Set();
const IP_RATE_LIMIT = 20; // 每个IP(服务器)每分钟最多20次连接
const IP_TIME_WINDOW = 60 * 1000; // 1分钟
const IP_BAN_DURATION = 5 * 60 * 1000; // 5分钟
/**
* 检查IP地址是否因为连接频率过高而被限制
* @param {string} remoteAddress 客户端IP地址
* @returns {boolean} 如果被限制则返回true否则返回false
*/
function isIpRateLimited(remoteAddress) {
if (BANNED_IPS.has(remoteAddress)) {
logger.warn(`Connection from banned IP ${remoteAddress} rejected.`);
return true;
}
const now = Date.now();
const requests = IP_CONNECTION_COUNTS.get(remoteAddress) || [];
const recentRequests = requests.filter(timestamp => now - timestamp < IP_TIME_WINDOW);
if (recentRequests.length >= IP_RATE_LIMIT) {
logger.warn(`IP ${remoteAddress} has exceeded the connection rate limit. Banning for ${IP_BAN_DURATION / 1000} seconds.`);
BANNED_IPS.add(remoteAddress);
setTimeout(() => {
BANNED_IPS.delete(remoteAddress);
logger.info(`IP ${remoteAddress} has been unbanned.`);
}, IP_BAN_DURATION);
IP_CONNECTION_COUNTS.delete(remoteAddress);
return true;
}
recentRequests.push(now);
IP_CONNECTION_COUNTS.set(remoteAddress, recentRequests);
return false;
}
/**
* 检查IP地址是否有有效的反向DNSPTR记录
* 正规的邮件服务器通常都有PTR记录
* @param {string} remoteAddress 客户端IP地址
* @returns {Promise<boolean>} 如果验证通过则返回true否则返回false
*/
async function hasValidPtrRecord(remoteAddress) {
// 对于本地和私有地址我们跳过检查因为它们通常没有公共PTR记录
if (remoteAddress.startsWith('127.') || remoteAddress.startsWith('192.168.') || remoteAddress.startsWith('10.') || remoteAddress.startsWith('::1')) {
return true;
}
try {
const hostnames = await dns.reverse(remoteAddress);
if (hostnames && hostnames.length > 0) {
logger.info(`PTR record for ${remoteAddress} found: ${hostnames.join(', ')}`);
return true;
}
logger.warn(`No PTR record found for ${remoteAddress}.`);
return false;
} catch (error) {
// 'ENOTFOUND' 是最常见的错误意味着没有找到PTR记录。
if (error.code === 'ENOTFOUND') {
logger.warn(`No PTR record found for ${remoteAddress}.`);
} else {
logger.error(`Error during PTR lookup for ${remoteAddress}:`, error);
}
return false;
}
}
/**
* 在连接建立时验证客户端
* @param {object} session SMTP会话对象
* @param {function} callback 回调函数
*/
async function validateConnection(session, callback) {
const { remoteAddress } = session;
// 1. IP频率限制检查
if (isIpRateLimited(remoteAddress)) {
const err = new Error('Connection rejected due to high frequency. Please try again later.');
err.responseCode = 421;
return callback(err);
}
// 2. 反向DNS检查
const hasPtr = await hasValidPtrRecord(remoteAddress);
if (!hasPtr) {
const err = new Error('Connection rejected: The IP address has no PTR record.');
err.responseCode = 550; // 550表示请求的操作未执行邮箱不可用在这里引申为连接源不可信
return callback(err);
}
// 所有检查通过
callback();
}
module.exports = { validateConnection };

View File

@ -15,13 +15,31 @@ const promisePool = pool.promise();
const originalExecute = promisePool.execute;
promisePool.execute = function(sql, params) {
logger.info('Executing SQL', { sql, params });
let loggableParams = params;
// For email insertion, only log recipient and sender to avoid large logs.
if (sql.startsWith('INSERT INTO emails') && Array.isArray(params) && params.length >= 2) {
loggableParams = {
recipient: params[0],
sender: params[1],
details: '(omitted for brevity)'
};
}
logger.info('Executing SQL', { sql, params: loggableParams });
return originalExecute.call(this, sql, params);
};
const originalQuery = promisePool.query;
promisePool.query = function(sql, params) {
logger.info('Executing SQL', { sql, params });
let loggableParams = params;
// For email insertion, only log recipient and sender to avoid large logs.
if (sql.startsWith('INSERT INTO emails') && Array.isArray(params) && params.length >= 2) {
loggableParams = {
recipient: params[0],
sender: params[1],
details: '(omitted for brevity)'
};
}
logger.info('Executing SQL', { sql, params: loggableParams });
return originalQuery.call(this, sql, params);
};

View File

@ -21,9 +21,25 @@ if (process.env.NODE_ENV !== 'production') {
logger.add(new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
winston.format.printf(({ level, message, timestamp, stack }) => {
if (stack) {
// 打印错误堆栈
return `${timestamp} ${level}: ${message}\n${stack}`;
}
return `${timestamp} ${level}: ${message}`;
})
)
}));
}
// 在文件日志中也使用更清晰的格式
logger.transports.forEach(t => {
if (t.name === 'file') {
t.format = winston.format.combine(
winston.format.timestamp(),
winston.format.json()
);
}
});
module.exports = logger;

67
backend/rateLimiter.js Normal file
View File

@ -0,0 +1,67 @@
const logger = require('./logger');
const BANNED_DOMAINS = new Set();
const DOMAIN_REQUEST_COUNTS = new Map();
const RATE_LIMIT = 10; // 每分钟10封邮件
const TIME_WINDOW = 60 * 1000; // 1分钟
const BAN_DURATION = 5 * 60 * 1000; // 5分钟
function getDomainFromEmail(sender) {
if (!sender) {
return null;
}
let emailAddress = sender;
// 检查 "Name <email@domain.com>" 格式
const match = sender.match(/<([^>]+)>/);
if (match && match[1]) {
emailAddress = match[1]; // 提取 'email@domain.com'
}
// 现在,从(可能已清理的)电子邮件地址中提取域名
if (!emailAddress.includes('@')) {
return null;
}
return emailAddress.split('@')[1];
}
function isRateLimited(sender) {
const domain = getDomainFromEmail(sender);
if (!domain) {
// 如果无法从发件人中提取域名,则不进行速率限制
return false;
}
if (BANNED_DOMAINS.has(domain)) {
logger.warn(`Domain ${domain} is currently banned, rejecting email.`, { domain, action: 'reject-banned-domain' });
return true;
}
const now = Date.now();
const requests = DOMAIN_REQUEST_COUNTS.get(domain) || [];
// 过滤掉时间窗口之外的旧请求
const recentRequests = requests.filter(timestamp => now - timestamp < TIME_WINDOW);
if (recentRequests.length >= RATE_LIMIT) {
logger.warn(`Domain ${domain} has exceeded the rate limit. Banning for ${BAN_DURATION / 1000} seconds.`, { domain, action: 'ban-domain' });
BANNED_DOMAINS.add(domain);
// 设置解封计时器
setTimeout(() => {
BANNED_DOMAINS.delete(domain);
logger.info(`Domain ${domain} has been unbanned.`, { domain, action: 'unban-domain' });
}, BAN_DURATION);
// 清空该域名的请求记录
DOMAIN_REQUEST_COUNTS.delete(domain);
return true;
}
// 记录当前请求时间
recentRequests.push(now);
DOMAIN_REQUEST_COUNTS.set(domain, recentRequests);
return false;
}
module.exports = { isRateLimited };

View File

@ -1,6 +1,8 @@
const { simpleParser } = require('mailparser');
const db = require('./db');
const emitter = require('./eventEmitter');
const { isRateLimited } = require('./rateLimiter');
const logger = require('./logger'); // 引入 logger
// Helper function to convert stream to buffer
function streamToBuffer(stream) {
@ -19,20 +21,44 @@ async function saveEmail(stream) {
// Now, parse the buffered email content
const parsed = await simpleParser(emailBuffer);
const rawEmail = emailBuffer.toString();
const recipient = parsed.to ? parsed.to.text : 'undisclosed-recipients';
const sender = parsed.from ? parsed.from.text : 'unknown-sender';
//const rawEmail = emailBuffer.toString(); // 暂时去除 rawEmail不在保存到数据库
// 在这里进行速率限制检查
if (isRateLimited(sender)) {
// 记录被拒绝的事件
logger.warn(`Email from <${sender}> rejected due to rate limiting.`, {
sender: sender,
recipient: recipient,
action: 'rate-limit-reject'
});
// 如果被限流则抛出错误上游的SMTPServer会处理这个错误并拒绝邮件
const error = new Error(`4.7.1 Domain of <${sender}> has been temporarily blocked due to rate limiting. Please try again later.`);
error.responseCode = 421; // "Service not available, closing transmission channel"
throw error;
}
const subject = parsed.subject || 'No Subject';
const body = parsed.text || (parsed.html || '');
const [result] = await db.execute(
'INSERT INTO emails (recipient, sender, subject, body, raw) VALUES (?, ?, ?, ?, ?)',
[recipient, sender, subject, body, rawEmail]
'INSERT INTO emails (recipient, sender, subject, body) VALUES (?, ?, ?, ?)',
[recipient, sender, subject, body]
);
// const [result] = await db.execute(
// 'INSERT INTO emails (recipient, sender, subject, body, raw) VALUES (?, ?, ?, ?, ?)',
// [recipient, sender, subject, body, rawEmail]
// );
const newEmailId = result.insertId;
console.log(`Email from <${sender}> to <${recipient}> saved with ID: ${newEmailId}`);
logger.info(`Email from <${sender}> to <${recipient}> saved with ID: ${newEmailId}`, {
sender,
recipient,
subject,
emailId: newEmailId,
action: 'email-saved'
});
if (parsed.attachments && parsed.attachments.length > 0) {
for (const attachment of parsed.attachments) {
@ -40,7 +66,11 @@ async function saveEmail(stream) {
'INSERT INTO email_attachments (email_id, filename, content_type, content) VALUES (?, ?, ?, ?)',
[newEmailId, attachment.filename, attachment.contentType, attachment.content]
);
console.log(`Attachment ${attachment.filename} saved.`);
logger.info(`Attachment ${attachment.filename} saved for email ID: ${newEmailId}`, {
filename: attachment.filename,
emailId: newEmailId,
action: 'attachment-saved'
});
}
}
@ -48,12 +78,22 @@ async function saveEmail(stream) {
const [rows] = await db.execute('SELECT id, sender, recipient, subject, body, received_at FROM emails WHERE id = ?', [newEmailId]);
if (rows.length > 0) {
emitter.emit('newEmail', rows[0]);
logger.info(`Event 'newEmail' emitted for email ID: ${newEmailId}`, {
emailId: newEmailId,
action: 'event-emitted'
});
}
} catch (error) {
console.error('Failed to save email:', error);
// We should not exit the process here, but maybe throw the error
// so the caller (SMTPServer) can handle it.
// 如果错误是带有响应码的(例如我们的速率限制错误),它已经被记录过了。
// 我们只记录其他意想不到的错误。
if (!error.responseCode) {
logger.error('Failed to save email due to an unexpected error:', {
errorMessage: error.message,
errorStack: error.stack
});
}
// 重新抛出错误以便上游的SMTPServer可以正确处理它。
throw error;
}
}