增加wait.data_packet()方法;提高页面对象查找元素稳定性;删除wait.new_frame()方法

This commit is contained in:
g1879 2023-03-29 00:21:32 +08:00
parent 7eb1aac778
commit a798317ae9
4 changed files with 90 additions and 87 deletions

View File

@ -5,13 +5,12 @@
"""
from abc import abstractmethod
from re import sub
from time import sleep
from urllib.parse import quote
from .commons.constants import Settings, NoneElement
from .commons.locator import get_loc
from .commons.web import format_html
from .errors import ElementNotFoundError, ContextLossError
from .errors import ElementNotFoundError
class BaseParser(object):
@ -72,13 +71,7 @@ class BaseElement(BaseParser):
pass
def _ele(self, loc_or_str, timeout=None, single=True, relative=False, raise_err=None):
while True:
try:
r = self._find_elements(loc_or_str, timeout=timeout, single=single,
relative=relative, raise_err=raise_err)
break
except ContextLossError:
sleep(.1)
r = self._find_elements(loc_or_str, timeout=timeout, single=single, relative=relative, raise_err=raise_err)
if not single or raise_err is False:
return r
if not r and (Settings.raise_ele_not_found or raise_err is True):
@ -420,12 +413,7 @@ class BasePage(BaseParser):
if not loc_or_ele:
raise ElementNotFoundError
while True:
try:
r = self._find_elements(loc_or_ele, timeout=timeout, single=single, raise_err=raise_err)
break
except ContextLossError:
sleep(.1)
r = self._find_elements(loc_or_ele, timeout=timeout, single=single, raise_err=raise_err)
if not single or raise_err is False:
return r

View File

@ -11,6 +11,7 @@ from threading import Thread
from time import perf_counter, sleep, time
from warnings import warn
from FlowViewer.listener import ResponseData
from requests import Session
from .base import BasePage
@ -507,8 +508,12 @@ class ChromiumBase(BasePage):
timeout = timeout if timeout is not None else self.timeout
end_time = perf_counter() + timeout
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
try:
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
except ContextLossError:
search_result = None
count = 0
while True:
if count > 0:
@ -520,7 +525,7 @@ class ChromiumBase(BasePage):
ok = True
except Exception:
sleep(.01)
pass
if ok:
try:
@ -532,12 +537,17 @@ class ChromiumBase(BasePage):
except ElementLossError:
ok = False
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
try:
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
except ContextLossError:
pass
if perf_counter() >= end_time:
return NoneElement() if single else []
sleep(.1)
def refresh(self, ignore_cache=False):
"""刷新当前页面
:param ignore_cache: 是否忽略缓存
@ -1082,52 +1092,51 @@ class ChromiumBaseWaiter(object):
sleep(gap)
return False
# def data_package(self, target, timeout=None):
# """
# :param target:
# :param timeout:
# :return:
# """
# self._target = target
# self._request_id = None
# timeout = timeout if timeout is not None else self._driver.timeout
# end_time = perf_counter() + timeout
# while perf_counter() < end_time:
# pass
#
# def _response_received(self, **kwargs):
# """接收到返回信息时处理方法"""
# if self._target in kwargs['response']['url']:
#
#
# if self.targets is True:
# self._request_ids[kwargs['requestId']] = {'target': True, 'response': kwargs['response']}
#
# else:
# for target in self.targets:
# if target in kwargs['response']['url']:
# self._request_ids[kwargs['requestId']] = {'target': target, 'response': kwargs['response']}
#
# def _loading_finished(self, **kwargs):
# """请求完成时处理方法"""
# if not self._is_continue():
# return
#
# request_id = kwargs['requestId']
# target = self._request_ids.pop(request_id, None)
# if target is None:
# return
#
# target, response = target.values()
# response = ResponseData(request_id, response, self._get_response_body(request_id), self.tab_id, target)
# response.postData = self._get_post_data(request_id)
#
# self._caught_count += 1
# self._tmp.put(response)
# self.results.append(response)
#
# if not self._is_continue():
# self.stop()
def set_target(self, target):
"""指定要等待的数据包
:param target:
:return: None
"""
self._target = target
self._driver.run_cdp('Network.enable')
self._driver.driver.Network.responseReceived = self._response_received
self._driver.driver.Network.loadingFinished = self._loading_finished
def data_packet(self, target=None, timeout=None):
"""等待指定数据包加载完成
:param target: 指定的数据包url片段
:param timeout: 超时时间为None则使用页面对象timeout
:return: ResponseData对象
"""
if target:
self._target = target
self._request_id = None
self._response_data = None
timeout = timeout if timeout is not None else self._driver.timeout
end_time = perf_counter() + timeout
while not self._response_data and perf_counter() < end_time:
sleep(.1)
self._request_id = None
return self._response_data or False
def _response_received(self, **kwargs):
"""接收到返回信息时处理方法"""
if self._target in kwargs['response']['url']:
self._request_id = kwargs['requestId']
self._response = kwargs['response']
def _loading_finished(self, **kwargs):
"""请求完成时处理方法"""
if kwargs['requestId'] == self._request_id:
try:
body = self._driver.run_cdp('Network.getResponseBody', requestId=self._request_id)['body']
except:
body = ''
self._response_data = ResponseData(self._request_id, self._response, body,
self._driver.tab_id, self._target)
class ChromiumPageScroll(ChromiumScroll):

View File

@ -217,6 +217,10 @@ class ChromiumBase(BasePage):
class ChromiumBaseWaiter(object):
def __init__(self, page: ChromiumBase):
self._driver: ChromiumBase = ...
self._target: str = ...
self._request_id: str = ...
self._response: dict = ...
self._response_data: ResponseData = ...
def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
@ -230,7 +234,13 @@ class ChromiumBaseWaiter(object):
def load_complete(self, timeout: float = None) -> bool: ...
def data_package(self, target: str, timeout: float = None) -> Union[ResponseData, None]: ...
def set_target(self, target: str) -> None: ...
def data_packet(self, target: str = None, timeout: float = None) -> Union[ResponseData, None]: ...
def _response_received(self, **kwargs): ...
def _loading_finished(self, **kwargs): ...
def upload_paths_inputted(self) -> None: ...

View File

@ -1252,19 +1252,16 @@ def find_by_xpath(ele, xpath, single, timeout, relative=True):
type_txt = '9' if single else '7'
node_txt = 'this.contentDocument' if ele.tag in FRAME_ELEMENT and not relative else 'this'
js = make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt)
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.ids.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.ids.obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
if r['result']['type'] == 'string':
return r['result']['value']
if 'exceptionDetails' in r:
if 'The result is not a node set' in r['result']['description']:
js = make_js_for_find_ele_by_xpath(xpath, '1', node_txt)
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.ids.obj_id, returnByValue=False,
awaitPromise=True,
userGesture=True)
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.ids.obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
return r['result']['value']
else:
raise SyntaxError(f'查询语句错误:\n{r}')
@ -1272,9 +1269,8 @@ def find_by_xpath(ele, xpath, single, timeout, relative=True):
end_time = perf_counter() + timeout
while (r['result']['subtype'] == 'null'
or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time:
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.ids.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.ids.obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
if single:
return NoneElement() if r['result']['subtype'] == 'null' \
@ -1283,7 +1279,8 @@ def find_by_xpath(ele, xpath, single, timeout, relative=True):
if r['result']['description'] == 'NodeList(0)':
return []
else:
r = ele.page.run_cdp_loaded('Runtime.getProperties', objectId=r['result']['objectId'], ownProperties=True)['result']
r = ele.page.run_cdp_loaded('Runtime.getProperties', objectId=r['result']['objectId'],
ownProperties=True)['result']
return [make_chromium_ele(ele.page, obj_id=i['value']['objectId'])
if i['value']['type'] == 'object' else i['value']['value']
for i in r[:-1]]
@ -1301,16 +1298,14 @@ def find_by_css(ele, selector, single, timeout):
find_all = '' if single else 'All'
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame', 'shadow-root') else 'this'
js = f'function(){{return {node_txt}.querySelector{find_all}("{selector}");}}'
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.ids.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.ids.obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
end_time = perf_counter() + timeout
while ('exceptionDetails' in r or r['result']['subtype'] == 'null'
or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time:
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.ids.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
while ('exceptionDetails' in r or r['result']['subtype'] == 'null' or
r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time:
r = ele.page.run_cdp_loaded('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.ids.obj_id,
returnByValue=False, awaitPromise=True, userGesture=True)
if 'exceptionDetails' in r:
raise SyntaxError(f'查询语句错误:\n{r}')
@ -1322,7 +1317,8 @@ def find_by_css(ele, selector, single, timeout):
if r['result']['description'] == 'NodeList(0)':
return []
else:
r = ele.page.run_cdp_loaded('Runtime.getProperties', objectId=r['result']['objectId'], ownProperties=True)['result']
r = ele.page.run_cdp_loaded('Runtime.getProperties', objectId=r['result']['objectId'],
ownProperties=True)['result']
return [make_chromium_ele(ele.page, obj_id=i['value']['objectId']) for i in r]