import json
import os
import sys

import requests
from bs4 import BeautifulSoup

# Add the project root to the import path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from src.database.repository import NewsRepository
from src.database.models import NewsModel
from src.utils.logger import get_logger

# Configuration
FILE_PATH = "example/02.json"
ARTICLE_PREFIX_URL = 'https://www.xinhuanet.com'
CATEGORY_ID = 7  # Government affairs category
SOURCE = "新华网"  # Xinhuanet, stored as the source name in the database

# Crawl control parameters
START = 100  # Index of the first item to process (0-based; 0 starts from the first item)
COUNT = 200  # Number of items to process (None or 0 means process everything)

# Request headers
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )
}

# Initialize the logger
logger = get_logger(__name__)


def parse_article(href: str) -> str:
    """
    Parse an article detail page and extract the body text.

    Args:
        href: Article URL

    Returns:
        The article body text, or an empty string on failure
    """
    try:
        resp = requests.get(href, headers=HEADERS, timeout=10)
        resp.raise_for_status()
        html = resp.text

        soup = BeautifulSoup(html, "lxml")

        # Locate the body container
        content_node = soup.find(id="detailContent")

        if content_node:
            # Extract and clean the text
            content = content_node.get_text(separator='\n', strip=True)
            return content
        else:
            logger.warning(f"Body content not found: {href}")
            return ""
    except Exception as e:
        logger.error(f"Failed to parse article: {href} - {e}")
        return ""


def main():
    """Entry point."""
    # Make sure the input file exists
    if not os.path.exists(FILE_PATH):
        logger.error(f"File not found: {FILE_PATH}")
        return

    # Initialize the database repository
    repository = NewsRepository()

    try:
        # 1. Read and parse the JSON file
        with open(FILE_PATH, 'r', encoding='utf-8') as f:
            data = json.loads(f.read())

        # Get the data source list
        datasource_list = data.get("datasource", [])

        # Compute the crawl range
        total_count = len(datasource_list)
        end_index = START + COUNT if COUNT and COUNT > 0 else total_count

        # Validate parameters
        if START >= total_count:
            logger.error(f"Start index {START} is out of range ({total_count} items in total)")
            return

        if end_index > total_count:
            end_index = total_count
            logger.warning(f"Requested count exceeds the data range; processing up to item {total_count}")

        actual_count = end_index - START
        logger.info(f"Data range: item {START} to item {end_index - 1} ({actual_count} items in total)")

        # Collected records after extraction
        extracted_results = []

        # 2. Iterate and extract fields (slicing controls the range)
        for item in datasource_list[START:end_index]:
            # Keep only valid news items of the MultiMedia type
            if item.get("contentType") == "MultiMedia":
                # Extract the fields we need
                record = {
                    "title": item.get("title", "").strip(),
                    "href": ARTICLE_PREFIX_URL + item.get("publishUrl", "").strip(),
                    "author": item.get("sourceText", "").strip(),
                    "publish_time": item.get("publishTime", "").strip(),
                }
                extracted_results.append(record)

        logger.info(f"Extraction done: {len(extracted_results)} valid records")

        # 3. Parse article details and build news models
        news_list = []
        success_count = 0
        fail_count = 0

        for i, res in enumerate(extracted_results):
            logger.info(f"[{i+1}/{len(extracted_results)}] Processing: {res['title']}")

            # Parse the article body
            content = parse_article(res['href'])

            if not content:
                logger.warning(f"Article body is empty, skipping: {res['href']}")
                fail_count += 1
                continue

            # Build the news model
            news = NewsModel(
                url=res['href'],
                title=res['title'],
                content=content,
                category_id=CATEGORY_ID,
                source=SOURCE,
                publish_time=res['publish_time'],
                author=res['author'],
            )
            news_list.append(news)
            success_count += 1

        # 4. Save to the database in bulk
        if news_list:
            inserted_count = repository.save_news(news_list)
            logger.info(f"Saved {inserted_count} news items to the database")
        else:
            logger.warning("No valid data to save")

        # 5. Print summary statistics
        print("\n" + "=" * 50)
        print("Done! Summary:")
        print(f"  Total items in file: {total_count}")
        print(f"  Crawl range: item {START} to item {end_index - 1}")
        print(f"  Records processed: {len(extracted_results)}")
        print(f"  Parsed successfully: {success_count}")
        print(f"  Failed to parse: {fail_count}")
        print(f"  Saved to database: {inserted_count if news_list else 0}")
        print("=" * 50)

    except json.JSONDecodeError:
        logger.error("Invalid JSON format; please check the file contents")
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)


if __name__ == "__main__":
    main()