新版抓包功能待测试

This commit is contained in:
g1879 2023-04-28 18:55:08 +08:00
parent f817fcee5c
commit c021d83212
4 changed files with 176 additions and 134 deletions

View File

@ -1034,6 +1034,7 @@ class ChromiumBaseWaiter(object):
if not self._listener:
self._listener = NetworkListener(self._driver)
self._listener.set_targets(targets, is_regex)
self._listener.start()
def data_packets(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
@ -1059,11 +1060,10 @@ class NetworkListener(object):
self._is_regex = False
self._results = {}
self._single = False
self._requests = {}
self._count = None
self._caught = 0 # 已获取到的数量
self._all_tabs = False # 是否监听所有tab
self._driver = self._page.driver
def set_targets(self, targets, is_regex=False, count=None):
"""指定要等待的数据包
@ -1085,31 +1085,16 @@ class NetworkListener(object):
if count is None:
self._count = len(self._targets)
if targets is not None:
self._page.run_cdp('Network.enable')
self._page.driver.Network.requestWillBeSent = self._request_will_sent
self._page.driver.Network.responseReceived = self._response_received
self._page.driver.Network.loadingFinished = self._loading_finished
else:
self.stop()
def start(self):
driver = self._page.browser_driver if self._all_tabs else self._page.driver
driver.set_listener('Fetch.requestPaused', self._request_paused)
patterns = []
for i in self._targets:
patterns.append({'requestStage': 'Request', 'urlPattern': i})
patterns.append({'requestStage': 'Response', 'urlPattern': i})
if patterns:
driver.call_method('Fetch.enable', patterns=patterns)
else:
driver.call_method('Fetch.enable')
self._driver.set_listener('Fetch.requestPaused', self._request_paused)
self._driver.call_method('Network.enable')
self._driver.call_method('Fetch.enable')
def stop(self):
"""停止监听数据包"""
driver = self._page.browser_driver if self._all_tabs else self._page.driver
driver.set_listener('Fetch.requestPaused', None)
driver.call_method('Fetch.disable')
self._driver.set_listener('Fetch.requestPaused', None)
self._driver.call_method('Fetch.disable')
self._driver.call_method('Network.disable')
def listen(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
@ -1130,70 +1115,40 @@ class NetworkListener(object):
if self._caught == 0:
r = False
else:
# todo
r = list(self._results.values())[0] if self._single else self._results
self._results = {}
self._requests = {}
self._caught = 0
return r
def _request_paused(self, **kwargs):
pass
i = kwargs['requestId']
if 'responseStatusCode' in kwargs:
for target in self._targets:
if (self._is_regex and search(target, kwargs['request']['url'])) or (
not self._is_regex and target in kwargs['request']['url']):
dp = DataPacket(self._page.tab_id, target, kwargs)
body = self._driver.call_method('Fetch.getResponseBody', requestId=i)
dp._raw_body = body['body']
dp._base64_body = body['base64Encoded']
if 'networkId' in kwargs and kwargs['request'].get('hasPostData', None) \
and not kwargs['request'].get('postData', None):
pd = self._driver.call_method('Network.getRequestPostData', requestId=kwargs['networkId'])
if 'postData' in pd:
dp._raw_post_data = pd['postData']
def _request_will_sent(self, **kwargs):
"""接收到请求时的回调函数"""
for target in self._targets:
if (self._is_regex and search(target, kwargs['request']['url'])) or (
not self._is_regex and target in kwargs['request']['url']):
self._requests[kwargs['requestId']] = DataPacket(kwargs['requestId'], self._page.tab_id, target, kwargs)
if target in self._results:
self._results[target].append(dp)
else:
self._results[target] = [dp]
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
self._requests[kwargs['requestId']]._rawPostData \
= self._page.run_cdp('Network.getRequestPostData', requestId=kwargs['requestId'])['postData']
break
def _response_received(self, **kwargs):
"""接收到返回信息时处理方法"""
if kwargs['requestId'] in self._requests:
self._requests[kwargs['requestId']]._raw_response = kwargs
def _loading_finished(self, **kwargs):
"""请求完成时处理方法"""
request_id = kwargs['requestId']
if request_id in self._requests:
try:
r = self._page.run_cdp('Network.getResponseBody', requestId=request_id)
body = r['body']
is_base64 = r['base64Encoded']
except CallMethodError:
body = ''
is_base64 = False
data_packet = self._requests[request_id]
data_packet._rowBody = body
data_packet._base64_body = is_base64
if data_packet.target in self._results:
self._results[data_packet.target].append(data_packet)
else:
self._results[data_packet.target] = [data_packet]
break
self._driver.call_method('Fetch.continueResponse', requestId=i)
self._caught += 1
def _loading_failed(self, **kwargs):
"""请求失败时的处理方法"""
if kwargs['requestId'] in self._requests:
data_packet = self._requests[kwargs['requestId']]
data_packet._raw_fail_info = kwargs
if data_packet.target in self._results:
self._results[data_packet.target].append(data_packet)
else:
self._results[data_packet.target] = [data_packet]
self._caught += 1
else: # request
self._driver.call_method('Fetch.continueRequest', requestId=i)
class ChromiumPageScroll(ChromiumScroll):

View File

@ -242,7 +242,7 @@ class ChromiumBaseWaiter(object):
class NetworkListener(object):
def __init__(self, page):
def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ...
self._count: int = ...
self._caught: int = ...
@ -250,22 +250,18 @@ class NetworkListener(object):
self._single: bool = ...
self._results: Union[DataPacket, Dict[str, List[DataPacket]], False] = ...
self._is_regex: bool = ...
self._requests: dict = ...
self._driver: ChromiumDriver = ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False, count: int = None) -> None: ...
def start(self) -> None: ...
def stop(self) -> None: ...
def listen(self, timeout: float = None,
any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
def _request_will_sent(self, **kwargs) -> None: ...
def _response_received(self, **kwargs) -> None: ...
def _loading_finished(self, **kwargs) -> None: ...
def _loading_failed(self, **kwargs) -> None: ...
def _request_paused(self, **kwargs) -> None: ...
class ChromiumPageScroll(ChromiumScroll):

View File

@ -17,82 +17,75 @@ from tldextract import extract
class DataPacket(object):
"""返回的数据包管理类"""
# __slots__ = ('requestId', 'request', 'response', 'rawBody', 'tab', 'target', '_requestHeaders', '_body',
# '_postData', '_request_data', '_response_data', '_fail_info',
# # cdp 原始数据
# '_raw_request', '_raw_response', '_raw_fail_info', '_rawPostData', '_rawBody', '_base64_body',
#
# 'url', 'urlFragment', 'method', 'postDataEntries', 'mixedContentType', 'initialPriority',
# 'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite',
#
# 'status', 'statusText',
# 'securityDetails', 'headersText', 'mimeType', 'requestHeadersText', 'connectionReused', 'connectionId',
# 'remoteIPAddress', 'remotePort', 'fromDiskCache', 'fromServiceWorker', 'fromPrefetchCache',
# 'encodedDataLength', 'timing', 'serviceWorkerResponseSource', 'responseTime', 'cacheStorageCacheName',
# 'protocol', 'securityState',
# )
def __init__(self, request_id, tab, target, raw_request):
def __init__(self, tab, target, raw_info):
"""
:param request_id: request id
:param tab: 产生这个数据包的tab的id
:param target: 监听目标
:param raw_request: 原始request数据从cdp获得
"""
self.requestId = request_id
self.tab = tab
self.target = target
self._raw_request = raw_request
self._rawPostData = None
self._raw_info = raw_info
self._raw_post_data = None
self._raw_response = None
self._rawBody = None
self._raw_body = None
self._base64_body = False
self._raw_fail_info = None
self._request_data = None
self._response_data = None
self._fail_info = None
self._request = None
self._response = None
def __repr__(self):
return f'<DataPacket target={self.target} request_id={self.requestId}>'
@property
def requestId(self):
return self._raw_info['requestId']
@property
def url(self):
pass
return self.request.url
@property
def method(self):
pass
return self.request.method
@property
def frameId(self):
return self._raw_info['frameId']
@property
def resourceType(self):
return self._raw_info['resourceType']
@property
def request(self):
if self._request_data is None:
self._request_data = RequestData(self._raw_request, self._rawPostData)
return self._request_data
if self._request is None:
self._request = Request(self._raw_info['request'], self._raw_post_data)
return self._request
@property
def response(self):
if self._response_data is None:
self._response_data = False if self._raw_fail_info else ResponseData(self._raw_response, self._rawBody,
self._base64_body)
return self._response_data
@property
def fail_info(self):
if self._raw_fail_info and self._fail_info is None:
self._fail_info = FailInfo(self._raw_fail_info)
return self._fail_info
if self._response is None:
self._response = Response(self._raw_info, self._raw_body, self._base64_body)
return self._response
class RequestData(object):
class Request(object):
__slots__ = ('url', 'urlFragment', 'postDataEntries', 'mixedContentType', 'initialPriority',
'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite',
'_request', '_raw_post_data', '_postData')
def __init__(self, raw_request, post_data):
self._request = raw_request
self._raw_post_data = post_data
self._postData = None
def __getattr__(self, item):
return self._request.get(item, None)
@property
def headers(self):
"""以大小写不敏感字典返回headers数据"""
@ -101,20 +94,43 @@ class RequestData(object):
@property
def postData(self):
"""返回postData数据"""
if self._postData is None and self._rawPostData:
if self._postData is None:
if self._raw_post_data:
postData = self._raw_post_data
elif self._request.get('postData', None):
postData = self._request['postData']
else:
postData = False
try:
self._postData = loads(self._rawPostData)
self._postData = loads(postData)
except JSONDecodeError:
self._postData = self._rawPostData
self._postData = postData
return self._postData
class ResponseData(object):
class Response(object):
__slots__ = ('responseErrorReason', 'responseStatusCode', 'responseStatusText',
'_response', '_raw_body', '_is_base64_body', '_body', '_headers')
def __init__(self, raw_response, raw_body, base64_body):
self._response = raw_response
self._raw_body = raw_body
self._is_base64_body = base64_body
self._body = None
self._headers = None
def __getattr__(self, item):
return self._response.get(item, None)
@property
def headers(self):
if self._headers is None:
if 'responseHeaders' in self._response:
headers = {i['name']: i['value'] for i in self._response['responseHeaders']}
self._headers = CaseInsensitiveDict(headers)
else:
self._headers = False
return self._headers
@property
def body(self):
@ -132,11 +148,6 @@ class ResponseData(object):
return self._body
class FailInfo(object):
def __init__(self, raw_fail_info):
pass
def get_ele_txt(e):
"""获取元素内所有文本
:param e: 元素对象

View File

@ -8,12 +8,92 @@ from typing import Union
from requests import Session
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from DrissionPage.base import DrissionElement, BasePage
from DrissionPage.chromium_element import ChromiumElement
from DrissionPage.chromium_base import ChromiumBase
class DataPacket(object):
    """Type stub for the class that manages one returned (captured) data packet."""

    def __init__(self, tab: str, target: str, raw_info: dict):
        """
        :param tab: id of the tab that produced this data packet
        :param target: the listening target this packet matched
        :param raw_info: raw event data obtained from CDP
        """
        self.tab: str = ...
        self.target: str = ...
        # Raw CDP data kept as received; the parsed views below are built from it.
        self._raw_info: dict = ...
        self._raw_post_data: str = ...
        self._raw_body: str = ...
        self._base64_body: bool = ...
        # Wrappers exposed through the ``request`` / ``response`` properties.
        self._request: Request = ...
        self._response: Response = ...

    # Return annotation added so ``repr(packet)`` is typed like the other members.
    def __repr__(self) -> str: ...

    # Read from the ``requestId`` entry of the raw CDP data.
    @property
    def requestId(self) -> str: ...

    # Shortcut for ``request.url``.
    @property
    def url(self) -> str: ...

    # Shortcut for ``request.method``.
    @property
    def method(self) -> str: ...

    # Read from the ``frameId`` entry of the raw CDP data.
    @property
    def frameId(self) -> str: ...

    # Read from the ``resourceType`` entry of the raw CDP data.
    @property
    def resourceType(self) -> str: ...

    # Request-side view of the packet.
    @property
    def request(self) -> Request: ...

    # Response-side view of the packet.
    @property
    def response(self) -> Response: ...
class Request(object):
    """Type stub for the request part of a captured data packet."""

    # Fields looked up in the raw CDP request dict (missing keys yield None).
    url: str = ...
    urlFragment: str = ...
    postDataEntries: list = ...
    mixedContentType: str = ...
    initialPriority: str = ...
    referrerPolicy: str = ...
    isLinkPreload: bool = ...
    trustTokenParams: dict = ...
    isSameSite: bool = ...

    def __init__(self, raw_request: dict, post_data: str):
        """
        :param raw_request: raw request dict obtained from CDP
        :param post_data: raw post data text fetched separately, if any
        """
        self._request: dict = ...
        self._raw_post_data: str = ...
        # Caches the parsed post data, so it may hold the decoded JSON object
        # as well as the raw text; typed to match the ``postData`` property.
        self._postData: Union[str, dict] = ...

    # Headers are returned as a case-insensitive dict, matching Response.headers.
    @property
    def headers(self) -> CaseInsensitiveDict: ...

    # Post data, JSON-decoded when possible, otherwise the raw text.
    @property
    def postData(self) -> Union[str, dict]: ...
class Response(object):
    """Type stub for the response part of a captured data packet."""

    # Fields looked up in the raw CDP event dict (missing keys yield None).
    responseErrorReason: str = ...
    # Was misspelled ``responseStatusCod``; the slot and CDP field are ``responseStatusCode``.
    responseStatusCode: int = ...
    responseStatusText: str = ...

    def __init__(self, raw_response: dict, raw_body: str, base64_body: bool):
        """
        :param raw_response: raw event dict obtained from CDP
        :param raw_body: response body text as returned by CDP
        :param base64_body: whether ``raw_body`` is base64 encoded
        """
        self._response: dict = ...
        self._raw_body: str = ...
        self._is_base64_body: bool = ...
        self._body: Union[str, dict] = ...
        # Cached headers; the implementation stores False when the raw data
        # carries no ``responseHeaders`` entry.
        self._headers: Union[CaseInsensitiveDict, bool] = ...

    # Case-insensitive response headers, or False when none were received.
    @property
    def headers(self) -> Union[CaseInsensitiveDict, bool]: ...

    # Response body.
    @property
    def body(self) -> Union[str, dict, bool]: ...
# Get all the text contained in the element object ``e``.
def get_ele_txt(e: DrissionElement) -> str: ...