修改wait.data_packets()未完成

This commit is contained in:
g1879 2023-04-20 17:52:01 +08:00
parent e241649a1d
commit a6c49cc3d8
3 changed files with 107 additions and 19 deletions

View File

@ -1035,16 +1035,15 @@ class ChromiumBaseWaiter(object):
self._listener = NetworkListener(self._driver)
self._listener.set_targets(targets, is_regex)
def data_packets(self, targets=None, timeout=None, any_one=False):
def data_packets(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
:param targets: 要匹配的数据包url特征可用list等传入多个
:param timeout: 超时时间为None则使用页面对象timeout
:param any_one: 多个target时是否全部监听到才结束为True时监听到一个目标就结束
:return: ResponseData对象或监听结果字典
"""
if not self._listener:
self._listener = NetworkListener(self._driver)
return self._listener.listen(targets, timeout, any_one)
return self._listener.listen(timeout, any_one)
def stop_listening(self):
"""停止监听数据包"""
@ -1083,7 +1082,7 @@ class NetworkListener(object):
self._page.driver.Network.responseReceived = self._response_received
self._page.driver.Network.loadingFinished = self._loading_finished
else:
self.stop_listening()
self.stop()
def stop(self):
"""停止监听数据包"""
@ -1092,19 +1091,14 @@ class NetworkListener(object):
self._page.driver.Network.responseReceived = None
self._page.driver.Network.loadingFinished = None
def listen(self, targets=None, timeout=None, any_one=False):
def listen(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
:param targets: 要匹配的数据包url特征可用list等传入多个
:param timeout: 超时时间为None则使用页面对象timeout
:param any_one: 多个target时是否全部监听到才结束为True时监听到一个目标就结束
:return: ResponseData对象或监听结果字典
"""
if self._targets is None and targets is None:
targets = ''
if targets is not None:
self.set_targets(targets, is_regex=self._is_regex)
self._results = {}
if self._targets is None:
self.set_targets('', is_regex=self._is_regex)
timeout = timeout if timeout is not None else self._page.timeout
end_time = perf_counter() + timeout
while perf_counter() < end_time:
@ -1112,10 +1106,13 @@ class NetworkListener(object):
break
sleep(.1)
self._requests = {}
if not self._results:
return False
return list(self._results.values())[0] if self._single else self._results
r = list(self._results.values())[0] if self._single else self._results
self._results = {}
self._requests = {}
return r
def _response_received(self, **kwargs):
"""接收到返回信息时处理方法"""
@ -1140,6 +1137,7 @@ class NetworkListener(object):
rd.postData = request['post_data']
rd._base64_body = is_base64
rd.requestHeaders = request['request_headers']
rd.method = request['method']
self._results[target] = rd
def _requestWillBeSent(self, **kwargs):
@ -1149,7 +1147,8 @@ class NetworkListener(object):
not self._is_regex and target in kwargs['request']['url']):
self._requests[kwargs['requestId']] = {'target': target,
'post_data': kwargs['request'].get('postData', None),
'request_headers': kwargs['request']['headers']}
'request_headers': kwargs['request']['headers'],
'method': kwargs['request']['method']}
break

View File

@ -233,9 +233,9 @@ class ChromiumBaseWaiter(object):
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop(self) -> None: ...
def stop_listening(self) -> None: ...
def data_packets(self, targets: Union[str, list, tuple, set] = None, timeout: float = None,
def data_packets(self, timeout: float = None,
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def upload_paths_inputted(self) -> None: ...
@ -252,9 +252,9 @@ class NetworkListener(object):
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop_listening(self) -> None: ...
def stop(self) -> None: ...
def listen(self, targets: Union[str, list, tuple, set] = None, timeout: float = None,
def listen(self, timeout: float = None,
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def _response_received(self, **kwargs) -> None: ...

View File

@ -3,15 +3,104 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from base64 import b64decode
from html import unescape
from http.cookiejar import Cookie
from json import JSONDecodeError, loads
from re import sub
from urllib.parse import urlparse, urljoin, urlunparse
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from tldextract import extract
class DataPacket(object):
"""返回的数据包管理类"""
__slots__ = ('requestId', 'request', 'response', 'rawBody', 'tab', 'target', '_requestHeaders', '_body',
'_base64_body', '_rawPostData', '_postData',
'url', 'urlFragment', 'method', 'postDataEntries', 'mixedContentType', 'initialPriority',
'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite',
'status', 'statusText',
'securityDetails', 'headersText', 'mimeType', 'requestHeadersText', 'connectionReused', 'connectionId',
'remoteIPAddress', 'remotePort', 'fromDiskCache', 'fromServiceWorker', 'fromPrefetchCache',
'encodedDataLength', 'timing', 'serviceWorkerResponseSource', 'responseTime', 'cacheStorageCacheName',
'protocol', 'securityState',
)
def __init__(self, request_id, request, response, body, tab, target):
"""
:param request: request的数据
:param response: response的数据
:param body: response包含的内容
:param tab: 产生这个数据包的tab的id
:param target: 监听目标
"""
self.requestId = request_id
self.response = CaseInsensitiveDict(response)
self.request = CaseInsensitiveDict(request)
self.rawBody = body
self.tab = tab
self.target = target
self._requestHeaders = None
self._postData = None
self._body = None
self._base64_body = False
self._rawPostData = None
def __getattr__(self, item):
return self.response.get(item, None)
def __repr__(self):
return f'<ResponseData target={self.target} request_id={self.requestId}>'
@property
def responseHeaders(self):
"""以大小写不敏感字典返回headers数据"""
headers = self.response.get('headers', None)
return CaseInsensitiveDict(headers) if headers else None
@property
def requestHeaders(self):
"""以大小写不敏感字典返回requestHeaders数据"""
if self._requestHeaders:
return self._requestHeaders
headers = self.response.get('requestHeaders', None)
return CaseInsensitiveDict(headers) if headers else None
@property
def postData(self):
"""返回postData数据"""
if self._postData is None and self._rawPostData:
try:
self._postData = loads(self._rawPostData)
except JSONDecodeError:
self._postData = self._rawPostData
return self._postData
def set_postData(self, val):
"""设置postData当hasPostData为True但数据太长时使用"""
self._rawPostData = val
@property
def body(self):
"""返回body内容如果是json格式自动进行转换如果时图片格式进行base64转换其它格式直接返回文本"""
if self._body is None:
if self._base64_body:
self._body = b64decode(self.rawBody)
else:
try:
self._body = loads(self.rawBody)
except JSONDecodeError:
self._body = self.rawBody
return self._body
def get_ele_txt(e):
"""获取元素内所有文本
:param e: 元素对象