增加wait.stop_listening();wait.data_packets()可监听多个目标

This commit is contained in:
g1879 2023-03-29 18:44:41 +08:00
parent a798317ae9
commit 1aeda01d94
4 changed files with 87 additions and 34 deletions

View File

@ -1027,6 +1027,8 @@ class ChromiumBaseWaiter(object):
:param page_or_ele: 页面对象或元素对象 :param page_or_ele: 页面对象或元素对象
""" """
self._driver = page_or_ele self._driver = page_or_ele
self._response = None
self._request_id = None
def ele_delete(self, loc_or_ele, timeout=None): def ele_delete(self, loc_or_ele, timeout=None):
"""等待元素从DOM中删除 """等待元素从DOM中删除
@ -1092,51 +1094,99 @@ class ChromiumBaseWaiter(object):
sleep(gap) sleep(gap)
return False return False
def set_target(self, target): def set_targets(self, targets):
"""指定要等待的数据包 """指定要等待的数据包
:param target: :param targets: 指定的数据包url片段可传入多个
:return: None :return: None
""" """
self._target = target if not isinstance(targets, (str, list, tuple, set)):
raise TypeError('targets只能是str、list、tuple、set。')
self._targets = targets if isinstance(targets, str) else set(targets)
self._driver.run_cdp('Network.enable') self._driver.run_cdp('Network.enable')
self._driver.driver.Network.responseReceived = self._response_received if targets:
self._driver.driver.Network.loadingFinished = self._loading_finished self._driver.driver.Network.responseReceived = self._response_received
self._driver.driver.Network.loadingFinished = self._loading_finished
else:
self.stop_listening()
def data_packet(self, target=None, timeout=None): def stop_listening(self):
"""停止监听数据包"""
self._driver.driver.Network.responseReceived = None
self._driver.driver.Network.loadingFinished = None
def data_packets(self, targets=None, timeout=None, any_target=False):
"""等待指定数据包加载完成 """等待指定数据包加载完成
:param target: 指定的数据包url片段 :param targets: 指定的数据包url片段可传入多个
:param timeout: 超时时间为None则使用页面对象timeout :param timeout: 超时时间为None则使用页面对象timeout
:param any_target: 多个target时是否全部监听到才结束为True时监听到一个目标就结束
:return: ResponseData对象 :return: ResponseData对象
""" """
if target: if targets and not isinstance(targets, (str, list, tuple, set)):
self._target = target raise TypeError('targets只能是str、list、tuple、set。')
if targets:
self._targets = targets if isinstance(targets, str) else set(targets)
self._request_id = None self._request_id = None
self._response_data = None self._response_result = None
timeout = timeout if timeout is not None else self._driver.timeout timeout = timeout if timeout is not None else self._driver.timeout
end_time = perf_counter() + timeout end_time = perf_counter() + timeout
while not self._response_data and perf_counter() < end_time: if isinstance(self._targets, str):
sleep(.1) while not self._response_result and perf_counter() < end_time:
sleep(.1)
else:
while perf_counter() < end_time:
if self._response_result and (any_target or set(self._response_result) == self._targets):
break
sleep(.1)
self._request_id = None self._request_id = None
return self._response_result or False
return self._response_data or False
def _response_received(self, **kwargs): def _response_received(self, **kwargs):
"""接收到返回信息时处理方法""" """接收到返回信息时处理方法"""
if self._target in kwargs['response']['url']: if isinstance(self._targets, str):
self._request_id = kwargs['requestId'] if self._targets in kwargs['response']['url']:
self._response = kwargs['response'] self._request_id = kwargs['requestId']
self._response = kwargs['response']
else:
if not self._response:
self._response = {}
if not self._request_id:
self._request_id = {}
for target in self._targets:
if target in kwargs['response']['url']:
self._response[target] = kwargs['response']
self._request_id[kwargs['requestId']] = target
def _loading_finished(self, **kwargs): def _loading_finished(self, **kwargs):
"""请求完成时处理方法""" """请求完成时处理方法"""
if kwargs['requestId'] == self._request_id: if isinstance(self._targets, str):
try: if kwargs['requestId'] == self._request_id:
body = self._driver.run_cdp('Network.getResponseBody', requestId=self._request_id)['body'] try:
except: body = self._driver.run_cdp('Network.getResponseBody', requestId=self._request_id)['body']
body = '' except:
self._response_data = ResponseData(self._request_id, self._response, body, body = ''
self._driver.tab_id, self._target) self._response_result = ResponseData(self._request_id, self._response, body,
self._driver.tab_id, self._targets)
else:
if self._request_id and kwargs['requestId'] in self._request_id:
if not self._response_result:
self._response_result = {}
try:
body = self._driver.run_cdp('Network.getResponseBody', requestId=self._request_id)['body']
except:
body = ''
target = self._request_id[kwargs['requestId']]
self._response_result[self._request_id[kwargs['requestId']]] = ResponseData(kwargs['requestId'],
self._response[target],
body, self._driver.tab_id,
target)
class ChromiumPageScroll(ChromiumScroll): class ChromiumPageScroll(ChromiumScroll):

View File

@ -4,7 +4,7 @@
@Contact : g1879@qq.com @Contact : g1879@qq.com
""" """
from pathlib import Path from pathlib import Path
from typing import Union, Tuple, List, Any from typing import Union, Tuple, List, Any, Dict
from DataRecorder import Recorder from DataRecorder import Recorder
from FlowViewer.listener import ResponseData from FlowViewer.listener import ResponseData
@ -217,10 +217,10 @@ class ChromiumBase(BasePage):
class ChromiumBaseWaiter(object): class ChromiumBaseWaiter(object):
def __init__(self, page: ChromiumBase): def __init__(self, page: ChromiumBase):
self._driver: ChromiumBase = ... self._driver: ChromiumBase = ...
self._target: str = ... self._targets: Union[str, dict] = ...
self._request_id: str = ... self._request_id: Union[str, dict] = ...
self._response: dict = ... self._response: dict = ...
self._response_data: ResponseData = ... self._response_result: Union[ResponseData, List[ResponseData], False] = ...
def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
@ -234,9 +234,12 @@ class ChromiumBaseWaiter(object):
def load_complete(self, timeout: float = None) -> bool: ... def load_complete(self, timeout: float = None) -> bool: ...
def set_target(self, target: str) -> None: ... def set_targets(self, target: Union[str, list, tuple, set]) -> None: ...
def data_packet(self, target: str = None, timeout: float = None) -> Union[ResponseData, None]: ... def stop_listening(self) -> None: ...
def data_packets(self, target: Union[str, list, tuple, set] = None, timeout: float = None,
any_target: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def _response_received(self, **kwargs): ... def _response_received(self, **kwargs): ...

View File

@ -149,9 +149,9 @@ class ChromiumDriver(object):
if event['method'] in self.event_handlers: if event['method'] in self.event_handlers:
try: try:
self.event_handlers[event['method']](**event['params']) self.event_handlers[event['method']](**event['params'])
except Exception: except Exception as e:
pass # pass
# raise RuntimeError(f"\n回调函数 {self.event_handlers[event['method']].__name__} 错误:\n{e}") raise RuntimeError(f"\n回调函数 {self.event_handlers[event['method']].__name__} 错误:\n{e}")
self.event_queue.task_done() self.event_queue.task_done()

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup( setup(
name="DrissionPage", name="DrissionPage",
version="3.2.19", version="3.2.20",
author="g1879", author="g1879",
author_email="g1879@qq.com", author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.", description="Python based web automation tool. It can control the browser and send and receive data packets.",