4.0.0b3抓包可包含扩展信息;url不对所有允许的字符转义

This commit is contained in:
g1879 2023-10-31 00:25:29 +08:00
parent 722e299150
commit b5f2e28e32
5 changed files with 105 additions and 26 deletions

View File

@ -414,7 +414,7 @@ class BasePage(BaseParser):
:param interval: 重试间隔
:return: 重试次数和间隔组成的tuple
"""
self._url = quote(url, safe='/:&?=%;#@+![]')
self._url = quote(url, safe='-_.~!*\'();:@&=+$,/?#[]')
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval

View File

@ -48,6 +48,7 @@ class ChromiumBase(BasePage):
self._listener = None
self._has_alert = False
self._ready_state = None
self._doc_got = False # 用于在LoadEventFired和FrameStoppedLoading间标记是否已获取doc
self._download_path = str(Path('.').absolute())
@ -188,16 +189,18 @@ class ChromiumBase(BasePage):
self._ready_state = 'complete'
if self._debug:
print(f'LoadEventFired {kwargs}')
# self._get_document()
self._get_document()
self._doc_got = True
def _onFrameStoppedLoading(self, **kwargs):
"""Run after the page has finished loading."""
# Record that the frame named in the event belongs to this tab.
self.browser._frames[kwargs['frameId']] = self.tab_id
# NOTE(review): the next two `if` lines look like the pre-change and post-change
# forms of the same guard shown together by the diff view - confirm which one applies.
if kwargs['frameId'] == self._frame_id:
# This form additionally requires that the document has not already been fetched
# (the LoadEventFired handler sets `_doc_got` to True after calling `_get_document`).
if kwargs['frameId'] == self._frame_id and self._doc_got is False:
self._ready_state = 'complete'
if self._debug:
print(f'FrameStoppedLoading {kwargs}')
self._get_document()
# Reset the flag. NOTE(review): indentation is not visible in this view, so whether
# this reset sits inside the guard above cannot be determined here - confirm.
self._doc_got = False
def _onFileChooserOpened(self, **kwargs):
"""文件选择框打开时执行"""

View File

@ -49,6 +49,7 @@ class ChromiumBase(BasePage):
self._listener: NetworkListener = ...
self._alert: Alert = ...
self._has_alert: bool = ...
self._doc_got: bool = ...
self._ready_state: Optional[str] = ...
def _connect_browser(self, tab_id: str = None) -> None: ...

View File

@ -179,17 +179,20 @@ class NetworkListener(object):
def _set_callback(self):
"""设置监听请求的回调函数"""
self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent)
self._driver.set_listener('Network.requestWillBeSentExtraInfo', self._requestWillBeSentExtraInfo)
self._driver.set_listener('Network.responseReceived', self._response_received)
self._driver.set_listener('Network.responseReceivedExtraInfo', self._responseReceivedExtraInfo)
self._driver.set_listener('Network.loadingFinished', self._loading_finished)
self._driver.set_listener('Network.loadingFailed', self._loading_failed)
def _requestWillBeSent(self, **kwargs):
"""接收到请求时的回调函数"""
if not self._targets:
self._request_ids[kwargs['requestId']] = DataPacket(self._page.tab_id, None, kwargs)
packet = self._request_ids.setdefault(kwargs['requestId'], DataPacket(self._page.tab_id, None))
packet._raw_request = kwargs
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
self._request_ids[kwargs['requestId']]._raw_post_data = \
self._driver.call_method('Network.getRequestPostData', requestId=kwargs['requestId'])['postData']
packet._raw_post_data = self._driver.call_method('Network.getRequestPostData',
requestId=kwargs['requestId'])['postData']
return
@ -197,20 +200,40 @@ class NetworkListener(object):
if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and (
not self._method or kwargs['request']['method'] in self._method):
self._request_ids[kwargs['requestId']] = DataPacket(self._page.tab_id, target, kwargs)
packet = self._request_ids.setdefault(kwargs['requestId'], DataPacket(self._page.tab_id, None))
packet._raw__request = kwargs
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
self._request_ids[kwargs['requestId']]._raw_post_data = \
self._driver.call_method('Network.getRequestPostData', requestId=kwargs['requestId'])['postData']
packet._raw_post_data = self._driver.call_method('Network.getRequestPostData',
requestId=kwargs['requestId'])['postData']
break
def _requestWillBeSentExtraInfo(self, **kwargs):
if not self._targets:
packet = self._request_ids.setdefault(kwargs['requestId'], DataPacket(self._page.tab_id, None))
packet._requestExtraInfo = kwargs
return
for target in self._targets:
if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and (
not self._method or kwargs['request']['method'] in self._method):
packet = self._request_ids.setdefault(kwargs['requestId'], DataPacket(self._page.tab_id, None))
packet._requestExtraInfo = kwargs
break
def _response_received(self, **kwargs):
"""接收到返回信息时处理方法"""
request_id = kwargs['requestId']
if request_id in self._request_ids:
self._request_ids[request_id]._raw_response = kwargs['response']
self._request_ids[request_id]._resource_type = kwargs['type']
request = self._request_ids.get(kwargs['requestId'])
if request:
request._raw_response = kwargs['response']
request._resource_type = kwargs['type']
def _responseReceivedExtraInfo(self, **kwargs):
request = self._request_ids.get(kwargs['requestId'])
if request:
request._responseExtraInfo = kwargs
def _loading_finished(self, **kwargs):
"""请求完成时处理方法"""
@ -249,21 +272,22 @@ class NetworkListener(object):
class DataPacket(object):
"""返回的数据包管理类"""
def __init__(self, tab_id, target, raw_request):
def __init__(self, tab_id, target):
"""
:param tab_id: 产生这个数据包的tab的id
:param target: 监听目标
:param raw_request: 原始request数据从cdp获得
"""
self.tab_id = tab_id
self.target = target
self._raw_request = raw_request
self._raw_request = None
self._raw_post_data = None
self._raw_response = None
self._raw_body = None
self._base64_body = False
self._requestExtraInfo = None
self._responseExtraInfo = None
self._request = None
self._response = None
@ -293,22 +317,23 @@ class DataPacket(object):
@property
def request(self):
# Build the Request wrapper on first access and reuse it afterwards.
if self._request is None:
# NOTE(review): the next two lines look like the pre-change and post-change forms of
# the same assignment shown together by the diff view; the Request constructor in this
# commit takes three arguments, so only the second form matches it - confirm.
# `_raw_request` starts as None in __init__, so subscripting it here fails until a
# requestWillBeSent event has stored its payload.
self._request = Request(self._raw_request['request'], self._raw_post_data)
self._request = Request(self._raw_request['request'], self._raw_post_data, self._requestExtraInfo)
return self._request
@property
def response(self):
# Build the Response wrapper on first access and reuse it afterwards.
if self._response is None:
# NOTE(review): the next two lines look like the pre-change and post-change forms of
# the same assignment shown together by the diff view; the Response constructor in this
# commit takes four arguments, so only the second form matches it - confirm.
self._response = Response(self._raw_response, self._raw_body, self._base64_body)
self._response = Response(self._raw_response, self._raw_body, self._base64_body, self._responseExtraInfo)
return self._response
class Request(object):
def __init__(self, raw_request, post_data):
def __init__(self, raw_request, post_data, extra_info):
self._request = raw_request
self._raw_post_data = post_data
self._postData = None
self._headers = None
self.extra_info = RequestExtraInfo(extra_info or {})
def __getattr__(self, item):
return self._request.get(item, None)
@ -338,12 +363,13 @@ class Request(object):
class Response(object):
def __init__(self, raw_response, raw_body, base64_body):
def __init__(self, raw_response, raw_body, base64_body, extra_info):
self._response = raw_response
self._raw_body = raw_body
self._is_base64_body = base64_body
self._body = None
self._headers = None
self.extra_info = ResponseExtraInfo(extra_info or {})
def __getattr__(self, item):
return self._response.get(item, None)
@ -374,3 +400,19 @@ class Response(object):
self._body = self._raw_body
return self._body
class ExtraInfo(object):
    """Attribute-style read access to an extra-info event dict.

    Any key of the wrapped dict can be read as an attribute; a key that is
    not present reads as None.
    """

    def __init__(self, extra_info):
        """
        :param extra_info: dict of extra-info data to expose as attributes
        """
        self._extra_info = extra_info

    def __getattr__(self, item):
        """Return the value stored under ``item``, or None when it is absent.

        :param item: attribute name, used as the dict key
        :return: the stored value or None
        :raises AttributeError: for ``_extra_info`` itself and for dunder names
        """
        # __getattr__ only runs when normal lookup fails. If `_extra_info` itself is
        # missing (instance created without __init__, e.g. by copy/pickle), reading it
        # below would re-enter this method forever. Dunder names are protocol probes
        # (`__setstate__`, `__getstate__`, ...) and must look absent, not be None.
        if item == '_extra_info' or (item.startswith('__') and item.endswith('__')):
            raise AttributeError(item)
        return self._extra_info.get(item, None)
class RequestExtraInfo(ExtraInfo):
    """ExtraInfo subclass used for the extra info attached to a request; adds nothing of its own."""
class ResponseExtraInfo(ExtraInfo):
    """ExtraInfo subclass used for the extra info attached to a response; adds nothing of its own."""

View File

@ -49,8 +49,12 @@ class NetworkListener(object):
def _requestWillBeSent(self, **kwargs) -> None: ...
def _requestWillBeSentExtraInfo(self, **kwargs) -> None: ...
def _response_received(self, **kwargs) -> None: ...
def _responseReceivedExtraInfo(self, **kwargs) -> None: ...
def _loading_finished(self, **kwargs) -> None: ...
def _loading_failed(self, **kwargs) -> None: ...
@ -64,11 +68,11 @@ class NetworkListener(object):
class DataPacket(object):
"""返回的数据包管理类"""
def __init__(self, tab_id: str, target: Optional[str], raw_info: dict):
def __init__(self, tab_id: str, target: Optional[str]):
self.tab_id: str = ...
self.target: str = ...
self._raw_request: dict = ...
self._raw_response: dict = ...
self._raw_request: Optional[dict] = ...
self._raw_response: Optional[dict] = ...
self._raw_post_data: str = ...
self._raw_body: str = ...
self._base64_body: bool = ...
@ -76,6 +80,8 @@ class DataPacket(object):
self._response: Response = ...
self.errorText: str = ...
self._resource_type: str = ...
self._requestExtraInfo: Optional[dict] = ...
self._responseExtraInfo: Optional[dict] = ...
@property
def url(self) -> str: ...
@ -98,6 +104,7 @@ class DataPacket(object):
class Request(object):
url: str = ...
extra_info: Optional[RequestExtraInfo] = ...
_headers: Union[CaseInsensitiveDict, None] = ...
method: str = ...
@ -111,7 +118,7 @@ class Request(object):
trustTokenParams = ...
isSameSite = ...
def __init__(self, raw_request: dict, post_data: str):
def __init__(self, raw_request: dict, post_data: str, extra_info: Optional[dict]):
self._request: dict = ...
self._raw_post_data: str = ...
self._postData: str = ...
@ -124,6 +131,7 @@ class Request(object):
class Response(object):
extra_info: Optional[ResponseExtraInfo] = ...
url = ...
status = ...
statusText = ...
@ -148,7 +156,7 @@ class Response(object):
securityState = ...
securityDetails = ...
def __init__(self, raw_response: dict, raw_body: str, base64_body: bool):
def __init__(self, raw_response: dict, raw_body: str, base64_body: bool, extra_info: Optional[dict]):
self._response: dict = ...
self._raw_body: str = ...
self._is_base64_body: bool = ...
@ -163,3 +171,28 @@ class Response(object):
@property
def body(self) -> Union[str, dict, bool]: ...
# Type stub for the base wrapper that exposes an extra-info dict through attribute access.
class ExtraInfo(object):
# `extra_info` is the dict kept on the instance as `_extra_info`.
def __init__(self, extra_info: dict):
self._extra_info: dict = ...
# Type stub for the wrapper around the payload stored by the
# Network.requestWillBeSentExtraInfo callback. The attributes below are declared for
# type checkers only; the runtime class resolves them as keys of the wrapped dict and
# returns None for a key that is absent.
class RequestExtraInfo(ExtraInfo):
requestId: str = ...
associatedCookies: List[dict] = ...
headers: dict = ...
connectTiming: dict = ...
clientSecurityState: dict = ...
siteHasCookieInOtherPartition: bool = ...
# Type stub for the wrapper around the payload stored by the
# Network.responseReceivedExtraInfo callback. The attributes below are declared for
# type checkers only; the runtime class resolves them as keys of the wrapped dict and
# returns None for a key that is absent.
class ResponseExtraInfo(ExtraInfo):
requestId: str = ...
blockedCookies: List[dict] = ...
headers: dict = ...
resourceIPAddressSpace: str = ...
statusCode: int = ...
headersText: str = ...
cookiePartitionKey: str = ...
cookiePartitionKeyOpaque: bool = ...