Pre Merge pull request !35 from g1879/dev

This commit is contained in:
g1879 2024-01-16 10:23:34 +00:00 committed by Gitee
commit e0d0b45122
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
36 changed files with 1253 additions and 990 deletions

View File

@ -14,4 +14,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.1'
__version__ = '4.0.2'

View File

@ -23,11 +23,11 @@ class BaseParser(object):
def __call__(self, loc_or_str):
return self.ele(loc_or_str)
def ele(self, loc_or_ele, timeout=None):
return self._ele(loc_or_ele, timeout, True, method='ele()')
def ele(self, loc_or_ele, index=1, timeout=None):
return self._ele(loc_or_ele, timeout, index=index, method='ele()')
def eles(self, loc_or_str, timeout=None):
return self._ele(loc_or_str, timeout, False)
return self._ele(loc_or_str, timeout, index=None)
# ----------------以下属性或方法待后代实现----------------
@property
@ -40,11 +40,11 @@ class BaseParser(object):
def s_eles(self, loc_or_str):
pass
def _ele(self, loc_or_ele, timeout=None, single=True, raise_err=None, method=None):
def _ele(self, loc_or_ele, timeout=None, index=1, raise_err=None, method=None):
pass
@abstractmethod
def _find_elements(self, loc_or_ele, timeout=None, single=True, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, raise_err=None):
pass
@ -68,19 +68,28 @@ class BaseElement(BaseParser):
def nexts(self):
pass
def _ele(self, loc_or_str, timeout=None, single=True, relative=False, raise_err=None, method=None):
r = self._find_elements(loc_or_str, timeout=timeout, single=single, relative=relative, raise_err=raise_err)
def _ele(self, loc_or_str, timeout=None, index=1, relative=False, raise_err=None, method=None):
"""调用获取元素的方法
:param loc_or_str: 定位符
:param timeout: 超时时间
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param relative: 是否相对定位
:param raise_err: 找不到时是否抛出异常
:param method: 调用的方法名
:return: 元素对象或它们组成的列表
"""
r = self._find_elements(loc_or_str, timeout=timeout, index=index, relative=relative, raise_err=raise_err)
if r or isinstance(r, list):
return r
if Settings.raise_when_ele_not_found or raise_err is True:
raise ElementNotFoundError(None, method, {'loc_or_str': loc_or_str})
raise ElementNotFoundError(None, method, {'loc_or_str': loc_or_str, 'index': index})
r.method = method
r.args = {'loc_or_str': loc_or_str}
r.args = {'loc_or_str': loc_or_str, 'index': index}
return r
@abstractmethod
def _find_elements(self, loc_or_str, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_str, timeout=None, index=1, relative=False, raise_err=None):
pass
@ -122,8 +131,8 @@ class DrissionElement(BaseElement):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:param level_or_loc: 第几级父元素1开始或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果1开始
:return: 上级元素对象
"""
if isinstance(level_or_loc, int):
@ -153,24 +162,23 @@ class DrissionElement(BaseElement):
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self.children(filter_loc=filter_loc, timeout=timeout, ele_only=ele_only)
if not nodes:
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'child()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
if not filter_loc:
loc = '*' if ele_only else 'node()'
else:
return NoneElement(self.page, 'child()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
loc = get_loc(filter_loc, True) # 把定位符转换为xpath
if loc[0] == 'css selector':
raise ValueError('此css selector语法不受支持请换成xpath。')
loc = loc[1].lstrip('./')
node = self._ele(f'xpath:./{loc}', timeout=timeout, index=index, relative=True, raise_err=False)
if node:
return node
try:
return nodes[index - 1]
except IndexError:
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'child()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
raise ElementNotFoundError(None, 'child()', {'filter_loc': filter_loc, 'index': index,
'ele_only': ele_only})
else:
return NoneElement(self.page, 'child()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
return NoneElement(self.page, 'child()', {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
def prev(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -180,17 +188,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'preceding', timeout=timeout, ele_only=ele_only)
if nodes:
return nodes[-1]
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'prev()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
else:
return NoneElement(self.page, 'prev()', {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
return self._get_relative('prev()', 'preceding', True, filter_loc, index, timeout, ele_only)
def next(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -200,17 +198,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'following', timeout=timeout, ele_only=ele_only)
if nodes:
return nodes[0]
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'next()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
else:
return NoneElement(self.page, 'next()', {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
return self._get_relative('next()', 'following', True, filter_loc, index, timeout, ele_only)
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -220,17 +208,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素前面的某个元素或节点
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'preceding', False, timeout=timeout, ele_only=ele_only)
if nodes:
return nodes[-1]
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'before()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
else:
return NoneElement(self.page, 'before()', {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
return self._get_relative('before()', 'preceding', False, filter_loc, index, timeout, ele_only)
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -240,17 +218,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素后面的某个元素或节点
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'following', False, timeout, ele_only=ele_only)
if nodes:
return nodes[0]
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'after()', {'filter_loc': filter_loc,
'index': index, 'ele_only': ele_only})
else:
return NoneElement(self.page, 'after()', {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
return self._get_relative('after()', 'following', False, filter_loc, index, timeout, ele_only)
def children(self, filter_loc='', timeout=None, ele_only=True):
"""返回直接子元素元素或节点组成的列表,可用查询语法筛选
@ -268,7 +236,7 @@ class DrissionElement(BaseElement):
loc = loc[1].lstrip('./')
loc = f'xpath:./{loc}'
nodes = self._ele(loc, timeout=timeout, single=False, relative=True)
nodes = self._ele(loc, timeout=timeout, index=None, relative=True)
return [e for e in nodes if not (isinstance(e, str) and sub('[ \n\t\r]', '', e) == '')]
def prevs(self, filter_loc='', timeout=None, ele_only=True):
@ -278,7 +246,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素或节点文本组成的列表
"""
return self._get_brothers(filter_loc=filter_loc, direction='preceding', timeout=timeout, ele_only=ele_only)
return self._get_relatives(filter_loc=filter_loc, direction='preceding', timeout=timeout, ele_only=ele_only)
def nexts(self, filter_loc='', timeout=None, ele_only=True):
"""返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选
@ -287,7 +255,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素或节点文本组成的列表
"""
return self._get_brothers(filter_loc=filter_loc, direction='following', timeout=timeout, ele_only=ele_only)
return self._get_relatives(filter_loc=filter_loc, direction='following', timeout=timeout, ele_only=ele_only)
def befores(self, filter_loc='', timeout=None, ele_only=True):
"""返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选
@ -296,7 +264,7 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素前面的元素或节点组成的列表
"""
return self._get_brothers(filter_loc=filter_loc, direction='preceding',
return self._get_relatives(filter_loc=filter_loc, direction='preceding',
brother=False, timeout=timeout, ele_only=ele_only)
def afters(self, filter_loc='', timeout=None, ele_only=True):
@ -306,11 +274,31 @@ class DrissionElement(BaseElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素后面的元素或节点组成的列表
"""
return self._get_brothers(filter_loc=filter_loc, direction='following',
return self._get_relatives(filter_loc=filter_loc, direction='following',
brother=False, timeout=timeout, ele_only=ele_only)
def _get_brothers(self, index=None, filter_loc='', direction='following',
brother=True, timeout=.5, ele_only=True):
def _get_relative(self, func, direction, brother, filter_loc='', index=1, timeout=None, ele_only=True):
"""获取一个亲戚元素或节点,可用查询语法筛选,可指定返回筛选结果的第几个
:param func: 方法名称
:param direction: 方向'following' 'preceding'
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
:param timeout: 查找节点的超时时间
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素前面的某个元素或节点
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
node = self._get_relatives(index, filter_loc, direction, brother, timeout, ele_only)
if node:
return node
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, func, {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
else:
return NoneElement(self.page, func, {'filter_loc': filter_loc, 'index': index, 'ele_only': ele_only})
def _get_relatives(self, index=None, filter_loc='', direction='following', brother=True, timeout=.5, ele_only=True):
"""按要求返回兄弟元素或节点组成的列表
:param index: 获取第几个该参数不为None时只获取该编号的元素
:param filter_loc: 用于筛选的查询语法
@ -319,9 +307,6 @@ class DrissionElement(BaseElement):
:param timeout: 查找等待时间
:return: 元素对象或字符串
"""
if index is not None and index < 1:
raise ValueError('index必须大于等于1。')
brother = '-sibling' if brother else ''
if not filter_loc:
@ -335,16 +320,11 @@ class DrissionElement(BaseElement):
loc = f'xpath:./{direction}{brother}::{loc}'
nodes = self._ele(loc, timeout=timeout, single=False, relative=True)
if index is not None:
index = index if direction == 'following' else -index
nodes = self._ele(loc, timeout=timeout, index=index, relative=True, raise_err=False)
if isinstance(nodes, list):
nodes = [e for e in nodes if not (isinstance(e, str) and sub('[ \n\t\r]', '', e) == '')]
if nodes and index is not None:
index = index - 1 if direction == 'following' else -index
try:
return [nodes[index]]
except IndexError:
return []
else:
return nodes
# ----------------以下属性或方法由后代实现----------------
@ -442,21 +422,29 @@ class BasePage(BaseParser):
def get(self, url, show_errmsg=False, retry=None, interval=None):
pass
def _ele(self, loc_or_ele, timeout=None, single=True, raise_err=None, method=None):
def _ele(self, loc_or_ele, timeout=None, index=1, raise_err=None, method=None):
"""调用获取元素的方法
:param loc_or_ele: 定位符
:param timeout: 超时时间
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param raise_err: 找不到时是否抛出异常
:param method: 调用的方法名
:return: 元素对象或它们组成的列表
"""
if not loc_or_ele:
raise ElementNotFoundError(None, method, {'loc_or_str': loc_or_ele})
r = self._find_elements(loc_or_ele, timeout=timeout, single=single, raise_err=raise_err)
r = self._find_elements(loc_or_ele, timeout=timeout, index=index, raise_err=raise_err)
if r or isinstance(r, list):
return r
if Settings.raise_when_ele_not_found or raise_err is True:
raise ElementNotFoundError(None, method, {'loc_or_str': loc_or_ele})
raise ElementNotFoundError(None, method, {'loc_or_str': loc_or_ele, 'index': index})
r.method = method
r.args = {'loc_or_str': loc_or_ele}
r.args = {'loc_or_str': loc_or_ele, 'index': index}
return r
@abstractmethod
def _find_elements(self, loc_or_ele, timeout=None, single=True, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, raise_err=None):
pass

View File

@ -6,7 +6,7 @@
@License : BSD 3-Clause.
"""
from abc import abstractmethod
from typing import Union, Tuple, List, Any
from typing import Union, Tuple, List, Any, Optional
from DownloadKit import DownloadKit
@ -15,9 +15,12 @@ from .._elements.none_element import NoneElement
class BaseParser(object):
def __call__(self, loc_or_str: Union[Tuple[str, str], str]): ...
def __call__(self, loc_or_str: Union[Tuple[str, str], str], index: int = 1): ...
def ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement], timeout: float = None): ...
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, BaseElement],
index: int = 1,
timeout: float = None): ...
def eles(self, loc_or_str: Union[Tuple[str, str], str], timeout=None): ...
@ -25,15 +28,23 @@ class BaseParser(object):
@property
def html(self) -> str: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement]): ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement], index: int = 1): ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]): ...
def _ele(self, loc_or_ele, timeout: float = None, single: bool = True,
raise_err: bool = None, method: str = None): ...
def _ele(self,
loc_or_ele,
timeout: float = None,
index: Optional[int] = 1,
raise_err: bool = None,
method: str = None): ...
@abstractmethod
def _find_elements(self, loc_or_ele, timeout: float = None, single: bool = True, raise_err: bool = None): ...
def _find_elements(self,
loc_or_ele,
timeout: float = None,
index: Optional[int] = 1,
raise_err: bool = None): ...
class BaseElement(BaseParser):
@ -45,11 +56,19 @@ class BaseElement(BaseParser):
@property
def tag(self) -> str: ...
def _ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None, single: bool = True,
relative: bool = False, raise_err: bool = None, method: str = None): ...
def _ele(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None,
method: str = None): ...
@abstractmethod
def _find_elements(self, loc_or_str, timeout: float = None, single: bool = True, relative: bool = False,
def _find_elements(self, loc_or_str,
timeout: float = None,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None): ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1): ...
@ -83,41 +102,81 @@ class DrissionElement(BaseElement):
def texts(self, text_node_only: bool = False) -> list: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[DrissionElement, None]: ...
def parent(self,
level_or_loc: Union[tuple, str, int] = 1,
index: int = 1) -> Union[DrissionElement, None]: ...
def child(self, filter_loc: Union[tuple, str, int] = '', index: int = 1,
timeout: float = None, ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def child(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def prev(self, filter_loc: Union[tuple, str, int] = '', index: int = 1,
timeout: float = None, ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def prev(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def next(self, filter_loc: Union[tuple, str, int] = '', index: int = 1,
timeout: float = None, ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def next(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def before(self, filter_loc: Union[tuple, str, int] = '', index: int = 1,
timeout: float = None, ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def before(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def after(self, filter_loc: Union[tuple, str, int] = '', index: int = 1,
timeout: float = None, ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def after(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def children(self, filter_loc: Union[tuple, str] = '', timeout: float = None,
def children(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None,
ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
def prevs(self, filter_loc: Union[tuple, str] = '', timeout: float = None,
def prevs(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None,
ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
def nexts(self, filter_loc: Union[tuple, str] = '', timeout: float = None,
def nexts(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None,
ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
def befores(self, filter_loc: Union[tuple, str] = '', timeout: float = None,
def befores(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None,
ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
def afters(self, filter_loc: Union[tuple, str] = '', timeout: float = None,
def afters(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None,
ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
def _get_brothers(self, index: int = None, filter_loc: Union[tuple, str] = '',
direction: str = 'following', brother: bool = True,
timeout: float = 0.5, ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
def _get_relative(self,
func: str,
direction: str,
brother: bool,
filter_loc: Union[tuple, str] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> DrissionElement: ...
def _get_relatives(self,
index: int = None,
filter_loc: Union[tuple, str] = '',
direction: str = 'following',
brother: bool = True,
timeout: float = 0.5,
ele_only: bool = True) -> List[Union[DrissionElement, str]]: ...
# ----------------以下属性或方法由后代实现----------------
@property
@ -184,8 +243,16 @@ class BasePage(BaseParser):
@abstractmethod
def get(self, url: str, show_errmsg: bool = False, retry: int = None, interval: float = None): ...
def _ele(self, loc_or_ele, timeout: float = None, single: bool = True,
raise_err: bool = None, method: str = None): ...
def _ele(self,
loc_or_ele,
timeout: float = None,
index: Optional[int] = 1,
raise_err: bool = None,
method: str = None): ...
@abstractmethod
def _find_elements(self, loc_or_ele, timeout: float = None, single: bool = True, raise_err: bool = None): ...
def _find_elements(self,
loc_or_ele,
timeout: float = None,
index: Optional[int] = 1,
raise_err: bool = None): ...

View File

@ -63,12 +63,13 @@ class Browser(object):
self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
self._driver.set_callback('Target.targetCreated', self._onTargetCreated)
def _get_driver(self, tab_id):
def _get_driver(self, tab_id, owner=None):
"""获取对应tab id的Driver
:param tab_id: 标签页id
:param owner: 使用该驱动的对象
:return: Driver对象
"""
return self._drivers.pop(tab_id, Driver(tab_id, 'page', self.address))
return self._drivers.pop(tab_id, Driver(tab_id, 'page', self.address, owner))
def _onTargetCreated(self, **kwargs):
"""标签页创建时执行"""
@ -201,7 +202,8 @@ class Browser(object):
except TypeError:
pass
def _on_quit(self):
def _on_disconnect(self):
self.page._on_disconnect()
Browser.BROWSERS.pop(self.id, None)
if self.page._chromium_options.is_auto_port and self.page._chromium_options.user_data_path:
path = Path(self.page._chromium_options.user_data_path)

View File

@ -28,7 +28,7 @@ class Browser(object):
def __init__(self, address: str, browser_id: str, page: ChromiumPage): ...
def _get_driver(self, tab_id: str) -> Driver: ...
def _get_driver(self, tab_id: str, owner=None) -> Driver: ...
def run_cdp(self, cmd, **cmd_args) -> dict: ...
@ -61,4 +61,4 @@ class Browser(object):
def quit(self, timeout: float = 5, force: bool = False) -> None: ...
def _on_quit(self) -> None: ...
def _on_disconnect(self) -> None: ...

View File

@ -12,21 +12,23 @@ from time import perf_counter, sleep
from requests import get
from websocket import (WebSocketTimeoutException, WebSocketConnectionClosedException, create_connection,
WebSocketException)
WebSocketException, WebSocketBadStatusException)
from ..errors import PageDisconnectedError
from ..errors import PageDisconnectedError, TargetNotFoundError
class Driver(object):
def __init__(self, tab_id, tab_type, address):
def __init__(self, tab_id, tab_type, address, owner=None):
"""
:param tab_id: 标签页id
:param tab_type: 标签页类型
:param address: 浏览器连接地址
:param owner: 创建这个驱动的对象
"""
self.id = tab_id
self.address = address
self.type = tab_type
self.owner = owner
self._debug = False
self.alert_flag = False # 标记alert出现跳过一条请求后复原
@ -195,7 +197,10 @@ class Driver(object):
def start(self):
"""启动连接"""
self._stopped.clear()
try:
self._ws = create_connection(self._websocket_url, enable_multithread=True, suppress_origin=True)
except WebSocketBadStatusException as e:
raise TargetNotFoundError(f'找不到页面:{self.id}') if 'No such target id' in str(e) else e
self._recv_th.start()
self._handle_event_th.start()
return True
@ -230,6 +235,9 @@ class Driver(object):
self.method_results.clear()
self.event_queue.queue.clear()
if hasattr(self.owner, '_on_disconnect'):
self.owner._on_disconnect()
def set_callback(self, event, callback, immediate=False):
"""绑定cdp event和回调方法
:param event: cdp event
@ -247,18 +255,17 @@ class Driver(object):
class BrowserDriver(Driver):
BROWSERS = {}
def __new__(cls, tab_id, tab_type, address, browser):
def __new__(cls, tab_id, tab_type, address, owner):
if tab_id in cls.BROWSERS:
return cls.BROWSERS[tab_id]
return object.__new__(cls)
def __init__(self, tab_id, tab_type, address, browser):
def __init__(self, tab_id, tab_type, address, owner):
if hasattr(self, '_created'):
return
self._created = True
BrowserDriver.BROWSERS[tab_id] = self
super().__init__(tab_id, tab_type, address)
self.browser = browser
super().__init__(tab_id, tab_type, address, owner)
def __repr__(self):
return f'<BrowserDriver {self.id}>'
@ -267,7 +274,3 @@ class BrowserDriver(Driver):
r = get(url, headers={'Connection': 'close'})
r.close()
return r
def _stop(self):
super()._stop()
self.browser._on_quit()

View File

@ -27,7 +27,7 @@ class Driver(object):
id: str
address: str
type: str
# _debug: bool
owner = ...
alert_flag: bool
_websocket_url: str
_cur_id: int
@ -42,7 +42,7 @@ class Driver(object):
event_queue: Queue
immediate_event_queue: Queue
def __init__(self, tab_id: str, tab_type: str, address: str): ...
def __init__(self, tab_id: str, tab_type: str, address: str, owner=None): ...
def _send(self, message: dict, timeout: float = None) -> dict: ...
@ -67,10 +67,10 @@ class Driver(object):
class BrowserDriver(Driver):
BROWSERS: Dict[str, Driver] = ...
browser: Browser = ...
owner: Browser = ...
def __new__(cls, tab_id: str, tab_type: str, address: str, browser: Browser): ...
def __new__(cls, tab_id: str, tab_type: str, address: str, owner: Browser): ...
def __init__(self, tab_id: str, tab_type: str, address: str, browser: Browser): ...
def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser): ...
def get(self, url) -> Response: ...

View File

@ -5,7 +5,7 @@ tmp_path =
[chromium_options]
address = 127.0.0.1:9222
browser_path = chrome
arguments = ['--no-default-browser-check', '--disable-suggestions-ui', '--no-first-run', '--disable-infobars', '--disable-popup-blocking']
arguments = ['--no-default-browser-check', '--disable-suggestions-ui', '--no-first-run', '--disable-infobars', '--disable-popup-blocking', '--hide-crash-restore-bubble']
extensions = []
prefs = {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}}
flags = {}

View File

@ -5,6 +5,7 @@
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
from json import loads
from os.path import basename, sep
from pathlib import Path
from re import search
@ -80,13 +81,13 @@ class ChromiumElement(DrissionElement):
attrs = [f"{attr}='{attrs[attr]}'" for attr in attrs]
return f'<ChromiumElement {self.tag} {" ".join(attrs)}>'
def __call__(self, loc_or_str, timeout=None):
def __call__(self, loc_or_str, index=1, timeout=None):
"""在内部查找元素
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 超时时间
:return: ChromiumElement对象或属性文本
"""
return self.ele(loc_or_str, timeout)
return self.ele(loc_or_str, index=index, timeout=timeout)
def __eq__(self, other):
return self._backend_id == getattr(other, '_backend_id', None)
@ -227,8 +228,8 @@ class ChromiumElement(DrissionElement):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:param level_or_loc: 第几级父元素1开始或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果1开始
:return: 上级元素对象
"""
return super().parent(level_or_loc, index)
@ -264,7 +265,7 @@ class ChromiumElement(DrissionElement):
return super().next(filter_loc, index, timeout, ele_only=ele_only)
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
@ -275,7 +276,7 @@ class ChromiumElement(DrissionElement):
return super().before(filter_loc, index, timeout, ele_only=ele_only)
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 第几个查询结果1开始
@ -400,13 +401,14 @@ class ChromiumElement(DrissionElement):
"""
run_js(self, script, as_expr, 0, args)
def ele(self, loc_or_str, timeout=None):
"""返回当前元素下级符合条件的一个元素、属性或节点文本
def ele(self, loc_or_str, index=1, timeout=None):
"""返回当前元素下级符合条件的一个元素、属性或节点文本
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个元素从1开始可传入负数获取倒数第几个
:param timeout: 查找元素超时时间默认与元素所在页面等待时间一致
:return: ChromiumElement对象或属性文本
"""
return self._ele(loc_or_str, timeout, method='ele()')
return self._ele(loc_or_str, timeout, index=index, method='ele()')
def eles(self, loc_or_str, timeout=None):
"""返回当前元素下级所有符合条件的子元素、属性或节点文本
@ -414,17 +416,18 @@ class ChromiumElement(DrissionElement):
:param timeout: 查找元素超时时间默认与元素所在页面等待时间一致
:return: ChromiumElement对象或属性文本组成的列表
"""
return self._ele(loc_or_str, timeout=timeout, single=False)
return self._ele(loc_or_str, timeout=timeout, index=None)
def s_ele(self, loc_or_str=None):
"""查找一个符合条件的元素以SessionElement形式返回
def s_ele(self, loc_or_str=None, index=1):
"""查找一个符合条件的元素以SessionElement形式返回
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:return: SessionElement对象或属性文本
"""
if self.tag in __FRAME_ELEMENT__:
r = make_session_ele(self.inner_html, loc_or_str)
r = make_session_ele(self.inner_html, loc_or_str, index=index)
else:
r = make_session_ele(self, loc_or_str)
r = make_session_ele(self, loc_or_str, index=index)
if isinstance(r, NoneElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 's_ele()', {'loc_or_str': loc_or_str})
@ -439,19 +442,19 @@ class ChromiumElement(DrissionElement):
:return: SessionElement或属性文本组成的列表
"""
if self.tag in __FRAME_ELEMENT__:
return make_session_ele(self.inner_html, loc_or_str, single=False)
return make_session_ele(self, loc_or_str, single=False)
return make_session_ele(self.inner_html, loc_or_str, index=None)
return make_session_ele(self, loc_or_str, index=None)
def _find_elements(self, loc_or_str, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_str, timeout=None, index=1, relative=False, raise_err=None):
"""返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 查找元素超时时间
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param relative: WebPage用的表示是否相对定位的参数
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: ChromiumElement对象或文本属性或其组成的列表
"""
return find_in_chromium_ele(self, loc_or_str, single, timeout, relative=relative)
return find_in_chromium_ele(self, loc_or_str, index, timeout, relative=relative)
def style(self, style, pseudo_ele=''):
"""返回元素样式属性值,可获取伪元素属性值
@ -806,14 +809,15 @@ class ShadowRoot(BaseElement):
def __repr__(self):
return f'<ShadowRoot in {self.parent_ele}>'
def __call__(self, loc_or_str, timeout=None):
def __call__(self, loc_or_str, index=1, timeout=None):
"""在内部查找元素
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 超时时间
:return: 元素对象或属性文本
"""
return self.ele(loc_or_str, timeout)
return self.ele(loc_or_str, index=index, timeout=timeout)
def __eq__(self, other):
return self._backend_id == getattr(other, '_backend_id', None)
@ -890,16 +894,19 @@ class ShadowRoot(BaseElement):
:param index: 第几个查询结果1开始
:return: 直接子元素或节点文本组成的列表
"""
nodes = self.children(filter_loc=filter_loc)
if not nodes:
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'child()', {'filter_loc': filter_loc, 'index': index})
if not filter_loc:
loc = '*'
else:
return NoneElement(self.page, 'child()', {'filter_loc': filter_loc, 'index': index})
loc = get_loc(filter_loc, True) # 把定位符转换为xpath
if loc[0] == 'css selector':
raise ValueError('此css selector语法不受支持请换成xpath。')
loc = loc[1].lstrip('./')
loc = f'xpath:./{loc}'
ele = self._ele(loc, index=index, relative=True)
if ele:
return ele
try:
return nodes[index - 1]
except IndexError:
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'child()', {'filter_loc': filter_loc, 'index': index})
else:
@ -911,31 +918,45 @@ class ShadowRoot(BaseElement):
:param index: 第几个查询结果1开始
:return: ChromiumElement对象
"""
nodes = self.nexts(filter_loc=filter_loc)
if nodes:
return nodes[index - 1]
loc = get_loc(filter_loc, True)
if loc[0] == 'css selector':
raise ValueError('此css selector语法不受支持请换成xpath。')
loc = loc[1].lstrip('./')
xpath = f'xpath:./{loc}'
ele = self.parent_ele._ele(xpath, index=index, relative=True)
if ele:
return ele
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'next()', {'filter_loc': filter_loc, 'index': index})
else:
return NoneElement(self.page, 'next()', {'filter_loc': filter_loc, 'index': index})
def before(self, filter_loc='', index=1):
"""返回文档中当前元素前面符合条件的第一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
:return: 本元素前面的某个元素或节点
"""
nodes = self.befores(filter_loc=filter_loc)
if nodes:
return nodes[index - 1]
loc = get_loc(filter_loc, True)
if loc[0] == 'css selector':
raise ValueError('此css selector语法不受支持请换成xpath。')
loc = loc[1].lstrip('./')
xpath = f'xpath:./preceding::{loc}'
ele = self.parent_ele._ele(xpath, index=index, relative=True)
if ele:
return ele
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 'before()', {'filter_loc': filter_loc, 'index': index})
else:
return NoneElement(self.page, 'before()', {'filter_loc': filter_loc, 'index': index})
def after(self, filter_loc='', index=1):
"""返回文档中此当前元素后面符合条件的第一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 后面第几个查询结果1开始
@ -963,7 +984,7 @@ class ShadowRoot(BaseElement):
loc = loc[1].lstrip('./')
loc = f'xpath:./{loc}'
return self._ele(loc, single=False, relative=True)
return self._ele(loc, index=None, relative=True)
def nexts(self, filter_loc=''):
"""返回当前元素后面符合条件的同级元素或节点组成的列表,可用查询语法筛选
@ -976,7 +997,7 @@ class ShadowRoot(BaseElement):
loc = loc[1].lstrip('./')
xpath = f'xpath:./{loc}'
return self.parent_ele._ele(xpath, single=False, relative=True)
return self.parent_ele._ele(xpath, index=None, relative=True)
def befores(self, filter_loc=''):
"""返回文档中当前元素前面符合条件的元素或节点组成的列表,可用查询语法筛选
@ -990,7 +1011,7 @@ class ShadowRoot(BaseElement):
loc = loc[1].lstrip('./')
xpath = f'xpath:./preceding::{loc}'
return self.parent_ele._ele(xpath, single=False, relative=True)
return self.parent_ele._ele(xpath, index=None, relative=True)
def afters(self, filter_loc=''):
"""返回文档中当前元素后面符合条件的元素或节点组成的列表,可用查询语法筛选
@ -1001,15 +1022,16 @@ class ShadowRoot(BaseElement):
eles1 = self.nexts(filter_loc)
loc = get_loc(filter_loc, True)[1].lstrip('./')
xpath = f'xpath:./following::{loc}'
return eles1 + self.parent_ele._ele(xpath, single=False, relative=True)
return eles1 + self.parent_ele._ele(xpath, index=None, relative=True)
def ele(self, loc_or_str, index=1, timeout=None):
    """Return one element beneath the current element that matches the locator.

    :param loc_or_str: locator information, either a loc tuple or a query string
    :param index: which match to return, counted from 1; a negative value counts from the end
    :param timeout: search timeout, forwarded unchanged to ``_ele``
    :return: the result of ``self._ele`` for this lookup
    """
    # index must be forwarded explicitly; otherwise _ele falls back to its own default.
    return self._ele(loc_or_str, timeout, index=index, method='ele()')
def eles(self, loc_or_str, timeout=None):
"""返回当前元素下级所有符合条件的子元素
@ -1017,14 +1039,15 @@ class ShadowRoot(BaseElement):
:param timeout: 查找元素超时时间默认与元素所在页面等待时间一致
:return: ChromiumElement对象组成的列表
"""
return self._ele(loc_or_str, timeout=timeout, single=False)
return self._ele(loc_or_str, timeout=timeout, index=None)
def s_ele(self, loc_or_str=None):
"""查找一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高
def s_ele(self, loc_or_str=None, index=1):
"""查找一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:return: SessionElement对象或属性文本
"""
r = make_session_ele(self, loc_or_str)
r = make_session_ele(self, loc_or_str, index=index)
if isinstance(r, NoneElement):
r.method = 's_ele()'
r.args = {'loc_or_str': loc_or_str}
@ -1035,13 +1058,13 @@ class ShadowRoot(BaseElement):
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象
"""
return make_session_ele(self, loc_or_str, single=False)
return make_session_ele(self, loc_or_str, index=None)
def _find_elements(self, loc_or_str, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_str, timeout=None, index=1, relative=False, raise_err=None):
"""返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 查找元素超时时间
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param relative: WebPage用的表示是否相对定位的参数
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: ChromiumElement对象或其组成的列表
@ -1052,15 +1075,15 @@ class ShadowRoot(BaseElement):
def do_find():
if loc[0] == 'css selector':
if single:
if index == 1:
nod_id = self.page.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=loc[1])['nodeId']
if nod_id:
r = make_chromium_ele(self.page, node_id=nod_id)
r = make_chromium_eles(self.page, _ids=nod_id, is_obj_id=False)
return None if r is False else r
else:
nod_ids = self.page.run_cdp('DOM.querySelectorAll', nodeId=self._node_id, selector=loc[1])['nodeId']
r = make_chromium_eles(self.page, node_ids=nod_ids, single=False)
r = make_chromium_eles(self.page, _ids=nod_ids, index=index, is_obj_id=False)
return None if r is False else r
else:
@ -1069,16 +1092,20 @@ class ShadowRoot(BaseElement):
return None
css = [i.css_path[61:] for i in eles]
if single:
node_id = self.page.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=css[0])['nodeId']
r = make_chromium_ele(self.page, node_id=node_id)
if index is not None:
try:
node_id = self.page.run_cdp('DOM.querySelector', nodeId=self._node_id,
selector=css[index - 1])['nodeId']
except IndexError:
return None
r = make_chromium_eles(self.page, _ids=node_id, is_obj_id=False)
return None if r is False else r
else:
node_ids = [self.page.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=i)['nodeId']
for i in css]
if 0 in node_ids:
return None
r = make_chromium_eles(self.page, node_ids=node_ids, single=False)
r = make_chromium_eles(self.page, _ids=node_ids, index=index, is_obj_id=False)
return None if r is False else r
timeout = timeout if timeout is not None else self.page.timeout
@ -1090,7 +1117,7 @@ class ShadowRoot(BaseElement):
if result:
return result
return NoneElement(self.page) if single else []
return NoneElement(self.page) if index is not None else []
def _get_node_id(self, obj_id):
"""返回元素node id"""
@ -1107,11 +1134,11 @@ class ShadowRoot(BaseElement):
return r['backendNodeId']
def find_in_chromium_ele(ele, loc, single=True, timeout=None, relative=True):
def find_in_chromium_ele(ele, loc, index=1, timeout=None, relative=True):
"""在chromium元素中查找
:param ele: ChromiumElement对象
:param loc: 元素定位元组
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param timeout: 查找元素超时时间
:param relative: WebPage用于标记是否相对定位使用
:return: 返回ChromiumElement元素或它们组成的列表
@ -1133,25 +1160,25 @@ def find_in_chromium_ele(ele, loc, single=True, timeout=None, relative=True):
# ---------------执行查找-----------------
if loc[0] == 'xpath':
return find_by_xpath(ele, loc[1], single, timeout, relative=relative)
return find_by_xpath(ele, loc[1], index, timeout, relative=relative)
else:
return find_by_css(ele, loc[1], single, timeout)
return find_by_css(ele, loc[1], index, timeout)
def find_by_xpath(ele, xpath, single, timeout, relative=True):
def find_by_xpath(ele, xpath, index, timeout, relative=True):
"""执行用xpath在元素中查找元素
:param ele: 在此元素中查找
:param xpath: 查找语句
:param single: 是否只返回第一个结果
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param timeout: 超时时间
:param relative: 是否相对定位
:return: ChromiumElement或其组成的列表
"""
type_txt = '9' if single else '7'
type_txt = '9' if index == 1 else '7'
node_txt = 'this.contentDocument' if ele.tag in __FRAME_ELEMENT__ and not relative else 'this'
js = make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt)
ele.page.wait.load_complete()
ele.page.wait.doc_loaded()
def do_find():
res = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele._obj_id,
@ -1170,21 +1197,30 @@ def find_by_xpath(ele, xpath, single, timeout, relative=True):
if res['result']['subtype'] == 'null' or res['result']['description'] in ('NodeList(0)', 'Array(0)'):
return None
if single:
r = make_chromium_ele(ele.page, obj_id=res['result']['objectId'])
if index == 1:
r = make_chromium_eles(ele.page, _ids=res['result']['objectId'], is_obj_id=True)
return None if r is False else r
else:
# from pprint import pprint
# for i in ele.page.run_cdp('Runtime.getProperties',
# objectId=res['result']['objectId'],
# ownProperties=True)['result'][:-1]:
# pprint(i)
r = [make_chromium_ele(ele.page, obj_id=i['value']['objectId']) if i['value']['type'] == 'object' else
i['value']['value'] for i in ele.page.run_cdp('Runtime.getProperties',
objectId=res['result']['objectId'],
ownProperties=True)['result'][:-1]]
return None if not r or r is False in r else r
res = ele.page.run_cdp('Runtime.getProperties', objectId=res['result']['objectId'],
ownProperties=True)['result'][:-1]
if index is None:
r = [make_chromium_eles(ele.page, _ids=i['value']['objectId'], is_obj_id=True)
if i['value']['type'] == 'object' else i['value']['value'] for i in res]
return None if False in r else r
else:
eles_count = len(res)
if eles_count == 0 or abs(index) > eles_count:
return None
index1 = eles_count + index + 1 if index < 0 else index
res = res[index1 - 1]
if res['value']['type'] == 'object':
r = make_chromium_eles(ele.page, _ids=res['value']['objectId'], is_obj_id=True)
else:
r = res['value']['value']
return None if r is False else r
end_time = perf_counter() + timeout
result = do_find()
@ -1194,23 +1230,23 @@ def find_by_xpath(ele, xpath, single, timeout, relative=True):
if result:
return result
return NoneElement(ele.page) if single else []
return NoneElement(ele.page) if index is not None else []
def find_by_css(ele, selector, single, timeout):
def find_by_css(ele, selector, index, timeout):
"""执行用css selector在元素中查找元素
:param ele: 在此元素中查找
:param selector: 查找语句
:param single: 是否只返回第一个结果
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param timeout: 超时时间
:return: ChromiumElement或其组成的列表
"""
selector = selector.replace('"', r'\"')
find_all = '' if single else 'All'
find_all = '' if index == 1 else 'All'
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame', 'shadow-root') else 'this'
js = f'function(){{return {node_txt}.querySelector{find_all}("{selector}");}}'
ele.page.wait.load_complete()
ele.page.wait.doc_loaded()
def do_find():
res = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele._obj_id,
@ -1221,15 +1257,15 @@ def find_by_css(ele, selector, single, timeout):
if res['result']['subtype'] == 'null' or res['result']['description'] in ('NodeList(0)', 'Array(0)'):
return None
if single:
r = make_chromium_ele(ele.page, obj_id=res['result']['objectId'])
if index == 1:
r = make_chromium_eles(ele.page, _ids=res['result']['objectId'], is_obj_id=True)
return None if r is False else r
else:
node_ids = [i['value']['objectId'] for i in ele.page.run_cdp('Runtime.getProperties',
obj_ids = [i['value']['objectId'] for i in ele.page.run_cdp('Runtime.getProperties',
objectId=res['result']['objectId'],
ownProperties=True)['result'][:-1]]
r = make_chromium_eles(ele.page, obj_ids=node_ids, single=False, ele_only=False)
r = make_chromium_eles(ele.page, _ids=obj_ids, index=index, is_obj_id=True)
return None if r is False else r
end_time = perf_counter() + timeout
@ -1240,113 +1276,79 @@ def find_by_css(ele, selector, single, timeout):
if result:
return result
return NoneElement(ele.page) if single else []
return NoneElement(ele.page) if index is not None else []
def make_chromium_eles(page, _ids, index=1, is_obj_id=True):
    """Build browser element objects from node ids or object ids.

    :param page: page object; its ``driver.run`` is used to issue the CDP calls
    :param _ids: a single id, or a list/tuple of ids
    :param index: which id to convert, counted from 1; a negative value counts from the
                  end; None converts every id
    :param is_obj_id: True when the ids are object ids, False when they are node ids
    :return: one converted result, or a list of them when index is None; False when a
             conversion fails or index points outside ``_ids``
    """
    get_node_func = _get_node_by_obj_id if is_obj_id else _get_node_by_node_id
    if not isinstance(_ids, (list, tuple)):
        _ids = (_ids,)

    if index is None:  # convert all of them
        nodes = []
        for _id in _ids:
            tmp = get_node_func(page, _id)
            if tmp is False:
                return False
            nodes.append(tmp)
        return nodes

    # Convert a single one. Normalise the 1-based / negative index the same way
    # find_by_xpath and make_session_ele do, so -1 means the last id and an
    # out-of-range request reports failure instead of raising IndexError.
    count = len(_ids)
    if count == 0 or abs(index) > count:
        return False
    position = count + index + 1 if index < 0 else index
    return get_node_func(page, _ids[position - 1])


def _get_node_info(page, id_type, _id):
    """Run DOM.describeNode for one id.

    :param page: page object providing ``driver.run``
    :param id_type: keyword name to pass the id under ('objectId' or 'nodeId' at the call sites)
    :param _id: the id value; a falsy id is rejected without issuing a call
    :return: the describeNode response, or False when the id is falsy or the response contains 'error'
    """
    if not _id:
        return False
    arg = {id_type: _id}
    node = page.driver.run('DOM.describeNode', **arg)
    if 'error' in node:
        return False
    return node


def _get_node_by_obj_id(page, obj_id):
    """Convert an object id into an element object, or into text for text/comment nodes.

    :return: element object, the node's text value, or False on failure
    """
    node = _get_node_info(page, 'objectId', obj_id)
    if node is False:
        return False
    if node['node']['nodeName'] in ('#text', '#comment'):
        return node['node']['nodeValue']
    else:
        return _make_ele(page, obj_id, node)


def _get_node_by_node_id(page, node_id):
    """Convert a node id into an element object, or into text for text/comment nodes.

    :return: element object, the node's text value, or False on failure
    """
    node = _get_node_info(page, 'nodeId', node_id)
    if node is False:
        return False
    if node['node']['nodeName'] in ('#text', '#comment'):
        return node['node']['nodeValue']
    else:
        # An element object needs an object id, so resolve the node id first.
        obj_id = page.driver.run('DOM.resolveNode', nodeId=node_id)
        if 'error' in obj_id:
            return False
        obj_id = obj_id['object']['objectId']
        return _make_ele(page, obj_id, node)


def _make_ele(page, obj_id, node):
    """Create a ChromiumElement from a describeNode response, wrapping frame elements in ChromiumFrame."""
    ele = ChromiumElement(page, obj_id=obj_id, node_id=node['node']['nodeId'],
                          backend_id=node['node']['backendNodeId'])
    if ele.tag in __FRAME_ELEMENT__:
        from .._pages.chromium_frame import ChromiumFrame
        ele = ChromiumFrame(page, ele, node)

    return ele
def make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt):
@ -1391,7 +1393,7 @@ else{a.push(e.snapshotItem(i));}}"""
return js
def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
def run_js(page_or_ele, script, as_expr, timeout, args=None):
"""运行javascript代码
:param page_or_ele: 页面对象或元素对象
:param script: js文本
@ -1418,6 +1420,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
if page.states.has_alert:
raise AlertExistsError
end_time = perf_counter() + timeout
try:
if as_expr:
res = page.run_cdp('Runtime.evaluate', expression=script, returnByValue=False,
@ -1446,18 +1449,17 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
raise JavaScriptError(f'\njavascript运行错误\n{script}\n错误信息: \n{exceptionDetails}')
try:
return parse_js_result(page, page_or_ele, res.get('result'))
return parse_js_result(page, page_or_ele, res.get('result'), end_time)
except Exception:
return res
def parse_js_result(page, ele, result):
def parse_js_result(page, ele, result, end_time):
"""解析js返回的结果"""
if 'unserializableValue' in result:
return result['unserializableValue']
the_type = result['type']
if the_type == 'object':
sub_type = result.get('subtype', None)
if sub_type == 'null':
@ -1470,21 +1472,31 @@ def parse_js_result(page, ele, result):
elif class_name == 'HTMLDocument':
return result
else:
r = make_chromium_ele(page, obj_id=result['objectId'])
r = make_chromium_eles(page, _ids=result['objectId'])
if r is False:
raise ElementLostError
return r
elif sub_type == 'array':
r = page.run_cdp('Runtime.getProperties', objectId=result['objectId'], ownProperties=True)['result']
return [parse_js_result(page, ele, result=i['value']) for i in r[:-1]]
return [parse_js_result(page, ele, result=i['value'], end_time=end_time) for i in r[:-1]]
elif 'objectId' in result and result['className'].lower() == 'object': # dict
r = page.run_cdp('Runtime.getProperties', objectId=result['objectId'], ownProperties=True)['result']
return {i['name']: parse_js_result(page, ele, result=i['value']) for i in r}
return {i['name']: parse_js_result(page, ele, result=i['value'], end_time=end_time) for i in r}
elif 'objectId' in result:
timeout = end_time - perf_counter()
if timeout < 0:
return
js = 'function(){return JSON.stringify(this);}'
r = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=result['objectId'],
returnByValue=False, awaitPromise=True, userGesture=True, _ignore=AlertExistsError,
_timeout=timeout)
return loads(parse_js_result(page, ele, r['result'], end_time))
else:
return result['value']
return result.get('value', result)
elif the_type == 'undefined':
return None

View File

@ -6,7 +6,7 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from typing import Union, Tuple, List, Any, Literal
from typing import Union, Tuple, List, Any, Literal, Optional
from .none_element import NoneElement
from .._base.base import DrissionElement, BaseElement
@ -47,7 +47,9 @@ class ChromiumElement(DrissionElement):
def __repr__(self) -> str: ...
def __call__(self, loc_or_str: Union[Tuple[str, str], str],
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, NoneElement]: ...
def __eq__(self, other: ChromiumElement) -> bool: ...
@ -175,20 +177,23 @@ class ChromiumElement(DrissionElement):
def ele(self,
loc_or_str: Union[Tuple[str, str], str],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, NoneElement]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[ChromiumElement]: ...
def s_ele(self, loc_or_str: Union[Tuple[str, str], str] = None) -> Union[SessionElement, NoneElement]: ...
def s_ele(self,
loc_or_str: Union[Tuple[str, str], str] = None,
index: int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[SessionElement]: ...
def _find_elements(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None,
single: bool = True,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = False) -> Union[ChromiumElement, ChromiumFrame, NoneElement,
List[Union[ChromiumElement, ChromiumFrame]]]: ...
@ -286,20 +291,28 @@ class ShadowRoot(BaseElement):
def afters(self, filter_loc: Union[tuple, str] = '') -> List[ChromiumElement]: ...
def ele(self, loc_or_str: Union[Tuple[str, str], str],
def ele(self,
loc_or_str: Union[Tuple[str, str], str],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, NoneElement]: ...
def eles(self, loc_or_str: Union[Tuple[str, str], str],
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[ChromiumElement]: ...
def s_ele(self, loc_or_str: Union[Tuple[str, str], str] = None) -> Union[SessionElement, NoneElement]: ...
def s_ele(self,
loc_or_str: Union[Tuple[str, str], str] = None,
index: int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
def _find_elements(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None,
single: bool = True, relative: bool = False, raise_err: bool = None) \
-> Union[ChromiumElement, ChromiumFrame, NoneElement, str, List[Union[ChromiumElement,
ChromiumFrame, str]]]: ...
def _find_elements(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None) -> Union[ChromiumElement, ChromiumFrame, NoneElement, str,
List[Union[ChromiumElement, ChromiumFrame, str]]]: ...
def _get_node_id(self, obj_id: str) -> int: ...
@ -308,40 +321,48 @@ class ShadowRoot(BaseElement):
def _get_backend_id(self, node_id: int) -> int: ...
def find_in_chromium_ele(ele: ChromiumElement, loc: Union[str, Tuple[str, str]],
single: bool = True, timeout: float = None, relative: bool = True) \
-> Union[ChromiumElement, NoneElement, List[ChromiumElement]]: ...
def find_in_chromium_ele(ele: ChromiumElement,
loc: Union[str, Tuple[str, str]],
index: Optional[int] = 1,
timeout: float = None,
relative: bool = True) -> Union[ChromiumElement, NoneElement, List[ChromiumElement]]: ...
def find_by_xpath(ele: ChromiumElement, xpath: str, single: bool, timeout: float,
def find_by_xpath(ele: ChromiumElement,
xpath: str,
index: Optional[int],
timeout: float,
relative: bool = True) -> Union[ChromiumElement, List[ChromiumElement], NoneElement]: ...
def find_by_css(ele: ChromiumElement, selector: str, single: bool,
def find_by_css(ele: ChromiumElement,
selector: str,
index: Optional[int],
timeout: float) -> Union[ChromiumElement, List[ChromiumElement], NoneElement]: ...
def make_chromium_ele(page: Union[ChromiumBase, ChromiumPage, WebPage, ChromiumTab, ChromiumFrame],
node_id: int = ...,
obj_id: str = ...) -> Union[ChromiumElement, ChromiumFrame, str]: ...
def make_chromium_eles(page: Union[ChromiumBase, ChromiumPage, WebPage, ChromiumTab, ChromiumFrame],
node_ids: Union[tuple, list] = None,
obj_ids: Union[tuple, list] = None,
single: bool = True,
ele_only: bool = True) -> Union[ChromiumElement, ChromiumFrame, NoneElement,
_ids: Union[tuple, list, str, int],
index: Optional[int] = 1,
is_obj_id: bool = True
) -> Union[ChromiumElement, ChromiumFrame, NoneElement,
List[Union[ChromiumElement, ChromiumFrame]]]: ...
def make_js_for_find_ele_by_xpath(xpath: str, type_txt: str, node_txt: str) -> str: ...
def run_js(page_or_ele: Union[ChromiumBase, ChromiumElement, ShadowRoot], script: str,
as_expr: bool = False, timeout: float = None, args: tuple = ...) -> Any: ...
def run_js(page_or_ele: Union[ChromiumBase, ChromiumElement, ShadowRoot],
script: str,
as_expr: bool,
timeout: float,
args: tuple = ...) -> Any: ...
def parse_js_result(page: ChromiumBase, ele: ChromiumElement, result: dict): ...
def parse_js_result(page: ChromiumBase,
ele: ChromiumElement,
result: dict,
end_time: float): ...
def convert_argument(arg: Any) -> dict: ...

View File

@ -118,7 +118,7 @@ class SessionElement(DrissionElement):
return super().next(index, filter_loc, timeout, ele_only=ele_only)
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
@ -129,7 +129,7 @@ class SessionElement(DrissionElement):
return super().before(index, filter_loc, timeout, ele_only=ele_only)
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 第几个查询结果1开始
@ -219,13 +219,14 @@ class SessionElement(DrissionElement):
else:
return self.inner_ele.get(attr)
def ele(self, loc_or_str, index=1, timeout=None):
    """Return one element, attribute or node text beneath the current element that matches.

    :param loc_or_str: locator information, either a loc tuple or a query string
    :param index: which match to return, counted from 1; a negative value counts from the end
    :param timeout: has no effect here; it is not forwarded
    :return: the result of ``self._ele`` for this lookup
    """
    # Forward index so a caller asking for the n-th match does not silently get the default.
    return self._ele(loc_or_str, index=index, method='ele()')
def eles(self, loc_or_str, timeout=None):
"""返回当前元素下级所有符合条件的子元素、属性或节点文本
@ -233,32 +234,33 @@ class SessionElement(DrissionElement):
:param timeout: 不起实际作用用于和DriverElement对应便于无差别调用
:return: SessionElement对象或属性文本组成的列表
"""
return self._ele(loc_or_str, single=False)
return self._ele(loc_or_str, index=None)
def s_ele(self, loc_or_str=None, index=1):
    """Return one element, attribute or node text beneath the current element that matches.

    :param loc_or_str: locator information, either a loc tuple or a query string
    :param index: which match to return, counted from 1; a negative value counts from the end
    :return: the result of ``self._ele`` for this lookup
    """
    # Forward index so the requested match position is honoured.
    return self._ele(loc_or_str, index=index, method='s_ele()')
def s_eles(self, loc_or_str):
    """Return every child element, attribute or node text beneath the current element that matches.

    :param loc_or_str: locator information, either a loc tuple or a query string
    :return: the result of ``self._ele`` when asked for all matches
    """
    # index=None is the "return all" request; the former single=False keyword is no longer used.
    return self._ele(loc_or_str, index=None)
def _find_elements(self, loc_or_str, timeout=None, index=1, relative=False, raise_err=None):
    """Return child elements, attributes or node text beneath the current element that match.

    :param loc_or_str: locator information, either a loc tuple or a query string
    :param timeout: has no effect; present to match the parent class signature
    :param index: which match to return, counted from 1; a negative value counts from the
                  end; None returns all matches
    :param relative: flag used by WebPage to mark relative locating; not used in this method
    :param raise_err: whether to raise when nothing is found; not used in this method
    :return: the result of ``make_session_ele`` for this lookup
    """
    # Pass index by keyword; the old positional `single` argument no longer exists.
    return make_session_ele(self, loc_or_str, index=index)
def _get_ele_path(self, mode):
"""获取css路径或xpath路径
@ -281,19 +283,18 @@ class SessionElement(DrissionElement):
return f'{path_str[1:]}' if mode == 'css' else path_str
def make_session_ele(html_or_ele, loc=None, single=True):
def make_session_ele(html_or_ele, loc=None, index=1):
"""从接收到的对象或html文本中查找元素返回SessionElement对象
如要直接从html生成SessionElement而不在下级查找loc输入None即可
:param html_or_ele: html文本BaseParser对象
:param loc: 定位元组或字符串为None时不在下级查找返回根元素
:param single: True则返回第一个False则返回全部
:param index: 获取第几个元素从1开始可传入负数获取倒数第几个None获取所有
:return: 返回SessionElement元素或列表或属性文本
"""
# ---------------处理定位符---------------
if not loc:
if isinstance(html_or_ele, SessionElement):
return html_or_ele if single else [html_or_ele]
return html_or_ele
loc = ('xpath', '.')
elif isinstance(loc, (str, tuple)):
@ -368,16 +369,25 @@ def make_session_ele(html_or_ele, loc=None, single=True):
# ---------------执行查找-----------------
try:
if loc[0] == 'xpath': # 用lxml内置方法获取lxml的元素对象列表
ele = html_or_ele.xpath(loc[1])
eles = html_or_ele.xpath(loc[1])
else: # 用css selector获取元素对象列表
ele = html_or_ele.cssselect(loc[1])
eles = html_or_ele.cssselect(loc[1])
if not isinstance(ele, list): # 结果不是列表,如数字
return ele
if not isinstance(eles, list): # 结果不是列表,如数字
return eles
# 把lxml元素对象包装成SessionElement对象并按需要返回第一个或全部
if single:
ele = ele[0] if ele else None
# 把lxml元素对象包装成SessionElement对象并按需要返回一个或全部
if index is None:
return [SessionElement(e, page) if isinstance(e, HtmlElement) else e for e in eles if e != '\n']
else:
eles_count = len(eles)
if eles_count == 0 or abs(index) > eles_count:
return NoneElement(page)
if index < 0:
index = eles_count + index + 1
ele = eles[index - 1]
if isinstance(ele, HtmlElement):
return SessionElement(ele, page)
elif isinstance(ele, str):
@ -385,9 +395,6 @@ def make_session_ele(html_or_ele, loc=None, single=True):
else:
return NoneElement(page)
else: # 返回全部
return [SessionElement(e, page) if isinstance(e, HtmlElement) else e for e in ele if e != '\n']
except Exception as e:
if 'Invalid expression' in str(e):
raise SyntaxError(f'无效的xpath语句{loc}')

View File

@ -30,6 +30,7 @@ class SessionElement(DrissionElement):
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
index: int = 1,
timeout: float = None) -> Union[SessionElement, NoneElement]: ...
def __eq__(self, other: SessionElement) -> bool: ...
@ -115,6 +116,7 @@ class SessionElement(DrissionElement):
def ele(self,
loc_or_str: Union[Tuple[str, str], str],
index: int = 1,
timeout: float = None) -> Union[SessionElement, NoneElement]: ...
def eles(self,
@ -122,18 +124,17 @@ class SessionElement(DrissionElement):
timeout: float = None) -> List[SessionElement]: ...
def s_ele(self,
loc_or_str: Union[Tuple[str, str], str] = None) -> Union[SessionElement, NoneElement]: ...
loc_or_str: Union[Tuple[str, str], str] = None,
index: int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self,
loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
def _find_elements(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None,
single: bool = True,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None) \
-> Union[SessionElement, NoneElement, List[SessionElement]]: ...
raise_err: bool = None) -> Union[SessionElement, NoneElement, List[SessionElement]]: ...
def _get_ele_path(self, mode: str) -> str: ...
@ -141,5 +142,4 @@ class SessionElement(DrissionElement):
def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, ChromiumElement, BaseElement, ChromiumFrame,
ChromiumBase],
loc: Union[str, Tuple[str, str]] = None,
single: bool = True) -> Union[
SessionElement, NoneElement, List[SessionElement]]: ...
index: Optional[int] = 1) -> Union[SessionElement, NoneElement, List[SessionElement]]: ...

View File

@ -11,3 +11,4 @@ class Settings(object):
raise_when_ele_not_found = False
raise_when_click_failed = False
raise_when_wait_failed = False
singleton_tab_obj = True

View File

@ -124,7 +124,7 @@ class ChromiumBase(BasePage):
:return: None
"""
self._is_loading = True
self._driver = self.browser._get_driver(tab_id)
self._driver = self.browser._get_driver(tab_id, self)
self._alert = Alert()
self._driver.set_callback('Page.javascriptDialogOpening', self._on_alert_open, immediate=True)
@ -244,14 +244,15 @@ class ChromiumBase(BasePage):
self.run_cdp('Page.setInterceptFileChooserDialog', enabled=False)
self._upload_list = None
def __call__(self, loc_or_str, index=1, timeout=None):
    """Look up an element inside the page; shorthand for ``ele()``.

    Example: ele = page('@id=ele_id')

    :param loc_or_str: locator information, either a loc tuple or a query string
    :param index: which match to return, counted from 1; a negative value counts from the end
    :param timeout: timeout for the search
    :return: the result of ``self.ele`` for this lookup
    """
    # ele() takes index before timeout; passing timeout second would land it in the index slot.
    return self.ele(loc_or_str, index, timeout)
def _wait_to_stop(self):
"""eager策略超时时使页面停止加载"""
@ -289,7 +290,7 @@ class ChromiumBase(BasePage):
"""返回用于执行动作链的对象"""
if self._actions is None:
self._actions = Actions(self)
self.wait.load_complete()
self.wait.doc_loaded()
return self._actions
@property
@ -309,7 +310,7 @@ class ChromiumBase(BasePage):
@property
def scroll(self):
"""返回用于滚动滚动条的对象"""
self.wait.load_complete()
self.wait.doc_loaded()
if self._scroll is None:
self._scroll = PageScroller(self)
return self._scroll
@ -317,7 +318,7 @@ class ChromiumBase(BasePage):
@property
def rect(self):
"""返回获取窗口坐标和大小的对象"""
# self.wait.load_complete()
# self.wait.doc_loaded()
if self._rect is None:
self._rect = TabRect(self)
return self._rect
@ -358,7 +359,7 @@ class ChromiumBase(BasePage):
@property
def html(self):
"""返回当前页面html文本"""
self.wait.load_complete()
self.wait.doc_loaded()
return self.run_cdp('DOM.getOuterHTML', objectId=self._root_id)['outerHTML']
@property
@ -425,7 +426,7 @@ class ChromiumBase(BasePage):
:param cmd_args: 参数
:return: 执行的结果
"""
self.wait.load_complete()
self.wait.doc_loaded()
return self.run_cdp(cmd, **cmd_args)
def run_js(self, script, *args, as_expr=False, timeout=None):
@ -446,7 +447,7 @@ class ChromiumBase(BasePage):
:param timeout: js超时时间为None则使用页面timeouts.script属性值
:return: 运行的结果
"""
self.wait.load_complete()
self.wait.doc_loaded()
return run_js(self, script, as_expr, self.timeouts.script if timeout is None else timeout, args)
def run_async_js(self, script, *args, as_expr=False):
@ -490,13 +491,14 @@ class ChromiumBase(BasePage):
return [{'name': cookie['name'], 'value': cookie['value'], 'domain': cookie['domain']}
for cookie in cookies]
def ele(self, loc_or_ele, index=1, timeout=None):
    """Get one element object that matches the condition.

    :param loc_or_ele: a locator or an element object
    :param index: which match to return, counted from 1; a negative value counts from the end
    :param timeout: timeout for the search
    :return: the result of ``self._ele`` for this lookup
    """
    # Forward index so the requested match position reaches the lookup.
    return self._ele(loc_or_ele, timeout=timeout, index=index, method='ele()')
def eles(self, loc_or_str, timeout=None):
"""获取所有符合条件的元素对象
@ -504,14 +506,15 @@ class ChromiumBase(BasePage):
:param timeout: 查找超时时间
:return: ChromiumElement对象组成的列表
"""
return self._ele(loc_or_str, timeout=timeout, single=False)
return self._ele(loc_or_str, timeout=timeout, index=None)
def s_ele(self, loc_or_ele=None):
"""查找一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高
def s_ele(self, loc_or_ele=None, index=1):
"""查找一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:return: SessionElement对象或属性文本
"""
r = make_session_ele(self, loc_or_ele)
r = make_session_ele(self, loc_or_ele, index=index)
if isinstance(r, NoneElement):
if Settings.raise_when_ele_not_found:
raise ElementNotFoundError(None, 's_ele()', {'loc_or_ele': loc_or_ele})
@ -525,13 +528,13 @@ class ChromiumBase(BasePage):
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象组成的列表
"""
return make_session_ele(self, loc_or_str, single=False)
return make_session_ele(self, loc_or_str, index=None)
def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, relative=False, raise_err=None):
"""执行元素查找
:param loc_or_ele: 定位符或元素对象
:param timeout: 查找超时时间
:param single: 是否只返回第一个
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param relative: WebPage用的表示是否相对定位的参数
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: ChromiumElement对象或元素对象组成的列表
@ -543,7 +546,7 @@ class ChromiumBase(BasePage):
else:
raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。')
self.wait.load_complete()
self.wait.doc_loaded()
timeout = timeout if timeout is not None else self.timeout
end_time = perf_counter() + timeout
@ -558,16 +561,28 @@ class ChromiumBase(BasePage):
while True:
if num > 0:
num = 1 if single else num
nIds = self._driver.run('DOM.getSearchResults', searchId=result['searchId'], fromIndex=0, toIndex=num)
from_index = index_arg = 0
if index is None:
end_index = num
index_arg = None
elif index < 0:
from_index = index + num
end_index = from_index + 1
else:
from_index = index - 1
end_index = from_index + 1
if from_index <= num - 1:
nIds = self._driver.run('DOM.getSearchResults', searchId=result['searchId'],
fromIndex=from_index, toIndex=end_index)
if __ERROR__ not in nIds:
if nIds['nodeIds'][0] != 0:
r = make_chromium_eles(self, node_ids=nIds['nodeIds'], single=single)
r = make_chromium_eles(self, _ids=nIds['nodeIds'], index=index_arg, is_obj_id=False)
if r is not False:
break
if perf_counter() >= end_time:
return NoneElement(self) if single else []
return NoneElement(self) if index is not None else []
sleep(.1)
timeout = end_time - perf_counter()
@ -653,8 +668,8 @@ class ChromiumBase(BasePage):
self.run_cdp('DOM.removeNode', nodeId=ele._node_id)
def get_frame(self, loc_ind_ele, timeout=None):
"""获取页面中一个frame对象可传入定位符、iframe序号、ChromiumFrame对象序号从0开始
:param loc_ind_ele: 定位符iframe序号ChromiumFrame对象
"""获取页面中一个frame对象
:param loc_ind_ele: 定位符iframe序号ChromiumFrame对象序号从1开始可传入负数获取倒数第几个
:param timeout: 查找元素超时时间
:return: ChromiumFrame对象
"""
@ -676,9 +691,11 @@ class ChromiumBase(BasePage):
r = ele
elif isinstance(loc_ind_ele, int):
if loc_ind_ele < 0:
raise ValueError('序号必须大于等于0。')
xpath = f'xpath:(//*[name()="frame" or name()="iframe"])[{loc_ind_ele + 1}]'
if loc_ind_ele == 0:
loc_ind_ele = 1
elif loc_ind_ele < 0:
loc_ind_ele = f'last()+{loc_ind_ele}+1'
xpath = f'xpath:(//*[name()="frame" or name()="iframe"])[{loc_ind_ele}]'
r = self._ele(xpath, timeout=timeout)
elif str(type(loc_ind_ele)).endswith(".ChromiumFrame'>"):
@ -699,7 +716,7 @@ class ChromiumBase(BasePage):
:return: ChromiumFrame对象组成的列表
"""
loc = loc or 'xpath://*[name()="iframe" or name()="frame"]'
frames = self._ele(loc, timeout=timeout, single=False, raise_err=False)
frames = self._ele(loc, timeout=timeout, index=None, raise_err=False)
return [i for i in frames if str(type(i)).endswith(".ChromiumFrame'>")]
def get_session_storage(self, item=None):

View File

@ -93,7 +93,9 @@ class ChromiumBase(BasePage):
def _d_set_runtime_settings(self) -> None: ...
def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromiumElement],
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, NoneElement]: ...
@property
@ -177,19 +179,27 @@ class ChromiumBase(BasePage):
def get_cookies(self, as_dict: bool = False, all_domains: bool = False,
all_info: bool = False) -> Union[list, dict]: ...
def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, NoneElement]: ...
def eles(self, loc_or_str: Union[Tuple[str, str], str],
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[ChromiumElement]: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) \
-> Union[SessionElement, NoneElement]: ...
def s_ele(self,
loc_or_ele: Union[Tuple[str, str], str] = None,
index:int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None) \
-> Union[ChromiumElement, ChromiumFrame, NoneElement, List[Union[ChromiumElement, ChromiumFrame]]]: ...
def refresh(self, ignore_cache: bool = False) -> None: ...
@ -279,4 +289,4 @@ def get_mhtml(page: Union[ChromiumPage, ChromiumTab],
def get_pdf(page: Union[ChromiumPage, ChromiumTab],
path: Union[str, Path] = None,
name: str = None, kwargs: dict=None) -> bytes: ...
name: str = None, kwargs: dict = None) -> bytes: ...

View File

@ -58,20 +58,21 @@ class ChromiumFrame(ChromiumBase):
self.doc_ele = ChromiumElement(self, obj_id=obj_id)
self._rect = None
end_time = perf_counter() + 5
end_time = perf_counter() + 2
while perf_counter() < end_time:
if self.url not in (None, 'about:blank'):
break
sleep(.1)
def __call__(self, loc_or_str, timeout=None):
def __call__(self, loc_or_str, index=1, timeout=None):
"""在内部查找元素
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 超时时间
:return: ChromiumElement对象或属性文本
"""
return self.ele(loc_or_str, timeout)
return self.ele(loc_or_str, index=index, timeout=timeout)
def __eq__(self, other):
    """Compare by frame id.

    :param other: object to compare with; anything lacking a ``_frame_id`` attribute is read as ``None``
    :return: result of comparing ``self._frame_id`` with the other object's ``_frame_id``
    """
    return self._frame_id == getattr(other, '_frame_id', None)
@ -211,7 +212,7 @@ class ChromiumFrame(ChromiumBase):
@property
def scroll(self):
"""返回用于滚动的对象"""
self.wait.load_complete()
self.wait.doc_loaded()
if self._scroll is None:
self._scroll = FrameScroller(self)
return self._scroll
@ -388,8 +389,8 @@ class ChromiumFrame(ChromiumBase):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:param level_or_loc: 第几级父元素1开始或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果1开始
:return: 上级元素对象
"""
return self.frame_ele.parent(level_or_loc, index)
@ -415,7 +416,7 @@ class ChromiumFrame(ChromiumBase):
return self.frame_ele.next(filter_loc, index, timeout, ele_only=ele_only)
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中当前元素前面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
@ -426,7 +427,7 @@ class ChromiumFrame(ChromiumBase):
return self.frame_ele.before(filter_loc, index, timeout, ele_only=ele_only)
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
"""返回文档中此当前元素后面符合条件的一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
查找范围不限同级元素而是整个DOM文档
:param filter_loc: 用于筛选的查询语法
:param index: 后面第几个查询结果1开始
@ -561,20 +562,20 @@ class ChromiumFrame(ChromiumBase):
self.tab.remove_ele(new_ele)
return r
def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, relative=False, raise_err=None):
"""在frame内查找单个元素
:param loc_or_ele: 定位符或元素对象
:param timeout: 查找超时时间
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param relative: WebPage用的表示是否相对定位的参数
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: ChromiumElement对象
"""
if isinstance(loc_or_ele, ChromiumElement):
return loc_or_ele
self.wait.load_complete()
return self.doc_ele._ele(loc_or_ele, timeout,
raise_err=raise_err) if single else self.doc_ele.eles(loc_or_ele, timeout)
self.wait.doc_loaded()
return self.doc_ele._ele(loc_or_ele, index=index, timeout=timeout,
raise_err=raise_err) if index is not None else self.doc_ele.eles(loc_or_ele, timeout)
def _is_inner_frame(self):
"""返回当前frame是否同域"""

View File

@ -6,7 +6,7 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from typing import Union, Tuple, List, Any
from typing import Union, Tuple, List, Any, Optional
from .chromium_base import ChromiumBase
from .chromium_page import ChromiumPage
@ -44,6 +44,7 @@ class ChromiumFrame(ChromiumBase):
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, NoneElement]: ...
def __eq__(self, other: ChromiumFrame) -> bool: ...
@ -209,7 +210,7 @@ class ChromiumFrame(ChromiumBase):
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None,
single: bool = True,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None) \
-> Union[ChromiumElement, ChromiumFrame, None, List[Union[ChromiumElement, ChromiumFrame]]]: ...

View File

@ -22,6 +22,26 @@ from ..errors import BrowserConnectError
class ChromiumPage(ChromiumBase):
"""用于管理浏览器的类"""
PAGES = {}
def __new__(cls, addr_or_opts=None, tab_id=None, timeout=None, addr_driver_opts=None):
    """Return the page object for the target browser, creating it at most once per browser id.

    :param addr_or_opts: browser 'address:port' string, ChromiumOptions object, or port number (int)
    :param tab_id: id of the tab to control; not read by this method
    :param timeout: timeout; not read by this method
    :param addr_driver_opts: fallback used when ``addr_or_opts`` is falsy
    :return: the cached instance from ``cls.PAGES`` if one exists for the browser id, otherwise a new instance
    """
    options = handle_options(addr_or_opts or addr_driver_opts)
    already_running, browser_id = run_browser(options)

    # One page object per browser: hand back the registered instance if there is one.
    if browser_id in cls.PAGES:
        return cls.PAGES[browser_id]

    page = object.__new__(cls)
    # Stash what was resolved here on the new instance.
    page._chromium_options = options
    page._is_exist = already_running
    page._browser_id = browser_id
    page.address = options.address

    cls.PAGES[browser_id] = page
    return page
def __init__(self, addr_or_opts=None, tab_id=None, timeout=None, addr_driver_opts=None):
"""
@ -29,58 +49,20 @@ class ChromiumPage(ChromiumBase):
:param tab_id: 要控制的标签页id不指定默认为激活的
:param timeout: 超时时间
"""
addr_or_opts = addr_or_opts or addr_driver_opts
if hasattr(self, '_created'):
return
self._created = True
self._page = self
address = self._handle_options(addr_or_opts)
self._run_browser()
super().__init__(address, tab_id)
super().__init__(self.address, tab_id)
self.set.timeouts(base=timeout)
self._page_init()
def _handle_options(self, addr_or_opts):
"""设置浏览器启动属性
:param addr_or_opts: 'ip:port'ChromiumOptionsDriver
:return: 返回浏览器地址
"""
if not addr_or_opts:
self._chromium_options = ChromiumOptions(addr_or_opts)
elif isinstance(addr_or_opts, ChromiumOptions):
if addr_or_opts.is_auto_port:
port, path = PortFinder(addr_or_opts.tmp_path).get_port()
addr_or_opts.set_address(f'127.0.0.1:{port}')
addr_or_opts.set_user_data_path(path)
addr_or_opts.auto_port()
self._chromium_options = addr_or_opts
elif isinstance(addr_or_opts, str):
self._chromium_options = ChromiumOptions()
self._chromium_options.set_address(addr_or_opts)
elif isinstance(addr_or_opts, int):
self._chromium_options = ChromiumOptions()
self._chromium_options.set_local_port(addr_or_opts)
else:
raise TypeError('只能接收ip:port格式或ChromiumOptions类型参数。')
return self._chromium_options.address
def _run_browser(self):
"""连接浏览器"""
is_exist = connect_browser(self._chromium_options)
try:
ws = get(f'http://{self._chromium_options.address}/json/version', headers={'Connection': 'close'})
if not ws:
raise BrowserConnectError('\n浏览器连接失败如使用全局代理须设置不代理127.0.0.1地址。')
ws = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
except KeyError:
raise BrowserConnectError('浏览器版本太旧,请升级。')
except:
raise BrowserConnectError('\n浏览器连接失败如使用全局代理须设置不代理127.0.0.1地址。')
self._browser = Browser(self._chromium_options.address, ws, self)
if (is_exist and self._chromium_options._headless is False and
self._browser = Browser(self._chromium_options.address, self._browser_id, self)
if (self._is_exist and self._chromium_options._headless is False and
'headless' in self._browser.run_cdp('Browser.getVersion')['userAgent'].lower()):
self._browser.quit(3)
connect_browser(self._chromium_options)
@ -156,17 +138,17 @@ class ChromiumPage(ChromiumBase):
:param kwargs: pdf生成参数
:return: as_pdf为True时返回bytes否则返回文件文本
"""
return get_pdf(self, path, name, kwargs)if as_pdf else get_mhtml(self, path, name)
return get_pdf(self, path, name, kwargs) if as_pdf else get_mhtml(self, path, name)
def get_tab(self, id_or_num=None):
"""获取一个标签页对象
:param id_or_num: 要获取的标签页id或序号为None时获取当前tab序号不是视觉排列顺序而是激活顺序
:param id_or_num: 要获取的标签页id或序号为None时获取当前tab序号从1开始可传入负数获取倒数第几个不是视觉排列顺序而是激活顺序
:return: 标签页对象
"""
if isinstance(id_or_num, str):
return ChromiumTab(self, id_or_num)
elif isinstance(id_or_num, int):
return ChromiumTab(self, self.tabs[id_or_num])
return ChromiumTab(self, self.tabs[id_or_num - 1 if id_or_num > 0 else id_or_num])
elif id_or_num is None:
return ChromiumTab(self, self.tab_id)
elif isinstance(id_or_num, ChromiumTab):
@ -263,6 +245,10 @@ class ChromiumPage(ChromiumBase):
"""
self.browser.quit(timeout, force)
def _on_disconnect(self):
    """Drop this page's entry from the class-level ``PAGES`` registry.

    Per the original docstring this runs when the browser exits; the caller is not visible here.
    A missing entry is ignored (``pop`` is given a default).
    """
    ChromiumPage.PAGES.pop(self._browser_id, None)
def __repr__(self):
return f'<ChromiumPage browser_id={self.browser.id} tab_id={self.tab_id}>'
@ -275,6 +261,51 @@ class ChromiumPage(ChromiumBase):
self.close_tabs(tabs_or_ids, True)
def handle_options(addr_or_opts):
    """Normalize the browser launch argument into a ChromiumOptions object.

    :param addr_or_opts: 'ip:port' string, ChromiumOptions object, port number (int), or a falsy value
    :return: ChromiumOptions object
    :raises TypeError: if the argument is truthy but is not a ChromiumOptions, str or int
    """
    # Falsy input (None, '', 0, ...) is handed straight to the ChromiumOptions constructor.
    if not addr_or_opts:
        return ChromiumOptions(addr_or_opts)

    if isinstance(addr_or_opts, ChromiumOptions):
        if addr_or_opts.is_auto_port:
            # PortFinder supplies a (port, path) pair; both are written back onto the caller's object.
            port, path = PortFinder(addr_or_opts.tmp_path).get_port()
            addr_or_opts.set_address(f'127.0.0.1:{port}')
            addr_or_opts.set_user_data_path(path)
            # NOTE(review): auto_port() is re-applied after the setters - presumably they clear
            # the auto-port flag; confirm against ChromiumOptions.
            addr_or_opts.auto_port()
        return addr_or_opts

    if isinstance(addr_or_opts, str):
        options = ChromiumOptions()
        options.set_address(addr_or_opts)
        return options

    if isinstance(addr_or_opts, int):
        options = ChromiumOptions()
        options.set_local_port(addr_or_opts)
        return options

    raise TypeError('只能接收ip:port格式或ChromiumOptions类型参数。')
def run_browser(chromium_options):
    """Connect to the browser and read its id from the ``/json/version`` endpoint.

    :param chromium_options: ChromiumOptions object; its ``address`` is used to build the endpoint URL
    :return: tuple ``(is_exist, browser_id)``: the value returned by ``connect_browser()`` and the
             last path segment of the reported ``webSocketDebuggerUrl``
    :raises BrowserConnectError: if the endpoint gives a falsy response, the response has no
             ``webSocketDebuggerUrl`` key, or any other error occurs while querying it
    """
    is_exist = connect_browser(chromium_options)
    try:
        ws = get(f'http://{chromium_options.address}/json/version', headers={'Connection': 'close'})
        if not ws:
            raise BrowserConnectError('\n浏览器连接失败如使用全局代理须设置不代理127.0.0.1地址。')
        browser_id = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
    except KeyError:
        # No 'webSocketDebuggerUrl' in the version info.
        raise BrowserConnectError('浏览器版本太旧，请升级。')
    except Exception:
        # Was a bare ``except:``, which also turned KeyboardInterrupt/SystemExit into a
        # connection error; those now propagate unchanged.
        raise BrowserConnectError('\n浏览器连接失败如使用全局代理须设置不代理127.0.0.1地址。')
    return is_exist, browser_id
def get_rename(original, rename):
if '.' in rename:
return rename

View File

@ -18,6 +18,12 @@ from .._units.waiter import PageWaiter
class ChromiumPage(ChromiumBase):
PAGES: dict = ...
def __new__(cls,
addr_or_opts: Union[str, int, ChromiumOptions] = None,
tab_id: str = None,
timeout: float = None): ...
def __init__(self,
addr_or_opts: Union[str, int, ChromiumOptions] = None,
@ -25,7 +31,9 @@ class ChromiumPage(ChromiumBase):
timeout: float = None):
self._chromium_options: ChromiumOptions = ...
self._browser: Browser = ...
self._browser_id: str = ...
self._rect: Optional[TabRect] = ...
self._is_exist: bool = ...
def _handle_options(self, addr_or_opts: Union[str, ChromiumOptions]) -> str: ...
@ -95,5 +103,13 @@ class ChromiumPage(ChromiumBase):
def quit(self, timeout: float = 5, force: bool = True) -> None: ...
def _on_disconnect(self) -> None: ...
def handle_options(addr_or_opts): ...
def run_browser(chromium_options): ...
def get_rename(original: str, rename: str) -> str: ...

View File

@ -9,6 +9,7 @@ from copy import copy
from .._base.base import BasePage
from .._configs.session_options import SessionOptions
from .._functions.settings import Settings
from .._functions.web import set_session_cookies, set_browser_cookies
from .._pages.chromium_base import ChromiumBase, get_mhtml, get_pdf
from .._pages.session_page import SessionPage
@ -18,12 +19,28 @@ from .._units.waiter import TabWaiter
class ChromiumTab(ChromiumBase):
"""实现浏览器标签页的类"""
TABS = {}
def __init__(self, page, tab_id=None):
def __new__(cls, page, tab_id):
    """Create a tab object, or reuse the registered one when singleton mode is enabled.

    :param page: ChromiumPage object; not read by this method
    :param tab_id: id of the tab to control; used as the key in ``cls.TABS``
    :return: the registered instance when ``Settings.singleton_tab_obj`` is truthy and one exists
             for ``tab_id``, otherwise a new instance
    """
    if Settings.singleton_tab_obj and tab_id in cls.TABS:
        return cls.TABS[tab_id]
    r = object.__new__(cls)
    # Registered regardless of the setting, so a new object replaces any earlier entry for this id.
    cls.TABS[tab_id] = r
    return r
def __init__(self, page, tab_id):
"""
:param page: ChromiumPage对象
:param tab_id: 要控制的标签页id
"""
if Settings.singleton_tab_obj and hasattr(self, '_created'):
return
self._created = True
self._page = page
self._browser = page.browser
super().__init__(page.address, tab_id, page.timeout)
@ -73,6 +90,9 @@ class ChromiumTab(ChromiumBase):
def __repr__(self):
return f'<ChromiumTab browser_id={self.browser.id} tab_id={self.tab_id}>'
def _on_disconnect(self):
    """Drop this tab's entry from the class-level ``TABS`` registry; a missing entry is ignored.

    NOTE(review): presumably invoked when the tab's connection is lost - the caller is not visible here.
    """
    ChromiumTab.TABS.pop(self.tab_id, None)
class WebPageTab(SessionPage, ChromiumTab, BasePage):
def __init__(self, page, tab_id):
@ -87,17 +107,18 @@ class WebPageTab(SessionPage, ChromiumTab, BasePage):
page._headers))
super(SessionPage, self).__init__(page=page, tab_id=tab_id)
def __call__(self, loc_or_str, timeout=None):
def __call__(self, loc_or_str, index=1, timeout=None):
"""在内部查找元素
ele = page('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 超时时间
:return: 子元素对象
"""
if self._mode == 'd':
return super(SessionPage, self).__call__(loc_or_str, timeout)
return super(SessionPage, self).__call__(loc_or_str, index=index, timeout=timeout)
elif self._mode == 's':
return super().__call__(loc_or_str)
return super().__call__(loc_or_str, index=index)
@property
def set(self):
@ -231,16 +252,17 @@ class WebPageTab(SessionPage, ChromiumTab, BasePage):
return self.response
return super().post(url, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None):
def ele(self, loc_or_ele, index=1, timeout=None):
"""返回第一个符合条件的元素、属性或节点文本
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 查找元素超时时间默认与页面等待时间一致
:return: 元素对象或属性文本节点文本
"""
if self._mode == 's':
return super().ele(loc_or_ele)
return super().ele(loc_or_ele, index=index)
elif self._mode == 'd':
return super(SessionPage, self).ele(loc_or_ele, timeout=timeout)
return super(SessionPage, self).ele(loc_or_ele, index=index, timeout=timeout)
def eles(self, loc_or_str, timeout=None):
"""返回页面中所有符合条件的元素、属性或节点文本
@ -253,15 +275,16 @@ class WebPageTab(SessionPage, ChromiumTab, BasePage):
elif self._mode == 'd':
return super(SessionPage, self).eles(loc_or_str, timeout=timeout)
def s_ele(self, loc_or_ele=None):
def s_ele(self, loc_or_ele=None, index=1):
"""查找第一个符合条件的元素以SessionElement形式返回d模式处理复杂页面时效率很高
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:return: SessionElement对象或属性文本
"""
if self._mode == 's':
return super().s_ele(loc_or_ele)
return super().s_ele(loc_or_ele, index=index)
elif self._mode == 'd':
return super(SessionPage, self).s_ele(loc_or_ele)
return super(SessionPage, self).s_ele(loc_or_ele, index=index)
def s_eles(self, loc_or_str):
"""查找所有符合条件的元素以SessionElement形式返回d模式处理复杂页面时效率很高
@ -355,20 +378,19 @@ class WebPageTab(SessionPage, ChromiumTab, BasePage):
if self._response is not None:
self._response.close()
def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, relative=False, raise_err=None):
"""返回页面中符合条件的元素、属性或节点文本,默认返回第一个
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param timeout: 查找元素超时时间d模式专用
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param relative: WebPage用的表示是否相对定位的参数
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: 元素对象或属性文本节点文本
"""
if self._mode == 's':
return super()._find_elements(loc_or_ele, single=single)
return super()._find_elements(loc_or_ele, index=index)
elif self._mode == 'd':
return super(SessionPage, self)._find_elements(loc_or_ele, timeout=timeout, single=single,
relative=relative)
return super(SessionPage, self)._find_elements(loc_or_ele, timeout=timeout, index=index, relative=relative)
def __repr__(self):
return f'<WebPageTab browser_id={self.browser.id} tab_id={self.tab_id}>'

View File

@ -25,8 +25,11 @@ from .._units.waiter import TabWaiter
class ChromiumTab(ChromiumBase):
TABS: dict = ...
def __init__(self, page: ChromiumPage, tab_id: str = None):
def __new__(cls, page: ChromiumPage, tab_id: str): ...
def __init__(self, page: ChromiumPage, tab_id: str):
self._page: ChromiumPage = ...
self._browser: Browser = ...
self._rect: Optional[TabRect] = ...
@ -76,6 +79,7 @@ class WebPageTab(SessionPage, ChromiumTab):
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, SessionElement, NoneElement]: ...
@property
@ -145,14 +149,16 @@ class WebPageTab(SessionPage, ChromiumTab):
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, SessionElement, NoneElement]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[Union[ChromiumElement, SessionElement]]: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) \
-> Union[SessionElement, NoneElement]: ...
def s_ele(self,
loc_or_ele: Union[Tuple[str, str], str] = None,
index: int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
@ -191,7 +197,11 @@ class WebPageTab(SessionPage, ChromiumTab):
@property
def set(self) -> WebPageTabSetter: ...
def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame],
timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame],
timeout: float = None,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None) \
-> Union[ChromiumElement, SessionElement, ChromiumFrame, NoneElement, List[SessionElement], List[
Union[ChromiumElement, ChromiumFrame]]]: ...

View File

@ -68,14 +68,15 @@ class SessionPage(BasePage):
if not self._session:
self._session, self._headers = self._session_options.make_session()
def __call__(self, loc_or_str, timeout=None):
def __call__(self, loc_or_str, index=1, timeout=None):
"""在内部查找元素
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 不起实际作用用于和ChromiumElement对应便于无差别调用
:return: SessionElement对象或属性文本
"""
return self.ele(loc_or_str)
return self.ele(loc_or_str, index=index)
# -----------------共有属性和方法-------------------
@property
@ -174,13 +175,14 @@ class SessionPage(BasePage):
"""
return self._s_connect(url, 'post', show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None):
"""返回页面中符合条件的一个元素、属性或节点文本
def ele(self, loc_or_ele, index=1, timeout=None):
"""返回页面中符合条件的一个元素、属性或节点文本
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 不起实际作用用于和ChromiumElement对应便于无差别调用
:return: SessionElement对象或属性文本
"""
return self._ele(loc_or_ele, method='ele()')
return self._ele(loc_or_ele, index=index, method='ele()')
def eles(self, loc_or_str, timeout=None):
"""返回页面中所有符合条件的元素、属性或节点文本
@ -188,31 +190,33 @@ class SessionPage(BasePage):
:param timeout: 不起实际作用用于和ChromiumElement对应便于无差别调用
:return: SessionElement对象或属性文本组成的列表
"""
return self._ele(loc_or_str, single=False)
return self._ele(loc_or_str, index=None)
def s_ele(self, loc_or_ele=None):
"""返回页面中符合条件的一个元素、属性或节点文本
def s_ele(self, loc_or_ele=None, index=1):
"""返回页面中符合条件的一个元素、属性或节点文本
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:return: SessionElement对象或属性文本
"""
return make_session_ele(self.html) if loc_or_ele is None else self._ele(loc_or_ele, method='s_ele()')
return make_session_ele(self.html) if loc_or_ele is None else self._ele(loc_or_ele,
index=index, method='s_ele()')
def s_eles(self, loc_or_str):
"""返回页面中符合条件的所有元素、属性或节点文本
:param loc_or_str: 元素的定位信息可以是元素对象loc元组或查询字符串
:return: SessionElement对象或属性文本
"""
return self._ele(loc_or_str, single=False)
return self._ele(loc_or_str, index=None)
def _find_elements(self, loc_or_ele, timeout=None, single=True, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, raise_err=None):
"""返回页面中符合条件的元素、属性或节点文本,默认返回第一个
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param timeout: 不起实际作用用于和父类对应
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: SessionElement对象
"""
return loc_or_ele if isinstance(loc_or_ele, SessionElement) else make_session_ele(self, loc_or_ele, single)
return loc_or_ele if isinstance(loc_or_ele, SessionElement) else make_session_ele(self, loc_or_ele, index=index)
def get_cookies(self, as_dict=False, all_domains=False, all_info=False):
"""返回cookies

View File

@ -42,6 +42,7 @@ class SessionPage(BasePage):
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, SessionElement],
index: int = 1,
timeout: float = None) -> Union[SessionElement, NoneElement]: ...
# -----------------共有属性和方法-------------------
@ -91,6 +92,7 @@ class SessionPage(BasePage):
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, SessionElement],
index: int = 1,
timeout: float = None) -> Union[SessionElement, NoneElement]: ...
def eles(self,
@ -98,15 +100,15 @@ class SessionPage(BasePage):
timeout: float = None) -> List[SessionElement]: ...
def s_ele(self,
loc_or_ele: Union[Tuple[str, str], str, SessionElement] = None) \
-> Union[SessionElement, NoneElement]: ...
loc_or_ele: Union[Tuple[str, str], str, SessionElement] = None,
index: int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, SessionElement],
timeout: float = None,
single: bool = True,
index: Optional[int] = 1,
raise_err: bool = None) \
-> Union[SessionElement, NoneElement, List[SessionElement]]: ...

View File

@ -17,6 +17,16 @@ from .._units.setter import WebPageSetter
class WebPage(SessionPage, ChromiumPage, BasePage):
"""整合浏览器和request的页面类"""
def __new__(cls, mode='d', timeout=None, chromium_options=None, session_or_options=None, driver_or_options=None):
    """Delegate object creation to the parent ``__new__``, passing only the browser options.

    :param mode: 'd' or 's' (driver mode / session mode); not read by this method
    :param timeout: timeout; not read by this method
    :param chromium_options: browser options forwarded to the parent ``__new__``
    :param session_or_options: Session or SessionOptions object; not read by this method
    :param driver_or_options: fallback used when ``chromium_options`` is falsy
    :return: whatever the parent ``__new__`` returns
    """
    return super().__new__(cls, chromium_options or driver_or_options)
def __init__(self, mode='d', timeout=None, chromium_options=None, session_or_options=None, driver_or_options=None):
"""初始化函数
:param mode: 'd' 's'即driver模式和session模式
@ -24,7 +34,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
:param chromium_options: Driver对象只使用s模式时应传入False
:param session_or_options: Session对象或SessionOptions对象只使用d模式时应传入False
"""
chromium_options = chromium_options or driver_or_options
if hasattr(self, '_created'):
return
self._mode = mode.lower()
if self._mode not in ('s', 'd'):
raise ValueError('mode参数只能是s或d。')
@ -38,17 +50,18 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
super(SessionPage, self).__init__(addr_or_opts=chromium_options, timeout=timeout)
self.change_mode(self._mode, go=False, copy_cookies=False)
def __call__(self, loc_or_str, timeout=None):
def __call__(self, loc_or_str, index=1, timeout=None):
"""在内部查找元素
ele = page('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 超时时间
:return: 子元素对象
"""
if self._mode == 'd':
return super(SessionPage, self).__call__(loc_or_str, timeout)
return super(SessionPage, self).__call__(loc_or_str, index=index, timeout=timeout)
elif self._mode == 's':
return super().__call__(loc_or_str)
return super().__call__(loc_or_str, index=index)
@property
def set(self):
@ -182,16 +195,17 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
return self.response
return super().post(url, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None):
def ele(self, loc_or_ele, index=1, timeout=None):
"""返回第一个符合条件的元素、属性或节点文本
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:param timeout: 查找元素超时时间默认与页面等待时间一致
:return: 元素对象或属性文本节点文本
"""
if self._mode == 's':
return super().ele(loc_or_ele)
return super().ele(loc_or_ele, index=index)
elif self._mode == 'd':
return super(SessionPage, self).ele(loc_or_ele, timeout=timeout)
return super(SessionPage, self).ele(loc_or_ele, index=index, timeout=timeout)
def eles(self, loc_or_str, timeout=None):
"""返回页面中所有符合条件的元素、属性或节点文本
@ -204,15 +218,16 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
elif self._mode == 'd':
return super(SessionPage, self).eles(loc_or_str, timeout=timeout)
def s_ele(self, loc_or_ele=None):
def s_ele(self, loc_or_ele=None, index=1):
"""查找第一个符合条件的元素以SessionElement形式返回d模式处理复杂页面时效率很高
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:param index: 获取第几个从1开始可传入负数获取倒数第几个
:return: SessionElement对象或属性文本
"""
if self._mode == 's':
return super().s_ele(loc_or_ele)
return super().s_ele(loc_or_ele, index=index)
elif self._mode == 'd':
return super(SessionPage, self).s_ele(loc_or_ele)
return super(SessionPage, self).s_ele(loc_or_ele, index=index)
def s_eles(self, loc_or_str):
"""查找所有符合条件的元素以SessionElement形式返回d模式处理复杂页面时效率很高
@ -360,20 +375,19 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
if self._response is not None:
self._response.close()
def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None):
def _find_elements(self, loc_or_ele, timeout=None, index=1, relative=False, raise_err=None):
"""返回页面中符合条件的元素、属性或节点文本,默认返回第一个
:param loc_or_ele: 元素的定位信息可以是元素对象loc元组或查询字符串
:param timeout: 查找元素超时时间d模式专用
:param single: True则返回第一个False则返回全部
:param index: 第几个结果从1开始可传入负数获取倒数第几个为None返回所有
:param relative: WebPage用的表示是否相对定位的参数
:param raise_err: 找不到元素是是否抛出异常为None时根据全局设置
:return: 元素对象或属性文本节点文本
"""
if self._mode == 's':
return super()._find_elements(loc_or_ele, single=single)
return super()._find_elements(loc_or_ele, index=index)
elif self._mode == 'd':
return super(SessionPage, self)._find_elements(loc_or_ele, timeout=timeout, single=single,
relative=relative)
return super(SessionPage, self)._find_elements(loc_or_ele, timeout=timeout, index=index, relative=relative)
def quit(self, timeout=5, force=True):
"""关闭浏览器和Session

View File

@ -5,7 +5,7 @@
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
from typing import Union, Tuple, List, Any
from typing import Union, Tuple, List, Any, Optional
from requests import Session, Response
@ -38,6 +38,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, SessionElement, NoneElement]: ...
# -----------------共有属性和方法-------------------
@ -105,13 +106,16 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
index: int = 1,
timeout: float = None) -> Union[ChromiumElement, SessionElement, NoneElement]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[Union[ChromiumElement, SessionElement]]: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) -> Union[SessionElement, NoneElement]: ...
def s_ele(self,
loc_or_ele: Union[Tuple[str, str], str] = None,
index: int = 1) -> Union[SessionElement, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[SessionElement]: ...
@ -167,7 +171,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame],
timeout: float = None,
single: bool = True,
index: Optional[int] = 1,
relative: bool = False,
raise_err: bool = None) \
-> Union[ChromiumElement, SessionElement, ChromiumFrame, NoneElement, List[SessionElement],

View File

@ -22,6 +22,7 @@ class DownloadManager(object):
self._browser = browser
self._page = browser.page
self._when_download_file_exists = 'rename'
self._save_path = None
t = TabDownloadSettings(self._page.tab_id)
t.path = self._page.download_path
@ -46,18 +47,19 @@ class DownloadManager(object):
"""返回所有未完成的下载任务"""
return self._missions
def set_path(self, tab_id, path):
def set_path(self, tab, path):
"""设置某个tab的下载路径
:param tab_id: tab id
:param tab: 页面对象
:param path: 下载路径绝对路径str
:return: None
"""
TabDownloadSettings(tab_id).path = path
if tab_id == self._page.tab_id or not self._running:
TabDownloadSettings(tab.tab_id).path = path
if tab is self._page or not self._running:
self._browser.driver.set_callback('Browser.downloadProgress', self._onDownloadProgress)
self._browser.driver.set_callback('Browser.downloadWillBegin', self._onDownloadWillBegin)
r = self._browser.run_cdp('Browser.setDownloadBehavior', downloadPath=path,
behavior='allowAndName', eventsEnabled=True)
self._save_path = path
if 'error' in r:
print('浏览器版本太低无法使用下载管理功能。')
self._running = True
@ -124,7 +126,10 @@ class DownloadManager(object):
:return: None
"""
mission.state = 'canceled'
try:
self._browser.run_cdp('Browser.cancelDownload', guid=mission.id)
except:
pass
if mission.final_path:
Path(mission.final_path).unlink(True)
@ -134,7 +139,10 @@ class DownloadManager(object):
:return: None
"""
mission.state = 'skipped'
try:
self._browser.run_cdp('Browser.cancelDownload', guid=mission.id)
except:
pass
def clear_tab_info(self, tab_id):
"""当tab关闭时清除有关信息
@ -182,7 +190,7 @@ class DownloadManager(object):
elif settings.when_file_exists == 'overwrite':
goal_path.unlink()
m = DownloadMission(self, tab_id, guid, settings.path, name, kwargs['url'], self._page.download_path)
m = DownloadMission(self, tab_id, guid, settings.path, name, kwargs['url'], self._save_path)
self._missions[guid] = m
if self.get_flag(tab_id) is False: # 取消该任务
@ -210,7 +218,7 @@ class DownloadManager(object):
return
mission.received_bytes = kwargs['receivedBytes']
mission.total_bytes = kwargs['totalBytes']
form_path = f'{mission.path}{sep}{mission.id}'
form_path = f'{mission.save_path}{sep}{mission.id}'
to_path = str(get_usable_path(f'{mission.path}{sep}{mission.name}'))
move(form_path, to_path)
self.set_done(mission, 'completed', final_path=to_path)

View File

@ -8,6 +8,7 @@
from typing import Dict, Optional, Union, Literal
from .._base.browser import Browser
from .._pages.chromium_base import ChromiumBase
from .._pages.chromium_page import ChromiumPage
@ -18,13 +19,14 @@ class DownloadManager(object):
_tab_missions: dict = ...
_flags: dict = ...
_running: bool = ...
_save_path: Optional[str] = ...
def __init__(self, browser: Browser): ...
@property
def missions(self) -> Dict[str, DownloadMission]: ...
def set_path(self, tab_id: str, path: str) -> None: ...
def set_path(self, tab: ChromiumBase, path: str) -> None: ...
def set_rename(self, tab_id: str, rename: str = None, suffix: str = None) -> None: ...

View File

@ -30,6 +30,7 @@ class Listener(object):
self._target_id = page._target_id
self._driver = None
self._running_requests = 0
self._running_targets = 0
self._caught = None
self._request_ids = None
@ -208,22 +209,24 @@ class Listener(object):
self._extra_info_ids = {}
self._caught = Queue(maxsize=0)
self._running_requests = 0
self._running_targets = 0
def wait_silent(self, timeout=None):
def wait_silent(self, timeout=None, targets_only=False):
"""等待所有请求结束
:param timeout: 超时为None时无限等待
:param targets_only: 是否只等待targets指定的请求结束
:return: 返回是否等待成功
"""
if not self.listening:
raise RuntimeError('监听未启动用listen.start()启动。')
if timeout is None:
while self._running_requests > 0:
while (not targets_only and self._running_requests > 0) or (targets_only and self._running_targets > 0):
sleep(.1)
return True
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if self._running_requests <= 0:
if (not targets_only and self._running_requests <= 0) or (targets_only and self._running_targets <= 0):
return True
sleep(.1)
else:
@ -265,6 +268,7 @@ class Listener(object):
if self._targets is True:
if ((self._method is True or kwargs['request']['method'] in self._method)
and (self._res_type is True or kwargs.get('type', '').upper() in self._res_type)):
self._running_targets += 1
rid = kwargs['requestId']
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, True))
p._raw_request = kwargs
@ -279,6 +283,7 @@ class Listener(object):
or (not self._is_regex and target in kwargs['request']['url']))
and (self._method is True or kwargs['request']['method'] in self._method)
and (self._res_type is True or kwargs.get('type', '').upper() in self._res_type)):
self._running_targets += 1
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, target))
p._raw_request = kwargs
break
@ -346,16 +351,17 @@ class Listener(object):
if packet:
self._caught.put(packet)
self._running_targets -= 1
def _loading_failed(self, **kwargs):
"""请求失败时的回调方法"""
self._running_requests -= 1
r_id = kwargs['requestId']
dp = self._request_ids.get(r_id, None)
if dp:
dp._raw_fail_info = kwargs
dp._resource_type = kwargs['type']
dp.is_failed = True
data_packet = self._request_ids.get(r_id, None)
if data_packet:
data_packet._raw_fail_info = kwargs
data_packet._resource_type = kwargs['type']
data_packet.is_failed = True
r = self._extra_info_ids.get(kwargs['requestId'], None)
if r:
@ -371,8 +377,9 @@ class Listener(object):
self._request_ids.pop(r_id, None)
if dp:
self._caught.put(dp)
if data_packet:
self._caught.put(data_packet)
self._running_targets -= 1
class FrameListener(Listener):

View File

@ -33,6 +33,7 @@ class Listener(object):
self._extra_info_ids: dict = ...
self.listening: bool = ...
self._running_requests: int = ...
self._running_targets: int = ...
@property
def targets(self) -> Optional[set]: ...
@ -66,7 +67,7 @@ class Listener(object):
def clear(self) -> None: ...
def wait_silent(self, timeout=None) -> bool: ...
def wait_silent(self, timeout: float = None, targets_only: bool = False) -> bool: ...
def _to_target(self, target_id: str, address: str, page: ChromiumBase) -> None: ...

View File

@ -97,7 +97,7 @@ class SelectElement(object):
def by_index(self, index, timeout=None):
"""此方法用于根据index值选择项。当元素是多选列表时可以接收list或tuple
:param index: 序号0开始传入list或tuple可选择多项
:param index: 序号从1开始可传入负数获取倒数第几个传入list或tuple可选择多项
:param timeout: 超时时间为None默认使用页面超时时间
:return: 是否选择成功
"""
@ -136,7 +136,7 @@ class SelectElement(object):
def cancel_by_index(self, index, timeout=None):
"""此方法用于根据index值取消选择项。当元素是多选列表时可以接收list或tuple
:param index: 序号0开始传入list或tuple可取消多项
:param index: 序号从1开始可传入负数获取倒数第几个传入list或tuple可取消多项
:param timeout: 超时时间，不输入默认使用页面超时时间
:return: 是否取消成功
"""
@ -231,7 +231,7 @@ class SelectElement(object):
"""
ok = False
condition = [int(i) for i in condition]
text_len = max(condition)
text_len = abs(max(condition, key=abs))
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if len(self.options) >= text_len:
@ -240,7 +240,7 @@ class SelectElement(object):
if ok:
eles = self.options
eles = [eles[i - 1] for i in condition]
eles = [eles[i - 1] if i > 0 else eles[i] for i in condition]
self._select_options(eles, mode)
return True

View File

@ -185,7 +185,7 @@ class TabSetter(ChromiumBaseSetter):
"""
path = str(Path(path).absolute())
self._page._download_path = path
self._page.browser._dl_mgr.set_path(self._page.tab_id, path)
self._page.browser._dl_mgr.set_path(self._page, path)
if self._page._DownloadKit:
self._page._DownloadKit.set.goal_path(path)

View File

@ -96,7 +96,7 @@ class BaseWaiter(object):
"""
return self._loading(timeout=timeout, gap=.002, raise_err=raise_err)
def load_complete(self, timeout=None, raise_err=None):
def doc_loaded(self, timeout=None, raise_err=None):
"""等待页面加载完成
:param timeout: 超时时间为None时使用页面timeout属性
:param raise_err: 等待失败时是否报错为None时根据Settings设置
@ -215,6 +215,14 @@ class BaseWaiter(object):
:return: count为1时返回数据包对象大于1时返回列表超时且fix_count为True时返回False"""
return self._driver.listen.wait(count, timeout, fix_count)
def load_complete(self, timeout=None, raise_err=None):
    """Wait for the page to finish loading.

    :param timeout: timeout; when None the page's own timeout attribute is used
    :param raise_err: whether to raise when the wait fails; when None, follows the Settings configuration
    :return: whether the wait succeeded
    """
    # start=False is forwarded to _loading(); presumably this selects waiting for
    # loading to end rather than to begin -- TODO confirm against _loading().
    return self._loading(timeout=timeout, start=False, raise_err=raise_err)
class TabWaiter(BaseWaiter):

View File

@ -42,7 +42,7 @@ class BaseWaiter(object):
def load_start(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def load_complete(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def doc_loaded(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def upload_paths_inputted(self) -> bool: ...

View File

@ -89,3 +89,7 @@ class StorageError(BaseError):
class CookieFormatError(BaseError):
_info = 'cookie格式不正确。'
class TargetNotFoundError(BaseError):
    # Error whose message reports that the specified page could not be found.
    _info = '找不到指定页面。'

View File

@ -2,7 +2,7 @@ requests
lxml
cssselect
DownloadKit>=2.0.0
websocket-client>=1.7.0
websocket-client
click
tldextract
psutil

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="4.0.1",
version="4.0.2",
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",
@ -23,21 +23,21 @@ setup(
'requests',
'cssselect',
'DownloadKit>=2.0.0',
'websocket-client>=1.7.0',
'websocket-client',
'click',
'tldextract',
'psutil'
],
classifiers=[
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.6",
"Development Status :: 4 - Beta",
"Topic :: Utilities",
"License :: OSI Approved :: BSD License",
],
python_requires='>=3.8',
python_requires='>=3.6',
entry_points={
'console_scripts': [
'dp = DrissionPage.commons.cli:main',
'dp = DrissionPage.functions.cli:main',
],
},
)