"""
Selenium driver management.

Wraps Selenium operations and exposes a single browser-driver interface.
"""

import time
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from typing import Optional
import sys
import os

# Make the project root importable so the `config` and `utils` packages resolve
# when this module is loaded from inside its own sub-directory.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config.settings import config
from utils.logger import get_logger


class SeleniumDriver:
    """Owns one Chrome WebDriver and offers page-fetching / paging helpers.

    The driver is created eagerly in ``__init__`` and re-created on demand by
    the public methods after ``quit()`` has been called. The instance can be
    used as a context manager; leaving the ``with`` block calls ``quit()``.
    """

    # JavaScript snippets shared by the scrolling helpers.
    _PAGE_HEIGHT_JS = "return document.body.scrollHeight"
    _SCROLL_TO_BOTTOM_JS = "window.scrollTo(0, document.body.scrollHeight);"

    def __init__(self):
        self.logger = get_logger(__name__)
        self._driver: Optional[webdriver.Chrome] = None
        self._create_driver()

    def __enter__(self) -> "SeleniumDriver":
        """Return the instance itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the browser on leaving the ``with`` block; never suppresses errors."""
        self.quit()

    def _create_driver(self) -> None:
        """Create the Chrome driver and apply the configured timeouts.

        Raises:
            Exception: whatever ``webdriver.Chrome`` or the timeout setters
                raise. If the timeouts cannot be applied, the freshly started
                browser is closed before the error propagates.
        """
        options = Options()

        # Basic flags.
        for argument in (
            "--disable-gpu",
            "--disable-software-rasterizer",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            f"--log-level={config.selenium.log_level}",
        ):
            options.add_argument(argument)

        # Headless mode.
        if config.selenium.headless:
            options.add_argument("--headless=new")

        # Window size.
        if config.selenium.window_size:
            options.add_argument(f"--window-size={config.selenium.window_size}")

        driver = webdriver.Chrome(options=options)
        try:
            driver.set_page_load_timeout(config.selenium.page_load_timeout)
            driver.set_script_timeout(config.selenium.script_timeout)
        except Exception:
            # The browser process is already running; close it so a failed
            # initialisation does not leave an orphaned Chrome behind.
            driver.quit()
            raise

        # Publish the driver only once it is fully configured.
        self._driver = driver
        self.logger.info("Selenium驱动已创建")

    def _ensure_driver(self) -> webdriver.Chrome:
        """Return the live driver, creating one first if none is held."""
        if not self._driver:
            self._create_driver()
        return self._driver

    def get_page_source(
        self,
        url: str,
        wait_selector: Optional[str] = None,
        scroll: bool = True
    ) -> str:
        """Load ``url`` and return the resulting HTML source.

        Args:
            url: Page URL to open.
            wait_selector: Optional CSS selector; when given, block until a
                matching element is present. The wait timeout is read from
                ``config.selenium.implicit_wait``.
            scroll: Whether to scroll the page to the bottom repeatedly
                (intended to trigger lazy loading) before reading the source.

        Returns:
            The page's HTML source.

        Raises:
            Exception: any error raised while loading, waiting or scrolling is
                logged and re-raised unchanged.
        """
        driver = self._ensure_driver()

        try:
            driver.get(url)

            # Wait for the requested element to appear.
            if wait_selector:
                locator = (By.CSS_SELECTOR, wait_selector)
                WebDriverWait(driver, config.selenium.implicit_wait).until(
                    EC.presence_of_element_located(locator)
                )

            # Scroll to trigger lazy loading.
            if scroll:
                self._scroll_page()

            return driver.page_source

        except Exception as e:
            self.logger.error(f"获取页面源码失败: {url} - {e}")
            raise

    def _scroll_page(self) -> None:
        """Scroll to the bottom until the page height stops growing.

        At most ``config.selenium.max_scroll_times`` scrolls are performed,
        pausing ``config.selenium.scroll_pause_time`` seconds after each one.
        """
        driver = self._ensure_driver()
        max_scrolls = config.selenium.max_scroll_times

        # Height before any scrolling.
        last_height = driver.execute_script(self._PAGE_HEIGHT_JS)
        self.logger.debug(f"初始页面高度: {last_height}")

        for attempt in range(1, max_scrolls + 1):
            # Jump to the bottom, then give new content time to load.
            driver.execute_script(self._SCROLL_TO_BOTTOM_JS)
            time.sleep(config.selenium.scroll_pause_time)

            new_height = driver.execute_script(self._PAGE_HEIGHT_JS)
            self.logger.debug(
                f"滚动 {attempt}/{max_scrolls}, 高度: {last_height} -> {new_height}"
            )

            # An unchanged height means nothing more was loaded.
            if new_height == last_height:
                self.logger.debug("页面高度未变化,停止滚动")
                break

            last_height = new_height

        self.logger.debug(f"滚动完成,最终高度: {last_height}")

    def click_load_more(self, selector: str = "div.kr-loading-more-button", max_clicks: int = 10):
        """Repeatedly click a "load more" button.

        Stops after ``max_clicks`` clicks or at the first failure of any kind
        (button missing, not clickable, script error, ...). Failures are
        logged at debug level and are not propagated.

        Args:
            selector: CSS selector of the "load more" button.
            max_clicks: Maximum number of clicks.
        """
        driver = self._ensure_driver()

        click_count = 0
        for _ in range(max_clicks):
            try:
                # Scroll to the bottom several times.
                for _ in range(3):
                    driver.execute_script(self._SCROLL_TO_BOTTOM_JS)
                    time.sleep(0.5)

                # Click without pre-checking visibility or clickability; if
                # the lookup or the click fails, the handler below ends the loop.
                load_more_btn = driver.find_element(By.CSS_SELECTOR, selector)
                load_more_btn.click()
                click_count += 1
                self.logger.debug(f"点击加载更多按钮 {click_count} 次")

                # Wait for the new content to load.
                time.sleep(config.selenium.scroll_pause_time)

            except Exception as e:
                self.logger.debug(f"点击加载更多按钮失败: {e}")
                break

        self.logger.info(f"加载更多按钮共点击 {click_count} 次")

    def click_next_page(self, selector: str = "div.pagination-item-next.pagination-more", max_clicks: int = 10):
        """Page forward by clicking a "next page" button.

        Unlike ``click_load_more``, this is meant for pagination: it stops
        when the button is absent, when the button is marked disabled (via a
        ``disabled`` class or attribute), after ``max_clicks`` clicks, or at
        the first error. Errors are logged and are not propagated.

        Args:
            selector: CSS selector of the "next page" button.
            max_clicks: Maximum number of clicks.
        """
        driver = self._ensure_driver()

        click_count = 0
        for _ in range(max_clicks):
            try:
                # Scroll to the bottom first so the button is in view.
                driver.execute_script(self._SCROLL_TO_BOTTOM_JS)
                time.sleep(0.5)

                # Only a genuinely missing element means "last page"; any
                # other lookup error is reported by the outer handler.
                try:
                    next_btn = driver.find_element(By.CSS_SELECTOR, selector)
                except NoSuchElementException:
                    self.logger.info("找不到下一页按钮,可能已到最后一页,停止翻页")
                    break

                # Treat a `disabled` class or attribute as "no further pages".
                btn_class = next_btn.get_attribute("class") or ""
                if "disabled" in btn_class or next_btn.get_attribute("disabled"):
                    self.logger.info("下一页按钮已禁用,停止翻页")
                    break

                next_btn.click()
                click_count += 1
                self.logger.debug(f"成功点击下一页按钮 {click_count} 次")

                # Wait for the next page; twice the usual scroll pause.
                time.sleep(config.selenium.scroll_pause_time * 2)

            except Exception as e:
                self.logger.info(f"点击下一页按钮时出错: {e},停止翻页")
                break

        self.logger.info(f"下一页按钮共点击 {click_count} 次")

    def quit(self) -> None:
        """Close the browser if one is open; a no-op otherwise.

        The stored handle is cleared before the browser is closed, so a failing
        ``quit()`` cannot leave a dead driver behind for later calls to reuse.
        Any error raised while closing still propagates to the caller.
        """
        driver = self._driver
        if not driver:
            return

        self._driver = None
        driver.quit()
        self.logger.info("Selenium驱动已关闭")