# -*- coding:utf-8 -*-
"""
Default configuration for DrissionPage.

``global_driver_options`` is consumed when a Chrome WebDriver is created and
``global_session_options`` supplies default keyword arguments for requests
made through the session.  Both are plain dicts so callers can copy and
override them.
"""

from pathlib import Path

# Scratch directory next to this module; it doubles as Chrome's download
# directory.  Built with pathlib so the separator is correct on every OS
# (a hard-coded backslash creates a directory literally named
# "DrissionPage\tmp" on POSIX systems).  Kept as ``str`` for callers.
global_tmp_path = str(Path(__file__).parent / 'tmp')
Path(global_tmp_path).mkdir(parents=True, exist_ok=True)

global_driver_options = {
    # --------------- attach to an already running browser ---------------
    # 'debuggerAddress': '127.0.0.1:9222',
    # --------------- path of the chromedriver executable ---------------
    'chromedriver_path': r'D:\python\Google Chrome\Chrome\chromedriver.exe',
    # --------------- explicit location of the browser binary ---------------
    # 'binary_location': r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe',
    # --------------- command line arguments ---------------
    'arguments': [
        '--headless',  # hide the browser window
        '--mute-audio',  # mute sound
        '--no-sandbox',
        '--blink-settings=imagesEnabled=false',  # do not load images
        # r'--user-data-dir="E:\tmp\chrome_tmp"',  # user profile directory
        # '--disk-cache-dir=""',  # cache directory
        # NOTE(review): labelled "encoding" by the author, but this is not a
        # Chrome switch; presumably '--lang=zh-CN' was intended - confirm.
        'zh_CN.UTF-8',
        # "--proxy-server=http://127.0.0.1:8888",  # proxy
        # '--hide-scrollbars',  # hide scrollbars
        # '--start-maximized',  # start maximized
        # "--disable-javascript",  # disable JavaScript
        # emulate a mobile device
        # 'user-agent="MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"',
        '--disable-gpu'  # recommended by Google's documentation to avoid a bug
    ],
    # --------------- extension files ---------------
    'extension_files': [],
    # 'extensions': [],
    # --------------- experimental options ---------------
    'experimental_options': {
        'prefs': {
            # download directory
            'download.default_directory': global_tmp_path,
            # do not show a pop-up when downloading
            'profile.default_content_settings.popups': 0,
            # block notification pop-ups
            'profile.default_content_setting_values': {'notifications': 2},
            # disable the built-in PDF viewer
            'plugins.plugins_list': [{"enabled": False, "name": "Chrome PDF Viewer"}],
        },
        # "Developer mode" to make automation harder to detect.  This is a
        # ChromeOptions capability of its own, not a profile preference, so it
        # must sit beside 'prefs' rather than inside it to have any effect.
        'excludeSwitches': ["ignore-certificate-errors", "enable-automation"],
    }
}

global_session_options = {
    'headers': {
        "User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko)'
                      ' Version/10.1.2 Safari/603.3.8',
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-cn", "Connection": "keep-alive",
        "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7"}
}
def _get_chrome_options(options: dict) -> Options:
    """Build a ChromeOptions object from a plain settings dict.

    When ``debuggerAddress`` is present the driver attaches to an already
    running browser, so every other key is ignored.  Otherwise the optional
    keys ``binary_location``, ``arguments`` (list), ``extension_files`` (list)
    and ``experimental_options`` (dict) are applied in that order.

    :param options: settings dict, e.g. ``global_driver_options``
    :return: the populated ChromeOptions object
    :raises TypeError: if ``arguments`` / ``extension_files`` is not a list or
        ``experimental_options`` is not a dict
    """
    chrome_options = webdriver.ChromeOptions()

    if 'debuggerAddress' in options:
        # Take over an existing browser; start-up settings do not apply.
        chrome_options.add_experimental_option('debuggerAddress', options['debuggerAddress'])
        return chrome_options

    if options.get('binary_location'):
        # Explicit location of the browser binary.
        chrome_options.binary_location = options['binary_location']

    if 'arguments' in options:
        arguments = options['arguments']
        if not isinstance(arguments, list):
            raise TypeError(f'需要list,而非{type(arguments)}')
        for argument in arguments:
            chrome_options.add_argument(argument)

    if options.get('extension_files'):
        extension_files = options['extension_files']
        if not isinstance(extension_files, list):
            raise TypeError(f'需要list,而非{type(extension_files)}')
        for extension_file in extension_files:
            chrome_options.add_extension(extension_file)

    if 'experimental_options' in options:
        experimental = options['experimental_options']
        if not isinstance(experimental, dict):
            raise TypeError(f'需要dict,而非{type(experimental)}')
        for key, value in experimental.items():
            chrome_options.add_experimental_option(key, value)

    return chrome_options
class Drission(object):
    """Bundle a selenium WebDriver and a requests-html HTMLSession.

    Both objects are created lazily on first access, can be closed
    independently, and cookies can be copied between them in either direction.
    """

    def __init__(self, driver_options: dict = None, session_options: dict = None):
        """Store the settings; nothing is started until first use.

        :param driver_options: Chrome settings, defaults to ``global_driver_options``
        :param session_options: request settings, defaults to ``global_session_options``
        """
        self._driver = None
        self._session = None
        self._driver_options = driver_options if driver_options else global_driver_options
        self._session_options = session_options if session_options else global_session_options

    @property
    def session(self):
        """Return the HTMLSession, creating it on first access."""
        if self._session is None:
            self._session = HTMLSession()
        return self._session

    @property
    def driver(self):
        """Return the WebDriver, starting Chrome with the stored settings on first access."""
        if self._driver is None:
            driver_path = self._driver_options.get('chromedriver_path', 'chromedriver')
            self._driver = webdriver.Chrome(driver_path, options=_get_chrome_options(self._driver_options))
        return self._driver

    @property
    def session_options(self):
        """Return the default keyword arguments for session requests."""
        return self._session_options

    def cookies_to_session(self, copy_user_agent: bool = False) -> None:
        """Copy every cookie of the driver into the session.

        :param copy_user_agent: also copy the browser's user agent to the session
        """
        if copy_user_agent:
            self.copy_user_agent_from_driver()
        for cookie in self.driver.get_cookies():
            self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])

    @staticmethod
    def _domain_matches(cookie_domain, host: str) -> bool:
        """Tell whether a cookie scoped to ``cookie_domain`` is relevant for ``host``."""
        bare = (cookie_domain or '').lstrip('.').lower()  # lstrip('.') only removes dots
        if not bare:
            return False
        # Exact host or parent domain (".example.com" applies to "www.example.com").
        # The last clause keeps the historical behaviour of also copying cookies
        # that belong to a sub-domain of the requested host.
        return host == bare or host.endswith(f'.{bare}') or host in bare

    def cookies_to_driver(self, url: str):
        """Copy the session cookies that belong to the site of ``url`` into the driver.

        :param url: any URL of the target site; only its host name is used
        :raises ValueError: if ``url`` contains no host
        """
        host = urlparse(url).hostname  # hostname, unlike netloc, carries no port
        if not host:
            raise ValueError('Without specifying a domain')

        # Translate the cookies into the dict format selenium expects.
        for item in [c for c in self.session.cookies if self._domain_matches(c.domain, host)]:
            cookie_data = {'name': item.name, 'value': str(item.value), 'path': item.path, 'domain': item.domain}
            if item.expires:
                cookie_data['expiry'] = item.expires
            self.ensure_add_cookie(cookie_data)

    def ensure_add_cookie(self, cookie, override_domain=None) -> None:
        """Add one cookie to the driver, navigating to its domain first if needed.

        Chrome only accepts cookies for the site it is currently on, so the
        driver is sent to ``http://<cookie domain>`` when the current page is on
        another host.  If the cookie is rejected, the registered domain is
        tried as a wider scope.

        :param cookie: selenium-style cookie dict; it is not modified
        :param override_domain: domain to use instead of ``cookie['domain']``
        :raises WebDriverException: if the cookie cannot be added
        """
        cookie = dict(cookie)  # work on a copy so the caller's dict stays untouched
        if override_domain:
            cookie['domain'] = override_domain

        target_host = cookie['domain']
        if target_host.startswith('.'):
            target_host = target_host[1:]
        # Remove a scheme prefix if one slipped in.  str.lstrip() must not be
        # used for this: it strips *characters*, turning "t.example.com" into
        # ".example.com".
        for scheme in ('http://', 'https://'):
            if target_host.startswith(scheme):
                target_host = target_host[len(scheme):]
                break

        try:
            browser_host = urlparse(self.driver.current_url).hostname or ''
        except AttributeError:
            browser_host = ''
        if not (browser_host == target_host or browser_host.endswith(f'.{target_host}')):
            self.driver.get(f'http://{target_host}')

        self.driver.add_cookie(cookie)
        if self.is_cookie_in_driver(cookie):
            return

        # The browser refused it: retry with the wider registered domain.
        cookie['domain'] = tldextract.extract(cookie['domain']).registered_domain
        self.driver.add_cookie(cookie)
        if not self.is_cookie_in_driver(cookie):
            raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n")

    def is_cookie_in_driver(self, cookie) -> bool:
        """Tell whether the driver already holds ``cookie``.

        Only name, value and domain are compared; the domain also matches when
        the driver stored it with a leading dot.
        """
        dotted_domain = f'.{cookie["domain"]}'
        return any(
            cookie['name'] == held['name']
            and cookie['value'] == held['value']
            and held['domain'] in (cookie['domain'], dotted_domain)
            for held in self.driver.get_cookies()
        )

    def copy_user_agent_from_driver(self) -> None:
        """Copy the browser's user agent string to the session headers."""
        selenium_user_agent = self.driver.execute_script("return navigator.userAgent;")
        self.session.headers.update({"user-agent": selenium_user_agent})

    def close_driver(self) -> None:
        """Quit the driver and its browser; a no-op when none was started."""
        if self._driver is None:
            return
        try:
            self._driver.quit()
        finally:
            self._driver = None

    def close_session(self) -> None:
        """Close the session; a no-op when none was created."""
        if self._session is None:
            return
        try:
            self._session.close()
        finally:
            self._session = None

    def close(self) -> None:
        """Close the session, the driver and the browser."""
        self.close_driver()
        self.close_session()

    def __del__(self):
        # A finalizer may run on a half-initialised object or during
        # interpreter shutdown; it must never raise.
        try:
            self.close()
        except Exception:
            pass
class DriverPage(object):
    """Page object that fetches, inspects and operates a web page through selenium."""

    def __init__(self, driver: WebDriver, locs=None):
        """Wrap an existing WebDriver.

        :param driver: the WebDriver used to operate the page
        :param locs: optional object holding the page's element locators
        """
        self._driver = driver
        self._locs = locs
        self._url = None
        self._url_available = None

    @property
    def driver(self) -> WebDriver:
        """The wrapped WebDriver."""
        return self._driver

    @property
    def url(self) -> Union[str, None]:
        """URL currently shown by the browser, or None if it is not an http(s) page."""
        if not self._driver:
            return None
        current = self._driver.current_url  # read once so the check and the result agree
        return current if current.startswith('http') else None

    @property
    def url_available(self) -> bool:
        """Result of the availability check made by the last ``get``."""
        return self._url_available

    def get(self, url: str, params: dict = None, go_anyway: bool = False) -> Union[None, bool]:
        """Navigate to ``url``.

        :param url: target address
        :param params: query parameters appended to the address
        :param go_anyway: navigate even if the browser is already on that address
        :return: availability as judged by ``check_driver_url``; None when nothing was done
        """
        to_url = f'{url}?{parse.urlencode(params)}' if params else url
        if not url or (not go_anyway and self.url == to_url):
            return
        self._url = to_url
        self.driver.get(to_url)
        self._url_available = bool(self.check_driver_url())
        return self._url_available

    @property
    def cookies(self) -> list:
        """Cookies of the current site, as returned by the driver."""
        return self.driver.get_cookies()

    def get_title(self) -> str:
        """Title of the current page."""
        return self._driver.title

    def _get_ele(self, loc_or_ele: Union[WebElement, tuple]) -> WebElement:
        """Return the element itself, looking it up first when a locator tuple is given."""
        # ==========================================================
        # ** must stay in step with SessionPage._get_ele **
        # ==========================================================
        if isinstance(loc_or_ele, tuple):
            return self.find(loc_or_ele)
        return loc_or_ele

    def find(self, loc: tuple, mode: str = None, timeout: float = 10, show_errmsg: bool = True) \
            -> Union[WebElement, list]:
        """Wait for and return the element(s) addressed by ``loc``.

        :param loc: locator tuple ``(by, value)``
        :param mode: 'single' (default), 'all' or 'visible'
        :param timeout: seconds to wait before giving up
        :param show_errmsg: print a message when the lookup fails
        :return: the element (or list for 'all'); None when the lookup fails
        :raises ValueError: if ``mode`` is not one of the three values
        """
        mode = mode if mode else 'single'
        conditions = {
            'single': (EC.presence_of_element_located, '未找到元素'),
            'all': (EC.presence_of_all_elements_located, '未找到元素s'),
            'visible': (EC.visibility_of_element_located, '元素不可见或不存在'),
        }
        if mode not in conditions:
            raise ValueError("mode须在'single', 'all', 'visible'中选择")
        condition, msg = conditions[mode]
        try:
            return WebDriverWait(self.driver, timeout=timeout).until(condition(loc))
        except Exception:
            # A failed lookup (typically a timeout) is reported, not raised.
            if show_errmsg:
                print(msg, loc)
            return None

    def find_all(self, loc: tuple, timeout: float = 10, show_errmsg=True) -> list:
        """Return every element matching ``loc``; an empty list when none is found."""
        return self.find(loc, mode='all', timeout=timeout, show_errmsg=show_errmsg) or []

    def get_attr(self, loc_or_ele: Union[WebElement, tuple], attr: str) -> str:
        """Return an attribute of the element, or '' when it cannot be read."""
        ele = self._get_ele(loc_or_ele)
        try:
            return ele.get_attribute(attr)
        except Exception:
            return ''

    def get_html(self, loc_or_ele: Union[WebElement, tuple] = None) -> str:
        """Return the element's innerHTML, or the page source when no element is given."""
        if not loc_or_ele:
            # find_element(by, value) is available in both selenium 3 and 4.
            return self.driver.find_element('xpath', '//*').get_attribute("outerHTML")
        return unescape(self.get_attr(loc_or_ele, 'innerHTML') or '').replace('\xa0', ' ')

    def get_text(self, loc_or_ele: Union[WebElement, tuple]) -> str:
        """Return the element's innerText."""
        return unescape(self.get_attr(loc_or_ele, 'innerText') or '').replace('\xa0', ' ')

    # ---------------- functions that only exist in driver mode ----------------

    def find_visible(self, loc: tuple, timeout: float = 10, show_errmsg: bool = True) -> WebElement:
        """Wait for and return a visible element."""
        return self.find(loc, mode='visible', timeout=timeout, show_errmsg=show_errmsg)

    def check_driver_url(self) -> bool:
        """Hook for subclasses to judge whether the loaded page is the expected one."""
        return True

    def input(self, loc_or_ele: Union[WebElement, tuple], value: str, clear: bool = True) -> bool:
        """Type ``value`` into a text box, clearing it first unless ``clear`` is False."""
        ele = self._get_ele(loc_or_ele)
        if clear:
            self.run_script(ele, "arguments[0].value=''")
        ele.send_keys(value)
        return True

    def click(self, loc_or_ele: Union[WebElement, tuple]) -> bool:
        """Click an element, falling back to a JavaScript click when it is covered.

        :raises RuntimeError: if the element cannot be found
        """
        ele = self._get_ele(loc_or_ele)
        if not ele:
            raise RuntimeError(f'Element to click was not found: {loc_or_ele}')
        for _ in range(10):
            try:
                ele.click()
                return True
            except Exception as e:
                print(e)
                sleep(0.2)
        # Repeated failure means the element is obscured: click it through js.
        print(f'用js点击{loc_or_ele}')
        self.run_script(ele, 'arguments[0].click()')
        return True

    def set_attr(self, loc_or_ele: Union[WebElement, tuple], attribute: str, value: str) -> bool:
        """Set a DOM property of the element (``attribute`` may be dotted, e.g. 'style.display')."""
        ele = self._get_ele(loc_or_ele)
        # The value travels as a script argument so quotes inside it cannot
        # break or alter the generated JavaScript.
        self.driver.execute_script(f"arguments[0].{attribute} = arguments[1];", ele, value)
        return True

    def run_script(self, loc_or_ele: Union[WebElement, tuple], script: str) -> bool:
        """Run ``script`` with the element as ``arguments[0]`` and return the script's result."""
        ele = self._get_ele(loc_or_ele)
        return self.driver.execute_script(script, ele)

    def get_tabs_sum(self) -> int:
        """Number of open tabs."""
        return len(self.driver.window_handles)

    def get_tab_num(self) -> int:
        """Index of the current tab."""
        handle = self.driver.current_window_handle
        return self.driver.window_handles.index(handle)

    def to_tab(self, index: int = 0) -> None:
        """Switch to the tab with the given zero-based index."""
        tabs = self.driver.window_handles
        self.driver.switch_to.window(tabs[index])

    def close_current_tab(self) -> None:
        """Close the current tab."""
        self.driver.close()

    def close_other_tabs(self, tab_index: int = None) -> None:
        """Close every tab except one.

        :param tab_index: index of the tab to keep; the current tab is kept
            when it is omitted (or negative)
        """
        tabs = self.driver.window_handles
        if tab_index is not None and tab_index >= 0:
            keep_handle = tabs[tab_index]
        else:
            keep_handle = self.driver.current_window_handle
        for handle in tabs:
            if handle != keep_handle:
                self.driver.switch_to.window(handle)
                self.close_current_tab()
        self.driver.switch_to.window(keep_handle)  # focus the surviving tab again

    def to_iframe(self, loc_or_ele: Union[str, tuple, WebElement] = 'main') -> bool:
        """Switch into an iframe; the string 'main' switches back to the top document."""
        if loc_or_ele == 'main':
            self.driver.switch_to.default_content()
            return True
        ele = self._get_ele(loc_or_ele)
        self.driver.switch_to.frame(ele)
        return True

    def get_screen(self, loc_or_ele: Union[WebElement, tuple], path: str, file_name: str = None) -> str:
        """Save a screenshot of the element as ``<path>/<name>.png`` and return that path.

        :param path: target directory
        :param file_name: file name without extension, defaults to the tag name
        """
        import os  # local import: only this method needs it

        ele = self._get_ele(loc_or_ele)
        name = file_name if file_name else ele.tag_name
        if ele.tag_name.lower() == 'img':
            # Give an image up to ~10 s to finish loading.  Other elements have
            # no `complete` property, so waiting on it would never end.
            js = 'return arguments[0].complete && typeof arguments[0].naturalWidth ' \
                 '!= "undefined" && arguments[0].naturalWidth > 0'
            for _ in range(100):
                if self.run_script(ele, js):
                    break
                sleep(0.1)
        img_path = os.path.join(path, f'{name}.png')
        ele.screenshot(img_path)
        return img_path

    def scroll_to_see(self, loc_or_ele: Union[WebElement, tuple]) -> None:
        """Scroll until the element is in view."""
        ele = self._get_ele(loc_or_ele)
        self.run_script(ele, "arguments[0].scrollIntoView();")

    def choose_select_list(self, loc_or_ele: Union[WebElement, tuple], text: str) -> bool:
        """Pick the option with the given visible text; False when that fails."""
        ele = Select(self._get_ele(loc_or_ele))
        try:
            ele.select_by_visible_text(text)
            return True
        except Exception:
            return False

    def refresh(self) -> None:
        """Reload the page."""
        self.driver.refresh()

    def back(self) -> None:
        """Go back one step in the browser history."""
        self.driver.back()

    def set_window_size(self, x: int = None, y: int = None) -> None:
        """Resize the window; maximise it when neither dimension is given."""
        if not x and not y:
            self.driver.maximize_window()
        else:
            new_x = x if x else self.driver.get_window_size()['width']
            new_y = y if y else self.driver.get_window_size()['height']
            self.driver.set_window_size(new_x, new_y)

    def close_driver(self) -> None:
        """Quit the driver and its browser; a no-op when there is none."""
        if self._driver is None:
            return
        try:
            self._driver.quit()
        finally:
            self._driver = None
class Null(object):
    """Empty base class whose only job is to give ``super().__init__()`` a target.

    It stops the IDE warning about not calling the super-class initialisers
    without actually running those of SessionPage / DriverPage.
    """

    def __init__(self):
        pass


class MixPage(Null, SessionPage, DriverPage):
    """Page object that can switch between selenium ('d' mode) and requests ('s' mode).

    Cookies are synchronised when switching, combining the convenience of
    selenium with the speed of requests.  Reading information works in both
    modes; operating on page elements exists only in d mode.  The mode-specific
    behaviour is inherited from DriverPage and SessionPage.
    """

    def __init__(self, drission: Drission, locs=None, mode='d'):
        """
        :param drission: object bundling the driver and the session
        :param locs: optional object holding the page's element locators
        :param mode: initial mode, 'd' (selenium, default) or 's' (requests)
        """
        super().__init__()
        self._drission = drission
        self._session = None
        self._driver = None
        self._url = None
        self._response = None
        self._locs = locs
        self._url_available = None
        self._mode = mode
        if mode == 's':
            self._session = self._drission.session
        elif mode == 'd':
            self._driver = self._drission.driver

    @property
    def url(self) -> str:
        """URL that is active in the current mode."""
        if self._mode == 'd':
            return super(SessionPage, self).url
        elif self._mode == 's':
            return self.session_url

    @property
    def session_url(self) -> str:
        """URL of the last session response, or None."""
        return self._response.url if self._response else None

    @property
    def mode(self) -> str:
        """Current mode, 's' or 'd'."""
        return self._mode

    def change_mode(self, mode: str = None) -> None:
        """Switch to the other mode and load the previously active URL there.

        Nothing happens when ``mode`` equals the current mode; any other value
        (including None) toggles between 's' and 'd'.

        :param mode: mode the caller wants to be in
        """
        if mode == self._mode:
            return
        self._mode = 's' if self._mode == 'd' else 'd'
        if self._mode == 'd':  # s -> d
            self._url = super(SessionPage, self).url
            self.get(self.session_url)
        elif self._mode == 's':  # d -> s
            self._url = self.session_url
            self.get(super(SessionPage, self).url)

    @property
    def drission(self) -> Drission:
        """The Drission object in use."""
        return self._drission

    @property
    def driver(self) -> WebDriver:
        """Return the WebDriver, creating it if needed.

        Every access switches the page to d mode; meant for the d-mode-only
        functions and for external callers.
        """
        if self._driver is None:
            self._driver = self._drission.driver
        self.change_mode('d')
        return self._driver

    @property
    def session(self) -> HTMLSession:
        """Return the HTMLSession, creating it if needed.

        Every access switches the page to s mode; meant for the s-mode-only
        functions and for external callers.
        """
        if self._session is None:
            self._session = self._drission.session
        self.change_mode('s')
        return self._session

    @property
    def response(self) -> Response:
        """Return the last response, switching to s mode."""
        self.change_mode('s')
        return self._response

    @property
    def cookies(self) -> Union[dict, list]:  # TODO: unify to a single type
        """Cookies of the current mode (dict in s mode, list in d mode)."""
        if self._mode == 's':
            return super().cookies
        elif self._mode == 'd':
            return super(SessionPage, self).cookies

    def check_driver_url(self) -> bool:
        """Hook for subclasses to judge whether the loaded page is usable."""
        return True

    def cookies_to_session(self) -> None:
        """Copy the driver's cookies to the session."""
        self._drission.cookies_to_session()

    def cookies_to_driver(self, url=None) -> None:
        """Copy the session's cookies to the driver; Chrome needs a domain to accept them."""
        u = url if url else self.session_url
        self._drission.cookies_to_driver(u)

    # ---------------- functions shared by both modes ----------------

    def get(self, url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, Response, None]:
        """Navigate to ``url`` in the current mode, synchronising cookies first.

        :param url: target address
        :param params: query parameters appended to the address
        :param go_anyway: load the page even if it is already the active URL
        :param kwargs: extra request arguments (s mode only); they take
            precedence over the drission's default session options
        :return: availability of the target; None when nothing was done
        """
        to_url = f'{url}?{parse.urlencode(params)}' if params else url
        if not url or (not go_anyway and self.url == to_url):
            return
        if self._mode == 'd':
            if self.session_url:
                self.cookies_to_driver(self.session_url)
            super(SessionPage, self).get(url=to_url, go_anyway=go_anyway)
            if self._session:
                # The browser exposes no status code, so probe with the session.
                ua = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) "}
                try:
                    available = self._session.get(to_url, headers=ua).status_code == 200
                except Exception:
                    # A failing probe means "not available"; it must not abort
                    # a navigation the browser has already performed.
                    available = False
            else:
                available = self.check_driver_url()
            self._url_available = available  # keep the property and the result in step
            return available
        elif self._mode == 's':
            if self._session is None:
                self._session = self._drission.session
            if self._driver:
                self.cookies_to_session()
            request_kwargs = {**self.drission.session_options, **kwargs}
            super().get(url=to_url, go_anyway=go_anyway, **request_kwargs)
            return self._url_available

    def find(self, loc: tuple, mode=None, timeout: float = 10, show_errmsg: bool = True) -> Union[WebElement, Element]:
        """Find an element with the lookup of the current mode.

        :param loc: locator tuple ``(by, value)``
        :param mode: 'single', 'all' or 'visible' (the last one d mode only)
        :param timeout: seconds to wait (d mode only)
        :param show_errmsg: print a message when the lookup fails
        :return: Element in s mode, WebElement in d mode
        """
        if self._mode == 's':
            return super().find(loc, mode=mode, show_errmsg=show_errmsg)
        elif self._mode == 'd':
            return super(SessionPage, self).find(loc, mode=mode, timeout=timeout, show_errmsg=show_errmsg)

    def find_all(self, loc: tuple, timeout: float = 10, show_errmsg: bool = True) -> list:
        """Find every element matching ``loc``."""
        if self._mode == 's':
            return super().find_all(loc, show_errmsg)
        elif self._mode == 'd':
            return super(SessionPage, self).find_all(loc, timeout=timeout, show_errmsg=show_errmsg)

    def get_attr(self, loc_or_ele: Union[WebElement, Element, tuple], attr: str) -> str:
        """Return an attribute value of the element."""
        if self._mode == 's':
            return super().get_attr(loc_or_ele, attr)
        elif self._mode == 'd':
            return super(SessionPage, self).get_attr(loc_or_ele, attr)

    def get_html(self, loc_or_ele: Union[WebElement, Element, tuple] = None) -> str:
        """Return the element's innerHTML, or the page source when no element is given."""
        if self._mode == 's':
            return super().get_html(loc_or_ele)
        elif self._mode == 'd':
            return super(SessionPage, self).get_html(loc_or_ele)

    def get_text(self, loc_or_ele) -> str:
        """Return the element's innerText."""
        if self._mode == 's':
            return super().get_text(loc_or_ele)
        elif self._mode == 'd':
            return super(SessionPage, self).get_text(loc_or_ele)

    def get_title(self) -> str:
        """Return the page title."""
        if self._mode == 's':
            return super().get_title()
        elif self._mode == 'd':
            return super(SessionPage, self).get_title()

    def close_driver(self) -> None:
        """Switch to s mode, then close the driver and its browser."""
        self.change_mode('s')
        self._driver = None
        # Only ask the drission to quit a driver that actually exists.
        if getattr(self.drission, '_driver', None) is not None:
            self.drission.close_driver()

    def close_session(self) -> None:
        """Switch to d mode, then close the session."""
        self.change_mode('d')
        self._session = None
        # Only ask the drission to close a session that actually exists.
        if getattr(self.drission, '_session', None) is not None:
            self.drission.close_session()
class MixPage:
    """Deprecated MixPage that delegates to a DriverPage and a SessionPage object.

    Every call is forwarded to the page object of the active mode ('s' or
    'd').  Kept for reference only; the inheritance-based MixPage replaces it.
    """

    def __init__(self, drission: Drission, locs=None, mode='d'):
        """
        :param drission: object bundling the driver and the session
        :param locs: optional object holding the page's element locators
        :param mode: 's' for session mode, anything else for driver mode
        """
        self._drission = drission
        self._session = None
        self._driver = None
        self._session_page = None
        self._driver_page = None
        self._url = None
        self._session_url = None
        self._locs = locs
        self._mode = mode
        if mode == 's':
            self._session_page = self.s_page
        else:
            self._driver_page = self.d_page
        self._open_self_url()

    @abstractmethod
    def _open_self_url(self):
        """Hook called at the end of ``__init__``.

        The class does not derive from ABC, so the decorator is not enforced
        and this default simply does nothing.
        """
        pass

    @property
    def mode(self):
        """Current mode, 's' or 'd'."""
        return self._mode

    @mode.setter
    def mode(self, value):
        self._mode = value

    def change_mode(self, mode=None):
        """Toggle the mode unless ``mode`` already equals the current one."""
        if mode == self.mode:
            return
        self.mode = 's' if self.mode == 'd' else 'd'

    @property
    def drission(self):
        """The Drission object in use."""
        return self._drission

    @property
    def response(self):
        """Last response of the session page."""
        return self.s_page.response

    @property
    def session(self):
        """The HTMLSession, fetched from the drission on first access."""
        if self._session is None:
            self._session = self._drission.session
        return self._session

    @property
    def driver(self):
        """The WebDriver, fetched from the drission on first access."""
        if self._driver is None:
            self._driver = self._drission.driver
        return self._driver

    @property
    def d_page(self):
        """The DriverPage, created (and sent to the current URL) on first access."""
        if self._driver_page is None:
            self._driver_page = DriverPage(self.driver)
            if self._url:
                self._init_page()
        return self._driver_page

    @property
    def s_page(self):
        """The SessionPage, created on first access and kept in step with the driver."""
        if self._session_page is None:
            self._session_page = SessionPage(self.session)
            if self._url:
                self._init_page()
        self.refresh_url()  # keep the session on the same URL as the driver
        return self._session_page

    @property
    def url(self):
        """URL active in the current mode."""
        if self.mode == 'd':
            return self.d_page.url
        else:
            return self._url

    def _init_page(self):
        """Bring a freshly created page object to the current URL, copying cookies first."""
        if self._session_page:
            self.cookies_to_driver(self._url)
            self.d_page.get(self._url)
        elif self._driver_page:
            self.cookies_to_session()
            self.s_page.get(self._url)

    def goto(self, url: str, url_data: dict = None):
        """Navigate every existing page object to ``url``.

        :param url: target address
        :param url_data: query parameters appended to the address
        """
        to_url = f'{url}?{parse.urlencode(url_data)}' if url_data else url
        if self._url == to_url:
            return
        now_url = self._url
        self._url = to_url
        if self._driver_page:
            if self._session_page and now_url:
                # Cookies can only be copied for a site visited before.
                self.cookies_to_driver(now_url)
            # to_url already carries the query string: do not pass url_data
            # again or it would be appended twice.
            self._driver_page.get(to_url)
            if not self._session_page:
                return self.check_driver_url()
        if self._session_page:
            self._session_url = to_url
            if self._driver_page:
                # Without a driver page there are no browser cookies to copy,
                # and asking for them would start a browser.
                self.cookies_to_session()
            return self.s_page.get(to_url)

    def check_driver_url(self) -> bool:
        """Hook for subclasses to judge whether the loaded page is usable."""
        return True

    def refresh_url(self):
        """Make the session follow the driver's current URL and copy its cookies."""
        if self._driver and (self._url != self._driver.current_url or self._session_url != self._driver.current_url):
            self._url = self._driver.current_url
            self._session_url = self._driver.current_url
            self.cookies_to_session()
            self._session_page.get(self._url)

    def cookies_to_session(self):
        """Copy the driver's cookies to the session."""
        self._drission.cookies_to_session()

    def cookies_to_driver(self, url=None):
        """Copy the session's cookies for ``url`` (default: current URL) to the driver."""
        u = url if url else self._url
        self._drission.cookies_to_driver(u)

    # ---------------- functions shared by both modes ----------------
    def find(self, loc, timeout=10, show_errmsg=True) -> Union[WebElement, Element]:
        """Find one element with the page object of the active mode."""
        # Keyword arguments are required here: the second positional parameter
        # of both page classes is `mode`, not `timeout` / `show_errmsg`.
        if self._mode == 's':
            return self.s_page.find(loc, show_errmsg=show_errmsg)
        elif self._mode == 'd':
            return self.d_page.find(loc, timeout=timeout, show_errmsg=show_errmsg)

    def find_all(self, loc, timeout=10, show_errmsg=True) -> list:
        """Find every element matching ``loc``."""
        if self._mode == 's':
            return self.s_page.find_all(loc, show_errmsg=show_errmsg)
        elif self._mode == 'd':
            return self.d_page.find_all(loc, timeout=timeout, show_errmsg=show_errmsg)

    def get_attr(self, loc_or_ele, attr) -> str:
        """Return an attribute value of the element."""
        if self._mode == 's':
            return self.s_page.get_attr(loc_or_ele, attr)
        elif self._mode == 'd':
            return self.d_page.get_attr(loc_or_ele, attr)

    def get_html(self, loc_or_ele=None) -> str:
        """Return the element's innerHTML, or the page source when no element is given."""
        if self._mode == 's':
            return self.s_page.get_html(loc_or_ele)
        elif self._mode == 'd':
            return self.d_page.get_html(loc_or_ele)

    def get_text(self, loc_or_ele) -> str:
        """Return the element's innerText."""
        if self._mode == 's':
            return self.s_page.get_text(loc_or_ele)
        elif self._mode == 'd':
            return self.d_page.get_text(loc_or_ele)

    def get_source(self):
        """Return the source of the whole page."""
        if self._mode == 's':
            return self.s_page.get_html()
        elif self._mode == 'd':
            return self.d_page.get_html()

    def get_cookies(self):
        """Return the cookies of the active mode."""
        if self._mode == 's':
            return self.s_page.cookies
        elif self._mode == 'd':
            return self.d_page.cookies

    # ---------------- functions forwarded to the driver page ----------------
    def input(self, loc_or_ele, value: str, clear=True) -> bool:
        return self.d_page.input(loc_or_ele, value, clear)

    def click(self, loc_or_ele) -> bool:
        return self.d_page.click(loc_or_ele)

    def set_attr(self, loc_or_ele, attribute: str, value: str) -> bool:
        return self.d_page.set_attr(loc_or_ele, attribute, value)

    def run_script(self, loc_or_ele, script: str):
        return self.d_page.run_script(loc_or_ele, script)

    def get_tabs_sum(self) -> int:
        return self.d_page.get_tabs_sum()

    def get_tab_num(self) -> int:
        return self.d_page.get_tab_num()

    def to_tab(self, index: int = 0):
        return self.d_page.to_tab(index)

    def close_current_tab(self):
        return self.d_page.close_current_tab()

    def close_other_tabs(self, tab_index: int = None):
        return self.d_page.close_other_tabs(tab_index)

    def to_iframe(self, loc_or_ele='main'):
        return self.d_page.to_iframe(loc_or_ele)

    def get_screen(self, loc_or_ele, path: str, file_name: str = None) -> str:
        return self.d_page.get_screen(loc_or_ele, path, file_name)

    def choose_select_list(self, loc_or_ele, text):
        return self.d_page.choose_select_list(loc_or_ele, text)

    def refresh(self):
        return self.d_page.refresh()

    def back(self):
        return self.d_page.back()

    def set_window_size(self, x: int = None, y: int = None):
        return self.d_page.set_window_size(x, y)
def _css_string(text) -> str:
    """Quote ``text`` as a double-quoted CSS string."""
    escaped = str(text).replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def _xpath_literal(text) -> str:
    """Quote ``text`` as an XPath 1.0 string literal (using concat() when it holds both quote kinds)."""
    text = str(text)
    if '"' not in text:
        return f'"{text}"'
    if "'" not in text:
        return f"'{text}'"
    pieces = ', \'"\', '.join(f'"{piece}"' for piece in text.split('"'))
    return f'concat({pieces})'


def _translate_loc(loc):
    """Translate a selenium ``(by, value)`` locator into an xpath or css selector.

    requests-html only understands xpath and css, so the other selenium
    strategies are rewritten:

    * 'id' / 'name'       -> attribute selector with a quoted value
    * 'class name'        -> ``.value`` (matches elements carrying several classes)
    * 'tag name'          -> the tag itself
    * '(partial) link text' -> xpath on ``<a>`` text, safely quoted

    :param loc: locator tuple ``(by, value)``
    :return: tuple ``(strategy, expression)`` with strategy 'xpath' or 'css selector'
    :raises ValueError: if the strategy is unknown
    """
    by, value = loc[0], loc[1]
    if by in ('xpath', 'css selector'):
        return by, value
    if by == 'tag name':
        return 'css selector', value
    if by == 'id':
        return 'css selector', f'[id={_css_string(value)}]'
    if by == 'name':
        return 'css selector', f'[name={_css_string(value)}]'
    if by == 'class name':
        return 'css selector', f'.{value}'
    if by == 'link text':
        return 'xpath', f'//a[text()={_xpath_literal(value)}]'
    if by == 'partial link text':
        return 'xpath', f'//a[contains(text(),{_xpath_literal(value)})]'
    raise ValueError(f'Unsupported locator strategy: {by!r}')
self.find(loc_or_ele) + return loc_or_ele + + def get_attr(self, loc_or_ele: Union[Element, tuple], attr: str) -> str: + """获取元素属性""" + ele = self._get_ele(loc_or_ele) + try: + if attr == 'href': + # 如直接获取attr只能获取相对地址 + for link in ele.absolute_links: + return link + elif attr == 'class': + class_str = '' + for key, i in enumerate(ele.attrs['class']): + class_str += ' ' if key > 0 else '' + class_str += i + return class_str + else: + return ele.attrs[attr] + except: + return '' + + def get_html(self, loc_or_ele: Union[Element, tuple] = None) -> str: + """获取元素innerHTML,如未指定元素则获取所有源代码""" + if not loc_or_ele: + return self.response.html.html + ele = self._get_ele(loc_or_ele) + re_str = r'<.*?>(.*)' + html = unescape(ele.html).replace('\xa0', ' ') + r = re.match(re_str, html, flags=re.DOTALL) + return r.group(1) + + def get_text(self, loc_or_ele: Union[Element, tuple]) -> str: + """获取innerText""" + ele = self._get_ele(loc_or_ele) + return unescape(ele.text).replace('\xa0', ' ') + + def get(self, url: str, params: dict = None, go_anyway: bool = False, **kwargs) -> Union[bool, None]: + """用get方式跳转到url,调用_make_response()函数生成response对象""" + to_url = f'{url}?{parse.urlencode(params)}' if params else url + if not url or (not go_anyway and self.url == to_url): + return + self._response = self._make_response(to_url, **kwargs)[0] + self._url_available = self._response + return self._url_available + + # ------------以下为独占函数-------------- + + def post(self, url: str, params: dict = None, data: dict = None, go_anyway: bool = False, **kwargs) \ + -> Union[bool, None]: + """用post方式跳转到url,调用_make_response()函数生成response对象""" + to_url = f'{url}?{parse.urlencode(params)}' if params else url + if not url or (not go_anyway and self._url == to_url): + return + self._response = self._make_response(to_url, mode='post', data=data, **kwargs)[0] + self._url_available = self._response + return self._url_available + + def _make_response(self, url: str, mode: str = 'get', data: dict = None, 
**kwargs) -> tuple: + """生成response对象。接收mode参数,以决定用什么方式。 + :param url: 要访问的网址 + :param mode: 'get','post'中选择 + :param data: 提交的数据 + :param kwargs: 其它参数 + :return: Response对象 + """ + if mode not in ['get', 'post']: + raise ValueError("mode须在'get', 'post'中选择") + self._url = url + if not kwargs: + kwargs = global_session_options + else: + for i in global_session_options: + if i not in kwargs: + kwargs[i] = global_session_options[i] + try: + r = None + if mode == 'get': + r = self.session.get(url, **kwargs) + elif mode == 'post': + r = self.session.post(url, data=data, **kwargs) + except: + return_value = False + info = 'URL Invalid' + else: + if r.status_code == 200: + return_value = r + info = 'Success' + else: + return_value = False + info = f'{r.status_code}' + return return_value, info