4.0.0b32(+)

优化WebPage的post()返回值;
优化run_async_js()逻辑,删除timeout参数;
修复判断覆盖失效问题;
修复页面滚动有时报错问题;
优化_make_response()返回逻辑
This commit is contained in:
g1879 2023-12-27 23:45:58 +08:00
parent 9a6bd9c2b4
commit 655895c560
14 changed files with 63 additions and 68 deletions

View File

@ -13,4 +13,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__'] __all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.0b31' __version__ = '4.0.0b32'

View File

@ -1409,7 +1409,8 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id, res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id,
arguments=[convert_argument(arg) for arg in args], returnByValue=False, arguments=[convert_argument(arg) for arg in args], returnByValue=False,
awaitPromise=True, userGesture=True, _timeout=timeout, _ignore=AlertExistsError) awaitPromise=True, userGesture=True, _timeout=timeout, _ignore=AlertExistsError)
except TimeoutError:
raise TimeoutError('执行js超时。')
except ContextLostError: except ContextLostError:
if is_page: if is_page:
raise ContextLostError('页面已被刷新,请尝试等待页面加载完成再执行操作。') raise ContextLostError('页面已被刷新,请尝试等待页面加载完成再执行操作。')

View File

@ -485,17 +485,14 @@ class ChromiumBase(BasePage):
self.wait.load_complete() self.wait.load_complete()
return run_js(self, script, as_expr, self.timeouts.script if timeout is None else timeout, args) return run_js(self, script, as_expr, self.timeouts.script if timeout is None else timeout, args)
def run_async_js(self, script, *args, as_expr=False, timeout=None): def run_async_js(self, script, *args, as_expr=False):
"""以异步方式执行js代码 """以异步方式执行js代码
:param script: js文本 :param script: js文本
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]... :param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效 :param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script属性值
:return: None :return: None
""" """
from threading import Thread run_js(self, script, as_expr, 0, args)
Thread(target=run_js, args=(self, script, as_expr, self.timeouts.script if timeout is None else timeout,
args)).start()
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None): def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None):
"""访问url """访问url

View File

@ -167,7 +167,7 @@ class ChromiumBase(BasePage):
def run_js_loaded(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> Any: ... def run_js_loaded(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_async_js(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> None: ... def run_async_js(self, script: str, *args, as_expr: bool = False) -> None: ...
def get(self, url: str, show_errmsg: bool = False, retry: int = None, def get(self, url: str, show_errmsg: bool = False, retry: int = None,
interval: float = None, timeout: float = None) -> Union[None, bool]: ... interval: float = None, timeout: float = None) -> Union[None, bool]: ...

View File

@ -212,19 +212,20 @@ class WebPageTab(SessionPage, ChromiumTab, BasePage):
timeout = self.timeouts.page_load if self._has_driver else self.timeout timeout = self.timeouts.page_load if self._has_driver else self.timeout
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs) return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
def post(self, url: str, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
"""用post方式跳转到url会切换到s模式 """用post方式跳转到url会切换到s模式
:param url: 目标url :param url: 目标url
:param data: post方式时提交的数据
:param show_errmsg: 是否显示和抛出异常 :param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数 :param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔 :param interval: 重试间隔为None时使用页面对象retry_interval属性值
:param kwargs: 连接参数 :param kwargs: 连接参数
:return: url是否可用 :return: s模式时返回url是否可用d模式时返回获取到的Response对象
""" """
if self.mode == 'd': if self.mode == 'd':
self.cookies_to_session() self.cookies_to_session()
return super().post(url, data, show_errmsg, retry, interval, **kwargs) super().post(url, show_errmsg, retry, interval, **kwargs)
return self.response
return super().post(url, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None): def ele(self, loc_or_ele, timeout=None):
"""返回第一个符合条件的元素、属性或节点文本 """返回第一个符合条件的元素、属性或节点文本

View File

@ -165,7 +165,7 @@ class WebPageTab(SessionPage, ChromiumTab):
hooks: Any | None = ..., hooks: Any | None = ...,
stream: Any | None = ..., stream: Any | None = ...,
verify: Any | None = ..., verify: Any | None = ...,
cert: Any | None = ...) -> bool: ... cert: Any | None = ...) -> Union[bool, Response]: ...
@property @property
def set(self) -> WebPageTabSetter: ... def set(self) -> WebPageTabSetter: ...

View File

@ -25,7 +25,7 @@ class SessionPage(BasePage):
def __init__(self, session_or_options=None, timeout=None): def __init__(self, session_or_options=None, timeout=None):
""" """
:param session_or_options: Session对象或SessionOptions对象 :param session_or_options: Session对象或SessionOptions对象
:param timeout: 连接超时时间为None时从ini文件读取 :param timeout: 连接超时时间为None时从ini文件读取或默认10
""" """
super(SessionPage, SessionPage).__init__(self) super(SessionPage, SessionPage).__init__(self)
self._headers = None self._headers = None
@ -41,7 +41,7 @@ class SessionPage(BasePage):
def _s_set_start_options(self, session_or_options): def _s_set_start_options(self, session_or_options):
"""启动配置 """启动配置
:param session_or_options: SessionSessionOptions :param session_or_options: SessionSessionOptions对象
:return: None :return: None
""" """
if not session_or_options or isinstance(session_or_options, SessionOptions): if not session_or_options or isinstance(session_or_options, SessionOptions):
@ -117,12 +117,12 @@ class SessionPage(BasePage):
@property @property
def session(self): def session(self):
"""返回session对象""" """返回Session对象"""
return self._session return self._session
@property @property
def response(self): def response(self):
"""返回访问url得到的response对象""" """返回访问url得到的Response对象"""
return self._response return self._response
@property @property
@ -159,7 +159,18 @@ class SessionPage(BasePage):
r.status_code = 200 r.status_code = 200
self._response = r self._response = r
return return
return self._s_connect(url, 'get', None, show_errmsg, retry, interval, **kwargs) return self._s_connect(url, 'get', show_errmsg, retry, interval, **kwargs)
def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
"""用post方式跳转到url
:param url: 目标url
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔为None时使用页面对象timeout属性值
:param kwargs: 连接参数
:return: url是否可用
"""
return self._s_connect(url, 'post', show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None): def ele(self, loc_or_ele, timeout=None):
"""返回页面中符合条件的第一个元素、属性或节点文本 """返回页面中符合条件的第一个元素、属性或节点文本
@ -230,18 +241,6 @@ class SessionPage(BasePage):
r.append({'name': c['name'], 'value': c['value'], 'domain': c['domain']}) r.append({'name': c['name'], 'value': c['value'], 'domain': c['domain']})
return r return r
def post(self, url, data=None, show_errmsg=False, retry=None, interval=None, **kwargs):
"""用post方式跳转到url
:param url: 目标url
:param data: 提交的数据
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param kwargs: 连接参数
:return: url是否可用
"""
return self._s_connect(url, 'post', data, show_errmsg, retry, interval, **kwargs)
def close(self): def close(self):
"""关闭Session对象""" """关闭Session对象"""
self._session.close() self._session.close()
@ -260,11 +259,10 @@ class SessionPage(BasePage):
interval = interval if interval is not None else self.retry_interval interval = interval if interval is not None else self.retry_interval
return retry, interval return retry, interval
def _s_connect(self, url, mode, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): def _s_connect(self, url, mode, show_errmsg=False, retry=None, interval=None, **kwargs):
"""执行get或post连接 """执行get或post连接
:param url: 目标url :param url: 目标url
:param mode: 'get' 'post' :param mode: 'get' 'post'
:param data: 提交的数据
:param show_errmsg: 是否显示和抛出异常 :param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数 :param retry: 重试次数
:param interval: 重试间隔 :param interval: 重试间隔
@ -272,7 +270,7 @@ class SessionPage(BasePage):
:return: url是否可用 :return: url是否可用
""" """
retry, interval = self._before_connect(url, retry, interval) retry, interval = self._before_connect(url, retry, interval)
self._response, info = self._make_response(self._url, mode, data, retry, interval, show_errmsg, **kwargs) self._response, info = self._make_response(self._url, mode, retry, interval, show_errmsg, **kwargs)
if self._response is None: if self._response is None:
self._url_available = False self._url_available = False
@ -288,11 +286,10 @@ class SessionPage(BasePage):
return self._url_available return self._url_available
def _make_response(self, url, mode='get', data=None, retry=None, interval=None, show_errmsg=False, **kwargs): def _make_response(self, url, mode='get', retry=None, interval=None, show_errmsg=False, **kwargs):
"""生成Response对象 """生成Response对象
:param url: 目标url :param url: 目标url
:param mode: 'get' 'post' :param mode: 'get' 'post'
:param data: post方式要提交的数据
:param show_errmsg: 是否显示和抛出异常 :param show_errmsg: 是否显示和抛出异常
:param kwargs: 其它参数 :param kwargs: 其它参数
:return: tuple第一位为Response或None第二位为出错信息或 'Success' :return: tuple第一位为Response或None第二位为出错信息或 'Success'
@ -325,7 +322,7 @@ class SessionPage(BasePage):
if mode == 'get': if mode == 'get':
r = self.session.get(url, **kwargs) r = self.session.get(url, **kwargs)
elif mode == 'post': elif mode == 'post':
r = self.session.post(url, data=data, **kwargs) r = self.session.post(url, **kwargs)
if r and r.content: if r and r.content:
if self._encoding: if self._encoding:
@ -344,18 +341,19 @@ class SessionPage(BasePage):
if show_errmsg: if show_errmsg:
print(f'重试 {url}') print(f'重试 {url}')
if r is None:
if show_errmsg: if show_errmsg:
if err: if err:
raise err raise err
elif r is not None:
raise ConnectionError(f'状态码:{r.status_code}') if r.content else ConnectionError('返回内容为空。')
else: else:
raise ConnectionError('连接失败') raise ConnectionError('连接失败')
return None, '连接失败' if err is None else err
if not r.ok: else:
if show_errmsg: if r is not None:
raise ConnectionError(f'状态码:{r.status_code}') return (r, f'状态码:{r.status_code}') if r.content else (None, '返回内容为空')
return r, f'状态码:{r.status_code}' else:
return None, '连接失败' if err is None else err
def __repr__(self): def __repr__(self):
return f'<SessionPage url={self.url}>' return f'<SessionPage url={self.url}>'

View File

@ -128,10 +128,10 @@ class SessionPage(BasePage):
def post(self, def post(self,
url: str, url: str,
data: Union[dict, str, None] = ...,
show_errmsg: bool = False, show_errmsg: bool = False,
retry: int | None = None, retry: int | None = None,
interval: float | None = None, interval: float | None = None,
data: Union[dict, str, None] = ...,
timeout: float | None = ..., timeout: float | None = ...,
params: dict | None = ..., params: dict | None = ...,
json: Union[dict, str, None] = ..., json: Union[dict, str, None] = ...,
@ -153,7 +153,6 @@ class SessionPage(BasePage):
def _s_connect(self, def _s_connect(self,
url: str, url: str,
mode: str, mode: str,
data: Union[dict, str, None] = None,
show_errmsg: bool = False, show_errmsg: bool = False,
retry: int = None, retry: int = None,
interval: float = None, interval: float = None,
@ -162,7 +161,6 @@ class SessionPage(BasePage):
def _make_response(self, def _make_response(self,
url: str, url: str,
mode: str = 'get', mode: str = 'get',
data: Union[dict, str] = None,
retry: int = None, retry: int = None,
interval: float = None, interval: float = None,
show_errmsg: bool = False, show_errmsg: bool = False,

View File

@ -165,19 +165,20 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
timeout = self.timeouts.page_load if self._has_driver else self.timeout timeout = self.timeouts.page_load if self._has_driver else self.timeout
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs) return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
def post(self, url: str, data=None, show_errmsg=False, retry=None, interval=None, **kwargs): def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
"""用post方式跳转到url会切换到s模式 """用post方式跳转到url会切换到s模式
:param url: 目标url :param url: 目标url
:param data: post方式时提交的数据
:param show_errmsg: 是否显示和抛出异常 :param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数 :param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔 :param interval: 重试间隔为None时使用页面对象retry_interval属性值
:param kwargs: 连接参数 :param kwargs: 连接参数
:return: url是否可用 :return: s模式时返回url是否可用d模式时返回获取到的Response对象
""" """
if self.mode == 'd': if self.mode == 'd':
self.cookies_to_session() self.cookies_to_session()
return super().post(url, data, show_errmsg, retry, interval, **kwargs) super().post(url, show_errmsg, retry, interval, **kwargs)
return self.response
return super().post(url, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None): def ele(self, loc_or_ele, timeout=None):
"""返回第一个符合条件的元素、属性或节点文本 """返回第一个符合条件的元素、属性或节点文本

View File

@ -157,7 +157,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
hooks: Any | None = ..., hooks: Any | None = ...,
stream: Any | None = ..., stream: Any | None = ...,
verify: Any | None = ..., verify: Any | None = ...,
cert: Any | None = ...) -> bool: ... cert: Any | None = ...) -> Union[bool, Response]: ...
@property @property
def set(self) -> WebPageSetter: ... def set(self) -> WebPageSetter: ...

View File

@ -81,6 +81,7 @@ class Scroller(object):
self._run_js(f'{{}}.scrollBy({pixel}, 0);') self._run_js(f'{{}}.scrollBy({pixel}, 0);')
def _wait_scrolled(self): def _wait_scrolled(self):
"""等待滚动结束"""
if not self._wait_complete: if not self._wait_complete:
return return
@ -89,7 +90,7 @@ class Scroller(object):
x = r['layoutViewport']['pageX'] x = r['layoutViewport']['pageX']
y = r['layoutViewport']['pageY'] y = r['layoutViewport']['pageY']
end_time = perf_counter() + self._driver.page.timeout end_time = perf_counter() + page.timeout
while perf_counter() < end_time: while perf_counter() < end_time:
sleep(.1) sleep(.1)
r = page.run_cdp('Page.getLayoutMetrics') r = page.run_cdp('Page.getLayoutMetrics')

View File

@ -3,15 +3,13 @@ from typing import Union
from .._elements.chromium_element import ChromiumElement from .._elements.chromium_element import ChromiumElement
from .._pages.chromium_base import ChromiumBase from .._pages.chromium_base import ChromiumBase
from .._pages.chromium_frame import ChromiumFrame
from .._pages.chromium_page import ChromiumPage
class Scroller(object): class Scroller(object):
def __init__(self, page_or_ele: Union[ChromiumBase, ChromiumElement, ChromiumFrame]): def __init__(self, page_or_ele: Union[ChromiumBase, ChromiumElement]):
self.t1: str = ... self.t1: str = ...
self.t2: str = ... self.t2: str = ...
self._driver: Union[ChromiumPage, ChromiumElement, ChromiumFrame] = ... self._driver: Union[ChromiumBase, ChromiumElement] = ...
self._wait_complete: bool = ... self._wait_complete: bool = ...
def _run_js(self, js: str): ... def _run_js(self, js: str): ...

View File

@ -64,7 +64,7 @@ class ElementStates(object):
"""返回元素是否被覆盖与是否在视口中无关如被覆盖返回覆盖元素的backend id否则返回False""" """返回元素是否被覆盖与是否在视口中无关如被覆盖返回覆盖元素的backend id否则返回False"""
lx, ly = self._ele.rect.click_point lx, ly = self._ele.rect.click_point
try: try:
bid = self._ele.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly).get('backendNodeId') bid = self._ele.page.run_cdp('DOM.getNodeForLocation', x=int(lx), y=int(ly)).get('backendNodeId')
return bid if bid != self._ele._backend_id else False return bid if bid != self._ele._backend_id else False
except CDPError: except CDPError:
return False return False

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup( setup(
name="DrissionPage", name="DrissionPage",
version="4.0.0b31", version="4.0.0b32",
author="g1879", author="g1879",
author_email="g1879@qq.com", author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.", description="Python based web automation tool. It can control the browser and send and receive data packets.",
@ -29,12 +29,12 @@ setup(
'psutil' 'psutil'
], ],
classifiers=[ classifiers=[
"Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.8",
"Development Status :: 4 - Beta", "Development Status :: 4 - Beta",
"Topic :: Utilities", "Topic :: Utilities",
"License :: OSI Approved :: BSD License", "License :: OSI Approved :: BSD License",
], ],
python_requires='>=3.6', python_requires='>=3.8',
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'dp = DrissionPage.commons.cli:main', 'dp = DrissionPage.commons.cli:main',