继续改进抓包功能,未完成

This commit is contained in:
g1879 2023-04-25 00:06:10 +08:00
parent cefb94515e
commit 5a0616306d
3 changed files with 54 additions and 35 deletions

View File

@ -11,16 +11,15 @@ from re import search
from threading import Thread
from time import perf_counter, sleep, time
from FlowViewer.listener import ResponseData
from requests import Session
from .commons.web import DataPacket
from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder
from .commons.web import DataPacket
from .commons.web import set_browser_cookies
from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \
NoRectError, BrowserConnectError
@ -1087,7 +1086,7 @@ class NetworkListener(object):
if targets is not None:
self._page.run_cdp('Network.enable')
self._page.driver.Network.requestWillBeSent = self._requestWillBeSent
self._page.driver.Network.requestWillBeSent = self._request_will_sent
self._page.driver.Network.responseReceived = self._response_received
self._page.driver.Network.loadingFinished = self._loading_finished
else:
@ -1127,7 +1126,7 @@ class NetworkListener(object):
self._caught = 0
return r
def _requestWillBeSent(self, **kwargs):
def _request_will_sent(self, **kwargs):
"""接收到请求时的回调函数"""
for target in self._targets:
if (self._is_regex and search(target, kwargs['request']['url'])) or (

View File

@ -260,11 +260,13 @@ class NetworkListener(object):
def listen(self, timeout: float = None,
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def _request_will_sent(self, **kwargs) -> None: ...
def _response_received(self, **kwargs) -> None: ...
def _loading_finished(self, **kwargs) -> None: ...
def _requestWillBeSent(self, **kwargs) -> None: ...
def _loading_failed(self, **kwargs) -> None: ...
class ChromiumPageScroll(ChromiumScroll):

View File

@ -18,7 +18,7 @@ from tldextract import extract
class DataPacket(object):
"""返回的数据包管理类"""
__slots__ = ('requestId', 'request', 'response', 'rawBody', 'tab', 'target', '_requestHeaders', '_body',
'_postData',
'_postData', '_request_data', '_response_data', '_fail_info',
# cdp 原始数据
'_raw_request', '_raw_response', '_raw_fail_info', '_rawPostData', '_rawBody', '_base64_body',
@ -44,46 +44,59 @@ class DataPacket(object):
self.target = target
self._raw_request = raw_request
self._raw_response = None
self._raw_fail_info = None
self._rawPostData = None
self._raw_response = None
self._rawBody = None
self._base64_body = False
self._requestHeaders = None
self._postData = None
self._body = None
self._raw_fail_info = None
self._request_data = None
self._response_data = None
self._fail_info = None
def __repr__(self):
return f'<DataPacket target={self.target} request_id={self.requestId}>'
@property
def reuqest(self):
def url(self):
pass
@property
def method(self):
pass
@property
def request(self):
if self._request_data is None:
self._request_data = RequestData(self._raw_request, self._rawPostData)
return self._request_data
@property
def response(self):
pass
if self._response_data is None:
self._response_data = False if self._raw_fail_info else ResponseData(self._raw_response, self._rawBody,
self._base64_body)
return self._response_data
@property
def fail_info(self):
pass
if self._raw_fail_info and self._fail_info is None:
self._fail_info = FailInfo(self._raw_fail_info)
return self._fail_info
class RequestData(object):
@property
def responseHeaders(self):
"""以大小写不敏感字典返回headers数据"""
headers = self.response.get('headers', None)
return CaseInsensitiveDict(headers) if headers else None
def __init__(self, raw_request, post_data):
self._request = raw_request
self._raw_post_data = post_data
self._postData = None
@property
def requestHeaders(self):
"""以大小写不敏感字典返回requestHeaders数据"""
if self._requestHeaders:
return self._requestHeaders
headers = self.response.get('requestHeaders', None)
return CaseInsensitiveDict(headers) if headers else None
def headers(self):
"""以大小写不敏感字典返回headers数据"""
return CaseInsensitiveDict(self._request['request']['headers'])
@property
def postData(self):
@ -95,28 +108,33 @@ class RequestData(object):
self._postData = self._rawPostData
return self._postData
class ResponseData(object):
def __init__(self, raw_response, raw_body, base64_body):
self._response = raw_response
self._raw_body = raw_body
self._is_base64_body = base64_body
self._body = None
@property
def body(self):
"""返回body内容如果是json格式自动进行转换如果时图片格式进行base64转换其它格式直接返回文本"""
if self._body is None:
if self._base64_body:
self._body = b64decode(self.rawBody)
if self._is_base64_body:
self._body = b64decode(self._raw_body)
else:
try:
self._body = loads(self.rawBody)
self._body = loads(self._raw_body)
except JSONDecodeError:
self._body = self.rawBody
self._body = self._raw_body
return self._body
class ResponseData(object):
pass
class FailData(object):
pass
class FailInfo(object):
def __init__(self, raw_fail_info):
pass
def get_ele_txt(e):