3.2.20完善wait.data_packets(),使用正则设置target

This commit is contained in:
g1879 2023-03-30 00:16:04 +08:00
parent 1aeda01d94
commit 395b94a703
2 changed files with 23 additions and 21 deletions

View File

@ -7,6 +7,7 @@ from base64 import b64decode
from json import loads, JSONDecodeError from json import loads, JSONDecodeError
from os import sep from os import sep
from pathlib import Path from pathlib import Path
from re import search
from threading import Thread from threading import Thread
from time import perf_counter, sleep, time from time import perf_counter, sleep, time
from warnings import warn from warnings import warn
@ -1029,6 +1030,7 @@ class ChromiumBaseWaiter(object):
self._driver = page_or_ele self._driver = page_or_ele
self._response = None self._response = None
self._request_id = None self._request_id = None
self._targets = None
def ele_delete(self, loc_or_ele, timeout=None): def ele_delete(self, loc_or_ele, timeout=None):
"""等待元素从DOM中删除 """等待元素从DOM中删除
@ -1094,16 +1096,16 @@ class ChromiumBaseWaiter(object):
sleep(gap) sleep(gap)
return False return False
def set_targets(self, targets): def set_targets(self, regexes):
"""指定要等待的数据包 """指定要等待的数据包
:param targets: 指定的数据包url片段传入多个 :param regexes: 要匹配的数据包url特征正则字符串可用list等传入多个
:return: None :return: None
""" """
if not isinstance(targets, (str, list, tuple, set)): if not isinstance(regexes, (str, list, tuple, set)):
raise TypeError('targets只能是str、list、tuple、set。') raise TypeError('targets只能是str、list、tuple、set。')
self._targets = targets if isinstance(targets, str) else set(targets) self._targets = regexes if isinstance(regexes, str) else set(regexes)
self._driver.run_cdp('Network.enable') self._driver.run_cdp('Network.enable')
if targets: if regexes is not None:
self._driver.driver.Network.responseReceived = self._response_received self._driver.driver.Network.responseReceived = self._response_received
self._driver.driver.Network.loadingFinished = self._loading_finished self._driver.driver.Network.loadingFinished = self._loading_finished
else: else:
@ -1114,17 +1116,17 @@ class ChromiumBaseWaiter(object):
self._driver.driver.Network.responseReceived = None self._driver.driver.Network.responseReceived = None
self._driver.driver.Network.loadingFinished = None self._driver.driver.Network.loadingFinished = None
def data_packets(self, targets=None, timeout=None, any_target=False): def data_packets(self, targets=None, timeout=None, any_one=False):
"""等待指定数据包加载完成 """等待指定数据包加载完成
:param targets: 指定的数据包url片段传入多个 :param targets: 要匹配的数据包url特征为正则字符串可用list等传入多个
:param timeout: 超时时间为None则使用页面对象timeout :param timeout: 超时时间为None则使用页面对象timeout
:param any_target: 多个target时是否全部监听到才结束为True时监听到一个目标就结束 :param any_one: 多个target时是否全部监听到才结束为True时监听到一个目标就结束
:return: ResponseData对象 :return: ResponseData对象或监听结果字典
""" """
if targets and not isinstance(targets, (str, list, tuple, set)): if self._targets is None and targets is None:
raise TypeError('targets只能是str、list、tuple、set。') targets = ''
if targets: if targets is not None:
self._targets = targets if isinstance(targets, str) else set(targets) self.set_targets(targets)
self._request_id = None self._request_id = None
self._response_result = None self._response_result = None
@ -1136,7 +1138,7 @@ class ChromiumBaseWaiter(object):
else: else:
while perf_counter() < end_time: while perf_counter() < end_time:
if self._response_result and (any_target or set(self._response_result) == self._targets): if self._response_result and (any_one or set(self._response_result) == self._targets):
break break
sleep(.1) sleep(.1)
@ -1146,7 +1148,7 @@ class ChromiumBaseWaiter(object):
def _response_received(self, **kwargs): def _response_received(self, **kwargs):
"""接收到返回信息时处理方法""" """接收到返回信息时处理方法"""
if isinstance(self._targets, str): if isinstance(self._targets, str):
if self._targets in kwargs['response']['url']: if search(self._targets, kwargs['response']['url']):
self._request_id = kwargs['requestId'] self._request_id = kwargs['requestId']
self._response = kwargs['response'] self._response = kwargs['response']
@ -1157,7 +1159,7 @@ class ChromiumBaseWaiter(object):
self._request_id = {} self._request_id = {}
for target in self._targets: for target in self._targets:
if target in kwargs['response']['url']: if search(target, kwargs['response']['url']):
self._response[target] = kwargs['response'] self._response[target] = kwargs['response']
self._request_id[kwargs['requestId']] = target self._request_id[kwargs['requestId']] = target
@ -1178,7 +1180,7 @@ class ChromiumBaseWaiter(object):
self._response_result = {} self._response_result = {}
try: try:
body = self._driver.run_cdp('Network.getResponseBody', requestId=self._request_id)['body'] body = self._driver.run_cdp('Network.getResponseBody', requestId=kwargs['requestId'])['body']
except: except:
body = '' body = ''

View File

@ -220,7 +220,7 @@ class ChromiumBaseWaiter(object):
self._targets: Union[str, dict] = ... self._targets: Union[str, dict] = ...
self._request_id: Union[str, dict] = ... self._request_id: Union[str, dict] = ...
self._response: dict = ... self._response: dict = ...
self._response_result: Union[ResponseData, List[ResponseData], False] = ... self._response_result: Union[ResponseData, Dict[str, ResponseData], False] = ...
def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
@ -234,12 +234,12 @@ class ChromiumBaseWaiter(object):
def load_complete(self, timeout: float = None) -> bool: ... def load_complete(self, timeout: float = None) -> bool: ...
def set_targets(self, target: Union[str, list, tuple, set]) -> None: ... def set_targets(self, regexes: Union[str, list, tuple, set]) -> None: ...
def stop_listening(self) -> None: ... def stop_listening(self) -> None: ...
def data_packets(self, target: Union[str, list, tuple, set] = None, timeout: float = None, def data_packets(self, targets: Union[str, list, tuple, set] = None, timeout: float = None,
any_target: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def _response_received(self, **kwargs): ... def _response_received(self, **kwargs): ...