解决离开页面触发的弹出框问题

This commit is contained in:
g1879 2023-02-13 20:00:21 +08:00
parent f9068cfbb1
commit b3fc6b35e3
6 changed files with 52 additions and 36 deletions

View File

@ -10,13 +10,13 @@ from warnings import warn
from requests import Session from requests import Session
from .functions.tools import AlertExistsError
from .functions.tools import get_usable_path
from .base import BasePage from .base import BasePage
from .chromium_driver import ChromiumDriver from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, \ from .chromium_element import ChromiumWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, \
ChromiumElementWaiter ChromiumElementWaiter
from .functions.errors import ContextLossError, ElementLossError, AlertExistsError
from .functions.locator import get_loc from .functions.locator import get_loc
from .functions.tools import get_usable_path
from .functions.web import offset_scroll, cookies_to_tuple from .functions.web import offset_scroll, cookies_to_tuple
from .session_element import make_session_ele from .session_element import make_session_ele
@ -139,6 +139,8 @@ class ChromiumBase(BasePage):
end_time = perf_counter() + timeout end_time = perf_counter() + timeout
while perf_counter() < end_time: while perf_counter() < end_time:
state = self.ready_state state = self.ready_state
if state is None: # 存在alert的情况
return None
if self._debug_recorder: if self._debug_recorder:
self._debug_recorder.add_data((perf_counter(), 'waiting', state)) self._debug_recorder.add_data((perf_counter(), 'waiting', state))
@ -280,7 +282,10 @@ class ChromiumBase(BasePage):
@property @property
def ready_state(self): def ready_state(self):
"""返回当前页面加载状态,'loading' 'interactive' 'complete'""" """返回当前页面加载状态,'loading' 'interactive' 'complete'"""
return self.run_cdp('Runtime.evaluate', expression='document.readyState;')['result']['value'] try:
return self.run_cdp('Runtime.evaluate', expression='document.readyState;')['result']['value']
except AlertExistsError:
return None
@property @property
def size(self): def size(self):
@ -350,18 +355,21 @@ class ChromiumBase(BasePage):
:param cmd_args: 参数 :param cmd_args: 参数
:return: 执行的结果 :return: 执行的结果
""" """
if self.driver.has_alert and cmd != 'Page.handleJavaScriptDialog':
raise AlertExistsError('存在未处理的提示框。')
r = self.driver.call_method(cmd, **cmd_args) r = self.driver.call_method(cmd, **cmd_args)
if 'error' not in r: if 'error' not in r:
return r return r
if 'Cannot find context with specified id' in r['error']: if 'Cannot find context with specified id' in r['error']:
raise RuntimeError('页面被刷新,请操作前尝试等待页面刷新或加载完成。') raise ContextLossError('页面被刷新,请操作前尝试等待页面刷新或加载完成。')
elif 'Could not find node with given id' in r['error']: elif 'Could not find node with given id' in r['error']:
raise RuntimeError('该元素已不在当前页面中。') raise ElementLossError('该元素已不在当前页面中。')
elif 'tab closed' in r['error']: elif 'tab closed' in r['error']:
raise RuntimeError('标签页已关闭。') raise RuntimeError('标签页已关闭。')
elif 'alert exists' in r['error']: elif 'alert exists' in r['error']:
raise AlertExistsError('存在未处理的提示框。') pass
else: else:
raise RuntimeError(r) raise RuntimeError(r)
@ -723,7 +731,10 @@ class ChromiumBase(BasePage):
err = None err = None
result = self.run_cdp('Page.navigate', url=to_url) result = self.run_cdp('Page.navigate', url=to_url)
is_timeout = not self._wait_loaded(timeout) is_timeout = self._wait_loaded(timeout)
if is_timeout is None:
return None
is_timeout = not is_timeout
self.wait.load_complete() self.wait.load_complete()
if is_timeout: if is_timeout:

View File

@ -99,8 +99,8 @@ class ChromiumDriver(object):
except Empty: except Empty:
if self.has_alert: if self.has_alert:
return {'error': {'message': 'alert exists'}, return {'error': {'message': 'alert exists'}, 'type': 'alert_exists'}
'type': 'alert_exists'}
if isinstance(timeout, (int, float)) and timeout <= 0: if isinstance(timeout, (int, float)) and timeout <= 0:
raise TimeoutError(f"调用{message['method']}超时。") raise TimeoutError(f"调用{message['method']}超时。")

View File

@ -10,6 +10,7 @@ from time import perf_counter, sleep
from warnings import warn from warnings import warn
from .base import DrissionElement, BaseElement from .base import DrissionElement, BaseElement
from .functions.errors import ContextLossError, ElementLossError
from .functions.locator import get_loc from .functions.locator import get_loc
from .functions.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll from .functions.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll
from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions
@ -47,7 +48,7 @@ class ChromiumElement(DrissionElement):
self._node_id = self._get_node_id(obj_id=self._obj_id) self._node_id = self._get_node_id(obj_id=self._obj_id)
self._backend_id = backend_id self._backend_id = backend_id
else: else:
raise RuntimeError('元素可能已失效') raise ElementLossError('原来获取到的元素对象已不在页面内')
doc = self.run_js('return this.ownerDocument;') doc = self.run_js('return this.ownerDocument;')
self._doc_id = doc['objectId'] if doc else None self._doc_id = doc['objectId'] if doc else None
@ -1291,27 +1292,27 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
obj_id = page_or_ele._root_id obj_id = page_or_ele._root_id
is_page = True is_page = True
if as_expr: try:
res = page.driver.Runtime.evaluate(expression=script, if as_expr:
returnByValue=False, res = page.run_cdp('Runtime.evaluate', expression=script, returnByValue=False,
awaitPromise=True, awaitPromise=True, userGesture=True, timeout=timeout * 1000)
userGesture=True,
timeout=timeout * 1000)
else: else:
args = args or () args = args or ()
if not is_js_func(script): if not is_js_func(script):
script = f'function(){{{script}}}' script = f'function(){{{script}}}'
res = page.driver.Runtime.callFunctionOn(functionDeclaration=script, res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id,
objectId=obj_id, arguments=[_convert_argument(arg) for arg in args], returnByValue=False,
arguments=[_convert_argument(arg) for arg in args], awaitPromise=True, userGesture=True)
returnByValue=False,
awaitPromise=True,
userGesture=True)
if 'Cannot find context with specified id' in res.get('error', ''): except ContextLossError:
txt = '页面已被刷新,请尝试等待页面加载完成再执行操作。' if is_page else '元素已不在页面内。' if is_page:
raise RuntimeError(txt) raise ContextLossError('页面已被刷新,请尝试等待页面加载完成再执行操作。')
else:
raise ElementLossError('原来获取到的元素对象已不在页面内。')
if res is None and page.driver.has_alert: # 存在alert的情况
return None
exceptionDetails = res.get('exceptionDetails') exceptionDetails = res.get('exceptionDetails')
if exceptionDetails: if exceptionDetails:

View File

@ -0,0 +1,11 @@
# -*- coding:utf-8 -*-
class AlertExistsError(Exception):
pass
class ContextLossError(Exception):
pass
class ElementLossError(Exception):
pass

View File

@ -149,7 +149,3 @@ def unzip(zip_path, to_path):
with ZipFile(zip_path, 'r') as f: with ZipFile(zip_path, 'r') as f:
return [f.extract(f.namelist()[0], path=to_path)] return [f.extract(f.namelist()[0], path=to_path)]
class AlertExistsError(Exception):
pass

View File

@ -29,6 +29,3 @@ def clean_folder(folder_path: Union[str, Path], ignore: list = None) -> None: ..
def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... def unzip(zip_path: str, to_path: str) -> Union[list, None]: ...
class AlertExistsError(Exception): ...