From 569ab2e542045afcc43356874aa10fe6546ea5e6 Mon Sep 17 00:00:00 2001 From: g1879 Date: Wed, 9 Nov 2022 17:36:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/chrome_ShadowRoot_element.py | 228 ++++++++++++++++++ DrissionPage/chrome_element.py | 174 ++++++++------ DrissionPage/chrome_page.py | 274 ++++++++++++++-------- DrissionPage/config.py | 2 + DrissionPage/driver_page.py | 8 +- DrissionPage/keys.py | 39 ++- DrissionPage/web_page.py | 23 +- 7 files changed, 541 insertions(+), 207 deletions(-) create mode 100644 DrissionPage/chrome_ShadowRoot_element.py diff --git a/DrissionPage/chrome_ShadowRoot_element.py b/DrissionPage/chrome_ShadowRoot_element.py new file mode 100644 index 0000000..97bf50d --- /dev/null +++ b/DrissionPage/chrome_ShadowRoot_element.py @@ -0,0 +1,228 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +@File : shadow_root_element.py +""" +from time import perf_counter +from typing import Union, Any, Tuple, List + +from selenium.webdriver.remote.webelement import WebElement + +from .base import BaseElement +from .common import get_loc +from .driver_element import make_driver_ele, DriverElement +from .session_element import make_session_ele, SessionElement + + +class ChromeShadowRootElement(BaseElement): + """ChromeShadowRootElement是用于处理ShadowRoot的类,使用方法和ChromeElement基本一致""" + + def __init__(self, parent_ele: DriverElement): + super().__init__(parent_ele.page) + self.parent_ele = parent_ele + + def __repr__(self) -> str: + return f'' + + def __call__(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> Union[DriverElement, str, None]: + """在内部查找元素 \n + 例:ele2 = ele1('@id=ele_id') \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 超时时间 + :return: DriverElement对象或属性、文本 + """ + return self.ele(loc_or_str, timeout) + + @property + def tag(self) -> str: + """元素标签名""" + return 'shadow-root' + + # @property + # def html(self) -> str: + # return f'{self.inner_html}' + # + # @property + # def inner_html(self) -> str: + # """返回内部的html文本""" + # shadow_root = WebElement(self.page.driver, self.inner_ele._id) + # return shadow_root.get_attribute('innerHTML') + # + # def parent(self, level_or_loc: Union[str, int] = 1) -> DriverElement: + # """返回上面某一级父元素,可指定层数或用查询语法定位 \n + # :param level_or_loc: 第几级父元素,或定位符 + # :return: DriverElement对象 + # """ + # if isinstance(level_or_loc, int): + # loc = f'xpath:./ancestor-or-self::*[{level_or_loc}]' + # + # elif isinstance(level_or_loc, (tuple, str)): + # loc = get_loc(level_or_loc, True) + # + # if loc[0] == 'css selector': + # raise ValueError('此css selector语法不受支持,请换成xpath。') + # + # loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}' + # + # else: + # raise TypeError('level_or_loc参数只能是tuple、int或str。') + # + # return self.parent_ele.ele(loc, timeout=0) + # + # def next(self, + # index: int = 1, + # filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]: + # """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + # :param index: 第几个查询结果元素 + # :param filter_loc: 用于筛选元素的查询语法 + # :return: DriverElement对象 + # """ + # nodes = self.nexts(filter_loc=filter_loc) + # return nodes[index - 1] if nodes else None + # + # def before(self, + # index: int = 1, + # filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]: + # """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + # :param index: 前面第几个查询结果元素 + # :param filter_loc: 用于筛选元素的查询语法 + # :return: 本元素前面的某个元素或节点 + # """ + # nodes = self.befores(filter_loc=filter_loc) + # return nodes[index - 1] if nodes else None + # + # def after(self, index: int = 1, + # filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]: + # """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + # :param index: 后面第几个查询结果元素 + # :param filter_loc: 用于筛选元素的查询语法 + # :return: 本元素后面的某个元素或节点 + # """ + # nodes = self.afters(filter_loc=filter_loc) + # return nodes[index - 1] if nodes else None + # + # def nexts(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]: + # """返回后面所有兄弟元素或节点组成的列表 \n + # :param filter_loc: 用于筛选元素的查询语法 + # :return: DriverElement对象组成的列表 + # """ + # loc = get_loc(filter_loc, True) + # if loc[0] == 'css selector': + # raise ValueError('此css selector语法不受支持,请换成xpath。') + # + # loc = loc[1].lstrip('./') + # xpath = f'xpath:./{loc}' + # return self.parent_ele.eles(xpath, timeout=0.1) + # + # def befores(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]: + # """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n + # :param filter_loc: 用于筛选元素的查询语法 + # :return: 本元素前面的元素或节点组成的列表 + # """ + # loc = get_loc(filter_loc, True) + # if loc[0] == 'css selector': + # raise ValueError('此css selector语法不受支持,请换成xpath。') + # + # loc = loc[1].lstrip('./') + # xpath = f'xpath:./preceding::{loc}' + # return self.parent_ele.eles(xpath, timeout=0.1) + # + # def afters(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]: + # """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n + # :param filter_loc: 用于筛选元素的查询语法 + # :return: 本元素后面的元素或节点组成的列表 + # """ + # eles1 = self.nexts(filter_loc) + # loc = get_loc(filter_loc, True)[1].lstrip('./') + # xpath = f'xpath:./following::{loc}' + # return eles1 + self.parent_ele.eles(xpath, timeout=0.1) + # + # def ele(self, + # loc_or_str: Union[Tuple[str, str], str], + # timeout: float = None) -> Union[DriverElement, str, None]: + # """返回当前元素下级符合条件的第一个元素,默认返回 \n + # :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + # :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 + # :return: DriverElement对象或属性、文本 + # """ + # return self._ele(loc_or_str, timeout) + # + # def eles(self, + # loc_or_str: Union[Tuple[str, str], str], + # timeout: float = None) -> List[Union[DriverElement, str]]: + # """返回当前元素下级所有符合条件的子元素 \n + # :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + # :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 + # :return: DriverElement对象或属性、文本组成的列表 + # """ + # return self._ele(loc_or_str, timeout=timeout, single=False) + # + # def s_ele(self, loc_or_ele=None) -> Union[SessionElement, str, None]: + # """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n + # :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + # :return: SessionElement对象或属性、文本 + # """ + # return make_session_ele(self, loc_or_ele) + # + # def s_eles(self, loc_or_ele) -> List[Union[SessionElement, str]]: + # """查找所有符合条件的元素以SessionElement列表形式返回,处理复杂页面时效率很高 \n + # :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + # :return: SessionElement对象或属性、文本 + # """ + # return make_session_ele(self, loc_or_ele, single=False) + # + # def _ele(self, + # loc_or_str: Union[Tuple[str, str], str], + # timeout: float = None, + # single: bool = True) -> Union[DriverElement, str, None, List[Union[DriverElement, str]]]: + # """返回当前元素下级符合条件的子元素,默认返回第一个 \n + # :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + # :param timeout: 查找元素超时时间 + # :param single: True则返回第一个,False则返回全部 + # :return: DriverElement对象 + # """ + # # 先转换为sessionElement,再获取所有元素,获取它们的css selector路径,再用路径在页面上执行查找 + # loc = get_loc(loc_or_str) + # if loc[0] == 'css selector' and str(loc[1]).startswith(':root'): + # loc = loc[0], loc[1][5:] + # + # timeout = timeout if timeout is not None else self.page.timeout + # t1 = perf_counter() + # eles = make_session_ele(self.html).eles(loc) + # while not eles and perf_counter() - t1 <= timeout: + # eles = make_session_ele(self.html).eles(loc) + # + # if not eles: + # return None if single else eles + # + # css_paths = [i.css_path[47:] for i in eles] + # + # if single: + # return make_driver_ele(self, f'css:{css_paths[0]}', single, timeout) + # else: + # return [make_driver_ele(self, f'css:{css}', True, timeout) for css in css_paths] + # + # def run_script(self, script: str, *args) -> Any: + # """执行js代码,传入自己为第一个参数 \n + # :param script: js文本 + # :param args: 传入的参数 + # :return: js执行结果 + # """ + # shadow_root = WebElement(self.page.driver, self.inner_ele._id) + # return shadow_root.parent.execute_script(script, shadow_root, *args) + # + # def is_enabled(self) -> bool: + # """是否可用""" + # return self.inner_ele.is_enabled() + # + # def is_valid(self) -> bool: + # """用于判断元素是否还能用,应对页面跳转元素不能用的情况""" + # try: + # self.is_enabled() + # return True + # + # except Exception: + # return False diff --git a/DrissionPage/chrome_element.py b/DrissionPage/chrome_element.py index fbbc5a8..83c41a1 100644 --- a/DrissionPage/chrome_element.py +++ b/DrissionPage/chrome_element.py @@ -20,6 +20,7 @@ class ChromeElement(DrissionElement): super().__init__(page) self._select = None self._scroll = None + self._tag = None if not node_id and not obj_id: raise TypeError('node_id或obj_id必须传入一个。') @@ -31,8 +32,9 @@ class ChromeElement(DrissionElement): self._obj_id = obj_id def __repr__(self) -> str: - attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] - return f'' + # attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + # return f'' + return f'' def __call__(self, loc_or_str: Union[Tuple[str, str], str], @@ -58,14 +60,18 @@ class ChromeElement(DrissionElement): @property def tag(self) -> str: - return self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName'] + """返回元素tag""" + # print(self.page.driver.DOM.describeNode(nodeId=self._node_id)) + if self._tag is None: + self._tag = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName'].lower() + return self._tag @property def inner_html(self) -> str: """返回元素innerHTML文本""" if self.tag in ('iframe', 'frame'): return self.run_script('return this.contentDocument.documentElement;').html - # return _run_script(self, 'this.contentDocument.body;').html + # return run_script(self, 'this.contentDocument.body;').html return self.run_script('return this.innerHTML;') @property @@ -111,38 +117,40 @@ class ChromeElement(DrissionElement): def location(self) -> dict: """返回元素左上角坐标""" js = '''function(){ -function getElementPagePosition(element){ - var actualLeft = element.offsetLeft; - var current = element.offsetParent; - while (current !== null){ - actualLeft += current.offsetLeft; - current = current.offsetParent; - } - var actualTop = element.offsetTop; - var current = element.offsetParent; - while (current !== null){ - actualTop += (current.offsetTop+current.clientTop); - current = current.offsetParent; - } - return actualLeft.toString() +' '+actualTop.toString(); -} - return getElementPagePosition(this);}''' + function getElementPagePosition(element){ + var actualLeft = element.offsetLeft; + var current = element.offsetParent; + while (current !== null){ + actualLeft += current.offsetLeft; + current = current.offsetParent; + } + var actualTop = element.offsetTop; + var current = element.offsetParent; + while (current !== null){ + actualTop += (current.offsetTop+current.clientTop); + current = current.offsetParent; + } + return actualLeft.toString() +' '+actualTop.toString(); + } + return getElementPagePosition(this);}''' xy = self.run_script(js) x, y = xy.split(' ') return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])} - # @property - # def shadow_root(self): - # """返回当前元素的shadow_root元素对象""" - # shadow = self.run_script('return arguments[0].shadowRoot') - # if shadow: - # from .shadow_root_element import ShadowRootElement - # return ShadowRootElement(shadow, self) - # - # @property - # def sr(self): - # """返回当前元素的shadow_root元素对象""" - # return self.shadow_root + + @property + def shadow_root(self): + """返回当前元素的shadow_root元素对象""" + shadow = self.run_script('return this.shadowRoot;') + return shadow + # if shadow: + # from .shadow_root_element import ShadowRootElement + # return ShadowRootElement(shadow, self) + + @property + def sr(self): + """返回当前元素的shadow_root元素对象""" + return self.shadow_root @property def pseudo_before(self) -> str: @@ -345,7 +353,7 @@ function getElementPagePosition(element){ :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... :return: 运行的结果 """ - return _run_script(self, script, as_expr, args) + return _run_script(self, script, as_expr, self.page.timeouts.script, args) def ele(self, loc_or_str: Union[Tuple[str, str], str], @@ -474,55 +482,38 @@ function getElementPagePosition(element){ :param clear: 输入前是否清空文本框 :return: None """ - combination_key = False - if not isinstance(vals, (str, tuple, list)): - vals = str(vals) - if isinstance(vals, str): - if '\n' in vals: - combination_key = True - vals = (vals,) + if self.tag == 'input' and self.attr('type') == 'file': + return self._set_file_input(vals) try: self.page.driver.DOM.focus(nodeId=self._node_id) except Exception: - self.click(by_js=False) + self.click(by_js=True) if clear: self.clear(by_js=True) - if not combination_key: - for i in ('\ue008', '\ue009', '\ue00a', '\ue03d'): # ctrl alt shift command 四键 - if i in vals: - combination_key = True - break + # ------------处理字符------------- + if not isinstance(vals, (tuple, list)): + vals = (str(vals),) + modifier, vals = _keys_to_typing(vals) - if not combination_key: - self.page.run_cdp('Input.insertText', text=''.join(vals)) + if modifier != 0: # 包含组合键 + for key in vals: + _send_key(self, modifier, key) return - modifier, typing = _keys_to_typing(vals) - for key in typing: - print([key]) - if key not in _keyDefinitions: - self.page.run_cdp('Input.insertText', text=key) + if vals.endswith('\n'): + self.page.run_cdp('Input.insertText', text=vals[:-1]) + _send_key(self, modifier, '\n') + else: + self.page.run_cdp('Input.insertText', text=vals) - else: - description = _keyDescriptionForString(modifier, key) - text = description['text'] - data = {'type': 'keyDown' if text else 'rawKeyDown', - 'modifiers': modifier, - 'windowsVirtualKeyCode': description['keyCode'], - 'code': description['code'], - 'key': description['key'], - 'text': text, - 'autoRepeat': False, - 'unmodifiedText': text, - 'location': description['location'], - 'isKeypad': description['location'] == 3} - - self.page.run_cdp('Input.dispatchKeyEvent', **data) - # data['type'] = 'keyUp' - # self.page.run_cdp('Input.dispatchKeyEvent', **data) + def _set_file_input(self, files: Union[str, list, tuple]) -> None: + """设置上传控件值""" + if isinstance(files): + files = files.split('\n') + self.page.driver.DOM.setFileInputFiles(files=files, nodeId=self._node_id) def clear(self, by_js: bool = True) -> None: """清空元素文本 \n @@ -752,7 +743,8 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float) def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float): selector = selector.replace('"', r'\"') find_all = '' if single else 'All' - js = f'this.querySelector{find_all}("{selector}");' + node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this' + js = f'function(){{return {node_txt}.querySelector{find_all}("{selector}");}}' r = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, userGesture=True) @@ -815,7 +807,7 @@ else{a.push(e.snapshotItem(i));}}""" return js -def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = None) -> Any: +def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float = None, args: tuple = None) -> Any: """运行javascript代码 \n :param page_or_ele: 页面对象或元素对象 :param script: js文本 @@ -835,7 +827,8 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N expression=script, returnByValue=False, awaitPromise=True, - userGesture=True) + userGesture=True, + timeout=timeout * 1000) else: args = args or () if not is_js_func(script): @@ -851,7 +844,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N exceptionDetails = res.get('exceptionDetails') if exceptionDetails: raise RuntimeError(f'Evaluation failed: {exceptionDetails}') - # print(res.get('result')) + return _parse_js_result(page, res.get('result')) @@ -928,6 +921,39 @@ def _offset_scroll(ele: ChromeElement, x: int, y: int): return x, y +def _send_enter(ele: ChromeElement): + # todo:windows系统回车是否不一样 + data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter', + 'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False} + + ele.page.run_cdp('Input.dispatchKeyEvent', **data) + data['type'] = 'keyUp' + ele.page.run_cdp('Input.dispatchKeyEvent', **data) + + +def _send_key(ele: ChromeElement, modifier: int, key: str) -> None: + if key not in _keyDefinitions: + ele.page.run_cdp('Input.insertText', text=key) + + else: + description = _keyDescriptionForString(modifier, key) + text = description['text'] + data = {'type': 'keyDown' if text else 'rawKeyDown', + 'modifiers': modifier, + 'windowsVirtualKeyCode': description['keyCode'], + 'code': description['code'], + 'key': description['key'], + 'text': text, + 'autoRepeat': False, + 'unmodifiedText': text, + 'location': description['location'], + 'isKeypad': description['location'] == 3} + + ele.page.run_cdp('Input.dispatchKeyEvent', **data) + data['type'] = 'keyUp' + ele.page.run_cdp('Input.dispatchKeyEvent', **data) + + class ChromeScroll(object): """用于滚动的对象""" diff --git a/DrissionPage/chrome_page.py b/DrissionPage/chrome_page.py index acd152b..4177019 100644 --- a/DrissionPage/chrome_page.py +++ b/DrissionPage/chrome_page.py @@ -10,6 +10,7 @@ from json import loads from requests.cookies import RequestsCookieJar +from .session_element import SessionElement, make_session_ele from .config import DriverOptions, _cookies_to_tuple from .base import BasePage from .common import get_loc @@ -25,24 +26,27 @@ class ChromePage(BasePage): super().__init__(timeout) self._connect_debugger(Tab_or_Options, tab_handle) - # def _ready(self): - # self._alert = Alert() - # self.driver.Page.javascriptDialogOpening = self._on_alert_open - # self.driver.Page.javascriptDialogClosed = self._on_alert_close - def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None): + self.timeouts = Timeout(self) + self._page_load_strategy = 'normal' if isinstance(Tab_or_Options, Tab): self._driver = Tab_or_Options self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1) + self.options = None - else: - if Tab_or_Options is None: - Tab_or_Options = DriverOptions() # 从ini文件读取 - connect_chrome(Tab_or_Options) - self.address = Tab_or_Options.debugger_address + elif isinstance(Tab_or_Options, DriverOptions): + self.options = Tab_or_Options or DriverOptions() # 从ini文件读取 + self.set_timeouts(page_load=self.options.timeouts['pageLoad'], + script=self.options.timeouts['script']) + self._page_load_strategy = self.options.page_load_strategy + connect_chrome(self.options) + self.address = self.options.debugger_address tab_handle = self.tab_handles[0] if not tab_handle else tab_handle self._driver = Tab(id=tab_handle, type='page', - webSocketDebuggerUrl=f'ws://{Tab_or_Options.debugger_address}/devtools/page/{tab_handle}') + webSocketDebuggerUrl=f'ws://{self.options.debugger_address}/devtools/page/{tab_handle}') + + else: + raise TypeError('只能接收Tab或DriverOptions类型参数。') self._driver.start() self._driver.DOM.enable() @@ -110,7 +114,7 @@ class ChromePage(BasePage): @property def ready_state(self) -> str: """返回当前页面加载状态,""" - return self.driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] + return self.run_script('document.readyState;', as_expr=True) @property def scroll(self) -> ChromeScroll: @@ -122,10 +126,41 @@ class ChromePage(BasePage): @property def size(self) -> dict: """返回页面总长宽""" - w = self.driver.Runtime.evaluate(expression='document.body.scrollWidth;')['result']['value'] - h = self.driver.Runtime.evaluate(expression='document.body.scrollHeight;')['result']['value'] + w = self.run_script('document.body.scrollWidth;', as_expr=True) + h = self.run_script('document.body.scrollHeight;', as_expr=True) return {'height': h, 'width': w} + @property + def active_ele(self) -> ChromeElement: + return self.run_script('return document.activeElement;') + + @property + def page_load_strategy(self) -> str: + """返回页面加载策略""" + return self._page_load_strategy + + def set_page_load_strategy(self, value: str) -> None: + """设置页面加载策略,可选'normal', 'eager', 'none'""" + if value not in ('normal', 'eager', 'none'): + raise ValueError("只能选择'normal', 'eager', 'none'。") + self._page_load_strategy = value + + def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: + """设置超时时间,单位为秒,selenium4以上版本有效 \n + :param implicit: 查找元素超时时间 + :param page_load: 页面加载超时时间 + :param script: 脚本运行超时时间 + :return: None + """ + if implicit is not None: + self.timeout = implicit + + if page_load is not None: + self.timeouts.page_load = page_load + + if script is not None: + self.timeouts.script = script + def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any: """运行javascript代码 \n :param script: js文本 @@ -133,7 +168,7 @@ class ChromePage(BasePage): :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... :return: 运行的结果 """ - return _run_script(self, script, as_expr, args) + return _run_script(self, script, as_expr, self.timeouts.script, args) def get(self, url: str, @@ -187,11 +222,22 @@ class ChromePage(BasePage): timeout: float = None) -> List[Union[ChromeElement, str]]: return self._ele(loc_or_ele, timeout=timeout, single=False) - # def s_ele(self): - # pass - # - # def s_eles(self): - # pass + def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement] = None) -> Union[SessionElement, str, None]: + """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + if isinstance(loc_or_ele, ChromeElement): + return make_session_ele(loc_or_ele) + else: + return make_session_ele(self, loc_or_ele) + + def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: + """查找所有符合条件的元素以SessionElement列表形式返回 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象组成的列表 + """ + return make_session_ele(self, loc_or_str, single=False) def _ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], @@ -205,12 +251,12 @@ class ChromePage(BasePage): raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。') timeout = timeout if timeout is not None else self.timeout - search_result = self.driver.DOM.performSearch(query=loc) + search_result = self.driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) count = search_result['resultCount'] end_time = perf_counter() + timeout while count == 0 and perf_counter() < end_time: - search_result = self.driver.DOM.performSearch(query=loc) + search_result = self.driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) count = search_result['resultCount'] if count == 0: @@ -309,16 +355,17 @@ class ChromePage(BasePage): :param steps: 次数 :return: None """ - self.driver.Runtime.evaluate(expression=f'window.history.go({steps});') + self.run_script(f'window.history.go({steps});', as_expr=True) def back(self, steps: int = 1) -> None: """在浏览历史中后退若干步 \n :param steps: 次数 :return: None """ - self.driver.Runtime.evaluate(expression=f'window.history.go({-steps});') + self.run_script(f'window.history.go({-steps});', as_expr=True) def stop_loading(self) -> None: + """页面停止加载""" self.driver.Page.stopLoading() def run_cdp(self, cmd: str, **cmd_args): @@ -342,7 +389,7 @@ class ChromePage(BasePage): :return: sessionStorage一个或所有项内容 """ js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' - return self.driver.Runtime.evaluate(js) + return self.run_script(js, as_expr=True) def get_local_storage(self, item: str = None) -> Union[str, dict, None]: """获取localStorage信息,不设置item则获取全部 \n @@ -350,7 +397,7 @@ class ChromePage(BasePage): :return: localStorage一个或所有项内容 """ js = f'localStorage.getItem("{item}");' if item else 'localStorage;' - return self.driver.Runtime.evaluate(js) + return self.run_script(js, as_expr=True) def set_session_storage(self, item: str, value: Union[str, bool]) -> None: """设置或删除某项sessionStorage信息 \n @@ -358,8 +405,8 @@ class ChromePage(BasePage): :param value: 项的值,设置为False时,删除该项 :return: None """ - s = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' - return self.driver.Runtime.evaluate(s) + js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' + return self.run_script(js, as_expr=True) def set_local_storage(self, item: str, value: Union[str, bool]) -> None: """设置或删除某项localStorage信息 \n @@ -367,8 +414,8 @@ class ChromePage(BasePage): :param value: 项的值,设置为False时,删除该项 :return: None """ - s = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' - return self.driver.Runtime.evaluate(s) + js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' + return self.run_script(js, as_expr=True) def create_tab(self, url: str = None) -> None: """新建并定位到一个标签页,该标签页在最后面 \n @@ -442,7 +489,7 @@ class ChromePage(BasePage): """ self.close_tabs(num_or_handles, True) - def clean_cache(self, + def clear_cache(self, session_storage: bool = True, local_storage: bool = True, cache: bool = True, @@ -455,84 +502,14 @@ class ChromePage(BasePage): :return: None """ if session_storage: - self.driver.Runtime.evaluate(expression='sessionStorage.clear();') + self.run_script('sessionStorage.clear();', as_expr=True) if local_storage: - self.driver.Runtime.evaluate(expression='localStorage.clear();') + self.run_script('localStorage.clear();', as_expr=True) if cache: self.driver.Network.clearBrowserCache() if cookies: self.driver.Network.clearBrowserCookies() - def check_page(self): - pass - - # @property - # def active_ele(self): - # pass - - def _d_connect(self, - to_url: str, - times: int = 0, - interval: float = 1, - show_errmsg: bool = False, - timeout: float = None) -> Union[bool, None]: - """尝试连接,重试若干次 \n - :param to_url: 要访问的url - :param times: 重试次数 - :param interval: 重试间隔(秒) - :param show_errmsg: 是否抛出异常 - :return: 是否成功,返回None表示不确定 - """ - err = None - is_ok = False - timeout = timeout if timeout is not None else self.timeout - - for _ in range(times + 1): - try: - result = self.driver.Page.navigate(url=to_url) - end_time = perf_counter() + timeout - while self.ready_state != 'complete' and perf_counter() < end_time: - sleep(.5) - if self.ready_state != 'complete': - raise TimeoutError - if 'errorText' in result: - raise ConnectionError(result['errorText']) - go_ok = True - except Exception as e: - err = e - go_ok = False - - is_ok = self.check_page() if go_ok else False - - if is_ok is not False: - break - - if _ < times: - sleep(interval) - if show_errmsg: - print(f'重试 {to_url}') - - if is_ok is False and show_errmsg: - raise err if err is not None else ConnectionError('连接异常。') - - return is_ok - - def _on_alert_close(self, **kwargs): - self._alert.activated = False - self._alert.text = None - self._alert.type = None - self._alert.defaultPrompt = None - self._alert.response_accept = kwargs.get['result'] - self._alert.response_text = kwargs['userInput'] - - def _on_alert_open(self, **kwargs): - self._alert.activated = True - self._alert.text = kwargs['message'] - self._alert.type = kwargs['message'] - self._alert.defaultPrompt = kwargs.get('defaultPrompt', None) - self._alert.response_accept = None - self._alert.response_text = None - def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: """处理提示框 \n :param accept: True表示确认,False表示取消,其它值不会按按钮但依然返回文本值 @@ -554,8 +531,86 @@ class ChromePage(BasePage): self.driver.Page.handleJavaScriptDialog(accept=accept) return res_text + def check_page(self) -> Union[bool, None]: + """检查页面是否符合预期 \n + 由子类自行实现各页面的判定规则 + """ + return None + + def _d_connect(self, + to_url: str, + times: int = 0, + interval: float = 1, + show_errmsg: bool = False, + timeout: float = None) -> Union[bool, None]: + """尝试连接,重试若干次 \n + :param to_url: 要访问的url + :param times: 重试次数 + :param interval: 重试间隔(秒) + :param show_errmsg: 是否抛出异常 + :param timeout: 连接超时时间 + :return: 是否成功,返回None表示不确定 + """ + err = None + is_ok = False + timeout = timeout if timeout is not None else self.timeout + + for _ in range(times + 1): + result = self.driver.Page.navigate(url=to_url) + + is_timeout = True + end_time = perf_counter() + timeout + while perf_counter() < end_time: + if ((self.page_load_strategy == 'normal' and self.ready_state == 'complete') + or (self.page_load_strategy == 'eager' and self.ready_state in ('interactive', 'complete')) + or (self.page_load_strategy == 'none' and self.ready_state + in ('loading', 'interactive', 'complete'))): + self.stop_loading() + is_timeout = False + break + + if is_timeout: + raise TimeoutError('页面连接超时。') + if 'errorText' in result: + raise ConnectionError(result['errorText']) + + is_ok = self.check_page() + + if is_ok is not False: + break + + if _ < times: + sleep(interval) + if show_errmsg: + print(f'重试 {to_url}') + + if is_ok is False and show_errmsg: + raise err if err is not None else ConnectionError('连接异常。') + + return is_ok + + def _on_alert_close(self, **kwargs): + """alert关闭时触发的方法""" + self._alert.activated = False + self._alert.text = None + self._alert.type = None + self._alert.defaultPrompt = None + self._alert.response_accept = kwargs.get['result'] + self._alert.response_text = kwargs['userInput'] + + def _on_alert_open(self, **kwargs): + """alert出现时触发的方法""" + self._alert.activated = True + self._alert.text = kwargs['message'] + self._alert.type = kwargs['message'] + self._alert.defaultPrompt = kwargs.get('defaultPrompt', None) + self._alert.response_accept = None + self._alert.response_text = None + class Alert(object): + """用于保存alert信息""" + def __init__(self): self.activated = False self.text = None @@ -565,6 +620,19 @@ class Alert(object): self.response_text = None +class Timeout(object): + """用于保存d模式timeout信息""" + + def __init__(self, page: ChromePage): + self.page = page + self.page_load = 30 + self.script = 30 + + @property + def implicit(self): + return self.page.timeout + + def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set: """返回指定标签页handle组成的set \n :param handles: handles列表 diff --git a/DrissionPage/config.py b/DrissionPage/config.py index 139cf88..28e6405 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -703,6 +703,8 @@ class DriverOptions(Options): :param value: 可接收 'normal', 'eager', 'none' :return: 当前对象 """ + if value not in ('normal', 'eager', 'none'): + raise ValueError("只能选择'normal', 'eager', 'none'。") self.page_load_strategy = value.lower() return self diff --git a/DrissionPage/driver_page.py b/DrissionPage/driver_page.py index aaa760b..2f93d13 100644 --- a/DrissionPage/driver_page.py +++ b/DrissionPage/driver_page.py @@ -512,9 +512,9 @@ class DriverPage(BasePage): """ return glob(f'{download_path}{sep}*.crdownload') - def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: + def process_alert(self, ok: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: """处理提示框 \n - :param accept: True表示确认,False表示取消,其它值不会按按钮但依然返回文本值 + :param ok: True表示确认,False表示取消,其它值不会按按钮但依然返回文本值 :param send: 处理prompt提示框时可输入文本 :param timeout: 等待提示框出现的超时时间 :return: 提示框内容文本,未等到提示框则返回None @@ -540,9 +540,9 @@ class DriverPage(BasePage): if send is not None: alert.send_keys(send) - if accept is True: + if ok is True: alert.accept() - elif accept is False: + elif ok is False: alert.dismiss() return res_text diff --git a/DrissionPage/keys.py b/DrissionPage/keys.py index 47ac86d..1a9ea1e 100644 --- a/DrissionPage/keys.py +++ b/DrissionPage/keys.py @@ -3,9 +3,7 @@ from typing import List, Tuple, Dict class Keys: - """ - Set of special keys codes. - """ + """特殊按键""" NULL = '\ue000' CANCEL = '\ue001' # ^break @@ -329,14 +327,18 @@ _keyDefinitions = { '}': {'keyCode': 221, 'key': '}', 'code': 'BracketRight'}, '"': {'keyCode': 222, 'key': '"', 'code': 'Quote'}, } +_modifierBit = {'\ue00a': 1, + '\ue009': 2, + '\ue03d': 4, + '\ue008': 8} -def _keys_to_typing(value) -> Tuple[int, list]: +def _keys_to_typing(value) -> Tuple[int, str]: typing: List[str] = [] modifier = 0 for val in value: - if val in ('\ue008', '\ue009', '\ue00a', '\ue03d'): - modifier |= _modifierBit(val) + if val in ('\ue009', '\ue008', '\ue00a', '\ue03d'): + modifier |= _modifierBit.get(val, 0) continue if isinstance(val, (int, float)): val = str(val) @@ -345,30 +347,17 @@ def _keys_to_typing(value) -> Tuple[int, list]: else: for i in range(len(val)): typing.append(val[i]) - return modifier, typing - -def _modifierBit(key: str) -> int: - if key == '\ue00a': - return 1 - if key == '\ue009': - return 2 - if key == '\ue03d': - return 4 - if key == '\ue008': - return 8 - return 0 + return modifier, ''.join(typing) def _keyDescriptionForString(_modifiers: int, keyString: str) -> Dict: # noqa: C901 shift = _modifiers & 8 - description = { - 'key': '', - 'keyCode': 0, - 'code': '', - 'text': '', - 'location': 0, - } + description = {'key': '', + 'keyCode': 0, + 'code': '', + 'text': '', + 'location': 0} definition: Dict = _keyDefinitions.get(keyString) # type: ignore if not definition: diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 729e667..cda6a9b 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -185,6 +185,27 @@ class WebPage(SessionPage, ChromePage, BasePage): elif self._mode == 'd': return super(SessionPage, self).eles(loc_or_str, timeout=timeout) + def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement, SessionElement] = None) \ + -> Union[SessionElement, str, None]: + """查找第一个符合条件的元素以SessionElement形式返回,d模式处理复杂页面时效率很高 \n + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + if self._mode == 's': + return super().s_ele(loc_or_ele) + elif self._mode == 'd': + return super(SessionPage, self).s_ele(loc_or_ele) + + def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: + """查找所有符合条件的元素以SessionElement形式返回,d模式处理复杂页面时效率很高 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本组成的列表 + """ + if self._mode == 's': + return super().s_eles(loc_or_str) + elif self._mode == 'd': + return super(SessionPage, self).s_eles(loc_or_str) + def change_mode(self, mode: str = None, go: bool = True) -> None: """切换模式,接收's'或'd',除此以外的字符串会切换为 d 模式 \n 切换时会把当前模式的cookies复制到目标模式 \n @@ -379,4 +400,4 @@ class WebPage(SessionPage, ChromePage, BasePage): self._session_options = Session_or_Options else: - raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。') \ No newline at end of file + raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')