From ed8c53d7384abc501b1a7caa57a30a35556fc7ff Mon Sep 17 00:00:00 2001 From: g1879 Date: Tue, 7 Nov 2023 16:15:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=83=E7=B4=A0=E5=A2=9E=E5=8A=A0locations.r?= =?UTF-8?q?ect=E5=92=8Clocations.viewport=5Frect=E5=B1=9E=E6=80=A7?= =?UTF-8?q?=EF=BC=9B=E4=BC=98=E5=8C=96is=5Fcovered=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=9B=E4=BC=98=E5=8C=96Driver=20=5Fsend()=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=9B=E4=BF=AE=E5=A4=8DListener=20method=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=9B=E4=BC=98=E5=8C=96=E7=82=B9=E5=87=BB?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=9BDriver=E7=BB=9F=E4=B8=80=E5=9C=A8bro?= =?UTF-8?q?wser=E5=88=9B=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/_base/browser.py | 24 +- DrissionPage/_base/browser.pyi | 7 +- DrissionPage/_base/chromium_driver.py | 17 +- DrissionPage/_commons/web.py | 4 +- DrissionPage/_configs/chromium_options.py | 2 +- DrissionPage/_configs/session_options.py | 3 +- DrissionPage/_elements/chromium_element.py | 367 ++------------------ DrissionPage/_elements/chromium_element.pyi | 117 +------ DrissionPage/_pages/chromium_base.py | 8 +- DrissionPage/_units/clicker.py | 54 +-- DrissionPage/_units/download_manager.py | 17 +- DrissionPage/_units/element_states.py | 100 ++++++ DrissionPage/_units/element_states.pyi | 50 +++ DrissionPage/_units/network_listener.py | 47 ++- DrissionPage/_units/select_element.py | 244 +++++++++++++ DrissionPage/_units/select_element.pyi | 63 ++++ 16 files changed, 593 insertions(+), 531 deletions(-) create mode 100644 DrissionPage/_units/element_states.py create mode 100644 DrissionPage/_units/element_states.pyi create mode 100644 DrissionPage/_units/select_element.py create mode 100644 DrissionPage/_units/select_element.pyi diff --git a/DrissionPage/_base/browser.py b/DrissionPage/_base/browser.py index 5593260..46ad7c1 100644 --- a/DrissionPage/_base/browser.py +++ b/DrissionPage/_base/browser.py @@ -5,7 +5,7 @@ """ from time import sleep -from .chromium_driver import BrowserDriver +from .chromium_driver import BrowserDriver, ChromiumDriver from .._units.download_manager import BrowserDownloadManager @@ -38,6 +38,8 @@ class Browser(object): self._driver = BrowserDriver(browser_id, 'browser', address) self.id = browser_id self._frames = {} + self._drivers = {} + # self._drivers = {t: ChromiumDriver(t, 'page', address) for t in self.tabs} self._connected = False self._process_id = None @@ -49,13 +51,27 @@ class Browser(object): self.run_cdp('Target.setDiscoverTargets', discover=True) self._driver.set_listener('Target.targetDestroyed', self._onTargetDestroyed) + self._driver.set_listener('Target.targetCreated', self._onTargetCreated) + + def _get_driver(self, tab_id): + """获取对应tab id的ChromiumDriver + :param tab_id: 标签页id + :return: ChromiumDriver对象 + """ + return self._drivers.pop(tab_id, ChromiumDriver(tab_id, 'page', self.address)) + + def _onTargetCreated(self, **kwargs): + """标签页创建时执行""" + if kwargs['targetInfo']['type'] == 'page' and not kwargs['targetInfo']['url'].startswith('devtools://'): + self._drivers[kwargs['targetInfo']['targetId']] = ChromiumDriver(kwargs['targetInfo']['targetId'], 'page', self.address) def _onTargetDestroyed(self, **kwargs): """标签页关闭时执行""" tab_id = kwargs['targetId'] self._dl_mgr.clear_tab_info(tab_id) - for item in [(k, i) for k, i in self._frames.items() if i == tab_id]: - self._frames.pop(item[0]) + for key in [k for k, i in self._frames.items() if i == tab_id]: + self._frames.pop(key, None) + self._drivers.pop(tab_id, None) def connect_to_page(self): """执行与page相关的逻辑""" @@ -150,4 +166,4 @@ class Browser(object): break sleep(.2) - Browser.BROWSERS.pop(self.id) + Browser.BROWSERS.pop(self.id, None) diff --git a/DrissionPage/_base/browser.pyi b/DrissionPage/_base/browser.pyi index ee5edad..cb1a13a 100644 --- a/DrissionPage/_base/browser.pyi +++ b/DrissionPage/_base/browser.pyi @@ -5,7 +5,7 @@ """ from typing import List, Optional, Union -from .chromium_driver import BrowserDriver +from .chromium_driver import BrowserDriver, ChromiumDriver from .._pages.chromium_page import ChromiumPage from .._units.download_manager import BrowserDownloadManager @@ -17,6 +17,7 @@ class Browser(object): id: str = ... address: str = ... _frames: dict = ... + _drivers: dict = ... _process_id: Optional[int] = ... _dl_mgr: BrowserDownloadManager = ... _connected: bool = ... @@ -25,6 +26,8 @@ class Browser(object): def __init__(self, address: str, browser_id: str, page: ChromiumPage): ... + def _get_driver(self, tab_id: str)->ChromiumDriver: ... + def run_cdp(self, cmd, **cmd_args) -> dict: ... @property @@ -50,6 +53,8 @@ class Browser(object): def connect_to_page(self) -> None: ... + def _onTargetCreated(self, **kwargs) -> None: ... + def _onTargetDestroyed(self, **kwargs) -> None: ... def quit(self) -> None: ... diff --git a/DrissionPage/_base/chromium_driver.py b/DrissionPage/_base/chromium_driver.py index 83309a4..b9f1c9e 100644 --- a/DrissionPage/_base/chromium_driver.py +++ b/DrissionPage/_base/chromium_driver.py @@ -71,26 +71,29 @@ class ChromiumDriver(object): try: self._ws.send(message_json) except OSError: - self.method_results.pop(ws_id) + self.method_results.pop(ws_id, None) return None while not self._stopped.is_set(): try: - return self.method_results[ws_id].get(timeout=.2) + result = self.method_results[ws_id].get(timeout=.2) + self.method_results.pop(ws_id, None) + return result except Empty: if self.alert_flag: self.alert_flag = False - return {'result': {'message': 'alert exists.'}} + result = {'result': {'message': 'alert exists.'}} + self.method_results.pop(ws_id, None) + return result elif timeout is not None and perf_counter() > timeout: - return {'error': {'message': 'timeout'}} + result = {'error': {'message': 'timeout'}} + self.method_results.pop(ws_id, None) + return result continue - finally: - self.method_results.pop(ws_id) - def _recv_loop(self): """接收浏览器信息的守护线程方法""" while not self._stopped.is_set(): diff --git a/DrissionPage/_commons/web.py b/DrissionPage/_commons/web.py index 543d222..9d9c7b5 100644 --- a/DrissionPage/_commons/web.py +++ b/DrissionPage/_commons/web.py @@ -169,8 +169,8 @@ def cookie_to_dict(cookie): """ if isinstance(cookie, Cookie): cookie_dict = cookie.__dict__.copy() - cookie_dict.pop('rfc2109') - cookie_dict.pop('_rest') + cookie_dict.pop('rfc2109', None) + cookie_dict.pop('_rest', None) return cookie_dict elif isinstance(cookie, dict): diff --git a/DrissionPage/_configs/chromium_options.py b/DrissionPage/_configs/chromium_options.py index 1ce072f..214ccd0 100644 --- a/DrissionPage/_configs/chromium_options.py +++ b/DrissionPage/_configs/chromium_options.py @@ -209,7 +209,7 @@ class ChromiumOptions(object): :param arg: 设置项名称 :return: 当前对象 """ - self._prefs.pop(arg) + self._prefs.pop(arg, None) return self def remove_pref_from_file(self, arg): diff --git a/DrissionPage/_configs/session_options.py b/DrissionPage/_configs/session_options.py index 4339b08..d918f63 100644 --- a/DrissionPage/_configs/session_options.py +++ b/DrissionPage/_configs/session_options.py @@ -161,8 +161,7 @@ class SessionOptions(object): return self attr = attr.lower() - if attr in self._headers: - self._headers.pop(attr) + self._headers.pop(attr, None) return self diff --git a/DrissionPage/_elements/chromium_element.py b/DrissionPage/_elements/chromium_element.py index e51a0d6..704c5d6 100644 --- a/DrissionPage/_elements/chromium_element.py +++ b/DrissionPage/_elements/chromium_element.py @@ -13,12 +13,14 @@ from .._commons.constants import FRAME_ELEMENT, NoneElement, Settings from .._commons.keys import keys_to_typing, keyDescriptionForString, keyDefinitions from .._commons.locator import get_loc from .._commons.tools import get_usable_path -from .._commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll +from .._commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, offset_scroll from .._units.clicker import Clicker +from .._units.element_states import ChromiumElementStates, ShadowRootStates +from .._units.select_element import SelectElement from .._units.setter import ChromiumElementSetter from .._units.waiter import ChromiumElementWaiter -from ..errors import ContextLossError, ElementLossError, JavaScriptError, ElementNotFoundError, \ - CDPError, NoResourceError, NoRectError, AlertExistsError +from ..errors import (ContextLossError, ElementLossError, JavaScriptError, ElementNotFoundError, + CDPError, NoResourceError, AlertExistsError) class ChromiumElement(DrissionElement): @@ -202,7 +204,7 @@ class ChromiumElement(DrissionElement): if self.tag != 'select': self._select = False else: - self._select = ChromiumSelect(self) + self._select = SelectElement(self) return self._select @@ -1436,103 +1438,6 @@ def send_key(ele, modifier, key): ele.page.run_cdp('Input.dispatchKeyEvent', **data) -class ChromiumElementStates(object): - def __init__(self, ele): - """ - :param ele: ChromiumElement - """ - self._ele = ele - - @property - def is_selected(self): - """返回元素是否被选择""" - return self._ele.run_js('return this.selected;') - - @property - def is_checked(self): - """返回元素是否被选择""" - return self._ele.run_js('return this.checked;') - - @property - def is_displayed(self): - """返回元素是否显示""" - return not (self._ele.style('visibility') == 'hidden' - or self._ele.run_js('return this.offsetParent === null;') - or self._ele.style('display') == 'none') - - @property - def is_enabled(self): - """返回元素是否可用""" - return not self._ele.run_js('return this.disabled;') - - @property - def is_alive(self): - """返回元素是否仍在DOM中""" - try: - d = self._ele.attrs - return True - except Exception: - return False - - @property - def is_in_viewport(self): - """返回元素是否出现在视口中,以元素click_point为判断""" - x, y = self._ele.locations.click_point - return location_in_viewport(self._ele.page, x, y) if x else False - - @property - def is_whole_in_viewport(self): - """返回元素是否整个都在视口内""" - x1, y1 = self._ele.location - w, h = self._ele.size - x2, y2 = x1 + w, y1 + h - return location_in_viewport(self._ele.page, x1, y1) and location_in_viewport(self._ele.page, x2, y2) - - @property - def is_covered(self): - """返回元素是否被覆盖,与是否在视口中无关""" - lx, ly = self._ele.locations.click_point - try: - r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly) - except CDPError: - return False - - if r.get('backendNodeId') != self._ele.ids.backend_id: - return True - - return False - - @property - def has_rect(self): - """返回元素是否拥有位置和大小,没有返回False,有返回大小元组""" - try: - return self._ele.size - except NoRectError: - return False - - -class ShadowRootStates(object): - def __init__(self, ele): - """ - :param ele: ChromiumElement - """ - self._ele = ele - - @property - def is_enabled(self): - """返回元素是否可用""" - return not self._ele.run_js('return this.disabled;') - - @property - def is_alive(self): - """返回元素是否仍在DOM中""" - try: - self._ele.page.run_cdp('DOM.describeNode', backendNodeId=self._ele.ids.backend_id) - return True - except Exception: - return False - - class Locations(object): def __init__(self, ele): """ @@ -1574,7 +1479,7 @@ class Locations(object): def viewport_click_point(self): """返回元素接受点击的点视口坐标""" m = self._get_viewport_rect('padding') - return int(self.viewport_midpoint[0]), int(m[1]) + 1 + return int(self.viewport_midpoint[0]), int(m[1]) + 3 @property def screen_location(self): @@ -1600,18 +1505,30 @@ class Locations(object): pr = self._ele.page.run_js('return window.devicePixelRatio;') return int((vx + ex) * pr), int((ey + vy) * pr) + @property + def rect(self): + """返回元素四个角坐标,顺序:坐上、右上、右下、左下,没有大小的元素抛出NoRectError""" + vr = self._get_viewport_rect('border') + r = self._ele.page.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport'] + sx = r['pageX'] + sy = r['pageY'] + return [(vr[0] + sx, vr[1] + sy), (vr[2] + sx, vr[3] + sy), (vr[4] + sx, vr[5] + sy), (vr[6] + sx, vr[7] + sy)] + + @property + def viewport_rect(self): + """返回元素四个角视口坐标,顺序:坐上、右上、右下、左下,没有大小的元素抛出NoRectError""" + r = self._get_viewport_rect('border') + return [(r[0], r[1]), (r[2], r[3]), (r[4], r[5]), (r[6], r[7])] + def _get_viewport_rect(self, quad): """按照类型返回在可视窗口中的范围 :param quad: 方框类型,margin border padding - :return: 四个角坐标,大小为0时返回None + :return: 四个角坐标 """ return self._ele.page.run_cdp('DOM.getBoxModel', backendNodeId=self._ele.ids.backend_id)['model'][quad] def _get_page_coord(self, x, y): """根据视口坐标获取绝对坐标""" - # js = 'return document.documentElement.scrollLeft+" "+document.documentElement.scrollTop;' - # xy = self._ele.run_js(js) - # sx, sy = xy.split(' ') r = self._ele.page.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport'] sx = r['pageX'] sy = r['pageY'] @@ -1728,244 +1645,6 @@ class ChromiumElementScroll(ChromiumScroll): self._driver.page.scroll.to_see(self._driver, center=True) -class ChromiumSelect(object): - """ChromiumSelect 类专门用于处理 d 模式下 select 标签""" - - def __init__(self, ele): - """ - :param ele: select 元素对象 - """ - if ele.tag != 'select': - raise TypeError("select方法只能在元素使用。") + + self._ele = ele + + def __call__(self, text_or_index, timeout=None): + """选定下拉列表中子元素 + :param text_or_index: 根据文本、值选或序号择选项,若允许多选,传入list或tuple可多选 + :param timeout: 超时时间,不输入默认实用页面超时时间 + :return: None + """ + para_type = 'index' if isinstance(text_or_index, int) else 'text' + timeout = timeout if timeout is not None else self._ele.page.timeout + return self._select(text_or_index, para_type, timeout=timeout) + + @property + def is_multi(self): + """返回是否多选表单""" + return self._ele.attr('multiple') is not None + + @property + def options(self): + """返回所有选项元素组成的列表""" + return self._ele.eles('xpath://option') + + @property + def selected_option(self): + """返回第一个被选中的option元素 + :return: ChromiumElement对象或None + """ + ele = self._ele.run_js('return this.options[this.selectedIndex];') + return ele + + @property + def selected_options(self): + """返回所有被选中的option元素列表 + :return: ChromiumElement对象组成的列表 + """ + return [x for x in self.options if x.states.is_selected] + + def all(self): + """全选""" + if not self.is_multi: + raise TypeError("只能在多选菜单执行此操作。") + return self._by_loc('tag:option', 1, False) + + def invert(self): + """反选""" + if not self.is_multi: + raise TypeError("只能对多项选框执行反选。") + change = False + for i in self.options: + change = True + mode = 'false' if i.states.is_selected else 'true' + i.run_js(f'this.selected={mode};') + if change: + self._dispatch_change() + + def clear(self): + """清除所有已选项""" + if not self.is_multi: + raise TypeError("只能在多选菜单执行此操作。") + return self._by_loc('tag:option', 1, True) + + def by_text(self, text, timeout=None): + """此方法用于根据text值选择项。当元素是多选列表时,可以接收list或tuple + :param text: text属性值,传入list或tuple可选择多项 + :param timeout: 超时时间,为None默认使用页面超时时间 + :return: 是否选择成功 + """ + return self._select(text, 'text', False, timeout) + + def by_value(self, value, timeout=None): + """此方法用于根据value值选择项。当元素是多选列表时,可以接收list或tuple + :param value: value属性值,传入list或tuple可选择多项 + :param timeout: 超时时间,为None默认使用页面超时时间 + :return: 是否选择成功 + """ + return self._select(value, 'value', False, timeout) + + def by_index(self, index, timeout=None): + """此方法用于根据index值选择项。当元素是多选列表时,可以接收list或tuple + :param index: 序号,0开始,传入list或tuple可选择多项 + :param timeout: 超时时间,为None默认使用页面超时时间 + :return: 是否选择成功 + """ + return self._select(index, 'index', False, timeout) + + def by_loc(self, loc, timeout=None): + """用定位符选择指定的项 + :param loc: 定位符 + :param timeout: 超时时间 + :return: 是否选择成功 + """ + return self._by_loc(loc, timeout) + + def cancel_by_text(self, text, timeout=None): + """此方法用于根据text值取消选择项。当元素是多选列表时,可以接收list或tuple + :param text: 文本,传入list或tuple可取消多项 + :param timeout: 超时时间,不输入默认实用页面超时时间 + :return: 是否取消成功 + """ + return self._select(text, 'text', True, timeout) + + def cancel_by_value(self, value, timeout=None): + """此方法用于根据value值取消选择项。当元素是多选列表时,可以接收list或tuple + :param value: value属性值,传入list或tuple可取消多项 + :param timeout: 超时时间,不输入默认实用页面超时时间 + :return: 是否取消成功 + """ + return self._select(value, 'value', True, timeout) + + def cancel_by_index(self, index, timeout=None): + """此方法用于根据index值取消选择项。当元素是多选列表时,可以接收list或tuple + :param index: 序号,0开始,传入list或tuple可取消多项 + :param timeout: 超时时间,不输入默认实用页面超时时间 + :return: 是否取消成功 + """ + return self._select(index, 'index', True, timeout) + + def cancel_by_loc(self, loc, timeout=None): + """用定位符取消选择指定的项 + :param loc: 定位符 + :param timeout: 超时时间 + :return: 是否选择成功 + """ + return self._by_loc(loc, timeout, True) + + def _by_loc(self, loc, timeout=None, cancel=False): + """用定位符取消选择指定的项 + :param loc: 定位符 + :param timeout: 超时时间 + :param cancel: 是否取消选择 + :return: 是否选择成功 + """ + eles = self._ele.eles(loc, timeout) + if not eles: + return False + + mode = 'false' if cancel else 'true' + if self.is_multi: + for ele in eles: + ele.run_js(f'this.selected={mode};') + self._dispatch_change() + return True + + eles[0].run_js(f'this.selected={mode};') + self._dispatch_change() + return True + + def _select(self, condition, para_type='text', cancel=False, timeout=None): + """选定或取消选定下拉列表中子元素 + :param condition: 根据文本、值选或序号择选项,若允许多选,传入list或tuple可多选 + :param para_type: 参数类型,可选 'text'、'value'、'index' + :param cancel: 是否取消选择 + :return: 是否选择成功 + """ + if not self.is_multi and isinstance(condition, (list, tuple)): + raise TypeError('单选列表只能传入str格式。') + + mode = 'false' if cancel else 'true' + timeout = timeout if timeout is not None else self._ele.page.timeout + condition = set(condition) if isinstance(condition, (list, tuple)) else {condition} + + if para_type in ('text', 'value'): + return self._text_value([str(i) for i in condition], para_type, mode, timeout) + elif para_type == 'index': + return self._index(condition, mode, timeout) + + def _text_value(self, condition, para_type, mode, timeout): + """执行text和value搜索 + :param condition: 条件set + :param para_type: 参数类型,可选 'text'、'value' + :param mode: 'true' 或 'false' + :param timeout: 超时时间 + :return: 是否选择成功 + """ + ok = False + text_len = len(condition) + eles = [] + end_time = perf_counter() + timeout + while perf_counter() < end_time: + if para_type == 'text': + eles = [i for i in self.options if i.text in condition] + elif para_type == 'value': + eles = [i for i in self.options if i.attr('value') in condition] + + if len(eles) >= text_len: + ok = True + break + + if ok: + for i in eles: + i.run_js(f'this.selected={mode};') + + self._dispatch_change() + return True + + return False + + def _index(self, condition, mode, timeout): + """执行index搜索 + :param condition: 条件set + :param mode: 'true' 或 'false' + :param timeout: 超时时间 + :return: 是否选择成功 + """ + ok = False + condition = [int(i) for i in condition] + text_len = max(condition) + end_time = perf_counter() + timeout + while perf_counter() < end_time: + if len(self.options) >= text_len: + ok = True + break + + if ok: + eles = self.options + for i in condition: + eles[i - 1].run_js(f'this.selected={mode};') + + self._dispatch_change() + return True + + return False + + def _dispatch_change(self): + """触发修改动作""" + self._ele.run_js('this.dispatchEvent(new UIEvent("change"));') diff --git a/DrissionPage/_units/select_element.pyi b/DrissionPage/_units/select_element.pyi new file mode 100644 index 0000000..55d07b6 --- /dev/null +++ b/DrissionPage/_units/select_element.pyi @@ -0,0 +1,63 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from typing import Union, Tuple, List + +from .._elements.chromium_element import ChromiumElement + + +class SelectElement(object): + def __init__(self, ele: ChromiumElement): + self._ele: ChromiumElement = ... + + def __call__(self, text_or_index: Union[str, int, list, tuple], timeout: float = None) -> bool: ... + + @property + def is_multi(self) -> bool: ... + + @property + def options(self) -> List[ChromiumElement]: ... + + @property + def selected_option(self) -> Union[ChromiumElement, None]: ... + + @property + def selected_options(self) -> List[ChromiumElement]: ... + + def clear(self) -> None: ... + + def all(self) -> None: ... + + def by_text(self, text: Union[str, list, tuple], timeout: float = None) -> bool: ... + + def by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ... + + def by_index(self, index: Union[int, list, tuple], timeout: float = None) -> bool: ... + + def by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None) -> bool: ... + + def cancel_by_text(self, text: Union[str, list, tuple], timeout: float = None) -> bool: ... + + def cancel_by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ... + + def cancel_by_index(self, index: Union[int, list, tuple], timeout: float = None) -> bool: ... + + def cancel_by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None) -> bool: ... + + def invert(self) -> None: ... + + def _by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None, cancel: bool = False) -> bool: ... + + def _select(self, + condition: Union[str, int, list, tuple] = None, + para_type: str = 'text', + cancel: bool = False, + timeout: float = None) -> bool: ... + + def _text_value(self, condition: Union[list, set], para_type: str, mode: str, timeout: float) -> bool: ... + + def _index(self, condition: set, mode: str, timeout: float) -> bool: ... + + def _dispatch_change(self) -> None: ... \ No newline at end of file