434 lines
17 KiB
Python
434 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
农业股票数据采集器
|
||
根据项目需求从腾讯行情接口采集农业类上市公司数据
|
||
"""
|
||
|
||
import requests
|
||
import pandas as pd
|
||
import json
|
||
import time
|
||
import logging
|
||
import schedule
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Dict, Optional
|
||
import mysql.connector
|
||
# Kafka导入已移除
|
||
import configparser
|
||
|
||
# Logging setup: records go both to stock_crawler.log (UTF-8) and to the
# console, formatted as "<time> - <LEVEL> - <message>".
_log_handlers = [
    logging.FileHandler('stock_crawler.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
    level=logging.INFO,
)

# Module-level logger shared by the crawler class and main().
logger = logging.getLogger(__name__)
|
||
|
||
class StockCrawler:
    """Poll Tencent's quote endpoint for a list of agricultural stocks.

    Each cycle fetches one quote per code in ``agricultural_stocks`` and
    upserts it into the MySQL table ``stock_data`` (one row per stock per
    calendar day). Database settings and the polling interval come from an
    INI file; every option has a hard-coded fallback.
    """

    # Default watch list. Codes use the '<prefix><6 digits>' form
    # (e.g. 'sz300189') that fetch_stock_data() interpolates into the URL.
    # A tuple, so the shared class default cannot be mutated by instances.
    AGRICULTURAL_STOCKS = (
        'sz300189',  # Shennong Seed (神农种业)
        'sz000713',  # Fengle Seed (丰乐种业)
        'sh600313',  # Nongfa Seed (农发种业)
        'bj837403',  # Kangnong Seed (康农种业)
        'sz200505',  # Jingliang B (京粮B)
        'sz300268',  # ST Jiawo (ST佳沃)
        'sz000930',  # COFCO Technology (中粮科技)
        'sz002299',  # Sunner Development (圣农发展)
        'sh600371',  # Wanxiang Denong (万向德农)
        'sh600598',  # Beidahuang (北大荒)
        'sh603609',  # Wellhope (禾丰股份)
        'bj831087',
        'sh603363',
        'sh603336',
        'sh600354',
        'sz002385',
        'sz000048',
        'sh600251',
        'sz002321',
        'sz000505',
        'sz001366',
        'sz002772',
        'sz002041',
        'sh600127',
        'sz002548',
        'sh605296',
        'sh600359',
        'sh601952',
        'sh600975',
        # Codes added later
        'sz300505',
        'sh600141',
        'sz000893',
        'sh600691',
        'sh600470',
        'sz000408',
        'sz000902',
        'sz002539',
        'sz002599',
        'sz002545',
        'sz300387',
        'sz002274',
        'sz002470',
        'sz002538',
        'sz000731',
        'sh600078',
        'sh600227',
        'sh603395',
        'sz002588',
        'sz000422',
        'sz002556',
        'sz002312',
        'sz002170',
        'sz000792',
        'sh600096',
        # Pesticide / veterinary-drug stocks
        'sz001231',
        'sz002731',
        'sz200553',
        'sh603810',
        'sh603970',
        'sz301035',
        'sz002868',
        'sz003042',
        'sz002391',
        'bj870866',
        'sz301665',
        'sh600486',
    )

    # Weekday windows (inclusive, naive local time) in which run_once() crawls.
    # NOTE(review): A-share continuous trading ends at 11:30 and 15:00; the
    # wider 12:00 / 17:00 ends look like deliberate buffers — confirm.
    TRADING_SESSIONS = (("09:30", "12:00"), ("13:00", "17:00"))

    def __init__(self, config_file='config.ini'):
        """Load config, set up the HTTP session and open the DB connection.

        Args:
            config_file: Path of the INI file passed to load_config().
        """
        self.config = self.load_config(config_file)

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Per-instance copy so callers can edit it without touching the default.
        self.agricultural_stocks = list(self.AGRICULTURAL_STOCKS)

        self.db_connection = None
        self.init_db_connection()

    def load_config(self, config_file: str) -> configparser.ConfigParser:
        """Read ``config_file`` into a ConfigParser.

        A missing/unreadable file or a file that is not valid UTF-8 is logged
        and an empty parser is returned, so every lookup uses its fallback.
        Syntax errors (``configparser.Error``) are not caught.
        """
        config = configparser.ConfigParser()
        try:
            # ConfigParser.read() silently skips files it cannot open and
            # returns the list of files it did read; it never raises
            # FileNotFoundError, so a missing file is detected from the result.
            loaded_files = config.read(config_file, encoding='utf-8')
        except UnicodeDecodeError:
            logger.warning("配置文件编码错误,使用默认配置")
            # Discard anything parsed before the decode error: use pure defaults.
            return configparser.ConfigParser()

        if not loaded_files:
            logger.warning(f"配置文件 {config_file} 不存在,使用默认配置")
        return config

    def init_db_connection(self):
        """Open a MySQL connection using the [database] config section.

        On any failure the error is logged and ``self.db_connection`` is set
        to None; save_to_database() retries through _ensure_db_connection().
        """
        try:
            self.db_connection = mysql.connector.connect(
                host=self.config.get('database', 'host', fallback='localhost'),
                port=self.config.getint('database', 'port', fallback=3306),
                user=self.config.get('database', 'user', fallback='root'),
                # NOTE(review): hard-coded fallback credentials; consider
                # requiring them in config.ini instead.
                password=self.config.get('database', 'password', fallback='123456'),
                database=self.config.get('database', 'database', fallback='agricultural_stock'),
                charset='utf8mb4'
            )
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            self.db_connection = None

    def _ensure_db_connection(self) -> bool:
        """Return True if a usable DB connection exists, reconnecting if needed.

        A connection that was never established is retried, and an existing
        one is pinged with reconnect=True. This lets a long-running scheduler
        recover from a dropped connection (e.g. a server idle timeout while
        the crawler sat outside trading hours) instead of failing every save.
        """
        if self.db_connection is None:
            self.init_db_connection()
            return self.db_connection is not None
        try:
            self.db_connection.ping(reconnect=True, attempts=3, delay=1)
        except mysql.connector.Error as e:
            logger.error(f"数据库连接已断开且重连失败: {e}")
            return False
        return True

    @staticmethod
    def _to_float(value: str) -> float:
        """Convert a quote field to float; an empty field becomes 0.0."""
        return float(value) if value else 0.0

    @staticmethod
    def _to_int(value: str) -> int:
        """Convert a quote field to int; an empty field becomes 0."""
        return int(value) if value else 0

    def _build_stock_record(self, stock_code: str, fields: List[str]) -> Dict:
        """Map the '~'-separated quote fields onto a named record.

        Empty numeric fields become 0 / 0.0. When current_price and
        yesterday_close are both positive, change_amount and change_percent
        are recomputed from them; ``turnover`` is volume * current_price.

        Raises:
            ValueError: a non-empty numeric field is not a number.
            IndexError: ``fields`` has fewer than 39 entries.
        """
        num = self._to_float
        whole = self._to_int

        record = {
            'stock_code': stock_code,
            'stock_name': fields[1],
            'current_price': num(fields[3]),
            'yesterday_close': num(fields[4]),
            'open_price': num(fields[5]),
            'volume': whole(fields[6]),
            'outer_volume': whole(fields[7]),
            'inner_volume': whole(fields[8]),
        }

        # Five order-book levels: buy level n is at fields 7+2n (price) and
        # 8+2n (volume); the matching sell level sits ten positions later.
        for level in range(1, 6):
            bid = 7 + 2 * level
            ask = bid + 10
            record[f'buy{level}_price'] = num(fields[bid])
            record[f'buy{level}_volume'] = whole(fields[bid + 1])
            record[f'sell{level}_price'] = num(fields[ask])
            record[f'sell{level}_volume'] = whole(fields[ask + 1])

        # NOTE(review): indices 35-46 keep the original mapping. Public
        # write-ups of this endpoint usually list 37 as turnover (10k CNY),
        # 38 as turnover rate, 39 as P/E, 44 as float market cap, 45 as total
        # market cap, and volume (field 6) in lots of 100 shares — verify
        # against live responses before relying on these columns.
        record.update({
            'latest_deals': fields[29],
            'trade_time': fields[30],
            'change_amount': num(fields[31]),
            'change_percent': num(fields[32]),
            'high_price': num(fields[33]),
            'low_price': num(fields[34]),
            'price_volume_ratio': fields[35],
            'volume_ratio': fields[36],
            'turnover_rate': num(fields[37]),
            'pe_ratio': num(fields[38]),
            'pb_ratio': num(fields[46]) if len(fields) > 46 else 0.0,
            'market_cap': num(fields[44]) if len(fields) > 44 else 0.0,
            'float_market_cap': num(fields[45]) if len(fields) > 45 else 0.0,
            'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        })

        # Derived values take precedence over the feed's own change fields.
        if record['current_price'] > 0 and record['yesterday_close'] > 0:
            record['change_amount'] = record['current_price'] - record['yesterday_close']
            record['change_percent'] = (record['change_amount'] / record['yesterday_close']) * 100

        record['turnover'] = record['volume'] * record['current_price']
        return record

    def fetch_stock_data(self, stock_code: str) -> Optional[Dict]:
        """Fetch and parse one quote from the Tencent quote endpoint.

        Args:
            stock_code: Code such as 'sz300630' or 'sh600998'.

        Returns:
            The record built by _build_stock_record(), or None when the
            request fails, the response is empty/has fewer than 50 fields, or
            parsing fails (every error is logged, none is raised).
        """
        url = f"http://qt.gtimg.cn/q={stock_code}"

        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            # NOTE(review): response.text decodes with the server-declared
            # charset; if stock names come out garbled, set
            # response.encoding explicitly before reading .text.
            content = response.text.strip()
            if not content or 'v_' not in content:
                logger.warning(f"股票 {stock_code} 无数据返回")
                return None

            # Payload shape: v_<code>="f0~f1~...~fN"; keep only the quoted part.
            data_part = content.split('="')[1].split('";')[0]
            fields = data_part.split('~')

            if len(fields) < 50:
                logger.warning(f"股票 {stock_code} 数据字段不完整")
                return None

            stock_data = self._build_stock_record(stock_code, fields)
            logger.info(f"成功获取股票 {stock_code} 数据: {stock_data['stock_name']}")
            return stock_data

        except requests.exceptions.RequestException as e:
            logger.error(f"请求股票 {stock_code} 数据失败: {e}")
            return None
        except (ValueError, IndexError) as e:
            logger.error(f"解析股票 {stock_code} 数据失败: {e}")
            return None
        except Exception as e:
            logger.error(f"获取股票 {stock_code} 数据时出现未知错误: {e}")
            return None

    def fetch_all_stocks(self) -> List[Dict]:
        """Fetch every code in ``agricultural_stocks``; skip ones that fail.

        Sleeps 0.5 s after each request to throttle the crawl.
        """
        all_data = []

        for stock_code in self.agricultural_stocks:
            try:
                stock_data = self.fetch_stock_data(stock_code)
                if stock_data:
                    all_data.append(stock_data)

                # Avoid hitting the quote server too frequently.
                time.sleep(0.5)

            except Exception as e:
                logger.error(f"处理股票 {stock_code} 时出错: {e}")
                continue

        logger.info(f"本次采集完成,共获取 {len(all_data)} 只股票数据")
        return all_data

    def is_trading_time(self, now: Optional[datetime] = None) -> bool:
        """Return True if ``now`` is a weekday inside one of TRADING_SESSIONS.

        Args:
            now: Moment to check; defaults to the current local time.
                Exchange holidays are not taken into account.
        """
        if now is None:
            now = datetime.now()

        # Saturday (5) and Sunday (6) are never trading days.
        if now.weekday() >= 5:
            return False

        current_time = now.time()
        return any(
            datetime.strptime(start, "%H:%M").time()
            <= current_time
            <= datetime.strptime(end, "%H:%M").time()
            for start, end in self.TRADING_SESSIONS
        )

    def save_to_database(self, stock_data_list: List[Dict]):
        """Upsert one row per stock for today's date into ``stock_data``.

        For each record the method does one of three things. It skips the
        record if today's row already has the same close price and volume.
        Otherwise it UPDATEs today's row if one exists, or INSERTs a new row.
        All writes are committed together. On any error the transaction is
        rolled back and the error is logged rather than raised.

        Args:
            stock_data_list: Records produced by fetch_stock_data().
        """
        if not stock_data_list:
            return
        if not self._ensure_db_connection():
            return

        today = datetime.now().strftime('%Y-%m-%d')

        # Same price and volume as a row already stored today -> nothing new.
        check_sql = """
            SELECT COUNT(*) FROM stock_data
            WHERE stock_code = %s AND DATE(trade_date) = %s
            AND close_price = %s AND volume = %s
        """
        # Any row for this stock today -> update it instead of inserting.
        update_check_sql = """
            SELECT COUNT(*) FROM stock_data
            WHERE stock_code = %s AND DATE(trade_date) = %s
        """
        update_sql = """
            UPDATE stock_data SET
            stock_name = %(stock_name)s,
            open_price = %(open_price)s,
            close_price = %(current_price)s,
            high_price = %(high_price)s,
            low_price = %(low_price)s,
            volume = %(volume)s,
            turnover = %(turnover)s,
            change_percent = %(change_percent)s,
            change_amount = %(change_amount)s,
            pe_ratio = %(pe_ratio)s,
            pb_ratio = %(pb_ratio)s,
            market_cap = %(market_cap)s,
            float_market_cap = %(float_market_cap)s,
            update_time = NOW()
            WHERE stock_code = %(stock_code)s AND DATE(trade_date) = %(date)s
        """
        insert_sql = """
            INSERT INTO stock_data (
            stock_code, stock_name, open_price, close_price, high_price, low_price,
            volume, turnover, change_percent, change_amount, pe_ratio, pb_ratio,
            market_cap, float_market_cap, trade_date, create_time
            ) VALUES (
            %(stock_code)s, %(stock_name)s, %(open_price)s, %(current_price)s,
            %(high_price)s, %(low_price)s, %(volume)s, %(turnover)s,
            %(change_percent)s, %(change_amount)s, %(pe_ratio)s, %(pb_ratio)s,
            %(market_cap)s, %(float_market_cap)s, NOW(), NOW()
            )
        """

        # Bound before the try so `finally` never sees an unbound name when
        # cursor() itself raises.
        cursor = None
        try:
            cursor = self.db_connection.cursor()

            for stock_data in stock_data_list:
                code = stock_data['stock_code']

                cursor.execute(check_sql, (
                    code,
                    today,
                    stock_data['current_price'],
                    stock_data['volume'],
                ))
                if cursor.fetchone()[0] > 0:
                    logger.info(f"股票 {stock_data['stock_code']} 今日数据已存在且相同,跳过")
                    continue

                cursor.execute(update_check_sql, (code, today))
                if cursor.fetchone()[0] > 0:
                    cursor.execute(update_sql, {**stock_data, 'date': today})
                    logger.info(f"更新股票 {stock_data['stock_code']} 数据")
                else:
                    cursor.execute(insert_sql, stock_data)
                    logger.info(f"插入股票 {stock_data['stock_code']} 数据")

            self.db_connection.commit()
            logger.info(f"成功处理 {len(stock_data_list)} 条股票数据到数据库")

        except Exception as e:
            logger.error(f"保存数据到数据库失败: {e}")
            if self.db_connection is not None:
                # A dead connection can make rollback() raise too; do not let
                # that escape and stop the scheduler loop.
                try:
                    self.db_connection.rollback()
                except mysql.connector.Error as rollback_error:
                    logger.error(f"数据库回滚失败: {rollback_error}")
        finally:
            if cursor is not None:
                try:
                    cursor.close()
                except mysql.connector.Error as close_error:
                    logger.warning(f"关闭数据库游标失败: {close_error}")

    def run_once(self):
        """Run one crawl cycle: fetch all quotes and save them (trading hours only)."""
        if not self.is_trading_time():
            logger.info("当前非交易时间,跳过数据采集")
            return

        logger.info("开始执行股票数据采集...")

        stock_data_list = self.fetch_all_stocks()

        if stock_data_list:
            self.save_to_database(stock_data_list)
            logger.info(f"数据采集完成,共处理 {len(stock_data_list)} 只股票")
        else:
            logger.warning("本次采集未获取到任何股票数据")

    def start_scheduler(self):
        """Run run_once() now and then every [crawler] interval_minutes (default 5).

        Blocks forever, polling the schedule once per second.
        """
        interval_minutes = self.config.getint('crawler', 'interval_minutes', fallback=5)

        schedule.every(interval_minutes).minutes.do(self.run_once)

        logger.info(f"定时任务已启动,每 {interval_minutes} 分钟采集一次")

        # Run immediately instead of waiting for the first interval.
        self.run_once()

        while True:
            schedule.run_pending()
            time.sleep(1)

    def close(self):
        """Close the DB connection if one is open; safe to call repeatedly."""
        connection = getattr(self, 'db_connection', None)
        if connection is None:
            return
        try:
            connection.close()
        except mysql.connector.Error as e:
            logger.warning(f"关闭数据库连接失败: {e}")
        finally:
            self.db_connection = None
|
||
|
||
def main():
    """Build the crawler and run its scheduler until interrupted.

    Ctrl+C (KeyboardInterrupt) stops the loop normally. Any other exception
    is logged together with its traceback. The crawler's DB connection is
    closed on every exit path.
    """
    crawler = StockCrawler()

    try:
        logger.info("股票数据采集器启动")
        crawler.start_scheduler()
    except KeyboardInterrupt:
        logger.info("收到停止信号,正在关闭...")
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"程序运行出错: {e}")
    finally:
        crawler.close()
        logger.info("股票数据采集器已停止")


if __name__ == "__main__":
    main()