diff --git a/DrissionPage/common.py b/DrissionPage/common.py new file mode 100644 index 0000000..6abab86 --- /dev/null +++ b/DrissionPage/common.py @@ -0,0 +1,137 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +@File : common.py +""" +from abc import abstractmethod +from pathlib import Path +from typing import Union + +from requests_html import Element +from selenium.webdriver.remote.webelement import WebElement + + +class DrissionElement(object): + def __init__(self, ele): + self._inner_ele = ele + + @property + def inner_ele(self) -> Union[WebElement, Element]: + return self._inner_ele + + @property + def is_valid(self): + return True + + @property + def text(self): + return + + @property + def html(self): + return + + @property + def tag(self): + return + + @property + def parent(self): + return + + @property + def next(self): + return + + @property + def prev(self): + return + + @abstractmethod + def ele(self, loc: tuple, mode: str = None, show_errmsg: bool = True): + pass + + @abstractmethod + def eles(self, loc: tuple, show_errmsg: bool = True): + pass + + @abstractmethod + def attr(self, attr: str): + pass + + +def get_loc_from_str(loc: str) -> tuple: + loc_item = loc.split(':', 1) + by = loc_item[0] + loc_by = 'xpath' + if by == 'tag' and len(loc_item) == 2: + loc_str = f'//{loc_item[1]}' + elif by.startswith('@') and len(loc_item) == 2: + loc_str = f'//*[{by}="{loc_item[1]}"]' + elif by.startswith('@') and len(loc_item) == 1: + loc_str = f'//*[{by}]' + elif by == 'text' and len(loc_item) == 2: + loc_str = _make_xpath_search_str(loc_item[1]) + elif by == 'xpath' and len(loc_item) == 2: + loc_str = loc_item[1] + elif by == 'css' and len(loc_item) == 2: + loc_by = 'css selector' + loc_str = loc_item[1] + else: + loc_str = _make_xpath_search_str(by) + return loc_by, loc_str + + +def _make_xpath_search_str(search_str: str): + # 将"转义,不知何故不能直接用\" + parts = search_str.split('"') + parts_num = len(parts) + search_str = 'concat(' + for 
key, i in enumerate(parts): + search_str += f'"{i}"' + search_str += ',' + '\'"\',' if key < parts_num - 1 else '' + search_str += ',"")' + return f"//*[contains(text(),{search_str})]" + + +def translate_loc_to_xpath(loc): + """把By类型转为xpath或css selector""" + loc_by = 'xpath' + loc_str = None + if loc[0] == 'xpath': + loc_str = loc[1] + elif loc[0] == 'css selector': + loc_by = 'css selector' + loc_str = loc[1] + elif loc[0] == 'id': + loc_str = f'//*[@id="{loc[1]}"]' + elif loc[0] == 'class name': + loc_str = f'//*[@class="{loc[1]}"]' + elif loc[0] == 'link text': + loc_str = f'//a[text()="{loc[1]}"]' + elif loc[0] == 'name': + loc_str = f'//*[@name="{loc[1]}"]' + elif loc[0] == 'tag name': + loc_str = f'//{loc[1]}' + elif loc[0] == 'partial link text': + loc_str = f'//a[contains(text(),"{loc[1]}")]' + return loc_by, loc_str + + +def avoid_duplicate_name(folder_path: str, file_name: str) -> str: + """检查文件是否重名,并返回可以使用的文件名 + :param folder_path: 文件夹路径 + :param file_name: 要检查的文件名 + :return: 可用的文件名 + """ + while (file_Path := Path(folder_path).joinpath(file_name)).exists(): + ext_name = file_Path.suffix + base_name = file_Path.stem + num = base_name.split(' ')[-1] + if num[0] == '(' and num[-1] == ')' and num[1:-1].isdigit(): + num = int(num[1:-1]) + file_name = f'{base_name.replace(f"({num})", "", -1)}({num + 1}){ext_name}' + else: + file_name = f'{base_name} (1){ext_name}' + return file_name diff --git a/DrissionPage/config.py b/DrissionPage/config.py index aa1fce2..7af5f54 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -1,63 +1,141 @@ # -*- coding:utf-8 -*- """ 配置文件 +@Author : g1879 +@Contact : g1879@qq.com +@File : config.py """ - +from configparser import ConfigParser, NoSectionError, NoOptionError from pathlib import Path +from typing import Any -global_tmp_path = f'{str(Path(__file__).parent)}\\tmp' -Path(global_tmp_path).mkdir(parents=True, exist_ok=True) +from selenium import webdriver +from selenium.webdriver.chrome.options import Options 
-global_driver_options = { - # ---------------已打开的浏览器--------------- - 'debuggerAddress': '127.0.0.1:9222', - # ---------------chromedriver路径--------------- - 'chromedriver_path': r'D:\python\Google Chrome\Chrome\chromedriver.exe', - # ---------------手动指定使用的浏览器位置--------------- - # 'binary_location': r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe', - # ---------------启动参数--------------- - 'arguments': [ - # '--headless', # 隐藏浏览器窗口 - '--mute-audio', # 静音 - '--no-sandbox', - # '--blink-settings=imagesEnabled=false', # 不加载图片 - # r'--user-data-dir="E:\tmp\chrome_tmp"', # 指定用户文件夹路径 - # '-–disk-cache-dir=""', # 指定缓存路径 - 'zh_CN.UTF-8', # 编码格式 - # "--proxy-server=http://127.0.0.1:8888", # 设置代理 - # '--hide-scrollbars', # 隐藏滚动条 - # '--start-maximized', # 浏览器窗口最大化 - # "--disable-javascript", # 禁用JavaScript - # 模拟移动设备 - # 'user-agent="MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"', - '--disable-gpu' # 谷歌文档提到需要加上这个属性来规避bug - ], - # ---------------扩展文件--------------- - 'extension_files': [], - # 'extensions': [], - # ---------------实验性质的设置参数--------------- - 'experimental_options': { - 'prefs': { - # 设置下载路径 - 'download.default_directory': global_tmp_path, - # 下载不弹出窗口 - 'profile.default_content_settings.popups': 0, - # 无弹窗 - 'profile.default_content_setting_values': {'notifications': 2}, - # 禁用PDF插件 - 'plugins.plugins_list': [{"enabled": False, "name": "Chrome PDF Viewer"}], - # 设置为开发者模式,防反爬虫 - 'excludeSwitches': ["ignore-certificate-errors", "enable-automation"] - } - } -} +class OptionsManager(object): + """管理配置文件内容的类""" + def __init__(self, path: str = None): + """初始化,读取配置文件,如没有设置临时文件夹,则设置并新建""" + self.path = path or Path(__file__).parent / 'configs.ini' + self._conf = ConfigParser() + self._conf.read(self.path, encoding='utf-8') + if 'global_tmp_path' not in self.get_option('paths') or not self.get_value('paths', 'global_tmp_path'): + 
global_tmp_path = f'{str(Path(__file__).parent)}\\tmp' + Path(global_tmp_path).mkdir(parents=True, exist_ok=True) + self.set_item('paths', 'global_tmp_path', global_tmp_path) + self.save() -global_session_options = { - 'headers': { - "User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko)' - ' Version/10.1.2 Safari/603.3.8', - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Accept-Language": "zh-cn", "Connection": "keep-alive", - "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7"} -} + def get_value(self, section: str, item: str) -> Any: + """获取配置的值""" + try: + return eval(self._conf.get(section, item)) + except SyntaxError: + return self._conf.get(section, item) + except NoSectionError and NoOptionError: + return None + + def get_option(self, section: str) -> dict: + """把section内容以字典方式返回""" + items = self._conf.items(section) + option = dict() + for j in items: + try: + option[j[0]] = eval(self._conf.get(section, j[0]).replace('\\', '\\\\')) + except SyntaxError: + option[j[0]] = self._conf.get(section, j[0]) + return option + + def set_item(self, section: str, item: str, value: str): + """设置配置值""" + self._conf.set(section, item, str(value)) + + def save(self): + """保存配置文件""" + self._conf.write(open(self.path, 'w')) + + +class DriverOptions(Options): + def __init__(self, read_file=True): + """初始化,默认从文件读取设置""" + super().__init__() + if read_file: + options_dict = OptionsManager().get_option('chrome_options') + self._binary_location = options_dict['binary_location'] if 'binary_location' in options_dict else '' + self._arguments = options_dict['arguments'] if 'arguments' in options_dict else [] + self._extensions = options_dict['extensions'] if 'extensions' in options_dict else [] + self._experimental_options = options_dict[ + 'experimental_options'] if 'experimental_options' in options_dict else {} + self._debugger_address = options_dict['debugger_address'] if 'debugger_address' in 
options_dict else None + + def save(self): + """保存设置到文件""" + om = OptionsManager() + options = _chrome_options_to_dict(self) + for i in options: + om.set_item('chrome_options', i, options[i]) + om.save() + + def remove_argument(self, value: str): + """移除一个设置""" + if value in self._arguments: + self._arguments.remove(value) + + def remove_experimental_option(self, key: str): + """移除一个实验设置,传入key值删除""" + if key in self._experimental_options: + self._experimental_options.pop(key) + + def remove_all_extensions(self): + """移除所有插件 + 因插件是以整个文件储存,难以移除其中一个,故如须设置则全部移除再重设""" + self._extensions = [] + + +def _dict_to_chrome_options(options: dict) -> Options: + """从传入的字典获取浏览器设置,返回ChromeOptions对象""" + chrome_options = webdriver.ChromeOptions() + if 'debugger_address' in options and options['debugger_address']: + # 控制已打开的浏览器 + chrome_options.debugger_address = options['debugger_address'] + else: + if 'binary_location' in options and options['binary_location']: + # 手动指定使用的浏览器位置 + chrome_options.binary_location = options['binary_location'] + if 'arguments' in options: + # 启动参数 + if not isinstance(options['arguments'], list): + raise Exception(f'Arguments need list,not {type(options["arguments"])}.') + for arg in options['arguments']: + chrome_options.add_argument(arg) + if 'extension_files' in options and options['extension_files']: + # 加载插件 + if not isinstance(options['extension_files'], list): + raise Exception(f'Extension files need list,not {type(options["extension_files"])}.') + for arg in options['extension_files']: + chrome_options.add_extension(arg) + if 'extensions' in options and options['extensions']: + if not isinstance(options['extensions'], list): + raise Exception(f'Extensions need list,not {type(options["extensions"])}.') + for arg in options['extensions']: + chrome_options.add_encoded_extension(arg) + if 'experimental_options' in options and options['experimental_options']: + # 实验性质的设置参数 + if not isinstance(options['experimental_options'], dict): + raise 
Exception(f'Experimental options need dict,not {type(options["experimental_options"])}.') + for i in options['experimental_options']: + chrome_options.add_experimental_option(i, options['experimental_options'][i]) + # if 'capabilities' in options and options['capabilities']: + # pass # 未知怎么用 + return chrome_options + + +def _chrome_options_to_dict(options: Options) -> dict: + re_dict = dict() + re_dict['binary_location'] = options.binary_location + re_dict['debugger_address'] = options.debugger_address + re_dict['arguments'] = options.arguments + re_dict['extensions'] = options.extensions + re_dict['experimental_options'] = options.experimental_options + # re_dict['capabilities'] = options.capabilities + return re_dict diff --git a/DrissionPage/configs.ini b/DrissionPage/configs.ini new file mode 100644 index 0000000..f7ab093 --- /dev/null +++ b/DrissionPage/configs.ini @@ -0,0 +1,65 @@ +[paths] +;chromedriver_path = D:\python\Google Chrome\Chrome\chromedriver81.exe +chromedriver_path = D:\python\Google Chrome\Chrome\chromedriver.exe +global_tmp_path = D:\python\projects\fsjy\upload_news\DrissionPage\tmp + +[chrome_options] +debugger_address = +;127.0.0.1:9222 +;binary_location = C:\Program Files (x86)\Google\Chrome\Application\chrome.exe +binary_location = D:\python\Google Chrome\Chrome\chrome.exe +arguments = [ + ; 隐藏浏览器窗口 + '--headless', + ; 静音 + '--mute-audio', + ; 不使用沙盒 + '--no-sandbox', + ; 不加载图片 + ; '--blink-settings=imagesEnabled=false', + ; 指定用户文件夹路径 + ; r'--user-data-dir="E:\tmp\chrome_tmp"', + ; 指定缓存路径 + ; '-–disk-cache-dir=""', + ; 编码格式 + 'zh_CN.UTF-8', + ; 设置代理 + ; "--proxy-server=http://127.0.0.1:1081", + ; 隐藏滚动条 + ; '--hide-scrollbars', + ; 浏览器窗口最大化 + ; '--start-maximized', + ; 禁用JavaScript + ; "--disable-javascript", + ; 模拟移动设备 + ; 'user-agent="MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"', + ; 谷歌文档提到需要加上这个属性来规避bug + '--disable-gpu' 
+ ] +extensions = [] +experimental_options = { + 'prefs': { + ; 设置下载路径 + 'download.default_directory': r'D:\python\projects\fsjy\upload_news\DrissionPage\tmp', + ; 下载不弹出窗口 + 'profile.default_content_settings.popups': 0, + ; 无弹窗 + 'profile.default_content_setting_values': {'notifications': 2}, + ; 禁用PDF插件 + 'plugins.plugins_list': [{"enabled": False, "name": "Chrome PDF Viewer"}], + ; 设置为开发者模式,防反爬虫(无用) + 'excludeSwitches': ["ignore-certificate-errors", "enable-automation"], + 'useAutomationExtension': False + } + } + +[session_options] +headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "zh-cn", + "Connection": "keep-alive", + "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7" + } +;proxies = { "http": "127.0.0.1:8888", "https": "http://127.0.0.1:8888" } + diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index 25d8321..b66ca85 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -4,131 +4,146 @@ @Contact : g1879@qq.com @File : drission.py """ +from typing import Union from urllib.parse import urlparse import tldextract +from requests import Session from requests_html import HTMLSession from selenium import webdriver from selenium.common.exceptions import WebDriverException from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.webdriver import WebDriver -from .config import global_driver_options, global_session_options - - -def _get_chrome_options(options: dict) -> Options: - """ 从传入的字典获取浏览器设置,返回ChromeOptions对象""" - chrome_options = webdriver.ChromeOptions() - if 'debuggerAddress' in options: - # 控制已打开的浏览器 - chrome_options.add_experimental_option('debuggerAddress', options['debuggerAddress']) - else: - if 'binary_location' in options and options['binary_location']: - # 手动指定使用的浏览器位置 - 
chrome_options.binary_location = options['binary_location'] - if 'arguments' in options: - # 启动参数 - if isinstance(options['arguments'], list): - for arg in options['arguments']: - chrome_options.add_argument(arg) - else: - raise Exception(f'需要list,而非{type(options["arguments"])}') - if 'extension_files' in options and options['extension_files']: - # 加载插件 - if isinstance(options['extension_files'], list): - for arg in options['extension_files']: - chrome_options.add_extension(arg) - else: - raise Exception(f'需要list,而非{type(options["extension_files"])}') - if 'experimental_options' in options: - # 实验性质的设置参数 - if isinstance(options['experimental_options'], dict): - for i in options['experimental_options']: - chrome_options.add_experimental_option(i, options['experimental_options'][i]) - else: - raise Exception(f'需要dict,而非{type(options["experimental_options"])}') - - return chrome_options +from .config import _dict_to_chrome_options, OptionsManager class Drission(object): - """ Drission类整合了WebDriver对象和HTLSession对象, - 可按要求创建、关闭及同步cookies + """Drission类整合了WebDriver对象和HTLSession对象,可按要求创建、关闭及同步cookies """ - def __init__(self, driver_options: dict = None, session_options: dict = None): - self._driver = None + def __init__(self, driver_options: Union[dict, Options] = None, session_options: dict = None, + driver_path: str = None): + """初始化配置信息,但不生成session和driver实例 + :param driver_options: chrome设置,Options类或设置字典 + :param session_options: session设置 + :param driver_path: chromedriver路径,如为空,则为'chromedriver' + """ self._session = None - self._driver_options = driver_options if driver_options else global_driver_options - self._session_options = session_options if session_options else global_session_options + self._driver = None + om = OptionsManager() + self._session_options = session_options or om.get_option('session_options') + self._driver_options = driver_options or om.get_option('chrome_options') + + if driver_path: + self._driver_path = driver_path + elif 'chromedriver_path' 
in om.get_option('paths') and om.get_option('paths')['chromedriver_path']: + self._driver_path = om.get_option('paths')['chromedriver_path'] + else: + self._driver_path = 'chromedriver' @property def session(self): - """ 获取HTMLSession对象""" + """获取HTMLSession对象""" if self._session is None: self._session = HTMLSession() + attrs = ['headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify', + 'cert', 'adapters', 'stream', 'trust_env', 'max_redirects'] + for i in attrs: + if i in self._session_options: + exec(f'self._session.{i} = self._session_options["{i}"]') + return self._session @property def driver(self): - """ 获取WebDriver对象,按传入配置信息初始化""" + """获取WebDriver对象,按传入配置信息初始化""" if self._driver is None: - if 'chromedriver_path' in self._driver_options: - driver_path = self._driver_options['chromedriver_path'] + if isinstance(self._driver_options, Options): + options = self._driver_options + if options.debugger_address: + # 因同时设置调试浏览器和其他配置会导致异常,故新建一个对象 + debugger_address = options.debugger_address + options = webdriver.ChromeOptions() + options.debugger_address = debugger_address + elif isinstance(self._driver_options, dict): + options = _dict_to_chrome_options(self._driver_options) else: - driver_path = 'chromedriver' - self._driver = webdriver.Chrome(driver_path, options=_get_chrome_options(self._driver_options)) + raise KeyError('Driver options invalid') + + self._driver = webdriver.Chrome(self._driver_path, options=options) + + # 反爬设置,似乎没用 + self._driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { + "source": """ + Object.defineProperty(navigator, 'webdriver', { + get: () => undefined + }) + """ + }) + return self._driver @property - def session_options(self): + def session_options(self) -> dict: return self._session_options - def cookies_to_session(self, copy_user_agent: bool = False) -> None: - """ 把driver的cookies复制到session""" - if copy_user_agent: - self.copy_user_agent_from_driver() - for cookie in self.driver.get_cookies(): - 
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain']) + @session_options.setter + def session_options(self, value: dict): + self._session_options = value - def cookies_to_driver(self, url: str): - """ 把session的cookies复制到driver""" + def cookies_to_session(self, copy_user_agent: bool = False, driver: WebDriver = None, session: Session = None) \ + -> None: + """把driver的cookies复制到session""" + driver = driver or self.driver + session = session or self.session + if copy_user_agent: + self.user_agent_to_session(driver, session) + for cookie in driver.get_cookies(): + session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain']) + + def cookies_to_driver(self, url: str, driver: WebDriver = None, session: Session = None) -> None: + """把session的cookies复制到driver""" + driver = driver or self.driver + session = session or self.session domain = urlparse(url).netloc if not domain: raise Exception('Without specifying a domain') - # 翻译cookies - for i in [x for x in self.session.cookies if domain in x.domain]: + for i in [x for x in session.cookies if domain in x.domain]: cookie_data = {'name': i.name, 'value': str(i.value), 'path': i.path, 'domain': i.domain} if i.expires: cookie_data['expiry'] = i.expires - self.ensure_add_cookie(cookie_data) + self._ensure_add_cookie(cookie_data, driver=driver) - def ensure_add_cookie(self, cookie, override_domain=None) -> None: - """ 添加cookie到driver""" + def _ensure_add_cookie(self, cookie, override_domain=None, driver=None) -> None: + """添加cookie到driver""" + driver = driver or self.driver if override_domain: cookie['domain'] = override_domain cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' 
else cookie['domain'][1:] try: - browser_domain = tldextract.extract(self.driver.current_url).fqdn + browser_domain = tldextract.extract(driver.current_url).fqdn except AttributeError: browser_domain = '' if cookie_domain not in browser_domain: - self.driver.get(f'http://{cookie_domain.lstrip("http://")}') + driver.get(f'http://{cookie_domain.lstrip("http://")}') - self.driver.add_cookie(cookie) + driver.add_cookie(cookie) # 如果添加失败,尝试更宽的域名 - if not self.is_cookie_in_driver(cookie): + if not self._is_cookie_in_driver(cookie, driver): cookie['domain'] = tldextract.extract(cookie['domain']).registered_domain - self.driver.add_cookie(cookie) - if not self.is_cookie_in_driver(cookie): + driver.add_cookie(cookie) + if not self._is_cookie_in_driver(cookie): raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n") - def is_cookie_in_driver(self, cookie) -> bool: - """ 检查cookie是否已经在driver里 + def _is_cookie_in_driver(self, cookie, driver=None) -> bool: + """检查cookie是否已经在driver里 只检查name、value、domain,检查domain时比较宽""" - for driver_cookie in self.driver.get_cookies(): + driver = driver or self.driver + for driver_cookie in driver.get_cookies(): if (cookie['name'] == driver_cookie['name'] and cookie['value'] == driver_cookie['value'] and (cookie['domain'] == driver_cookie['domain'] or @@ -136,23 +151,25 @@ class Drission(object): return True return False - def copy_user_agent_from_driver(self) -> None: - """ 把driver的user-agent复制到session""" - selenium_user_agent = self.driver.execute_script("return navigator.userAgent;") - self.session.headers.update({"user-agent": selenium_user_agent}) + def user_agent_to_session(self, driver: WebDriver = None, session: Session = None) -> None: + """把driver的user-agent复制到session""" + driver = driver or self.driver + session = session or self.session + selenium_user_agent = driver.execute_script("return navigator.userAgent;") + session.headers.update({"User-Agent": selenium_user_agent}) def close_driver(self) -> 
None: - """ 关闭driver和浏览器""" + """关闭driver和浏览器""" self._driver.quit() self._driver = None def close_session(self) -> None: - """ 关闭session""" + """关闭session""" self._session.close() self._session = None def close(self) -> None: - """ 关闭session、driver和浏览器""" + """关闭session、driver和浏览器""" if self._driver: self.close_driver() if self._session: diff --git a/DrissionPage/driver_element.py b/DrissionPage/driver_element.py new file mode 100644 index 0000000..25a270e --- /dev/null +++ b/DrissionPage/driver_element.py @@ -0,0 +1,251 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +@File : driver_element.py +""" +from html import unescape +from pathlib import Path +from time import sleep +from typing import Union, List, Any + +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.support import expected_conditions as ec +from selenium.webdriver.support.select import Select +from selenium.webdriver.support.wait import WebDriverWait + +from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath +from .config import OptionsManager + + +class DriverElement(DrissionElement): + '''driver模式的元素对象,包装了一个WebElement对象,并封装了常用功能''' + + def __init__(self, ele: WebElement, timeout: float = 10): + super().__init__(ele) + self.timeout = timeout + + def __repr__(self): + attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + return f'' + + @property + def attrs(self) -> dict: + """返回元素所有属性及值""" + js = ''' + var dom=arguments[0]; + var names="{"; + var len = dom.attributes.length; + for(var i=0;i str: + """元素内文本""" + return unescape(self.attr('innerText')).replace('\xa0', ' ') + + @property + def html(self) -> str: + """元素innerHTML""" + return unescape(self.attr('innerHTML')).replace('\xa0', ' ') + + @property + def tag(self) -> str: + """元素类型""" + return self._inner_ele.tag_name + + @property + def parent(self): + """父级元素""" + loc = 'xpath', './..' 
+ return self.ele(loc, timeout=1, show_errmsg=False) + + @property + def next(self): + """下一个兄弟元素""" + loc = 'xpath', './following-sibling::*[1]' + return self.ele(loc, timeout=1, show_errmsg=False) + + @property + def prev(self): + """上一个兄弟元素""" + loc = 'xpath', './preceding-sibling::*[1]' + return self.ele(loc, timeout=1, show_errmsg=False) + + def attr(self, attr: str) -> str: + """获取属性值""" + if attr == 'text': + return self.text + else: + # return self.attrs[attr] + return self.inner_ele.get_attribute(attr) + + def ele(self, loc_or_str: Union[tuple, str], mode: str = None, show_errmsg: bool = False, timeout: float = None): + """根据loc获取元素或列表,可用用字符串控制获取方式,可选'id','class','name','tagName' + 例:ele.find('id:ele_id') + """ + if isinstance(loc_or_str, str): + loc_or_str = get_loc_from_str(loc_or_str) + elif isinstance(loc_or_str, tuple) and len(loc_or_str) == 2: + loc_or_str = translate_loc_to_xpath(loc_or_str) + else: + raise ValueError('loc_or_str must be tuple or str.') + + if loc_or_str[0] == 'xpath': + # 确保查询语句最前面是. 
+ loc_str = f'.{loc_or_str[1]}' if not loc_or_str[1].startswith('.') else loc_or_str[1] + loc_or_str = loc_or_str[0], loc_str + + timeout = timeout or self.timeout + return execute_driver_find(self.inner_ele, loc_or_str, mode, show_errmsg, timeout) + + def eles(self, loc_or_str: Union[tuple, str], show_errmsg: bool = False, timeout: float = None): + """根据loc获取子元素列表""" + return self.ele(loc_or_str, mode='all', show_errmsg=show_errmsg, timeout=timeout) + + # -----------------以下为driver独占------------------- + def click(self, by_js=False) -> bool: + """点击""" + if not by_js: + for _ in range(10): + try: + self.inner_ele.click() + return True + except Exception as e: + # print(e) + sleep(0.2) + # 若点击失败,用js方式点击 + # print('Click by JS.') + try: + self.run_script('arguments[0].click()') + return True + except: + raise + + def input(self, value, clear: bool = True) -> bool: + """输入文本""" + try: + if clear: + self.clear() + self.inner_ele.send_keys(value) + return True + except: + raise + + def run_script(self, script: str) -> Any: + """运行js""" + return self.inner_ele.parent.execute_script(script, self.inner_ele) + + def submit(self) -> None: + """提交表单""" + self.inner_ele.submit() + + def clear(self) -> None: + """清空元素""" + self.run_script("arguments[0].value=''") + # self.ele.clear() + + def is_selected(self) -> bool: + """是否选中""" + return self.inner_ele.is_selected() + + def is_enabled(self) -> bool: + """是否可用""" + return self.inner_ele.is_enabled() + + def is_displayed(self) -> bool: + """是否可见""" + return self.inner_ele.is_displayed() + + def is_valid(self) -> bool: + """用于判断元素是否还能用,应对页面跳转元素不能用的情况""" + try: + self.is_enabled() + return True + except: + return False + + @property + def size(self) -> dict: + """元素大小""" + return self.inner_ele.size + + @property + def location(self) -> dict: + """元素坐标""" + return self.inner_ele.location + + def screenshot(self, path: str = None, filename: str = None) -> str: + """元素截图""" + name = filename or self.tag + path = path or 
OptionsManager().get_value('paths', 'global_tmp_path') + if not path: + raise IOError('No path specified.') + Path(path).mkdir(parents=True, exist_ok=True) + # 等待元素加载完成 + if self.tag == 'img': + js = 'return arguments[0].complete && typeof arguments[0].naturalWidth != "undefined" ' \ + '&& arguments[0].naturalWidth > 0' + while not self.run_script(js): + pass + img_path = f'{path}\\{name}.png' + self.inner_ele.screenshot(img_path) + return img_path + + def select(self, text: str) -> bool: + """在下拉列表中选择""" + ele = Select(self.inner_ele) + try: + ele.select_by_visible_text(text) + return True + except: + return False + + def set_attr(self, attr: str, value: str) -> bool: + """设置元素属性""" + try: + self.run_script(f"arguments[0].{attr} = '{value}';") + return True + except: + raise + + +def execute_driver_find(page_or_ele: Union[WebElement, WebDriver], loc: tuple, mode: str = 'single', + show_errmsg: bool = False, timeout: float = 10) -> Union[DriverElement, List[DriverElement]]: + """执行driver模式元素的查找 + 页面查找元素及元素查找下级元素皆使用此方法 + :param page_or_ele: driver模式页面或元素 + :param loc: 元素定位语句 + :param mode: 'single'或'all' + :param show_errmsg: 是否显示错误信息 + :param timeout: 查找元素超时时间 + :return: 返回DriverElement元素或列表 + """ + mode = mode or 'single' + if mode not in ['single', 'all']: + raise ValueError("mode must be 'single' or 'all'.") + msg = result = None + try: + wait = WebDriverWait(page_or_ele, timeout=timeout) + if mode == 'single': + msg = 'Element not found.' + result = DriverElement(wait.until(ec.presence_of_element_located(loc))) + elif mode == 'all': + msg = 'Elements not found.' 
+ eles = wait.until(ec.presence_of_all_elements_located(loc)) + result = [DriverElement(ele) for ele in eles] + return result + except: + if show_errmsg: + print(msg, loc) + raise + return [] if mode == 'all' else None diff --git a/DrissionPage/driver_page.py b/DrissionPage/driver_page.py index 4888e68..191a199 100644 --- a/DrissionPage/driver_page.py +++ b/DrissionPage/driver_page.py @@ -4,25 +4,26 @@ @Contact : g1879@qq.com @File : driver_page.py """ -from html import unescape -from time import sleep -from typing import Union +from glob import glob +from typing import Union, List, Any from urllib import parse from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.remote.webelement import WebElement -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import Select -from selenium.webdriver.support.wait import WebDriverWait + +from .common import get_loc_from_str +from .config import OptionsManager +from .driver_element import DriverElement, execute_driver_find class DriverPage(object): """DriverPage封装了页面操作的常用功能,使用selenium来获取、解析、操作网页""" - def __init__(self, driver: WebDriver, locs=None): + def __init__(self, driver: WebDriver, timeout: float = 10): # , locs=None """初始化函数,接收一个WebDriver对象,用来操作网页""" self._driver = driver - self._locs = locs + self.timeout = timeout + # self._locs = locs self._url = None self._url_available = None @@ -38,11 +39,26 @@ class DriverPage(object): else: return self._driver.current_url + @property + def html(self) -> str: + """获取元素innerHTML,如未指定元素则获取页面源代码""" + return self.driver.find_element_by_xpath("//*").get_attribute("outerHTML") + @property def url_available(self) -> bool: """url有效性""" return self._url_available + @property + def cookies(self) -> list: + """返回当前网站cookies""" + return self.driver.get_cookies() + + @property + def title(self) -> str: + """获取网页title""" + return self._driver.title + def get(self, url: str, params: dict = None, go_anyway: bool = 
False) -> Union[None, bool]: """跳转到url""" to_url = f'{url}?{parse.urlencode(params)}' if params else url @@ -50,162 +66,43 @@ class DriverPage(object): return self._url = to_url self.driver.get(to_url) - self._url_available = True if self.check_driver_url() else False + self._url_available = self.check_page() return self._url_available - @property - def cookies(self) -> list: - """返回当前网站cookies""" - return self.driver.get_cookies() - - def get_title(self) -> str: - """获取网页title""" - return self._driver.title - - def _get_ele(self, loc_or_ele: Union[WebElement, tuple]) -> WebElement: - """接收loc或元素实例,返回元素实例""" - # ======================================== - # ** 必须与SessionPage类中同名函数保持一致 ** - # ======================================== - if isinstance(loc_or_ele, tuple): - return self.find(loc_or_ele) - return loc_or_ele - - def find(self, loc: tuple, mode: str = None, timeout: float = 10, show_errmsg: bool = True) \ - -> Union[WebElement, list]: - """查找一个元素 - :param loc: 页面元素地址 + def ele(self, loc_or_ele: Union[tuple, str, DriverElement], mode: str = None, + timeout: float = None, show_errmsg: bool = False) -> Union[DriverElement, List[DriverElement], None]: + """根据loc获取元素或列表,可用用字符串控制获取方式,可选'id','class','name','tagName' + 例:ele.find('id:ele_id') + :param loc_or_ele: 页面元素地址 :param mode: 以某种方式查找元素,可选'single' , 'all', 'visible' :param timeout: 是否显示错误信息 :param show_errmsg: 是否显示错误信息 :return: 页面元素对象或列表 """ - mode = mode if mode else 'single' - if mode not in ['single', 'all', 'visible']: - raise ValueError("mode须在'single', 'all', 'visible'中选择") - msg = ele = None - try: - wait = WebDriverWait(self.driver, timeout=timeout) - if mode == 'single': - msg = '未找到元素' - ele = wait.until(EC.presence_of_element_located(loc)) - elif mode == 'all': - msg = '未找到元素s' - ele = wait.until(EC.presence_of_all_elements_located(loc)) - elif mode == 'visible': - msg = '元素不可见或不存在' - ele = wait.until(EC.visibility_of_element_located(loc)) - return ele - except: - if show_errmsg: - print(msg, loc) + 
if isinstance(loc_or_ele, DriverElement): + return loc_or_ele + elif isinstance(loc_or_ele, str): + loc_or_ele = get_loc_from_str(loc_or_ele) - def find_all(self, loc: tuple, timeout: float = 10, show_errmsg=True) -> list: + timeout = timeout or self.timeout + return execute_driver_find(self.driver, loc_or_ele, mode, show_errmsg, timeout) + + def eles(self, loc: Union[tuple, str], timeout: float = None, show_errmsg=False) -> List[DriverElement]: """查找符合条件的所有元素""" - return self.find(loc, mode='all', timeout=timeout, show_errmsg=show_errmsg) - - def search(self, value: str, mode: str = None, timeout: float = 10) -> Union[WebElement, list, None]: - """根据内容搜索元素 - :param value: 搜索内容 - :param mode: 可选'single','all' - :param timeout: 超时时间 - :return: 页面元素对象 - """ - mode = mode if mode else 'single' - if mode not in ['single', 'all']: - raise ValueError("mode须在'single', 'all'中选择") - ele = [] - try: - loc = 'xpath', f'//*[contains(text(),"{value}")]' - wait = WebDriverWait(self.driver, timeout=timeout) - if mode == 'single': - ele = wait.until(EC.presence_of_element_located(loc)) - elif mode == 'all': - ele = wait.until(EC.presence_of_all_elements_located(loc)) - return ele - except: - if mode == 'single': - return None - elif mode == 'all': - return [] - - def search_all(self, value: str, timeout: float = 10) -> list: - """根据内容搜索元素""" - return self.search(value, mode='all', timeout=timeout) - - def get_attr(self, loc_or_ele: Union[WebElement, tuple], attr: str) -> str: - """获取元素属性""" - ele = self._get_ele(loc_or_ele) - try: - return ele.get_attribute(attr) - except: - return '' - - def get_html(self, loc_or_ele: Union[WebElement, tuple] = None) -> str: - """获取元素innerHTML,如未指定元素则获取页面源代码""" - if not loc_or_ele: - return self.driver.find_element_by_xpath("//*").get_attribute("outerHTML") - return unescape(self.get_attr(loc_or_ele, 'innerHTML')).replace('\xa0', ' ') - - def get_text(self, loc_or_ele: Union[WebElement, tuple]) -> str: - """获取innerText""" - return 
unescape(self.get_attr(loc_or_ele, 'innerText')).replace('\xa0', ' ') + return self.ele(loc, mode='all', timeout=timeout, show_errmsg=show_errmsg) # ----------------以下为独有函数----------------------- - def find_visible(self, loc: tuple, timeout: float = 10, show_errmsg: bool = True) -> WebElement: - """查找一个可见元素""" - return self.find(loc, mode='visible', timeout=timeout, show_errmsg=show_errmsg) + def check_page(self) -> Union[bool, None]: + """检查页面是否符合预期 + 由子类自行实现各页面的判定规则""" + return None - def check_driver_url(self) -> bool: - """由子类自行实现各页面的判定规则""" - return True - - def input(self, loc_or_ele: Union[WebElement, tuple], value: str, clear: bool = True) -> bool: - """向文本框填入文本""" - ele = self._get_ele(loc_or_ele) - try: - if clear: - self.run_script(ele, "arguments[0].value=''") - ele.send_keys(value) - return True - except: - raise - - def click(self, loc_or_ele: Union[WebElement, tuple]) -> bool: - """点击一个元素""" - ele = self._get_ele(loc_or_ele) - if not ele: - raise - for _ in range(10): - try: - ele.click() - return True - except Exception as e: - print(e) - sleep(0.2) - # 点击失败代表被遮挡,用js方式点击 - print(f'用js点击{loc_or_ele}') - try: - self.run_script(ele, 'arguments[0].click()') - return True - except: - raise - - def set_attr(self, loc_or_ele: Union[WebElement, tuple], attribute: str, value: str) -> bool: - """设置元素属性""" - ele = self._get_ele(loc_or_ele) - try: - self.driver.execute_script(f"arguments[0].{attribute} = '{value}';", ele) - return True - except: - raise - - def run_script(self, loc_or_ele: Union[WebElement, tuple], script: str) -> bool: + def run_script(self, script: str) -> Any: """执行js脚本""" - ele = self._get_ele(loc_or_ele) + ele = self.ele(('css selector', 'html')) try: - return self.driver.execute_script(script, ele) + return ele.run_script(script) except: raise @@ -228,10 +125,10 @@ class DriverPage(object): """关闭当前标签页""" self.driver.close() - def close_other_tabs(self, tab_index: int = None) -> None: - """关闭其它标签页,没有传入序号代表保留当前页""" + def 
close_other_tabs(self, index: int = None) -> None: + """传入序号,关闭序号以外标签页,没有传入序号代表保留当前页""" tabs = self.driver.window_handles # 获得所有标签页权柄 - page_handle = tabs[tab_index] if tab_index >= 0 else self.driver.current_window_handle + page_handle = tabs[index] if index >= 0 else self.driver.current_window_handle for i in tabs: # 遍历所有标签页,关闭非保留的 if i != page_handle: self.driver.switch_to.window(i) @@ -244,39 +141,55 @@ class DriverPage(object): self.driver.switch_to.default_content() return True else: - ele = self._get_ele(loc_or_ele) + ele = self.ele(loc_or_ele) try: - self.driver.switch_to.frame(ele) + self.driver.switch_to.frame(ele.inner_ele) return True except: raise - def get_screen(self, loc_or_ele: Union[WebElement, tuple], path: str, file_name: str = None) -> str: - """获取元素截图""" - ele = self._get_ele(loc_or_ele) - name = file_name if file_name else ele.tag_name - # 等待元素加载完成 - js = 'return arguments[0].complete && typeof arguments[0].naturalWidth ' \ - '!= "undefined" && arguments[0].naturalWidth > 0' - while not self.run_script(ele, js): - pass + def screenshot(self, path: str = None, filename: str = None) -> str: + """获取网页截图""" + ele = self.ele(('css selector', 'html')) + path = path or OptionsManager().get_value('paths', 'global_tmp_path') + if not path: + raise IOError('No path specified.') + name = filename or self.title img_path = f'{path}\\{name}.png' - ele.screenshot(img_path) + ele.screenshot(path, name) return img_path def scroll_to_see(self, loc_or_ele: Union[WebElement, tuple]) -> None: """滚动直到元素可见""" - ele = self._get_ele(loc_or_ele) - self.run_script(ele, "arguments[0].scrollIntoView();") + ele = self.ele(loc_or_ele) + ele.run_script("arguments[0].scrollIntoView();") - def choose_select_list(self, loc_or_ele: Union[WebElement, tuple], text: str) -> bool: - """选择下拉列表""" - ele = Select(self._get_ele(loc_or_ele)) - try: - ele.select_by_visible_text(text) - return True - except: - return False + def scroll_to(self, mode: str = 'bottom', pixel: int = 300) -> 
None: + """滚动页面,按照参数决定如何滚动 + :param mode: 滚动的方向,top、bottom、rightmost、leftmost、up、down、left、right + :param pixel: 滚动的像素 + :return: None + """ + if mode == 'top': + self.driver.execute_script("window.scrollTo(document.documentElement.scrollLeft,0);") + elif mode == 'bottom': + self.driver.execute_script( + "window.scrollTo(document.documentElement.scrollLeft,document.body.scrollHeight);") + elif mode == 'rightmost': + self.driver.execute_script("window.scrollTo(document.body.scrollWidth,document.documentElement.scrollTop);") + elif mode == 'leftmost': + self.driver.execute_script("window.scrollTo(0,document.documentElement.scrollTop);") + elif mode == 'up': + self.driver.execute_script(f"window.scrollBy(0,-{pixel});") + elif mode == 'down': + self.driver.execute_script(f"window.scrollBy(0,{pixel});") + elif mode == 'left': + self.driver.execute_script(f"window.scrollBy(-{pixel},0);") + elif mode == 'right': + self.driver.execute_script(f"window.scrollBy({pixel},0);") + else: + raise KeyError( + "mode must be selected among 'top','bottom','rightmost','leftmost','up','down','left','right'.") def refresh(self) -> None: """刷新页面""" @@ -291,11 +204,19 @@ class DriverPage(object): if not x and not y: self.driver.maximize_window() else: - new_x = x if x else self.driver.get_window_size()['width'] - new_y = y if y else self.driver.get_window_size()['height'] + if x <= 0 or y <= 0: + raise KeyError('x and y must greater than 0.') + new_x = x or self.driver.get_window_size()['width'] + new_y = y or self.driver.get_window_size()['height'] self.driver.set_window_size(new_x, new_y) - def close_driver(self) -> None: - """关闭driver及浏览器""" - self._driver.quit() - self._driver = None + def is_downloading(self, download_path: str = None) -> bool: + if download_path: + p = download_path + else: + try: + p = OptionsManager().get_value('chrome_options', 'experimental_options')['prefs'][ + 'download.default_directory'] + except IOError('No download path found.'): + raise + return not 
glob(f'{p}\\*.crdownload') diff --git a/DrissionPage/mix_element.py b/DrissionPage/mix_element.py deleted file mode 100644 index 31f08f7..0000000 --- a/DrissionPage/mix_element.py +++ /dev/null @@ -1,255 +0,0 @@ -# -*- coding:utf-8 -*- -""" -@Author : g1879 -@Contact : g1879@qq.com -@File : mix_page.py -""" -import re -from html import unescape -from time import sleep -from typing import Union - -from requests_html import Element -from selenium.webdriver.remote.webelement import WebElement -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.select import Select -from selenium.webdriver.support.wait import WebDriverWait - -from .config import global_tmp_path -from .session_page import _translate_loc - - -class MixElement(object): - def __init__(self, ele: Union[WebElement, Element]): - self._ele = ele - - @property - def ele(self) -> Union[WebElement, Element]: - """返回元素对象""" - return self._ele - - @property - def text(self) -> str: - """元素内文本""" - if isinstance(self._ele, Element): - return unescape(self._ele.text).replace('\xa0', ' ') - else: - return unescape(self.attr('innerText')).replace('\xa0', ' ') - - @property - def html(self) -> str: - """元素innerHTML""" - if isinstance(self._ele, Element): - html = unescape(self._ele.html).replace('\xa0', ' ') - r = re.match(r'<.*?>(.*)', html, flags=re.DOTALL) - return r.group(1) - else: - return unescape(self.attr('innerHTML')).replace('\xa0', ' ') - - @property - def tag_name(self) -> str: - """获取标签名""" - if isinstance(self._ele, Element): - html = unescape(self._ele.html).replace('\xa0', ' ') - r = re.match(r'^<(.*?)\s+', html, flags=re.DOTALL) - return r.group(1) - else: - return self._ele.tag_name - - def attr(self, attr) -> str: - """获取属性值""" - if isinstance(self._ele, Element): - try: - if attr == 'href': - # 如直接获取attr只能获取相对地址 - for link in self._ele.absolute_links: - return link - elif attr == 'class': - class_str = '' - for key, i in 
enumerate(self._ele.attrs['class']): - class_str += ' ' if key > 0 else '' - class_str += i - return class_str - else: - return self._ele.attrs[attr] - except: - return '' - else: - return self._ele.get_attribute(attr) - - def find(self, loc: tuple, mode: str = None, show_errmsg: bool = True) -> Union[WebElement, Element, list, None]: - """根据loc获取元素""" - if isinstance(self._ele, Element): - mode = mode if mode else 'single' - if mode not in ['single', 'all']: - raise ValueError("mode须在'single', 'all'中选择") - loc_by, loc_str = _translate_loc(loc) - msg = ele = None - try: - if mode == 'single': - msg = '未找到元素' - if loc_by == 'xpath': - ele = MixElement(self.ele.xpath(loc_str, first=True, _encoding='utf-8')) - else: - ele = MixElement(self.ele.find(loc_str, first=True, _encoding='utf-8')) - elif mode == 'all': - msg = '未找到元素s' - if loc_by == 'xpath': - ele = self.ele.xpath(loc_str, first=False, _encoding='utf-8') - else: - ele = self.ele.find(loc_str, first=False, _encoding='utf-8') - return ele - except: - if show_errmsg: - print(msg, loc) - raise - else: # d模式 - mode = mode if mode else 'single' - if mode not in ['single', 'all', 'visible']: - raise ValueError("mode须在'single', 'all', 'visible'中选择") - msg = ele = None - try: - wait = WebDriverWait(self.ele.parent, timeout=10) - if mode == 'single': - msg = '未找到元素' - ele = wait.until(EC.presence_of_element_located(loc)) - elif mode == 'all': - msg = '未找到元素s' - ele = MixElement(wait.until(EC.presence_of_all_elements_located(loc))) - elif mode == 'visible': - msg = '元素不可见或不存在' - ele = wait.until(EC.visibility_of_element_located(loc)) - return ele - except: - if show_errmsg: - print(msg, loc) - raise - - def find_all(self, loc: tuple, show_errmsg: bool = True) -> list: - """根据loc获取子元素列表""" - return self.find(loc, mode='all', show_errmsg=show_errmsg) - - def search(self, value: str, mode: str = None): - """根据内容获取元素""" - mode = mode if mode else 'single' - if mode not in ['single', 'all']: - raise 
ValueError("mode须在'single', 'all'中选择") - if isinstance(self._ele, Element): - try: - if mode == 'single': - ele = self.ele.xpath(f'.//*[contains(text(),"{value}")]', first=True) - return MixElement(ele) - elif mode == 'all': - eles = self.ele.xpath(f'.//*[contains(text(),"{value}")]') - return [MixElement(ele) for ele in eles] - except: - return None - else: # d模式 - try: - loc = 'xpath', f'.//*[contains(text(),"{value}")]' - wait = WebDriverWait(self.ele.parent, timeout=10) - if mode == 'single': - ele = wait.until(EC.presence_of_element_located(loc)) - return MixElement(ele) - elif mode == 'all': - eles = wait.until(EC.presence_of_all_elements_located(loc)) - return [MixElement(ele) for ele in eles] - except: - return None - - def search_all(self, value: str) -> list: - """根据内容获取元素列表""" - return self.search(value, mode='all') - - # -----------------以下为d模式独占------------------- - def click(self) -> bool: - """点击""" - for _ in range(10): - try: - self.ele.click() - return True - except Exception as e: - print(e) - sleep(0.2) - # 若点击失败,用js方式点击 - print('用js点击') - try: - self.run_script('arguments[0].click()') - return True - except: - raise - - def input(self, value, clear: bool = True) -> bool: - """输入文本""" - try: - if clear: - self.run_script("arguments[0].value=''") - self.ele.send_keys(value) - return True - except: - raise - - def run_script(self, script: str): - """运行js""" - self.ele.parent.execute_script(script, self.ele) - - def submit(self): - """提交表单""" - self.ele.submit() - - def clear(self): - """清空元素""" - self.ele.clear() - - def is_selected(self) -> bool: - """是否选中""" - return self.ele.is_selected() - - def is_enabled(self) -> bool: - """是否可用""" - return self.ele.is_enabled() - - def is_displayed(self) -> bool: - """是否可见""" - return self.ele.is_displayed() - - @property - def size(self): - """元素大小""" - return self.ele.size - - @property - def location(self): - """元素坐标""" - return self.ele.location - - def screenshot(self, path: str = None, filename: str = 
None) -> str: - """元素截图""" - path = path if path else global_tmp_path - name = filename if filename else self.tag_name - # 等待元素加载完成 - if self.tag_name == 'img': - js = 'return arguments[0].complete && typeof arguments[0].naturalWidth ' \ - '!= "undefined" && arguments[0].naturalWidth > 0' - while not self.run_script(js): - pass - img_path = f'{path}\\{name}.png' - self.ele.screenshot(img_path) - return img_path - - def select(self, text: str): - """选择下拉列表""" - ele = Select(self.ele) - try: - ele.select_by_visible_text(text) - return True - except: - return False - - def set_attr(self, attr, value) -> bool: - """设置元素属性""" - try: - self.run_script(f"arguments[0].{attr} = '{value}';") - return True - except: - raise diff --git a/DrissionPage/mix_page.py b/DrissionPage/mix_page.py index df8c07f..82962ee 100644 --- a/DrissionPage/mix_page.py +++ b/DrissionPage/mix_page.py @@ -4,16 +4,17 @@ @Contact : g1879@qq.com @File : mix_page.py """ -from typing import Union +from typing import Union, List from urllib import parse from requests import Response -from requests_html import Element, HTMLSession +from requests_html import HTMLSession from selenium.webdriver.chrome.webdriver import WebDriver -from selenium.webdriver.remote.webelement import WebElement from .drission import Drission +from .driver_element import DriverElement from .driver_page import DriverPage +from .session_element import SessionElement from .session_page import SessionPage @@ -31,10 +32,9 @@ class MixPage(Null, SessionPage, DriverPage): 这些功能由DriverPage和SessionPage类实现。 """ - def __init__(self, drission: Drission, locs=None, mode='d'): + def __init__(self, drission: Drission, mode='d', timeout: float = 10): """初始化函数 :param drission: 整合了driver和session的类 - :param locs: 提供页面元素地址的类 :param mode: 默认使用selenium的d模式 """ super().__init__() @@ -43,13 +43,15 @@ class MixPage(Null, SessionPage, DriverPage): self._driver = None self._url = None self._response = None - self._locs = locs + self.timeout = timeout 
self._url_available = None self._mode = mode if mode == 's': self._session = self._drission.session elif mode == 'd': self._driver = self._drission.driver + else: + raise KeyError("mode must be 'd' or 's'.") @property def url(self) -> str: @@ -70,20 +72,30 @@ class MixPage(Null, SessionPage, DriverPage): """ return self._mode - def change_mode(self, mode: str = None) -> None: + def change_mode(self, mode: str = None, go: bool = True) -> None: """切换模式,接收字符串s或d,除此以外的字符串会切换为d模式 - 切换后调用相应的get函数使访问的页面同步 + 切换时会把当前模式的cookies复制到目标模式 + 切换后,如果go是True,调用相应的get函数使访问的页面同步 :param mode: 模式字符串 + :param go: 是否跳转到原模式的url """ if mode == self._mode: return self._mode = 's' if self._mode == 'd' else 'd' if self._mode == 'd': # s转d self._url = super(SessionPage, self).url - self.get(self.session_url) + if self.session_url: + self.cookies_to_driver(self.session_url) + if go: + self.get(self.session_url) elif self._mode == 's': # d转s self._url = self.session_url - self.get(super(SessionPage, self).url) + if self._session is None: + self._session = self._drission.session + if self._driver: + self.cookies_to_session() + if go: + self.get(super(SessionPage, self).url) @property def drission(self) -> Drission: @@ -109,7 +121,7 @@ class MixPage(Null, SessionPage, DriverPage): """ if self._session is None: self._session = self._drission.session - self.change_mode('s') + # self.change_mode('s') return self._session @property @@ -126,109 +138,82 @@ class MixPage(Null, SessionPage, DriverPage): elif self._mode == 'd': return super(SessionPage, self).cookies - def check_driver_url(self) -> bool: - """判断页面是否能访问,由子类依据不同的页面自行实现""" - return True - - def cookies_to_session(self) -> None: - """从driver复制cookies到session""" - self._drission.cookies_to_session() + def cookies_to_session(self, copy_user_agent: bool = False) -> None: + """从driver复制cookies到session + :param copy_user_agent : 是否复制user agent信息 + """ + self._drission.cookies_to_session(copy_user_agent) def cookies_to_driver(self, url=None) -> None: 
"""从session复制cookies到driver,chrome需要指定域才能接收cookies""" - u = url if url else self.session_url + u = url or self.session_url self._drission.cookies_to_driver(u) + # ----------------重写SessionPage的函数----------------------- + + def post(self, url: str, params: dict = None, data: dict = None, go_anyway: bool = False, **kwargs) \ + -> Union[bool, None]: + """post前先转换模式,但不跳转""" + self.change_mode('s', go=False) + return super().post(url, params, data, go_anyway, **kwargs) + # ----------------以下为共用函数----------------------- - def get(self, url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, Response, None]: + def get(self, url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, None]: """跳转到一个url,跳转前先同步cookies,跳转后判断目标url是否可用""" to_url = f'{url}?{parse.urlencode(params)}' if params else url if not url or (not go_anyway and self.url == to_url): return if self._mode == 'd': - if self.session_url: - self.cookies_to_driver(self.session_url) super(SessionPage, self).get(url=to_url, go_anyway=go_anyway) - if self._session: - ua = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) "} - return True if self._session.get(to_url, headers=ua).status_code == 200 else False + if self.session_url == self.url: + self._url_available = True if self._response and self._response.status_code == 200 else False else: - return self.check_driver_url() + self._url_available = self.check_page() + return self._url_available elif self._mode == 's': - if self._session is None: - self._session = self._drission.session - if self._driver: - self.cookies_to_session() - super().get(url=to_url, go_anyway=go_anyway, **self.drission.session_options) + super().get(url=to_url, go_anyway=go_anyway, **kwargs) return self._url_available - def find(self, loc: tuple, mode=None, timeout: float = 10, show_errmsg: bool = True) -> Union[WebElement, Element]: + def ele(self, loc_or_ele: Union[tuple, str, DriverElement, SessionElement], mode: str = None, timeout: 
float = None, + show_errmsg: bool = False) -> Union[DriverElement, SessionElement]: """查找一个元素,根据模式调用对应的查找函数 - :param loc: 页面元素地址 + :param loc_or_ele: 页面元素地址 :param mode: 以某种方式查找元素,可选'single','all','visible'(d模式独有) :param timeout: 超时时间 :param show_errmsg: 是否显示错误信息 :return: 页面元素对象,s模式下返回Element,d模式下返回WebElement """ if self._mode == 's': - return super().find(loc, mode=mode, show_errmsg=show_errmsg) + return super().ele(loc_or_ele, mode=mode, show_errmsg=show_errmsg) elif self._mode == 'd': - return super(SessionPage, self).find(loc, mode=mode, timeout=timeout, show_errmsg=show_errmsg) + timeout = timeout or self.timeout + # return super(SessionPage, self).ele(loc_or_ele, mode=mode, timeout=timeout, show_errmsg=show_errmsg) + return DriverPage.ele(self, loc_or_ele, mode=mode, timeout=timeout, show_errmsg=show_errmsg) - def find_all(self, loc: tuple, timeout: float = 10, show_errmsg: bool = True) -> list: + def eles(self, loc_or_str: Union[tuple, str], timeout: float = None, show_errmsg: bool = False) -> List[ + DriverElement]: """查找符合条件的所有元素""" if self._mode == 's': - return super().find_all(loc, show_errmsg) + return super().eles(loc_or_str, show_errmsg) elif self._mode == 'd': - return super(SessionPage, self).find_all(loc, timeout=timeout, show_errmsg=show_errmsg) + return super(SessionPage, self).eles(loc_or_str, timeout=timeout, show_errmsg=show_errmsg) - def search(self, value: str, mode: str = None, timeout: float = 10) -> Union[WebElement, Element, None]: - """根据内容搜索元素 - :param value: 搜索内容 - :param mode: 可选'single','all' - :param timeout: 超时时间 - :return: 页面元素对象,s模式下返回Element,d模式下返回WebElement - """ + @property + def html(self) -> str: + """获取页面HTML""" if self._mode == 's': - return super().search(value, mode=mode) + return super().html elif self._mode == 'd': - return super(SessionPage, self).search(value, mode=mode, timeout=timeout) + return super(SessionPage, self).html - def search_all(self, value: str, timeout: float = 10) -> list: - """根据内容搜索元素""" - if 
self._mode == 's': - return super().search_all(value) - elif self._mode == 'd': - return super(SessionPage, self).search_all(value, timeout=timeout) - - def get_attr(self, loc_or_ele: Union[WebElement, Element, tuple], attr: str) -> str: - """获取元素属性值""" - if self._mode == 's': - return super().get_attr(loc_or_ele, attr) - elif self._mode == 'd': - return super(SessionPage, self).get_attr(loc_or_ele, attr) - - def get_html(self, loc_or_ele: Union[WebElement, Element, tuple] = None) -> str: - """获取元素innerHTML,如未指定元素则获取页面源代码""" - if self._mode == 's': - return super().get_html(loc_or_ele) - elif self._mode == 'd': - return super(SessionPage, self).get_html(loc_or_ele) - - def get_text(self, loc_or_ele) -> str: - """获取元素innerText""" - if self._mode == 's': - return super().get_text(loc_or_ele) - elif self._mode == 'd': - return super(SessionPage, self).get_text(loc_or_ele) - - def get_title(self) -> str: + @property + def title(self) -> str: """获取页面title""" if self._mode == 's': - return super().get_title() + return super().title elif self._mode == 'd': - return super(SessionPage, self).get_title() + return super(SessionPage, self).title def close_driver(self) -> None: """关闭driver及浏览器,切换到s模式""" diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py index 8014f70..ee62175 100644 --- a/DrissionPage/session_page.py +++ b/DrissionPage/session_page.py @@ -4,54 +4,27 @@ @Contact : g1879@qq.com @File : session_page.py """ -import re -from html import unescape -from typing import Union +import os +from pathlib import Path +from random import random +from time import time +from typing import Union, List from urllib import parse -from requests_html import Element, HTMLSession, HTMLResponse +from requests_html import HTMLSession, HTMLResponse -from .config import global_session_options - - -def _translate_loc(loc): - """把By类型转为xpath或css selector""" - loc_by = loc_str = None - if loc[0] == 'xpath': - loc_by = 'xpath' - loc_str = loc[1] - elif loc[0] == 'css 
selector': - loc_by = 'css selector' - loc_str = loc[1] - elif loc[0] == 'id': - loc_by = 'css selector' - loc_str = f'#{loc[1]}' - elif loc[0] == 'class name': - loc_by = 'xpath' - loc_str = f'//*[@class="{loc[1]}"]' - elif loc[0] == 'link text': - loc_by = 'xpath' - loc_str = f'//a[text()="{loc[1]}"]' - elif loc[0] == 'name': - loc_by = 'css selector' - loc_str = f'[name={loc[1]}]' - elif loc[0] == 'tag name': - loc_by = 'css selector' - loc_str = loc[1] - elif loc[0] == 'partial link text': - loc_by = 'xpath' - loc_str = f'//a[contains(text(),"{loc[1]}")]' - return loc_by, loc_str +from .common import get_loc_from_str, translate_loc_to_xpath, avoid_duplicate_name +from .config import OptionsManager +from .session_element import SessionElement, execute_session_find class SessionPage(object): - """SessionPage封装了页面操作的常用功能,使用requests_html来获取、解析网页。 - """ + """SessionPage封装了页面操作的常用功能,使用requests_html来获取、解析网页。""" - def __init__(self, session: HTMLSession, locs=None): + def __init__(self, session: HTMLSession): """初始化函数""" self._session = session - self._locs = locs + # self._locs = locs self._url = None self._url_available = None self._response = None @@ -79,130 +52,124 @@ class SessionPage(object): """当前session的cookies""" return self.session.cookies.get_dict() - def get_title(self) -> str: + @property + def title(self) -> str: """获取网页title""" - return self.get_text(('css selector', 'title')) + return self.ele(('css selector', 'title')).text - def find(self, loc: tuple, mode: str = None, show_errmsg: bool = True) -> Union[Element, list]: + @property + def html(self) -> str: + """获取元素innerHTML,如未指定元素则获取所有源代码""" + return self.response.html.html + + def ele(self, loc_or_ele: Union[tuple, str, SessionElement], mode: str = None, show_errmsg: bool = False) \ + -> Union[SessionElement, List[SessionElement], None]: """查找一个元素 - :param loc: 页面元素地址 + :param loc_or_ele: 页面元素地址 :param mode: 以某种方式查找元素,可选'single','all' :param show_errmsg: 是否显示错误信息 :return: 页面元素对象或列表 """ - mode = mode 
if mode else 'single' - if mode not in ['single', 'all']: - raise ValueError("mode须在'single', 'all'中选择") - loc_by, loc_str = _translate_loc(loc) - msg = first = None - try: - if mode == 'single': - msg = '未找到元素' - first = True - elif mode == 'all': - msg = '未找到元素s' - first = False - if loc_by == 'xpath': - return self.response.html.xpath(loc_str, first=first, _encoding='utf-8') - else: - return self.response.html.find(loc_str, first=first, _encoding='utf-8') - except: - if show_errmsg: - print(msg, loc) - raise + if isinstance(loc_or_ele, SessionElement): + return loc_or_ele + elif isinstance(loc_or_ele, str): + loc = get_loc_from_str(loc_or_ele) + else: + loc = translate_loc_to_xpath(loc_or_ele) - def find_all(self, loc: tuple, show_errmsg: bool = True) -> list: + return execute_session_find(self.response.html, loc, mode, show_errmsg) + + def eles(self, loc: Union[tuple, str], show_errmsg: bool = False) -> List[SessionElement]: """查找符合条件的所有元素""" - return self.find(loc, mode='all', show_errmsg=True) - - def search(self, value: str, mode: str = None) -> Union[Element, list, None]: - """根据内容搜索元素 - :param value: 搜索内容 - :param mode: 可选'single','all' - :return: 页面元素对象 - """ - mode = mode if mode else 'single' - if mode not in ['single', 'all']: - raise ValueError("mode须在'single', 'all'中选择") - try: - if mode == 'single': - ele = self.response.html.xpath(f'.//*[contains(text(),"{value}")]', first=True) - return ele - elif mode == 'all': - eles = self.response.html.xpath(f'.//*[contains(text(),"{value}")]') - return eles - except: - return - - def search_all(self, value: str) -> list: - """根据内容搜索元素""" - return self.search(value, mode='all') - - def _get_ele(self, loc_or_ele: Union[Element, tuple]) -> Element: - """获取loc或元素实例,返回元素实例""" - # ====================================== - # ** 必须与DriverPage类中同名函数保持一致 ** - # ====================================== - if isinstance(loc_or_ele, tuple): - return self.find(loc_or_ele) - return loc_or_ele - - def get_attr(self, loc_or_ele: 
Union[Element, tuple], attr: str) -> str: - """获取元素属性""" - ele = self._get_ele(loc_or_ele) - try: - if attr == 'href': - # 如直接获取attr只能获取相对地址 - for link in ele.absolute_links: - return link - elif attr == 'class': - class_str = '' - for key, i in enumerate(ele.attrs['class']): - class_str += ' ' if key > 0 else '' - class_str += i - return class_str - else: - return ele.attrs[attr] - except: - return '' - - def get_html(self, loc_or_ele: Union[Element, tuple] = None) -> str: - """获取元素innerHTML,如未指定元素则获取所有源代码""" - if not loc_or_ele: - return self.response.html.html - ele = self._get_ele(loc_or_ele) - re_str = r'<.*?>(.*)' - html = unescape(ele.html).replace('\xa0', ' ') - r = re.match(re_str, html, flags=re.DOTALL) - return r.group(1) - - def get_text(self, loc_or_ele: Union[Element, tuple]) -> str: - """获取innerText""" - ele = self._get_ele(loc_or_ele) - return unescape(ele.text).replace('\xa0', ' ') + return self.ele(loc, mode='all', show_errmsg=True) def get(self, url: str, params: dict = None, go_anyway: bool = False, **kwargs) -> Union[bool, None]: """用get方式跳转到url,调用_make_response()函数生成response对象""" to_url = f'{url}?{parse.urlencode(params)}' if params else url if not url or (not go_anyway and self.url == to_url): return - self._response = self._make_response(to_url, **kwargs)[0] - self._url_available = self._response + self._url = url + self._response = self._make_response(to_url, **kwargs) + if self._response: + self._response.html.encoding = self._response.encoding # 修复requests_html丢失编码方式的bug + self._url_available = True if self._response and self._response.status_code == 200 else False return self._url_available - # ------------以下为独占函数-------------- - def post(self, url: str, params: dict = None, data: dict = None, go_anyway: bool = False, **kwargs) \ -> Union[bool, None]: """用post方式跳转到url,调用_make_response()函数生成response对象""" to_url = f'{url}?{parse.urlencode(params)}' if params else url if not url or (not go_anyway and self._url == to_url): return - 
self._response = self._make_response(to_url, mode='post', data=data, **kwargs)[0] - self._url_available = self._response + self._url = url + self._response = self._make_response(to_url, mode='post', data=data, **kwargs) + if self._response: + self._response.html.encoding = self._response.encoding # 修复requests_html丢失编码方式的bug + self._url_available = True if self._response and self._response.status_code == 200 else False return self._url_available - def _make_response(self, url: str, mode: str = 'get', data: dict = None, **kwargs) -> tuple: + def download(self, file_url: str, goal_path: str = None, rename: str = None, **kwargs) -> tuple: + """下载一个文件,生成的response不写入self._response,是临时的""" + goal_path = goal_path or OptionsManager().get_value('paths', 'global_tmp_path') + if not goal_path: + raise IOError('No path specified.') + + kwargs['stream'] = True + if 'timeout' not in kwargs: + kwargs['timeout'] = 20 + + r = self._make_response(file_url, mode='get', **kwargs) + if not r: + print('Invalid link') + return False, 'Invalid link' + # -------------------获取文件名------------------- + # header里有文件名,则使用它,否则在url里截取,但不能保证url包含文件名 + if 'Content-disposition' in r.headers: + file_name = r.headers['Content-disposition'].split('"')[1].encode('ISO-8859-1').decode('utf-8') + elif os.path.basename(file_url): + file_name = os.path.basename(file_url).split("?")[0] + else: + file_name = f'untitled_{time()}_{random.randint(0, 100)}' + file_full_name = rename or file_name + # 避免和现有文件重名 + file_full_name = avoid_duplicate_name(goal_path, file_full_name) + # 打印要下载的文件 + print_txt = file_full_name if file_name == file_full_name else f'{file_name} -> {file_full_name}' + print(print_txt) + # -------------------开始下载------------------- + # 获取远程文件大小 + file_size = int(r.headers['Content-Length']) if 'Content-Length' in r.headers else None + # 已下载文件大小和下载状态 + downloaded_size, download_status = 0, False + # 完整的存放路径 + full_path = Path(f'{goal_path}\\{file_full_name}') + try: + with open(str(full_path), 
'wb') as tmpFile: + print(f'Downloading to: {goal_path}') + for chunk in r.iter_content(chunk_size=1024): + if chunk: + tmpFile.write(chunk) + # 如表头有返回文件大小,显示进度 + if file_size: + downloaded_size += 1024 + rate = downloaded_size / file_size if downloaded_size < file_size else 1 + print('\r {:.0%} '.format(rate), end="") + except Exception as e: + download_status, info = False, f'Download failed.\n{e}' + raise + else: + download_status, info = (False, 'File size is 0.') if full_path.stat().st_size == 0 else (True, 'Success.') + finally: + # 删除下载出错文件 + if not download_status and full_path.exists(): + full_path.unlink() + r.close() + # -------------------显示并返回值------------------- + print(info, '\n') + info = file_full_name if download_status else info + return download_status, info + + def _make_response(self, url: str, mode: str = 'get', data: dict = None, **kwargs) -> Union[HTMLResponse, bool]: """生成response对象。接收mode参数,以决定用什么方式。 :param url: 要访问的网址 :param mode: 'get','post'中选择 @@ -211,14 +178,17 @@ class SessionPage(object): :return: Response对象 """ if mode not in ['get', 'post']: - raise ValueError("mode须在'get', 'post'中选择") - self._url = url - if not kwargs: - kwargs = global_session_options - else: - for i in global_session_options: - if i not in kwargs: - kwargs[i] = global_session_options[i] + raise ValueError("mode must be 'get' or 'post'.") + + # 设置referer值 + if self._url: + if 'headers' in set(x.lower() for x in kwargs): + if 'referer' not in set(x.lower() for x in kwargs['headers']): + kwargs['headers']['Referer'] = self._url + else: + kwargs['headers'] = self.session.headers + kwargs['headers']['Referer'] = self._url + try: r = None if mode == 'get': @@ -227,12 +197,7 @@ class SessionPage(object): r = self.session.post(url, data=data, **kwargs) except: return_value = False - info = 'URL Invalid' else: - if r.status_code == 200: - return_value = r - info = 'Success' - else: - return_value = False - info = f'{r.status_code}' - return return_value, info + # 
r.encoding = 'utf-8' + return_value = r + return return_value diff --git a/README.md b/README.md index d71d298..97f11d1 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,1176 @@ -# DrissionPage +# 简介 +*** -#### 介绍 -一个整合了selenium和requests_html的模块,封装了常用页面操作,可实现两种模式的无缝切换。兼顾selenium的易用性和requests的高性能,也可直接用于PO模式。 -适用于网页自动化,有效减少代码量。 +DrissionPage,即driver和session的合体。 +它是一个python库,是个Web自动化操作集成工具。 +它整合了selenium和requests_html,实现了它们之间的无缝切换。 +因此可以兼顾selenium的便利性和requests的高效率。 +它封装了页面元素常用的方法,很适合自动化操作PO模式的扩展。 +更棒的是,它的使用方式非常人性化,代码量少,对新手友好。 + +# 背景 + +*** + +新手学习爬虫时,面对须要登录的网站,要分析数据包、JS源码,构造复杂的请求,往往还要应付验证码、JS混淆、签名参数等反爬手段,学习门槛较高。获取数据时,有的数据是由JS计算生成的,若只拿到源数据,还须重现计算过程,体验不好,开发效率不高。 + +使用selenium,可以很大程度上绕过这些坑,但selenium效率不高。因此,这个库要做的,是将selenium和requests合而为一,不同须要时切换相应模式,并提供一种人性化的使用方法,提高开发和运行效率。 + +除了合并两者,本库还以网页为单位封装了常用功能,简化了selenium的操作和语句,在用于网页自动化操作时,减少考虑细节,专注功能实现,使用更方便。 + +# 特性 + +*** + +- 允许在selenium和requests间无缝切换,共享session。 +- 两种模式提供统一的操作方法,使用体验一致。 +- 以页面为单位封装常用方法,便于PO模式扩展。 +- 人性化的页面元素操作方法,减轻页面分析工作量和编码量。 +- 对某些常用功能(如点击)作了优化,更符合实际使用需要。 + +# 简单演示 + +*** + +例:用selenium登录网站,然后切换到requests读取网页,打印元素属性。 + +```python +from DrissionPage import * +from time import sleep + +drission = Drission() # 创建驱动器对象 +page = MixPage(drission) # 创建页面对象,默认driver模式 +page.get('https://gitee.com/profile') # 访问个人中心页面(未登录,重定向到登录页面) + +# 使用selenium输入账号密码登录 +page.ele('@id:user_login').input('your_user_name') +page.ele('@id:user_password').input('your_password\n') +sleep(1) # 等待登录 + +page.change_mode() # 切换到session模式 + +print('登录后title:', page.title, '\n') # 登录后session模式的输出 + +# 获取并打印属性 +foot = page.ele('@id:footer-left') # 用id查找元素 +first_col = foot.ele('css:>div') # 使用css selector在元素的下级中查找元素(第一个) +lnk = first_col.ele('text:命令学') # 使用文本内容查找元素 +text = lnk.text # 获取元素文本 +href = lnk.attr('href') # 获取元素属性值 + +print(first_col) +print(text, href) +``` + +输出: + +``` +登录后title: 个人资料 - 码云 Gitee.com + + +Git 命令学习 https://oschina.gitee.io/learn-git-branching/ +``` + +# 安装 + +*** + +``` +pip install DrissionPage +``` 
+只支持python3.6及以上版本,driver模式目前只支持chrome。 +若要使用driver模式,须下载chrome和对应版本的chromedriver。[[chromedriver下载]](https://chromedriver.chromium.org/downloads) +目前只在Windows环境下作了测试。 + +# 使用方法 + +*** + +## 导入模块 + +```python +from DrissionPage import * +``` + +## 创建驱动器对象 + +Drission对象用于管理driver和session对象。本库维护了一个ini文件,可直接从里面的配置信息创建驱动器。详细方法见[保存配置](#保存配置)。也可以在初始化时传入配置信息。 + +**driver模式注意事项(只使用session模式可忽略):** + +- 须指定chromedriver.exe和chrome.exe路径。 +- 两个路径可创建时传入,也可保存到ini文件中,还可以写入系统变量(三选一)。 +- 注意chromedriver.exe和chrome.exe版本匹配。 + +```python +# 两个路径已写入系统变量 +drission = Drission() + +# 用传入的配置信息创建 +from DrissionPage.config import DriverOptions +driver_options = DriverOptions() # 创建driver配置对象 +driver_options.binary_location = 'D:\\chrome\\chrome.exe' # chrome.exe路径 +driver_path = 'C:\\chrome\\chromedriver.exe' # driver_path路径 +drission = Drission(driver_options = driver_options, driver_path = driver_path) + +# 保存到ini文件 +from DrissionPage.config import OptionsManager +options = OptionsManager() +driver_path = 'C:\\chrome\\chromedriver.exe' # driver_path路径 +chrome_path = 'D:\\chrome\\chrome.exe' # chrome.exe路径 +options.set_item('paths', 'chromedriver_path', driver_path) # 设置driver_path路径 +options.set_item('chrome_options', 'binary_location', chrome_path) # 设置chrome.exe路径 +options.save() # 保存到ini文件 +drission = Drission() # 以后可直接创建 +``` + + + +## 使用页面对象 + +页面对象封装了常用的网页操作,并实现driver和session模式之间的切换。 + +```python +page = MixPage(drission) # 默认driver模式 +page = MixPage(drission, mode='s', timeout=5) # session模式,元素等待时间5秒(默认10秒) + +# 访问URL +page.get(url, **kwargs) +page.post(url, data, **kwargs) # 只有session模式才有post方法 + +# 切换模式 +page.change_mode() + +# 操作页面 +print(page.html) # 页面源代码 +page.run_script(js) # 运行js语句 +page.close_other_tabs(num) # 关闭其它标签页 +page.to_iframe(iframe) # 切入iframe +page.screenshot(path) # 页面截图 +page.scroll_to_see(element) # 滚动直到某元素可见 +# 详见APIs... 
+``` + +注:调用只属于driver模式的方法,会自动切换到driver模式。 + + + +## 查找元素 + +可使用多种方法查找页面元素(eles函数会返回所有符合要求的元素对象列表)。 + +注:元素查找超时默认为10秒,你也可以按需要设置。 + +```python +# 根据属性查找 +page.ele('@id:ele_id', timeout = 2) # 查找id为ele_id的元素,设置等待时间2秒 +page.eles('@class:class_name') # 查找所有class为class_name的元素 + +# 根据tag name查找 +page.ele('tag:li') # 查找第一个li元素 +page.eles('tag:li') # 查找所有li元素 + +# 根据位置查找 +page.ele('@id:ele_id').parent # 父元素 +page.ele('@id:ele_id').next # 下一个兄弟元素 +page.ele('@id:ele_id').prev # 上一个兄弟元素 + +# 根据文本内容查找 +page.ele('search text') # 查找包含传入文本的元素 +page.eles('text:search text') # 如文本以@、tag:、css:、xpath:、text:开头,则在前面加上text:避免冲突 + +# 根据xpath或css selector查找 +page.eles('xpath://div[@class="ele_class"]') +page.eles('css:div.ele_class') + +# 根据loc查找 +loc1 = By.ID, 'ele_id' +loc2 = By.XPATH, '//div[@class="ele_class"]' +page.ele(loc1) +page.ele(loc2) + +# 查找下级元素 +element = page.ele('@id:ele_id') +element.ele('@class:class_name') # 在element下级查找第一个class为ele_class的元素 +element.eles('tag:li') # 在ele_id下级查找所有li元素 + +# 串连查找 +page.ele('@id:ele_id').ele('tag:div').next.ele('some text').eles('tag:a') +``` + + + +## 元素操作 + +```python +# 获取元素信息 +element = page.ele('@id:ele_id') +element.html # 返回元素内html +element.text # 返回元素内去除html标签后的text值 +element.tag # 返回元素tag name +element.attrs # 返回元素所有属性的字典 +element.attr('class') # 返回元素的class属性 +element.is_valid # driver模式独有,用于判断元素是否还可用 + +# 操作元素 +element.click() # 点击元素 +element.input(text) # 输入文字 +element.run_script(js) # 运行js +element.submit() # 提交表单 +element.clear() # 清空元素 +element.is_selected() # 是否被选中 +element.is_enabled() # 是否可用 +element.is_displayed() # 是否可见 +element.is_valid() # 是否有效,用于判断页面跳转导致元素失效的情况 +element.select(text) # 选中下拉列表选项 +element.set_attr(attr,value) # 设置元素属性 +element.size # 元素大小 +element.location # 元素位置 +``` + + + +## 保存配置 + +因chrome和headers配置繁多,故设置一个ini文件专门用于保存常用配置,你可使用OptionsManager对象获取和保存配置,用DriverOptions对象修改chrome配置。 + +### ini文件内容 + +ini文件默认拥有三部分配置:paths、chrome_options、session_options。 + +```ini +[paths] +chromedriver_path = +; 
chromedriver.exe路径 +global_tmp_path = +; 临时文件夹路径,用于保存截图、文件下载等 + +[chrome_options] +debugger_address = +; 已打开的浏览器地址和端口,如127.0.0.1:9222 +binary_location = +; chrome.exe路径 +arguments = [] +; 配置信息,如'--headless', +extensions = [] +; 插件 +experimental_options = {} +; 实验性配置 + +[session_options] +headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "zh-cn", + "Connection": "keep-alive", + "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7" + } +``` + +### OptionsManager对象 + +OptionsManager对象用于读取、设置和保存配置。 + +```python +get_value(section, item) -> str # 获取某个配置的值 +get_option(section) -> dict # 以字典格式返回配置全部属性 +set_item(section, item, value) # 设置配置属性 +save() # 保存配置到ini文件 +``` + +### DriverOptions对象 + +DriverOptions对象继承自selenium.webdriver.chrome.options的Options对象,在其基础上增加了以下方法: + +```python +remove_argument(value) # 删除某argument值 +remove_experimental_option(key) # 删除某experimental_option设置 +remove_all_extensions() # 删除全部插件 +save() # 保存配置到ini文件 +``` + +### 使用示例 + +```python +from DrissionPage import * +from DrissionPage.config import * + +driver_options = DriverOptions() # 默认从ini文件读取配置 +driver_options.add_argument('--headless') # 添加配置 +driver_options.remove_experimental_option('prefs') # 移除配置 +driver_options.save() # 保存配置 + +options_manager = OptionsManager() # 创建OptionsManager对象 +driver_path = options_manager.get_value('paths', 'chromedriver_path') # 读取路径信息 + +drission = Drission(driver_options, driver_path=driver_path) # 使用配置创建Drission对象 +``` + +# PO模式 + +*** + +MixPage封装了常用的页面操作,可方便地用于扩展。 + +例:扩展一个列表页面读取类 + +```python +import re +from time import sleep +from DrissionPage import * + +class ListPage(MixPage): + """本类封装读取列表页面的方法,根据必须的4个元素,可读取同构的列表页面 + (中文变量真香)""" + def __init__(self, drission: Drission, url: str = None, **xpaths): + super().__init__(drission) + self._url = url + self.xpath_栏目名 =
xpaths['栏目名'] # [xpath字符串, 正则表达式] + self.xpath_下一页 = xpaths['下一页'] + self.xpath_行s = xpaths['行'] + self.xpath_页数 = xpaths['页数'] # [xpath字符串, 正则表达式] + self.总页数 = self.get_总页数() + if url: + self.get(url) + + def get_栏目名称(self) -> str: + if self.xpath_栏目名[1]: + s = self.ele(f'xpath:{self.xpath_栏目名[0]}').text + r = re.search(self.xpath_栏目名[1], s) + return r.group(1) + else: + return self.ele(f'xpath:{self.xpath_栏目名[0]}').text + + def get_总页数(self) -> int: + if self.xpath_页数[1]: + s = self.ele(f'xpath:{self.xpath_页数[0]}').text + r = re.search(self.xpath_页数[1], s) + return int(r.group(1)) + else: + return int(self.ele(f'xpath:{self.xpath_页数[0]}').text) + + def click_下一页(self, wait: float = None): + self.ele(f'xpath:{self.xpath_下一页}').click() + if wait: + sleep(wait) + + def get_当前页列表(self, 待爬内容: list) -> list: + """ + 待爬内容格式:[[xpath1,参数1],[xpath2,参数2]...] + 返回列表格式:[[参数1,参数2...],[参数1,参数2...]...] + """ + 结果列表 = [] + 行s = self.eles(f'xpath:{self.xpath_行s}') + for 行 in 行s: + 行结果 = [] + for j in 待爬内容: + 行结果.append(行.ele(f'xpath:{j[0]}').attr(j[1])) + 结果列表.append(行结果) + print(行结果) + return 结果列表 + + def get_列表(self, 待爬内容: list, wait: float = None) -> list: + 列表 = self.get_当前页列表(待爬内容) + for _ in range(self.总页数 - 1): + self.click_下一页(wait) + 列表.extend(self.get_当前页列表(待爬内容)) + return 列表 +``` + +# 其它 + +*** + +## DriverPage和SessionPage + +如果无须切换模式,可根据需要只使用DriverPage或SessionPage,用法和MixPage一致。 + +```python +from DrissionPage.session_page import SessionPage +from DrissionPage.drission import Drission + +session = Drission().session +page = SessionPage(session) # 传入Session对象 +page.get('http://www.baidu.com') +print(page.ele('@id:su').text) # 输出:百度一下 + +driver = Drission().driver +page = DriverPage(driver) # 传入Driver对象 +page.get('http://www.baidu.com') +print(page.ele('@id:su').text) # 输出:百度一下 +``` + +# APIs + +*** + +## Drission类 + +class **Drission**(driver_options: Union[dict, Options] = None, session_options: dict = None, driver_path: str = None) + +用于管理driver和session对象。参数说明: + +- 
driver_options - chrome配置参数,可接收Options对象或字典 +- session_options - session配置参数,接收字典 +- driver_path - chromedriver.exe路径,如不设置,须在系统设置系统变量 + +### session + +​ 返回HTMLSession对象,调用时自动创建。 + +### driver + +​ 获取WebDriver对象,调用时自动创建,按传入配置或ini文件配置初始化。 + +### session_options + +​ 以字典格式返回或设置session配置。 + +### cookies_to_session + +​ cookies_to_session(copy_user_agent: bool = False, driver: WebDriver = None, session: Session = None) -> None + +​ 把cookies从driver复制到session。默认复制self.driver到self.session,也可以接收driver和session进行操作。 + +​ 参数说明: + +- copy_user_agent - 是否复制user_agent到session +- driver - WebDriver对象,复制cookies +- session - Session对象,接收cookies + +### cookies_to_driver + +​ cookies_to_driver(url: str, driver: WebDriver = None, session: Session = None) -> None + +​ 把cookies从session复制到driver。默认复制self.session到self.driver,也可以接收driver和session进行操作。须要指定url或域名。 + +​ 参数说明: + +- url - cookies的域 +- driver - WebDriver对象,接收cookies +- session - Session对象,复制cookies + +### user_agent_to_session + +​ user_agent_to_session(driver: WebDriver = None, session: Session = None) -> None + +​ 把user agent从driver复制到session。默认复制self.driver到self.session,也可以接收driver和session进行操作。 + +​ 参数说明: + +- driver - WebDriver对象,复制user agent +- session - Session对象,接收user agent + +### close_driver + +​ close_driver() -> None + +​ 关闭浏览器,driver置为None。 + +### close_session + +​ close_session() -> None + +​ 关闭session并置为None。 + +### close + +​ close() -> None + +​ 关闭driver和session。 + + + +## MixPage类 + +class **MixPage**(drission: Drission, mode='d', timeout: float = 10) + +MixPage封装了页面操作的常用功能,可在driver和session模式间无缝切换。切换的时候会自动同步cookies。 +获取信息功能为两种模式共有,操作页面元素功能只有d模式有。调用某种模式独有的功能,会自动切换到该模式。 +它继承自DriverPage和SessionPage类,这些功能由这两个类实现,MixPage作为调度的角色存在。 + +参数说明: + +- drission - Drission对象 +- mode - 模式,可选'd'或's',默认为'd' +- timeout - 查找元素超时时间(每次查找元素时还可单独设置) + +### url + +​ 返回当前访问的url。 + +### mode + +​ 返回当前模式('s'或'd')。 + +### drission + +​ 返回当前使用的Drission对象。 + +### driver + +​ 返回driver对象,如没有则创建,调用时会切换到driver模式。 + +### session + +​
返回session对象,如没有则创建。 + +### response + +​ 返回Response对象,调用时会切换到session模式。 + +### cookies + +​ 返回cookies,从当前模式获取。 + +### html + +​ 返回页面html文本。 + +### title + +​ 返回页面title文本。 + +### change_mode + +​ change_mode(mode: str = None, go: bool = True) -> None + +​ 切换模式,可指定目标模式,若目标模式与当前模式一致,则直接返回。 + +​ 参数说明: + +- mode - 指定目标模式,'d'或's'。 +- go - 切换模式后是否跳转到当前url + +### get + +​ get(url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, None] + +​ 跳转到一个url,跳转前先同步cookies,跳转后返回目标url是否可用。 + +​ 参数说明: + +- url - 目标url +- params - url参数 +- go_anyway - 是否强制跳转。若目标url和当前url一致,默认不跳转。 +- kwargs - 用于session模式时访问参数。 + +### ele + +​ ele(loc_or_ele: Union[tuple, str, DriverElement, SessionElement], mode: str = None, timeout: float = None, show_errmsg: bool = False) -> Union[DriverElement, SessionElement] + +​ 根据查询参数获取元素,返回元素或元素列表。 +​ 如查询参数是字符串,可选'@属性名:'、'tag:'、'text:'、'css:'、'xpath:'方式。无控制方式时默认用text方式查找。 +​ 如是loc,直接按照内容查询。 + +​ 参数说明: + +- loc_or_str - 查询条件参数,如传入一个元素对象,则直接返回 +- mode - 查找一个或多个,传入'single'或'all' +- timeout - 查找元素超时时间,driver模式下有效 +- show_errmsg - 出现异常时是否抛出及显示 + +​ 示例: + +- page.ele('@id:ele_id') - 按照属性查找元素 +- page.ele('tag:div') - 按照tag name查找元素 +- page.ele('text:some text') - 按照文本查找元素 +- page.ele('some text') - 按照文本查找元素 +- page.ele('css:>div') - 按照css selector查找元素 +- page.ele('xpath://div') - 按照xpath查找元素 +- page.ele((By.ID, 'ele_id')) - 按照loc方式查找元素 + +### eles + +​ eles(loc_or_str: Union[tuple, str], timeout: float = None, show_errmsg: bool = False) -> List[DriverElement] + +​ 根据查询参数获取符合条件的元素列表。查询参数使用方法和ele方法一致。 + +​ 参数说明: + +- loc_or_str - 查询条件参数 +- timeout - 查找元素超时时间,driver模式下有效 +- show_errmsg - 出现异常时是否抛出及显示 + +### cookies_to_session + +​ cookies_to_session(copy_user_agent: bool = False) -> None + +​ 手动把cookies从driver复制到session。 + +​ 参数说明: + +- copy_user_agent - 是否同时复制user agent + +### cookies_to_driver + +​ cookies_to_driver(url=None) -> None + +​ 手动把cookies从session复制到driver。 + +​ 参数说明: + +- url - cookies的域或url + +### post + +​ post(url: str, params: dict = 
None, data: dict = None, go_anyway: bool = False, **kwargs) -> Union[bool, None] + +​ 以post方式跳转,调用时自动切换到session模式。 + +​ 参数说明: + +- url - 目标url +- params - url参数 +- data - 提交的数据 +- go_anyway - 是否强制跳转。若目标url和当前url一致,默认不跳转。 +- kwargs - headers等访问参数 + +### download + +​ download(file_url: str, goal_path: str = None, rename: str = None, **kwargs) -> tuple + +​ 下载一个文件,返回是否成功和下载信息字符串。该方法会自动避免和目标路径现有文件重名。 + +​ 参数说明: + +- file_url - 文件URL +- goal_path - 存放路径,默认为ini文件中指定的临时文件夹 +- rename - 重命名文件名,默认不重命名 +- kwargs - 用于requests的连接参数 + + + +以下方法只有driver模式下生效,调用时会自动切换到driver模式 + +*** + +### check_page + +​ check_page() -> bool + +​ 派生子类后用于检查域名是否符合预期,功能由子类实现。 + +### run_script + +​ run_script(script: str) -> Any + +​ 执行JavaScript代码。 + +​ 参数说明: + +- script - JavaScript代码文本 + +### get_tabs_sum + +​ get_tabs_sum() -> int + +​ 返回浏览器标签页数量。 + +### get_tab_num + +​ get_tab_num() -> int + +​ 返回当前标签页序号。 + +### to_tab + +​ to_tab(index: int = 0) -> None + +​ 跳转到某序号的标签页。 + +参数说明: + +- index - 目标标签页序号,从0开始计算 + +### close_current_tab + +​ close_current_tab() -> None + +​ 关闭当前标签页。 + +### close_other_tabs + +​ close_other_tabs(tab_index: int = None) -> None + +​ 关闭除序号外的标签页。 + +​ 参数说明: + +- tab_index - 保留的标签页序号,从0开始计算 + +### to_iframe + +​ to_iframe(loc_or_ele: Union[str, tuple, WebElement] = 'main') -> bool + +​ 跳转到iframe,默认跳转到最高层级。 + +​ 参数说明: + +- loc_or_ele - 查找iframe元素的条件,和ele()方法的查找条件一致。 + +​ 示例: +- to_iframe('@id:iframe_id') +- to_iframe(iframe_element) + +### scroll_to_see + +​ scroll_to_see(loc_or_ele: Union[WebElement, tuple]) -> None + +​ 滚动直到元素可见。 + +​ 参数说明: + +- loc_or_ele - 查找元素的条件,和ele()方法的查找条件一致。 + +### scroll_to + +​ scroll_to(mode: str = 'bottom', pixel: int = 300) -> None + +​ 滚动页面,按照参数决定如何滚动。 + +​ 参数说明: + +- mode - 滚动的方向,top、bottom、rightmost、leftmost、up、down、left、right +- pixel - 滚动的像素 + +### refresh + +​ refresh() -> None + +​ 刷新页面。 + +### back + +​ back() -> None + +​ 页面后退。 + +### set_window_size + +​ set_window_size(x: int = None, y: int = None) -> None + +​ 设置窗口大小,默认最大化。
+ +​ 参数说明: + +- x - 目标宽度 +- y - 目标高度 + +### screenshot + +​ screenshot(path: str = None, filename: str = None) -> str + +​ 网页截图,返回截图文件路径。 + +​ 参数说明: + +- path - 截图保存路径,默认为ini文件中指定的临时文件夹 +- filename - 截图文件名,默认以页面title为文件名 + +### is_downloading + +​ is_downloading(download_path: str = None) -> bool + +​ 检测浏览器是否下载完毕。 + +​ 参数说明: + +- download_path - 下载路径,默认为ini文件中定义的路径 + +### close_driver + +​ close_driver() -> None + +​ 关闭driver及浏览器,切换到s模式。 + +### close_session + +​ close_session() -> None + +​ 关闭session,切换到d模式。 + +## DriverElement类 + +class DriverElement(ele: WebElement, timeout: float = 10) + +driver模式的元素对象,包装了一个WebElement对象,并封装了常用功能。 + +参数说明: + +- ele - WebElement对象 +- timeout - 查找元素超时时间(每次查找元素时还可单独设置) + +### inner_ele + +​ 被包装的WebElement对象。 + +### attrs + +​ 以字典方式返回元素所有属性及值。 + +### text + +​ 返回元素内的文本。 + +### html + +​ 返回元素内html文本。 + +### tag + +​ 返回元素标签名文本。 + +### parent + +​ 返回父级元素对象。 + +### next + +​ 返回下一个兄弟元素对象。 + +### prev + +​ 返回上一个兄弟元素对象。 + +### size + +​ 以字典方式返回元素大小。 + +### location + +​ 以字典方式返回元素坐标。 + +### ele + +​ ele(loc_or_str: Union[tuple, str], mode: str = None, show_errmsg: bool = False, timeout: float = None) -> Union[DriverElement, List[DriverElement], None] + +​ 根据查询参数获取元素。 +​ 如查询参数是字符串,可选'@属性名:'、'tag:'、'text:'、'css:'、'xpath:'方式。无控制方式时默认用text方式查找。 +​ 如是loc,直接按照内容查询。 + +​ 参数说明: + +- loc_or_str - 查询条件参数 +- mode - 查找一个或多个,传入'single'或'all' +- show_errmsg - 出现异常时是否抛出及显示 +- timeout - 查找元素超时时间 + +​ 示例: + +- element.ele('@id:ele_id') - 按照属性查找元素 +- element.ele('tag:div') - 按照tag name查找元素 +- element.ele('text:some text') - 按照文本查找元素 +- element.ele('some text') - 按照文本查找元素 +- element.ele('css:>div') - 按照css selector查找元素 +- element.ele('xpath://div') - 按照xpath查找元素 +- element.ele((By.ID, 'ele_id')) - 按照loc方式查找元素 + +### eles + +​ eles(loc_or_str: Union[tuple, str], show_errmsg: bool = False, timeout: float = None) -> List[DriverElement] + +​ 根据查询参数获取符合条件的元素列表。查询参数使用方法和ele方法一致。 + +​ 参数说明: + +- loc_or_str - 查询条件参数 +- show_errmsg - 出现异常时是否抛出及显示 +- timeout - 查找元素超时时间
+ +### attr + +​ attr(attr: str) -> str + +​ 获取元素某个属性的值。 + +​ 参数说明: + +- attr - 属性名称 + +### click + +​ click(by_js=False) -> bool + +​ 点击元素,如不成功则用js方式点击,可指定用js方式点击。 + +​ 参数说明: + +- by_js - 是否用js方式点击 + +### input + +​ input(value, clear: bool = True) -> bool + +​ 输入文本。 + +​ 参数说明: + +- value - 文本值 +- clear - 输入前是否清除文本框 + +### run_script + +​ run_script(script: str) -> Any + +​ 在元素上运行JavaScript。 + +​ 参数说明: + +- script - JavaScript文本 + +### submit + +​ submit() -> None + +​ 提交表单。 + +### clear + +​ clear() -> None + +​ 清空文本框。 + +### is_selected + +​ is_selected() -> bool + +​ 元素是否被选中。 + +### is_enabled + +​ is_enabled() -> bool + +​ 元素在页面中是否可用。 + +### is_displayed + +​ is_displayed() -> bool + +​ 元素是否可见。 + +### is_valid + +​ is_valid() -> bool + +​ 元素是否有效。该方法用于判断页面跳转元素不能用的情况 + +### screenshot + +​ screenshot(path: str = None, filename: str = None) -> str + +​ 网页截图,返回截图文件路径。 + +​ 参数说明: + +- path - 截图保存路径,默认为ini文件中指定的临时文件夹 +- filename - 截图文件名,默认为页面title为文件名 + +### select + +​ select(text: str) -> bool + +​ 在下拉列表中选择。 + +​ 参数说明: + +- text - 选项文本 + +### set_attr + +​ set_attr(attr: str, value: str) -> bool + +​ 设置元素属性。 + +​ 参数说明: + +- attr - 参数名 +- value - 参数值 + + + +## SessionElement类 + +class SessionElement(ele: Element) + +session模式的元素对象,包装了一个Element对象,并封装了常用功能。 + +参数说明: + +- ele - requests_html库的Element对象 + +### inner_ele + +​ 被包装的Element对象。 + +### attrs + +​ 以字典格式返回元素所有属性的名称和值。 + +### text + +​ 返回元素内的文本。 + +### html + +​ 返回元素内html文本。 + +### tag + +​ 返回元素标签名文本。 + +### parent + +​ 返回父级元素对象。 + +### next + +​ 返回下一个兄弟元素对象。 + +### prev + +​ 返回上一个兄弟元素对象。 + +### ele + +​ ele(loc_or_str: Union[tuple, str], mode: str = None, show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement], None] + +​ 根据查询参数获取元素。 +​ 如查询参数是字符串,可选'@属性名:'、'tag:'、'text:'、'css:'、'xpath:'方式。无控制方式时默认用text方式查找。 +​ 如是loc,直接按照内容查询。 + +​ 参数说明: + +- loc_or_str - 查询条件参数 + +- mode - 查找一个或多个,传入'single'或'all' + +- show_errmsg - 出现异常时是否抛出及显示 + +​ 示例: + +- element.ele('@id:ele_id') - 按照属性查找元素 +- 
element.ele('tag:div') - 按照tag name查找元素 +- element.ele('text:some text') - 按照文本查找元素 +- element.ele('some text') - 按照文本查找元素 +- element.ele('css:>div') - 按照css selector查找元素 +- element.ele('xpath://div') - 按照xpath查找元素 +- element.ele((By.ID, 'ele_id')) - 按照loc方式查找元素 + +### eles + +​ eles(loc_or_str: Union[tuple, str], show_errmsg: bool = False) -> List[SessionElement] + +​ 根据查询参数获取符合条件的元素列表。查询参数使用方法和ele方法一致。 + +​ 参数说明: + +- loc_or_str - 查询条件参数 +- show_errmsg - 出现异常时是否抛出及显示 + +### attr + +​ attr(attr: str) -> str + +​ 获取元素某个属性的值。 + +​ 参数说明: + +- attr - 属性名称 + + + +## OptionsManager类 + +​ class OptionsManager(path: str = None) + +​ 管理配置文件内容的类。 + +​ 参数说明: + +- path - ini文件路径,不传入则默认读取当前文件夹下的configs.ini文件 + +### get_value + +​ get_value(section: str, item: str) -> Any + +​ 获取配置的值。 + +​ 参数说明: + +- section - 段落名称 +- item - 配置项名称 + +### get_option + +​ get_option(section: str) -> dict + +​ 以字典的格式返回整个段落的配置信息。 + +​ 参数说明: + +- section - 段落名称 + +### set_item + +​ set_item(section: str, item: str, value: str) -> None + +​ 设置配置值。 + +​ 参数说明: + +- section - 段落名称 +- item - 配置项名称 +- value - 值内容 + +### save + +​ save() -> None + +​ 保存设置到文件。 + + + +## DriverOptions类 + +​ class DriverOptions(read_file=True) + +​ chrome浏览器配置类,继承自selenium.webdriver.chrome.options的Options类,增加了删除配置和保存到文件方法。 + +​ 参数说明: + +- read_file - 布尔型,指定创建时是否从ini文件读取配置信息 + +### remove_argument + +​ remove_argument(value: str) -> None + +​ 移除一个设置。 + +​ 参数说明: + +- value - 要移除的属性值 + +### remove_experimental_option + +​ remove_experimental_option(key: str) -> None + +​ 移除一个实验设置,传入key值删除。 + +​ 参数说明: + +- key - 要移除的实验设置key值 + +### remove_all_extensions + +​ remove_all_extensions() -> None + +​ 移除所有插件,因插件是以整个文件储存,难以移除其中一个,故如须设置则全部移除再重设。 + +### save + +​ save() -> None + +​ 保存设置到文件。