增加wait.alert_closed()、wait.has_rect();run_js()无视其中产生的弹窗

This commit is contained in:
g1879 2023-12-22 17:49:49 +08:00
parent 98b992ac90
commit da5a8a9e42
7 changed files with 63 additions and 27 deletions

View File

@ -5,6 +5,7 @@
""" """
from time import sleep, perf_counter from time import sleep, perf_counter
from errors import PageClosedError
from .driver import BrowserDriver, Driver from .driver import BrowserDriver, Driver
from .._functions.tools import stop_process_on_port, raise_error from .._functions.tools import stop_process_on_port, raise_error
from .._units.downloader import DownloadManager from .._units.downloader import DownloadManager
@ -65,9 +66,10 @@ class Browser(object):
def _onTargetCreated(self, **kwargs): def _onTargetCreated(self, **kwargs):
"""标签页创建时执行""" """标签页创建时执行"""
if kwargs['targetInfo']['type'] == 'page' and not kwargs['targetInfo']['url'].startswith('devtools://'): if (kwargs['targetInfo']['type'] in ('page', 'webview')
self._drivers[kwargs['targetInfo']['targetId']] = Driver(kwargs['targetInfo']['targetId'], 'page', and not kwargs['targetInfo']['url'].startswith('devtools://')):
self.address) self._drivers[kwargs['targetInfo']['targetId']] = Driver(kwargs['targetInfo']['targetId'],
'page', self.address)
def _onTargetDestroyed(self, **kwargs): def _onTargetDestroyed(self, **kwargs):
"""标签页关闭时执行""" """标签页关闭时执行"""
@ -101,13 +103,13 @@ class Browser(object):
def tabs_count(self): def tabs_count(self):
"""返回标签页数量""" """返回标签页数量"""
j = self.run_cdp('Target.getTargets')['targetInfos'] # 不要改用get避免卡死 j = self.run_cdp('Target.getTargets')['targetInfos'] # 不要改用get避免卡死
return len([i for i in j if i['type'] == 'page' and not i['url'].startswith('devtools://')]) return len([i for i in j if i['type'] in ('page', 'webview') and not i['url'].startswith('devtools://')])
@property @property
def tabs(self): def tabs(self):
"""返回所有标签页id组成的列表""" """返回所有标签页id组成的列表"""
j = self._driver.get(f'http://{self.address}/json').json() # 不要改用cdp因为顺序不对 j = self._driver.get(f'http://{self.address}/json').json() # 不要改用cdp因为顺序不对
return [i['id'] for i in j if i['type'] == 'page' and not i['url'].startswith('devtools://')] return [i['id'] for i in j if i['type'] in ('page', 'webview') and not i['url'].startswith('devtools://')]
@property @property
def process_id(self): def process_id(self):
@ -140,7 +142,7 @@ class Browser(object):
:param tab_id: 标签页id :param tab_id: 标签页id
:return: None :return: None
""" """
self.run_cdp('Target.closeTarget', targetId=tab_id) self.run_cdp('Target.closeTarget', targetId=tab_id, _ignore=PageClosedError)
def activate_tab(self, tab_id): def activate_tab(self, tab_id):
"""使标签页变为活动状态 """使标签页变为活动状态
@ -162,8 +164,12 @@ class Browser(object):
:param force: 是否立刻强制终止进程 :param force: 是否立刻强制终止进程
:return: None :return: None
""" """
self.run_cdp('Browser.close') try:
self.driver.stop() self.run_cdp('Browser.close')
self.driver.stop()
except PageClosedError:
self.driver.stop()
return
if force: if force:
ip, port = self.address.split(':') ip, port = self.address.split(':')

View File

@ -83,8 +83,8 @@ class Driver(object):
return result return result
except Empty: except Empty:
if self.alert_flag and message['method'].startswith('Input.'): if self.alert_flag and message['method'].startswith(('Input.', 'Runtime.')):
return {'result': {'message': 'alert exists.'}} return {'error': {'message': 'alert exists.'}}
if timeout is not None and perf_counter() > end_time: if timeout is not None and perf_counter() > end_time:
self.method_results.pop(ws_id, None) self.method_results.pop(ws_id, None)
@ -156,7 +156,7 @@ class Driver(object):
if self._stopped.is_set(): if self._stopped.is_set():
return {'error': 'tab closed', 'type': 'tab_closed'} return {'error': 'tab closed', 'type': 'tab_closed'}
timeout = kwargs.pop('_timeout', 10) timeout = kwargs.pop('_timeout', 15)
result = self._send({'method': _method, 'params': kwargs}, timeout=timeout) result = self._send({'method': _method, 'params': kwargs}, timeout=timeout)
if result is None: if result is None:
return {'error': 'tab closed', 'type': 'tab_closed'} return {'error': 'tab closed', 'type': 'tab_closed'}

View File

@ -603,6 +603,7 @@ class ChromiumElement(DrissionElement):
if clear and vals not in ('\n', '\ue007'): if clear and vals not in ('\n', '\ue007'):
self.clear(by_js=False) self.clear(by_js=False)
else: else:
self.wait.has_rect()
self._input_focus() self._input_focus()
input_text_or_keys(self.page, vals) input_text_or_keys(self.page, vals)
@ -614,10 +615,10 @@ class ChromiumElement(DrissionElement):
""" """
if by_js: if by_js:
self.run_js("this.value='';") self.run_js("this.value='';")
return
else: self._input_focus()
self._input_focus() self.input(('\ue009', 'a', '\ue017'), clear=False)
self.input(('\ue009', 'a', '\ue017'), clear=False)
def _input_focus(self): def _input_focus(self):
"""输入前使元素获取焦点""" """输入前使元素获取焦点"""
@ -1248,8 +1249,10 @@ def make_chromium_eles(page, node_ids=None, obj_ids=None, single=True, ele_only=
if ele_only: if ele_only:
continue continue
else: else:
# todo: Node() if single:
pass return node['node']['nodeValue']
else:
nodes.append(node['node']['nodeValue'])
obj_id = page.run_cdp('DOM.resolveNode', nodeId=node_id)['object']['objectId'] obj_id = page.run_cdp('DOM.resolveNode', nodeId=node_id)['object']['objectId']
ele = ChromiumElement(page, obj_id=obj_id, node_id=node_id, backend_id=node['node']['backendNodeId']) ele = ChromiumElement(page, obj_id=obj_id, node_id=node_id, backend_id=node['node']['backendNodeId'])
@ -1269,8 +1272,10 @@ def make_chromium_eles(page, node_ids=None, obj_ids=None, single=True, ele_only=
if ele_only: if ele_only:
continue continue
else: else:
# todo: Node if single:
pass return node['node']['nodeValue']
else:
nodes.append(node['node']['nodeValue'])
ele = ChromiumElement(page, obj_id=obj_id, node_id=node['node']['nodeId'], ele = ChromiumElement(page, obj_id=obj_id, node_id=node['node']['nodeId'],
backend_id=node['node']['backendNodeId']) backend_id=node['node']['backendNodeId'])
@ -1356,7 +1361,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
try: try:
if as_expr: if as_expr:
res = page.run_cdp('Runtime.evaluate', expression=script, returnByValue=False, res = page.run_cdp('Runtime.evaluate', expression=script, returnByValue=False,
awaitPromise=True, userGesture=True, _timeout=timeout) awaitPromise=True, userGesture=True, _timeout=timeout, _ignore=AlertExistsError)
else: else:
args = args or () args = args or ()
@ -1364,7 +1369,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
script = f'function(){{{script}}}' script = f'function(){{{script}}}'
res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id, res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id,
arguments=[convert_argument(arg) for arg in args], returnByValue=False, arguments=[convert_argument(arg) for arg in args], returnByValue=False,
awaitPromise=True, userGesture=True, _timeout=timeout) awaitPromise=True, userGesture=True, _timeout=timeout, _ignore=AlertExistsError)
except ContextLostError: except ContextLostError:
if is_page: if is_page:

View File

@ -203,7 +203,7 @@ def test_connect(ip, port, timeout=30):
tabs = requests_get(f'http://{ip}:{port}/json', timeout=10, headers={'Connection': 'close'}, tabs = requests_get(f'http://{ip}:{port}/json', timeout=10, headers={'Connection': 'close'},
proxies={'http': None, 'https': None}).json() proxies={'http': None, 'https': None}).json()
for tab in tabs: for tab in tabs:
if tab['type'] == 'page': if tab['type'] in ('page', 'webview'):
return return
except Exception: except Exception:
sleep(.2) sleep(.2)

View File

@ -89,7 +89,8 @@ class ChromiumBase(BasePage):
if not tab_id: if not tab_id:
tabs = self.browser.driver.get(f'http://{self.address}/json').json() tabs = self.browser.driver.get(f'http://{self.address}/json').json()
tabs = [(i['id'], i['url']) for i in tabs if i['type'] == 'page' and not i['url'].startswith('devtools://')] tabs = [(i['id'], i['url']) for i in tabs
if i['type'] in ('page', 'webview') and not i['url'].startswith('devtools://')]
dialog = None dialog = None
if len(tabs) > 1: if len(tabs) > 1:
for k, t in enumerate(tabs): for k, t in enumerate(tabs):
@ -588,7 +589,8 @@ class ChromiumBase(BasePage):
search_ids = [] search_ids = []
try: try:
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True) search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, _timeout=timeout,
includeUserAgentShadowDOM=True)
count = search_result['resultCount'] count = search_result['resultCount']
search_ids.append(search_result['searchId']) search_ids.append(search_result['searchId'])
except ContextLostError: except ContextLostError:
@ -615,7 +617,11 @@ class ChromiumBase(BasePage):
ok = False ok = False
try: try:
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True) timeout = end_time - perf_counter()
if timeout <= 0:
timeout = .5
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, _timeout=timeout,
includeUserAgentShadowDOM=True)
count = search_result['resultCount'] count = search_result['resultCount']
search_ids.append(search_result['searchId']) search_ids.append(search_result['searchId'])
except ContextLostError: except ContextLostError:

View File

@ -238,6 +238,13 @@ class TabWaiter(BaseWaiter):
else: else:
return True return True
def alert_closed(self):
"""等待弹出框关闭"""
while not self._driver.states.has_alert:
sleep(.2)
while self._driver.states.has_alert:
sleep(.2)
class PageWaiter(TabWaiter): class PageWaiter(TabWaiter):
def __init__(self, page): def __init__(self, page):
@ -415,6 +422,14 @@ class ElementWaiter(object):
else: else:
return False return False
def has_rect(self, timeout=None, raise_err=None):
"""等待当前元素有大小及位置属性
:param timeout: 超时时间为None使用元素所在页面timeout属性
:param raise_err: 等待失败时是否报错为None时根据Settings设置
:return: 是否等待成功
"""
return self._wait_state('has_rect', True, timeout, raise_err, err_text='等待元素拥有大小及位置属性失败。')
def _wait_state(self, attr, mode=False, timeout=None, raise_err=None, err_text=None): def _wait_state(self, attr, mode=False, timeout=None, raise_err=None, err_text=None):
"""等待元素某个元素状态到达指定状态 """等待元素某个元素状态到达指定状态
:param attr: 状态名称 :param attr: 状态名称

View File

@ -24,9 +24,9 @@ class BaseWaiter(object):
raise_err: bool = None) -> bool: ... raise_err: bool = None) -> bool: ...
def ele_displayed(self, def ele_displayed(self,
loc_or_ele: Union[str, tuple, ChromiumElement], loc_or_ele: Union[str, tuple, ChromiumElement],
timeout: float = None, timeout: float = None,
raise_err: bool = None) -> bool: ... raise_err: bool = None) -> bool: ...
def ele_hidden(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None, def ele_hidden(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None,
raise_err: bool = None) -> bool: ... raise_err: bool = None) -> bool: ...
@ -60,6 +60,8 @@ class TabWaiter(BaseWaiter):
def downloads_done(self, timeout: float = None, cancel_if_timeout: bool = True) -> bool: ... def downloads_done(self, timeout: float = None, cancel_if_timeout: bool = True) -> bool: ...
def alert_closed(self) -> None: ...
class PageWaiter(TabWaiter): class PageWaiter(TabWaiter):
_driver: ChromiumPage = ... _driver: ChromiumPage = ...
@ -90,6 +92,8 @@ class ElementWaiter(object):
def disabled(self, timeout: float = None, raise_err: bool = None) -> bool: ... def disabled(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def has_rect(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def disabled_or_deleted(self, timeout: float = None, raise_err: bool = None) -> bool: ... def disabled_or_deleted(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def stop_moving(self, gap: float = .1, timeout: float = None, raise_err: bool = None) -> bool: ... def stop_moving(self, gap: float = .1, timeout: float = None, raise_err: bool = None) -> bool: ...