# agricultural-stock-analysis/data-collector/stock_crawler.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
农业股票数据采集器
根据项目需求从腾讯行情接口采集农业类上市公司数据
"""
import requests
import pandas as pd
import json
import time
import logging
import schedule
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import mysql.connector
# Kafka导入已移除
import configparser
# Logging setup: mirror every record to a UTF-8 log file and the console.
_log_handlers = [
    logging.FileHandler('stock_crawler.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
class StockCrawler:
    """Collects quotes for a fixed list of stocks from Tencent's gtimg quote
    API and persists them to a MySQL ``stock_data`` table.

    The crawler can run a single cycle (:meth:`run_once`) or repeat on a
    fixed interval via :meth:`start_scheduler`.
    """

    def __init__(self, config_file='config.ini'):
        """Initialize configuration, HTTP session, stock list and DB link.

        Args:
            config_file: Path to an INI configuration file; a missing file
                falls back to built-in defaults.
        """
        self.config = self.load_config(config_file)
        self.session = requests.Session()
        # Browser-like User-Agent so the quote endpoint serves the request.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # Stock codes to collect (sample list).
        # NOTE(review): several entries are not agricultural companies
        # (e.g. sh600519 Kweichow Moutai, sh600036 China Merchants Bank) --
        # confirm the intended universe with the data owner.
        self.agricultural_stocks = [
            'sz300630',  # Hainan Poly Pharm
            'sh600998',  # Jointown Pharmaceutical
            'sh600371',  # Wanxiang Qianchao
            'sz000876',  # New Hope Liuhe
            'sz002714',  # Muyuan Foods
            'sh600519',  # Kweichow Moutai
            'sz000858',  # Wuliangye Yibin
            'sh600887',  # Inner Mongolia Yili
            'sz002304',  # Yanghe Brewery
            'sh600036',  # China Merchants Bank
        ]
        # Open the database connection (sets self.db_connection, possibly None).
        self.init_db_connection()
        # Kafka producer initialization removed.

    def load_config(self, config_file: str) -> configparser.ConfigParser:
        """Load the INI configuration file.

        ``ConfigParser.read`` never raises ``FileNotFoundError`` -- it simply
        skips unreadable paths -- so a missing file is detected through the
        list of successfully read files that it returns.
        """
        config = configparser.ConfigParser()
        try:
            loaded = config.read(config_file, encoding='utf-8')
            if not loaded:
                logger.warning(f"配置文件 {config_file} 不存在,使用默认配置")
        except UnicodeDecodeError:
            logger.warning(f"配置文件编码错误,使用默认配置")
        return config

    def init_db_connection(self):
        """Open the MySQL connection; on failure leave ``db_connection`` None."""
        try:
            self.db_connection = mysql.connector.connect(
                host=self.config.get('database', 'host', fallback='localhost'),
                port=self.config.getint('database', 'port', fallback=3306),
                user=self.config.get('database', 'user', fallback='root'),
                password=self.config.get('database', 'password', fallback='123456'),
                database=self.config.get('database', 'database', fallback='agricultural_stock'),
                charset='utf8mb4'
            )
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            self.db_connection = None

    # Kafka producer init method removed.

    @staticmethod
    def _to_float(value) -> float:
        """Parse a quote field as float; empty or malformed text becomes 0.0."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _to_int(value) -> int:
        """Parse a quote field as int; tolerates '123.0'-style decimal text."""
        try:
            return int(float(value))
        except (TypeError, ValueError):
            return 0

    @classmethod
    def _parse_stock_fields(cls, stock_code: str, fields: List[str]) -> Dict:
        """Build a stock-data dict from Tencent's '~'-separated quote fields.

        Field positions follow the gtimg quote layout used by the caller
        (name at index 1, current price at 3, five bid/ask levels starting
        at 9/19, ...).  The caller must guarantee at least 39 fields;
        indices 44-46 are read defensively since short payloads omit them.
        """
        to_f = cls._to_float
        to_i = cls._to_int

        def opt(idx: int) -> str:
            # Optional trailing field: '' when the payload is too short.
            return fields[idx] if len(fields) > idx else ''

        data = {
            'stock_code': stock_code,
            'stock_name': fields[1],
            'current_price': to_f(fields[3]),
            'yesterday_close': to_f(fields[4]),
            'open_price': to_f(fields[5]),
            'volume': to_i(fields[6]),
            'outer_volume': to_i(fields[7]),
            'inner_volume': to_i(fields[8]),
        }
        # Five bid (buy) and ask (sell) levels: price/volume pairs start at
        # index 9 for bids and index 19 for asks.
        for level in range(1, 6):
            base = 9 + (level - 1) * 2
            data[f'buy{level}_price'] = to_f(fields[base])
            data[f'buy{level}_volume'] = to_i(fields[base + 1])
            data[f'sell{level}_price'] = to_f(fields[base + 10])
            data[f'sell{level}_volume'] = to_i(fields[base + 11])
        data.update({
            'latest_deals': fields[29],
            'trade_time': fields[30],
            'change_amount': to_f(fields[31]),
            'change_percent': to_f(fields[32]),
            'high_price': to_f(fields[33]),
            'low_price': to_f(fields[34]),
            'price_volume_ratio': fields[35],
            'volume_ratio': fields[36],
            'turnover_rate': to_f(fields[37]),
            'pe_ratio': to_f(fields[38]),
            'pb_ratio': to_f(opt(46)),
            'market_cap': to_f(opt(44)),
            'float_market_cap': to_f(opt(45)),
            'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        })
        # Recompute the change fields from the raw prices so they stay
        # consistent even if the feed's own values are stale.
        if data['current_price'] > 0 and data['yesterday_close'] > 0:
            data['change_amount'] = data['current_price'] - data['yesterday_close']
            data['change_percent'] = data['change_amount'] / data['yesterday_close'] * 100
        # Approximate turnover.  NOTE(review): the volume unit (shares vs.
        # lots) is not established here -- confirm against the feed docs.
        data['turnover'] = data['volume'] * data['current_price']
        return data

    def fetch_stock_data(self, stock_code: str) -> Optional[Dict]:
        """Fetch and parse a single stock quote from the gtimg endpoint.

        Args:
            stock_code: Exchange-prefixed code such as 'sz300630' or 'sh600998'.

        Returns:
            A stock-data dict, or None on any network, payload or parse failure.
        """
        url = f"http://qt.gtimg.cn/q={stock_code}"
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            content = response.text.strip()
            # Valid payloads look like: v_sz300630="51~name~code~...";
            if not content or 'v_' not in content:
                logger.warning(f"股票 {stock_code} 无数据返回")
                return None
            data_part = content.split('="')[1].split('";')[0]
            fields = data_part.split('~')
            if len(fields) < 50:
                logger.warning(f"股票 {stock_code} 数据字段不完整")
                return None
            stock_data = self._parse_stock_fields(stock_code, fields)
            logger.info(f"成功获取股票 {stock_code} 数据: {stock_data['stock_name']}")
            return stock_data
        except requests.exceptions.RequestException as e:
            logger.error(f"请求股票 {stock_code} 数据失败: {e}")
            return None
        except (ValueError, IndexError) as e:
            logger.error(f"解析股票 {stock_code} 数据失败: {e}")
            return None
        except Exception as e:
            logger.error(f"获取股票 {stock_code} 数据时出现未知错误: {e}")
            return None

    def fetch_all_stocks(self) -> List[Dict]:
        """Fetch quotes for every configured stock, skipping failed ones."""
        all_data = []
        for stock_code in self.agricultural_stocks:
            try:
                stock_data = self.fetch_stock_data(stock_code)
                if stock_data:
                    all_data.append(stock_data)
                # Throttle so we do not hammer the quote endpoint.
                time.sleep(0.5)
            except Exception as e:
                logger.error(f"处理股票 {stock_code} 时出错: {e}")
                continue
        logger.info(f"本次采集完成,共获取 {len(all_data)} 只股票数据")
        return all_data

    def save_to_database(self, stock_data_list: List[Dict]):
        """Insert a batch of stock records into the ``stock_data`` table.

        Does nothing when there is no live DB connection or no data; rolls
        back the transaction on failure.
        """
        if not self.db_connection or not stock_data_list:
            return
        cursor = None  # bound before try so the finally clause cannot NameError
        try:
            cursor = self.db_connection.cursor()
            insert_sql = """
                INSERT INTO stock_data (
                    stock_code, stock_name, open_price, close_price, high_price, low_price,
                    volume, turnover, change_percent, change_amount, pe_ratio, pb_ratio,
                    market_cap, float_market_cap, trade_date, create_time
                ) VALUES (
                    %(stock_code)s, %(stock_name)s, %(open_price)s, %(current_price)s,
                    %(high_price)s, %(low_price)s, %(volume)s, %(turnover)s,
                    %(change_percent)s, %(change_amount)s, %(pe_ratio)s, %(pb_ratio)s,
                    %(market_cap)s, %(float_market_cap)s, NOW(), NOW()
                )
            """
            cursor.executemany(insert_sql, stock_data_list)
            self.db_connection.commit()
            logger.info(f"成功保存 {len(stock_data_list)} 条股票数据到数据库")
        except Exception as e:
            logger.error(f"保存数据到数据库失败: {e}")
            if self.db_connection:
                self.db_connection.rollback()
        finally:
            if cursor:
                cursor.close()

    # Kafka send method removed.

    def run_once(self):
        """Run one complete collect-and-store cycle."""
        logger.info("开始执行股票数据采集...")
        stock_data_list = self.fetch_all_stocks()
        if stock_data_list:
            self.save_to_database(stock_data_list)
            # Kafka publishing removed.
            logger.info(f"数据采集完成,共处理 {len(stock_data_list)} 只股票")
        else:
            logger.warning("本次采集未获取到任何股票数据")

    def start_scheduler(self):
        """Run one collection immediately, then repeat on a fixed interval.

        Blocks forever.  The interval comes from ``[crawler] interval_minutes``
        in the config file (default 5 minutes).
        """
        interval_minutes = self.config.getint('crawler', 'interval_minutes', fallback=5)
        schedule.every(interval_minutes).minutes.do(self.run_once)
        logger.info(f"定时任务已启动,每 {interval_minutes} 分钟采集一次")
        # Collect once right away rather than waiting a full interval.
        self.run_once()
        while True:
            schedule.run_pending()
            time.sleep(1)

    def close(self):
        """Release external resources (currently just the DB connection)."""
        if self.db_connection:
            self.db_connection.close()
        # Kafka producer close removed.
def main():
    """Program entry point: build the crawler, run it until stopped, clean up."""
    collector = StockCrawler()
    try:
        logger.info("股票数据采集器启动")
        collector.start_scheduler()
    except KeyboardInterrupt:
        logger.info("收到停止信号,正在关闭...")
    except Exception as e:
        logger.error(f"程序运行出错: {e}")
    finally:
        # Always release the DB connection, even on abnormal exit.
        collector.close()
        logger.info("股票数据采集器已停止")


if __name__ == "__main__":
    main()