Pre Merge pull request !42 from g1879/dev

This commit is contained in:
g1879 2024-03-02 14:50:05 +00:00 committed by Gitee
commit 23aeb63c8c
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
47 changed files with 505 additions and 329 deletions

View File

@ -14,4 +14,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.4.3'
__version__ = '4.0.4.7'

View File

@ -36,7 +36,7 @@ class BaseParser(object):
def html(self):
return ''
def s_ele(self, locator):
def s_ele(self, locator=None):
pass
def s_eles(self, locator):
@ -52,8 +52,9 @@ class BaseParser(object):
class BaseElement(BaseParser):
"""各元素类的基类"""
def __init__(self, page=None):
self.page = page
def __init__(self, owner=None):
self.owner = owner
self.page = owner._page if owner else None
self._type = 'BaseElement'
# ----------------以下属性或方法由后代实现----------------
@ -175,7 +176,7 @@ class DrissionElement(BaseElement):
raise ElementNotFoundError(None, 'child()', {'locator': locator, 'index': index,
'ele_only': ele_only})
else:
return NoneElement(self.page, 'child()', {'locator': locator, 'index': index, 'ele_only': ele_only})
return NoneElement(self.owner, 'child()', {'locator': locator, 'index': index, 'ele_only': ele_only})
def prev(self, locator='', index=1, timeout=None, ele_only=True):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -293,7 +294,7 @@ class DrissionElement(BaseElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, func, {'locator': locator, 'index': index, 'ele_only': ele_only})
else:
return NoneElement(self.page, func, {'locator': locator, 'index': index, 'ele_only': ele_only})
return NoneElement(self.owner, func, {'locator': locator, 'index': index, 'ele_only': ele_only})
def _get_relatives(self, index=None, locator='', direction='following', brother=True, timeout=.5, ele_only=True):
"""按要求返回兄弟元素或节点组成的列表

View File

@ -11,6 +11,10 @@ from typing import Union, Tuple, List, Any, Optional
from DownloadKit import DownloadKit
from .._elements.none_element import NoneElement
from .._elements.session_element import SessionElement
from .._pages.chromium_page import ChromiumPage
from .._pages.session_page import SessionPage
from .._pages.web_page import WebPage
class BaseParser(object):
@ -29,9 +33,11 @@ class BaseParser(object):
@property
def html(self) -> str: ...
def s_ele(self, locator: Union[Tuple[str, str], str, BaseElement], index: int = 1): ...
def s_ele(self,
locator: Union[Tuple[str, str], str, BaseElement, None] = None,
index: int = 1) -> SessionElement: ...
def s_eles(self, locator: Union[Tuple[str, str], str]): ...
def s_eles(self, locator: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
def _ele(self,
locator: Union[Tuple[str, str], str],
@ -50,8 +56,9 @@ class BaseParser(object):
class BaseElement(BaseParser):
def __init__(self, page: BasePage = None):
self.page: BasePage = ...
def __init__(self, owner: BasePage = None):
self.owner: BasePage = ...
self.page: Union[ChromiumPage, SessionPage, WebPage] = ...
# ----------------以下属性或方法由后代实现----------------
@property
@ -78,9 +85,7 @@ class BaseElement(BaseParser):
class DrissionElement(BaseElement):
def __init__(self,
page: BasePage = ...):
self.page: BasePage = ...
def __init__(self, owner: BasePage = None): ...
@property
def link(self) -> str: ...
@ -199,6 +204,7 @@ class BasePage(BaseParser):
self._DownloadKit: DownloadKit = ...
self._none_ele_return_value: bool = ...
self._none_ele_value: Any = ...
self._page: Union[ChromiumPage, SessionPage, WebPage]=...
@property
def title(self) -> Union[str, None]: ...

View File

@ -7,12 +7,12 @@
"""
from pathlib import Path
from shutil import rmtree
from time import sleep, perf_counter
from time import perf_counter, sleep
from websocket import WebSocketBadStatusException
from .driver import BrowserDriver, Driver
from .._functions.tools import stop_process_on_port, raise_error
from .._functions.tools import raise_error
from .._units.downloader import DownloadManager
from ..errors import PageDisconnectedError
@ -190,44 +190,56 @@ class Browser(object):
"""
return self.run_cdp('Browser.getWindowForTarget', targetId=tab_id or self.id)['bounds']
def reconnect(self):
"""断开重连"""
self._driver.stop()
BrowserDriver.BROWSERS.pop(self.id)
self._driver = BrowserDriver(self.id, 'browser', self.address, self)
self.run_cdp('Target.setDiscoverTargets', discover=True)
self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
self._driver.set_callback('Target.targetCreated', self._onTargetCreated)
def quit(self, timeout=5, force=False):
"""关闭浏览器
:param timeout: 等待浏览器关闭超时时间
:param force: 是否立刻强制终止进程
:return: None
"""
pids = [pid['id'] for pid in self.run_cdp('SystemInfo.getProcessInfo')['processInfo']]
for tab in self._all_drivers.values():
for driver in tab:
driver.stop()
try:
self.run_cdp('Browser.close')
except PageDisconnectedError:
self.driver.stop()
return
self.driver.stop()
if force:
ip, port = self.address.split(':')
if ip not in ('127.0.0.1', 'localhost'):
return
stop_process_on_port(port)
return
from psutil import Process
for pid in pids:
Process(pid).kill()
else:
try:
self.run_cdp('Browser.close')
self.driver.stop()
except PageDisconnectedError:
self.driver.stop()
if self.process_id:
from os import popen
from platform import system
txt = f'tasklist | findstr {self.process_id}' if system().lower() == 'windows' \
else f'ps -ef | grep {self.process_id}'
end_time = perf_counter() + timeout
while perf_counter() < end_time:
from os import popen
from platform import system
end_time = perf_counter() + timeout
while perf_counter() < end_time:
ok = True
for pid in pids:
txt = f'tasklist | findstr {pid}' if system().lower() == 'windows' else f'ps -ef | grep {pid}'
p = popen(txt)
sleep(.1)
sleep(.05)
try:
if f' {self.process_id} ' not in p.read():
return
if f' {pid} ' in p.read():
ok = False
break
except TypeError:
pass
if ok:
break
def _on_disconnect(self):
self.page._on_disconnect()
Browser.BROWSERS.pop(self.id, None)

View File

@ -56,6 +56,8 @@ class Browser(object):
def get_window_bounds(self, tab_id: str = None) -> dict: ...
def reconnect(self) -> None: ...
def connect_to_page(self) -> None: ...
def _onTargetCreated(self, **kwargs) -> None: ...

View File

@ -71,6 +71,11 @@ class BrowserDriver(Driver):
def __new__(cls, tab_id: str, tab_type: str, address: str, owner: Browser): ...
def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser): ...
def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser):
"""
:rtype: object
"""
...
def get(self, url) -> Response: ...

View File

@ -532,9 +532,9 @@ class ChromiumOptions(object):
path = path / 'config.ini' if path.is_dir() else path
if path.exists():
om = OptionsManager(str(path))
om = OptionsManager(path)
else:
om = OptionsManager(self.ini_path or str(Path(__file__).parent / 'configs.ini'))
om = OptionsManager(self.ini_path or (Path(__file__).parent / 'configs.ini'))
# 设置chromium_options
attrs = ('address', 'browser_path', 'arguments', 'extensions', 'user', 'load_mode',

View File

@ -29,6 +29,8 @@ class OptionsManager(object):
self.ini_path = default_configs
elif path == 'default':
self.ini_path = default_configs
elif isinstance(path, Path):
self.ini_path = path
else:
self.ini_path = Path(path)
@ -147,6 +149,7 @@ class OptionsManager(object):
path = Path(path).absolute()
path = path / 'config.ini' if path.is_dir() else path
path.parent.mkdir(exist_ok=True, parents=True)
path = str(path)
self._conf.write(open(path, 'w', encoding='utf-8'))

View File

@ -7,7 +7,7 @@
"""
from configparser import RawConfigParser
from pathlib import Path
from typing import Any, Optional
from typing import Any, Optional, Union
class OptionsManager(object):
@ -15,7 +15,7 @@ class OptionsManager(object):
file_exists: bool = ...
_conf: RawConfigParser = ...
def __init__(self, path: str = None): ...
def __init__(self, path: Union[Path, str] = None): ...
def __getattr__(self, item) -> dict: ...

View File

@ -377,9 +377,9 @@ class SessionOptions(object):
path = path / 'config.ini' if path.is_dir() else path
if path.exists():
om = OptionsManager(str(path))
om = OptionsManager(path)
else:
om = OptionsManager(self.ini_path or str(Path(__file__).parent / 'configs.ini'))
om = OptionsManager(self.ini_path or (Path(__file__).parent / 'configs.ini'))
options = session_options_to_dict(self)

View File

@ -36,14 +36,15 @@ __FRAME_ELEMENT__ = ('iframe', 'frame')
class ChromiumElement(DrissionElement):
"""控制浏览器元素的对象"""
def __init__(self, page, node_id=None, obj_id=None, backend_id=None):
def __init__(self, owner, node_id=None, obj_id=None, backend_id=None):
"""node_id、obj_id和backend_id必须至少传入一个
:param page: 元素所在页面对象
:param owner: 元素所在页面对象
:param node_id: cdp中的node id
:param obj_id: js中的object id
:param backend_id: backend id
"""
super().__init__(page)
super().__init__(owner)
self.tab = self.owner.tab
self._select = None
self._scroll = None
self._rect = None
@ -96,14 +97,14 @@ class ChromiumElement(DrissionElement):
def tag(self):
"""返回元素tag"""
if self._tag is None:
self._tag = self.page.run_cdp('DOM.describeNode',
backendNodeId=self._backend_id)['node']['localName'].lower()
self._tag = self.owner.run_cdp('DOM.describeNode',
backendNodeId=self._backend_id)['node']['localName'].lower()
return self._tag
@property
def html(self):
"""返回元素outerHTML文本"""
return self.page.run_cdp('DOM.getOuterHTML', backendNodeId=self._backend_id)['outerHTML']
return self.owner.run_cdp('DOM.getOuterHTML', backendNodeId=self._backend_id)['outerHTML']
@property
def inner_html(self):
@ -114,7 +115,7 @@ class ChromiumElement(DrissionElement):
def attrs(self):
"""返回元素所有attribute属性"""
try:
attrs = self.page.run_cdp('DOM.getAttributes', nodeId=self._node_id)['attributes']
attrs = self.owner.run_cdp('DOM.getAttributes', nodeId=self._node_id)['attributes']
return {attrs[i]: attrs[i + 1] for i in range(0, len(attrs), 2)}
except CDPError: # 文档根元素不能调用此方法
return {}
@ -161,7 +162,7 @@ class ChromiumElement(DrissionElement):
@property
def shadow_root(self):
"""返回当前元素的shadow_root元素对象"""
info = self.page.run_cdp('DOM.describeNode', backendNodeId=self._backend_id)['node']
info = self.owner.run_cdp('DOM.describeNode', backendNodeId=self._backend_id)['node']
if not info.get('shadowRoots', None):
return None
@ -190,7 +191,7 @@ class ChromiumElement(DrissionElement):
def wait(self):
"""返回用于等待的对象"""
if self._wait is None:
self._wait = ElementWaiter(self.page, self)
self._wait = ElementWaiter(self.owner, self)
return self._wait
@property
@ -412,7 +413,7 @@ class ChromiumElement(DrissionElement):
:param timeout: js超时时间为None则使用页面timeouts.script设置
:return: 运行的结果
"""
return run_js(self, script, as_expr, self.page.timeouts.script if timeout is None else timeout, args)
return run_js(self, script, as_expr, self.owner.timeouts.script if timeout is None else timeout, args)
def run_async_js(self, script, *args, as_expr=False):
"""以异步方式对本元素执行javascript代码
@ -494,7 +495,7 @@ class ChromiumElement(DrissionElement):
:param base64_to_bytes: 为True时如果是base64数据转换为bytes格式
:return: 资源内容
"""
timeout = self.page.timeout if timeout is None else timeout
timeout = self.owner.timeout if timeout is None else timeout
if self.tag == 'img': # 等待图片加载完成
js = ('return this.complete && typeof this.naturalWidth != "undefined" '
'&& this.naturalWidth > 0 && typeof this.naturalHeight != "undefined" '
@ -517,7 +518,7 @@ class ChromiumElement(DrissionElement):
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if is_blob:
result = get_blob(self.page, src, base64_to_bytes)
result = get_blob(self.owner, src, base64_to_bytes)
if result:
break
@ -526,11 +527,11 @@ class ChromiumElement(DrissionElement):
if not src:
continue
node = self.page.run_cdp('DOM.describeNode', backendNodeId=self._backend_id)['node']
frame = node.get('frameId', None) or self.page._frame_id
node = self.owner.run_cdp('DOM.describeNode', backendNodeId=self._backend_id)['node']
frame = node.get('frameId', None) or self.owner._frame_id
try:
result = self.page.run_cdp('Page.getResourceContent', frameId=frame, url=src)
result = self.owner.run_cdp('Page.getResourceContent', frameId=frame, url=src)
break
except CDPError:
sleep(.1)
@ -585,7 +586,7 @@ class ChromiumElement(DrissionElement):
if self.tag == 'img': # 等待图片加载完成
js = ('return this.complete && typeof this.naturalWidth != "undefined" && this.naturalWidth > 0 '
'&& typeof this.naturalHeight != "undefined" && this.naturalHeight > 0')
end_time = perf_counter() + self.page.timeout
end_time = perf_counter() + self.owner.timeout
while not self.run_js(js) and perf_counter() < end_time:
sleep(.1)
if scroll_to_center:
@ -598,8 +599,8 @@ class ChromiumElement(DrissionElement):
if not name:
name = f'{self.tag}.jpg'
return self.page._get_screenshot(path, name, as_bytes=as_bytes, as_base64=as_base64, full_page=False,
left_top=left_top, right_bottom=right_bottom, ele=self)
return self.owner._get_screenshot(path, name, as_bytes=as_bytes, as_base64=as_base64, full_page=False,
left_top=left_top, right_bottom=right_bottom, ele=self)
def input(self, vals, clear=True, by_js=False):
"""输入文本或组合键也可用于输入文件路径到input元素路径间用\n间隔)
@ -625,7 +626,7 @@ class ChromiumElement(DrissionElement):
else:
self._input_focus()
input_text_or_keys(self.page, vals)
input_text_or_keys(self.owner, vals)
def clear(self, by_js=False):
"""清空元素文本
@ -643,14 +644,14 @@ class ChromiumElement(DrissionElement):
def _input_focus(self):
"""输入前使元素获取焦点"""
try:
self.page.run_cdp('DOM.focus', backendNodeId=self._backend_id)
self.owner.run_cdp('DOM.focus', backendNodeId=self._backend_id)
except Exception:
self.click(by_js=None)
def focus(self):
"""使元素获取焦点"""
try:
self.page.run_cdp('DOM.focus', backendNodeId=self._backend_id)
self.owner.run_cdp('DOM.focus', backendNodeId=self._backend_id)
except Exception:
self.run_js('this.focus();')
@ -660,9 +661,9 @@ class ChromiumElement(DrissionElement):
:param offset_y: 相对元素左上角坐标的y轴偏移量
:return: None
"""
self.page.scroll.to_see(self)
self.owner.scroll.to_see(self)
x, y = offset_scroll(self, offset_x, offset_y)
self.page.run_cdp('Input.dispatchMouseEvent', type='mouseMoved', x=x, y=y, _ignore=AlertExistsError)
self.owner.run_cdp('Input.dispatchMouseEvent', type='mouseMoved', x=x, y=y, _ignore=AlertExistsError)
def drag(self, offset_x=0, offset_y=0, duration=.5):
"""拖拽当前元素到相对位置
@ -687,7 +688,7 @@ class ChromiumElement(DrissionElement):
elif not isinstance(ele_or_loc, (list, tuple)):
raise TypeError('需要ChromiumElement对象或坐标。')
self.page.actions.hold(self).move_to(ele_or_loc, duration=duration).release()
self.owner.actions.hold(self).move_to(ele_or_loc, duration=duration).release()
def _get_obj_id(self, node_id=None, backend_id=None):
"""根据传入node id或backend id获取js中的object id
@ -696,9 +697,9 @@ class ChromiumElement(DrissionElement):
:return: js中的object id
"""
if node_id:
return self.page.run_cdp('DOM.resolveNode', nodeId=node_id)['object']['objectId']
return self.owner.run_cdp('DOM.resolveNode', nodeId=node_id)['object']['objectId']
else:
return self.page.run_cdp('DOM.resolveNode', backendNodeId=backend_id)['object']['objectId']
return self.owner.run_cdp('DOM.resolveNode', backendNodeId=backend_id)['object']['objectId']
def _get_node_id(self, obj_id=None, backend_id=None):
"""根据传入object id或backend id获取cdp中的node id
@ -707,9 +708,9 @@ class ChromiumElement(DrissionElement):
:return: cdp中的node id
"""
if obj_id:
return self.page.run_cdp('DOM.requestNode', objectId=obj_id)['nodeId']
return self.owner.run_cdp('DOM.requestNode', objectId=obj_id)['nodeId']
else:
n = self.page.run_cdp('DOM.describeNode', backendNodeId=backend_id)['node']
n = self.owner.run_cdp('DOM.describeNode', backendNodeId=backend_id)['node']
self._tag = n['localName']
return n['nodeId']
@ -718,7 +719,7 @@ class ChromiumElement(DrissionElement):
:param node_id:
:return: backend id
"""
n = self.page.run_cdp('DOM.describeNode', nodeId=node_id)['node']
n = self.owner.run_cdp('DOM.describeNode', nodeId=node_id)['node']
self._tag = n['localName']
return n['backendNodeId']
@ -770,7 +771,7 @@ class ChromiumElement(DrissionElement):
if isinstance(files, str):
files = files.split('\n')
files = [str(Path(i).absolute()) for i in files]
self.page.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=self._backend_id)
self.owner.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=self._backend_id)
class ShadowRoot(BaseElement):
@ -782,7 +783,8 @@ class ShadowRoot(BaseElement):
:param obj_id: js中的object id
:param backend_id: cdp中的backend id
"""
super().__init__(parent_ele.page)
super().__init__(parent_ele.owner)
self.tab = self.owner.tab
self.parent_ele = parent_ele
if backend_id:
self._backend_id = backend_id
@ -841,7 +843,7 @@ class ShadowRoot(BaseElement):
:param timeout: js超时时间为None则使用页面timeouts.script设置
:return: 运行的结果
"""
return run_js(self, script, as_expr, self.page.timeouts.script if timeout is None else timeout, args)
return run_js(self, script, as_expr, self.owner.timeouts.script if timeout is None else timeout, args)
def run_async_js(self, script, *args, as_expr=False, timeout=None):
"""以异步方式执行js代码
@ -853,7 +855,7 @@ class ShadowRoot(BaseElement):
"""
from threading import Thread
Thread(target=run_js, args=(self, script, as_expr,
self.page.timeouts.script if timeout is None else timeout, args)).start()
self.owner.timeouts.script if timeout is None else timeout, args)).start()
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
@ -899,7 +901,7 @@ class ShadowRoot(BaseElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'child()', {'locator': locator, 'index': index})
else:
return NoneElement(self.page, 'child()', {'locator': locator, 'index': index})
return NoneElement(self.owner, 'child()', {'locator': locator, 'index': index})
def next(self, locator='', index=1):
"""返回当前元素后面一个符合条件的同级元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -920,7 +922,7 @@ class ShadowRoot(BaseElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'next()', {'locator': locator, 'index': index})
else:
return NoneElement(self.page, 'next()', {'locator': locator, 'index': index})
return NoneElement(self.owner, 'next()', {'locator': locator, 'index': index})
def before(self, locator='', index=1):
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -942,7 +944,7 @@ class ShadowRoot(BaseElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'before()', {'locator': locator, 'index': index})
else:
return NoneElement(self.page, 'before()', {'locator': locator, 'index': index})
return NoneElement(self.owner, 'before()', {'locator': locator, 'index': index})
def after(self, locator='', index=1):
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -957,7 +959,7 @@ class ShadowRoot(BaseElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'after()', {'locator': locator, 'index': index})
else:
return NoneElement(self.page, 'after()', {'locator': locator, 'index': index})
return NoneElement(self.owner, 'after()', {'locator': locator, 'index': index})
def children(self, locator=''):
"""返回当前元素符合条件的直接子元素或节点组成的列表,可用查询语法筛选
@ -1065,14 +1067,15 @@ class ShadowRoot(BaseElement):
def do_find():
if loc[0] == 'css selector':
if index == 1:
nod_id = self.page.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=loc[1])['nodeId']
nod_id = self.owner.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=loc[1])['nodeId']
if nod_id:
r = make_chromium_eles(self.page, _ids=nod_id, is_obj_id=False)
r = make_chromium_eles(self.owner, _ids=nod_id, is_obj_id=False)
return None if r is False else r
else:
nod_ids = self.page.run_cdp('DOM.querySelectorAll', nodeId=self._node_id, selector=loc[1])['nodeId']
r = make_chromium_eles(self.page, _ids=nod_ids, index=index, is_obj_id=False)
nod_ids = self.owner.run_cdp('DOM.querySelectorAll',
nodeId=self._node_id, selector=loc[1])['nodeId']
r = make_chromium_eles(self.owner, _ids=nod_ids, index=index, is_obj_id=False)
return None if r is False else r
else:
@ -1083,21 +1086,21 @@ class ShadowRoot(BaseElement):
css = [i.css_path[61:] for i in eles]
if index is not None:
try:
node_id = self.page.run_cdp('DOM.querySelector', nodeId=self._node_id,
selector=css[index - 1])['nodeId']
node_id = self.owner.run_cdp('DOM.querySelector', nodeId=self._node_id,
selector=css[index - 1])['nodeId']
except IndexError:
return None
r = make_chromium_eles(self.page, _ids=node_id, is_obj_id=False)
r = make_chromium_eles(self.owner, _ids=node_id, is_obj_id=False)
return None if r is False else r
else:
node_ids = [self.page.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=i)['nodeId']
node_ids = [self.owner.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=i)['nodeId']
for i in css]
if 0 in node_ids:
return None
r = make_chromium_eles(self.page, _ids=node_ids, index=index, is_obj_id=False)
r = make_chromium_eles(self.owner, _ids=node_ids, index=index, is_obj_id=False)
return None if r is False else r
timeout = timeout if timeout is not None else self.page.timeout
timeout = timeout if timeout is not None else self.owner.timeout
end_time = perf_counter() + timeout
result = do_find()
while result is None and perf_counter() <= end_time:
@ -1106,19 +1109,19 @@ class ShadowRoot(BaseElement):
if result:
return result
return NoneElement(self.page) if index is not None else []
return NoneElement(self.owner) if index is not None else []
def _get_node_id(self, obj_id):
"""返回元素node id"""
return self.page.run_cdp('DOM.requestNode', objectId=obj_id)['nodeId']
return self.owner.run_cdp('DOM.requestNode', objectId=obj_id)['nodeId']
def _get_obj_id(self, back_id):
"""返回元素object id"""
return self.page.run_cdp('DOM.resolveNode', backendNodeId=back_id)['object']['objectId']
return self.owner.run_cdp('DOM.resolveNode', backendNodeId=back_id)['object']['objectId']
def _get_backend_id(self, node_id):
"""返回元素object id"""
r = self.page.run_cdp('DOM.describeNode', nodeId=node_id)['node']
r = self.owner.run_cdp('DOM.describeNode', nodeId=node_id)['node']
self._tag = r['localName'].lower()
return r['backendNodeId']
@ -1145,7 +1148,7 @@ def find_in_chromium_ele(ele, locator, index=1, timeout=None, relative=True):
loc_str = f'{ele.css_path}{loc[1]}'
loc = loc[0], loc_str
timeout = timeout if timeout is not None else ele.page.timeout
timeout = timeout if timeout is not None else ele.owner.timeout
# ---------------执行查找-----------------
if loc[0] == 'xpath':
@ -1167,18 +1170,18 @@ def find_by_xpath(ele, xpath, index, timeout, relative=True):
type_txt = '9' if index == 1 else '7'
node_txt = 'this.contentDocument' if ele.tag in __FRAME_ELEMENT__ and not relative else 'this'
js = make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt)
ele.page.wait.doc_loaded()
ele.owner.wait.doc_loaded()
def do_find():
res = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele._obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
res = ele.owner.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele._obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
if res['result']['type'] == 'string':
return res['result']['value']
if 'exceptionDetails' in res:
if 'The result is not a node set' in res['result']['description']:
js1 = make_js_for_find_ele_by_xpath(xpath, '1', node_txt)
res = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js1, objectId=ele._obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
res = ele.owner.run_cdp('Runtime.callFunctionOn', functionDeclaration=js1, objectId=ele._obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
return res['result']['value']
else:
raise SyntaxError(f'查询语句错误:\n{res}')
@ -1187,14 +1190,14 @@ def find_by_xpath(ele, xpath, index, timeout, relative=True):
return None
if index == 1:
r = make_chromium_eles(ele.page, _ids=res['result']['objectId'], is_obj_id=True)
r = make_chromium_eles(ele.owner, _ids=res['result']['objectId'], is_obj_id=True)
return None if r is False else r
else:
res = ele.page.run_cdp('Runtime.getProperties', objectId=res['result']['objectId'],
ownProperties=True)['result'][:-1]
res = ele.owner.run_cdp('Runtime.getProperties', objectId=res['result']['objectId'],
ownProperties=True)['result'][:-1]
if index is None:
r = [make_chromium_eles(ele.page, _ids=i['value']['objectId'], is_obj_id=True)
r = [make_chromium_eles(ele.owner, _ids=i['value']['objectId'], is_obj_id=True)
if i['value']['type'] == 'object' else i['value']['value'] for i in res]
return None if False in r else r
@ -1206,7 +1209,7 @@ def find_by_xpath(ele, xpath, index, timeout, relative=True):
index1 = eles_count + index + 1 if index < 0 else index
res = res[index1 - 1]
if res['value']['type'] == 'object':
r = make_chromium_eles(ele.page, _ids=res['value']['objectId'], is_obj_id=True)
r = make_chromium_eles(ele.owner, _ids=res['value']['objectId'], is_obj_id=True)
else:
r = res['value']['value']
return None if r is False else r
@ -1219,7 +1222,7 @@ def find_by_xpath(ele, xpath, index, timeout, relative=True):
if result:
return result
return NoneElement(ele.page) if index is not None else []
return NoneElement(ele.owner) if index is not None else []
def find_by_css(ele, selector, index, timeout):
@ -1235,11 +1238,11 @@ def find_by_css(ele, selector, index, timeout):
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame', 'shadow-root') else 'this'
js = f'function(){{return {node_txt}.querySelector{find_all}("{selector}");}}'
ele.page.wait.doc_loaded()
ele.owner.wait.doc_loaded()
def do_find():
res = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele._obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
res = ele.owner.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele._obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
if 'exceptionDetails' in res:
raise SyntaxError(f'查询语句错误:\n{res}')
@ -1247,14 +1250,14 @@ def find_by_css(ele, selector, index, timeout):
return None
if index == 1:
r = make_chromium_eles(ele.page, _ids=res['result']['objectId'], is_obj_id=True)
r = make_chromium_eles(ele.owner, _ids=res['result']['objectId'], is_obj_id=True)
return None if r is False else r
else:
obj_ids = [i['value']['objectId'] for i in ele.page.run_cdp('Runtime.getProperties',
objectId=res['result']['objectId'],
ownProperties=True)['result'][:-1]]
r = make_chromium_eles(ele.page, _ids=obj_ids, index=index, is_obj_id=True)
obj_ids = [i['value']['objectId'] for i in ele.owner.run_cdp('Runtime.getProperties',
objectId=res['result']['objectId'],
ownProperties=True)['result']]
r = make_chromium_eles(ele.owner, _ids=obj_ids, index=index, is_obj_id=True)
return None if r is False else r
end_time = perf_counter() + timeout
@ -1265,15 +1268,16 @@ def find_by_css(ele, selector, index, timeout):
if result:
return result
return NoneElement(ele.page) if index is not None else []
return NoneElement(ele.owner) if index is not None else []
def make_chromium_eles(page, _ids, index=1, is_obj_id=True):
def make_chromium_eles(page, _ids, index=1, is_obj_id=True, ele_only=False):
"""根据node id或object id生成相应元素对象
:param page: ChromiumPage对象
:param _ids: 元素的id列表
:param index: 获取第几个为None返回全部
:param is_obj_id: 传入的id是obj id还是node id
:param ele_only: 是否只返回ele在页面查找元素时生效
:return: 浏览器元素对象或它们组成的列表生成失败返回False
"""
if is_obj_id:
@ -1284,16 +1288,25 @@ def make_chromium_eles(page, _ids, index=1, is_obj_id=True):
_ids = (_ids,)
if index is not None: # 获取一个
obj_id = _ids[index - 1]
return get_node_func(page, obj_id)
if ele_only:
for obj_id in _ids:
tmp = get_node_func(page, obj_id, ele_only)
if tmp is not None:
return tmp
return False
else:
obj_id = _ids[index - 1]
return get_node_func(page, obj_id, ele_only)
else: # 获取全部
nodes = []
for obj_id in _ids:
tmp = get_node_func(page, obj_id)
tmp = get_node_func(page, obj_id, ele_only)
if tmp is False:
return False
nodes.append(tmp)
elif tmp is not None:
nodes.append(tmp)
return nodes
@ -1307,22 +1320,24 @@ def _get_node_info(page, id_type, _id):
return node
def _get_node_by_obj_id(page, obj_id):
def _get_node_by_obj_id(page, obj_id, ele_only):
"""根据obj id返回元素对象或文本ele_only时如果是文本返回None出错返回False"""
node = _get_node_info(page, 'objectId', obj_id)
if node is False:
return False
if node['node']['nodeName'] in ('#text', '#comment'):
return node['node']['nodeValue']
return None if ele_only else node['node']['nodeValue']
else:
return _make_ele(page, obj_id, node)
def _get_node_by_node_id(page, node_id):
def _get_node_by_node_id(page, node_id, ele_only):
"""根据node id返回元素对象或文本ele_only时如果是文本返回None出错返回False"""
node = _get_node_info(page, 'nodeId', node_id)
if node is False:
return False
if node['node']['nodeName'] in ('#text', '#comment'):
return node['node']['nodeValue']
return None if ele_only else node['node']['nodeValue']
else:
obj_id = page.driver.run('DOM.resolveNode', nodeId=node_id)
if 'error' in obj_id:
@ -1393,7 +1408,7 @@ def run_js(page_or_ele, script, as_expr, timeout, args=None):
"""
if isinstance(page_or_ele, (ChromiumElement, ShadowRoot)):
is_page = False
page = page_or_ele.page
page = page_or_ele.owner
obj_id = page_or_ele._obj_id
else:
is_page = True
@ -1432,12 +1447,10 @@ def run_js(page_or_ele, script, as_expr, timeout, args=None):
except TimeoutError:
raise TimeoutError(f'执行js超时等待{timeout}秒)。')
except ContextLostError:
if is_page:
raise ContextLostError('页面已被刷新,请尝试等待页面加载完成再执行操作。')
else:
raise ElementLostError('原来获取到的元素对象已不在页面内。')
raise ContextLostError('页面已被刷新,请尝试等待页面加载完成再执行操作。') if is_page else ElementLostError(
'原来获取到的元素对象已不在页面内。')
if res is None and page.states.has_alert:
if not res: # _timeout=0或js激活alert时
return None
exceptionDetails = res.get('exceptionDetails')
@ -1447,7 +1460,10 @@ def run_js(page_or_ele, script, as_expr, timeout, args=None):
try:
return parse_js_result(page, page_or_ele, res.get('result'), end_time)
except Exception:
return res
from DrissionPage import __version__
raise RuntimeError(f'\njs结果解析错误\n版本:{__version__}\n内容:{res}\njs{script}\n'
f'出现这个错误可能意味着程序有bug请把错误信息和重现方法告知作者谢谢。\n'
f'报告网站https://gitee.com/g1879/DrissionPage/issues')
def parse_js_result(page, ele, result, end_time):
@ -1475,11 +1491,7 @@ def parse_js_result(page, ele, result, end_time):
elif sub_type == 'array':
r = page.run_cdp('Runtime.getProperties', objectId=result['objectId'], ownProperties=True)['result']
return [parse_js_result(page, ele, result=i['value'], end_time=end_time) for i in r[:-1]]
elif 'objectId' in result and result['className'].lower() == 'object': # dict
r = page.run_cdp('Runtime.getProperties', objectId=result['objectId'], ownProperties=True)['result']
return {i['name']: parse_js_result(page, ele, result=i['value'], end_time=end_time) for i in r}
return [parse_js_result(page, ele, result=i['value'], end_time=end_time) for i in r if i['name'].isdigit()]
elif 'objectId' in result:
timeout = end_time - perf_counter()

View File

@ -28,9 +28,12 @@ PIC_TYPE = Literal['jpg', 'jpeg', 'png', 'webp', True]
class ChromiumElement(DrissionElement):
def __init__(self, page: ChromiumBase, node_id: int = None, obj_id: str = None, backend_id: int = None):
def __init__(self, owner: ChromiumBase, node_id: int = None, obj_id: str = None, backend_id: int = None):
self._tag: str = ...
# self.page: Union[ChromiumPage, WebPage] = ...
self.owner: ChromiumBase = ...
self.page: Union[ChromiumPage, WebPage] = ...
self.tab: Union[ChromiumPage, ChromiumTab] = ...
self._node_id: int = ...
self._obj_id: str = ...
self._backend_id: int = ...
@ -226,7 +229,7 @@ class ChromiumElement(DrissionElement):
def drag(self, offset_x: int = 0, offset_y: int = 0, duration: float = 0.5) -> None: ...
def drag_to(self, ele_or_loc: Union[tuple, ChromiumElement], duration: float = 0.5) -> None: ...
def drag_to(self, ele_or_loc: Union[Tuple[int, int], str, ChromiumElement], duration: float = 0.5) -> None: ...
def _get_obj_id(self, node_id: int = None, backend_id: int = None) -> str: ...
@ -240,10 +243,13 @@ class ChromiumElement(DrissionElement):
class ShadowRoot(BaseElement):
def __init__(self, parent_ele: ChromiumElement, obj_id: str = None, backend_id: int = None):
# self.page: Union[ChromiumPage, WebPage] = ...
self.owner: ChromiumBase = ...
self.tab: Union[ChromiumPage, ChromiumTab] = ...
self._obj_id: str = ...
self._node_id: int = ...
self._backend_id: int = ...
self.page: ChromiumPage = ...
# self.page: ChromiumPage = ...
self.parent_ele: ChromiumElement = ...
self._states: ShadowRootStates = ...
@ -350,7 +356,8 @@ def find_by_css(ele: ChromiumElement,
def make_chromium_eles(page: Union[ChromiumBase, ChromiumPage, WebPage, ChromiumTab, ChromiumFrame],
_ids: Union[tuple, list, str, int],
index: Optional[int] = 1,
is_obj_id: bool = True
is_obj_id: bool = True,
ele_only: bool = False
) -> Union[ChromiumElement, ChromiumFrame, List[Union[ChromiumElement, ChromiumFrame]]]: ...

View File

@ -20,12 +20,12 @@ from .._functions.web import get_ele_txt, make_absolute_link
class SessionElement(DrissionElement):
"""session模式的元素对象包装了一个lxml的Element对象并封装了常用功能"""
def __init__(self, ele, page=None):
def __init__(self, ele, owner=None):
"""初始化对象
:param ele: 被包装的HtmlElement元素
:param page: 元素所在页面对象如果是从 html 文本生成的元素则为 None
:param owner: 元素所在页面对象如果是从 html 文本生成的元素则为 None
"""
super().__init__(page)
super().__init__(owner)
self._inner_ele = ele
self._type = 'SessionElement'
@ -201,10 +201,10 @@ class SessionElement(DrissionElement):
return link
else: # 其它情况直接返回绝对url
return make_absolute_link(link, self.page.url)
return make_absolute_link(link, self.owner.url)
elif name == 'src':
return make_absolute_link(self.inner_ele.get('src'), self.page.url)
return make_absolute_link(self.inner_ele.get('src'), self.owner.url)
elif name == 'text':
return self.text
@ -313,7 +313,7 @@ def make_session_ele(html_or_ele, loc=None, index=1):
# SessionElement
elif html_or_ele._type == 'SessionElement':
page = html_or_ele.page
page = html_or_ele.owner
loc_str = loc[1]
if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'):
@ -323,8 +323,8 @@ def make_session_ele(html_or_ele, loc=None, index=1):
# 若css以>开头表示找元素的直接子元素要用page以绝对路径才能找到
elif loc[0] == 'css selector' and loc[1].lstrip().startswith('>'):
loc_str = f'{html_or_ele.css_path}{loc[1]}'
if html_or_ele.page:
html_or_ele = fromstring(html_or_ele.page.html)
if html_or_ele.owner:
html_or_ele = fromstring(html_or_ele.owner.html)
else: # 接收html文本无page的情况
html_or_ele = fromstring(html_or_ele('xpath:/ancestor::*').html)
@ -342,11 +342,11 @@ def make_session_ele(html_or_ele, loc=None, index=1):
loc = loc[0], loc_str
# 获取整个页面html再定位到当前元素以实现查找上级元素
page = html_or_ele.page
page = html_or_ele.owner
xpath = html_or_ele.xpath
# ChromiumElement兼容传入的元素在iframe内的情况
html = html_or_ele.page.run_cdp('DOM.getOuterHTML', objectId=html_or_ele._doc_id)['outerHTML'] \
if html_or_ele._doc_id else html_or_ele.page.html
html = html_or_ele.owner.run_cdp('DOM.getOuterHTML', objectId=html_or_ele._doc_id)['outerHTML'] \
if html_or_ele._doc_id else html_or_ele.owner.html
html_or_ele = fromstring(html)
html_or_ele = html_or_ele.xpath(xpath)[0]
@ -360,7 +360,7 @@ def make_session_ele(html_or_ele, loc=None, index=1):
# ShadowRoot
elif isinstance(html_or_ele, BaseElement):
page = html_or_ele.page
page = html_or_ele.owner
html_or_ele = fromstring(html_or_ele.html)
else:

View File

@ -18,8 +18,9 @@ from .._pages.session_page import SessionPage
class SessionElement(DrissionElement):
def __init__(self, ele: HtmlElement, page: Union[SessionPage, None] = None):
def __init__(self, ele: HtmlElement, owner: Union[SessionPage, None] = None):
self._inner_ele: HtmlElement = ...
self.owner: SessionPage = ...
self.page: SessionPage = ...
@property

View File

@ -157,7 +157,7 @@ def set_prefs(opt):
def set_flags(opt):
"""处理启动配置中的prefs项,目前只能对已存在文件夹配置
"""处理启动配置中的flags项
:param opt: ChromiumOptions
:return: None
"""
@ -316,18 +316,25 @@ def get_chrome_path(ini_path):
return None
# -----------从注册表中获取--------------
import winreg
from winreg import OpenKey, EnumValue, CloseKey, HKEY_CURRENT_USER, HKEY_LOCAL_MACHINE, KEY_READ
txt = r'SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe'
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
r'SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe',
reserved=0, access=winreg.KEY_READ)
k = winreg.EnumValue(key, 0)
winreg.CloseKey(key)
key = OpenKey(HKEY_CURRENT_USER, txt, reserved=0, access=KEY_READ)
k = EnumValue(key, 0)
CloseKey(key)
if k[1]:
return k[1]
return k[1]
except (FileNotFoundError, OSError):
try:
key = OpenKey(HKEY_LOCAL_MACHINE, txt, reserved=0, access=KEY_READ)
k = EnumValue(key, 0)
CloseKey(key)
if k[1]:
return k[1]
except FileNotFoundError:
pass
except (FileNotFoundError, OSError):
pass
# -----------从系统变量中获取--------------
for path in environ.get('PATH', '').split(';'):

View File

@ -12,8 +12,6 @@ from tempfile import gettempdir, TemporaryDirectory
from threading import Lock
from time import perf_counter
from psutil import process_iter, AccessDenied, NoSuchProcess, ZombieProcess
from .._configs.options_manage import OptionsManager
from ..errors import (ContextLostError, ElementLostError, CDPError, PageDisconnectedError, NoRectError,
AlertExistsError, WrongURLError, StorageError, CookieFormatError, JavaScriptError)
@ -182,26 +180,6 @@ def wait_until(function, kwargs=None, timeout=10):
raise TimeoutError
def stop_process_on_port(port):
"""强制关闭某个端口内的进程
:param port: 端口号
:return: None
"""
for proc in process_iter(['pid', 'connections']):
try:
connections = proc.connections()
except (AccessDenied, NoSuchProcess):
continue
for conn in connections:
if conn.laddr.port == int(port):
try:
proc.terminate()
except (NoSuchProcess, AccessDenied, ZombieProcess):
pass
except Exception as e:
print(f"{proc.pid} {port}: {e}")
def configs_to_here(save_name=None):
"""把默认ini文件复制到当前目录
:param save_name: 指定文件名为None则命名为'dp_configs.ini'

View File

@ -42,9 +42,6 @@ def get_hwnds_from_pid(pid: Union[str, int], title: str) -> list: ...
def wait_until(function: callable, kwargs: dict = None, timeout: float = 10): ...
def stop_process_on_port(port: Union[int, str]) -> None: ...
def configs_to_here(file_name: Union[Path, str] = None) -> None: ...

View File

@ -119,10 +119,10 @@ def offset_scroll(ele, offset_x, offset_y):
cp_x, cp_y = ele.rect.click_point
lx = loc_x + offset_x if offset_x else cp_x
ly = loc_y + offset_y if offset_y else cp_y
if not location_in_viewport(ele.page, lx, ly):
clientWidth = ele.page.run_js('return document.body.clientWidth;')
clientHeight = ele.page.run_js('return document.body.clientHeight;')
ele.page.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2)
if not location_in_viewport(ele.owner, lx, ly):
clientWidth = ele.owner.run_js('return document.body.clientWidth;')
clientHeight = ele.owner.run_js('return document.body.clientHeight;')
ele.owner.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2)
cl_x, cl_y = ele.rect.viewport_location
ccp_x, ccp_y = ele.rect.viewport_click_point
cx = cl_x + offset_x if offset_x else ccp_x
@ -319,12 +319,12 @@ def is_cookie_in_driver(page, cookie):
:return: bool
"""
if 'domain' in cookie:
for c in page.get_cookies(all_domains=True):
for c in page.cookies(all_domains=True):
if cookie['name'] == c['name'] and cookie['value'] == c['value'] and cookie['domain'] == c.get('domain',
None):
return True
else:
for c in page.get_cookies(all_domains=True):
for c in page.cookies(all_domains=True):
if cookie['name'] == c['name'] and cookie['value'] == c['value']:
return True
return False
@ -363,3 +363,34 @@ def get_blob(page, url, as_bytes=True):
return b64decode(result.split(',', 1)[-1])
else:
return result
def tree(ele_or_page):
"""把页面或元素对象DOM结构打印出来
:param ele_or_page: 页面或元素对象
:return: None
"""
def _tree(obj, last_one=True, body=''):
list_ele = obj.children()
length = len(list_ele)
body_unit = ' ' if last_one else ''
tail = '├───'
new_body = body + body_unit
if length > 0:
new_last_one = False
for i in range(length):
if i == length - 1:
tail = '└───'
new_last_one = True
e = list_ele[i]
attrs = ' '.join([f"{k}='{v}'" for k, v in e.attrs.items()])
print(f'{new_body}{tail}<{e.tag} {attrs}>'.replace('\n', ' '))
_tree(e, new_last_one, new_body)
ele = ele_or_page.s_ele()
attrs = ' '.join([f"{k}='{v}'" for k, v in ele.attrs.items()])
print(f'<{ele.tag} {attrs}>'.replace('\n', ' '))
_tree(ele)

View File

@ -11,7 +11,7 @@ from typing import Union
from requests import Session
from requests.cookies import RequestsCookieJar
from .._base.base import BasePage, DrissionElement
from .._base.base import DrissionElement, BaseParser
from .._elements.chromium_element import ChromiumElement
from .._pages.chromium_base import ChromiumBase
@ -50,3 +50,6 @@ def is_cookie_in_driver(page: ChromiumBase, cookie: dict) -> bool: ...
def get_blob(page: ChromiumBase, url: str, as_bytes: bool = True) -> bytes: ...
def tree(ele_or_page:BaseParser) -> None: ...

View File

@ -576,7 +576,8 @@ class ChromiumBase(BasePage):
fromIndex=from_index, toIndex=end_index)
if __ERROR__ not in nIds:
if nIds['nodeIds'][0] != 0:
r = make_chromium_eles(self, _ids=nIds['nodeIds'], index=index_arg, is_obj_id=False)
r = make_chromium_eles(self, _ids=nIds['nodeIds'], index=index_arg,
is_obj_id=False, ele_only=True)
if r is not False:
break
@ -747,17 +748,6 @@ class ChromiumBase(BasePage):
frames = self._ele(locator, timeout=timeout, index=None, raise_err=False)
return [i for i in frames if i._type == 'ChromiumFrame']
def upload(self, loc_or_ele, file_paths, by_js=False):
"""触发上传文件选择框并自动填入指定路径
:param loc_or_ele: 被点击后会触发文件选择框的元素或它的定位符
:param file_paths: 文件路径如果上传框支持多文件可传入列表或字符串字符串时多个文件用回车分隔
:param by_js: 是否用js方式点击
:return: None
"""
self.set.upload_files(file_paths)
self.ele(loc_or_ele).click(by_js=by_js)
self.wait.upload_paths_inputted()
def session_storage(self, item=None):
"""返回sessionStorage信息不设置item则获取全部
:param item: 要获取的项不设置则返回全部
@ -875,10 +865,20 @@ class ChromiumBase(BasePage):
t_id = self._target_id
self.disconnect()
sleep(wait)
self.browser.reconnect()
self._driver = self.browser._get_driver(t_id, self)
def handle_alert(self, accept=True, send=None, timeout=None, next_one=False):
"""处理提示框,可以自动等待提示框出现
:param accept: True表示确认False表示取消为None不会按按钮但依然返回文本值
:param send: 处理prompt提示框时可输入文本
:param timeout: 等待提示框出现的超时时间为None则使用self.timeout属性的值
:param next_one: 是否处理下一个出现的提示框为True时timeout参数无效
:return: 提示框内容文本未等到提示框则返回False
"""
r = self._handle_alert(accept=accept, send=send, timeout=timeout, next_one=next_one)
if not isinstance(accept, bool):
return r
while self._has_alert:
sleep(.1)
return r
@ -905,6 +905,8 @@ class ChromiumBase(BasePage):
return False
res_text = self._alert.text
if not isinstance(accept, bool):
return res_text
d = {'accept': accept, '_timeout': 0}
if self._alert.type == 'prompt' and send is not None:
d['promptText'] = send
@ -1064,7 +1066,11 @@ class ChromiumBase(BasePage):
vp = {'x': 0, 'y': 0, 'width': width, 'height': height, 'scale': 1}
args = {'format': pic_type, 'captureBeyondViewport': True, 'clip': vp}
else:
if left_top and right_bottom:
if left_top or right_bottom:
if not left_top:
left_top = (0, 0)
if not right_bottom:
right_bottom = self.rect.size
x, y = left_top
w = right_bottom[0] - x
h = right_bottom[1] - y
@ -1132,6 +1138,17 @@ class ChromiumBase(BasePage):
def get_cookies(self, as_dict=False, all_domains=False, all_info=False):
return self.cookies(as_dict=as_dict, all_domains=all_domains, all_info=all_info)
def upload(self, loc_or_ele, file_paths, by_js=False):
"""触发上传文件选择框并自动填入指定路径
:param loc_or_ele: 被点击后会触发文件选择框的元素或它的定位符
:param file_paths: 文件路径如果上传框支持多文件可传入列表或字符串字符串时多个文件用回车分隔
:param by_js: 是否用js方式点击
:return: None
"""
self.set.upload_files(file_paths)
self.ele(loc_or_ele).click(by_js=by_js)
self.wait.upload_paths_inputted()
class Timeout(object):
"""用于保存d模式timeout信息的类"""

View File

@ -8,7 +8,8 @@
from pathlib import Path
from typing import Union, Tuple, List, Any, Optional, Literal
from .chromium_tab import ChromiumTab
from .chromium_tab import ChromiumTab, WebPageTab
from .web_page import WebPage
from .._base.base import BasePage
from .._base.browser import Browser
from .._base.driver import Driver
@ -35,6 +36,7 @@ class ChromiumBase(BasePage):
timeout: float = None):
self._browser: Browser = ...
self._page: ChromiumPage = ...
self.tab: Union[ChromiumPage, ChromiumTab] = ...
self.address: str = ...
self._driver: Driver = ...
self._frame_id: str = ...
@ -216,18 +218,13 @@ class ChromiumBase(BasePage):
def add_ele(self,
outerHTML: str,
insert_to: Optional[ChromiumElement, str, Tuple[str, str]] = None,
before: Optional[ChromiumElement, str, Tuple[str, str]] = None) -> ChromiumElement: ...
insert_to: Union[ChromiumElement, str, Tuple[str, str], None] = None,
before: Union[ChromiumElement, str, Tuple[str, str], None] = None) -> ChromiumElement: ...
def get_frame(self, loc_ind_ele: Union[str, int, tuple, ChromiumFrame], timeout: float = None) -> ChromiumFrame: ...
def get_frames(self, locator: Union[str, tuple] = None, timeout: float = None) -> List[ChromiumFrame]: ...
def upload(self,
loc_or_ele: Union[str, Tuple[str, str], ChromiumElement],
file_paths: Union[str, list, tuple],
by_js: bool = False) -> None: ...
def run_cdp(self, cmd: str, **cmd_args) -> dict: ...
def run_cdp_loaded(self, cmd: str, **cmd_args) -> dict: ...
@ -255,7 +252,7 @@ class ChromiumBase(BasePage):
def reconnect(self, wait: float = 0) -> None: ...
def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None,
def handle_alert(self, accept: Optional[bool] = True, send: str = None, timeout: float = None,
next_one: bool = False) -> Union[str, False]: ...
def _handle_alert(self, accept: bool = True, send: str = None, timeout: float = None,

View File

@ -21,38 +21,38 @@ from ..errors import ContextLostError, ElementLostError, PageDisconnectedError,
class ChromiumFrame(ChromiumBase):
def __init__(self, page, ele, info=None):
def __init__(self, owner, ele, info=None):
"""
:param page: frame所在的页面对象
:param owner: frame所在的页面对象
:param ele: frame所在元素
:param info: frame所在元素信息
"""
if page._type in ('ChromiumPage', 'WebPage'):
self._page = self._target_page = self.tab = page
self._browser = page.browser
if owner._type in ('ChromiumPage', 'WebPage'):
self._page = self._target_page = self.tab = owner
self._browser = owner.browser
else: # Tab、Frame
self._page = page.page
self._page = owner.page
self._browser = self._page.browser
self._target_page = page
self.tab = page.tab if page._type == 'ChromiumFrame' else page
self._target_page = owner
self.tab = owner.tab if owner._type == 'ChromiumFrame' else owner
self.address = page.address
self._tab_id = page.tab_id
self.address = owner.address
self._tab_id = owner.tab_id
self._backend_id = ele._backend_id
self._frame_ele = ele
self._states = None
self._reloading = False
node = info['node'] if not info else page.run_cdp('DOM.describeNode', backendNodeId=ele._backend_id)['node']
node = info['node'] if not info else owner.run_cdp('DOM.describeNode', backendNodeId=ele._backend_id)['node']
self._frame_id = node['frameId']
if self._is_inner_frame():
self._is_diff_domain = False
self.doc_ele = ChromiumElement(self._target_page, backend_id=node['contentDocument']['backendNodeId'])
super().__init__(page.address, page.tab_id, page.timeout)
super().__init__(owner.address, owner.tab_id, owner.timeout)
else:
self._is_diff_domain = True
delattr(self, '_frame_id')
super().__init__(page.address, node['frameId'], page.timeout)
super().__init__(owner.address, node['frameId'], owner.timeout)
obj_id = super().run_js('document;', as_expr=True)['objectId']
self.doc_ele = ChromiumElement(self, obj_id=obj_id)
@ -176,6 +176,7 @@ class ChromiumFrame(ChromiumBase):
return True
except:
raise
return False
finally:
@ -251,8 +252,14 @@ class ChromiumFrame(ChromiumBase):
@property
def page(self):
"""返回所属Page对象"""
return self._page
@property
def owner(self):
"""返回所属页面对象"""
return self.frame_ele.owner
@property
def frame_ele(self):
"""返回总页面上的frame元素"""

View File

@ -10,7 +10,7 @@ from typing import Union, Tuple, List, Any, Optional
from .chromium_base import ChromiumBase
from .chromium_page import ChromiumPage
from .chromium_tab import ChromiumTab
from .chromium_tab import ChromiumTab, WebPageTab
from .web_page import WebPage
from .._elements.chromium_element import ChromiumElement
from .._units.listener import FrameListener
@ -24,12 +24,12 @@ from .._units.waiter import FrameWaiter
class ChromiumFrame(ChromiumBase):
def __init__(self,
page: Union[ChromiumPage, WebPage, ChromiumTab, ChromiumFrame],
owner: Union[ChromiumPage, WebPage, ChromiumTab, ChromiumFrame],
ele: ChromiumElement,
info: dict = None):
self._page: ChromiumPage = ...
self._target_page: ChromiumBase = ...
self.tab: ChromiumTab = ...
self._page: ChromiumPage = ...
self.tab: Union[ChromiumPage, ChromiumTab] = ...
self._tab_id: str = ...
self._set: ChromiumFrameSetter = ...
self._frame_ele: ChromiumElement = ...
@ -68,6 +68,9 @@ class ChromiumFrame(ChromiumBase):
@property
def page(self) -> Union[ChromiumPage, WebPage]: ...
@property
def owner(self) -> ChromiumBase: ...
@property
def frame_ele(self) -> ChromiumElement: ...

View File

@ -59,6 +59,7 @@ class ChromiumPage(ChromiumBase):
self._created = True
self._page = self
self.tab = self
self._run_browser()
super().__init__(self.address, tab_id)
self._type = 'ChromiumPage'

View File

@ -30,6 +30,7 @@ class ChromiumPage(ChromiumBase):
addr_or_opts: Union[str, int, ChromiumOptions] = None,
tab_id: str = None,
timeout: float = None):
self.tab: ChromiumPage = ...
self._chromium_options: ChromiumOptions = ...
self._browser: Browser = ...
self._browser_id: str = ...

View File

@ -46,6 +46,7 @@ class ChromiumTab(ChromiumBase):
self._created = True
self._page = page
self.tab = self
self._browser = page.browser
super().__init__(page.address, tab_id, page.timeout)
self._rect = None

View File

@ -36,6 +36,7 @@ class SessionPage(BasePage):
self._set = None
self._encoding = None
self._type = 'SessionPage'
self._page = self
self._s_set_start_options(session_or_options)
self._s_set_runtime_settings()
self._create_session()

View File

@ -32,6 +32,7 @@ class SessionPage(BasePage):
self.retry_interval: float = ...
self._set: SessionPageSetter = ...
self._encoding: str = ...
self._page: SessionPage = ...
def _s_set_start_options(self, session_or_options: Union[Session, SessionOptions]) -> None: ...
@ -113,9 +114,9 @@ class SessionPage(BasePage):
-> Union[SessionElement, List[SessionElement]]: ...
def cookies(self,
as_dict: bool = False,
all_domains: bool = False,
all_info: bool = False) -> Union[dict, list]: ...
as_dict: bool = False,
all_domains: bool = False,
all_info: bool = False) -> Union[dict, list]: ...
# ----------------session独有属性和方法-----------------------
@property

View File

@ -15,12 +15,12 @@ from .._functions.web import location_in_viewport
class Actions:
"""用于实现动作链的类"""
def __init__(self, page):
def __init__(self, owner):
"""
:param page: ChromiumBase对象
:param owner: ChromiumBase对象
"""
self.page = page
self._dr = page.driver
self.owner = owner
self._dr = owner.driver
self.modifier = 0 # 修饰符Alt=1, Ctrl=2, Meta/Command=4, Shift=8
self.curr_x = 0 # 视口坐标
self.curr_y = 0
@ -40,23 +40,23 @@ class Actions:
lx = ele_or_loc[0] + offset_x
ly = ele_or_loc[1] + offset_y
elif isinstance(ele_or_loc, str) or ele_or_loc._type == 'ChromiumElement':
ele_or_loc = self.page(ele_or_loc)
self.page.scroll.to_see(ele_or_loc)
ele_or_loc = self.owner(ele_or_loc)
self.owner.scroll.to_see(ele_or_loc)
x, y = ele_or_loc.rect.location if offset_x or offset_y else ele_or_loc.rect.midpoint
lx = x + offset_x
ly = y + offset_y
else:
raise TypeError('ele_or_loc参数只能接受坐标(x, y)或ChromiumElement对象。')
if not location_in_viewport(self.page, lx, ly):
if not location_in_viewport(self.owner, lx, ly):
# 把坐标滚动到页面中间
clientWidth = self.page.run_js('return document.body.clientWidth;')
clientHeight = self.page.run_js('return document.body.clientHeight;')
self.page.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2)
clientWidth = self.owner.run_js('return document.body.clientWidth;')
clientHeight = self.owner.run_js('return document.body.clientHeight;')
self.owner.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2)
# 这样设计为了应付那些不随滚动条滚动的元素
if is_loc:
cx, cy = location_to_client(self.page, lx, ly)
cx, cy = location_to_client(self.owner, lx, ly)
else:
x, y = ele_or_loc.rect.viewport_location if offset_x or offset_y \
else ele_or_loc.rect.viewport_midpoint
@ -255,7 +255,7 @@ class Actions:
data = self._get_key_data(key, 'keyDown')
data['_ignore'] = AlertExistsError
self.page.run_cdp('Input.dispatchKeyEvent', **data)
self.owner.run_cdp('Input.dispatchKeyEvent', **data)
return self
def key_up(self, key):
@ -270,7 +270,7 @@ class Actions:
data = self._get_key_data(key, 'keyUp')
data['_ignore'] = AlertExistsError
self.page.run_cdp('Input.dispatchKeyEvent', **data)
self.owner.run_cdp('Input.dispatchKeyEvent', **data)
return self
def type(self, keys):
@ -295,7 +295,7 @@ class Actions:
:param text: 文本值或按键组合
:return: self
"""
input_text_or_keys(self.page, text)
input_text_or_keys(self.owner, text)
return self
def wait(self, second):

View File

@ -44,8 +44,8 @@ KEYS = Literal['NULL', 'CANCEL', 'HELP', 'BACKSPACE', 'BACK_SPACE', 'meta',
class Actions:
def __init__(self, page: ChromiumBase):
self.page: ChromiumBase = ...
def __init__(self, owner: ChromiumBase):
self.owner: ChromiumBase = ...
self._dr: Driver = ...
self.modifier: int = ...
self.curr_x: int = ...

View File

@ -45,7 +45,7 @@ class Clicker(object):
if not by_js: # 模拟点击
can_click = False
timeout = self._ele.page.timeout if timeout is None else timeout
timeout = self._ele.owner.timeout if timeout is None else timeout
rect = None
if timeout == 0:
try:
@ -85,8 +85,8 @@ class Clicker(object):
x = rect[1][0] - (rect[1][0] - rect[0][0]) / 2
y = rect[0][0] + 3
try:
r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=x, y=y, includeUserAgentShadowDOM=True,
ignorePointerEventsNone=True)
r = self._ele.owner.run_cdp('DOM.getNodeForLocation', x=x, y=y, includeUserAgentShadowDOM=True,
ignorePointerEventsNone=True)
if r['backendNodeId'] != self._ele._backend_id:
vx, vy = self._ele.rect.viewport_midpoint
else:
@ -107,13 +107,13 @@ class Clicker(object):
def right(self):
"""右键单击"""
self._ele.page.scroll.to_see(self._ele)
self._ele.owner.scroll.to_see(self._ele)
x, y = self._ele.rect.viewport_click_point
self._click(x, y, 'right')
def middle(self):
"""中键单击"""
self._ele.page.scroll.to_see(self._ele)
self._ele.owner.scroll.to_see(self._ele)
x, y = self._ele.rect.viewport_click_point
self._click(x, y, 'middle')
@ -125,7 +125,7 @@ class Clicker(object):
:param count: 点击次数
:return: None
"""
self._ele.page.scroll.to_see(self._ele)
self._ele.owner.scroll.to_see(self._ele)
if offset_x is None and offset_y is None:
w, h = self._ele.rect.size
offset_x = w // 2
@ -140,6 +140,61 @@ class Clicker(object):
"""
self.at(count=times)
def to_download(self, save_path=None, rename=None, suffix=None, new_tab=False, by_js=False, timeout=None):
"""点击触发下载
:param save_path: 保存路径为None保存在原来设置的如未设置保存到当前路径
:param rename: 重命名文件名
:param suffix: 指定文件后缀
:param new_tab: 该下载是否在新tab中触发
:param by_js: 是否用js方式点击逻辑与click()一致
:param timeout: 等待下载触发的超时时间为None则使用页面对象设置
:return: DownloadMission对象
"""
if save_path:
self._ele.owner.tab.set.download_path(save_path)
elif not self._ele.page._browser._dl_mgr._running:
self._ele.page.set.download_path('.')
if rename or suffix:
self._ele.owner.tab.set.download_file_name(rename, suffix)
tab = self._ele.page if new_tab else self._ele.owner
self.left(by_js=by_js)
return tab.wait.download_begin(timeout=timeout)
def to_upload(self, file_paths, by_js=False):
"""触发上传文件选择框并自动填入指定路径
:param file_paths: 文件路径如果上传框支持多文件可传入列表或字符串字符串时多个文件用回车分隔
:param by_js: 是否用js方式点击逻辑与click()一致
:return: None
"""
self._ele.owner.set.upload_files(file_paths)
self.left(by_js=by_js)
self._ele.owner.wait.upload_paths_inputted()
def for_new_tab(self, by_js=False):
"""点击后等待新tab出现并返回其对象
:param by_js: 是否使用js点击逻辑与click()一致
:return: 新标签页对象如果没有等到新标签页出现则抛出异常
"""
self.left(by_js=by_js)
tid = self._ele.page.wait.new_tab()
if not tid:
raise RuntimeError('没有出现新标签页。')
return self._ele.page.get_tab(tid)
def for_new_tab(self, by_js=False):
"""点击后等待新tab出现并返回其对象
:param by_js: 是否使用js点击逻辑与click()一致
:return: 新标签页对象如果没有等到新标签页出现则抛出异常
"""
self.left(by_js=by_js)
tid = self._ele.page._page.wait.new_tab()
if not tid:
raise RuntimeError('没有出现新标签页。')
return self._ele.page._page.get_tab(tid)
def _click(self, client_x, client_y, button='left', count=1):
"""实施点击
:param client_x: 视口中的x坐标
@ -148,11 +203,11 @@ class Clicker(object):
:param count: 点击次数
:return: None
"""
self._ele.page.run_cdp('Input.dispatchMouseEvent', type='mousePressed', x=client_x,
y=client_y, button=button, clickCount=count, _ignore=AlertExistsError)
self._ele.owner.run_cdp('Input.dispatchMouseEvent', type='mousePressed', x=client_x,
y=client_y, button=button, clickCount=count, _ignore=AlertExistsError)
# sleep(.05)
self._ele.page.run_cdp('Input.dispatchMouseEvent', type='mouseReleased', x=client_x,
y=client_y, button=button, _ignore=AlertExistsError)
self._ele.owner.run_cdp('Input.dispatchMouseEvent', type='mouseReleased', x=client_x,
y=client_y, button=button, _ignore=AlertExistsError)
# -------------即将废弃--------------

View File

@ -5,25 +5,44 @@
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
from typing import Optional
from pathlib import Path
from typing import Union
from .downloader import DownloadMission
from .._elements.chromium_element import ChromiumElement
from .._pages.chromium_tab import WebPageTab, ChromiumTab
class Clicker(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
def __call__(self, by_js: Optional[bool] = False, timeout: float = 1.5, wait_stop: bool = True) -> bool: ...
def __call__(self, by_js: Union[bool, str, None] = False, timeout: float = 1.5, wait_stop: bool = True) -> bool: ...
def left(self, by_js: Optional[bool] = False, timeout: float = 1.5, wait_stop: bool = True) -> bool: ...
def left(self, by_js: Union[bool, str, None] = False, timeout: float = 1.5, wait_stop: bool = True) -> bool: ...
def right(self) -> None: ...
def middle(self) -> None: ...
def at(self, offset_x: float = None, offset_y: float = None, button: str = 'left', count: int = 1) -> None: ...
def at(self,
offset_x: float = None,
offset_y: float = None,
button: str = 'left',
count: int = 1) -> None: ...
def multi(self, times: int = 2) -> None: ...
def to_download(self,
save_path: Union[str, Path] = None,
rename: str = None,
suffix: str = None,
new_tab: bool = False,
by_js: bool = False,
timeout:float=None) -> DownloadMission: ...
def to_upload(self, file_paths: Union[str, Path, list, tuple], by_js: bool = False) -> None: ...
def for_new_tab(self, by_js:bool=False)->Union[ChromiumTab, WebPageTab]:...
def _click(self, client_x: float, client_y: float, button: str = 'left', count: int = 1) -> None: ...

View File

@ -31,13 +31,7 @@ class DownloadManager(object):
self._flags = {} # {tab_id: [bool, DownloadMission]}
if self._page.download_path:
self._browser.driver.set_callback('Browser.downloadProgress', self._onDownloadProgress)
self._browser.driver.set_callback('Browser.downloadWillBegin', self._onDownloadWillBegin)
r = self._browser.run_cdp('Browser.setDownloadBehavior', downloadPath=self._page.download_path,
behavior='allowAndName', eventsEnabled=True)
if 'error' in r:
print('浏览器版本太低无法使用下载管理功能。')
self._running = True
self.set_path(self._page, self._page.download_path)
else:
self._running = False

View File

@ -32,9 +32,9 @@ class DownloadManager(object):
def set_file_exists(self, tab_id: str, mode: Literal['rename', 'skip', 'overwrite']) -> None: ...
def set_flag(self, tab_id: str, flag: Optional[bool, DownloadMission]) -> None: ...
def set_flag(self, tab_id: str, flag: Union[bool, DownloadMission, None]) -> None: ...
def get_flag(self, tab_id: str) -> Optional[bool, DownloadMission]: ...
def get_flag(self, tab_id: str) -> Union[bool, DownloadMission, None]: ...
def get_tab_missions(self, tab_id: str) -> list: ...
@ -54,7 +54,7 @@ class DownloadManager(object):
class TabDownloadSettings(object):
TABS: dict = ...
tab_id: str = ...
waiting_flag: Optional[bool, dict] = ...
waiting_flag: Union[bool, dict, None] = ...
rename: Optional[str] = ...
suffix: Optional[str] = ...
path: Optional[str] = ...

View File

@ -162,6 +162,8 @@ class Listener(object):
:param gap: 每接收到多少个数据包返回一次数据
:return: 用于在接收到监听目标时触发动作的可迭代对象
"""
if not self.listening:
raise RuntimeError('监听未启动或已暂停。')
caught = 0
end = perf_counter() + timeout if timeout else None
while True:
@ -221,7 +223,7 @@ class Listener(object):
:return: 返回是否等待成功
"""
if not self.listening:
raise RuntimeError('监听未启动用listen.start()启动')
raise RuntimeError('监听未启动或已暂停')
if timeout is None:
while (not targets_only and self._running_requests > 0) or (targets_only and self._running_targets > 0):
sleep(.1)

View File

@ -6,7 +6,7 @@
@License : BSD 3-Clause.
"""
from queue import Queue
from typing import Union, Dict, List, Iterable, Optional, Literal
from typing import Union, Dict, List, Iterable, Optional, Literal, Any
from requests.structures import CaseInsensitiveDict
@ -23,7 +23,7 @@ class Listener(object):
self._page: ChromiumBase = ...
self._address: str = ...
self._target_id: str = ...
self._targets: Union[str, dict] = ...
self._targets: Union[str, dict, None] = ...
self._method: set = ...
self._res_type: set = ...
self._caught: Queue = ...
@ -39,16 +39,16 @@ class Listener(object):
def targets(self) -> Optional[set]: ...
def set_targets(self,
targets: Optional[str, list, tuple, set, bool] = True,
targets: Union[str, list, tuple, set, bool, None] = True,
is_regex: Optional[bool] = False,
method: Optional[str, list, tuple, set, bool] = ('GET', 'POST'),
res_type: Optional[__RES_TYPE__, list, tuple, set, bool] = True) -> None: ...
method: Union[str, list, tuple, set, bool, None] = ('GET', 'POST'),
res_type: Union[__RES_TYPE__, list, tuple, set, bool, None] = True) -> None: ...
def start(self,
targets: Optional[str, list, tuple, set, bool] = None,
targets: Union[str, list, tuple, set, bool, None] = None,
is_regex: Optional[bool] = None,
method: Optional[str, list, tuple, set, bool] = None,
res_type: Optional[__RES_TYPE__, list, tuple, set, bool] = None) -> None: ...
method: Union[str, list, tuple, set, bool, None] = None,
res_type: Union[__RES_TYPE__, list, tuple, set, bool, None] = None) -> None: ...
def stop(self) -> None: ...
@ -172,7 +172,7 @@ class Request(object):
def headers(self) -> dict: ...
@property
def postData(self) -> Union[str, dict]: ...
def postData(self) -> Any: ...
@property
def extra_info(self) -> Optional[RequestExtraInfo]: ...
@ -208,7 +208,7 @@ class Response(object):
self._response: dict = ...
self._raw_body: str = ...
self._is_base64_body: bool = ...
self._body: Union[str, dict] = ...
self._body: Union[str, dict, None] = ...
self._headers: dict = ...
@property
@ -221,7 +221,7 @@ class Response(object):
def raw_body(self) -> str: ...
@property
def body(self) -> Union[str, dict]: ...
def body(self) -> Any: ...
class ExtraInfo(object):

View File

@ -18,7 +18,7 @@ class ElementRect(object):
def corners(self):
"""返回元素四个角坐标顺序坐上、右上、右下、左下没有大小的元素抛出NoRectError"""
vr = self._get_viewport_rect('border')
r = self._ele.page.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
r = self._ele.owner.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
sx = r['pageX']
sy = r['pageY']
return [(vr[0] + sx, vr[1] + sy), (vr[2] + sx, vr[3] + sy), (vr[4] + sx, vr[5] + sy), (vr[6] + sx, vr[7] + sy)]
@ -32,8 +32,8 @@ class ElementRect(object):
@property
def size(self):
"""返回元素大小,格式(宽, 高)"""
border = self._ele.page.run_cdp('DOM.getBoxModel', backendNodeId=self._ele._backend_id,
nodeId=self._ele._node_id, objectId=self._ele._obj_id)['model']['border']
border = self._ele.owner.run_cdp('DOM.getBoxModel', backendNodeId=self._ele._backend_id,
nodeId=self._ele._node_id, objectId=self._ele._obj_id)['model']['border']
return border[2] - border[0], border[5] - border[1]
@property
@ -75,25 +75,25 @@ class ElementRect(object):
@property
def screen_location(self):
"""返回元素左上角在屏幕上坐标,左上角为(0, 0)"""
vx, vy = self._ele.page.rect.viewport_location
vx, vy = self._ele.owner.rect.viewport_location
ex, ey = self.viewport_location
pr = self._ele.page.run_js('return window.devicePixelRatio;')
pr = self._ele.owner.run_js('return window.devicePixelRatio;')
return (vx + ex) * pr, (ey + vy) * pr
@property
def screen_midpoint(self):
"""返回元素中点在屏幕上坐标,左上角为(0, 0)"""
vx, vy = self._ele.page.rect.viewport_location
vx, vy = self._ele.owner.rect.viewport_location
ex, ey = self.viewport_midpoint
pr = self._ele.page.run_js('return window.devicePixelRatio;')
pr = self._ele.owner.run_js('return window.devicePixelRatio;')
return (vx + ex) * pr, (ey + vy) * pr
@property
def screen_click_point(self):
"""返回元素中点在屏幕上坐标,左上角为(0, 0)"""
vx, vy = self._ele.page.rect.viewport_location
vx, vy = self._ele.owner.rect.viewport_location
ex, ey = self.viewport_click_point
pr = self._ele.page.run_js('return window.devicePixelRatio;')
pr = self._ele.owner.run_js('return window.devicePixelRatio;')
return (vx + ex) * pr, (ey + vy) * pr
def _get_viewport_rect(self, quad):
@ -101,12 +101,12 @@ class ElementRect(object):
:param quad: 方框类型margin border padding
:return: 四个角坐标
"""
return self._ele.page.run_cdp('DOM.getBoxModel', backendNodeId=self._ele._backend_id,
nodeId=self._ele._node_id, objectId=self._ele._obj_id)['model'][quad]
return self._ele.owner.run_cdp('DOM.getBoxModel', backendNodeId=self._ele._backend_id,
nodeId=self._ele._node_id, objectId=self._ele._obj_id)['model'][quad]
def _get_page_coord(self, x, y):
"""根据视口坐标获取绝对坐标"""
r = self._ele.page.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
r = self._ele.owner.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
sx = r['pageX']
sy = r['pageY']
return x + sx, y + sy

View File

@ -87,7 +87,7 @@ class Scroller(object):
if not self._wait_complete:
return
page = self._driver.page if self._driver._type == 'ChromiumElement' else self._driver
page = self._driver.owner if self._driver._type == 'ChromiumElement' else self._driver
r = page.run_cdp('Page.getLayoutMetrics')
x = r['layoutViewport']['pageX']
y = r['layoutViewport']['pageY']
@ -112,11 +112,11 @@ class ElementScroller(Scroller):
:param center: 是否尽量滚动到页面正中为None时如果被遮挡则滚动到页面正中
:return: None
"""
self._driver.page.scroll.to_see(self._driver, center=center)
self._driver.owner.scroll.to_see(self._driver, center=center)
def to_center(self):
"""元素尽量滚动到视口中间"""
self._driver.page.scroll.to_see(self._driver, center=True)
self._driver.owner.scroll.to_see(self._driver, center=True)
class PageScroller(Scroller):

View File

@ -27,7 +27,7 @@ class SelectElement(object):
:return: None
"""
para_type = 'index' if isinstance(text_or_index, int) else 'text'
timeout = timeout if timeout is not None else self._ele.page.timeout
timeout = timeout if timeout is not None else self._ele.owner.timeout
return self._select(text_or_index, para_type, timeout=timeout)
@property
@ -186,7 +186,7 @@ class SelectElement(object):
raise TypeError('单选列表只能传入str格式。')
mode = 'false' if cancel else 'true'
timeout = timeout if timeout is not None else self._ele.page.timeout
timeout = timeout if timeout is not None else self._ele.owner.timeout
condition = set(condition) if isinstance(condition, (list, tuple)) else {condition}
if para_type in ('text', 'value'):
@ -264,4 +264,4 @@ class SelectElement(object):
def _dispatch_change(self):
"""触发修改动作"""
self._ele.run_js('this.dispatchEvent(new Event("change", {bubbles: true}));')
self._ele.run_js('this.dispatchEvent(new CustomEvent("change", {bubbles: true}));')

View File

@ -130,6 +130,8 @@ class ChromiumBaseSetter(BasePageSetter):
if isinstance(files, str):
files = files.split('\n')
elif isinstance(files, Path):
files = (files, )
self._page._upload_list = [str(Path(i).absolute()) for i in files]
def headers(self, headers: dict) -> None:
@ -456,7 +458,7 @@ class ChromiumElementSetter(object):
:param value: 属性值
:return: None
"""
self._ele.page.run_cdp('DOM.setAttributeValue', nodeId=self._ele._node_id, name=name, value=str(value))
self._ele.owner.run_cdp('DOM.setAttributeValue', nodeId=self._ele._node_id, name=name, value=str(value))
def property(self, name, value):
"""设置元素property属性

View File

@ -6,7 +6,7 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from typing import Union, Tuple, Literal, Any, Optional
from typing import Union, Tuple, Literal, Any
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
@ -62,9 +62,9 @@ class ChromiumBaseSetter(BasePageSetter):
def auto_handle_alert(self, on_off: bool = True, accept: bool = True) -> None: ...
def upload_files(self, files: Union[str, list, tuple]) -> None: ...
def upload_files(self, files: Union[str, Path, list, tuple]) -> None: ...
def blocked_urls(self, urls: Optional[list, tuple, str]) -> None: ...
def blocked_urls(self, urls: Union[list, tuple, str, None]) -> None: ...
class TabSetter(ChromiumBaseSetter):
@ -109,7 +109,7 @@ class SessionPageSetter(BasePageSetter):
def timeout(self, second: float) -> None: ...
def encoding(self, encoding: Optional[str, None], set_all: bool = True) -> None: ...
def encoding(self, encoding: Union[str, None], set_all: bool = True) -> None: ...
def headers(self, headers: dict) -> None: ...

View File

@ -51,7 +51,7 @@ class ElementStates(object):
def is_in_viewport(self):
"""返回元素是否出现在视口中以元素click_point为判断"""
x, y = self._ele.rect.click_point
return location_in_viewport(self._ele.page, x, y) if x else False
return location_in_viewport(self._ele.owner, x, y) if x else False
@property
def is_whole_in_viewport(self):
@ -59,14 +59,14 @@ class ElementStates(object):
x1, y1 = self._ele.rect.location
w, h = self._ele.rect.size
x2, y2 = x1 + w, y1 + h
return location_in_viewport(self._ele.page, x1, y1) and location_in_viewport(self._ele.page, x2, y2)
return location_in_viewport(self._ele.owner, x1, y1) and location_in_viewport(self._ele.owner, x2, y2)
@property
def is_covered(self):
"""返回元素是否被覆盖与是否在视口中无关如被覆盖返回覆盖元素的backend id否则返回False"""
lx, ly = self._ele.rect.click_point
try:
bid = self._ele.page.run_cdp('DOM.getNodeForLocation', x=int(lx), y=int(ly)).get('backendNodeId')
bid = self._ele.owner.run_cdp('DOM.getNodeForLocation', x=int(lx), y=int(ly)).get('backendNodeId')
return bid if bid != self._ele._backend_id else False
except CDPError:
return False
@ -96,7 +96,7 @@ class ShadowRootStates(object):
def is_alive(self):
"""返回元素是否仍在DOM中"""
try:
self._ele.page.run_cdp('DOM.describeNode', backendNodeId=self._ele._backend_id)
self._ele.owner.run_cdp('DOM.describeNode', backendNodeId=self._ele._backend_id)
return True
except Exception:
return False

View File

@ -18,12 +18,17 @@ class BaseWaiter(object):
"""
self._driver = page_or_ele
def __call__(self, second):
"""等待若干秒
def __call__(self, second, scope=None):
"""等待若干秒,如传入两个参数,等待时间为这两个数间的一个随机数
:param second: 秒数
:param scope: 随机数范围
:return: None
"""
sleep(second)
if scope is None:
sleep(second)
else:
from random import uniform
sleep(uniform(second, scope))
def ele_deleted(self, loc_or_ele, timeout=None, raise_err=None):
"""等待元素从DOM中删除
@ -328,12 +333,17 @@ class ElementWaiter(object):
self._page = page
self._ele = ele
def __call__(self, second):
"""等待若干秒
def __call__(self, second, scope=None):
"""等待若干秒,如传入两个参数,等待时间为这两个数间的一个随机数
:param second: 秒数
:param scope: 随机数范围
:return: None
"""
sleep(second)
if scope is None:
sleep(second)
else:
from random import uniform
sleep(uniform(second, scope))
def deleted(self, timeout=None, raise_err=None):
"""等待元素从dom删除

View File

@ -18,7 +18,7 @@ class BaseWaiter(object):
def __init__(self, page: ChromiumBase):
self._driver: ChromiumBase = ...
def __call__(self, second: float) -> None: ...
def __call__(self, second: float, scope: float = None) -> None: ...
def ele_deleted(self,
loc_or_ele: Union[str, tuple, ChromiumElement],
@ -78,7 +78,7 @@ class ElementWaiter(object):
self._ele: ChromiumElement = ...
self._page: ChromiumBase = ...
def __call__(self, second: float) -> None: ...
def __call__(self, second: float, scope: float = None) -> None: ...
def deleted(self, timeout: float = None, raise_err: bool = None) -> bool: ...

View File

@ -10,7 +10,7 @@ from ._functions.by import By
from ._functions.keys import Keys
from ._functions.settings import Settings
from ._functions.tools import wait_until, configs_to_here
from ._functions.web import get_blob
from ._functions.web import get_blob, tree
from ._units.actions import Actions
__all__ = ['make_session_ele', 'Actions', 'Keys', 'By', 'Settings', 'wait_until', 'configs_to_here', 'get_blob']
__all__ = ['make_session_ele', 'Actions', 'Keys', 'By', 'Settings', 'wait_until', 'configs_to_here', 'get_blob', 'tree']

View File

@ -123,4 +123,4 @@ python 版本3.6 及以上
如果本项目对您有所帮助,不妨请作者我喝杯咖啡
![](https://gitee.com/g1879/DrissionPageDocs/raw/master/docs/imgs/code.jpg)
![](https://g1879.gitee.io/drissionpagedocs/assets/images/code-cf68de50a2f331a2aa0d39c9aebbbe2c.jpg)

View File

@ -38,7 +38,7 @@ setup(
python_requires='>=3.6',
entry_points={
'console_scripts': [
'dp = DrissionPage.functions.cli:main',
'dp = DrissionPage._functions.cli:main',
],
},
)