元素增加locations.rect和locations.viewport_rect属性;优化is_covered逻辑;优化Driver _send()逻辑;修复Listener method设置问题;优化点击逻辑;Driver统一在browser创建

This commit is contained in:
g1879 2023-11-07 16:15:01 +08:00
parent 99dfaf91da
commit ed8c53d738
16 changed files with 593 additions and 531 deletions

View File

@ -5,7 +5,7 @@
"""
from time import sleep
from .chromium_driver import BrowserDriver
from .chromium_driver import BrowserDriver, ChromiumDriver
from .._units.download_manager import BrowserDownloadManager
@ -38,6 +38,8 @@ class Browser(object):
self._driver = BrowserDriver(browser_id, 'browser', address)
self.id = browser_id
self._frames = {}
self._drivers = {}
# self._drivers = {t: ChromiumDriver(t, 'page', address) for t in self.tabs}
self._connected = False
self._process_id = None
@ -49,13 +51,27 @@ class Browser(object):
self.run_cdp('Target.setDiscoverTargets', discover=True)
self._driver.set_listener('Target.targetDestroyed', self._onTargetDestroyed)
self._driver.set_listener('Target.targetCreated', self._onTargetCreated)
def _get_driver(self, tab_id):
"""获取对应tab id的ChromiumDriver
:param tab_id: 标签页id
:return: ChromiumDriver对象
"""
return self._drivers.pop(tab_id, ChromiumDriver(tab_id, 'page', self.address))
def _onTargetCreated(self, **kwargs):
"""标签页创建时执行"""
if kwargs['targetInfo']['type'] == 'page' and not kwargs['targetInfo']['url'].startswith('devtools://'):
self._drivers[kwargs['targetInfo']['targetId']] = ChromiumDriver(kwargs['targetInfo']['targetId'], 'page', self.address)
def _onTargetDestroyed(self, **kwargs):
"""标签页关闭时执行"""
tab_id = kwargs['targetId']
self._dl_mgr.clear_tab_info(tab_id)
for item in [(k, i) for k, i in self._frames.items() if i == tab_id]:
self._frames.pop(item[0])
for key in [k for k, i in self._frames.items() if i == tab_id]:
self._frames.pop(key, None)
self._drivers.pop(tab_id, None)
def connect_to_page(self):
"""执行与page相关的逻辑"""
@ -150,4 +166,4 @@ class Browser(object):
break
sleep(.2)
Browser.BROWSERS.pop(self.id)
Browser.BROWSERS.pop(self.id, None)

View File

@ -5,7 +5,7 @@
"""
from typing import List, Optional, Union
from .chromium_driver import BrowserDriver
from .chromium_driver import BrowserDriver, ChromiumDriver
from .._pages.chromium_page import ChromiumPage
from .._units.download_manager import BrowserDownloadManager
@ -17,6 +17,7 @@ class Browser(object):
id: str = ...
address: str = ...
_frames: dict = ...
_drivers: dict = ...
_process_id: Optional[int] = ...
_dl_mgr: BrowserDownloadManager = ...
_connected: bool = ...
@ -25,6 +26,8 @@ class Browser(object):
def __init__(self, address: str, browser_id: str, page: ChromiumPage): ...
def _get_driver(self, tab_id: str)->ChromiumDriver: ...
def run_cdp(self, cmd, **cmd_args) -> dict: ...
@property
@ -50,6 +53,8 @@ class Browser(object):
def connect_to_page(self) -> None: ...
def _onTargetCreated(self, **kwargs) -> None: ...
def _onTargetDestroyed(self, **kwargs) -> None: ...
def quit(self) -> None: ...

View File

@ -71,26 +71,29 @@ class ChromiumDriver(object):
try:
self._ws.send(message_json)
except OSError:
self.method_results.pop(ws_id)
self.method_results.pop(ws_id, None)
return None
while not self._stopped.is_set():
try:
return self.method_results[ws_id].get(timeout=.2)
result = self.method_results[ws_id].get(timeout=.2)
self.method_results.pop(ws_id, None)
return result
except Empty:
if self.alert_flag:
self.alert_flag = False
return {'result': {'message': 'alert exists.'}}
result = {'result': {'message': 'alert exists.'}}
self.method_results.pop(ws_id, None)
return result
elif timeout is not None and perf_counter() > timeout:
return {'error': {'message': 'timeout'}}
result = {'error': {'message': 'timeout'}}
self.method_results.pop(ws_id, None)
return result
continue
finally:
self.method_results.pop(ws_id)
def _recv_loop(self):
"""接收浏览器信息的守护线程方法"""
while not self._stopped.is_set():

View File

@ -169,8 +169,8 @@ def cookie_to_dict(cookie):
"""
if isinstance(cookie, Cookie):
cookie_dict = cookie.__dict__.copy()
cookie_dict.pop('rfc2109')
cookie_dict.pop('_rest')
cookie_dict.pop('rfc2109', None)
cookie_dict.pop('_rest', None)
return cookie_dict
elif isinstance(cookie, dict):

View File

@ -209,7 +209,7 @@ class ChromiumOptions(object):
:param arg: 设置项名称
:return: 当前对象
"""
self._prefs.pop(arg)
self._prefs.pop(arg, None)
return self
def remove_pref_from_file(self, arg):

View File

@ -161,8 +161,7 @@ class SessionOptions(object):
return self
attr = attr.lower()
if attr in self._headers:
self._headers.pop(attr)
self._headers.pop(attr, None)
return self

View File

@ -13,12 +13,14 @@ from .._commons.constants import FRAME_ELEMENT, NoneElement, Settings
from .._commons.keys import keys_to_typing, keyDescriptionForString, keyDefinitions
from .._commons.locator import get_loc
from .._commons.tools import get_usable_path
from .._commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll
from .._commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, offset_scroll
from .._units.clicker import Clicker
from .._units.element_states import ChromiumElementStates, ShadowRootStates
from .._units.select_element import SelectElement
from .._units.setter import ChromiumElementSetter
from .._units.waiter import ChromiumElementWaiter
from ..errors import ContextLossError, ElementLossError, JavaScriptError, ElementNotFoundError, \
CDPError, NoResourceError, NoRectError, AlertExistsError
from ..errors import (ContextLossError, ElementLossError, JavaScriptError, ElementNotFoundError,
CDPError, NoResourceError, AlertExistsError)
class ChromiumElement(DrissionElement):
@ -202,7 +204,7 @@ class ChromiumElement(DrissionElement):
if self.tag != 'select':
self._select = False
else:
self._select = ChromiumSelect(self)
self._select = SelectElement(self)
return self._select
@ -1436,103 +1438,6 @@ def send_key(ele, modifier, key):
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
class ChromiumElementStates(object):
def __init__(self, ele):
"""
:param ele: ChromiumElement
"""
self._ele = ele
@property
def is_selected(self):
"""返回元素是否被选择"""
return self._ele.run_js('return this.selected;')
@property
def is_checked(self):
"""返回元素是否被选择"""
return self._ele.run_js('return this.checked;')
@property
def is_displayed(self):
"""返回元素是否显示"""
return not (self._ele.style('visibility') == 'hidden'
or self._ele.run_js('return this.offsetParent === null;')
or self._ele.style('display') == 'none')
@property
def is_enabled(self):
"""返回元素是否可用"""
return not self._ele.run_js('return this.disabled;')
@property
def is_alive(self):
"""返回元素是否仍在DOM中"""
try:
d = self._ele.attrs
return True
except Exception:
return False
@property
def is_in_viewport(self):
"""返回元素是否出现在视口中以元素click_point为判断"""
x, y = self._ele.locations.click_point
return location_in_viewport(self._ele.page, x, y) if x else False
@property
def is_whole_in_viewport(self):
"""返回元素是否整个都在视口内"""
x1, y1 = self._ele.location
w, h = self._ele.size
x2, y2 = x1 + w, y1 + h
return location_in_viewport(self._ele.page, x1, y1) and location_in_viewport(self._ele.page, x2, y2)
@property
def is_covered(self):
"""返回元素是否被覆盖,与是否在视口中无关"""
lx, ly = self._ele.locations.click_point
try:
r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly)
except CDPError:
return False
if r.get('backendNodeId') != self._ele.ids.backend_id:
return True
return False
@property
def has_rect(self):
"""返回元素是否拥有位置和大小没有返回False有返回大小元组"""
try:
return self._ele.size
except NoRectError:
return False
class ShadowRootStates(object):
def __init__(self, ele):
"""
:param ele: ChromiumElement
"""
self._ele = ele
@property
def is_enabled(self):
"""返回元素是否可用"""
return not self._ele.run_js('return this.disabled;')
@property
def is_alive(self):
"""返回元素是否仍在DOM中"""
try:
self._ele.page.run_cdp('DOM.describeNode', backendNodeId=self._ele.ids.backend_id)
return True
except Exception:
return False
class Locations(object):
def __init__(self, ele):
"""
@ -1574,7 +1479,7 @@ class Locations(object):
def viewport_click_point(self):
"""返回元素接受点击的点视口坐标"""
m = self._get_viewport_rect('padding')
return int(self.viewport_midpoint[0]), int(m[1]) + 1
return int(self.viewport_midpoint[0]), int(m[1]) + 3
@property
def screen_location(self):
@ -1600,18 +1505,30 @@ class Locations(object):
pr = self._ele.page.run_js('return window.devicePixelRatio;')
return int((vx + ex) * pr), int((ey + vy) * pr)
@property
def rect(self):
"""返回元素四个角坐标顺序坐上、右上、右下、左下没有大小的元素抛出NoRectError"""
vr = self._get_viewport_rect('border')
r = self._ele.page.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
sx = r['pageX']
sy = r['pageY']
return [(vr[0] + sx, vr[1] + sy), (vr[2] + sx, vr[3] + sy), (vr[4] + sx, vr[5] + sy), (vr[6] + sx, vr[7] + sy)]
@property
def viewport_rect(self):
"""返回元素四个角视口坐标顺序坐上、右上、右下、左下没有大小的元素抛出NoRectError"""
r = self._get_viewport_rect('border')
return [(r[0], r[1]), (r[2], r[3]), (r[4], r[5]), (r[6], r[7])]
def _get_viewport_rect(self, quad):
"""按照类型返回在可视窗口中的范围
:param quad: 方框类型margin border padding
:return: 四个角坐标大小为0时返回None
:return: 四个角坐标
"""
return self._ele.page.run_cdp('DOM.getBoxModel', backendNodeId=self._ele.ids.backend_id)['model'][quad]
def _get_page_coord(self, x, y):
"""根据视口坐标获取绝对坐标"""
# js = 'return document.documentElement.scrollLeft+" "+document.documentElement.scrollTop;'
# xy = self._ele.run_js(js)
# sx, sy = xy.split(' ')
r = self._ele.page.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
sx = r['pageX']
sy = r['pageY']
@ -1728,244 +1645,6 @@ class ChromiumElementScroll(ChromiumScroll):
self._driver.page.scroll.to_see(self._driver, center=True)
class ChromiumSelect(object):
"""ChromiumSelect 类专门用于处理 d 模式下 select 标签"""
def __init__(self, ele):
"""
:param ele: select 元素对象
"""
if ele.tag != 'select':
raise TypeError("select方法只能在<select>元素使用。")
self._ele = ele
def __call__(self, text_or_index, timeout=None):
"""选定下拉列表中子元素
:param text_or_index: 根据文本值选或序号择选项若允许多选传入list或tuple可多选
:param timeout: 超时时间不输入默认实用页面超时时间
:return: None
"""
para_type = 'index' if isinstance(text_or_index, int) else 'text'
timeout = timeout if timeout is not None else self._ele.page.timeout
return self._select(text_or_index, para_type, timeout=timeout)
@property
def is_multi(self):
"""返回是否多选表单"""
return self._ele.attr('multiple') is not None
@property
def options(self):
"""返回所有选项元素组成的列表"""
return self._ele.eles('xpath://option')
@property
def selected_option(self):
"""返回第一个被选中的option元素
:return: ChromiumElement对象或None
"""
ele = self._ele.run_js('return this.options[this.selectedIndex];')
return ele
@property
def selected_options(self):
"""返回所有被选中的option元素列表
:return: ChromiumElement对象组成的列表
"""
return [x for x in self.options if x.states.is_selected]
def all(self):
"""全选"""
if not self.is_multi:
raise TypeError("只能在多选菜单执行此操作。")
return self._by_loc('tag:option', 1, False)
def invert(self):
"""反选"""
if not self.is_multi:
raise TypeError("只能对多项选框执行反选。")
change = False
for i in self.options:
change = True
mode = 'false' if i.states.is_selected else 'true'
i.run_js(f'this.selected={mode};')
if change:
self._dispatch_change()
def clear(self):
"""清除所有已选项"""
if not self.is_multi:
raise TypeError("只能在多选菜单执行此操作。")
return self._by_loc('tag:option', 1, True)
def by_text(self, text, timeout=None):
"""此方法用于根据text值选择项。当元素是多选列表时可以接收list或tuple
:param text: text属性值传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
return self._select(text, 'text', False, timeout)
def by_value(self, value, timeout=None):
"""此方法用于根据value值选择项。当元素是多选列表时可以接收list或tuple
:param value: value属性值传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
return self._select(value, 'value', False, timeout)
def by_index(self, index, timeout=None):
"""此方法用于根据index值选择项。当元素是多选列表时可以接收list或tuple
:param index: 序号0开始传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
return self._select(index, 'index', False, timeout)
def by_loc(self, loc, timeout=None):
"""用定位符选择指定的项
:param loc: 定位符
:param timeout: 超时时间
:return: 是否选择成功
"""
return self._by_loc(loc, timeout)
def cancel_by_text(self, text, timeout=None):
"""此方法用于根据text值取消选择项。当元素是多选列表时可以接收list或tuple
:param text: 文本传入list或tuple可取消多项
:param timeout: 超时时间不输入默认实用页面超时时间
:return: 是否取消成功
"""
return self._select(text, 'text', True, timeout)
def cancel_by_value(self, value, timeout=None):
"""此方法用于根据value值取消选择项。当元素是多选列表时可以接收list或tuple
:param value: value属性值传入list或tuple可取消多项
:param timeout: 超时时间不输入默认实用页面超时时间
:return: 是否取消成功
"""
return self._select(value, 'value', True, timeout)
def cancel_by_index(self, index, timeout=None):
"""此方法用于根据index值取消选择项。当元素是多选列表时可以接收list或tuple
:param index: 序号0开始传入list或tuple可取消多项
:param timeout: 超时时间不输入默认实用页面超时时间
:return: 是否取消成功
"""
return self._select(index, 'index', True, timeout)
def cancel_by_loc(self, loc, timeout=None):
"""用定位符取消选择指定的项
:param loc: 定位符
:param timeout: 超时时间
:return: 是否选择成功
"""
return self._by_loc(loc, timeout, True)
def _by_loc(self, loc, timeout=None, cancel=False):
"""用定位符取消选择指定的项
:param loc: 定位符
:param timeout: 超时时间
:param cancel: 是否取消选择
:return: 是否选择成功
"""
eles = self._ele.eles(loc, timeout)
if not eles:
return False
mode = 'false' if cancel else 'true'
if self.is_multi:
for ele in eles:
ele.run_js(f'this.selected={mode};')
self._dispatch_change()
return True
eles[0].run_js(f'this.selected={mode};')
self._dispatch_change()
return True
def _select(self, condition, para_type='text', cancel=False, timeout=None):
"""选定或取消选定下拉列表中子元素
:param condition: 根据文本值选或序号择选项若允许多选传入list或tuple可多选
:param para_type: 参数类型可选 'text''value''index'
:param cancel: 是否取消选择
:return: 是否选择成功
"""
if not self.is_multi and isinstance(condition, (list, tuple)):
raise TypeError('单选列表只能传入str格式。')
mode = 'false' if cancel else 'true'
timeout = timeout if timeout is not None else self._ele.page.timeout
condition = set(condition) if isinstance(condition, (list, tuple)) else {condition}
if para_type in ('text', 'value'):
return self._text_value([str(i) for i in condition], para_type, mode, timeout)
elif para_type == 'index':
return self._index(condition, mode, timeout)
def _text_value(self, condition, para_type, mode, timeout):
"""执行text和value搜索
:param condition: 条件set
:param para_type: 参数类型可选 'text''value'
:param mode: 'true' 'false'
:param timeout: 超时时间
:return: 是否选择成功
"""
ok = False
text_len = len(condition)
eles = []
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if para_type == 'text':
eles = [i for i in self.options if i.text in condition]
elif para_type == 'value':
eles = [i for i in self.options if i.attr('value') in condition]
if len(eles) >= text_len:
ok = True
break
if ok:
for i in eles:
i.run_js(f'this.selected={mode};')
self._dispatch_change()
return True
return False
def _index(self, condition, mode, timeout):
"""执行index搜索
:param condition: 条件set
:param mode: 'true' 'false'
:param timeout: 超时时间
:return: 是否选择成功
"""
ok = False
condition = [int(i) for i in condition]
text_len = max(condition)
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if len(self.options) >= text_len:
ok = True
break
if ok:
eles = self.options
for i in condition:
eles[i - 1].run_js(f'this.selected={mode};')
self._dispatch_change()
return True
return False
def _dispatch_change(self):
"""触发修改动作"""
self._ele.run_js('this.dispatchEvent(new UIEvent("change"));')
class Pseudo(object):
def __init__(self, ele):
"""

View File

@ -4,9 +4,8 @@
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union, Tuple, List, Any
from typing import Union, Tuple, List, Any, Optional
from .._units.clicker import Clicker
from .._base.base import DrissionElement, BaseElement
from .._commons.constants import NoneElement
from .._elements.session_element import SessionElement
@ -14,6 +13,9 @@ from .._pages.chromium_base import ChromiumBase
from .._pages.chromium_frame import ChromiumFrame
from .._pages.chromium_page import ChromiumPage
from .._pages.web_page import WebPage
from .._units.clicker import Clicker
from .._units.element_states import ShadowRootStates, ChromiumElementStates
from .._units.select_element import SelectElement
from .._units.setter import ChromiumElementSetter
from .._units.waiter import ChromiumElementWaiter
@ -32,7 +34,7 @@ class ChromiumElement(DrissionElement):
self._ids: ChromiumElementIds = ...
self._scroll: ChromiumElementScroll = ...
self._clicker: Clicker = ...
self._select: ChromiumSelect = ...
self._select: SelectElement = ...
self._wait: ChromiumElementWaiter = ...
self._locations: Locations = ...
self._set: ChromiumElementSetter = ...
@ -148,7 +150,7 @@ class ChromiumElement(DrissionElement):
def wait(self) -> ChromiumElementWaiter: ...
@property
def select(self) -> ChromiumSelect: ...
def select(self) -> SelectElement: ...
def check(self, uncheck: bool = False) -> None: ...
@ -213,38 +215,6 @@ class ChromiumElement(DrissionElement):
def _get_ele_path(self, mode: str) -> str: ...
class ChromiumElementStates(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
@property
def is_selected(self) -> bool: ...
@property
def is_checked(self) -> bool: ...
@property
def is_displayed(self) -> bool: ...
@property
def is_enabled(self) -> bool: ...
@property
def is_alive(self) -> bool: ...
@property
def is_in_viewport(self) -> bool: ...
@property
def is_whole_in_viewport(self) -> bool: ...
@property
def is_covered(self) -> bool: ...
@property
def has_rect(self) -> Union[bool, Tuple[int, int]]: ...
class ChromiumShadowRoot(BaseElement):
def __init__(self,
@ -393,20 +363,6 @@ def send_enter(ele: ChromiumElement) -> None: ...
def send_key(ele: ChromiumElement, modifier: int, key: str) -> None: ...
class ShadowRootStates(object):
def __init__(self, ele: ChromiumShadowRoot):
"""
:param ele: ChromiumElement
"""
self._ele: ChromiumShadowRoot = ...
@property
def is_enabled(self) -> bool: ...
@property
def is_alive(self) -> bool: ...
class Locations(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
@ -438,6 +394,12 @@ class Locations(object):
@property
def screen_click_point(self) -> Tuple[int, int]: ...
@property
def rect(self) -> list: ...
@property
def viewport_rect(self) -> list: ...
def _get_viewport_rect(self, quad: str) -> Union[list, None]: ...
def _get_page_coord(self, x: int, y: int) -> Tuple[int, int]: ...
@ -482,61 +444,6 @@ class ChromiumElementScroll(ChromiumScroll):
def to_center(self) -> None: ...
class ChromiumSelect(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
def __call__(self, text_or_index: Union[str, int, list, tuple], timeout: float = None) -> bool: ...
@property
def is_multi(self) -> bool: ...
@property
def options(self) -> List[ChromiumElement]: ...
@property
def selected_option(self) -> Union[ChromiumElement, None]: ...
@property
def selected_options(self) -> List[ChromiumElement]: ...
def clear(self) -> None: ...
def all(self) -> None: ...
def by_text(self, text: Union[str, list, tuple], timeout: float = None) -> bool: ...
def by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ...
def by_index(self, index: Union[int, list, tuple], timeout: float = None) -> bool: ...
def by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None) -> bool: ...
def cancel_by_text(self, text: Union[str, list, tuple], timeout: float = None) -> bool: ...
def cancel_by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ...
def cancel_by_index(self, index: Union[int, list, tuple], timeout: float = None) -> bool: ...
def cancel_by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None) -> bool: ...
def invert(self) -> None: ...
def _by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None, cancel: bool = False) -> bool: ...
def _select(self,
condition: Union[str, int, list, tuple] = None,
para_type: str = 'text',
cancel: bool = False,
timeout: float = None) -> bool: ...
def _text_value(self, condition: Union[list, set], para_type: str, mode: str, timeout: float) -> bool: ...
def _index(self, condition: set, mode: str, timeout: float) -> bool: ...
def _dispatch_change(self) -> None: ...
class Pseudo(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...

View File

@ -11,7 +11,6 @@ from threading import Thread
from time import perf_counter, sleep
from .._base.base import BasePage
from .._base.chromium_driver import ChromiumDriver
from .._commons.constants import ERROR, NoneElement
from .._commons.locator import get_loc
from .._commons.tools import get_usable_path
@ -95,7 +94,7 @@ class ChromiumBase(BasePage):
:return: None
"""
self._is_loading = True
self._driver = ChromiumDriver(tab_id=tab_id, tab_type='page', address=self.address)
self._driver = self.browser._get_driver(tab_id)
self._alert = Alert()
self._driver.set_listener('Page.javascriptDialogOpening', self._on_alert_open)
self._driver.set_listener('Page.javascriptDialogClosed', self._on_alert_close)
@ -140,10 +139,7 @@ class ChromiumBase(BasePage):
self._is_reading = False
def _onFrameDetached(self, **kwargs):
try:
self.browser._frames.pop(kwargs['frameId'])
except KeyError:
pass
self.browser._frames.pop(kwargs['frameId'], None)
def _onFrameAttached(self, **kwargs):
self.browser._frames[kwargs['frameId']] = self.tab_id

View File

@ -3,11 +3,11 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from time import perf_counter
from time import perf_counter, sleep
from .._commons.constants import Settings
from .._commons.web import offset_scroll
from ..errors import NoRectError, CanNotClickError
from ..errors import CanNotClickError, CDPError
class Clicker(object):
@ -33,33 +33,45 @@ class Clicker(object):
:return: 是否点击成功
"""
if not by_js: # 模拟点击
try:
can_click = False
timeout = self._ele.page.timeout if timeout is None else timeout
if timeout == 0 and self._ele.states.has_rect:
self._ele.scroll.to_see()
can_click = False
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
can_click = True
timeout = self._ele.page.timeout if timeout is None else timeout
if timeout == 0:
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
can_click = True
else:
end_time = perf_counter() + timeout
else:
end_time = perf_counter() + timeout
while not self._ele.states.has_rect and perf_counter() < end_time:
sleep(.001)
if self._ele.states.has_rect:
self._ele.scroll.to_see()
while perf_counter() < end_time:
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
if (self._ele.states.is_in_viewport and self._ele.states.is_enabled
and self._ele.states.is_displayed):
can_click = True
break
sleep(.001)
if not self._ele.states.is_in_viewport:
by_js = True
elif can_click and (by_js is False or not self._ele.states.is_covered):
client_x, client_y = self._ele.locations.viewport_midpoint if self._ele.tag == 'input' \
else self._ele.locations.viewport_click_point
self._click(client_x, client_y)
return True
except NoRectError:
if not self._ele.states.has_rect or not self._ele.states.is_in_viewport:
by_js = True
elif can_click and (by_js is False or not self._ele.states.is_covered):
vx, vy = self._ele.locations.click_point
try:
r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=vx, y=vy, includeUserAgentShadowDOM=True,
ignorePointerEventsNone=True)
if r['backendNodeId'] != self._ele.ids.backend_id:
vx, vy = self._ele.locations.viewport_click_point
else:
vx, vy = self._ele.locations.viewport_click_point
except CDPError:
vx, vy = self._ele.locations.viewport_midpoint
self._click(vx, vy)
return True
if by_js is not False:
self._ele.run_js('this.click();')
return True

View File

@ -101,7 +101,7 @@ class BrowserDownloadManager(object):
mission.final_path = final_path
if mission.tab_id in self._tab_missions and mission.id in self._tab_missions[mission.tab_id]:
self._tab_missions[mission.tab_id].remove(mission.id)
self._missions.pop(mission.id)
self._missions.pop(mission.id, None)
mission._is_done = True
def cancel(self, mission):
@ -127,18 +127,9 @@ class BrowserDownloadManager(object):
:param tab_id: 标签页id
:return: None
"""
try:
self._tab_missions.pop(tab_id)
except KeyError:
pass
try:
self._flags.pop(tab_id)
except KeyError:
pass
try:
TabDownloadSettings.TABS.pop(tab_id)
except KeyError:
pass
self._tab_missions.pop(tab_id, None)
self._flags.pop(tab_id, None)
TabDownloadSettings.TABS.pop(tab_id, None)
def _onDownloadWillBegin(self, **kwargs):
"""用于获取弹出新标签页触发的下载任务"""

View File

@ -0,0 +1,100 @@
# -*- coding:utf-8 -*-
from .._commons.web import location_in_viewport
from ..errors import CDPError, NoRectError
class ChromiumElementStates(object):
def __init__(self, ele):
"""
:param ele: ChromiumElement
"""
self._ele = ele
@property
def is_selected(self):
"""返回元素是否被选择"""
return self._ele.run_js('return this.selected;')
@property
def is_checked(self):
"""返回元素是否被选择"""
return self._ele.run_js('return this.checked;')
@property
def is_displayed(self):
"""返回元素是否显示"""
return not (self._ele.style('visibility') == 'hidden'
or self._ele.run_js('return this.offsetParent === null;')
or self._ele.style('display') == 'none')
@property
def is_enabled(self):
"""返回元素是否可用"""
return not self._ele.run_js('return this.disabled;')
@property
def is_alive(self):
"""返回元素是否仍在DOM中"""
try:
d = self._ele.attrs
return True
except Exception:
return False
@property
def is_in_viewport(self):
"""返回元素是否出现在视口中以元素click_point为判断"""
x, y = self._ele.locations.click_point
return location_in_viewport(self._ele.page, x, y) if x else False
@property
def is_whole_in_viewport(self):
"""返回元素是否整个都在视口内"""
x1, y1 = self._ele.location
w, h = self._ele.size
x2, y2 = x1 + w, y1 + h
return location_in_viewport(self._ele.page, x1, y1) and location_in_viewport(self._ele.page, x2, y2)
@property
def is_covered(self):
"""返回元素是否被覆盖,与是否在视口中无关"""
lx, ly = self._ele.locations.click_point
try:
r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly)
except CDPError:
return False
if r.get('backendNodeId') != self._ele.ids.backend_id:
return True
return False
@property
def has_rect(self):
"""返回元素是否拥有位置和大小没有返回False有返回大小元组"""
try:
return self._ele.size
except NoRectError:
return False
class ShadowRootStates(object):
def __init__(self, ele):
"""
:param ele: ChromiumElement
"""
self._ele = ele
@property
def is_enabled(self):
"""返回元素是否可用"""
return not self._ele.run_js('return this.disabled;')
@property
def is_alive(self):
"""返回元素是否仍在DOM中"""
try:
self._ele.page.run_cdp('DOM.describeNode', backendNodeId=self._ele.ids.backend_id)
return True
except Exception:
return False

View File

@ -0,0 +1,50 @@
# -*- coding:utf-8 -*-
from typing import Union, Tuple
from .._elements.chromium_element import ChromiumShadowRoot, ChromiumElement
class ChromiumElementStates(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
@property
def is_selected(self) -> bool: ...
@property
def is_checked(self) -> bool: ...
@property
def is_displayed(self) -> bool: ...
@property
def is_enabled(self) -> bool: ...
@property
def is_alive(self) -> bool: ...
@property
def is_in_viewport(self) -> bool: ...
@property
def is_whole_in_viewport(self) -> bool: ...
@property
def is_covered(self) -> bool: ...
@property
def has_rect(self) -> Union[bool, Tuple[int, int]]: ...
class ShadowRootStates(object):
def __init__(self, ele: ChromiumShadowRoot):
"""
:param ele: ChromiumElement
"""
self._ele: ChromiumShadowRoot = ...
@property
def is_enabled(self) -> bool: ...
@property
def is_alive(self) -> bool: ...

View File

@ -72,7 +72,7 @@ class NetworkListener(object):
:param method: 设置监听的请求类型可用list等指定多个为None时监听全部
:return: None
"""
if targets:
if targets or method:
self.set_targets(targets, is_regex, method)
if self.listening:
return
@ -191,23 +191,26 @@ class NetworkListener(object):
def _requestWillBeSent(self, **kwargs):
"""接收到请求时的回调函数"""
if not self._targets:
rid = kwargs['requestId']
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, None))
p._raw_request = kwargs
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
p._raw_post_data = self._driver.call_method('Network.getRequestPostData', requestId=rid)['postData']
return
rid = kwargs['requestId']
for target in self._targets:
if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and (
not self._method or kwargs['request']['method'] in self._method):
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, target))
if not self._method or kwargs['request']['method'] in self._method:
rid = kwargs['requestId']
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, None))
p._raw_request = kwargs
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
p._raw_post_data = self._driver.call_method('Network.getRequestPostData', requestId=rid)['postData']
break
return
else:
rid = kwargs['requestId']
for target in self._targets:
if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and (
not self._method or kwargs['request']['method'] in self._method):
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, target))
p._raw_request = kwargs
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
p._raw_post_data = self._driver.call_method('Network.getRequestPostData', requestId=rid)[
'postData']
break
def _requestWillBeSentExtraInfo(self, **kwargs):
self._extra_info_ids.setdefault(kwargs['requestId'], {})['request'] = kwargs
@ -244,11 +247,8 @@ class NetworkListener(object):
self._caught.put(dp)
try:
self._request_ids.pop(r_id)
self._extra_info_ids.pop(r_id)
except:
pass
self._request_ids.pop(r_id, None)
self._extra_info_ids.pop(r_id, None)
def _loading_failed(self, **kwargs):
"""请求失败时的回调方法"""
@ -263,11 +263,8 @@ class NetworkListener(object):
dp._responseExtraInfo = ei.get('response', None)
self._caught.put(dp)
try:
self._request_ids.pop(r_id)
self._extra_info_ids.pop(r_id)
except:
pass
self._request_ids.pop(r_id, None)
self._extra_info_ids.pop(r_id, None)
class DataPacket(object):

View File

@ -0,0 +1,244 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from time import perf_counter
class SelectElement(object):
"""用于处理 select 标签"""
def __init__(self, ele):
"""
:param ele: select 元素对象
"""
if ele.tag != 'select':
raise TypeError("select方法只能在<select>元素使用。")
self._ele = ele
def __call__(self, text_or_index, timeout=None):
"""选定下拉列表中子元素
:param text_or_index: 根据文本值选或序号择选项若允许多选传入list或tuple可多选
:param timeout: 超时时间不输入默认实用页面超时时间
:return: None
"""
para_type = 'index' if isinstance(text_or_index, int) else 'text'
timeout = timeout if timeout is not None else self._ele.page.timeout
return self._select(text_or_index, para_type, timeout=timeout)
@property
def is_multi(self):
"""返回是否多选表单"""
return self._ele.attr('multiple') is not None
@property
def options(self):
"""返回所有选项元素组成的列表"""
return self._ele.eles('xpath://option')
@property
def selected_option(self):
"""返回第一个被选中的option元素
:return: ChromiumElement对象或None
"""
ele = self._ele.run_js('return this.options[this.selectedIndex];')
return ele
@property
def selected_options(self):
"""返回所有被选中的option元素列表
:return: ChromiumElement对象组成的列表
"""
return [x for x in self.options if x.states.is_selected]
def all(self):
"""全选"""
if not self.is_multi:
raise TypeError("只能在多选菜单执行此操作。")
return self._by_loc('tag:option', 1, False)
def invert(self):
"""反选"""
if not self.is_multi:
raise TypeError("只能对多项选框执行反选。")
change = False
for i in self.options:
change = True
mode = 'false' if i.states.is_selected else 'true'
i.run_js(f'this.selected={mode};')
if change:
self._dispatch_change()
def clear(self):
"""清除所有已选项"""
if not self.is_multi:
raise TypeError("只能在多选菜单执行此操作。")
return self._by_loc('tag:option', 1, True)
def by_text(self, text, timeout=None):
"""此方法用于根据text值选择项。当元素是多选列表时可以接收list或tuple
:param text: text属性值传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
return self._select(text, 'text', False, timeout)
def by_value(self, value, timeout=None):
"""此方法用于根据value值选择项。当元素是多选列表时可以接收list或tuple
:param value: value属性值传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
return self._select(value, 'value', False, timeout)
def by_index(self, index, timeout=None):
"""此方法用于根据index值选择项。当元素是多选列表时可以接收list或tuple
:param index: 序号0开始传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
return self._select(index, 'index', False, timeout)
def by_loc(self, loc, timeout=None):
"""用定位符选择指定的项
:param loc: 定位符
:param timeout: 超时时间
:return: 是否选择成功
"""
return self._by_loc(loc, timeout)
def cancel_by_text(self, text, timeout=None):
"""此方法用于根据text值取消选择项。当元素是多选列表时可以接收list或tuple
:param text: 文本传入list或tuple可取消多项
:param timeout: 超时时间不输入默认实用页面超时时间
:return: 是否取消成功
"""
return self._select(text, 'text', True, timeout)
def cancel_by_value(self, value, timeout=None):
"""此方法用于根据value值取消选择项。当元素是多选列表时可以接收list或tuple
:param value: value属性值传入list或tuple可取消多项
:param timeout: 超时时间不输入默认实用页面超时时间
:return: 是否取消成功
"""
return self._select(value, 'value', True, timeout)
def cancel_by_index(self, index, timeout=None):
"""此方法用于根据index值取消选择项。当元素是多选列表时可以接收list或tuple
:param index: 序号0开始传入list或tuple可取消多项
:param timeout: 超时时间不输入默认实用页面超时时间
:return: 是否取消成功
"""
return self._select(index, 'index', True, timeout)
def cancel_by_loc(self, loc, timeout=None):
"""用定位符取消选择指定的项
:param loc: 定位符
:param timeout: 超时时间
:return: 是否选择成功
"""
return self._by_loc(loc, timeout, True)
def _by_loc(self, loc, timeout=None, cancel=False):
"""用定位符取消选择指定的项
:param loc: 定位符
:param timeout: 超时时间
:param cancel: 是否取消选择
:return: 是否选择成功
"""
eles = self._ele.eles(loc, timeout)
if not eles:
return False
mode = 'false' if cancel else 'true'
if self.is_multi:
for ele in eles:
ele.run_js(f'this.selected={mode};')
self._dispatch_change()
return True
eles[0].run_js(f'this.selected={mode};')
self._dispatch_change()
return True
def _select(self, condition, para_type='text', cancel=False, timeout=None):
"""选定或取消选定下拉列表中子元素
:param condition: 根据文本值选或序号择选项若允许多选传入list或tuple可多选
:param para_type: 参数类型可选 'text''value''index'
:param cancel: 是否取消选择
:return: 是否选择成功
"""
if not self.is_multi and isinstance(condition, (list, tuple)):
raise TypeError('单选列表只能传入str格式。')
mode = 'false' if cancel else 'true'
timeout = timeout if timeout is not None else self._ele.page.timeout
condition = set(condition) if isinstance(condition, (list, tuple)) else {condition}
if para_type in ('text', 'value'):
return self._text_value([str(i) for i in condition], para_type, mode, timeout)
elif para_type == 'index':
return self._index(condition, mode, timeout)
def _text_value(self, condition, para_type, mode, timeout):
"""执行text和value搜索
:param condition: 条件set
:param para_type: 参数类型可选 'text''value'
:param mode: 'true' 'false'
:param timeout: 超时时间
:return: 是否选择成功
"""
ok = False
text_len = len(condition)
eles = []
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if para_type == 'text':
eles = [i for i in self.options if i.text in condition]
elif para_type == 'value':
eles = [i for i in self.options if i.attr('value') in condition]
if len(eles) >= text_len:
ok = True
break
if ok:
for i in eles:
i.run_js(f'this.selected={mode};')
self._dispatch_change()
return True
return False
def _index(self, condition, mode, timeout):
"""执行index搜索
:param condition: 条件set
:param mode: 'true' 'false'
:param timeout: 超时时间
:return: 是否选择成功
"""
ok = False
condition = [int(i) for i in condition]
text_len = max(condition)
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if len(self.options) >= text_len:
ok = True
break
if ok:
eles = self.options
for i in condition:
eles[i - 1].run_js(f'this.selected={mode};')
self._dispatch_change()
return True
return False
def _dispatch_change(self):
"""触发修改动作"""
self._ele.run_js('this.dispatchEvent(new UIEvent("change"));')

View File

@ -0,0 +1,63 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, Tuple, List
from .._elements.chromium_element import ChromiumElement
class SelectElement(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
def __call__(self, text_or_index: Union[str, int, list, tuple], timeout: float = None) -> bool: ...
@property
def is_multi(self) -> bool: ...
@property
def options(self) -> List[ChromiumElement]: ...
@property
def selected_option(self) -> Union[ChromiumElement, None]: ...
@property
def selected_options(self) -> List[ChromiumElement]: ...
def clear(self) -> None: ...
def all(self) -> None: ...
def by_text(self, text: Union[str, list, tuple], timeout: float = None) -> bool: ...
def by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ...
def by_index(self, index: Union[int, list, tuple], timeout: float = None) -> bool: ...
def by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None) -> bool: ...
def cancel_by_text(self, text: Union[str, list, tuple], timeout: float = None) -> bool: ...
def cancel_by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ...
def cancel_by_index(self, index: Union[int, list, tuple], timeout: float = None) -> bool: ...
def cancel_by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None) -> bool: ...
def invert(self) -> None: ...
def _by_loc(self, loc: Union[str, Tuple[str, str]], timeout: float = None, cancel: bool = False) -> bool: ...
def _select(self,
condition: Union[str, int, list, tuple] = None,
para_type: str = 'text',
cancel: bool = False,
timeout: float = None) -> bool: ...
def _text_value(self, condition: Union[list, set], para_type: str, mode: str, timeout: float) -> bool: ...
def _index(self, condition: set, mode: str, timeout: float) -> bool: ...
def _dispatch_change(self) -> None: ...