增加run_js_loaded();alert问题待解决

This commit is contained in:
g1879 2023-02-13 17:46:13 +08:00
parent dea209f35a
commit f9068cfbb1
10 changed files with 110 additions and 75 deletions

View File

@ -10,6 +10,7 @@ from warnings import warn
from requests import Session
from .functions.tools import AlertExistsError
from .functions.tools import get_usable_path
from .base import BasePage
from .chromium_driver import ChromiumDriver
@ -73,6 +74,7 @@ class ChromiumBase(BasePage):
self._first_run = True
self._is_reading = False
self._upload_list = None
self._wait = None
def _driver_init(self, tab_id):
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
@ -278,24 +280,18 @@ class ChromiumBase(BasePage):
@property
def ready_state(self):
"""返回当前页面加载状态,'loading' 'interactive' 'complete'"""
try:
return self.run_cdp('Runtime.evaluate', expression='document.readyState;')['result']['value']
except KeyError:
raise ConnectionError('标签页或连接已关闭。')
return self.run_cdp('Runtime.evaluate', expression='document.readyState;')['result']['value']
@property
def size(self):
"""返回页面总宽高,格式:(宽, 高)"""
# w = self.run_js('document.body.scrollWidth;', as_expr=True)
# h = self.run_js('document.body.scrollHeight;', as_expr=True)
# return w, h
r = self.run_cdp_loaded('Page.getLayoutMetrics')['contentSize']
return r['width'], r['height']
@property
def active_ele(self):
"""返回当前焦点所在元素"""
return self.run_js('return document.activeElement;')
return self.run_js_loaded('return document.activeElement;')
@property
def page_load_strategy(self):
@ -328,7 +324,9 @@ class ChromiumBase(BasePage):
@property
def wait(self):
"""返回用于等待的对象"""
return ChromiumPageWaiter(self)
if self._wait is None:
self._wait = ChromiumPageWaiter(self)
return self._wait
def set_timeouts(self, implicit=None, page_load=None, script=None):
"""设置超时时间,单位为秒
@ -346,6 +344,36 @@ class ChromiumBase(BasePage):
if script is not None:
self._timeouts.script = script
def run_cdp(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句
:param cmd: 协议项目
:param cmd_args: 参数
:return: 执行的结果
"""
r = self.driver.call_method(cmd, **cmd_args)
if 'error' not in r:
return r
if 'Cannot find context with specified id' in r['error']:
raise RuntimeError('页面被刷新,请操作前尝试等待页面刷新或加载完成。')
elif 'Could not find node with given id' in r['error']:
raise RuntimeError('该元素已不在当前页面中。')
elif 'tab closed' in r['error']:
raise RuntimeError('标签页已关闭。')
elif 'alert exists' in r['error']:
raise AlertExistsError('存在未处理的提示框。')
else:
raise RuntimeError(r)
def run_cdp_loaded(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句执行前等待页面加载完毕
:param cmd: 协议项目
:param cmd_args: 参数
:return: 执行的结果
"""
self.wait.load_complete()
return self.run_cdp(cmd, **cmd_args)
def run_js(self, script, as_expr=False, *args):
"""运行javascript代码
:param script: js文本
@ -355,6 +383,16 @@ class ChromiumBase(BasePage):
"""
return run_js(self, script, as_expr, self.timeouts.script, args)
def run_js_loaded(self, script, as_expr=False, *args):
"""运行javascript代码执行前等待页面加载完毕
:param script: js文本
:param as_expr: 是否作为表达式运行为True时args无效
:param args: 参数按顺序在js文本中对应argument[0]argument[1]...
:return: 运行的结果
"""
self.wait.load_complete()
return run_js(self, script, as_expr, self.timeouts.script, args)
def run_async_js(self, script, as_expr=False, *args):
"""以异步方式执行js代码
:param script: js文本
@ -553,35 +591,6 @@ class ChromiumBase(BasePage):
while self.ready_state != 'complete':
sleep(.1)
def run_cdp(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句
:param cmd: 协议项目
:param cmd_args: 参数
:return: 执行的结果
"""
r = self.driver.call_method(cmd, **cmd_args)
if 'error' not in r:
return r
if 'Cannot find context with specified id' in r['error']:
raise RuntimeError('页面被刷新,请操作前尝试等待页面刷新或加载完成。')
elif 'Could not find node with given id' in r['error']:
raise RuntimeError('该元素已不在当前页面中。')
elif 'tab closed' in r['error']:
raise RuntimeError('标签页已关闭。')
else:
raise RuntimeError(r)
def run_cdp_loaded(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句执行前等待页面加载完毕
:param cmd: 协议项目
:param cmd_args: 参数
:return: 执行的结果
"""
while self.is_loading:
sleep(.1)
return self.run_cdp(cmd, **cmd_args)
def set_user_agent(self, ua, platform=None):
"""为当前tab设置user agent只在当前tab有效
:param ua: user agent字符串
@ -599,7 +608,7 @@ class ChromiumBase(BasePage):
:return: sessionStorage一个或所有项内容
"""
js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;'
return self.run_js(js, as_expr=True)
return self.run_js_loaded(js, as_expr=True)
def get_local_storage(self, item=None):
"""获取localStorage信息不设置item则获取全部
@ -607,7 +616,7 @@ class ChromiumBase(BasePage):
:return: localStorage一个或所有项内容
"""
js = f'localStorage.getItem("{item}");' if item else 'localStorage;'
return self.run_js(js, as_expr=True)
return self.run_js_loaded(js, as_expr=True)
def set_session_storage(self, item, value):
"""设置或删除某项sessionStorage信息
@ -617,7 +626,7 @@ class ChromiumBase(BasePage):
"""
js = f'sessionStorage.removeItem("{item}");' if item is False \
else f'sessionStorage.setItem("{item}","{value}");'
return self.run_js(js, as_expr=True)
return self.run_js_loaded(js, as_expr=True)
def set_local_storage(self, item, value):
"""设置或删除某项localStorage信息
@ -626,7 +635,7 @@ class ChromiumBase(BasePage):
:return: None
"""
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self.run_js(js, as_expr=True)
return self.run_js_loaded(js, as_expr=True)
def get_screenshot(self, path=None, as_bytes=None, full_page=False, left_top=None, right_bottom=None):
"""对页面进行截图可对整个网页、可见网页、指定范围截图。对可视范围外截图需要90以上版本浏览器支持
@ -715,8 +724,7 @@ class ChromiumBase(BasePage):
result = self.run_cdp('Page.navigate', url=to_url)
is_timeout = not self._wait_loaded(timeout)
while self.is_loading:
sleep(.1)
self.wait.load_complete()
if is_timeout:
err = TimeoutError('页面连接超时。')
@ -824,7 +832,7 @@ class ChromiumPageScroll(ChromiumScroll):
try:
self._driver.run_cdp_loaded('DOM.scrollIntoViewIfNeeded', nodeId=node_id)
except Exception:
ele.run_js("this.scrollIntoView();")
ele.run_js_loaded("this.scrollIntoView();")
if not ele.is_in_viewport:
offset_scroll(ele, 0, 0)

View File

@ -37,6 +37,7 @@ class ChromiumBase(BasePage):
self._debug: bool = ...
self._debug_recorder: Recorder = ...
self._upload_list: list = ...
self._wait: ChromiumPageWaiter = ...
def _connect_browser(self, tab_id: str = None) -> None: ...
@ -127,6 +128,8 @@ class ChromiumBase(BasePage):
def run_js(self, script: str, as_expr: bool = False, *args: Any) -> Any: ...
def run_js_loaded(self, script: str, as_expr: bool = False, *args: Any) -> Any: ...
def run_async_js(self, script: str, as_expr: bool = False, *args: Any) -> None: ...
def get(self,

View File

@ -99,7 +99,8 @@ class ChromiumDriver(object):
except Empty:
if self.has_alert:
return {'result': {'alert': True}}
return {'error': {'message': 'alert exists'},
'type': 'alert_exists'}
if isinstance(timeout, (int, float)) and timeout <= 0:
raise TimeoutError(f"调用{message['method']}超时。")
@ -179,7 +180,7 @@ class ChromiumDriver(object):
return {'error': 'tab closed', 'type': 'tab_closed'}
if 'result' not in result and 'error' in result:
return {'error': result['error']['message'],
'type': 'call_method_error',
'type': result.get('type', 'call_method_error'),
'method': _method,
'args': kwargs}

View File

@ -32,6 +32,7 @@ class ChromiumElement(DrissionElement):
self._select = None
self._scroll = None
self._tag = None
self._wait = None
if node_id:
self._node_id = node_id
@ -274,7 +275,9 @@ class ChromiumElement(DrissionElement):
@property
def wait(self):
"""返回用于等待的对象"""
return ChromiumWaiter(self)
if self._wait is None:
self._wait = ChromiumWaiter(self)
return self._wait
@property
def select(self):
@ -356,7 +359,7 @@ class ChromiumElement(DrissionElement):
:param value: 属性值
:return: None
"""
self.run_js(f'this.setAttribute(arguments[0], arguments[1]);', False, attr, str(value))
self.page.run_cdp('DOM.setAttributeValue', nodeId=self.node_id, name=attr, value=str(value))
def remove_attr(self, attr):
"""删除元素attribute属性
@ -1282,29 +1285,33 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
if isinstance(page_or_ele, (ChromiumElement, ChromiumShadowRootElement)):
page = page_or_ele.page
obj_id = page_or_ele.obj_id
is_page = False
else:
page = page_or_ele
obj_id = page_or_ele._root_id
is_page = True
if as_expr:
res = page.run_cdp('Runtime.evaluate',
expression=script,
returnByValue=False,
awaitPromise=True,
userGesture=True,
timeout=timeout * 1000)
res = page.driver.Runtime.evaluate(expression=script,
returnByValue=False,
awaitPromise=True,
userGesture=True,
timeout=timeout * 1000)
else:
args = args or ()
if not is_js_func(script):
script = f'function(){{{script}}}'
res = page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=script,
objectId=obj_id,
arguments=[_convert_argument(arg) for arg in args],
returnByValue=False,
awaitPromise=True,
userGesture=True)
res = page.driver.Runtime.callFunctionOn(functionDeclaration=script,
objectId=obj_id,
arguments=[_convert_argument(arg) for arg in args],
returnByValue=False,
awaitPromise=True,
userGesture=True)
if 'Cannot find context with specified id' in res.get('error', ''):
txt = '页面已被刷新,请尝试等待页面加载完成再执行操作。' if is_page else '元素已不在页面内。'
raise RuntimeError(txt)
exceptionDetails = res.get('exceptionDetails')
if exceptionDetails:
@ -1338,7 +1345,8 @@ def _parse_js_result(page, ele, result):
return make_chromium_ele(page, obj_id=result['objectId'])
elif sub_type == 'array':
r = page.run_cdp('Runtime.getProperties', objectId=result['result']['objectId'], ownProperties=True)['result']
r = page.run_cdp('Runtime.getProperties', objectId=result['result']['objectId'],
ownProperties=True)['result']
return [_parse_js_result(page, ele, result=i['value']) for i in r]
else:

View File

@ -27,6 +27,7 @@ class ChromiumElement(DrissionElement):
self._doc_id: str = ...
self._scroll: ChromiumScroll = ...
self._select: ChromiumSelect = ...
self._wait: ChromiumWaiter = ...
def __repr__(self) -> str: ...

View File

@ -53,8 +53,11 @@ class ChromiumFrame(ChromiumBase):
:param tab_id: 要跳转到的标签页id
:return: None
"""
self._control_session.get(f'http://{self.address}/json')
super()._driver_init(tab_id)
try:
super()._driver_init(tab_id)
except:
self._control_session.get(f'http://{self.address}/json')
super()._driver_init(tab_id)
def _reload(self):
"""重新获取document"""

View File

@ -138,10 +138,14 @@ class ChromiumPage(ChromiumBase):
@property
def process_id(self):
"""返回浏览器进程id"""
try:
return self.driver.SystemInfo.getProcessInfo()['id']
except Exception:
return None
if self.process:
return self.process.pid
r = self.browser_driver.SystemInfo.getProcessInfo()['processInfo']
for i in r:
if i['type'] == 'browser':
return i['id']
return None
@property
def set_window(self):
@ -483,8 +487,8 @@ class WindowSetter(object):
"""用于设置窗口大小的类"""
def __init__(self, page):
self.driver = page.driver
self.window_id = self._get_info()['windowId']
self._page = page
self._window_id = self._get_info()['windowId']
def maximized(self):
"""窗口最大化"""
@ -544,14 +548,14 @@ class WindowSetter(object):
def _get_info(self):
"""获取窗口位置及大小信息"""
return self.driver.Browser.getWindowForTarget()
return self._page.run_cdp('Browser.getWindowForTarget')
def _perform(self, bounds):
"""执行改变窗口大小操作
:param bounds: 控制数据
:return: None
"""
self.driver.Browser.setWindowBounds(windowId=self.window_id, bounds=bounds)
self._page.run_cdp('Browser.setWindowBounds', windowId=self._window_id, bounds=bounds)
def show_or_hide_browser(page, hide=True):

View File

@ -151,8 +151,8 @@ class Alert(object):
class WindowSetter(object):
def __init__(self, page: ChromiumPage):
self.driver: ChromiumDriver = ...
self.window_id: str = ...
self._page: ChromiumPage = ...
self._window_id: str = ...
def maximized(self) -> None: ...

View File

@ -149,3 +149,7 @@ def unzip(zip_path, to_path):
with ZipFile(zip_path, 'r') as f:
return [f.extract(f.namelist()[0], path=to_path)]
class AlertExistsError(Exception):
pass

View File

@ -29,3 +29,6 @@ def clean_folder(folder_path: Union[str, Path], ignore: list = None) -> None: ..
def unzip(zip_path: str, to_path: str) -> Union[list, None]: ...
class AlertExistsError(Exception): ...