diff --git a/DrissionPage/__init__.py b/DrissionPage/__init__.py index 1d6b9d2..bda0836 100644 --- a/DrissionPage/__init__.py +++ b/DrissionPage/__init__.py @@ -17,6 +17,6 @@ from .action_chains import ActionChains from .keys import Keys # 旧版页面类和启动配置类 -from .mix_page import MixPage -from .drission import Drission +from .mixpage.mix_page import MixPage +from .mixpage.drission import Drission from .configs.driver_options import DriverOptions diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index 6741bb5..d15515f 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -1580,7 +1580,7 @@ class Click(object): def right(self): """右键单击""" self._ele.page.scroll.to_see(self._ele) - x, y = self._ele._client_click_point + x, y = self._ele.locations.viewport_click_point self._click(x, y, 'right') def right_at(self, offset_x=None, offset_y=None): @@ -1594,7 +1594,7 @@ class Click(object): def middle(self): """中键单击""" self._ele.page.scroll.to_see(self._ele) - x, y = self._ele._client_click_point + x, y = self._ele.locations.viewport_click_point self._click(x, y, 'middle') def _click(self, client_x, client_y, button='left'): diff --git a/DrissionPage/chromium_element.pyi b/DrissionPage/chromium_element.pyi index 4ce43a5..0ea9f99 100644 --- a/DrissionPage/chromium_element.pyi +++ b/DrissionPage/chromium_element.pyi @@ -6,11 +6,11 @@ from pathlib import Path from typing import Union, Tuple, List, Any -from .common.constants import NoneElement from .base import DrissionElement, BaseElement from .chromium_base import ChromiumBase from .chromium_frame import ChromiumFrame from .chromium_page import ChromiumPage +from .common.constants import NoneElement from .session_element import SessionElement from .web_page import WebPage @@ -99,12 +99,6 @@ class ChromiumElement(DrissionElement): @property def midpoint(self) -> Tuple[int, int]: ... - @property - def _client_click_point(self) -> Tuple[int, int]: ... 
- - @property - def _click_point(self) -> Tuple[int, int]: ... - @property def shadow_root(self) -> Union[None, ChromiumShadowRootElement]: ... diff --git a/DrissionPage/common/web.py b/DrissionPage/common/web.py index 8a9ac22..515ae92 100644 --- a/DrissionPage/common/web.py +++ b/DrissionPage/common/web.py @@ -107,22 +107,22 @@ def location_in_viewport(page, loc_x, loc_y): def offset_scroll(ele, offset_x, offset_y): """接收元素及偏移坐标,把坐标滚动到页面中间,返回该点在视口中的坐标 - 有偏移量时以元素左上角坐标为基准,没有时以_click_point为基准 + 有偏移量时以元素左上角坐标为基准,没有时以click_point为基准 :param ele: 元素对象 :param offset_x: 偏移量x :param offset_y: 偏移量y :return: 视口中的坐标 """ loc_x, loc_y = ele.location - cp_x, cp_y = ele._click_point + cp_x, cp_y = ele.locations.click_point lx = loc_x + offset_x if offset_x else cp_x ly = loc_y + offset_y if offset_y else cp_y if not location_in_viewport(ele.page, lx, ly): clientWidth = ele.page.run_js('return document.body.clientWidth;') clientHeight = ele.page.run_js('return document.body.clientHeight;') ele.page.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2) - cl_x, cl_y = ele.client_location - ccp_x, ccp_y = ele._client_click_point + cl_x, cl_y = ele.locations.viewport_location + ccp_x, ccp_y = ele.locations.viewport_click_point cx = cl_x + offset_x if offset_x else ccp_x cy = cl_y + offset_y if offset_y else ccp_y return cx, cy diff --git a/DrissionPage/easy_set.py b/DrissionPage/easy_set.py index 14d1f6d..65c5dd9 100644 --- a/DrissionPage/easy_set.py +++ b/DrissionPage/easy_set.py @@ -13,7 +13,7 @@ from selenium import webdriver from .configs.chromium_options import ChromiumOptions from .configs.driver_options import DriverOptions from .configs.options_manage import OptionsManager -from .drission import Drission +from DrissionPage.mixpage.drission import Drission from .common.tools import unzip from .session_page import SessionPage diff --git a/DrissionPage/mixpage/base.py b/DrissionPage/mixpage/base.py new file mode 100644 index 0000000..1a26364 --- /dev/null +++ 
b/DrissionPage/mixpage/base.py @@ -0,0 +1,324 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from abc import abstractmethod +from re import sub +from urllib.parse import quote + +from DrissionPage.common.web import format_html +from DrissionPage.common.locator import get_loc + + +class BaseParser(object): + """所有页面、元素类的基类""" + + def __call__(self, loc_or_str): + return self.ele(loc_or_str) + + def ele(self, loc_or_ele, timeout=None): + return self._ele(loc_or_ele, timeout, True) + + def eles(self, loc_or_str, timeout=None): + return self._ele(loc_or_str, timeout, False) + + # ----------------以下属性或方法待后代实现---------------- + @property + def html(self): + return '' + + def s_ele(self, loc_or_ele): + pass + + def s_eles(self, loc_or_str): + pass + + @abstractmethod + def _ele(self, loc_or_ele, timeout=None, single=True): + pass + + +class BaseElement(BaseParser): + """各元素类的基类""" + + def __init__(self, page=None): + self.page = page + + # ----------------以下属性或方法由后代实现---------------- + @property + def tag(self): + return + + @abstractmethod + def _ele(self, loc_or_str, timeout=None, single=True, relative=False): + pass + + def parent(self, level_or_loc=1): + pass + + def prev(self, index=1): + return None # ShadowRootElement直接继承 + + def prevs(self) -> None: + return None # ShadowRootElement直接继承 + + def next(self, index=1): + pass + + def nexts(self): + pass + + +class DrissionElement(BaseElement): + """DriverElement、ChromiumElement 和 SessionElement的基类 + 但不是ShadowRootElement的基类""" + + @property + def link(self): + """返回href或src绝对url""" + return self.attr('href') or self.attr('src') + + @property + def css_path(self): + """返回css path路径""" + return self._get_ele_path('css') + + @property + def xpath(self): + """返回xpath路径""" + return self._get_ele_path('xpath') + + @property + def comments(self): + """返回元素注释文本组成的列表""" + return self.eles('xpath:.//comment()') + + def texts(self, text_node_only=False): + """返回元素内所有直接子节点的文本,包括元素和文本节点 + :param 
text_node_only: 是否只返回文本节点 + :return: 文本列表 + """ + if text_node_only: + texts = self.eles('xpath:/text()') + else: + texts = [x if isinstance(x, str) else x.text for x in self.eles('xpath:./text() | *')] + + return [format_html(x.strip(' ').rstrip('\n')) for x in texts if x and sub('[\r\n\t ]', '', x) != ''] + + def parent(self, level_or_loc=1): + """返回上面某一级父元素,可指定层数或用查询语法定位 + :param level_or_loc: 第几级父元素,或定位符 + :return: 上级元素对象 + """ + if isinstance(level_or_loc, int): + loc = f'xpath:./ancestor::*[{level_or_loc}]' + + elif isinstance(level_or_loc, (tuple, str)): + loc = get_loc(level_or_loc, True) + + if loc[0] == 'css selector': + raise ValueError('此css selector语法不受支持,请换成xpath。') + + loc = f'xpath:./ancestor::{loc[1].lstrip(". / ")}' + + else: + raise TypeError('level_or_loc参数只能是tuple、int或str。') + + return self._ele(loc, timeout=0, relative=True) + + def prev(self, index=1, filter_loc='', timeout=0): + """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param index: 前面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + nodes = self._get_brothers(index, filter_loc, 'preceding', timeout=timeout) + return nodes[-1] if nodes else None + + def next(self, index=1, filter_loc='', timeout=0): + """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param index: 后面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + nodes = self._get_brothers(index, filter_loc, 'following', timeout=timeout) + return nodes[0] if nodes else None + + def before(self, index=1, filter_loc='', timeout=None): + """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param index: 前面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的某个元素或节点 + """ + nodes = self._get_brothers(index, filter_loc, 'preceding', False, timeout=timeout) + return nodes[-1] if nodes else None + + def after(self, index=1, filter_loc='', timeout=None): + """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param index: 后面第几个查询结果元素 + :param filter_loc: 
用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素后面的某个元素或节点 + """ + nodes = self._get_brothers(index, filter_loc, 'following', False, timeout) + return nodes[0] if nodes else None + + def prevs(self, filter_loc='', timeout=0): + """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return self._get_brothers(filter_loc=filter_loc, direction='preceding', timeout=timeout) + + def nexts(self, filter_loc='', timeout=0): + """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return self._get_brothers(filter_loc=filter_loc, direction='following', timeout=timeout) + + def befores(self, filter_loc='', timeout=None): + """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的元素或节点组成的列表 + """ + return self._get_brothers(filter_loc=filter_loc, direction='preceding', brother=False, timeout=timeout) + + def afters(self, filter_loc='', timeout=None): + """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素后面的元素或节点组成的列表 + """ + return self._get_brothers(filter_loc=filter_loc, direction='following', brother=False, timeout=timeout) + + def _get_brothers(self, index=None, filter_loc='', direction='following', brother=True, timeout=.5): + """按要求返回兄弟元素或节点组成的列表 + :param index: 获取第几个,该参数不为None时只获取该编号的元素 + :param filter_loc: 用于筛选元素的查询语法 + :param direction: 'following' 或 'preceding',查找的方向 + :param brother: 查找范围,在同级查找还是整个dom前后查找 + :param timeout: 查找等待时间 + :return: DriverElement对象或字符串 + """ + if index is not None and index < 1: + raise ValueError('index必须大于等于1。') + + brother = '-sibling' if brother else '' + + if not filter_loc: + loc = '*' + + else: + loc = get_loc(filter_loc, True) # 把定位符转换为xpath + if loc[0] == 'css selector': + raise ValueError('此css selector语法不受支持,请换成xpath。') + loc = loc[1].lstrip('./') + + loc = 
f'xpath:./{direction}{brother}::{loc}' + + nodes = self._ele(loc, timeout=timeout, single=False, relative=True) + nodes = [e for e in nodes if not (isinstance(e, str) and sub('[ \n\t\r]', '', e) == '')] + + if nodes and index is not None: + index = index - 1 if direction == 'following' else -index + try: + return [nodes[index]] + except IndexError: + return [] + else: + return nodes + + # ----------------以下属性或方法由后代实现---------------- + @property + def attrs(self): + return + + @property + def text(self): + return + + @property + def raw_text(self): + return + + @abstractmethod + def attr(self, attr: str): + return '' + + def _get_ele_path(self, mode): + return '' + + +class BasePage(BaseParser): + """页面类的基类""" + + def __init__(self, timeout=None): + """初始化函数""" + self._url = None + self.timeout = timeout if timeout is not None else 10 + self.retry_times = 3 + self.retry_interval = 2 + self._url_available = None + + @property + def title(self): + """返回网页title""" + ele = self.ele('xpath://title') + return ele.text if ele else None + + @property + def timeout(self): + """返回查找元素时等待的秒数""" + return self._timeout + + @timeout.setter + def timeout(self, second): + """设置查找元素时等待的秒数""" + self._timeout = second + + @property + def cookies(self): + """返回cookies""" + return self.get_cookies(True) + + @property + def url_available(self): + """返回当前访问的url有效性""" + return self._url_available + + def _before_connect(self, url, retry, interval): + """连接前的准备 + :param url: 要访问的url + :param retry: 重试次数 + :param interval: 重试间隔 + :return: 重试次数和间隔组成的tuple + """ + self._url = quote(url, safe='/:&?=%;#@+!') + retry = retry if retry is not None else self.retry_times + interval = interval if interval is not None else self.retry_interval + return retry, interval + + # ----------------以下属性或方法由后代实现---------------- + @property + def url(self): + return + + @property + def json(self): + return + + @abstractmethod + def get_cookies(self, as_dict=False): + return {} + + @abstractmethod + def get(self, 
url, show_errmsg=False, retry=None, interval=None): + pass diff --git a/DrissionPage/mixpage/base.pyi b/DrissionPage/mixpage/base.pyi new file mode 100644 index 0000000..1f12e80 --- /dev/null +++ b/DrissionPage/mixpage/base.pyi @@ -0,0 +1,175 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from abc import abstractmethod +from typing import Union, Tuple, List + + +class BaseParser(object): + + def __call__(self, loc_or_str: Union[Tuple[str, str], str]): ... + + def ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement], timeout: float = None): ... + + def eles(self, loc_or_str: Union[Tuple[str, str], str], timeout=None): ... + + # ----------------以下属性或方法待后代实现---------------- + @property + def html(self) -> str: ... + + def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement]): ... + + def s_eles(self, loc_or_str: Union[Tuple[str, str], str]): ... + + @abstractmethod + def _ele(self, loc_or_ele, timeout: float = None, single: bool = True): ... + + +class BaseElement(BaseParser): + + def __init__(self, page: BasePage = None): + self.page: BasePage = ... + + # ----------------以下属性或方法由后代实现---------------- + @property + def tag(self) -> str: ... + + @abstractmethod + def _ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None, single: bool = True, + relative: bool = False): ... + + def parent(self, level_or_loc: Union[tuple, str, int] = 1): ... + + def prev(self, index: int = 1) -> None: ... + + def prevs(self) -> None: ... + + def next(self, index: int = 1): ... + + def nexts(self): ... + + +class DrissionElement(BaseElement): + + def __init__(self, + page: BasePage = ...): + self.page: BasePage = ... + + @property + def link(self) -> str: ... + + @property + def css_path(self) -> str: ... + + @property + def xpath(self) -> str: ... + + @property + def comments(self) -> list: ... + + def texts(self, text_node_only: bool = False) -> list: ... 
+ + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[DrissionElement, None]: ... + + def prev(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> Union[DrissionElement, str, None]: ... + + def next(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> Union[DrissionElement, str, None]: ... + + def before(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> Union[DrissionElement, str, None]: ... + + def after(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> Union[DrissionElement, str, None]: ... + + def prevs(self, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> List[Union[DrissionElement, str]]: ... + + def nexts(self, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> List[Union[DrissionElement, str]]: ... + + def befores(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union[DrissionElement, str]]: ... + + def afters(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union[DrissionElement, str]]: ... + + def _get_brothers(self, + index: int = None, + filter_loc: Union[tuple, str] = '', + direction: str = 'following', + brother: bool = True, + timeout: float = 0.5) -> List[Union[DrissionElement, str]]: ... + + # ----------------以下属性或方法由后代实现---------------- + @property + def attrs(self) -> dict: ... + + @property + def text(self) -> str: ... + + @property + def raw_text(self) -> str: ... + + @abstractmethod + def attr(self, attr: str) -> str: ... + + def _get_ele_path(self, mode) -> str: ... + + +class BasePage(BaseParser): + + def __init__(self, timeout: float = None): + self._url_available: bool = ... + self.retry_times: int = ... + self.retry_interval: float = ... + self._timeout = float = ... + + @property + def title(self) -> Union[str, None]: ... + + @property + def timeout(self) -> float: ... 
+ + @timeout.setter + def timeout(self, second: float) -> None: ... + + @property + def cookies(self) -> dict: ... + + @property + def url_available(self) -> bool: ... + + def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ... + + # ----------------以下属性或方法由后代实现---------------- + @property + def url(self) -> str: ... + + @property + def json(self) -> dict: ... + + @abstractmethod + def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: ... + + @abstractmethod + def get(self, + url: str, + show_errmsg: bool = False, + retry: int = None, + interval: float = None): ... diff --git a/DrissionPage/drission.py b/DrissionPage/mixpage/drission.py similarity index 97% rename from DrissionPage/drission.py rename to DrissionPage/mixpage/drission.py index 7b6667f..de76a4a 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/mixpage/drission.py @@ -14,11 +14,11 @@ from selenium.webdriver.chrome.options import Options from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver from tldextract import extract -from .common.tools import get_pid_from_port -from .common.browser import connect_browser -from .common.web import cookies_to_tuple -from .configs.session_options import SessionOptions, session_options_to_dict -from .configs.driver_options import DriverOptions +from DrissionPage.common.tools import get_pid_from_port +from DrissionPage.common.browser import connect_browser +from DrissionPage.common.web import cookies_to_tuple +from DrissionPage.configs.session_options import SessionOptions, session_options_to_dict +from DrissionPage.configs.driver_options import DriverOptions class Drission(object): @@ -391,8 +391,7 @@ def create_driver(chrome_path, driver_path, options): # 若版本不对,获取对应 chromedriver 再试 except (WebDriverException, SessionNotCreatedException): print('打开失败,尝试获取driver。\n') - from .easy_set import get_match_driver - from DrissionPage.easy_set import get_chrome_path + from DrissionPage.easy_set import 
get_match_driver, get_chrome_path if chrome_path == 'chrome': chrome_path = get_chrome_path(show_msg=False, from_ini=False) diff --git a/DrissionPage/drission.pyi b/DrissionPage/mixpage/drission.pyi similarity index 95% rename from DrissionPage/drission.pyi rename to DrissionPage/mixpage/drission.pyi index e902073..3079bca 100644 --- a/DrissionPage/drission.pyi +++ b/DrissionPage/mixpage/drission.pyi @@ -12,8 +12,8 @@ from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver -from .configs.driver_options import DriverOptions -from .configs.session_options import SessionOptions +from DrissionPage.configs.driver_options import DriverOptions +from DrissionPage.configs.session_options import SessionOptions class Drission(object): diff --git a/DrissionPage/driver_element.py b/DrissionPage/mixpage/driver_element.py similarity index 99% rename from DrissionPage/driver_element.py rename to DrissionPage/mixpage/driver_element.py index 0422734..f88967f 100644 --- a/DrissionPage/driver_element.py +++ b/DrissionPage/mixpage/driver_element.py @@ -15,9 +15,9 @@ from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait from .base import DrissionElement, BaseElement -from .common.locator import str_to_loc, get_loc -from .common.tools import get_usable_path -from .common.web import format_html, get_ele_txt +from DrissionPage.common.locator import str_to_loc, get_loc +from DrissionPage.common.tools import get_usable_path +from DrissionPage.common.web import format_html, get_ele_txt from .session_element import make_session_ele diff --git a/DrissionPage/driver_element.pyi b/DrissionPage/mixpage/driver_element.pyi similarity index 100% rename from DrissionPage/driver_element.pyi rename to DrissionPage/mixpage/driver_element.pyi diff --git a/DrissionPage/driver_page.py 
b/DrissionPage/mixpage/driver_page.py similarity index 97% rename from DrissionPage/driver_page.py rename to DrissionPage/mixpage/driver_page.py index e7c3469..122a86a 100644 --- a/DrissionPage/driver_page.py +++ b/DrissionPage/mixpage/driver_page.py @@ -1,611 +1,611 @@ -# -*- coding:utf-8 -*- -""" -@Author : g1879 -@Contact : g1879@qq.com -""" -from glob import glob -from os import sep -from pathlib import Path -from time import sleep, perf_counter - -from selenium.common.exceptions import NoAlertPresentException -from selenium.webdriver.remote.webelement import WebElement -from selenium.webdriver.support.wait import WebDriverWait - -from .base import BasePage -from .common.tools import get_usable_path -from .driver_element import DriverElement, make_driver_ele, Scroll, ElementWaiter -from .session_element import make_session_ele - - -class DriverPage(BasePage): - """DriverPage封装了页面操作的常用功能,使用selenium来获取、解析、操作网页""" - - def __init__(self, driver, timeout=10): - """初始化函数,接收一个WebDriver对象,用来操作网页""" - super().__init__(timeout) - self._driver = driver - self._wait_object = None - self._scroll = None - - def __call__(self, loc_or_str, timeout=None): - """在内部查找元素 - 例:ele = page('@id=ele_id') - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 超时时间 - :return: DriverElement对象或属性、文本 - """ - return self.ele(loc_or_str, timeout) - - # -----------------共有属性和方法------------------- - @property - def url(self): - """返回当前网页url""" - if not self._driver or not self.driver.current_url.startswith('http'): - return None - else: - return self.driver.current_url - - @property - def html(self): - """返回页面的html文本""" - return self.driver.find_element('xpath', "//*").get_attribute("outerHTML") - - @property - def json(self): - """当返回内容是json格式时,返回对应的字典""" - from json import loads - return loads(self('t:pre').text) - - def get(self, url, show_errmsg=False, retry=None, interval=None): - """访问url - :param url: 目标url - :param show_errmsg: 是否显示和抛出异常 - :param retry: 重试次数 - :param interval: 
重试间隔(秒) - :return: 目标url是否可用,返回None表示不确定 - """ - retry, interval = self._before_connect(url, retry, interval) - self._url_available = self._d_connect(self._url, times=retry, interval=interval, show_errmsg=show_errmsg) - return self._url_available - - def ele(self, loc_or_ele, timeout=None): - """返回页面中符合条件的第一个元素 - :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 - :param timeout: 查找元素超时时间,默认与页面等待时间一致 - :return: DriverElement对象或属性、文本 - """ - return self._ele(loc_or_ele, timeout) - - def eles(self, loc_or_str, timeout=None): - """返回页面中所有符合条件的元素 - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 查找元素超时时间,默认与页面等待时间一致 - :return: DriverElement对象或属性、文本组成的列表 - """ - return self._ele(loc_or_str, timeout, single=False) - - def s_ele(self, loc_or_ele=None): - """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本 - """ - if isinstance(loc_or_ele, DriverElement): - return make_session_ele(loc_or_ele) - else: - return make_session_ele(self, loc_or_ele) - - def s_eles(self, loc_or_str): - """查找所有符合条件的元素以SessionElement列表形式返回 - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象组成的列表 - """ - return make_session_ele(self, loc_or_str, single=False) - - def _ele(self, loc_or_ele, timeout=None, single=True): - """返回页面中符合条件的元素,默认返回第一个 - :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 - :param timeout: 查找元素超时时间 - :param single: True则返回第一个,False则返回全部 - :return: DriverElement对象 - """ - # 接收到字符串或元组,获取定位loc元组 - if isinstance(loc_or_ele, (str, tuple)): - return make_driver_ele(self, loc_or_ele, single, timeout) - - # 接收到DriverElement对象直接返回 - elif isinstance(loc_or_ele, DriverElement): - return loc_or_ele - - # 接收到WebElement对象打包成DriverElement对象返回 - elif isinstance(loc_or_ele, WebElement): - return DriverElement(loc_or_ele, self) - - # 接收到的类型不正确,抛出异常 - else: - raise ValueError('loc_or_str参数只能是tuple、str、DriverElement 或 WebElement类型。') - - def get_cookies(self, as_dict=False): - 
"""返回当前网站cookies""" - if as_dict: - return {cookie['name']: cookie['value'] for cookie in self.driver.get_cookies()} - else: - return self.driver.get_cookies() - - @property - def timeout(self): - """返回查找元素时等待的秒数""" - return self._timeout - - @timeout.setter - def timeout(self, second): - """设置查找元素时等待的秒数""" - self._timeout = second - self._wait_object = None - - def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False): - """尝试连接,重试若干次 - :param to_url: 要访问的url - :param times: 重试次数 - :param interval: 重试间隔(秒) - :param show_errmsg: 是否抛出异常 - :return: 是否成功,返回None表示不确定 - """ - err = None - is_ok = False - - for _ in range(times + 1): - try: - self.driver.get(to_url) - go_ok = True - except Exception as e: - err = e - go_ok = False - - is_ok = self.check_page() if go_ok else False - - if is_ok is not False: - break - - if _ < times: - sleep(interval) - if show_errmsg: - print(f'重试 {to_url}') - - if is_ok is False and show_errmsg: - raise err if err is not None else ConnectionError('连接异常。') - - return is_ok - - # ----------------driver独有属性和方法----------------------- - @property - def driver(self): - return self._driver - - @property - def wait_object(self): - """返回WebDriverWait对象,重用避免每次新建对象""" - if self._wait_object is None: - self._wait_object = WebDriverWait(self.driver, timeout=self.timeout) - - return self._wait_object - - @property - def timeouts(self): - """返回三种超时时间,selenium4以上版本可用""" - return {'implicit': self.timeout, - 'pageLoad': self.driver.timeouts.page_load, - 'script': self.driver.timeouts.script} - - @property - def tabs_count(self): - """返回标签页数量""" - try: - return len(self.driver.window_handles) - except Exception: - return 0 - - @property - def tab_handles(self): - """返回所有标签页handle列表""" - return self.driver.window_handles - - @property - def current_tab_index(self): - """返回当前标签页序号""" - return self.driver.window_handles.index(self.driver.current_window_handle) - - @property - def current_tab_handle(self): - """返回当前标签页handle""" - return 
self.driver.current_window_handle - - @property - def active_ele(self): - """返回当前焦点所在元素""" - return DriverElement(self.driver.switch_to.active_element, self) - - @property - def scroll(self): - """用于滚动滚动条的对象""" - if self._scroll is None: - self._scroll = Scroll(self) - return self._scroll - - @property - def to_frame(self): - """用于跳转到frame的对象,调用其方法实现跳转 - 示例: - page.to_frame.by_loc('tag:iframe') - 通过传入frame的查询字符串定位 - page.to_frame.by_loc((By.TAG_NAME, 'iframe')) - 通过传入定位符定位 - page.to_frame.by_id('iframe_id') - 通过frame的id属性定位 - page.to_frame('iframe_name') - 通过frame的name属性定位 - page.to_frame(iframe_element) - 通过传入元素对象定位 - page.to_frame(0) - 通过frame的序号定位 - page.to_frame.main() - 跳到最顶层 - page.to_frame.parent() - 跳到上一层 - """ - return ToFrame(self) - - def set_timeouts(self, implicit=None, pageLoad=None, script=None): - """设置超时时间,单位为秒,selenium4以上版本有效 - :param implicit: 查找元素超时时间 - :param pageLoad: 页面加载超时时间 - :param script: 脚本运行超时时间 - :return: None - """ - if implicit is not None: - self.timeout = implicit - - if pageLoad is not None: - self.driver.set_page_load_timeout(pageLoad) - - if script is not None: - self.driver.set_script_timeout(script) - - def wait_ele(self, loc_or_ele, timeout=None): - """等待元素从dom删除、显示、隐藏 - :param loc_or_ele: 可以是元素、查询字符串、loc元组 - :param timeout: 等待超时时间 - :return: 用于等待的ElementWaiter对象 - """ - return ElementWaiter(self, loc_or_ele, timeout) - - def check_page(self): - """检查页面是否符合预期 - 由子类自行实现各页面的判定规则 - """ - return None - - def run_script(self, script, *args): - """执行js代码 - :param script: js文本 - :param args: 传入的参数 - :return: js执行结果 - """ - return self.driver.execute_script(script, *args) - - def run_async_script(self, script, *args): - """以异步方式执行js代码 - :param script: js文本 - :param args: 传入的参数 - :return: js执行结果 - """ - return self.driver.execute_async_script(script, *args) - - def run_cdp(self, cmd, **cmd_args): - """执行Chrome DevTools Protocol语句 - :param cmd: 协议项目 - :param cmd_args: 参数 - :return: 执行的结果 - """ - return self.driver.execute_cdp_cmd(cmd, 
cmd_args) - - def create_tab(self, url=''): - """新建并定位到一个标签页,该标签页在最后面 - :param url: 新标签页跳转到的网址 - :return: None - """ - self.driver.switch_to.new_window('tab') - if url: - self.get(url) - - def close_tabs(self, num_or_handles=None): - """关闭传入的标签页,默认关闭当前页。可传入多个 - 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致,不能按序号关闭。 - :param num_or_handles:要关闭的标签页序号或handle,可传入handle和序号组成的列表或元组,为None时关闭当前页 - :return: None - """ - tabs = (self.current_tab_handle,) if num_or_handles is None else get_handles(self.tab_handles, num_or_handles) - for i in tabs: - self.driver.switch_to.window(i) - self.driver.close() - - self.to_tab(0) - - def close_other_tabs(self, num_or_handles=None): - """关闭传入的标签页以外标签页,默认保留当前页。可传入多个 - 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致,不能按序号关闭。 - :param num_or_handles: 要保留的标签页序号或handle,可传入handle和序号组成的列表或元组,为None时保存当前页 - :return: None - """ - all_tabs = self.driver.window_handles - reserve_tabs = {self.current_tab_handle} if num_or_handles is None else get_handles(all_tabs, num_or_handles) - - for i in set(all_tabs) - reserve_tabs: - self.driver.switch_to.window(i) - self.driver.close() - - self.to_tab(0) - - def to_tab(self, num_or_handle=0): - """跳转到标签页 - 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致 - :param num_or_handle: 标签页序号或handle字符串,序号第一个为0,最后为-1 - :return: None - """ - try: - tab = int(num_or_handle) - except (ValueError, TypeError): - tab = num_or_handle - - tab = self.driver.window_handles[tab] if isinstance(tab, int) else tab - self.driver.switch_to.window(tab) - - def set_ua_to_tab(self, ua): - """为当前tab设置user agent,只在当前tab有效 - :param ua: user agent字符串 - :return: None - """ - self.driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": ua}) - - def get_session_storage(self, item=None): - """获取sessionStorage信息,不设置item则获取全部 - :param item: 要获取的项,不设置则返回全部 - :return: sessionStorage一个或所有项内容 - """ - js = f'return sessionStorage.getItem("{item}");' if item else 'return sessionStorage;' - return self.run_script(js) - - def get_local_storage(self, item=None): - 
"""获取localStorage信息,不设置item则获取全部 - :param item: 要获取的项目,不设置则返回全部 - :return: localStorage一个或所有项内容 - """ - js = f'return localStorage.getItem("{item}");' if item else 'return localStorage;' - return self.run_script(js) - - def set_session_storage(self, item, value): - """设置或删除某项sessionStorage信息 - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - s = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' - self.run_script(s) - - def set_local_storage(self, item, value): - """设置或删除某项localStorage信息 - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - s = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' - self.run_script(s) - - def clean_cache(self, session_storage=True, local_storage=True, cache=True, cookies=True): - """清除缓存,可选要清除的项 - :param session_storage: 是否清除sessionStorage - :param local_storage: 是否清除localStorage - :param cache: 是否清除cache - :param cookies: 是否清除cookies - :return: None - """ - if session_storage: - self.run_script('sessionStorage.clear();') - if local_storage: - self.run_script('localStorage.clear();') - if cache: - self.run_cdp('Network.clearBrowserCache') - if cookies: - self.run_cdp('Network.clearBrowserCookies') - - def screenshot(self, path=None, filename=None, as_bytes=False): - """截取页面可见范围截图 - :param path: 保存路径 - :param filename: 图片文件名,不传入时以页面title命名 - :param as_bytes: 是否已字节形式返回图片,为True时上面两个参数失效 - :return: 图片完整路径或字节文本 - """ - if as_bytes: - return self.driver.get_screenshot_as_png() - - name = filename or self.title - if not name.lower().endswith('.png'): - name = f'{name}.png' - path = Path(path or '.').absolute() - path.mkdir(parents=True, exist_ok=True) - img_path = str(get_usable_path(f'{path}{sep}{name}')) - self.driver.save_screenshot(img_path) - return img_path - - def scroll_to_see(self, loc_or_ele): - """滚动页面直到元素可见 - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) - 
:return: None - """ - ele = self.ele(loc_or_ele) - ele.run_script("arguments[0].scrollIntoView();") - - def refresh(self): - """刷新当前页面""" - self.driver.refresh() - - def stop_loading(self): - """强制停止页面加载""" - self.run_cdp('Page.stopLoading') - - def back(self): - """在浏览历史中后退一步""" - self.driver.back() - - def forward(self): - """在浏览历史中前进一步""" - self.driver.forward() - - def set_window_size(self, width=None, height=None): - """设置浏览器窗口大小,默认最大化,任一参数为0最小化 - :param width: 浏览器窗口高 - :param height: 浏览器窗口宽 - :return: None - """ - if width is None and height is None: - self.driver.maximize_window() - - elif width == 0 or height == 0: - self.driver.minimize_window() - - else: - if width < 0 or height < 0: - raise ValueError('x 和 y参数必须大于0。') - - new_x = width or self.driver.get_window_size()['width'] - new_y = height or self.driver.get_window_size()['height'] - self.driver.set_window_size(new_x, new_y) - - def chrome_downloading(self, download_path): - """返回浏览器下载中的文件列表 - :param download_path: 下载文件夹路径 - :return: 文件列表 - """ - return glob(f'{download_path}{sep}*.crdownload') - - def process_alert(self, ok=True, send=None, timeout=None): - """处理提示框 - :param ok: True表示确认,False表示取消,其它值不会按按钮但依然返回文本值 - :param send: 处理prompt提示框时可输入文本 - :param timeout: 等待提示框出现的超时时间 - :return: 提示框内容文本,未等到提示框则返回None - """ - - def do_it(): - try: - return self.driver.switch_to.alert - except NoAlertPresentException: - return False - - timeout = timeout if timeout is not None else self.timeout - t1 = perf_counter() - alert = do_it() - while alert is False and perf_counter() - t1 <= timeout: - alert = do_it() - - if alert is False: - return None - - res_text = alert.text - - if send is not None: - alert.send_keys(send) - - if ok is True: - alert.accept() - elif ok is False: - alert.dismiss() - - return res_text - - -class ToFrame(object): - """用于处理焦点跳转到页面框架的类""" - - def __init__(self, page): - self.page = page - - def __call__(self, condition='main'): - """跳转到(i)frame,可传入id、name、序号、元素对象、定位符 - :param 
condition: (i)frame,可传入id、name、序号、元素对象、定位符 - :return: 当前页面对象 - """ - if isinstance(condition, (DriverElement, WebElement)): - self.by_ele(condition) - elif isinstance(condition, int): - self.by_index(condition) - elif ':' not in condition and '=' not in condition and not condition.startswith(('#', '.', '@')): - self.by_id(condition) - else: - self.by_loc(condition) - - return self.page - - def main(self): - """焦点跳转到最高层级框架""" - self.page.driver.switch_to.default_content() - return self.page - - def parent(self, level=1): - """焦点跳转到上级框架,可指定上级层数 - :param level: 上面第几层框架 - :return: 框架所在页面对象 - """ - if level < 1: - raise ValueError('level参数须是大于0的整数。') - for _ in range(level): - self.page.driver.switch_to.parent_frame() - return self.page - - def by_id(self, id_): - """焦点跳转到id为该值的(i)frame - :param id_: (i)frame的id属性值 - :return: 框架所在页面对象 - """ - self.page.driver.switch_to.frame(id_) - return self.page - - def by_name(self, name): - """焦点跳转到name为该值的(i)frame - :param name: (i)frame的name属性值 - :return: 框架所在页面对象 - """ - self.page.driver.switch_to.frame(name) - return self.page - - def by_index(self, index): - """焦点跳转到页面中第几个(i)frame - :param index: 页面中第几个(i)frame - :return: 框架所在页面对象 - """ - self.page.driver.switch_to.frame(index) - return self.page - - def by_loc(self, loc): - """焦点跳转到根据定位符获取到的(i)frame - :param loc: 定位符,支持selenium原生和DriverPage定位符 - :return: 框架所在页面对象 - """ - self.page.driver.switch_to.frame(self.page(loc).inner_ele) - return self.page - - def by_ele(self, ele): - """焦点跳转到传入的(i)frame元素对象 - :param ele: (i)frame元素对象 - :return: 框架所在页面对象 - """ - if isinstance(ele, DriverElement): - ele = ele.inner_ele - self.page.driver.switch_to.frame(ele) - return self.page - - -def get_handles(handles, num_or_handles): - """返回指定标签页组成的set - :param handles: handles列表 - :param num_or_handles: 指定的标签页,可以是多个 - :return: 指定标签页组成的set - """ - if isinstance(num_or_handles, (int, str)): - num_or_handles = (num_or_handles,) - elif not isinstance(num_or_handles, (list, tuple)): - raise 
# -*- coding:utf-8 -*-
"""
@Author  :   g1879
@Contact :   g1879@qq.com
"""
from glob import glob
from os import sep
from pathlib import Path
from time import sleep, perf_counter

from selenium.common.exceptions import NoAlertPresentException
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.wait import WebDriverWait

from .base import BasePage
from DrissionPage.common.tools import get_usable_path
from .driver_element import DriverElement, make_driver_ele, Scroll, ElementWaiter
from .session_element import make_session_ele


class DriverPage(BasePage):
    """Page object that drives a browser through a selenium WebDriver.

    Wraps the common operations for fetching, parsing and manipulating a page.
    """

    def __init__(self, driver, timeout=10):
        """Store the WebDriver used to operate the page.

        :param driver: WebDriver object
        :param timeout: seconds to wait when looking up elements
        """
        super().__init__(timeout)
        self._driver = driver
        self._wait_object = None  # lazily created WebDriverWait, see wait_object
        self._scroll = None  # lazily created Scroll helper, see scroll

    def __call__(self, loc_or_str, timeout=None):
        """Shortcut for ele(); e.g. ``ele = page('@id=ele_id')``.

        :param loc_or_str: locator tuple or query string
        :param timeout: lookup timeout in seconds
        :return: whatever ele() returns
        """
        return self.ele(loc_or_str, timeout)

    # ----------------- properties and methods shared with other page types -------------------
    @property
    def url(self):
        """Return the current url, or None when there is no driver or the url is not http(s)."""
        if not self._driver:
            return None

        # Read current_url once: every access is a round-trip to the browser.
        current = self.driver.current_url
        return current if current.startswith('http') else None

    @property
    def html(self):
        """Return the outerHTML of the first element matched by the xpath ``//*``."""
        return self.driver.find_element('xpath', "//*").get_attribute("outerHTML")

    @property
    def json(self):
        """Parse the text of the first ``pre`` tag as json and return the result."""
        from json import loads
        return loads(self('t:pre').text)

    def get(self, url, show_errmsg=False, retry=None, interval=None):
        """Navigate to a url.

        :param url: target url
        :param show_errmsg: whether to print retry messages and raise on failure
        :param retry: number of retries
        :param interval: seconds between retries
        :return: whether the url is usable; None means undetermined
        """
        # NOTE(review): _before_connect is defined outside this file; it presumably
        # normalises the url into self._url and resolves retry/interval defaults - confirm.
        retry, interval = self._before_connect(url, retry, interval)
        self._url_available = self._d_connect(self._url, times=retry, interval=interval, show_errmsg=show_errmsg)
        return self._url_available

    def ele(self, loc_or_ele, timeout=None):
        """Return the first element on the page matching the condition.

        :param loc_or_ele: element object, locator tuple or query string
        :param timeout: lookup timeout in seconds
        :return: result of _ele() in single mode
        """
        return self._ele(loc_or_ele, timeout)

    def eles(self, loc_or_str, timeout=None):
        """Return all elements on the page matching the condition.

        :param loc_or_str: locator tuple or query string
        :param timeout: lookup timeout in seconds
        :return: result of _ele() in multi mode
        """
        return self._ele(loc_or_str, timeout, single=False)

    def s_ele(self, loc_or_ele=None):
        """Find the first matching element and return it in SessionElement form.

        :param loc_or_ele: DriverElement, locator tuple or query string
        :return: result of make_session_ele()
        """
        if isinstance(loc_or_ele, DriverElement):
            return make_session_ele(loc_or_ele)
        return make_session_ele(self, loc_or_ele)

    def s_eles(self, loc_or_str):
        """Find all matching elements and return them in SessionElement form.

        :param loc_or_str: locator tuple or query string
        :return: result of make_session_ele() in multi mode
        """
        return make_session_ele(self, loc_or_str, single=False)

    def _ele(self, loc_or_ele, timeout=None, single=True):
        """Resolve a locator or element into DriverElement object(s).

        :param loc_or_ele: element object, locator tuple or query string
        :param timeout: lookup timeout in seconds
        :param single: True returns the first match, False returns all
        :return: DriverElement object(s)
        :raises ValueError: when loc_or_ele has an unsupported type
        """
        # A string or tuple is a locator: search the page.
        if isinstance(loc_or_ele, (str, tuple)):
            return make_driver_ele(self, loc_or_ele, single, timeout)

        # A DriverElement is returned untouched.
        if isinstance(loc_or_ele, DriverElement):
            return loc_or_ele

        # A raw selenium WebElement gets wrapped.
        if isinstance(loc_or_ele, WebElement):
            return DriverElement(loc_or_ele, self)

        raise ValueError('loc_or_str参数只能是tuple、str、DriverElement 或 WebElement类型。')

    def get_cookies(self, as_dict=False):
        """Return the cookies reported by the driver.

        :param as_dict: True returns a {name: value} dict, False the driver's raw list
        :return: cookies
        """
        cookies = self.driver.get_cookies()
        if as_dict:
            return {cookie['name']: cookie['value'] for cookie in cookies}
        return cookies

    @property
    def timeout(self):
        """Return the seconds to wait when looking up elements."""
        return self._timeout

    @timeout.setter
    def timeout(self, second):
        """Set the element lookup timeout and drop the cached WebDriverWait."""
        self._timeout = second
        self._wait_object = None  # it was built with the old timeout

    def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False):
        """Try to load a url, retrying on failure.

        :param to_url: url to visit
        :param times: number of retries (total attempts = times + 1)
        :param interval: seconds between retries
        :param show_errmsg: whether to print retry messages and raise on failure
        :return: check_page() result of the last attempt, or False; None means undetermined
        """
        err = None
        is_ok = False

        for attempt in range(times + 1):
            try:
                self.driver.get(to_url)
                go_ok = True
            except Exception as e:
                err = e
                go_ok = False

            is_ok = self.check_page() if go_ok else False

            # Both True and None (undetermined) end the retry loop.
            if is_ok is not False:
                break

            if attempt < times:
                sleep(interval)
                if show_errmsg:
                    print(f'重试 {to_url}')

        if is_ok is False and show_errmsg:
            raise err if err is not None else ConnectionError('连接异常。')

        return is_ok

    # ---------------- driver-only properties and methods -----------------------
    @property
    def driver(self):
        """Return the WebDriver object."""
        return self._driver

    @property
    def wait_object(self):
        """Return a WebDriverWait bound to the current timeout, reused across calls."""
        if self._wait_object is None:
            self._wait_object = WebDriverWait(self.driver, timeout=self.timeout)

        return self._wait_object

    @property
    def timeouts(self):
        """Return the implicit, pageLoad and script timeouts (needs selenium 4+)."""
        return {'implicit': self.timeout,
                'pageLoad': self.driver.timeouts.page_load,
                'script': self.driver.timeouts.script}

    @property
    def tabs_count(self):
        """Return the number of tabs, or 0 when the driver cannot report them."""
        try:
            return len(self.driver.window_handles)
        except Exception:
            return 0

    @property
    def tab_handles(self):
        """Return the list of all tab handles."""
        return self.driver.window_handles

    @property
    def current_tab_index(self):
        """Return the index of the current tab within the handle list."""
        return self.driver.window_handles.index(self.driver.current_window_handle)

    @property
    def current_tab_handle(self):
        """Return the handle of the current tab."""
        return self.driver.current_window_handle

    @property
    def active_ele(self):
        """Return the element that currently has focus."""
        return DriverElement(self.driver.switch_to.active_element, self)

    @property
    def scroll(self):
        """Return the Scroll helper for this page, created on first use."""
        if self._scroll is None:
            self._scroll = Scroll(self)
        return self._scroll

    @property
    def to_frame(self):
        """Return a ToFrame object used to move focus between frames.

        Examples:
            page.to_frame.by_loc('tag:iframe')            - locate by query string
            page.to_frame.by_loc((By.TAG_NAME, 'iframe')) - locate by locator tuple
            page.to_frame.by_id('iframe_id')              - locate by the frame's id
            page.to_frame('iframe_name')                  - locate by the frame's name
            page.to_frame(iframe_element)                 - locate by element object
            page.to_frame(0)                              - locate by index
            page.to_frame.main()                          - jump to the top level
            page.to_frame.parent()                        - jump one level up
        """
        return ToFrame(self)

    def set_timeouts(self, implicit=None, pageLoad=None, script=None):
        """Set timeouts in seconds (needs selenium 4+); None leaves a value unchanged.

        :param implicit: element lookup timeout
        :param pageLoad: page load timeout
        :param script: script execution timeout
        :return: None
        """
        if implicit is not None:
            self.timeout = implicit

        if pageLoad is not None:
            self.driver.set_page_load_timeout(pageLoad)

        if script is not None:
            self.driver.set_script_timeout(script)

    def wait_ele(self, loc_or_ele, timeout=None):
        """Build a waiter for an element being removed, displayed or hidden.

        :param loc_or_ele: element, query string or locator tuple
        :param timeout: wait timeout in seconds
        :return: ElementWaiter object
        """
        return ElementWaiter(self, loc_or_ele, timeout)

    def check_page(self):
        """Check whether the page is as expected.

        Returns None (undetermined) here; subclasses implement their own rule.
        """
        return None

    def run_script(self, script, *args):
        """Execute javascript synchronously.

        :param script: js text
        :param args: arguments exposed to the script as ``arguments``
        :return: result of the script
        """
        return self.driver.execute_script(script, *args)

    def run_async_script(self, script, *args):
        """Execute javascript asynchronously.

        :param script: js text
        :param args: arguments exposed to the script as ``arguments``
        :return: result of the script
        """
        return self.driver.execute_async_script(script, *args)

    def run_cdp(self, cmd, **cmd_args):
        """Execute a Chrome DevTools Protocol command.

        :param cmd: protocol command name
        :param cmd_args: command parameters
        :return: result of the command
        """
        return self.driver.execute_cdp_cmd(cmd, cmd_args)

    def create_tab(self, url=''):
        """Open a new tab, switch to it and optionally load a url.

        :param url: url to load in the new tab
        :return: None
        """
        self.driver.switch_to.new_window('tab')
        if url:
            self.get(url)

    def close_tabs(self, num_or_handles=None):
        """Close the given tabs (the current one by default), then switch to tab 0.

        Note: with a taken-over browser the handle order may differ from the
        visual order, so closing by index is unreliable.
        :param num_or_handles: index or handle, or a list/tuple of them; None means the current tab
        :return: None
        """
        tabs = (self.current_tab_handle,) if num_or_handles is None else get_handles(self.tab_handles, num_or_handles)
        for i in tabs:
            self.driver.switch_to.window(i)
            self.driver.close()

        self.to_tab(0)

    def close_other_tabs(self, num_or_handles=None):
        """Close every tab except the given ones (the current one by default), then switch to tab 0.

        Note: with a taken-over browser the handle order may differ from the
        visual order, so selecting by index is unreliable.
        :param num_or_handles: index or handle to keep, or a list/tuple of them; None keeps the current tab
        :return: None
        """
        all_tabs = self.driver.window_handles
        reserve_tabs = {self.current_tab_handle} if num_or_handles is None else get_handles(all_tabs, num_or_handles)

        for i in set(all_tabs) - reserve_tabs:
            self.driver.switch_to.window(i)
            self.driver.close()

        self.to_tab(0)

    def to_tab(self, num_or_handle=0):
        """Switch to a tab.

        Note: with a taken-over browser the handle order may differ from the visual order.
        :param num_or_handle: tab index (0 is first, -1 is last) or handle string
        :return: None
        """
        # Anything int()-convertible is treated as an index, everything else as a handle.
        try:
            tab = int(num_or_handle)
        except (ValueError, TypeError):
            tab = num_or_handle

        tab = self.driver.window_handles[tab] if isinstance(tab, int) else tab
        self.driver.switch_to.window(tab)

    def set_ua_to_tab(self, ua):
        """Override the user agent through CDP ``Network.setUserAgentOverride``.

        :param ua: user agent string
        :return: None
        """
        self.driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": ua})

    def _get_storage(self, storage, item):
        """Read one item, or the whole object when item is falsy, from a Web Storage object.

        :param storage: 'sessionStorage' or 'localStorage'
        :param item: key to read
        :return: result of the script
        """
        if item:
            # The key travels as a script argument so quotes in it cannot break the js.
            return self.run_script(f'return {storage}.getItem(arguments[0]);', str(item))
        return self.run_script(f'return {storage};')

    def _set_storage(self, storage, item, value):
        """Set one item of a Web Storage object, or remove it when value is False.

        :param storage: 'sessionStorage' or 'localStorage'
        :param item: key to set or remove
        :param value: new value; exactly False removes the key
        :return: None
        """
        if value is False:
            self.run_script(f'{storage}.removeItem(arguments[0]);', str(item))
        else:
            # str() keeps the Python-side stringification the js template used to perform.
            self.run_script(f'{storage}.setItem(arguments[0], arguments[1]);', str(item), str(value))

    def get_session_storage(self, item=None):
        """Read sessionStorage.

        :param item: key to read; when not set the whole storage is returned
        :return: one item or the whole sessionStorage
        """
        return self._get_storage('sessionStorage', item)

    def get_local_storage(self, item=None):
        """Read localStorage.

        :param item: key to read; when not set the whole storage is returned
        :return: one item or the whole localStorage
        """
        return self._get_storage('localStorage', item)

    def set_session_storage(self, item, value):
        """Set or delete one sessionStorage item.

        :param item: key to set
        :param value: value of the item; False deletes the item
        :return: None
        """
        self._set_storage('sessionStorage', item, value)

    def set_local_storage(self, item, value):
        """Set or delete one localStorage item.

        :param item: key to set
        :param value: value of the item; False deletes the item
        :return: None
        """
        self._set_storage('localStorage', item, value)

    def clean_cache(self, session_storage=True, local_storage=True, cache=True, cookies=True):
        """Clear selected kinds of cached data.

        :param session_storage: whether to clear sessionStorage
        :param local_storage: whether to clear localStorage
        :param cache: whether to clear the browser cache (CDP)
        :param cookies: whether to clear the browser cookies (CDP)
        :return: None
        """
        if session_storage:
            self.run_script('sessionStorage.clear();')
        if local_storage:
            self.run_script('localStorage.clear();')
        if cache:
            self.run_cdp('Network.clearBrowserCache')
        if cookies:
            self.run_cdp('Network.clearBrowserCookies')

    def screenshot(self, path=None, filename=None, as_bytes=False):
        """Take a screenshot of the visible part of the page.

        :param path: folder to save into; defaults to the current directory
        :param filename: image file name; defaults to the page title
        :param as_bytes: return the png bytes instead of saving; path and filename are then ignored
        :return: full path of the saved image, or png bytes
        """
        if as_bytes:
            return self.driver.get_screenshot_as_png()

        name = filename or self.title
        if not name.lower().endswith('.png'):
            name = f'{name}.png'
        path = Path(path or '.').absolute()
        path.mkdir(parents=True, exist_ok=True)
        # NOTE(review): get_usable_path presumably returns a non-clashing path - confirm.
        img_path = str(get_usable_path(f'{path}{sep}{name}'))
        self.driver.save_screenshot(img_path)
        return img_path

    def scroll_to_see(self, loc_or_ele):
        """Scroll the page until an element is in view.

        :param loc_or_ele: element, locator tuple or query string (see ele())
        :return: None
        """
        ele = self.ele(loc_or_ele)
        ele.run_script("arguments[0].scrollIntoView();")

    def refresh(self):
        """Reload the current page."""
        self.driver.refresh()

    def stop_loading(self):
        """Force the page to stop loading."""
        self.run_cdp('Page.stopLoading')

    def back(self):
        """Go one step back in the browsing history."""
        self.driver.back()

    def forward(self):
        """Go one step forward in the browsing history."""
        self.driver.forward()

    def set_window_size(self, width=None, height=None):
        """Resize the browser window.

        With no arguments the window is maximised; if either argument is 0 it
        is minimised; an argument left as None keeps the current value.
        :param width: window width
        :param height: window height
        :return: None
        :raises ValueError: when width or height is negative
        """
        if width is None and height is None:
            self.driver.maximize_window()

        elif width == 0 or height == 0:
            self.driver.minimize_window()

        else:
            # Only compare the dimensions actually given: None < 0 raises TypeError.
            if (width is not None and width < 0) or (height is not None and height < 0):
                raise ValueError('x 和 y参数必须大于0。')

            if width is None or height is None:
                current = self.driver.get_window_size()
                width = current['width'] if width is None else width
                height = current['height'] if height is None else height

            self.driver.set_window_size(width, height)

    def chrome_downloading(self, download_path):
        """Return the files the browser is still downloading.

        :param download_path: download folder
        :return: list of ``*.crdownload`` paths in that folder
        """
        return glob(f'{download_path}{sep}*.crdownload')

    def process_alert(self, ok=True, send=None, timeout=None):
        """Handle an alert / confirm / prompt box.

        :param ok: True accepts, False dismisses, any other value presses nothing
        :param send: text to type into a prompt box
        :param timeout: seconds to wait for the box; defaults to the page timeout
        :return: text of the box, or None when none appeared in time
        """

        def do_it():
            try:
                return self.driver.switch_to.alert
            except NoAlertPresentException:
                return False

        timeout = timeout if timeout is not None else self.timeout
        t1 = perf_counter()
        alert = do_it()
        while alert is False and perf_counter() - t1 <= timeout:
            sleep(.01)  # brief pause so the wait does not spin a CPU core
            alert = do_it()

        if alert is False:
            return None

        res_text = alert.text

        if send is not None:
            alert.send_keys(send)

        if ok is True:
            alert.accept()
        elif ok is False:
            alert.dismiss()

        return res_text


class ToFrame(object):
    """Helper that moves the driver's focus between page frames."""

    def __init__(self, page):
        """
        :param page: page object whose driver is switched
        """
        self.page = page

    def __call__(self, condition='main'):
        """Switch to an (i)frame chosen by id/name, index, element object or locator.

        :param condition: frame id/name, index, element object or locator
        :return: the page object
        """
        # NOTE(review): the default 'main' is routed to by_id('main'), not main() -
        # confirm whether calling with no argument was meant to go to the top level.
        if isinstance(condition, (DriverElement, WebElement)):
            self.by_ele(condition)
        elif isinstance(condition, int):
            self.by_index(condition)
        elif ':' not in condition and '=' not in condition and not condition.startswith(('#', '.', '@')):
            # No locator syntax in the string: pass it straight to the driver as id/name.
            self.by_id(condition)
        else:
            self.by_loc(condition)

        return self.page

    def main(self):
        """Switch focus to the top-level document."""
        self.page.driver.switch_to.default_content()
        return self.page

    def parent(self, level=1):
        """Switch focus up to a parent frame.

        :param level: how many levels to go up
        :return: the page object
        :raises ValueError: when level is less than 1
        """
        if level < 1:
            raise ValueError('level参数须是大于0的整数。')
        for _ in range(level):
            self.page.driver.switch_to.parent_frame()
        return self.page

    def by_id(self, id_):
        """Switch focus to the (i)frame with the given id.

        :param id_: id attribute of the frame
        :return: the page object
        """
        self.page.driver.switch_to.frame(id_)
        return self.page

    def by_name(self, name):
        """Switch focus to the (i)frame with the given name.

        :param name: name attribute of the frame
        :return: the page object
        """
        self.page.driver.switch_to.frame(name)
        return self.page

    def by_index(self, index):
        """Switch focus to the (i)frame at the given index.

        :param index: index of the frame in the page
        :return: the page object
        """
        self.page.driver.switch_to.frame(index)
        return self.page

    def by_loc(self, loc):
        """Switch focus to the (i)frame found by a locator.

        :param loc: locator, selenium native or DriverPage style
        :return: the page object
        """
        self.page.driver.switch_to.frame(self.page(loc).inner_ele)
        return self.page

    def by_ele(self, ele):
        """Switch focus to the given (i)frame element.

        :param ele: DriverElement or WebElement of the frame
        :return: the page object
        """
        if isinstance(ele, DriverElement):
            ele = ele.inner_ele
        self.page.driver.switch_to.frame(ele)
        return self.page


def get_handles(handles, num_or_handles):
    """Resolve tab indexes and/or handles into a set of handles.

    :param handles: list of all handles, used to resolve indexes
    :param num_or_handles: an index or handle, or a list/tuple of them
    :return: set of handle strings
    :raises TypeError: when num_or_handles is not int, str, list or tuple
    """
    if isinstance(num_or_handles, (int, str)):
        num_or_handles = (num_or_handles,)
    elif not isinstance(num_or_handles, (list, tuple)):
        raise TypeError('num_or_handle参数只能是int、str、list 或 tuple类型。')

    # Strings are already handles; anything else is used as an index into handles.
    return {i if isinstance(i, str) else handles[i] for i in num_or_handles}
self.drission.driver_options.timeouts - t = timeout if isinstance(timeout, (int, float)) else timeouts['implicit'] - self.set_timeouts(t, timeouts['pageLoad'], timeouts['script']) - - except Exception: - self.timeout = timeout if timeout is not None else 10 - - def __call__(self, loc_or_str, timeout=None): - """在内部查找元素 - 例:ele = page('@id=ele_id') - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 超时时间 - :return: 子元素对象或属性文本 - """ - if self._mode == 's': - return super().__call__(loc_or_str) - elif self._mode == 'd': - return super(SessionPage, self).__call__(loc_or_str, timeout) - - # -----------------共有属性和方法------------------- - @property - def url(self): - """返回当前url""" - if self._mode == 'd': - return self._drission.driver.current_url if self._driver else None - elif self._mode == 's': - return self._session_url - - @property - def title(self): - """返回网页title""" - if self._mode == 's': - return super().title - elif self._mode == 'd': - return super(SessionPage, self).title - - @property - def html(self): - """返回页面html文本""" - if self._mode == 's': - return super().html - elif self._mode == 'd': - return super(SessionPage, self).html - - @property - def json(self): - """当返回内容是json格式时,返回对应的字典""" - if self._mode == 's': - return super().json - elif self._mode == 'd': - return super(SessionPage, self).json - - def get(self, url, show_errmsg=False, retry=None, interval=None, **kwargs): - """跳转到一个url - :param url: 目标url - :param show_errmsg: 是否显示和抛出异常 - :param retry: 重试次数 - :param interval: 重试间隔(秒) - :param kwargs: 连接参数,s模式专用 - :return: url是否可用,d模式返回None时表示不确定 - """ - if self._mode == 'd': - return super(SessionPage, self).get(url, show_errmsg, retry, interval) - elif self._mode == 's': - return super().get(url, show_errmsg, retry, interval, **kwargs) - - def ele(self, loc_or_ele, timeout=None): - """返回第一个符合条件的元素、属性或节点文本 - :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 - :param timeout: 查找元素超时时间,默认与页面等待时间一致 - :return: 元素对象或属性、文本节点文本 - """ - if self._mode 
== 's': - return super().ele(loc_or_ele) - elif self._mode == 'd': - return super(SessionPage, self).ele(loc_or_ele, timeout=timeout) - - def eles(self, loc_or_str, timeout=None): - """返回页面中所有符合条件的元素、属性或节点文本 - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 查找元素超时时间,默认与页面等待时间一致 - :return: 元素对象或属性、文本组成的列表 - """ - if self._mode == 's': - return super().eles(loc_or_str) - elif self._mode == 'd': - return super(SessionPage, self).eles(loc_or_str, timeout=timeout) - - def s_ele(self, loc_or_ele=None): - """查找第一个符合条件的元素以SessionElement形式返回,d模式处理复杂页面时效率很高 - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本 - """ - if self._mode == 's': - return super().s_ele(loc_or_ele) - elif self._mode == 'd': - return super(SessionPage, self).s_ele(loc_or_ele) - - def s_eles(self, loc_or_str): - """查找所有符合条件的元素以SessionElement形式返回,d模式处理复杂页面时效率很高 - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本组成的列表 - """ - if self._mode == 's': - return super().s_eles(loc_or_str) - elif self._mode == 'd': - return super(SessionPage, self).s_eles(loc_or_str) - - def _ele(self, loc_or_ele, timeout=None, single=True): - """返回页面中符合条件的元素、属性或节点文本,默认返回第一个 - :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 - :param timeout: 查找元素超时时间,d模式专用 - :param single: True则返回第一个,False则返回全部 - :return: 元素对象或属性、文本节点文本 - """ - if self._mode == 's': - r = super()._ele(loc_or_ele, single=single) - return None if isinstance(r, NoneElement) else r - elif self._mode == 'd': - return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single) - - def get_cookies(self, as_dict=False, all_domains=False): - """返回cookies - :param as_dict: 是否以字典方式返回 - :param all_domains: 是否返回所有域的cookies - :return: cookies信息 - """ - if self._mode == 's': - return super().get_cookies(as_dict, all_domains) - elif self._mode == 'd': - return super(SessionPage, self).get_cookies(as_dict) - - # ----------------MixPage独有属性和方法----------------------- - @property - def drission(self): 
- """返回当前使用的 Dirssion 对象""" - return self._drission - - @property - def driver(self): - """返回 driver 对象,如没有则创建 - 每次访问时切换到 d 模式,用于独有函数及外部调用 - :return: WebDriver对象 - """ - self.change_mode('d') - return self._drission.driver - - @property - def session(self): - """返回 Session 对象,如没有则创建""" - return self._drission.session - - @property - def response(self): - """返回 s 模式获取到的 Response 对象,切换到 s 模式""" - self.change_mode('s') - return self._response - - @property - def mode(self): - """返回当前模式,'s'或'd' """ - return self._mode - - @property - def _session_url(self): - """返回 session 保存的url""" - return self._response.url if self._response else None - - def change_mode(self, mode=None, go=True, copy_cookies=True): - """切换模式,接收's'或'd',除此以外的字符串会切换为 d 模式 - 切换时会把当前模式的cookies复制到目标模式 - 切换后,如果go是True,调用相应的get函数使访问的页面同步 - 注意:s转d时,若浏览器当前网址域名和s模式不一样,必须会跳转 - :param mode: 模式字符串 - :param go: 是否跳转到原模式的url - :param copy_cookies: 是否复制cookies到目标模式 - """ - if mode is not None and mode.lower() == self._mode: - return - - self._mode = 's' if self._mode == 'd' else 'd' - - # s模式转d模式 - if self._mode == 'd': - self._driver = True - self._url = None if not self._driver else self._drission.driver.current_url - - if self._session_url: - if copy_cookies: - self.cookies_to_driver(self._session_url) - - if go: - self.get(self._session_url) - - # d模式转s模式 - elif self._mode == 's': - self._session = True - self._url = self._session_url - - if self._driver: - if copy_cookies: - self.cookies_to_session() - - if go and self._drission.driver.current_url.startswith('http'): - self.get(self._drission.driver.current_url) - - def set_cookies(self, cookies, refresh=True): - """设置cookies - :param cookies: cookies信息,可为CookieJar, list, tuple, str, dict - :param refresh: 设置cookies后是否刷新页面 - :return: None - """ - if self._mode == 's': - self.drission.set_cookies(cookies, set_session=True) - elif self._mode == 'd': - self.drission.set_cookies(cookies, set_driver=True) - if refresh: - self.refresh() - - def 
cookies_to_session(self, copy_user_agent=False): - """从driver复制cookies到session - :param copy_user_agent : 是否复制user agent信息 - """ - self._drission.cookies_to_session(copy_user_agent) - - def cookies_to_driver(self, url=None): - """从session复制cookies到driver - chrome需要指定域才能接收cookies - :param url: 目标域 - :return: None - """ - url = url or self._session_url - self._drission.cookies_to_driver(url) - - def check_page(self, by_requests=False): - """d模式时检查网页是否符合预期 - 默认由response状态检查,可重载实现针对性检查 - :param by_requests: 是否用内置response检查 - :return: bool或None,None代表不知道结果 - """ - if self._session_url and self._session_url == self.url: - return self._response.ok - - # 使用requests访问url并判断可用性 - if by_requests: - self.cookies_to_session() - r = self._make_response(self.url, retry=0)[0] - return r.ok if r else False - - def close_driver(self): - """关闭driver及浏览器""" - self._driver = None - self.drission.close_driver(True) - - def close_session(self): - """关闭session""" - self._session = None - self._response = None - self.drission.close_session() - - # ----------------重写SessionPage的函数----------------------- - def post(self, url, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): - """用post方式跳转到url,会切换到s模式 - :param url: 目标url - :param data: post方式时提交的数据 - :param show_errmsg: 是否显示和抛出异常 - :param retry: 重试次数 - :param interval: 重试间隔(秒) - :param kwargs: 连接参数 - :return: url是否可用 - """ - self.change_mode('s', go=False) - return super().post(url, data, show_errmsg, retry, interval, **kwargs) - - @property - def download(self): - """返回下载器对象""" - if self.mode == 'd': - self.cookies_to_session() - return super().download - - def chrome_downloading(self, path=None): - """返回浏览器下载中的文件列表 - :param path: 下载文件夹路径,默认读取配置信息 - :return: 正在下载的文件列表 - """ - try: - path = path or self._drission.driver_options.experimental_options['prefs']['download.default_directory'] - if not path: - raise ValueError('未指定下载路径。') - except Exception: - raise IOError('无法找到下载路径。') - - return super().chrome_downloading(path) 
- - # ----------------MixPage独有函数----------------------- - def hide_browser(self): - """隐藏浏览器窗口""" - self.drission.hide_browser() - - def show_browser(self): - """显示浏览器窗口""" - self.drission.show_browser() +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from .base import BasePage +from .drission import Drission +from .driver_page import DriverPage +from .session_page import SessionPage + + +class MixPage(SessionPage, DriverPage, BasePage): + """MixPage整合了DriverPage和SessionPage,封装了对页面的操作, + 可在selenium(d模式)和requests(s模式)间无缝切换。 + 切换的时候会自动同步cookies。 + 获取信息功能为两种模式共有,操作页面元素功能只有d模式有。 + 调用某种模式独有的功能,会自动切换到该模式。 + """ + + def __init__(self, mode='d', drission=None, timeout=None, driver_options=None, session_options=None): + """初始化函数 + :param mode: 'd' 或 's',即driver模式和session模式 + :param drission: Drission对象,不传入时会自动创建,有传入时driver_options和session_options参数无效 + :param timeout: 超时时间,d模式时为寻找元素时间,s模式时为连接时间,默认10秒 + :param driver_options: 浏览器设置,没传入drission参数时会用这个设置新建Drission对象中的WebDriver对象,传入False则不创建 + :param session_options: requests设置,没传入drission参数时会用这个设置新建Drission对象中的Session对象,传入False则不创建 + """ + self._mode = mode.lower() + if self._mode not in ('s', 'd'): + raise ValueError('mode参数只能是s或d。') + + super(DriverPage, self).__init__(timeout) + self._driver, self._session = (None, True) if self._mode == 's' else (True, None) + self._drission = drission or Drission(driver_options, session_options) + self._wait_object = None + self._response = None + self._scroll = None + self._download_set = None + self._download_path = None + + if self._mode == 'd': + try: + timeouts = self.drission.driver_options.timeouts + t = timeout if isinstance(timeout, (int, float)) else timeouts['implicit'] + self.set_timeouts(t, timeouts['pageLoad'], timeouts['script']) + + except Exception: + self.timeout = timeout if timeout is not None else 10 + + def __call__(self, loc_or_str, timeout=None): + """在内部查找元素 + 例:ele = page('@id=ele_id') + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + 
:param timeout: 超时时间 + :return: 子元素对象或属性文本 + """ + if self._mode == 's': + return super().__call__(loc_or_str) + elif self._mode == 'd': + return super(SessionPage, self).__call__(loc_or_str, timeout) + + # -----------------共有属性和方法------------------- + @property + def url(self): + """返回当前url""" + if self._mode == 'd': + return self._drission.driver.current_url if self._driver else None + elif self._mode == 's': + return self._session_url + + @property + def title(self): + """返回网页title""" + if self._mode == 's': + return super().title + elif self._mode == 'd': + return super(SessionPage, self).title + + @property + def html(self): + """返回页面html文本""" + if self._mode == 's': + return super().html + elif self._mode == 'd': + return super(SessionPage, self).html + + @property + def json(self): + """当返回内容是json格式时,返回对应的字典""" + if self._mode == 's': + return super().json + elif self._mode == 'd': + return super(SessionPage, self).json + + def get(self, url, show_errmsg=False, retry=None, interval=None, **kwargs): + """跳转到一个url + :param url: 目标url + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param kwargs: 连接参数,s模式专用 + :return: url是否可用,d模式返回None时表示不确定 + """ + if self._mode == 'd': + return super(SessionPage, self).get(url, show_errmsg, retry, interval) + elif self._mode == 's': + return super().get(url, show_errmsg, retry, interval, **kwargs) + + def ele(self, loc_or_ele, timeout=None): + """返回第一个符合条件的元素、属性或节点文本 + :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 + :param timeout: 查找元素超时时间,默认与页面等待时间一致 + :return: 元素对象或属性、文本节点文本 + """ + if self._mode == 's': + return super().ele(loc_or_ele) + elif self._mode == 'd': + return super(SessionPage, self).ele(loc_or_ele, timeout=timeout) + + def eles(self, loc_or_str, timeout=None): + """返回页面中所有符合条件的元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 查找元素超时时间,默认与页面等待时间一致 + :return: 元素对象或属性、文本组成的列表 + """ + if self._mode == 's': + return super().eles(loc_or_str) + elif 
self._mode == 'd': + return super(SessionPage, self).eles(loc_or_str, timeout=timeout) + + def s_ele(self, loc_or_ele=None): + """查找第一个符合条件的元素以SessionElement形式返回,d模式处理复杂页面时效率很高 + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + if self._mode == 's': + return super().s_ele(loc_or_ele) + elif self._mode == 'd': + return super(SessionPage, self).s_ele(loc_or_ele) + + def s_eles(self, loc_or_str): + """查找所有符合条件的元素以SessionElement形式返回,d模式处理复杂页面时效率很高 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本组成的列表 + """ + if self._mode == 's': + return super().s_eles(loc_or_str) + elif self._mode == 'd': + return super(SessionPage, self).s_eles(loc_or_str) + + def _ele(self, loc_or_ele, timeout=None, single=True): + """返回页面中符合条件的元素、属性或节点文本,默认返回第一个 + :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 + :param timeout: 查找元素超时时间,d模式专用 + :param single: True则返回第一个,False则返回全部 + :return: 元素对象或属性、文本节点文本 + """ + if self._mode == 's': + return super()._ele(loc_or_ele, single=single) + elif self._mode == 'd': + return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single) + + def get_cookies(self, as_dict=False, all_domains=False): + """返回cookies + :param as_dict: 是否以字典方式返回 + :param all_domains: 是否返回所有域的cookies + :return: cookies信息 + """ + if self._mode == 's': + return super().get_cookies(as_dict, all_domains) + elif self._mode == 'd': + return super(SessionPage, self).get_cookies(as_dict) + + # ----------------MixPage独有属性和方法----------------------- + @property + def drission(self): + """返回当前使用的 Dirssion 对象""" + return self._drission + + @property + def driver(self): + """返回 driver 对象,如没有则创建 + 每次访问时切换到 d 模式,用于独有函数及外部调用 + :return: WebDriver对象 + """ + self.change_mode('d') + return self._drission.driver + + @property + def session(self): + """返回 Session 对象,如没有则创建""" + return self._drission.session + + @property + def response(self): + """返回 s 模式获取到的 Response 对象,切换到 s 模式""" + self.change_mode('s') + return 
self._response + + @property + def mode(self): + """返回当前模式,'s'或'd' """ + return self._mode + + @property + def _session_url(self): + """返回 session 保存的url""" + return self._response.url if self._response else None + + def change_mode(self, mode=None, go=True, copy_cookies=True): + """切换模式,接收's'或'd',除此以外的字符串会切换为 d 模式 + 切换时会把当前模式的cookies复制到目标模式 + 切换后,如果go是True,调用相应的get函数使访问的页面同步 + 注意:s转d时,若浏览器当前网址域名和s模式不一样,必须会跳转 + :param mode: 模式字符串 + :param go: 是否跳转到原模式的url + :param copy_cookies: 是否复制cookies到目标模式 + """ + if mode is not None and mode.lower() == self._mode: + return + + self._mode = 's' if self._mode == 'd' else 'd' + + # s模式转d模式 + if self._mode == 'd': + self._driver = True + self._url = None if not self._driver else self._drission.driver.current_url + + if self._session_url: + if copy_cookies: + self.cookies_to_driver(self._session_url) + + if go: + self.get(self._session_url) + + # d模式转s模式 + elif self._mode == 's': + self._session = True + self._url = self._session_url + + if self._driver: + if copy_cookies: + self.cookies_to_session() + + if go and self._drission.driver.current_url.startswith('http'): + self.get(self._drission.driver.current_url) + + def set_cookies(self, cookies, refresh=True): + """设置cookies + :param cookies: cookies信息,可为CookieJar, list, tuple, str, dict + :param refresh: 设置cookies后是否刷新页面 + :return: None + """ + if self._mode == 's': + self.drission.set_cookies(cookies, set_session=True) + elif self._mode == 'd': + self.drission.set_cookies(cookies, set_driver=True) + if refresh: + self.refresh() + + def cookies_to_session(self, copy_user_agent=False): + """从driver复制cookies到session + :param copy_user_agent : 是否复制user agent信息 + """ + self._drission.cookies_to_session(copy_user_agent) + + def cookies_to_driver(self, url=None): + """从session复制cookies到driver + chrome需要指定域才能接收cookies + :param url: 目标域 + :return: None + """ + url = url or self._session_url + self._drission.cookies_to_driver(url) + + def check_page(self, by_requests=False): + 
"""d模式时检查网页是否符合预期 + 默认由response状态检查,可重载实现针对性检查 + :param by_requests: 是否用内置response检查 + :return: bool或None,None代表不知道结果 + """ + if self._session_url and self._session_url == self.url: + return self._response.ok + + # 使用requests访问url并判断可用性 + if by_requests: + self.cookies_to_session() + r = self._make_response(self.url, retry=0)[0] + return r.ok if r else False + + def close_driver(self): + """关闭driver及浏览器""" + self._driver = None + self.drission.close_driver(True) + + def close_session(self): + """关闭session""" + self._session = None + self._response = None + self.drission.close_session() + + # ----------------重写SessionPage的函数----------------------- + def post(self, url, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): + """用post方式跳转到url,会切换到s模式 + :param url: 目标url + :param data: post方式时提交的数据 + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param kwargs: 连接参数 + :return: url是否可用 + """ + self.change_mode('s', go=False) + return super().post(url, data, show_errmsg, retry, interval, **kwargs) + + @property + def download(self): + """返回下载器对象""" + if self.mode == 'd': + self.cookies_to_session() + return super().download + + def chrome_downloading(self, path=None): + """返回浏览器下载中的文件列表 + :param path: 下载文件夹路径,默认读取配置信息 + :return: 正在下载的文件列表 + """ + try: + path = path or self._drission.driver_options.experimental_options['prefs']['download.default_directory'] + if not path: + raise ValueError('未指定下载路径。') + except Exception: + raise IOError('无法找到下载路径。') + + return super().chrome_downloading(path) + + # ----------------MixPage独有函数----------------------- + def hide_browser(self): + """隐藏浏览器窗口""" + self.drission.hide_browser() + + def show_browser(self): + """显示浏览器窗口""" + self.drission.show_browser() diff --git a/DrissionPage/mix_page.pyi b/DrissionPage/mixpage/mix_page.pyi similarity index 97% rename from DrissionPage/mix_page.pyi rename to DrissionPage/mixpage/mix_page.pyi index a6b251f..beea82e 100644 --- 
a/DrissionPage/mix_page.pyi +++ b/DrissionPage/mixpage/mix_page.pyi @@ -13,8 +13,8 @@ from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.remote.webelement import WebElement from .base import BasePage -from .configs.session_options import SessionOptions -from .configs.driver_options import DriverOptions +from DrissionPage.configs.session_options import SessionOptions +from DrissionPage.configs.driver_options import DriverOptions from .drission import Drission from .driver_element import DriverElement from .driver_page import DriverPage diff --git a/DrissionPage/mixpage/session_element.py b/DrissionPage/mixpage/session_element.py new file mode 100644 index 0000000..fda9392 --- /dev/null +++ b/DrissionPage/mixpage/session_element.py @@ -0,0 +1,357 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from html import unescape +from re import match, DOTALL + +from lxml.etree import tostring +from lxml.html import HtmlElement, fromstring + +from .base import DrissionElement, BasePage, BaseElement +from DrissionPage.common.locator import get_loc +from DrissionPage.common.web import get_ele_txt, make_absolute_link + + +class SessionElement(DrissionElement): + """session模式的元素对象,包装了一个lxml的Element对象,并封装了常用功能""" + + def __init__(self, ele, page=None): + """初始化对象 + :param ele: 被包装的HtmlElement元素 + :param page: 元素所在页面对象,如果是从 html 文本生成的元素,则为 None + """ + super().__init__(page) + self._inner_ele = ele + + @property + def inner_ele(self): + return self._inner_ele + + def __repr__(self): + attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + return f'' + + def __call__(self, loc_or_str, timeout=None): + """在内部查找元素 + 例:ele2 = ele1('@id=ele_id') + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和DriverElement对应,便于无差别调用 + :return: SessionElement对象或属性、文本 + """ + return self.ele(loc_or_str) + + @property + def tag(self): + """返回元素类型""" + return self._inner_ele.tag + + @property + def html(self): + 
"""返回outerHTML文本""" + html = tostring(self._inner_ele, method="html").decode() + return unescape(html[:html.rfind('>') + 1]) # tostring()会把跟紧元素的文本节点也带上,因此要去掉 + + @property + def inner_html(self): + """返回元素innerHTML文本""" + r = match(r'<.*?>(.*)', self.html, flags=DOTALL) + return '' if not r else r.group(1) + + @property + def attrs(self): + """返回元素所有属性及值""" + return {attr: self.attr(attr) for attr, val in self.inner_ele.items()} + + @property + def text(self): + """返回元素内所有文本""" + return get_ele_txt(self) + + @property + def raw_text(self): + """返回未格式化处理的元素内文本""" + return str(self._inner_ele.text_content()) + + def parent(self, level_or_loc=1): + """返回上面某一级父元素,可指定层数或用查询语法定位 + :param level_or_loc: 第几级父元素,或定位符 + :return: 上级元素对象 + """ + return super().parent(level_or_loc) + + def prev(self, filter_loc='', index=1, timeout=None): + """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param filter_loc: 用于筛选元素的查询语法 + :param index: 前面第几个查询结果元素 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + return super().prev(index, filter_loc, timeout) + + def next(self, filter_loc='', index=1, timeout=None): + """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param filter_loc: 用于筛选元素的查询语法 + :param index: 后面第几个查询结果元素 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + return super().next(index, filter_loc, timeout) + + def before(self, filter_loc='', index=1, timeout=None): + """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param filter_loc: 用于筛选元素的查询语法 + :param index: 前面第几个查询结果元素 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的某个元素或节点 + """ + return super().before(index, filter_loc, timeout) + + def after(self, filter_loc='', index=1, timeout=None): + """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 + :param filter_loc: 用于筛选元素的查询语法 + :param index: 后面第几个查询结果元素 + :param timeout: 查找元素的超时时间 + :return: 本元素后面的某个元素或节点 + """ + return super().after(index, filter_loc, timeout) + + def prevs(self, filter_loc='', timeout=None): + """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 
兄弟元素或节点文本组成的列表 + """ + return super().prevs(filter_loc, timeout) + + def nexts(self, filter_loc='', timeout=None): + """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return super().nexts(filter_loc, timeout) + + def befores(self, filter_loc='', timeout=None): + """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的元素或节点组成的列表 + """ + return super().befores(filter_loc, timeout) + + def afters(self, filter_loc='', timeout=None): + """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素后面的元素或节点组成的列表 + """ + return super().afters(filter_loc, timeout) + + def attr(self, attr): + """返回attribute属性值 + :param attr: 属性名 + :return: 属性值文本,没有该属性返回None + """ + # 获取href属性时返回绝对url + if attr == 'href': + link = self.inner_ele.get('href') + # 若为链接为None、js或邮件,直接返回 + if not link or link.lower().startswith(('javascript:', 'mailto:')): + return link + + else: # 其它情况直接返回绝对url + return make_absolute_link(link, self.page) + + elif attr == 'src': + return make_absolute_link(self.inner_ele.get('src'), self.page) + + elif attr == 'text': + return self.text + + elif attr == 'innerText': + return self.raw_text + + elif attr in ('html', 'outerHTML'): + return self.html + + elif attr == 'innerHTML': + return self.inner_html + + else: + return self.inner_ele.get(attr) + + def ele(self, loc_or_str, timeout=None): + """返回当前元素下级符合条件的第一个元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和DriverElement对应,便于无差别调用 + :return: SessionElement对象或属性、文本 + """ + return self._ele(loc_or_str) + + def eles(self, loc_or_str, timeout=None): + """返回当前元素下级所有符合条件的子元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和DriverElement对应,便于无差别调用 + :return: SessionElement对象或属性、文本组成的列表 + """ + return self._ele(loc_or_str, single=False) + + def s_ele(self, loc_or_str=None): + 
"""返回当前元素下级符合条件的第一个元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + return self._ele(loc_or_str) + + def s_eles(self, loc_or_str): + """返回当前元素下级所有符合条件的子元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本组成的列表 + """ + return self._ele(loc_or_str, single=False) + + def _ele(self, loc_or_str, timeout=None, single=True, relative=False): + """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和父类对应 + :param single: True则返回第一个,False则返回全部 + :param relative: WebPage用的表示是否相对定位的参数 + :return: SessionElement对象 + """ + return make_session_ele(self, loc_or_str, single) + + def _get_ele_path(self, mode): + """获取css路径或xpath路径 + :param mode: 'css' 或 'xpath' + :return: css路径或xpath路径 + """ + path_str = '' + ele = self + + while ele: + if mode == 'css': + brothers = len(ele.eles(f'xpath:./preceding-sibling::*')) + path_str = f'>:nth-child({brothers + 1}){path_str}' + else: + brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}')) + path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}' + + ele = ele.parent() + + return f':root{path_str[1:]}' if mode == 'css' else path_str + + +def make_session_ele(html_or_ele, loc=None, single=True): + """从接收到的对象或html文本中查找元素,返回SessionElement对象 + 如要直接从html生成SessionElement而不在下级查找,loc输入None即可 + :param html_or_ele: html文本、BaseParser对象 + :param loc: 定位元组或字符串,为None时不在下级查找,返回根元素 + :param single: True则返回第一个,False则返回全部 + :return: 返回SessionElement元素或列表,或属性文本 + """ + # ---------------处理定位符--------------- + if not loc: + if isinstance(html_or_ele, SessionElement): + return html_or_ele if single else [html_or_ele] + + loc = ('xpath', '.') + + elif isinstance(loc, (str, tuple)): + loc = get_loc(loc) + + else: + raise ValueError("定位符必须为str或长度为2的tuple。") + + # ---------------根据传入对象类型获取页面对象和lxml元素对象--------------- + the_type = str(type(html_or_ele)) + # SessionElement + if 
the_type.endswith(".SessionElement'>"): + page = html_or_ele.page + + loc_str = loc[1] + if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'): + loc_str = f'.{loc[1]}' + html_or_ele = html_or_ele.inner_ele + + # 若css以>开头,表示找元素的直接子元素,要用page以绝对路径才能找到 + elif loc[0] == 'css selector' and loc[1].lstrip().startswith('>'): + loc_str = f'{html_or_ele.css_path}{loc[1]}' + if html_or_ele.page: + html_or_ele = fromstring(html_or_ele.page.html) + else: # 接收html文本,无page的情况 + html_or_ele = fromstring(html_or_ele('xpath:/ancestor::*').html) + + else: + html_or_ele = html_or_ele.inner_ele + + loc = loc[0], loc_str + + # ChromiumElement, DriverElement + elif the_type.endswith((".ChromiumElement'>", ".DriverElement'>")): + loc_str = loc[1] + if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'): + loc_str = f'.{loc[1]}' + elif loc[0] == 'css selector' and loc[1].lstrip().startswith('>'): + loc_str = f'{html_or_ele.css_path}{loc[1]}' + loc = loc[0], loc_str + + # 获取整个页面html再定位到当前元素,以实现查找上级元素 + page = html_or_ele.page + xpath = html_or_ele.xpath + if hasattr(html_or_ele, 'doc_id'): # ChromiumElement,兼容传入的元素在iframe内的情况 + html = html_or_ele.page.run_cdp('DOM.getOuterHTML', objectId=html_or_ele.doc_id)['outerHTML'] + else: + html = html_or_ele.page.html + html_or_ele = fromstring(html) + html_or_ele = html_or_ele.xpath(xpath)[0] + + # 各种页面对象 + elif isinstance(html_or_ele, BasePage): + page = html_or_ele + html_or_ele = fromstring(html_or_ele.html) + + # 直接传入html文本 + elif isinstance(html_or_ele, str): + page = None + html_or_ele = fromstring(html_or_ele) + + # ShadowRootElement, ChromiumShadowRootElement, ChromiumFrame + elif isinstance(html_or_ele, BaseElement) or the_type.endswith(".ChromiumFrame'>"): + page = html_or_ele.page + html_or_ele = fromstring(html_or_ele.html) + + else: + raise TypeError('html_or_ele参数只能是元素、页面对象或html文本。') + + # ---------------执行查找----------------- + try: + if loc[0] == 'xpath': # 用lxml内置方法获取lxml的元素对象列表 + ele = html_or_ele.xpath(loc[1]) + else: # 用css 
selector获取元素对象列表 + ele = html_or_ele.cssselect(loc[1]) + + if not isinstance(ele, list): # 结果不是列表,如数字 + return ele + + # 把lxml元素对象包装成SessionElement对象并按需要返回第一个或全部 + if single: + ele = ele[0] if ele else None + if isinstance(ele, HtmlElement): + return SessionElement(ele, page) + elif isinstance(ele, str): + return ele + else: + return None + + else: # 返回全部 + return [SessionElement(e, page) if isinstance(e, HtmlElement) else e for e in ele if e != '\n'] + + except Exception as e: + if 'Invalid expression' in str(e): + raise SyntaxError(f'无效的xpath语句:{loc}') + elif 'Expected selector' in str(e): + raise SyntaxError(f'无效的css select语句:{loc}') + + raise e diff --git a/DrissionPage/mixpage/session_element.pyi b/DrissionPage/mixpage/session_element.pyi new file mode 100644 index 0000000..69dcb35 --- /dev/null +++ b/DrissionPage/mixpage/session_element.pyi @@ -0,0 +1,114 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from typing import Union, List, Tuple + +from lxml.html import HtmlElement + +from .base import DrissionElement, BaseElement +from .driver_element import DriverElement +from .driver_page import DriverPage +from .session_page import SessionPage + + +class SessionElement(DrissionElement): + + def __init__(self, ele: HtmlElement, page: Union[SessionPage, None] = None): + self._inner_ele: HtmlElement = ... + self.page: SessionPage = ... + + @property + def inner_ele(self) -> HtmlElement: ... + + def __repr__(self) -> str: ... + + def __call__(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> Union['SessionElement', str, None]: ... + + @property + def tag(self) -> str: ... + + @property + def html(self) -> str: ... + + @property + def inner_html(self) -> str: ... + + @property + def attrs(self) -> dict: ... + + @property + def text(self) -> str: ... + + @property + def raw_text(self) -> str: ... + + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['SessionElement', None]: ... 
+ + def prev(self, + filter_loc: Union[tuple, str] = '', + index: int = 1, + timeout: float = None) -> Union['SessionElement', str, None]: ... + + def next(self, + filter_loc: Union[tuple, str] = '', + index: int = 1, + timeout: float = None) -> Union['SessionElement', str, None]: ... + + def before(self, + filter_loc: Union[tuple, str] = '', + index: int = 1, + timeout: float = None) -> Union['SessionElement', str, None]: ... + + def after(self, + filter_loc: Union[tuple, str] = '', + index: int = 1, + timeout: float = None) -> Union['SessionElement', str, None]: ... + + def prevs(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union['SessionElement', str]]: ... + + def nexts(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union['SessionElement', str]]: ... + + def befores(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union['SessionElement', str]]: ... + + def afters(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union['SessionElement', str]]: ... + + def attr(self, attr: str) -> Union[str, None]: ... + + def ele(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> Union['SessionElement', str, None]: ... + + def eles(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> List[Union['SessionElement', str]]: ... + + def s_ele(self, + loc_or_str: Union[Tuple[str, str], str] = None) -> Union['SessionElement', str, None]: ... + + def s_eles(self, + loc_or_str: Union[Tuple[str, str], str]) -> List[Union['SessionElement', str]]: ... + + def _ele(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None, + single: bool = True, + relative: bool = False) -> Union['SessionElement', str, None, List[Union['SessionElement', str]]]: ... + + def _get_ele_path(self, mode: str) -> str: ... 
+ + +def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, DriverElement, BaseElement, DriverPage], + loc: Union[str, Tuple[str, str]] = None, + single: bool = True) -> Union[SessionElement, str, None, List[Union[SessionElement, str]]]: ... diff --git a/DrissionPage/mixpage/session_page.py b/DrissionPage/mixpage/session_page.py new file mode 100644 index 0000000..14cb169 --- /dev/null +++ b/DrissionPage/mixpage/session_page.py @@ -0,0 +1,555 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from re import search +from time import sleep +from urllib.parse import urlparse +from warnings import warn + +from DownloadKit import DownloadKit +from requests import Session, Response +from requests.structures import CaseInsensitiveDict +from tldextract import extract + +from .base import BasePage +from DrissionPage.configs.session_options import SessionOptions +from DrissionPage.common.web import cookie_to_dict, set_session_cookies +from .session_element import SessionElement, make_session_ele + + +class SessionPage(BasePage): + """SessionPage封装了页面操作的常用功能,使用requests来获取、解析网页""" + + def __init__(self, session_or_options=None, timeout=None): + """ + :param session_or_options: Session对象或SessionOptions对象 + :param timeout: 连接超时时间,为None时从ini文件读取 + """ + self._response = None + self._download_set = None + self._session = None + self._set = None + self._set_start_options(session_or_options, None) + self._set_runtime_settings() + self._create_session() + timeout = timeout if timeout is not None else self.timeout + super().__init__(timeout) + + def _set_start_options(self, session_or_options, none): + """启动配置 + :param session_or_options: Session、SessionOptions + :param none: 用于后代继承 + :return: None + """ + if not session_or_options or isinstance(session_or_options, SessionOptions): + self._session_options = session_or_options or SessionOptions(session_or_options) + + elif isinstance(session_or_options, Session): + self._session_options = 
SessionOptions() + self._session = session_or_options + + def _set_runtime_settings(self): + """设置运行时用到的属性""" + self._timeout = self._session_options.timeout + self._download_path = self._session_options.download_path + + def _create_session(self): + """创建内建Session对象""" + if not self._session: + self._set_session(self._session_options) + + def _set_session(self, opt): + """根据传入字典对session进行设置 + :param opt: session配置字典 + :return: None + """ + self._session = Session() + + if opt.headers: + self._session.headers = CaseInsensitiveDict(opt.headers) + if opt.cookies: + self.set.cookies(opt.cookies) + if opt.adapters: + for url, adapter in opt.adapters: + self._session.mount(url, adapter) + + attrs = ['auth', 'proxies', 'hooks', 'params', 'verify', + 'cert', 'stream', 'trust_env', 'max_redirects'] + for i in attrs: + attr = opt.__getattribute__(i) + if attr: + self._session.__setattr__(i, attr) + + def __call__(self, loc_or_str, timeout=None): + """在内部查找元素 + 例:ele2 = ele1('@id=ele_id') + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和ChromiumElement对应,便于无差别调用 + :return: SessionElement对象或属性文本 + """ + return self.ele(loc_or_str) + + # -----------------共有属性和方法------------------- + @property + def title(self): + """返回网页title""" + ele = self.ele('xpath://title') + return ele.text if ele else None + + @property + def url(self): + """返回当前访问url""" + return self._url + + @property + def html(self): + """返回页面的html文本""" + return self.response.text if self.response else '' + + @property + def json(self): + """当返回内容是json格式时,返回对应的字典,非json格式时返回None""" + try: + return self.response.json() + except Exception: + return None + + @property + def download_path(self): + """返回下载路径""" + return self._download_path + + @property + def download_set(self): + """返回用于设置下载参数的对象""" + if self._download_set is None: + self._download_set = DownloadSetter(self) + return self._download_set + + @property + def download(self): + """返回下载器对象""" + return self.download_set.DownloadKit + + 
@property + def session(self): + """返回session对象""" + return self._session + + @property + def response(self): + """返回访问url得到的response对象""" + return self._response + + @property + def set(self): + """返回用于等待的对象""" + if self._set is None: + self._set = SessionPageSetter(self) + return self._set + + def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs): + """用get方式跳转到url + :param url: 目标url + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param timeout: 连接超时时间(秒) + :param kwargs: 连接参数 + :return: url是否可用 + """ + return self._s_connect(url, 'get', None, show_errmsg, retry, interval, **kwargs) + + def ele(self, loc_or_ele, timeout=None): + """返回页面中符合条件的第一个元素、属性或节点文本 + :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和ChromiumElement对应,便于无差别调用 + :return: SessionElement对象或属性、文本 + """ + return self._ele(loc_or_ele) + + def eles(self, loc_or_str, timeout=None): + """返回页面中所有符合条件的元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和ChromiumElement对应,便于无差别调用 + :return: SessionElement对象或属性、文本组成的列表 + """ + return self._ele(loc_or_str, single=False) + + def s_ele(self, loc_or_ele=None): + """返回页面中符合条件的第一个元素、属性或节点文本 + :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + return make_session_ele(self.html) if loc_or_ele is None else self._ele(loc_or_ele) + + def s_eles(self, loc_or_str): + """返回页面中符合条件的所有元素、属性或节点文本 + :param loc_or_str: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + return self._ele(loc_or_str, single=False) + + def _ele(self, loc_or_ele, timeout=None, single=True): + """返回页面中符合条件的元素、属性或节点文本,默认返回第一个 + :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 + :param timeout: 不起实际作用,用于和父类对应 + :param single: True则返回第一个,False则返回全部 + :return: SessionElement对象 + """ + return loc_or_ele if isinstance(loc_or_ele, SessionElement) else make_session_ele(self, loc_or_ele, single) + + def get_cookies(self, 
as_dict=False, all_domains=False): + """返回cookies + :param as_dict: 是否以字典方式返回 + :param all_domains: 是否返回所有域的cookies + :return: cookies信息 + """ + if all_domains: + cookies = self.session.cookies + else: + if self.url: + url = extract(self.url) + domain = f'{url.domain}.{url.suffix}' + cookies = tuple(x for x in self.session.cookies if domain in x.domain or x.domain == '') + else: + cookies = tuple(x for x in self.session.cookies) + + if as_dict: + return {x.name: x.value for x in cookies} + else: + return [cookie_to_dict(cookie) for cookie in cookies] + + def post(self, url, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): + """用post方式跳转到url + :param url: 目标url + :param data: 提交的数据 + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param kwargs: 连接参数 + :return: url是否可用 + """ + return self._s_connect(url, 'post', data, show_errmsg, retry, interval, **kwargs) + + def _s_connect(self, url, mode, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): + """执行get或post连接 + :param url: 目标url + :param mode: 'get' 或 'post' + :param data: 提交的数据 + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param kwargs: 连接参数 + :return: url是否可用 + """ + retry, interval = self._before_connect(url, retry, interval) + self._response, info = self._make_response(self._url, mode, data, retry, interval, show_errmsg, **kwargs) + + if self._response is None: + self._url_available = False + + else: + if self._response.ok: + self._url_available = True + + else: + if show_errmsg: + raise ConnectionError(f'状态码:{self._response.status_code}.') + self._url_available = False + + return self._url_available + + def _make_response(self, url, mode='get', data=None, retry=None, interval=None, show_errmsg=False, **kwargs): + """生成Response对象 + :param url: 目标url + :param mode: 'get' 或 'post' + :param data: post方式要提交的数据 + :param show_errmsg: 是否显示和抛出异常 + :param kwargs: 其它参数 + :return: 
tuple,第一位为Response或None,第二位为出错信息或'Success' + """ + kwargs = CaseInsensitiveDict(kwargs) + if 'headers' not in kwargs: + kwargs['headers'] = {} + else: + kwargs['headers'] = CaseInsensitiveDict(kwargs['headers']) + + # 设置referer和host值 + parsed_url = urlparse(url) + hostname = parsed_url.hostname + scheme = parsed_url.scheme + if not check_headers(kwargs, self.session.headers, 'Referer'): + kwargs['headers']['Referer'] = self.url if self.url else f'{scheme}://{hostname}' + if 'Host' not in kwargs['headers']: + kwargs['headers']['Host'] = hostname + + if not check_headers(kwargs, self.session.headers, 'timeout'): + kwargs['timeout'] = self.timeout + + if 'allow_redirects' not in kwargs: + kwargs['allow_redirects'] = False + + r = err = None + retry = retry if retry is not None else self.retry_times + interval = interval if interval is not None else self.retry_interval + for i in range(retry + 1): + try: + if mode == 'get': + r = self.session.get(url, **kwargs) + elif mode == 'post': + r = self.session.post(url, data=data, **kwargs) + + if r: + return set_charset(r), 'Success' + + except Exception as e: + err = e + + # if r and r.status_code in (403, 404): + # break + + if i < retry: + sleep(interval) + if show_errmsg: + print(f'重试 {url}') + + if r is None: + if show_errmsg: + if err: + raise err + else: + raise ConnectionError('连接失败') + return None, '连接失败' if err is None else err + + if not r.ok: + if show_errmsg: + raise ConnectionError(f'状态码:{r.status_code}') + return r, f'状态码:{r.status_code}' + + def set_cookies(self, cookies): + """为Session对象设置cookies + :param cookies: cookies信息 + :return: None + """ + warn("set_cookies()方法即将弃用,请用set.cookies()方法代替。", DeprecationWarning) + self.set.cookies(cookies) + + def set_headers(self, headers): + """设置通用的headers,设置的headers值回逐个覆盖原有的,不会清理原来的 + :param headers: dict形式的headers + :return: None + """ + warn("set_headers()方法即将弃用,请用set.headers()方法代替。", DeprecationWarning) + self.set.headers(headers) + + def set_user_agent(self, ua): + 
class SessionPageSetter(object):
    """Setter facade that applies configuration to a page's requests session.

    Every method writes either ``page.timeout`` or an attribute of
    ``page.session`` and returns ``None``.
    """

    def __init__(self, page):
        """
        :param page: page object whose ``timeout`` and ``session`` are configured
        """
        self._page = page

    def timeout(self, second):
        """Set the connection timeout.
        :param second: timeout in seconds
        :return: None
        """
        self._page.timeout = second

    def cookies(self, cookies):
        """Set cookies on the Session object.
        :param cookies: cookie data, passed through to ``set_session_cookies``
        :return: None
        """
        set_session_cookies(self._page.session, cookies)

    def headers(self, headers):
        """Replace the session-wide headers.
        :param headers: headers as a dict
        :return: None
        """
        self._page.session.headers = CaseInsensitiveDict(headers)

    def header(self, attr, value):
        """Set a single entry of the session headers.
        :param attr: header name (stored lower-cased)
        :param value: header value
        :return: None
        """
        self._page.session.headers[attr.lower()] = value

    def user_agent(self, ua):
        """Set the user agent header.
        :param ua: user agent string
        :return: None
        """
        self._page.session.headers['user-agent'] = ua

    def proxies(self, http, https=None):
        """Set the session proxies.
        :param http: http proxy address
        :param https: https proxy address; falls back to ``http`` when falsy
        :return: None
        """
        # Both unset means "no proxies"; spelled out instead of the chained
        # comparison ``http == https is None`` which is easy to misread.
        if http is None and https is None:
            self._page.session.proxies = None
        else:
            self._page.session.proxies = {'http': http, 'https': https or http}

    def auth(self, auth):
        """Set the authentication tuple or object.
        :param auth: authentication tuple or object
        :return: None
        """
        self._page.session.auth = auth

    def hooks(self, hooks):
        """Set the callback hooks.
        :param hooks: callback hooks
        :return: None
        """
        self._page.session.hooks = hooks

    def params(self, params):
        """Set the query-parameter dict.
        :param params: query-parameter dict
        :return: None
        """
        self._page.session.params = params

    def verify(self, on_off):
        """Set whether SSL certificates are verified.
        :param on_off: whether to verify SSL certificates
        :return: None
        """
        self._page.session.verify = on_off

    def cert(self, cert):
        """Set the SSL client certificate: a path (.pem) or a ('cert', 'key') tuple.
        :param cert: certificate path or tuple
        :return: None
        """
        self._page.session.cert = cert

    def stream(self, on_off):
        """Set whether response content is streamed.
        :param on_off: whether to stream response content
        :return: None
        """
        self._page.session.stream = on_off

    def trust_env(self, on_off):
        """Set whether the environment is trusted.
        :param on_off: whether to trust the environment
        :return: None
        """
        self._page.session.trust_env = on_off

    def max_redirects(self, times):
        """Set the maximum number of redirects.
        :param times: maximum number of redirects
        :return: None
        """
        self._page.session.max_redirects = times

    def add_adapter(self, url, adapter):
        """Mount an adapter on the session.
        :param url: url (prefix) the adapter is mounted for
        :param adapter: adapter object
        :return: None
        """
        self._page.session.mount(url, adapter)


class DownloadSetter(object):
    """Holds and configures the download settings of a page."""

    def __init__(self, page):
        """
        :param page: page object providing ``session`` and ``download_path``
        """
        self._page = page
        self._DownloadKit = None

    @property
    def DownloadKit(self):
        """Return the DownloadKit object, creating it on first access."""
        if self._DownloadKit is None:
            self._DownloadKit = DownloadKit(session=self._page.session,
                                            goal_path=self._page.download_path)
        return self._DownloadKit

    @property
    def if_file_exists(self):
        """Return the object used to choose what happens when a file of the same name exists."""
        return FileExists(self)

    def split(self, on_off):
        """Set whether large files may be split and downloaded with multiple threads.
        :param on_off: whether to enable multi-threaded download of large files
        :return: None
        """
        self.DownloadKit.split = on_off

    def save_path(self, path):
        """Set the download save path.
        :param path: save path; ``None`` is stored as-is, anything else is converted to str
        :return: None
        """
        path = path if path is None else str(path)
        self._page._download_path = path
        self.DownloadKit.goal_path = path


class FileExists(object):
    """Chooses what happens when a file of the same name already exists."""

    def __init__(self, setter):
        """
        :param setter: DownloadSetter object
        """
        self._setter = setter

    def __call__(self, mode):
        """Set the mode by name.
        :param mode: one of 'skip', 'rename', 'overwrite'
        :return: None
        :raises ValueError: if ``mode`` is not one of the three accepted values
        """
        if mode not in ('skip', 'rename', 'overwrite'):
            raise ValueError("mode参数只能是'skip', 'rename', 'overwrite'")
        self._setter.DownloadKit.file_exists = mode

    # All three shortcuts assign the same public ``file_exists`` attribute
    # that ``__call__`` uses, instead of mixing it with ``_file_exists``.
    def skip(self):
        """Set the mode to skip."""
        self._setter.DownloadKit.file_exists = 'skip'

    def rename(self):
        """Set the mode to rename (a serial number is appended to the file name)."""
        self._setter.DownloadKit.file_exists = 'rename'

    def overwrite(self):
        """Set the mode to overwrite."""
        self._setter.DownloadKit.file_exists = 'overwrite'


def check_headers(kwargs, headers, arg) -> bool:
    """Check whether ``arg`` is present in ``kwargs['headers']`` or in ``headers``.

    :param kwargs: request keyword arguments; a missing or ``None`` 'headers' entry counts as empty
    :param headers: second headers mapping to look in; ``None`` counts as empty
    :param arg: header name to look for
    :return: True if ``arg`` is found in either mapping
    """
    request_headers = kwargs.get('headers') or {}
    return arg in request_headers or arg in (headers or {})


# The return annotation is quoted so it is not evaluated when the function is defined.
def set_charset(response) -> 'Response':
    """Set the encoding of a Response object.

    The charset is taken from the content-type header when present. Otherwise,
    for html pages, it is taken from a ``<meta ... charset=...>`` tag in the
    body, falling back to ``response.apparent_encoding``.

    :param response: Response object; its ``encoding`` attribute is updated in place
    :return: the same Response object
    """
    # Look for the charset in the headers first.
    content_type = response.headers.get('content-type', '').lower()
    # Stop at ';', quotes or whitespace so a trailing ';' is not required and
    # later parameters are never swallowed into the charset value.
    header_match = search(r'charset[=: ]*["\']?([^;"\'\s]+)', content_type)

    if header_match:
        response.encoding = header_match.group(1)

    # No charset in the headers, and the content is a web page.
    elif content_type.replace(' ', '').startswith('text/html'):
        meta_match = search(rb'(?i)<meta[^>]*?charset=[ \'"]*([^"\' />]+)', response.content)

        if meta_match:
            charset = meta_match.group(1).decode(errors='ignore')
        else:
            charset = response.apparent_encoding

        response.encoding = charset

    return response
+ + def _set_runtime_settings(self) -> None: ... + + def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ... + + def set_headers(self, headers: dict) -> None: ... + + def set_user_agent(self, ua: str) -> None: ... + + def __call__(self, + loc_or_str: Union[Tuple[str, str], str, SessionElement], + timeout: float = None) -> Union[SessionElement, str, None]: ... + + # -----------------共有属性和方法------------------- + @property + def title(self) -> str: ... + + @property + def url(self) -> str: ... + + @property + def html(self) -> str: ... + + @property + def json(self) -> Union[dict, None]: ... + + @property + def download_path(self) -> str: ... + + @property + def download_set(self) -> DownloadSetter: ... + + def get(self, + url: str, + show_errmsg: bool | None = False, + retry: int | None = None, + interval: float | None = None, + timeout: float | None = None, + params: dict | None = ..., + data: Union[dict, str, None] = ..., + json: Union[dict, str, None] = ..., + headers: dict | None = ..., + cookies: Any | None = ..., + files: Any | None = ..., + auth: Any | None = ..., + allow_redirects: bool = ..., + proxies: dict | None = ..., + hooks: Any | None = ..., + stream: Any | None = ..., + verify: Any | None = ..., + cert: Any | None = ...) -> bool: ... + + def ele(self, + loc_or_ele: Union[Tuple[str, str], str, SessionElement], + timeout: float = None) -> Union[SessionElement, str, None]: ... + + def eles(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> List[Union[SessionElement, str]]: ... + + def s_ele(self, + loc_or_ele: Union[Tuple[str, str], str, SessionElement] = None) \ + -> Union[SessionElement, str, None]: ... + + def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ... 
+ + def _ele(self, + loc_or_ele: Union[Tuple[str, str], str, SessionElement], + timeout: float = None, + single: bool = True) -> Union[SessionElement, str, None, List[Union[SessionElement, str]]]: ... + + def get_cookies(self, + as_dict: bool = False, + all_domains: bool = False) -> Union[dict, list]: ... + + # ----------------session独有属性和方法----------------------- + @property + def session(self) -> Session: ... + + @property + def response(self) -> Response: ... + + @property + def set(self) -> SessionPageSetter: ... + + @property + def download(self) -> DownloadKit: ... + + def post(self, + url: str, + data: Union[dict, str, None] = ..., + show_errmsg: bool = False, + retry: int | None = None, + interval: float | None = None, + timeout: float | None = ..., + params: dict | None = ..., + json: Union[dict, str, None] = ..., + headers: dict | None = ..., + cookies: Any | None = ..., + files: Any | None = ..., + auth: Any | None = ..., + allow_redirects: bool = ..., + proxies: dict | None = ..., + hooks: Any | None = ..., + stream: Any | None = ..., + verify: Any | None = ..., + cert: Any | None = ...) -> bool: ... + + def _s_connect(self, + url: str, + mode: str, + data: Union[dict, str, None] = None, + show_errmsg: bool = False, + retry: int = None, + interval: float = None, + **kwargs) -> bool: ... + + def _make_response(self, + url: str, + mode: str = 'get', + data: Union[dict, str] = None, + retry: int = None, + interval: float = None, + show_errmsg: bool = False, + **kwargs) -> tuple: ... + + +class SessionPageSetter(object): + def __init__(self, page: SessionPage): + self._page: SessionPage = ... + + def timeout(self, second: Union[int, float]) -> None: ... + + def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ... + + def headers(self, headers: dict) -> None: ... + + def header(self, attr: str, value: str) -> None: ... + + def user_agent(self, ua: str) -> None: ... + + def proxies(self, http, https=None) -> None: ... 
+ + def auth(self, auth: Union[Tuple[str, str], HTTPBasicAuth, None]) -> None: ... + + def hooks(self, hooks: Union[dict, None]) -> None: ... + + def params(self, params: Union[dict, None]) -> None: ... + + def verify(self, on_off: Union[bool, None]) -> None: ... + + def cert(self, cert: Union[str, Tuple[str, str], None]) -> None: ... + + def stream(self, on_off: Union[bool, None]) -> None: ... + + def trust_env(self, on_off: Union[bool, None]) -> None: ... + + def max_redirects(self, times: Union[int, None]) -> None: ... + + def add_adapter(self, url: str, adapter: HTTPAdapter) -> None: ... + + +class DownloadSetter(object): + def __init__(self, page: SessionPage): + self._page: SessionPage = ... + self._DownloadKit: DownloadKit = ... + + @property + def DownloadKit(self) -> DownloadKit: ... + + @property + def if_file_exists(self) -> FileExists: ... + + def split(self, on_off: bool) -> None: ... + + def save_path(self, path: Union[str, Path]): ... + + +class FileExists(object): + def __init__(self, setter: DownloadSetter): + self._setter: DownloadSetter = ... + + def __call__(self, mode: str) -> None: ... + + def skip(self) -> None: ... + + def rename(self) -> None: ... + + def overwrite(self) -> None: ... + + +def check_headers(kwargs: Union[dict, CaseInsensitiveDict], headers: Union[dict, CaseInsensitiveDict], + arg: str) -> bool: ... + + +def set_charset(response: Response) -> Response: ... 
diff --git a/DrissionPage/shadow_root_element.py b/DrissionPage/mixpage/shadow_root_element.py similarity index 99% rename from DrissionPage/shadow_root_element.py rename to DrissionPage/mixpage/shadow_root_element.py index 122677c..dd4f9ab 100644 --- a/DrissionPage/shadow_root_element.py +++ b/DrissionPage/mixpage/shadow_root_element.py @@ -9,7 +9,7 @@ from typing import Union from selenium.webdriver.remote.webelement import WebElement from .base import BaseElement -from .common.locator import get_loc +from DrissionPage.common.locator import get_loc from .driver_element import make_driver_ele from .session_element import make_session_ele, SessionElement diff --git a/DrissionPage/shadow_root_element.pyi b/DrissionPage/mixpage/shadow_root_element.pyi similarity index 100% rename from DrissionPage/shadow_root_element.pyi rename to DrissionPage/mixpage/shadow_root_element.pyi