diff --git a/DrissionPage/action_chains.py b/DrissionPage/action_chains.py index 5ada2ac..f914b04 100644 --- a/DrissionPage/action_chains.py +++ b/DrissionPage/action_chains.py @@ -61,7 +61,7 @@ class ActionChains: def hold(self, on_ele=None) -> 'ActionChains': """点击并按住当前坐标或指定元素 \n - :param on_ele: ChromeElement对象 + :param on_ele: ChromiumElement对象 :return: self """ if on_ele: @@ -72,7 +72,7 @@ class ActionChains: def click(self, on_ele=None) -> 'ActionChains': """点击鼠标左键,可先移动到元素上 \n - :param on_ele: ChromeElement元素 + :param on_ele: ChromiumElement元素 :return: self """ if on_ele: @@ -85,7 +85,7 @@ class ActionChains: def r_click(self, on_ele=None) -> 'ActionChains': """点击鼠标右键,可先移动到元素上 \n - :param on_ele: ChromeElement元素 + :param on_ele: ChromiumElement元素 :return: self """ if on_ele: @@ -98,7 +98,7 @@ class ActionChains: def release(self, on_ele=None) -> 'ActionChains': """释放鼠标左键,可先移动到元素再释放 \n - :param on_ele: ChromeElement对象 + :param on_ele: ChromiumElement对象 :return: self """ if on_ele: @@ -111,7 +111,7 @@ class ActionChains: """滚动鼠标滚轮,可先移动到元素上 \n :param delta_x: 滚轮变化值x :param delta_y: 滚轮变化值y - :param on_ele: ChromeElement元素 + :param on_ele: ChromiumElement元素 :return: self """ if on_ele: diff --git a/DrissionPage/base.py b/DrissionPage/base.py index 5fc4669..acb0322 100644 --- a/DrissionPage/base.py +++ b/DrissionPage/base.py @@ -9,9 +9,6 @@ from re import sub from typing import Union, Tuple, List from urllib.parse import quote -from lxml.html import HtmlElement -from selenium.webdriver.remote.webelement import WebElement - from .common import format_html, get_loc @@ -59,7 +56,7 @@ class BaseElement(BaseParser): return True @abstractmethod - def _ele(self, loc_or_ele, timeout=None, single=True): + def _ele(self, loc_or_ele, timeout=None, single=True, relative=False): pass def parent(self, level_or_loc: Union[tuple, str, int] = 1): @@ -132,7 +129,7 @@ class DrissionElement(BaseElement): else: raise TypeError('level_or_loc参数只能是tuple、int或str。') - return self.ele(loc, timeout=0) + return self._ele(loc, timeout=0, relative=True) def prev(self, index: int = 1, @@ -256,7 +253,7 @@ class DrissionElement(BaseElement): loc = f'xpath:./{direction}{brother}::{loc}' - nodes = self._ele(loc, timeout=timeout, single=False) + nodes = self._ele(loc, timeout=timeout, single=False, relative=True) nodes = [e for e in nodes if not (isinstance(e, str) and sub('[ \n\t\r]', '', e) == '')] if nodes and index is not None: diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index 042fa01..aa29988 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -2,8 +2,9 @@ """ @Author : g1879 @Contact : g1879@qq.com -@File : chrome_element.py +@File : chromium_element.py """ +from json import loads from os import sep from os.path import basename from pathlib import Path @@ -11,9 +12,14 @@ from re import search from typing import Union, Tuple, List, Any from time import perf_counter, sleep +from pychrome import Tab +from requests import Session +from requests.cookies import RequestsCookieJar + +from .config import DriverOptions, _cookies_to_tuple from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions from .session_element import make_session_ele, SessionElement -from .base import DrissionElement, BaseElement +from .base import DrissionElement, BaseElement, BasePage from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func, _location_in_viewport @@ -42,19 +48,26 @@ class ChromiumElement(DrissionElement): def __repr__(self) -> str: attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] - return f'' + return f'' def __call__(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> Union['ChromiumElement', str, None]: + timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', str, None]: """在内部查找元素 \n 例:ele2 = ele1('@id=ele_id') \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 超时时间 - :return: ChromeElement对象或属性、文本 + :return: ChromiumElement对象或属性、文本 """ return self.ele(loc_or_str, timeout) + @property + def tag(self) -> str: + """返回元素tag""" + if self._tag is None: + self._tag = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName'].lower() + return self._tag + @property def html(self) -> str: """返回元素outerHTML文本""" @@ -66,19 +79,12 @@ class ChromiumElement(DrissionElement): return f'{sign}{in_html}' return self.page.driver.DOM.getOuterHTML(nodeId=self._node_id)['outerHTML'] - @property - def tag(self) -> str: - """返回元素tag""" - if self._tag is None: - self._tag = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName'].lower() - return self._tag - @property def inner_html(self) -> str: """返回元素innerHTML文本""" if self.tag in ('iframe', 'frame'): return self.run_script('return this.contentDocument.documentElement;').html - # return run_script(self, 'this.contentDocument.body;').html + # return self.run_script(self, 'this.contentDocument.body;').html return self.run_script('return this.innerHTML;') @property @@ -169,7 +175,7 @@ class ChromiumElement(DrissionElement): return {'x': cx, 'y': cy} @property - def shadow_root(self) -> Union[None, 'ChromeShadowRootElement']: + def shadow_root(self) -> Union[None, 'ChromiumShadowRootElement']: """返回当前元素的shadow_root元素对象""" shadow = self.run_script('return this.shadowRoot;') return shadow @@ -283,13 +289,13 @@ class ChromiumElement(DrissionElement): def wait_ele(self, loc_or_ele: Union[str, tuple, 'ChromiumElement'], - timeout: float = None) -> 'ChromeElementWaiter': + timeout: float = None) -> 'ChromiumElementWaiter': """返回用于等待子元素到达某个状态的等待器对象 \n :param loc_or_ele: 可以是元素、查询字符串、loc元组 :param timeout: 等待超时时间 :return: 用于等待的ElementWaiter对象 """ - return ChromeElementWaiter(self, loc_or_ele, timeout) + return ChromiumElementWaiter(self, loc_or_ele, timeout) @property def select(self) -> 'ChromeSelect': @@ -430,21 +436,21 @@ class ChromiumElement(DrissionElement): def ele(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> Union['ChromiumElement', str, None]: + timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', str, None]: """返回当前元素下级符合条件的第一个元素、属性或节点文本 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 - :return: ChromeElement对象或属性、文本 + :return: ChromiumElement对象或属性、文本 """ return self._ele(loc_or_str, timeout) def eles(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> List[Union['ChromiumElement', str]]: + timeout: float = None) -> List[Union['ChromiumElement', 'ChromiumFrame', str]]: """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 - :return: ChromeElement对象或属性、文本组成的列表 + :return: ChromiumElement对象或属性、文本组成的列表 """ return self._ele(loc_or_str, timeout=timeout, single=False) @@ -468,15 +474,16 @@ class ChromiumElement(DrissionElement): def _ele(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None, - single: bool = True) -> Union['ChromiumElement', str, None, List[Union['ChromiumElement', str]]]: + timeout: float = None, single: bool = True, relative=False) \ + -> Union['ChromiumElement', 'ChromiumFrame', str, None, + List[Union['ChromiumElement', 'ChromiumFrame', str]]]: """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间 :param single: True则返回第一个,False则返回全部 - :return: ChromeElement对象或文本、属性或其组成的列表 + :return: ChromiumElement对象或文本、属性或其组成的列表 """ - return make_chrome_ele(self, loc_or_str, single, timeout) + return make_chromium_ele(self, loc_or_str, single, timeout, relative=relative) def style(self, style: str, pseudo_ele: str = '') -> str: """返回元素样式属性值,可获取伪元素属性值 \n @@ -725,7 +732,7 @@ class ChromiumElement(DrissionElement): elif isinstance(ele_or_loc, (list, tuple)): target_x, target_y = ele_or_loc else: - raise TypeError('需要ChromeElement对象或坐标。') + raise TypeError('需要ChromiumElement对象或坐标。') curr_xy = self.midpoint current_x = curr_xy['x'] @@ -809,8 +816,8 @@ class ChromiumElement(DrissionElement): return f':root{t}' if mode == 'css' else t -class ChromeShadowRootElement(BaseElement): - """ChromeShadowRootElement是用于处理ShadowRoot的类,使用方法和ChromeElement基本一致""" +class ChromiumShadowRootElement(BaseElement): + """ChromiumShadowRootElement是用于处理ShadowRoot的类,使用方法和ChromiumElement基本一致""" def __init__(self, parent_ele: ChromiumElement, obj_id: str): super().__init__(parent_ele.page) @@ -893,7 +900,7 @@ class ChromeShadowRootElement(BaseElement): def parent(self, level_or_loc: Union[str, int] = 1) -> ChromiumElement: """返回上面某一级父元素,可指定层数或用查询语法定位 \n :param level_or_loc: 第几级父元素,或定位符 - :return: ChromeElement对象 + :return: ChromiumElement对象 """ if isinstance(level_or_loc, int): loc = f'xpath:./ancestor-or-self::*[{level_or_loc}]' @@ -909,7 +916,7 @@ class ChromeShadowRootElement(BaseElement): else: raise TypeError('level_or_loc参数只能是tuple、int或str。') - return self.parent_ele.ele(loc, timeout=0) + return self.parent_ele._ele(loc, timeout=0, relative=True) def next(self, index: int = 1, @@ -917,7 +924,7 @@ class ChromeShadowRootElement(BaseElement): """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n :param index: 第几个查询结果元素 :param filter_loc: 用于筛选元素的查询语法 - :return: ChromeElement对象 + :return: ChromiumElement对象 """ nodes = self.nexts(filter_loc=filter_loc) return nodes[index - 1] if nodes else None @@ -946,7 +953,7 @@ class ChromeShadowRootElement(BaseElement): def nexts(self, filter_loc: Union[tuple, str] = '') -> List[Union[ChromiumElement, str]]: """返回后面所有兄弟元素或节点组成的列表 \n :param filter_loc: 用于筛选元素的查询语法 - :return: ChromeElement对象组成的列表 + :return: ChromiumElement对象组成的列表 """ loc = get_loc(filter_loc, True) if loc[0] == 'css selector': @@ -954,7 +961,7 @@ class ChromeShadowRootElement(BaseElement): loc = loc[1].lstrip('./') xpath = f'xpath:./{loc}' - return self.parent_ele.eles(xpath, timeout=0.1) + return self.parent_ele._ele(xpath, timeout=0.1, single=False, relative=True) def befores(self, filter_loc: Union[tuple, str] = '') -> List[Union[ChromiumElement, str]]: """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n @@ -967,7 +974,7 @@ class ChromeShadowRootElement(BaseElement): loc = loc[1].lstrip('./') xpath = f'xpath:./preceding::{loc}' - return self.parent_ele.eles(xpath, timeout=0.1) + return self.parent_ele._ele(xpath, timeout=0.1, single=False, relative=True) def afters(self, filter_loc: Union[tuple, str] = '') -> List[Union[ChromiumElement, str]]: """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n @@ -977,7 +984,7 @@ class ChromeShadowRootElement(BaseElement): eles1 = self.nexts(filter_loc) loc = get_loc(filter_loc, True)[1].lstrip('./') xpath = f'xpath:./following::{loc}' - return eles1 + self.parent_ele.eles(xpath, timeout=0.1) + return eles1 + self.parent_ele._ele(xpath, timeout=0.1, single=False, relative=True) def ele(self, loc_or_str: Union[Tuple[str, str], str], @@ -985,7 +992,7 @@ class ChromeShadowRootElement(BaseElement): """返回当前元素下级符合条件的第一个元素 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 - :return: ChromeElement对象 + :return: ChromiumElement对象 """ return self._ele(loc_or_str, timeout) @@ -995,7 +1002,7 @@ class ChromeShadowRootElement(BaseElement): """返回当前元素下级所有符合条件的子元素 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 - :return: ChromeElement对象组成的列表 + :return: ChromiumElement对象组成的列表 """ return self._ele(loc_or_str, timeout=timeout, single=False) @@ -1016,12 +1023,12 @@ class ChromeShadowRootElement(BaseElement): def _ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None, - single: bool = True) -> Union['ChromiumElement', None, List[ChromiumElement]]: + single: bool = True, relative=False) -> Union['ChromiumElement', None, List[ChromiumElement]]: """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间 :param single: True则返回第一个,False则返回全部 - :return: ChromeElement对象或其组成的列表 + :return: ChromiumElement对象或其组成的列表 """ loc = get_loc(loc_or_str) if loc[0] == 'css selector' and str(loc[1]).startswith(':root'): @@ -1054,16 +1061,721 @@ class ChromeShadowRootElement(BaseElement): return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId'] -def make_chrome_ele(ele: ChromiumElement, - loc: Union[str, Tuple[str, str]], - single: bool = True, - timeout: float = None) -> Union[ChromiumElement, str, None, List[Union[ChromiumElement, str]]]: - """在chrome元素中查找 \n - :param ele: ChromeElement对象 +class ChromiumBase(BasePage): + """标签页、frame、页面基类""" + + def __init__(self, + address: str, + tab_id: str = None, + timeout: float = None): + """初始化 \n + :param address: 浏览器地址:端口 + :param tab_id: 要控制的标签页id,不指定默认为激活的 + :param timeout: 超时时间 + """ + super().__init__(timeout) + self._is_loading = None + self._root_id = None + self._connect_browser(address, tab_id) + + def _connect_browser(self, + addr_tab_opts: Union[str, Tab, DriverOptions] = None, + tab_id: str = None) -> None: + """连接浏览器,在第一次时运行 \n + :param addr_tab_opts: 浏览器地址、Tab对象或DriverOptions对象 + :param tab_id: 要控制的标签页id,不指定默认为激活的 + :return: None + """ + self._is_loading = False + self._root_id = None + self.timeouts = Timeout(self) + self._control_session = Session() + self._control_session.keep_alive = False + self._first_run = True + + self.address = addr_tab_opts + if not tab_id: + json = loads(self._control_session.get(f'http://{self.address}/json').text) + tab_id = [i['id'] for i in json if i['type'] == 'page'][0] + self._set_options() + self._init_page(tab_id) + self._get_document() + self._first_run = False + + def _init_page(self, tab_id: str = None) -> None: + """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 + :param tab_id: 要跳转到的标签页id + :return: None + """ + self._is_loading = True + if tab_id: + self._driver = Tab(id=tab_id, type='page', + webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') + + self._driver.start() + self._driver.DOM.enable() + self._driver.Page.enable() + + self._driver.Page.frameNavigated = self._onFrameNavigated + self._driver.Page.loadEventFired = self._onLoadEventFired + # self._driver.DOM.documentUpdated = self._onDocumentUpdated + + def _get_document(self) -> None: + """刷新cdp使用的document数据""" + # print('get doc') + self._wait_loading() + root_id = self._driver.DOM.getDocument()['root']['nodeId'] + self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId'] + self._is_loading = False + + def _wait_loading(self, timeout: float = None) -> bool: + """等待页面加载完成 + :param timeout: 超时时间 + :return: 是否成功,超时返回False + """ + timeout = timeout if timeout is not None else self.timeouts.page_load + + end_time = perf_counter() + timeout + while perf_counter() < end_time: + state = self.ready_state + if state == 'complete': + return True + elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'): + self.stop_loading() + return True + elif self.page_load_strategy == 'none': + self.stop_loading() + return True + + return False + + def _onLoadEventFired(self, **kwargs): + """在页面刷新、变化后重新读取页面内容""" + # print('load complete') + if self._first_run is False and self._is_loading: + self._get_document() + + def _onFrameNavigated(self, **kwargs): + """页面跳转时触发""" + if not kwargs['frame'].get('parentId', None): + # print('nav') + self._is_loading = True + + def _onDocumentUpdated(self, **kwargs): + # print('doc') + pass + + def _set_options(self) -> None: + pass + + def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'], + timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', None]: + """在内部查找元素 \n + 例:ele = page('@id=ele_id') \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 超时时间 + :return: ChromiumElement对象 + """ + return self.ele(loc_or_str, timeout) + + @property + def driver(self) -> Tab: + """返回用于控制浏览器的Tab对象""" + return self._driver + + @property + def _wait_driver(self) -> Tab: + """返回用于控制浏览器的Tab对象,会先等待页面加载完毕""" + while self._is_loading: + # print('loading') + sleep(.1) + return self._driver + + @property + def url(self) -> str: + """返回当前页面url""" + tab_id = self._wait_driver.id # 用于WebPage时激活浏览器 + json = loads(self._control_session.get(f'http://{self.address}/json').text) + return [i['url'] for i in json if i['id'] == tab_id][0] + + @property + def html(self) -> str: + """返回当前页面html文本""" + node_id = self._wait_driver.DOM.getDocument()['root']['nodeId'] + return self._wait_driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML'] + + @property + def json(self) -> dict: + """当返回内容是json格式时,返回对应的字典""" + return loads(self('t:pre').text) + + @property + def tab_id(self) -> str: + """返回当前标签页id""" + return self._wait_driver.id + + @property + def ready_state(self) -> str: + """返回当前页面加载状态,'loading' 'interactive' 'complete'""" + return self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] + + @property + def size(self) -> dict: + """返回页面总长宽,{'height': int, 'width': int}""" + w = self.run_script('document.body.scrollWidth;', as_expr=True) + h = self.run_script('document.body.scrollHeight;', as_expr=True) + return {'height': h, 'width': w} + + @property + def active_ele(self) -> ChromiumElement: + """返回当前焦点所在元素""" + return self.run_script('return document.activeElement;') + + @property + def page_load_strategy(self) -> str: + """返回页面加载策略""" + return self._page_load_strategy + + @property + def scroll(self) -> 'ChromeScroll': + """返回用于滚动滚动条的对象""" + if not hasattr(self, '_scroll'): + self._scroll = ChromeScroll(self) + return self._scroll + + def set_page_load_strategy(self, value: str) -> None: + """设置页面加载策略 \n + :param value: 可选'normal', 'eager', 'none' + :return: None + """ + if value not in ('normal', 'eager', 'none'): + raise ValueError("只能选择'normal', 'eager', 'none'。") + self._page_load_strategy = value + + def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: + """设置超时时间,单位为秒 \n + :param implicit: 查找元素超时时间 + :param page_load: 页面加载超时时间 + :param script: 脚本运行超时时间 + :return: None + """ + if implicit is not None: + self.timeout = implicit + + if page_load is not None: + self.timeouts.page_load = page_load + + if script is not None: + self.timeouts.script = script + + def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any: + """运行javascript代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: 运行的结果 + """ + return _run_script(self, script, as_expr, self.timeouts.script, args) + + def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: + """以异步方式执行js代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: None + """ + from threading import Thread + Thread(target=_run_script, args=(self, script, as_expr, self.timeouts.script, args)).start() + + def get(self, + url: str, + show_errmsg: bool = False, + retry: int = None, + interval: float = None, + timeout: float = None) -> Union[None, bool]: + """访问url \n + :param url: 目标url + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param timeout: 连接超时时间 + :return: 目标url是否可用,返回None表示不确定 + """ + retry, interval = self._before_connect(url, retry, interval) + self._url_available = self._d_connect(self._url, + times=retry, + interval=interval, + show_errmsg=show_errmsg, + timeout=timeout) + return self._url_available + + def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: + """获取cookies信息 \n + :param as_dict: 为True时返回由{name: value}键值对组成的dict + :return: cookies信息 + """ + cookies = self._wait_driver.Network.getCookies()['cookies'] + if as_dict: + return {cookie['name']: cookie['value'] for cookie in cookies} + else: + return cookies + + def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: + """设置cookies值 \n + :param cookies: cookies信息 + :return: None + """ + cookies = _cookies_to_tuple(cookies) + result_cookies = [] + for cookie in cookies: + if not cookie.get('domain', None): + continue + c = {'value': '' if cookie['value'] is None else cookie['value'], + 'name': cookie['name'], + 'domain': cookie['domain']} + result_cookies.append(c) + self._wait_driver.Network.setCookies(cookies=result_cookies) + + def ele(self, + loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'], + timeout: float = None) -> Union[ChromiumElement, 'ChromiumFrame', None]: + """获取第一个符合条件的元素对象 \n + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :return: ChromiumElement对象 + """ + return self._ele(loc_or_ele, timeout=timeout) + + def eles(self, + loc_or_ele: Union[Tuple[str, str], str], + timeout: float = None) -> List[Union[ChromiumElement, 'ChromiumFrame']]: + """获取所有符合条件的元素对象 \n + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :return: ChromiumElement对象组成的列表 + """ + return self._ele(loc_or_ele, timeout=timeout, single=False) + + def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \ + -> Union[SessionElement, str, None]: + """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + if isinstance(loc_or_ele, ChromiumElement): + return make_session_ele(loc_or_ele) + else: + return make_session_ele(self, loc_or_ele) + + def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: + """查找所有符合条件的元素以SessionElement列表形式返回 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象组成的列表 + """ + return make_session_ele(self, loc_or_str, single=False) + + def _ele(self, + loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'], + timeout: float = None, single: bool = True, relative:bool=False) \ + -> Union[ChromiumElement, 'ChromiumFrame', None, List[Union[ChromiumElement, 'ChromiumFrame']]]: + """执行元素查找 + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :param single: 是否只返回第一个 + :return: ChromiumElement对象或元素对象组成的列表 + """ + if isinstance(loc_or_ele, (str, tuple)): + loc = get_loc(loc_or_ele)[1] + elif isinstance(loc_or_ele, ChromiumElement): + return loc_or_ele + else: + raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。') + + timeout = timeout if timeout is not None else self.timeout + search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) + count = search_result['resultCount'] + + end_time = perf_counter() + timeout + while count == 0 and perf_counter() < end_time: + search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) + count = search_result['resultCount'] + + if count == 0: + return None if single else [] + + count = 1 if single else count + nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, + toIndex=count) + eles = [] + for i in nodeIds['nodeIds']: + ele = ChromiumElement(self, node_id=i) + if ele.tag in ('iframe', 'frame'): + ele = ChromiumFrame(self, ele) + eles.append(ele) + + return eles[0] if single else eles + + def wait_ele(self, + loc_or_ele: Union[str, tuple, ChromiumElement], + timeout: float = None) -> 'ChromiumElementWaiter': + """返回用于等待元素到达某个状态的等待器对象 \n + :param loc_or_ele: 可以是元素、查询字符串、loc元组 + :param timeout: 等待超时时间 + :return: 用于等待的ElementWaiter对象 + """ + return ChromiumElementWaiter(self, loc_or_ele, timeout) + + def scroll_to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: + """滚动页面直到元素可见 \n + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) + :return: None + """ + node_id = self.ele(loc_or_ele).node_id + try: + self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) + except Exception: + self.ele(loc_or_ele).run_script("this.scrollIntoView();") + + def refresh(self, ignore_cache: bool = False) -> None: + """刷新当前页面 \n + :param ignore_cache: 是否忽略缓存 + :return: None + """ + self._driver.Page.reload(ignoreCache=ignore_cache) + + def forward(self, steps: int = 1) -> None: + """在浏览历史中前进若干步 \n + :param steps: 前进步数 + :return: None + """ + self.run_script(f'window.history.go({steps});', as_expr=True) + + def back(self, steps: int = 1) -> None: + """在浏览历史中后退若干步 \n + :param steps: 后退步数 + :return: None + """ + self.run_script(f'window.history.go({-steps});', as_expr=True) + + def stop_loading(self) -> None: + """页面停止加载""" + self._driver.Page.stopLoading() + self._get_document() + + def run_cdp(self, cmd: str, **cmd_args) -> dict: + """执行Chrome DevTools Protocol语句 \n + :param cmd: 协议项目 + :param cmd_args: 参数 + :return: 执行的结果 + """ + return self._driver.call_method(cmd, **cmd_args) + + def set_user_agent(self, ua: str) -> None: + """为当前tab设置user agent,只在当前tab有效 \n + :param ua: user agent字符串 + :return: None + """ + self._wait_driver.Network.setUserAgentOverride(userAgent=ua) + + def get_session_storage(self, item: str = None) -> Union[str, dict, None]: + """获取sessionStorage信息,不设置item则获取全部 \n + :param item: 要获取的项,不设置则返回全部 + :return: sessionStorage一个或所有项内容 + """ + js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' + return self.run_script(js, as_expr=True) + + def get_local_storage(self, item: str = None) -> Union[str, dict, None]: + """获取localStorage信息,不设置item则获取全部 \n + :param item: 要获取的项目,不设置则返回全部 + :return: localStorage一个或所有项内容 + """ + js = f'localStorage.getItem("{item}");' if item else 'localStorage;' + return self.run_script(js, as_expr=True) + + def set_session_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项sessionStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' + return self.run_script(js, as_expr=True) + + def set_local_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项localStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' + return self.run_script(js, as_expr=True) + + def clear_cache(self, + session_storage: bool = True, + local_storage: bool = True, + cache: bool = True, + cookies: bool = True) -> None: + """清除缓存,可选要清除的项 \n + :param session_storage: 是否清除sessionStorage + :param local_storage: 是否清除localStorage + :param cache: 是否清除cache + :param cookies: 是否清除cookies + :return: None + """ + if session_storage: + self.run_script('sessionStorage.clear();', as_expr=True) + if local_storage: + self.run_script('localStorage.clear();', as_expr=True) + if cache: + self._wait_driver.Network.clearBrowserCache() + if cookies: + self._wait_driver.Network.clearBrowserCookies() + + def _d_connect(self, + to_url: str, + times: int = 0, + interval: float = 1, + show_errmsg: bool = False, + timeout: float = None) -> Union[bool, None]: + """尝试连接,重试若干次 \n + :param to_url: 要访问的url + :param times: 重试次数 + :param interval: 重试间隔(秒) + :param show_errmsg: 是否抛出异常 + :param timeout: 连接超时时间 + :return: 是否成功,返回None表示不确定 + """ + err = None + timeout = timeout if timeout is not None else self.timeouts.page_load + + for _ in range(times + 1): + result = self._driver.Page.navigate(url=to_url) + is_timeout = not self._wait_loading(timeout) + + if is_timeout: + err = TimeoutError('页面连接超时。') + if 'errorText' in result: + err = ConnectionError(result['errorText']) + + if not err: + break + + if _ < times: + sleep(interval) + if show_errmsg: + print(f'重试 {to_url}') + + if err and show_errmsg: + raise err if err is not None else ConnectionError('连接异常。') + + return False if err else True + + +class ChromiumFrame(ChromiumBase): + """实现浏览器frame的类""" + + def __init__(self, page, + ele: ChromiumElement): + """初始化 \n + :param page: 浏览器地址:端口、Tab对象或DriverOptions对象 + :param ele: 页面上的frame元素 + """ + self.page = page + self._inner_ele = ele + frame_id = page.run_cdp('DOM.describeNode', nodeId=ele.node_id)['node'].get('frameId', None) + super().__init__(page.address, frame_id, page.timeout) + + def __repr__(self) -> str: + attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + return f'' + + @property + def tag(self) -> str: + """返回元素tag""" + return self._inner_ele.tag + + @property + def html(self) -> str: + """返回元素outerHTML文本""" + tag = self.tag + out_html = self.page.driver.DOM.getOuterHTML(nodeId=self._inner_ele.node_id)['outerHTML'] + in_html = super().html + sign = search(rf'<{tag}.*?>', out_html).group(0) + return f'{sign}{in_html}' + + @property + def inner_html(self) -> str: + """返回元素innerHTML文本""" + return super().html + + @property + def attrs(self) -> dict: + return self._inner_ele.attrs + + @property + def frame_size(self) -> dict: + """返回frame元素大小""" + return self._inner_ele.size + + def _set_options(self) -> None: + self.set_timeouts(page_load=self.page.timeouts.page_load, + script=self.page.timeouts.script, + implicit=self.page.timeouts.implicit if self.timeout is None else self.timeout) + self._page_load_strategy = self.page.page_load_strategy + + @property + def obj_id(self) -> str: + """返回js中的object id""" + return self._inner_ele.obj_id + + @property + def node_id(self) -> str: + """返回cdp中的node id""" + return self._inner_ele.node_id + + @property + def location(self) -> dict: + """返回frame元素左上角的绝对坐标""" + return self._inner_ele.location + + @property + def is_displayed(self) -> bool: + """返回frame元素是否显示""" + return self._inner_ele.is_displayed + + def attr(self, attr: str) -> Union[str, None]: + """返回frame元素attribute属性值 \n + :param attr: 属性名 + :return: 属性值文本,没有该属性返回None + """ + return self._inner_ele.attr(attr) + + def set_attr(self, attr: str, value: str) -> None: + """设置frame元素attribute属性 \n + :param attr: 属性名 + :param value: 属性值 + :return: None + """ + self._inner_ele.set_attr(attr, value) + + def remove_attr(self, attr: str) -> None: + """删除frame元素attribute属性 \n + :param attr: 属性名 + :return: None + """ + self._inner_ele.remove_attr(attr) + + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['ChromiumElement', None]: + """返回上面某一级父元素,可指定层数或用查询语法定位 \n + :param level_or_loc: 第几级父元素,或定位符 + :return: 上级元素对象 + """ + return self._inner_ele.parent(level_or_loc) + + def prev(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> Union['ChromiumElement', str, None]: + """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + :param index: 前面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + return self._inner_ele.prev(index, filter_loc, timeout) + + def next(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> Union['ChromiumElement', str, None]: + """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + :param index: 后面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + return self._inner_ele.next(index, filter_loc, timeout) + + def before(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> Union['ChromiumElement', str, None]: + """返回当前元素前面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n + :param index: 前面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的某个元素或节点 + """ + return self._inner_ele.before(index, filter_loc, timeout) + + def after(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> Union['ChromiumElement', str, None]: + """返回当前元素后面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n + :param index: 后面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素后面的某个元素或节点 + """ + return self._inner_ele.after(index, filter_loc, timeout) + + def prevs(self, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> List[Union['ChromiumElement', str]]: + """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return self._inner_ele.prevs(filter_loc, timeout) + + def nexts(self, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> List[Union['ChromiumElement', str]]: + """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return self._inner_ele.nexts(filter_loc, timeout) + + def befores(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union['ChromiumElement', str]]: + """返回当前元素后面符合条件的全部兄弟元素或节点组成的列表,可用查询语法筛选。查找范围不限兄弟元素,而是整个DOM文档 \n + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的元素或节点组成的列表 + """ + return self._inner_ele.befores(filter_loc, timeout) + + +class Timeout(object): + """用于保存d模式timeout信息的类""" + + def __init__(self, page): + self.page = page + self.page_load = 30 + self.script = 30 + + @property + def implicit(self): + return self.page.timeout + + +def make_chromium_ele(ele: ChromiumElement, + loc: Union[str, Tuple[str, str]], + single: bool = True, + timeout: float = None, + relative=True) -> Union[ChromiumElement, str, None, List[Union[ChromiumElement, str]]]: + """在chromium元素中查找 \n + :param ele: ChromiumElement对象 :param loc: 元素定位元组 :param single: True则返回第一个,False则返回全部 :param timeout: 查找元素超时时间 - :return: 返回ChromeElement元素或它们组成的列表 + :return: 返回ChromiumElement元素或它们组成的列表 """ # ---------------处理定位符--------------- if isinstance(loc, (str, tuple)): @@ -1082,7 +1794,7 @@ def make_chrome_ele(ele: ChromiumElement, # ---------------执行查找----------------- if loc[0] == 'xpath': - return _find_by_xpath(ele, loc[1], single, timeout) + return _find_by_xpath(ele, loc[1], single, timeout, relative=relative) else: return _find_by_css(ele, loc[1], single, timeout) @@ -1091,16 +1803,17 @@ def make_chrome_ele(ele: ChromiumElement, def _find_by_xpath(ele: ChromiumElement, xpath: str, single: bool, - timeout: float) -> Union[ChromiumElement, List[ChromiumElement], None]: + timeout: float, + relative=True) -> Union[ChromiumElement, List[ChromiumElement], None]: """执行用xpath在元素中查找元素 :param ele: 在此元素中查找 :param xpath: 查找语句 :param single: 是否只返回第一个结果 :param timeout: 超时时间 - :return: ChromeElement或其组成的列表 + :return: ChromiumElement或其组成的列表 """ type_txt = '9' if single else '7' - node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this' + node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') and not relative else 'this' js = _make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt) r = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, @@ -1129,14 +1842,15 @@ def _find_by_xpath(ele: ChromiumElement, if r['result']['subtype'] == 'null': return None else: - return ChromiumElement(ele.page, obj_id=r['result']['objectId']) + # return ChromiumElement(ele.page, obj_id=r['result']['objectId']) + return _make_chromium_ele(ele.page, obj_id=r['result']['objectId']) else: if r['result']['description'] == 'NodeList(0)': return [] else: r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'], ownProperties=True)['result'] - return [ChromiumElement(ele.page, obj_id=i['value']['objectId']) + return [_make_chromium_ele(ele.page, obj_id=i['value']['objectId']) if i['value']['type'] == 'object' else i['value']['value'] for i in r[:-1]] @@ -1150,7 +1864,7 @@ def _find_by_css(ele: ChromiumElement, :param selector: 查找语句 :param single: 是否只返回第一个结果 :param timeout: 超时时间 - :return: ChromeElement或其组成的列表 + :return: ChromiumElement或其组成的列表 """ selector = selector.replace('"', r'\"') find_all = '' if single else 'All' @@ -1173,14 +1887,22 @@ def _find_by_css(ele: ChromiumElement, if r['result']['subtype'] == 'null': return None else: - return ChromiumElement(ele.page, obj_id=r['result']['objectId']) + return _make_chromium_ele(ele.page, obj_id=r['result']['objectId']) else: if r['result']['description'] == 'NodeList(0)': return [] else: r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'], ownProperties=True)['result'] - return [ChromiumElement(ele.page, obj_id=i['value']['objectId']) for i in r] + return [_make_chromium_ele(ele.page, obj_id=i['value']['objectId']) for i in r] + + +def _make_chromium_ele(page, node_id: str = None, obj_id: str = None): + """根据node id或object id生成相应元素对象""" + ele = ChromiumElement(page, obj_id=obj_id, node_id=node_id) + if ele.tag in ('iframe', 'frame') and ele.attr('src'): + ele = ChromiumFrame(page, ele) + return ele def _make_js_for_find_ele_by_xpath(xpath: str, type_txt: str, node_txt: str) -> str: @@ -1232,7 +1954,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... :return: js执行结果 """ - if isinstance(page_or_ele, (ChromiumElement, ChromeShadowRootElement)): + if isinstance(page_or_ele, (ChromiumElement, ChromiumShadowRootElement)): page = page_or_ele.page obj_id = page_or_ele.obj_id else: @@ -1279,9 +2001,9 @@ def _parse_js_result(page, ele, result: dict): elif sub_type == 'node': if result['className'] == 'ShadowRoot': - return ChromeShadowRootElement(ele, obj_id=result['objectId']) + return ChromiumShadowRootElement(ele, obj_id=result['objectId']) else: - return ChromiumElement(page, obj_id=result['objectId']) + return _make_chromium_ele(page, obj_id=result['objectId']) elif sub_type == 'array': r = page.driver.Runtime.getProperties(objectId=result['result']['objectId'], ownProperties=True)['result'] @@ -1372,7 +2094,7 @@ class ChromeScroll(object): def __init__(self, page_or_ele): """ - :param page_or_ele: ChromePage或ChromeElement + :param page_or_ele: ChromePage或ChromiumElement """ if isinstance(page_or_ele, ChromiumElement): self.t1 = self.t2 = 'this' @@ -1486,7 +2208,7 @@ class ChromeSelect(object): @property def selected_option(self) -> Union[ChromiumElement, None]: """返回第一个被选中的option元素 \n - :return: ChromeElement对象或None + :return: ChromiumElement对象或None """ ele = self._ele.run_script('return this.options[this.selectedIndex];') return ele @@ -1494,7 +2216,7 @@ class ChromeSelect(object): @property def selected_options(self) -> List[ChromiumElement]: """返回所有被选中的option元素列表 \n - :return: ChromeElement对象组成的列表 + :return: ChromiumElement对象组成的列表 """ return [x for x in self.options if x.is_selected] @@ -1642,7 +2364,7 @@ class ChromeSelect(object): return success -class ChromeElementWaiter(object): +class ChromiumElementWaiter(object): """等待元素在dom中某种状态,如删除、显示、隐藏""" def __init__(self, diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index 0e2d0cf..cb0e26c 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -1,29 +1,25 @@ # -*- coding:utf-8 -*- +from json import loads from pathlib import Path from platform import system from re import search from time import perf_counter, sleep -from typing import Union, Tuple, List, Any +from typing import Union, Tuple, List from pychrome import Tab from requests import Session -from json import loads -from requests.cookies import RequestsCookieJar - -from .session_element import SessionElement, make_session_ele -from .config import DriverOptions, _cookies_to_tuple -from .base import BasePage +from .chromium_element import Timeout, ChromiumBase from .chromium_tab import ChromiumTab -from .common import get_loc +from .config import DriverOptions from .drission import connect_chrome -from .chromium_element import ChromiumElement, ChromeScroll, _run_script, ChromeElementWaiter -class ChromiumPage(BasePage): +class ChromiumPage(ChromiumBase): """用于管理浏览器的类""" - def __init__(self, addr_tab_opts: Union[str, Tab, DriverOptions] = None, + def __init__(self, + addr_tab_opts: Union[str, Tab, DriverOptions] = None, tab_id: str = None, timeout: float = None): """初始化 \n @@ -31,12 +27,10 @@ class ChromiumPage(BasePage): :param tab_id: 要控制的标签页id,不指定默认为激活的 :param timeout: 超时时间 """ - super().__init__(timeout) - self._is_loading = None - self._root_id = None - self._connect_browser(addr_tab_opts, tab_id) + super().__init__(addr_tab_opts, tab_id, timeout) - def _connect_browser(self, addr_tab_opts: Union[str, Tab, DriverOptions] = None, + def _connect_browser(self, + addr_tab_opts: Union[str, Tab, DriverOptions] = None, tab_id: str = None) -> None: """连接浏览器,在第一次时运行 \n :param addr_tab_opts: 浏览器地址、Tab对象或DriverOptions对象 @@ -98,116 +92,17 @@ class ChromiumPage(BasePage): :param tab_id: 要跳转到的标签页id :return: None """ - self._is_loading = True - if tab_id: - self._driver = Tab(id=tab_id, type='page', - webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') - - self._driver.start() - self._driver.DOM.enable() - self._driver.Page.enable() + super()._init_page(tab_id) self._driver.Page.javascriptDialogOpening = self._on_alert_open self._driver.Page.javascriptDialogClosed = self._on_alert_close - self._driver.Page.frameNavigated = self._onFrameNavigated - self._driver.Page.loadEventFired = self._onLoadEventFired - self._driver.DOM.documentUpdated = self._onDocumentUpdated - - def _get_document(self) -> None: - """刷新cdp使用的document数据""" - # print('get doc') - self._wait_loading() - root_id = self._driver.DOM.getDocument()['root']['nodeId'] - self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId'] - self._is_loading = False - - def _wait_loading(self, timeout: float = None) -> bool: - """等待页面加载完成 - :param timeout: 超时时间 - :return: 是否成功,超时返回False - """ - timeout = timeout if timeout is not None else self.timeouts.page_load - - end_time = perf_counter() + timeout - while perf_counter() < end_time: - state = self.ready_state - if state == 'complete': - return True - elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'): - self.stop_loading() - return True - elif self.page_load_strategy == 'none': - self.stop_loading() - return True - - return False - - def _onLoadEventFired(self, **kwargs): - """在页面刷新、变化后重新读取页面内容""" - print('load complete') - if self._first_run is False and self._is_loading: - self._get_document() - - def _onFrameNavigated(self, **kwargs): - """页面跳转时触发""" - # todo: 考虑frame的情况,修改别的判断方式 - if not kwargs['frame'].get('parentId', None): - print('nav') - self._is_loading = True - - def _onDocumentUpdated(self, **kwargs): - print('doc') - pass - def _set_options(self) -> None: - print(self.options.timeouts) self.set_timeouts(page_load=self.options.timeouts['pageLoad'] / 1000, script=self.options.timeouts['script'] / 1000, implicit=self.options.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout) self._page_load_strategy = self.options.page_load_strategy - def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'], - timeout: float = None) -> Union['ChromiumElement', None]: - """在内部查找元素 \n - 例:ele = page('@id=ele_id') \n - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 超时时间 - :return: ChromeElement对象 - """ - return self.ele(loc_or_str, timeout) - - @property - def driver(self) -> Tab: - """返回用于控制浏览器的Tab对象""" - return self._driver - - @property - def _wait_driver(self) -> Tab: - """返回用于控制浏览器的Tab对象,会先等待页面加载完毕""" - while self._is_loading: - print('loading') - sleep(.1) - return self._driver - - @property - def url(self) -> str: - """返回当前页面url""" - tab_id = self._wait_driver.id # 用于WebPage时激活浏览器 - json = loads(self._control_session.get(f'http://{self.address}/json').text) - return [i['url'] for i in json if i['id'] == tab_id][0] - - @property - def html(self) -> str: - """返回当前页面html文本""" - node_id = self._wait_driver.DOM.getDocument()['root']['nodeId'] - return self._wait_driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML'] - - @property - def json(self) -> dict: - """当返回内容是json格式时,返回对应的字典""" - return loads(self('t:pre').text) - @property def tabs_count(self) -> int: """返回标签页数量""" @@ -220,33 +115,6 @@ class ChromiumPage(BasePage): json = loads(self._control_session.get(f'http://{self.address}/json').text) return [i['id'] for i in json if i['type'] == 'page'] - @property - def tab_id(self) -> str: - """返回当前标签页id""" - return self._wait_driver.id - - @property - def ready_state(self) -> str: - """返回当前页面加载状态,'loading' 'interactive' 'complete'""" - return self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] - - @property - def size(self) -> dict: - """返回页面总长宽,{'height': int, 'width': int}""" - w = self.run_script('document.body.scrollWidth;', as_expr=True) - h = self.run_script('document.body.scrollHeight;', as_expr=True) - return {'height': h, 'width': w} - - @property - def active_ele(self) -> ChromiumElement: - """返回当前焦点所在元素""" - return self.run_script('return document.activeElement;') - - @property - def page_load_strategy(self) -> str: - """返回页面加载策略""" - return self._page_load_strategy - @property def process_id(self) -> Union[None, int]: """返回浏览器进程id""" @@ -255,13 +123,6 @@ class ChromiumPage(BasePage): except Exception: return None - @property - def scroll(self) -> ChromeScroll: - """返回用于滚动滚动条的对象""" - if not hasattr(self, '_scroll'): - self._scroll = ChromeScroll(self) - return self._scroll - @property def set_window(self) -> 'WindowSizeSetter': """返回用于设置窗口大小的对象""" @@ -269,188 +130,9 @@ class ChromiumPage(BasePage): self._window_setter = WindowSizeSetter(self) return self._window_setter - def set_page_load_strategy(self, value: str) -> None: - """设置页面加载策略 \n - :param value: 可选'normal', 'eager', 'none' - :return: None - """ - if value not in ('normal', 'eager', 'none'): - raise ValueError("只能选择'normal', 'eager', 'none'。") - self._page_load_strategy = value - - def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: - """设置超时时间,单位为秒 \n - :param implicit: 查找元素超时时间 - :param page_load: 页面加载超时时间 - :param script: 脚本运行超时时间 - :return: None - """ - if implicit is not None: - self.timeout = implicit - - if page_load is not None: - self.timeouts.page_load = page_load - - if script is not None: - self.timeouts.script = script - - def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any: - """运行javascript代码 \n - :param script: js文本 - :param as_expr: 是否作为表达式运行,为True时args无效 - :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: 运行的结果 - """ - return _run_script(self, script, as_expr, self.timeouts.script, args) - - def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: - """以异步方式执行js代码 \n - :param script: js文本 - :param as_expr: 是否作为表达式运行,为True时args无效 - :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: None - """ - from threading import Thread - Thread(target=_run_script, args=(self, script, as_expr, self.timeouts.script, args)).start() - - def get(self, - url: str, - show_errmsg: bool = False, - retry: int = None, - interval: float = None, - timeout: float = None) -> Union[None, bool]: - """访问url \n - :param url: 目标url - :param show_errmsg: 是否显示和抛出异常 - :param retry: 重试次数 - :param interval: 重试间隔(秒) - :param timeout: 连接超时时间 - :return: 目标url是否可用,返回None表示不确定 - """ - retry, interval = self._before_connect(url, retry, interval) - self._url_available = self._d_connect(self._url, - times=retry, - interval=interval, - show_errmsg=show_errmsg, - timeout=timeout) - return self._url_available - - def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: - """获取cookies信息 \n - :param as_dict: 为True时返回由{name: value}键值对组成的dict - :return: cookies信息 - """ - cookies = self._wait_driver.Network.getCookies()['cookies'] - if as_dict: - return {cookie['name']: cookie['value'] for cookie in cookies} - else: - return cookies - - def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: - """设置cookies值 \n - :param cookies: cookies信息 - :return: None - """ - cookies = _cookies_to_tuple(cookies) - result_cookies = [] - for cookie in cookies: - if not cookie.get('domain', None): - continue - c = {'value': '' if cookie['value'] is None else cookie['value'], - 'name': cookie['name'], - 'domain': cookie['domain']} - result_cookies.append(c) - self._wait_driver.Network.setCookies(cookies=result_cookies) - def get_tab(self, tab_id: str) -> ChromiumTab: """获取一个标签页对象""" - return ChromiumTab(self.address, tab_id) - - def ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement], - timeout: float = None) -> Union[ChromiumElement, None]: - """获取第一个符合条件的元素对象 \n - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :return: ChromeElement对象 - """ - return self._ele(loc_or_ele, timeout=timeout) - - def eles(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement], - timeout: float = None) -> List[ChromiumElement]: - """获取所有符合条件的元素对象 \n - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :return: ChromeElement对象组成的列表 - """ - return self._ele(loc_or_ele, timeout=timeout, single=False) - - def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \ - -> Union[SessionElement, str, None]: - """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本 - """ - if isinstance(loc_or_ele, ChromiumElement): - return make_session_ele(loc_or_ele) - else: - return make_session_ele(self, loc_or_ele) - - def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: - """查找所有符合条件的元素以SessionElement列表形式返回 \n - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象组成的列表 - """ - return make_session_ele(self, loc_or_str, single=False) - - def _ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement], - timeout: float = None, - single: bool = True) -> Union[ChromiumElement, None, List[ChromiumElement]]: - """执行元素查找 - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :param single: 是否只返回第一个 - :return: ChromeElement对象或元素对象组成的列表 - """ - if isinstance(loc_or_ele, (str, tuple)): - loc = get_loc(loc_or_ele)[1] - elif isinstance(loc_or_ele, ChromiumElement): - return loc_or_ele - else: - raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。') - - timeout = timeout if timeout is not None else self.timeout - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) - count = search_result['resultCount'] - - end_time = perf_counter() + timeout - while count == 0 and perf_counter() < end_time: - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) - count = search_result['resultCount'] - - if count == 0: - return None - - else: - count = 1 if single else count - nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, - toIndex=count) - if count == 1: - return ChromiumElement(self, node_id=nodeIds['nodeIds'][0]) - else: - return [ChromiumElement(self, node_id=i) for i in nodeIds['nodeIds']] - - def wait_ele(self, - loc_or_ele: Union[str, tuple, ChromiumElement], - timeout: float = None) -> ChromeElementWaiter: - """返回用于等待元素到达某个状态的等待器对象 \n - :param loc_or_ele: 可以是元素、查询字符串、loc元组 - :param timeout: 等待超时时间 - :return: 用于等待的ElementWaiter对象 - """ - return ChromeElementWaiter(self, loc_or_ele, timeout) + return ChromiumTab(self, tab_id) def get_screenshot(self, path: [str, Path] = None, as_bytes: [bool, str] = None, @@ -508,92 +190,6 @@ class ChromiumPage(BasePage): f.write(png) return str(path.absolute()) - def scroll_to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: - """滚动页面直到元素可见 \n - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) - :return: None - """ - node_id = self.ele(loc_or_ele).node_id - try: - self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) - except Exception: - self.ele(loc_or_ele).run_script("this.scrollIntoView();") - - def refresh(self, ignore_cache: bool = False) -> None: - """刷新当前页面 \n - :param ignore_cache: 是否忽略缓存 - :return: None - """ - self._driver.Page.reload(ignoreCache=ignore_cache) - - def forward(self, steps: int = 1) -> None: - """在浏览历史中前进若干步 \n - :param steps: 前进步数 - :return: None - """ - self.run_script(f'window.history.go({steps});', as_expr=True) - - def back(self, steps: int = 1) -> None: - """在浏览历史中后退若干步 \n - :param steps: 后退步数 - :return: None - """ - self.run_script(f'window.history.go({-steps});', as_expr=True) - - def stop_loading(self) -> None: - """页面停止加载""" - self._driver.Page.stopLoading() - self._get_document() - - def run_cdp(self, cmd: str, **cmd_args) -> dict: - """执行Chrome DevTools Protocol语句 \n - :param cmd: 协议项目 - :param cmd_args: 参数 - :return: 执行的结果 - """ - return self._driver.call_method(cmd, **cmd_args) - - def set_user_agent(self, ua: str) -> None: - """为当前tab设置user agent,只在当前tab有效 \n - :param ua: user agent字符串 - :return: None - """ - self._wait_driver.Network.setUserAgentOverride(userAgent=ua) - - def get_session_storage(self, item: str = None) -> Union[str, dict, None]: - """获取sessionStorage信息,不设置item则获取全部 \n - :param item: 要获取的项,不设置则返回全部 - :return: sessionStorage一个或所有项内容 - """ - js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' - return self.run_script(js, as_expr=True) - - def get_local_storage(self, item: str = None) -> Union[str, dict, None]: - """获取localStorage信息,不设置item则获取全部 \n - :param item: 要获取的项目,不设置则返回全部 - :return: localStorage一个或所有项内容 - """ - js = f'localStorage.getItem("{item}");' if item else 'localStorage;' - return self.run_script(js, as_expr=True) - - def set_session_storage(self, item: str, value: Union[str, bool]) -> None: - """设置或删除某项sessionStorage信息 \n - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' - return self.run_script(js, as_expr=True) - - def set_local_storage(self, item: str, value: Union[str, bool]) -> None: - """设置或删除某项localStorage信息 \n - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' - return self.run_script(js, as_expr=True) - def to_front(self) -> None: """激活当前标签页使其处于最前面""" self._control_session.get(f'http://{self.address}/json/activate/{self.tab_id}') @@ -673,27 +269,6 @@ class ChromiumPage(BasePage): """ self.close_tabs(tab_ids, True) - def clear_cache(self, - session_storage: bool = True, - local_storage: bool = True, - cache: bool = True, - cookies: bool = True) -> None: - """清除缓存,可选要清除的项 \n - :param session_storage: 是否清除sessionStorage - :param local_storage: 是否清除localStorage - :param cache: 是否清除cache - :param cookies: 是否清除cookies - :return: None - """ - if session_storage: - self.run_script('sessionStorage.clear();', as_expr=True) - if local_storage: - self.run_script('localStorage.clear();', as_expr=True) - if cache: - self._wait_driver.Network.clearBrowserCache() - if cookies: - self._wait_driver.Network.clearBrowserCookies() - def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: """处理提示框,可以自动等待提示框出现 \n :param accept: True表示确认,False表示取消,其它值不会按按钮但依然返回文本值 @@ -728,45 +303,6 @@ class ChromiumPage(BasePage): self._driver.Browser.close() self._driver.stop() - def _d_connect(self, - to_url: str, - times: int = 0, - interval: float = 1, - show_errmsg: bool = False, - timeout: float = None) -> Union[bool, None]: - """尝试连接,重试若干次 \n - :param to_url: 要访问的url - :param times: 重试次数 - :param interval: 重试间隔(秒) - :param show_errmsg: 是否抛出异常 - :param timeout: 连接超时时间 - :return: 是否成功,返回None表示不确定 - """ - err = None - timeout = timeout if timeout is not None else self.timeouts.page_load - - for _ in range(times + 1): - result = self._driver.Page.navigate(url=to_url) - is_timeout = not self._wait_loading(timeout) - - if is_timeout: - err = TimeoutError('页面连接超时。') - if 'errorText' in result: - err = ConnectionError(result['errorText']) - - if not err: - break - - if _ < times: - sleep(interval) - if show_errmsg: - print(f'重试 {to_url}') - - if err and show_errmsg: - raise err if err is not None else ConnectionError('连接异常。') - - return False if err else True - def _on_alert_close(self, **kwargs): """alert关闭时触发的方法""" self._alert.activated = False @@ -798,19 +334,6 @@ class Alert(object): self.response_text = None -class Timeout(object): - """用于保存d模式timeout信息的类""" - - def __init__(self, page: ChromiumPage): - self.page = page - self.page_load = 30 - self.script = 30 - - @property - def implicit(self): - return self.page.timeout - - class WindowSizeSetter(object): """用于设置窗口大小的类""" diff --git a/DrissionPage/chromium_tab.py b/DrissionPage/chromium_tab.py index 2899bd9..5f03dd0 100644 --- a/DrissionPage/chromium_tab.py +++ b/DrissionPage/chromium_tab.py @@ -1,586 +1,9 @@ # -*- coding:utf-8 -*- -from pathlib import Path -from time import perf_counter, sleep -from typing import Union, Tuple, List, Any - -from pychrome import Tab -from requests import Session -from json import loads - -from requests.cookies import RequestsCookieJar - -from .session_element import SessionElement, make_session_ele -from .config import DriverOptions, _cookies_to_tuple -from .base import BasePage -from .common import get_loc -from .chromium_element import ChromiumElement, ChromeScroll, _run_script, ChromeElementWaiter - - -class ChromiumBase(BasePage): - """用于管理浏览器的类""" - - def __init__(self, address: str, - tab_id: str = None, - timeout: float = None): - """初始化 \n - :param address: 浏览器地址:端口、Tab对象或DriverOptions对象 - :param tab_id: 要控制的标签页id,不指定默认为激活的 - :param timeout: 超时时间 - """ - super().__init__(timeout) - self._is_loading = None - self._root_id = None - self._connect_browser(address, tab_id) - - def _connect_browser(self, addr_tab_opts: Union[str, Tab, DriverOptions] = None, - tab_id: str = None) -> None: - """连接浏览器,在第一次时运行 \n - :param addr_tab_opts: 浏览器地址、Tab对象或DriverOptions对象 - :param tab_id: 要控制的标签页id,不指定默认为激活的 - :return: None - """ - self._is_loading = False - self._root_id = None - self.timeouts = Timeout(self) - self._control_session = Session() - self._control_session.keep_alive = False - self._first_run = True - - self.address = addr_tab_opts - if not tab_id: - json = loads(self._control_session.get(f'http://{self.address}/json').text) - tab_id = [i['id'] for i in json if i['type'] == 'page'][0] - self._set_options() - self._init_page(tab_id) - self._get_document() - self._first_run = False - - def _init_page(self, tab_id: str = None) -> None: - """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 - :param tab_id: 要跳转到的标签页id - :return: None - """ - self._is_loading = True - if tab_id: - self._driver = Tab(id=tab_id, type='page', - webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') - - self._driver.start() - self._driver.DOM.enable() - self._driver.Page.enable() - - self._driver.Page.frameNavigated = self._onFrameNavigated - self._driver.Page.loadEventFired = self._onLoadEventFired - self._driver.DOM.documentUpdated = self._onDocumentUpdated - - def _get_document(self) -> None: - """刷新cdp使用的document数据""" - # print('get doc') - self._wait_loading() - root_id = self._driver.DOM.getDocument()['root']['nodeId'] - self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId'] - self._is_loading = False - - def _wait_loading(self, timeout: float = None) -> bool: - """等待页面加载完成 - :param timeout: 超时时间 - :return: 是否成功,超时返回False - """ - timeout = timeout if timeout is not None else self.timeouts.page_load - - end_time = perf_counter() + timeout - while perf_counter() < end_time: - state = self.ready_state - if state == 'complete': - return True - elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'): - self.stop_loading() - return True - elif self.page_load_strategy == 'none': - self.stop_loading() - return True - - return False - - def _onLoadEventFired(self, **kwargs): - """在页面刷新、变化后重新读取页面内容""" - print('load complete') - if self._first_run is False and self._is_loading: - self._get_document() - - def _onFrameNavigated(self, **kwargs): - """页面跳转时触发""" - # todo: 考虑frame的情况,修改别的判断方式 - if not kwargs['frame'].get('parentId', None): - print('nav') - self._is_loading = True - - def _onDocumentUpdated(self, **kwargs): - print('doc') - pass - - def _set_options(self) -> None: - pass - - def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'], - timeout: float = None) -> Union['ChromiumElement', None]: - """在内部查找元素 \n - 例:ele = page('@id=ele_id') \n - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 超时时间 - :return: ChromeElement对象 - """ - return self.ele(loc_or_str, timeout) - - @property - def driver(self) -> Tab: - """返回用于控制浏览器的Tab对象""" - return self._driver - - @property - def _wait_driver(self) -> Tab: - """返回用于控制浏览器的Tab对象,会先等待页面加载完毕""" - while self._is_loading: - print('loading') - sleep(.1) - return self._driver - - @property - def url(self) -> str: - """返回当前页面url""" - tab_id = self._wait_driver.id # 用于WebPage时激活浏览器 - json = loads(self._control_session.get(f'http://{self.address}/json').text) - return [i['url'] for i in json if i['id'] == tab_id][0] - - @property - def html(self) -> str: - """返回当前页面html文本""" - node_id = self._wait_driver.DOM.getDocument()['root']['nodeId'] - return self._wait_driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML'] - - @property - def json(self) -> dict: - """当返回内容是json格式时,返回对应的字典""" - return loads(self('t:pre').text) - - @property - def tab_id(self) -> str: - """返回当前标签页id""" - return self._wait_driver.id - - @property - def ready_state(self) -> str: - """返回当前页面加载状态,'loading' 'interactive' 'complete'""" - return self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] - - @property - def size(self) -> dict: - """返回页面总长宽,{'height': int, 'width': int}""" - w = self.run_script('document.body.scrollWidth;', as_expr=True) - h = self.run_script('document.body.scrollHeight;', as_expr=True) - return {'height': h, 'width': w} - - @property - def active_ele(self) -> ChromiumElement: - """返回当前焦点所在元素""" - return self.run_script('return document.activeElement;') - - @property - def page_load_strategy(self) -> str: - """返回页面加载策略""" - return self._page_load_strategy - - @property - def scroll(self) -> ChromeScroll: - """返回用于滚动滚动条的对象""" - if not hasattr(self, '_scroll'): - self._scroll = ChromeScroll(self) - return self._scroll - - def set_page_load_strategy(self, value: str) -> None: - """设置页面加载策略 \n - :param value: 可选'normal', 'eager', 'none' - :return: None - """ - if value not in ('normal', 'eager', 'none'): - raise ValueError("只能选择'normal', 'eager', 'none'。") - self._page_load_strategy = value - - def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: - """设置超时时间,单位为秒 \n - :param implicit: 查找元素超时时间 - :param page_load: 页面加载超时时间 - :param script: 脚本运行超时时间 - :return: None - """ - if implicit is not None: - self.timeout = implicit - - if page_load is not None: - self.timeouts.page_load = page_load - - if script is not None: - self.timeouts.script = script - - def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any: - """运行javascript代码 \n - :param script: js文本 - :param as_expr: 是否作为表达式运行,为True时args无效 - :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: 运行的结果 - """ - return _run_script(self, script, as_expr, self.timeouts.script, args) - - def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: - """以异步方式执行js代码 \n - :param script: js文本 - :param as_expr: 是否作为表达式运行,为True时args无效 - :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: None - """ - from threading import Thread - Thread(target=_run_script, args=(self, script, as_expr, self.timeouts.script, args)).start() - - def get(self, - url: str, - show_errmsg: bool = False, - retry: int = None, - interval: float = None, - timeout: float = None) -> Union[None, bool]: - """访问url \n - :param url: 目标url - :param show_errmsg: 是否显示和抛出异常 - :param retry: 重试次数 - :param interval: 重试间隔(秒) - :param timeout: 连接超时时间 - :return: 目标url是否可用,返回None表示不确定 - """ - retry, interval = self._before_connect(url, retry, interval) - self._url_available = self._d_connect(self._url, - times=retry, - interval=interval, - show_errmsg=show_errmsg, - timeout=timeout) - return self._url_available - - def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: - """获取cookies信息 \n - :param as_dict: 为True时返回由{name: value}键值对组成的dict - :return: cookies信息 - """ - cookies = self._wait_driver.Network.getCookies()['cookies'] - if as_dict: - return {cookie['name']: cookie['value'] for cookie in cookies} - else: - return cookies - - def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: - """设置cookies值 \n - :param cookies: cookies信息 - :return: None - """ - cookies = _cookies_to_tuple(cookies) - result_cookies = [] - for cookie in cookies: - if not cookie.get('domain', None): - continue - c = {'value': '' if cookie['value'] is None else cookie['value'], - 'name': cookie['name'], - 'domain': cookie['domain']} - result_cookies.append(c) - self._wait_driver.Network.setCookies(cookies=result_cookies) - - def ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement], - timeout: float = None) -> Union[ChromiumElement, None]: - """获取第一个符合条件的元素对象 \n - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :return: ChromeElement对象 - """ - return self._ele(loc_or_ele, timeout=timeout) - - def eles(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement], - timeout: float = None) -> List[ChromiumElement]: - """获取所有符合条件的元素对象 \n - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :return: ChromeElement对象组成的列表 - """ - return self._ele(loc_or_ele, timeout=timeout, single=False) - - def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \ - -> Union[SessionElement, str, None]: - """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本 - """ - if isinstance(loc_or_ele, ChromiumElement): - return make_session_ele(loc_or_ele) - else: - return make_session_ele(self, loc_or_ele) - - def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: - """查找所有符合条件的元素以SessionElement列表形式返回 \n - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象组成的列表 - """ - return make_session_ele(self, loc_or_str, single=False) - - def _ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement], - timeout: float = None, - single: bool = True) -> Union[ChromiumElement, None, List[ChromiumElement]]: - """执行元素查找 - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :param single: 是否只返回第一个 - :return: ChromeElement对象或元素对象组成的列表 - """ - if isinstance(loc_or_ele, (str, tuple)): - loc = get_loc(loc_or_ele)[1] - elif isinstance(loc_or_ele, ChromiumElement): - return loc_or_ele - else: - raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。') - - timeout = timeout if timeout is not None else self.timeout - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) - count = search_result['resultCount'] - - end_time = perf_counter() + timeout - while count == 0 and perf_counter() < end_time: - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) - count = search_result['resultCount'] - - if count == 0: - return None - - else: - count = 1 if single else count - nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, - toIndex=count) - if count == 1: - return ChromiumElement(self, node_id=nodeIds['nodeIds'][0]) - else: - return [ChromiumElement(self, node_id=i) for i in nodeIds['nodeIds']] - - def wait_ele(self, - loc_or_ele: Union[str, tuple, ChromiumElement], - timeout: float = None) -> ChromeElementWaiter: - """返回用于等待元素到达某个状态的等待器对象 \n - :param loc_or_ele: 可以是元素、查询字符串、loc元组 - :param timeout: 等待超时时间 - :return: 用于等待的ElementWaiter对象 - """ - return ChromeElementWaiter(self, loc_or_ele, timeout) - - def get_screenshot(self, path: [str, Path] = None, - as_bytes: [bool, str] = None, - full_page: bool = False, - left_top: Tuple[int, int] = None, - right_bottom: Tuple[int, int] = None) -> Union[str, bytes]: - """对页面进行截图,可对整个网页、可见网页、指定范围截图。对可视范围外截图需要90以上版本浏览器支持 \n - :param path: 完整路径,后缀可选'jpg','jpeg','png','webp' - :param as_bytes: 是否已字节形式返回图片,可选'jpg','jpeg','png','webp',生效时path参数无效 - :param full_page: 是否整页截图,为True截取整个网页,为False截取可视窗口 - :param left_top: 截取范围左上角坐标 - :param right_bottom: 截取范围右下角角坐标 - :return: 图片完整路径或字节文本 - """ - if as_bytes: - if as_bytes is True: - pic_type = 'png' - else: - if as_bytes not in ('jpg', 'jpeg', 'png', 'webp'): - raise ValueError("只能接收'jpg', 'jpeg', 'png', 'webp'四种格式。") - pic_type = 'jpeg' if as_bytes == 'jpg' else as_bytes - - else: - if not path: - raise ValueError('保存为文件时必须传入路径。') - path = Path(path) - pic_type = path.suffix.lower() - if pic_type not in ('.jpg', '.jpeg', '.png', '.webp'): - raise TypeError(f'不支持的文件格式:{pic_type}。') - pic_type = 'jpeg' if pic_type == '.jpg' else pic_type[1:] - - hw = self.size - if full_page: - vp = {'x': 0, 'y': 0, 'width': hw['width'], 'height': hw['height'], 'scale': 1} - png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data'] - else: - if left_top and right_bottom: - x, y = left_top - w = right_bottom[0] - x - h = right_bottom[1] - y - vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1} - png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)[ - 'data'] - else: - png = self._wait_driver.Page.captureScreenshot(format=pic_type)['data'] - - from base64 import b64decode - png = b64decode(png) - - if as_bytes: - return png - - path.parent.mkdir(parents=True, exist_ok=True) - with open(path, 'wb') as f: - f.write(png) - return str(path.absolute()) - - def scroll_to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: - """滚动页面直到元素可见 \n - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) - :return: None - """ - node_id = self.ele(loc_or_ele).node_id - try: - self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) - except Exception: - self.ele(loc_or_ele).run_script("this.scrollIntoView();") - - def refresh(self, ignore_cache: bool = False) -> None: - """刷新当前页面 \n - :param ignore_cache: 是否忽略缓存 - :return: None - """ - self._driver.Page.reload(ignoreCache=ignore_cache) - - def forward(self, steps: int = 1) -> None: - """在浏览历史中前进若干步 \n - :param steps: 前进步数 - :return: None - """ - self.run_script(f'window.history.go({steps});', as_expr=True) - - def back(self, steps: int = 1) -> None: - """在浏览历史中后退若干步 \n - :param steps: 后退步数 - :return: None - """ - self.run_script(f'window.history.go({-steps});', as_expr=True) - - def stop_loading(self) -> None: - """页面停止加载""" - self._driver.Page.stopLoading() - self._get_document() - - def run_cdp(self, cmd: str, **cmd_args) -> dict: - """执行Chrome DevTools Protocol语句 \n - :param cmd: 协议项目 - :param cmd_args: 参数 - :return: 执行的结果 - """ - return self._driver.call_method(cmd, **cmd_args) - - def set_user_agent(self, ua: str) -> None: - """为当前tab设置user agent,只在当前tab有效 \n - :param ua: user agent字符串 - :return: None - """ - self._wait_driver.Network.setUserAgentOverride(userAgent=ua) - - def get_session_storage(self, item: str = None) -> Union[str, dict, None]: - """获取sessionStorage信息,不设置item则获取全部 \n - :param item: 要获取的项,不设置则返回全部 - :return: sessionStorage一个或所有项内容 - """ - js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' - return self.run_script(js, as_expr=True) - - def get_local_storage(self, item: str = None) -> Union[str, dict, None]: - """获取localStorage信息,不设置item则获取全部 \n - :param item: 要获取的项目,不设置则返回全部 - :return: localStorage一个或所有项内容 - """ - js = f'localStorage.getItem("{item}");' if item else 'localStorage;' - return self.run_script(js, as_expr=True) - - def set_session_storage(self, item: str, value: Union[str, bool]) -> None: - """设置或删除某项sessionStorage信息 \n - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' - return self.run_script(js, as_expr=True) - - def set_local_storage(self, item: str, value: Union[str, bool]) -> None: - """设置或删除某项localStorage信息 \n - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' - return self.run_script(js, as_expr=True) - - def clear_cache(self, - session_storage: bool = True, - local_storage: bool = True, - cache: bool = True, - cookies: bool = True) -> None: - """清除缓存,可选要清除的项 \n - :param session_storage: 是否清除sessionStorage - :param local_storage: 是否清除localStorage - :param cache: 是否清除cache - :param cookies: 是否清除cookies - :return: None - """ - if session_storage: - self.run_script('sessionStorage.clear();', as_expr=True) - if local_storage: - self.run_script('localStorage.clear();', as_expr=True) - if cache: - self._wait_driver.Network.clearBrowserCache() - if cookies: - self._wait_driver.Network.clearBrowserCookies() - - def _d_connect(self, - to_url: str, - times: int = 0, - interval: float = 1, - show_errmsg: bool = False, - timeout: float = None) -> Union[bool, None]: - """尝试连接,重试若干次 \n - :param to_url: 要访问的url - :param times: 重试次数 - :param interval: 重试间隔(秒) - :param show_errmsg: 是否抛出异常 - :param timeout: 连接超时时间 - :return: 是否成功,返回None表示不确定 - """ - err = None - timeout = timeout if timeout is not None else self.timeouts.page_load - - for _ in range(times + 1): - result = self._driver.Page.navigate(url=to_url) - is_timeout = not self._wait_loading(timeout) - - if is_timeout: - err = TimeoutError('页面连接超时。') - if 'errorText' in result: - err = ConnectionError(result['errorText']) - - if not err: - break - - if _ < times: - sleep(interval) - if show_errmsg: - print(f'重试 {to_url}') - - if err and show_errmsg: - raise err if err is not None else ConnectionError('连接异常。') - - return False if err else True +from .chromium_element import ChromiumBase class ChromiumTab(ChromiumBase): - """用于管理浏览器的类""" + """实现浏览器标签页的类""" def __init__(self, page, tab_id: str = None): @@ -592,9 +15,9 @@ class ChromiumTab(ChromiumBase): super().__init__(page.address, tab_id, page.timeout) def _set_options(self) -> None: - self.set_timeouts(page_load=self.page.timeouts['pageLoad'], - script=self.page.timeouts['script'] / 1000, - implicit=self.page.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout) + self.set_timeouts(page_load=self.page.timeouts.page_load, + script=self.page.timeouts.script, + implicit=self.page.timeouts.implicit if self.timeout is None else self.timeout) self._page_load_strategy = self.page.page_load_strategy diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 3ce5585..03db233 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -6,7 +6,7 @@ from pychrome import Tab from requests import Session, Response from tldextract import extract -from .chromium_element import ChromiumElement +from .chromium_element import ChromiumElement, ChromiumFrame, ChromiumBase from .session_element import SessionElement from .base import BasePage from .config import DriverOptions, SessionOptions, _cookies_to_tuple @@ -33,7 +33,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): if self._mode not in ('s', 'd'): raise ValueError('mode参数只能是s或d。') - super(ChromiumPage, self).__init__(timeout) # 调用Base的__init__() + super(ChromiumBase, self).__init__(timeout) # 调用Base的__init__() self._session = None self._tab_obj = None self._is_loading = False @@ -48,7 +48,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement], - timeout: float = None) -> Union[ChromiumElement, SessionElement, None]: + timeout: float = None) -> Union[ChromiumElement, SessionElement, ChromiumFrame, None]: """在内部查找元素 \n 例:ele = page('@id=ele_id') \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 @@ -154,7 +154,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement], - timeout: float = None) -> Union[ChromiumElement, SessionElement, str, None]: + timeout: float = None) -> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None]: """返回第一个符合条件的元素、属性或节点文本 \n :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与页面等待时间一致 @@ -167,7 +167,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def eles(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> List[Union[ChromiumElement, SessionElement, str]]: + timeout: float = None) -> List[Union[ChromiumElement, SessionElement, ChromiumFrame, str]]: """返回页面中所有符合条件的元素、属性或节点文本 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与页面等待时间一致 @@ -359,9 +359,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def _ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement], - timeout: float = None, single: bool = True) \ - -> Union[ChromiumElement, SessionElement, str, None, List[Union[SessionElement, str]], List[ - Union[ChromiumElement, str]]]: + timeout: float = None, single: bool = True, relative:bool=False) \ + -> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None, List[Union[SessionElement, str]], List[ + Union[ChromiumElement, str, ChromiumFrame]]]: """返回页面中符合条件的元素、属性或节点文本,默认返回第一个 \n :param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串 :param timeout: 查找元素超时时间,d模式专用 @@ -371,7 +371,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): if self._mode == 's': return super()._ele(loc_or_ele, single=single) elif self._mode == 'd': - return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single) + return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single, relative=relative) def _set_driver_options(self, Tab_or_Options): """处理driver设置""" diff --git a/README.md b/README.md index c300bb0..3d37b76 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ DrissionPage,即 driver 和 session 组合而成的 page。 除了合并两者,本库还以网页为单位封装了常用功能,提供非常简便的操作和语句,在用于网页自动化操作时,减少考虑细节,专注功能实现,使用更方便。 一切从简,尽量提供简单直接的使用方法,对新手更友好。 -# 🔆 3.0 版即将推出 +# 🔆 3.0 版隆重推出(测试版) 以前的版本是对 selenium 进行重新封装实现的。从 3.0 开始,作者另起炉灶,对底层进行了重新开发,摆脱对 selenium 的依赖,增强了功能,提升了运行效率。 3.0 全新开发的页面对象是`WebPage`,支持 chromium 内核的浏览器(如 chrome 和 edge)。除了保持之前的功能,比依赖 selenium 的`MixPage`有以下优点: diff --git a/docs/版本历史.md b/docs/版本历史.md index 25dc62e..1261106 100644 --- a/docs/版本历史.md +++ b/docs/版本历史.md @@ -1,4 +1,4 @@ -# v3.0.0 +# v3.0.2 重大更新。推出`WebPage`,重新开发底层逻辑,摆脱对 selenium 的依赖,增强了功能,提升了运行效率。支持 chromium 内核的浏览器(如 chrome 和 edge)。比`MixPage`有以下优点: @@ -20,6 +20,8 @@ 其它更新: +- 增加`ChromiumTab`和`ChromiumFrame`类用于处理 tab 和 frame 元素 + - 新增与`WebPage`配合的动作链接`ActionChains` - ini 文件和`DriverOption`删除`set_window_rect`属性