4.6 KiB
4.6 KiB
🗄️ 农业股票数据处理器 - MySQL数据库存储
📋 概述
本文档说明如何配置和使用农业股票数据处理器的MySQL数据库存储功能。处理器现在支持将Spark分析结果直接保存到MySQL数据库中,而不仅仅是文件系统。
运行
spark-submit \
--class com.agricultural.spark.StockDataProcessor \
--master local[*] \
--files application-prod.conf \
spark-data-processor-1.0.0.jar
🔧 数据库配置
1. 数据库表结构
首先需要创建必要的数据库表。执行以下SQL脚本:
# 执行扩展表结构脚本
mysql -u your_username -p your_database < database_tables.sql
2. 配置文件
在 application.conf 中配置数据库连接(如果文件不存在,程序会使用默认值):
mysql {
host = "localhost"
port = 3306
database = "agricultural_stock"
user = "root"
password = "your_password"
}
spark {
master = "local[*]"
}
output {
path = "/tmp/spark-output" # 备用文件输出路径
}
📊 数据存储结构
处理结果存储到以下表:
1. market_analysis - 市场分析表
存储每日市场总览数据:
- 上涨/下跌/平盘股票数量
- 总市值、成交量、成交额
- 平均涨跌幅
2. stock_technical_indicators - 技术指标表
存储股票技术指标:
- 移动平均线 (MA5, MA10, MA20, MA30)
- RSI相对强弱指标
- MACD指标 (DIF, DEA)
- 布林带 (上轨、中轨、下轨)
3. industry_analysis - 行业分析表
存储行业表现数据:
- 行业平均涨跌幅
- 行业股票数量
- 行业总市值和成交量
4. market_trends - 市场趋势表
存储历史趋势数据:
- 每日平均价格和涨跌幅
- 每日总成交量和成交额
🚀 运行方式
1. 标准运行
# 编译项目
mvn clean compile
# 运行处理器(批处理模式)
mvn exec:java -Dexec.mainClass="com.agricultural.spark.StockDataProcessor"
2. 打包运行
# 打包成可执行JAR
mvn package
# 运行JAR包
java -jar target/spark-data-processor-1.0.0.jar
📈 数据流程
MySQL stock_data (输入)
↓
数据清洗和处理
↓
技术指标计算
↓
市场分析计算
↓
保存到MySQL数据库表
↓ ↓
主要表 备用文件
🔍 数据查询示例
查看最新市场分析
SELECT * FROM market_analysis
ORDER BY analysis_date DESC
LIMIT 1;
查看特定股票技术指标
SELECT stock_name, trade_date, close_price, ma5, ma20, rsi
FROM stock_technical_indicators
WHERE stock_code = 'sz000876'
ORDER BY trade_date DESC
LIMIT 30;
查看行业表现排行
SELECT industry, avg_change_percent, stock_count, total_market_cap
FROM industry_analysis
WHERE analysis_date = (SELECT MAX(analysis_date) FROM industry_analysis)
ORDER BY avg_change_percent DESC;
查看市场趋势
SELECT trade_date, avg_change_percent, total_volume
FROM market_trends
ORDER BY trade_date DESC
LIMIT 30;
⚠️ 注意事项
1. 错误处理
- 程序会首先测试数据库连接
- 如果数据库连接失败,会回退到文件保存模式
- 所有数据库操作都有异常处理,不会中断主流程
2. 性能考虑
- 技术指标数据量较大,采用批量写入方式
- 市场分析数据使用
ON DUPLICATE KEY UPDATE避免重复 - 添加了必要的数据库索引优化查询性能
3. 数据一致性
market_analysis表按日期去重- 技术指标表按股票代码和日期建立组合索引
- 所有表都包含创建时间和更新时间字段
🛠️ 故障排除
1. 数据库连接失败
检查配置文件中的数据库连接信息:
- 主机地址和端口
- 数据库名称
- 用户名和密码
- 确保MySQL服务正在运行
2. 表不存在错误
确保已执行 database_tables.sql 脚本创建所有必要的表。
3. 权限问题
确保数据库用户具有以下权限:
- SELECT (读取 stock_data)
- INSERT (写入分析结果)
- UPDATE (更新已有数据)
4. 内存不足
对于大量数据,可能需要调整Spark配置:
# 增加内存限制
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g
📝 日志说明
程序运行时会输出详细日志,包括:
- 数据库连接状态
- 各阶段处理进度
- 数据保存结果
- 错误信息和回退操作
关键日志信息:
数据库连接测试成功- 数据库连接正常市场分析结果保存成功- 数据已保存到数据库保存到数据库失败,回退到文件保存- 使用备用保存方式