agricultural-sock-amalysis/spark-processor/README_DATABASE.md

4.6 KiB
Raw Blame History

🗄️ 农业股票数据处理器 - MySQL数据库存储

📋 概述

本文档说明如何配置和使用农业股票数据处理器的MySQL数据库存储功能。处理器现在支持将Spark分析结果直接保存到MySQL数据库中而不仅仅是文件系统。

运行

spark-submit \
  --class com.agricultural.spark.StockDataProcessor \
  --master local[*] \
  --files application-prod.conf \
  spark-data-processor-1.0.0.jar

🔧 数据库配置

1. 数据库表结构

首先需要创建必要的数据库表。执行以下SQL脚本

# 执行扩展表结构脚本
mysql -u your_username -p your_database < database_tables.sql

2. 配置文件

application.conf 中配置数据库连接(如果文件不存在,程序会使用默认值):

mysql {
  host = "localhost"
  port = 3306
  database = "agricultural_stock"
  user = "root"
  password = "your_password"
}

spark {
  master = "local[*]"
}

output {
  path = "/tmp/spark-output"  # 备用文件输出路径
}

📊 数据存储结构

处理结果存储到以下表:

1. market_analysis - 市场分析表

存储每日市场总览数据:

  • 上涨/下跌/平盘股票数量
  • 总市值、成交量、成交额
  • 平均涨跌幅

2. stock_technical_indicators - 技术指标表

存储股票技术指标:

  • 移动平均线 (MA5, MA10, MA20, MA30)
  • RSI相对强弱指标
  • MACD指标 (DIF, DEA)
  • 布林带 (上轨、中轨、下轨)

3. industry_analysis - 行业分析表

存储行业表现数据:

  • 行业平均涨跌幅
  • 行业股票数量
  • 行业总市值和成交量

存储历史趋势数据:

  • 每日平均价格和涨跌幅
  • 每日总成交量和成交额

🚀 运行方式

1. 标准运行

# 编译项目
mvn clean compile

# 运行处理器(批处理模式)
mvn exec:java -Dexec.mainClass="com.agricultural.spark.StockDataProcessor"

2. 打包运行

# 打包成可执行JAR
mvn package

# 运行JAR包
java -jar target/spark-data-processor-1.0.0.jar

📈 数据流程

MySQL stock_data (输入)
        ↓
   数据清洗和处理
        ↓
    技术指标计算
        ↓
    市场分析计算
        ↓
保存到MySQL数据库表
     ↓    ↓
 主要表  备用文件

🔍 数据查询示例

查看最新市场分析

SELECT * FROM market_analysis 
ORDER BY analysis_date DESC 
LIMIT 1;

查看特定股票技术指标

SELECT stock_name, trade_date, close_price, ma5, ma20, rsi
FROM stock_technical_indicators 
WHERE stock_code = 'sz000876' 
ORDER BY trade_date DESC 
LIMIT 30;

查看行业表现排行

SELECT industry, avg_change_percent, stock_count, total_market_cap
FROM industry_analysis 
WHERE analysis_date = (SELECT MAX(analysis_date) FROM industry_analysis)
ORDER BY avg_change_percent DESC;

查看市场趋势

SELECT trade_date, avg_change_percent, total_volume
FROM market_trends 
ORDER BY trade_date DESC 
LIMIT 30;

⚠️ 注意事项

1. 错误处理

  • 程序会首先测试数据库连接
  • 如果数据库连接失败,会回退到文件保存模式
  • 所有数据库操作都有异常处理,不会中断主流程

2. 性能考虑

  • 技术指标数据量较大,采用批量写入方式
  • 市场分析数据使用 ON DUPLICATE KEY UPDATE 避免重复
  • 添加了必要的数据库索引优化查询性能

3. 数据一致性

  • market_analysis 表按日期去重
  • 技术指标表按股票代码和日期建立组合索引
  • 所有表都包含创建时间和更新时间字段

🛠️ 故障排除

1. 数据库连接失败

检查配置文件中的数据库连接信息:

  • 主机地址和端口
  • 数据库名称
  • 用户名和密码
  • 确保MySQL服务正在运行

2. 表不存在错误

确保已执行 database_tables.sql 脚本创建所有必要的表。

3. 权限问题

确保数据库用户具有以下权限:

  • SELECT (读取 stock_data)
  • INSERT (写入分析结果)
  • UPDATE (更新已有数据)

4. 内存不足

对于大量数据可能需要调整Spark配置

# 增加内存限制
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g

📝 日志说明

程序运行时会输出详细日志,包括:

  • 数据库连接状态
  • 各阶段处理进度
  • 数据保存结果
  • 错误信息和回退操作

关键日志信息:

  • 数据库连接测试成功 - 数据库连接正常
  • 市场分析结果保存成功 - 数据已保存到数据库
  • 保存到数据库失败,回退到文件保存 - 使用备用保存方式