feat: fix spark-processor spark args;fix data-collector tradetime

This commit is contained in:
2025-06-25 09:03:41 +08:00
parent bc1a3cecbc
commit ecfeb5899f
15 changed files with 1273 additions and 357 deletions

View File

@@ -0,0 +1,219 @@
# Spark 数据处理定时调度器
## 概述
该定时调度器功能为农业股票数据处理器提供了自动化的数据处理能力能够每30分钟自动执行一次完整的数据处理流程。
## 功能特性
-**自动定时执行**: 每30分钟自动运行一次Spark数据处理任务
-**实时监控**: 提供调度器状态查询功能
-**手动触发**: 支持手动触发任务执行
-**异常处理**: 完善的错误处理和日志记录
-**资源管理**: 自动管理Spark会话的创建和销毁
-**交互式控制**: 支持运行时命令交互
## 快速开始
### spark-submit
``` shell
spark-submit \
--class com.agricultural.spark.StockDataProcessor \
--master local[*] \
--driver-java-options "-Denvironment=prod" \
target/spark-data-processor-1.0.0.jar \
scheduler
```
### 1. 启动定时调度器
#### Linux/Mac 系统
```bash
# 进入spark-processor目录
cd spark-processor
# 启动定时调度器
./start-scheduler.sh
```
#### Windows 系统
```batch
# 进入spark-processor目录
cd spark-processor
# 启动定时调度器
start-scheduler.bat
```
### 2. 使用命令行参数启动
```bash
# 编译项目
mvn clean package -DskipTests
# 直接运行调度器模式
java -jar target/spark-data-processor-1.0.0.jar scheduler
```
## 交互式命令
启动调度器后,可以使用以下命令进行交互:
| 命令 | 说明 |
|------|------|
| `status` | 查看调度器当前状态 |
| `trigger` | 手动触发一次数据处理任务 |
| `help` | 显示所有可用命令 |
| `stop` | 停止调度器并退出程序 |
## 运行模式
### 调度器模式
```bash
java -jar target/spark-data-processor-1.0.0.jar scheduler
```
- 每30分钟自动执行一次数据处理
- 支持交互式命令控制
- 适用于生产环境长期运行
### 批处理模式
```bash
java -jar target/spark-data-processor-1.0.0.jar batch
# 或
java -jar target/spark-data-processor-1.0.0.jar
```
- 执行一次完整的数据处理流程后退出
- 适用于手动执行或脚本调用
## 配置说明
### 调度时间间隔修改
如需修改执行间隔时间,可以在 `SparkJobScheduler.java` 中修改:
```java
// 创建触发器 - 修改执行间隔
Trigger trigger = TriggerBuilder.newTrigger()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(30) // 修改这里的分钟数
.repeatForever())
.build();
```
### JVM 参数配置
在启动脚本中可以调整JVM参数
```bash
# Linux/Mac (start-scheduler.sh)
JVM_OPTS="-Xmx4g -Xms2g -XX:+UseG1GC"
# Windows (start-scheduler.bat)
set JVM_OPTS=-Xmx4g -Xms2g -XX:+UseG1GC
```
## 日志管理
### 日志文件位置
- 应用日志: `logs/agricultural-stock-platform.log`
- Spark处理日志: 包含在应用日志中
### 日志级别
- INFO: 正常运行信息
- WARN: 警告信息(如数据库连接异常)
- ERROR: 错误信息
## 监控和故障排除
### 1. 检查调度器状态
```
status
```
### 2. 查看日志文件
```bash
# 实时查看日志
tail -f logs/agricultural-stock-platform.log
# 搜索错误日志
grep ERROR logs/agricultural-stock-platform.log
```
### 3. 常见问题
#### 问题1: 内存不足
**症状**: OutOfMemoryError 异常
**解决**: 增加JVM堆内存大小
```bash
JVM_OPTS="-Xmx8g -Xms4g" # 增加内存
```
#### 问题2: 数据库连接失败
**症状**: 无法连接到MySQL数据库
**解决**: 检查数据库配置和网络连接
```
# 检查配置文件
src/main/resources/application-prod.conf
```
#### 问题3: Spark任务执行失败
**症状**: Spark处理过程中出现异常
**解决**:
1. 检查数据源连接
2. 验证数据格式
3. 检查磁盘空间
## 性能优化
### 1. Spark配置优化
```java
// 在SparkConfig中调整参数
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
```
### 2. 资源配置
- 根据数据量调整执行器内存
- 合理设置并行度
- 优化分区策略
### 3. 调度频率
- 根据数据更新频率调整执行间隔
- 避免任务重叠执行
## 部署建议
### 生产环境
1. 使用系统服务管理调度器
2. 配置日志轮转
3. 设置监控报警
4. 定期备份配置文件
### 开发环境
1. 使用较小的内存配置
2. 增加日志详细程度
3. 缩短执行间隔进行测试
## 技术架构
```
定时调度器架构:
┌─────────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Quartz │ │ Spark │ │ MySQL │
│ Scheduler │───▶│ Processor │───▶│ Database │
│ │ │ │ │ │
└─────────────────┘ └──────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌──────────────┐
│ 用户交互接口 │ │ 日志系统 │
│ (命令行) │ │ │
└─────────────────┘ └──────────────┘
```
## 版本更新记录
### v1.0.0
- 初始版本
- 支持每30分钟定时执行
- 提供基本的交互式命令
- 集成Quartz调度框架

View File

@@ -95,6 +95,13 @@
<version>3.12.0</version>
</dependency>
<!-- Quartz Scheduler -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>

View File

@@ -0,0 +1,37 @@
[Unit]
Description=Agricultural Stock Spark Data Processor Scheduler
Documentation=https://github.com/your-repo/agricultural-stock-platform
After=network.target mysql.service
[Service]
Type=simple
User=spark
Group=spark
WorkingDirectory=/opt/agricultural-stock-platform/spark-processor
ExecStart=/usr/bin/java -Xmx4g -Xms2g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dlogback.configurationFile=src/main/resources/logback.xml -Dconfig.file=src/main/resources/application-prod.conf -jar target/spark-data-processor-1.0.0.jar scheduler
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=30
StandardOutput=journal
StandardError=journal
SyslogIdentifier=spark-scheduler
# 环境变量
Environment=JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
Environment=SPARK_HOME=/opt/spark
Environment=PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
# 资源限制
LimitNOFILE=65536
LimitNPROC=4096
# 安全设置
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/opt/agricultural-stock-platform/spark-processor/logs
ReadWritePaths=/tmp/spark-warehouse
[Install]
WantedBy=multi-user.target

View File

@@ -1,6 +1,7 @@
package com.agricultural.spark;
import com.agricultural.spark.config.SparkConfig;
import com.agricultural.spark.scheduler.SparkJobScheduler;
import com.agricultural.spark.service.DataCleaningService;
import com.agricultural.spark.service.DatabaseSaveService;
import com.agricultural.spark.service.MarketAnalysisService;
@@ -13,6 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Scanner;
/**
* 农业股票数据处理器主类
@@ -201,25 +203,64 @@ public class StockDataProcessor {
logger.info("农业股票数据处理器启动");
StockDataProcessor processor = null;
SparkJobScheduler scheduler = null;
Scanner scanner = null;
try {
// 加载配置
SparkConfig config = SparkConfig.load();
processor = new StockDataProcessor(config);
if (args.length > 0) {
String mode = args[0];
switch (mode) {
case "scheduler":
// 定时调度模式 - 每30分钟执行一次
logger.info("启动定时调度模式 - 每30分钟执行一次数据处理");
scheduler = new SparkJobScheduler(config);
scheduler.start();
// 保持程序运行,监听用户输入
scanner = new Scanner(System.in);
logger.info("调度器已启动,输入命令:");
logger.info(" 'status' - 查看调度器状态");
logger.info(" 'trigger' - 手动触发一次任务执行");
logger.info(" 'stop' - 停止调度器并退出程序");
String command;
while (!(command = scanner.nextLine().trim().toLowerCase()).equals("stop")) {
switch (command) {
case "status":
logger.info("调度器状态: {}", scheduler.getSchedulerStatus());
break;
case "trigger":
scheduler.triggerJobNow();
break;
case "help":
logger.info("可用命令: status, trigger, stop, help");
break;
default:
logger.info("未知命令: {},输入 'help' 查看可用命令", command);
break;
}
}
break;
case "stream":
// 流处理模式
processor = new StockDataProcessor(config);
processor.runStreamProcessing();
break;
case "batch":
default:
// 默认批处理模式
// 批处理模式
processor = new StockDataProcessor(config);
processor.runBatchProcessing();
break;
}
} else {
// 默认批处理模式
processor = new StockDataProcessor(config);
processor.runBatchProcessing();
}
@@ -227,9 +268,16 @@ public class StockDataProcessor {
logger.error("程序运行过程中出现错误", e);
System.exit(1);
} finally {
// 清理资源
if (scheduler != null) {
scheduler.stop();
}
if (processor != null) {
processor.close();
}
if (scanner != null) {
scanner.close();
}
logger.info("农业股票数据处理器已停止");
}
}

View File

@@ -0,0 +1,161 @@
package com.agricultural.spark.scheduler;
import com.agricultural.spark.StockDataProcessor;
import com.agricultural.spark.config.SparkConfig;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Spark任务定时调度器
* 使用Quartz调度器实现定时执行Spark数据处理任务
*
* @author Agricultural Stock Platform Team
*/
public class SparkJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(SparkJobScheduler.class);
private Scheduler scheduler;
private final SparkConfig sparkConfig;
public SparkJobScheduler(SparkConfig sparkConfig) {
this.sparkConfig = sparkConfig;
}
/**
* 启动定时调度器
*/
public void start() {
try {
logger.info("正在启动Spark任务定时调度器...");
// 创建调度器工厂
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
scheduler = schedulerFactory.getScheduler();
// 定义任务详情
JobDetail jobDetail = JobBuilder.newJob(SparkProcessingJob.class)
.withIdentity("sparkProcessingJob", "sparkGroup")
.withDescription("农业股票数据处理定时任务")
.usingJobData("sparkConfig", sparkConfig.toString()) // 传递配置
.build();
// 创建触发器 - 每30分钟执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("sparkProcessingTrigger", "sparkGroup")
.withDescription("每30分钟执行一次Spark数据处理")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(30)
.repeatForever())
.build();
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
// 启动调度器
scheduler.start();
logger.info("Spark任务定时调度器启动成功每30分钟执行一次数据处理任务");
} catch (SchedulerException e) {
logger.error("启动定时调度器失败", e);
throw new RuntimeException("定时调度器启动失败", e);
}
}
/**
* 停止定时调度器
*/
public void stop() {
try {
if (scheduler != null && !scheduler.isShutdown()) {
logger.info("正在停止Spark任务定时调度器...");
scheduler.shutdown(true);
logger.info("Spark任务定时调度器已停止");
}
} catch (SchedulerException e) {
logger.error("停止定时调度器失败", e);
}
}
/**
* 手动触发一次任务执行
*/
public void triggerJobNow() {
try {
if (scheduler != null && scheduler.isStarted()) {
JobKey jobKey = new JobKey("sparkProcessingJob", "sparkGroup");
scheduler.triggerJob(jobKey);
logger.info("已手动触发Spark数据处理任务");
}
} catch (SchedulerException e) {
logger.error("手动触发任务失败", e);
}
}
/**
* 获取调度器状态信息
*/
public String getSchedulerStatus() {
try {
if (scheduler == null) {
return "调度器未初始化";
}
if (scheduler.isStarted()) {
return "调度器运行中";
} else if (scheduler.isShutdown()) {
return "调度器已停止";
} else {
return "调度器未启动";
}
} catch (SchedulerException e) {
logger.error("获取调度器状态失败", e);
return "调度器状态未知";
}
}
/**
* Quartz任务类
*/
public static class SparkProcessingJob implements Job {
private static final Logger jobLogger = LoggerFactory.getLogger(SparkProcessingJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
jobLogger.info("开始执行定时Spark数据处理任务...");
StockDataProcessor processor = null;
try {
// 加载配置
SparkConfig config = SparkConfig.load();
// 创建处理器实例
processor = new StockDataProcessor(config);
// 执行批处理任务
processor.runBatchProcessing();
jobLogger.info("定时Spark数据处理任务执行完成");
} catch (Exception e) {
jobLogger.error("定时Spark数据处理任务执行失败", e);
throw new JobExecutionException("Spark数据处理任务失败", e);
} finally {
// 确保关闭Spark会话
if (processor != null) {
try {
processor.close();
} catch (Exception e) {
jobLogger.warn("关闭Spark会话时出现警告", e);
}
}
}
}
}
}

View File

@@ -5,11 +5,11 @@ spark {
}
mysql {
host = "localhost"
host = "43.143.145.172"
port = 3306
database = "agricultural_stock"
user = "root"
password = "root"
password = "kyff145972"
}
output {

View File

@@ -1,20 +0,0 @@
# Docker环境配置
spark {
master = "spark://spark-master:7077"
app-name = "agricultural-stock-data-processor-docker"
}
mysql {
host = "mysql"
port = 3306
database = "agricultural_stock"
user = "root"
password = "123456"
}
output {
path = "/app/spark-output"
}
# 生产模式
debug = false

View File

@@ -5,7 +5,7 @@ spark {
}
mysql {
host = ${MYSQL_HOST:-"localhost"}
host = ${MYSQL_HOST:-"43.143.145.172"}
port = ${MYSQL_PORT:-3306}
database = ${MYSQL_DATABASE:-"agricultural_stock"}
user = ${MYSQL_USER:-"root"}

View File

@@ -0,0 +1,79 @@
#!/bin/bash
# 农业股票数据处理器 - 定时调度器启动脚本
# 每30分钟自动执行一次数据处理任务
echo "==================================="
echo "农业股票数据处理器 - 定时调度器模式"
echo "==================================="
echo "该模式将每30分钟自动执行一次数据处理"
echo "启动中..."
echo ""
# 设置Java环境变量如果需要
# export JAVA_HOME=/path/to/java
# 设置Spark环境变量如果需要
# export SPARK_HOME=/path/to/spark
# 获取当前脚本所在目录
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# 进入项目目录
cd "$SCRIPT_DIR"
# 检查target目录是否存在编译后的jar文件
JAR_FILE="target/spark-data-processor-1.0.0.jar"
if [ ! -f "$JAR_FILE" ]; then
echo "未找到编译后的jar文件正在编译项目..."
# 检查是否有maven
if ! command -v mvn &> /dev/null; then
echo "错误: 未找到Maven命令请确保Maven已安装并配置到PATH中"
exit 1
fi
# 编译项目
echo "执行: mvn clean package -DskipTests"
mvn clean package -DskipTests
if [ $? -ne 0 ]; then
echo "项目编译失败,请检查错误信息"
exit 1
fi
fi
# 检查配置文件
CONFIG_FILE="src/main/resources/application-prod.conf"
if [ ! -f "$CONFIG_FILE" ]; then
echo "警告: 未找到生产环境配置文件 $CONFIG_FILE"
echo "将使用开发环境配置"
CONFIG_FILE="src/main/resources/application-dev.conf"
fi
# 设置JVM参数
JVM_OPTS="-Xmx4g -Xms2g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
# 设置日志配置
LOG_CONFIG="-Dlogback.configurationFile=src/main/resources/logback.xml"
# 启动应用
echo "启动定时调度器..."
echo "配置文件: $CONFIG_FILE"
echo "JVM参数: $JVM_OPTS"
echo ""
echo "使用以下命令与调度器交互:"
echo " status - 查看调度器状态"
echo " trigger - 手动触发一次任务"
echo " stop - 停止调度器"
echo " help - 显示帮助信息"
echo ""
# 执行Java程序
java $JVM_OPTS $LOG_CONFIG \
-Dconfig.file="$CONFIG_FILE" \
-jar "$JAR_FILE" scheduler
echo ""
echo "定时调度器已停止"