改进等待数据包功能,待测试

This commit is contained in:
g1879 2023-04-03 18:22:30 +08:00
parent fc1e39cd3c
commit 68b9fa15b1
6 changed files with 129 additions and 84 deletions

View File

@ -1036,10 +1036,7 @@ class ChromiumBaseWaiter(object):
:param page_or_ele: 页面对象或元素对象 :param page_or_ele: 页面对象或元素对象
""" """
self._driver = page_or_ele self._driver = page_or_ele
self._response = None self._listener = None
self._request_id = None
self._targets = None
self._is_regex = False
def ele_delete(self, loc_or_ele, timeout=None): def ele_delete(self, loc_or_ele, timeout=None):
"""等待元素从DOM中删除 """等待元素从DOM中删除
@ -1111,23 +1108,68 @@ class ChromiumBaseWaiter(object):
:param is_regex: 设置的target是否正则表达式 :param is_regex: 设置的target是否正则表达式
:return: None :return: None
""" """
if not isinstance(targets, (str, list, tuple, set)): if not self._listener:
raise TypeError('targets只能是str、list、tuple、set。') self._listener = NetworkListener(self._driver)
self._is_regex = is_regex self._listener.set_targets(targets, is_regex)
self._targets = targets if isinstance(targets, str) else set(targets)
self._driver.run_cdp('Network.enable') def data_packets(self, targets=None, timeout=None, any_one=False):
if targets is not None: """等待指定数据包加载完成
self._driver.driver.Network.responseReceived = self._response_received :param targets: 要匹配的数据包url特征可用list等传入多个
self._driver.driver.Network.loadingFinished = self._loading_finished :param timeout: 超时时间为None则使用页面对象timeout
else: :param any_one: 多个target时是否全部监听到才结束为True时监听到一个目标就结束
self.stop_listening() :return: ResponseData对象或监听结果字典
"""
if not self._listener:
self._listener = NetworkListener(self._driver)
return self._listener.listen(targets, timeout, any_one)
def stop_listening(self): def stop_listening(self):
"""停止监听数据包""" """停止监听数据包"""
self._driver.driver.Network.responseReceived = None if not self._listener:
self._driver.driver.Network.loadingFinished = None self._listener = NetworkListener(self._driver)
self._listener.stop()
def data_packets(self, targets=None, timeout=None, any_one=False):
class NetworkListener(object):
def __init__(self, page):
    """Create a listener bound to one page object.

    :param page: page object whose network traffic will be observed
    """
    self._page = page

    # Matching configuration; filled in by set_targets().
    self._targets = None
    self._single = False
    self._is_regex = False

    # Bookkeeping: in-flight requests and finished results.
    self._requests = {}
    self._results = {}
def set_targets(self, targets, is_regex=False):
    """Specify the data packets to wait for and start listening.

    Enables the CDP Network domain on the page and registers this
    listener's request/response callbacks on the page driver.

    :param targets: URL feature(s) to match; a str for a single target, or a list/tuple/set for several
    :param is_regex: whether the targets are regular expressions
    :return: None
    :raises TypeError: if targets is not a str, list, tuple or set
    """
    if not isinstance(targets, (str, list, tuple, set)):
        raise TypeError('targets只能是str、list、tuple、set。')

    self._is_regex = is_regex
    # A lone str is stored as a one-element set and flagged through _single.
    self._single = isinstance(targets, str)
    self._targets = {targets} if self._single else set(targets)

    # NOTE: a former `else: self.stop_listening()` branch for `targets is None`
    # was removed: None is already rejected by the type check above, and this
    # class has no `stop_listening` method (the method is `stop`).
    self._page.run_cdp('Network.enable')
    self._page.driver.Network.requestWillBeSent = self._requestWillBeSent
    self._page.driver.Network.responseReceived = self._response_received
    self._page.driver.Network.loadingFinished = self._loading_finished
def stop(self):
    """Stop listening for data packets.

    Disables the CDP Network domain first, then clears the three event
    callbacks on the page driver.
    """
    self._page.run_cdp('Network.disable')
    for event_name in ('requestWillBeSent', 'responseReceived', 'loadingFinished'):
        setattr(self._page.driver.Network, event_name, None)
def listen(self, targets=None, timeout=None, any_one=False):
"""等待指定数据包加载完成 """等待指定数据包加载完成
:param targets: 要匹配的数据包url特征可用list等传入多个 :param targets: 要匹配的数据包url特征可用list等传入多个
:param timeout: 超时时间为None则使用页面对象timeout :param timeout: 超时时间为None则使用页面对象timeout
@ -1138,70 +1180,54 @@ class ChromiumBaseWaiter(object):
targets = '' targets = ''
if targets is not None: if targets is not None:
self.set_targets(targets, is_regex=self._is_regex) self.set_targets(targets, is_regex=self._is_regex)
self._request_id = None self._results = {}
self._response_result = None
timeout = timeout if timeout is not None else self._driver.timeout timeout = timeout if timeout is not None else self._page.timeout
end_time = perf_counter() + timeout end_time = perf_counter() + timeout
if isinstance(self._targets, str): while perf_counter() < end_time:
while not self._response_result and perf_counter() < end_time: if self._results and (any_one or set(self._results) == self._targets):
sleep(.1) break
sleep(.1)
else: self._requests = {}
while perf_counter() < end_time: if not self._results:
if self._response_result and (any_one or set(self._response_result) == self._targets): return False
break return list(self._results.values())[0] if self._single else self._results
sleep(.1)
self._request_id = None
return self._response_result or False
def _response_received(self, **kwargs): def _response_received(self, **kwargs):
"""接收到返回信息时处理方法""" """接收到返回信息时处理方法"""
if isinstance(self._targets, str): if kwargs['requestId'] in self._requests:
if (self._is_regex and search(self._targets, kwargs['response']['url'])) or ( print(f"{kwargs['requestId']} _response_received")
not self._is_regex and self._targets in kwargs['response']['url']): self._requests[kwargs['requestId']]['response'] = kwargs['response']
self._request_id = kwargs['requestId']
self._response = kwargs['response']
else:
if not self._response:
self._response = {}
if not self._request_id:
self._request_id = {}
for target in self._targets:
if (self._is_regex and search(target, kwargs['response']['url'])) or (
not self._is_regex and target in kwargs['response']['url']):
self._response[target] = kwargs['response']
self._request_id[kwargs['requestId']] = target
def _loading_finished(self, **kwargs): def _loading_finished(self, **kwargs):
"""请求完成时处理方法""" """请求完成时处理方法"""
if isinstance(self._targets, str): request_id = kwargs['requestId']
if kwargs['requestId'] == self._request_id: if request_id in self._requests:
try: print(f'{request_id} _loading_finished')
body = self._driver.run_cdp('Network.getResponseBody', requestId=self._request_id)['body'] try:
except: body = self._page.run_cdp('Network.getResponseBody', requestId=request_id)['body']
body = '' except:
self._response_result = ResponseData(self._request_id, self._response, body, body = None
self._driver.tab_id, self._targets)
else: request = self._requests[request_id]
if self._request_id and kwargs['requestId'] in self._request_id: target = request['target']
if not self._response_result: rd = ResponseData(request_id, request['response'],
self._response_result = {} body, self._page.tab_id, target)
rd.postData = request['post_data']
rd._requestHeaders = request['request_headers']
self._results[target] = rd
try: def _requestWillBeSent(self, **kwargs):
body = self._driver.run_cdp('Network.getResponseBody', requestId=kwargs['requestId'])['body'] """接收到请求时的回调函数"""
except: for target in self._targets:
body = '' if (self._is_regex and search(target, kwargs['request']['url'])) or (
not self._is_regex and target in kwargs['request']['url']):
target = self._request_id[kwargs['requestId']] print(f"{kwargs['requestId']} _requestWillBeSent")
self._response_result[self._request_id[kwargs['requestId']]] = ResponseData(kwargs['requestId'], self._requests[kwargs['requestId']] = {'target': target,
self._response[target], 'post_data': kwargs['request'].get('postData', None),
body, self._driver.tab_id, 'request_headers': kwargs['request']['headers']}
target) break
class ChromiumPageScroll(ChromiumScroll): class ChromiumPageScroll(ChromiumScroll):

View File

@ -217,11 +217,7 @@ class ChromiumBase(BasePage):
class ChromiumBaseWaiter(object): class ChromiumBaseWaiter(object):
def __init__(self, page: ChromiumBase): def __init__(self, page: ChromiumBase):
self._driver: ChromiumBase = ... self._driver: ChromiumBase = ...
self._targets: Union[str, dict] = ... self._listener: NetworkListener = ...
self._request_id: Union[str, dict] = ...
self._response: dict = ...
self._response_result: Union[ResponseData, Dict[str, ResponseData], False] = ...
self._is_regex: bool = ...
def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
@ -237,18 +233,37 @@ class ChromiumBaseWaiter(object):
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop_listening(self) -> None: ... def stop(self) -> None: ...
def data_packets(self, targets: Union[str, list, tuple, set] = None, timeout: float = None, def data_packets(self, targets: Union[str, list, tuple, set] = None, timeout: float = None,
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def _response_received(self, **kwargs): ...
def _loading_finished(self, **kwargs): ...
def upload_paths_inputted(self) -> None: ... def upload_paths_inputted(self) -> None: ...
class NetworkListener(object):
    # Type stub for the listener that collects data packets through the
    # page's Network events. Names and types mirror the implementation.

    def __init__(self, page: ChromiumBase):
        self._page: ChromiumBase = ...
        # None until set_targets() is called; afterwards always a set of URL features.
        self._targets: Union[set, None] = ...
        # True when the targets were given as one str.
        self._single: bool = ...
        # Finished results, keyed by the target that matched.
        self._results: Dict[str, ResponseData] = ...
        self._is_regex: bool = ...
        # In-flight requests, keyed by request id.
        self._requests: dict = ...

    def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...

    # The implementation names this method `stop`, not `stop_listening`.
    def stop(self) -> None: ...

    def listen(self, targets: Union[str, list, tuple, set] = None, timeout: float = None,
               any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...

    def _response_received(self, **kwargs) -> None: ...

    def _loading_finished(self, **kwargs) -> None: ...

    def _requestWillBeSent(self, **kwargs) -> None: ...
class ChromiumPageScroll(ChromiumScroll): class ChromiumPageScroll(ChromiumScroll):
def __init__(self, page: ChromiumBase): ... def __init__(self, page: ChromiumBase): ...

View File

@ -149,6 +149,7 @@ class ChromiumDriver(object):
try: try:
self.event_handlers[event['method']](**event['params']) self.event_handlers[event['method']](**event['params'])
except Exception as e: except Exception as e:
raise
raise RuntimeError(f"\n回调函数错误:\n{e}") raise RuntimeError(f"\n回调函数错误:\n{e}")
self.event_queue.task_done() self.event_queue.task_done()

View File

@ -38,7 +38,10 @@ class ChromiumFrame(ChromiumBase):
end_time = perf_counter() + 2 end_time = perf_counter() + 2
while perf_counter() < end_time and self.url == 'about:blank': while perf_counter() < end_time and self.url == 'about:blank':
sleep(.1) sleep(.1)
Thread(target=self._check_alive).start()
t = Thread(target=self._check_alive)
t.daemon = True
t.start()
def __call__(self, loc_or_str, timeout=None): def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素 """在内部查找元素

View File

@ -150,7 +150,7 @@ def test_connect(ip, port):
end_time = perf_counter() + 6 end_time = perf_counter() + 6
while perf_counter() < end_time: while perf_counter() < end_time:
try: try:
tabs = requests_get(f'http://{ip}:{port}/json', timeout=3).json() tabs = requests_get(f'http://{ip}:{port}/json', timeout=10).json()
for tab in tabs: for tab in tabs:
if tab['type'] == 'page': if tab['type'] == 'page':
return return

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup( setup(
name="DrissionPage", name="DrissionPage",
version="3.2.23", version="3.2.24",
author="g1879", author="g1879",
author_email="g1879@qq.com", author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.", description="Python based web automation tool. It can control the browser and send and receive data packets.",
@ -23,7 +23,7 @@ setup(
'requests', 'requests',
'cssselect', 'cssselect',
'DownloadKit>=0.5.3', 'DownloadKit>=0.5.3',
'FlowViewer', 'FlowViewer>=0.2.7',
'websocket-client', 'websocket-client',
'click~=8.1.3', 'click~=8.1.3',
'tldextract' 'tldextract'