修改抓包功能,未完成

This commit is contained in:
g1879 2023-05-03 23:51:18 +08:00
parent 09c4e98072
commit 13c3cf0101
3 changed files with 212 additions and 39 deletions

View File

@ -11,7 +11,6 @@ from re import search
from threading import Thread
from time import perf_counter, sleep, time
from FlowViewer.listener import ResponseData
from requests import Session
from .base import BasePage
@ -20,7 +19,7 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder
from .commons.web import set_browser_cookies
from .commons.web import set_browser_cookies, DataPacket
from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \
NoRectError, BrowserConnectError
from .session_element import make_session_ele
@ -1061,14 +1060,20 @@ class NetworkListener(object):
self._single = False
self._requests = {}
def set_targets(self, targets, is_regex=False):
self._count = None
self._caught = 0 # 已获取到的数量
self._driver = self._page.driver
def set_targets(self, targets, is_regex=False, count=None):
"""指定要等待的数据包
:param targets: 要匹配的数据包url特征可用list等传入多个
:param is_regex: 设置的target是否正则表达式
:param count: 设置总共等待多少个数据包为None时每个目标等待1个
:return: None
"""
if not isinstance(targets, (str, list, tuple, set)):
raise TypeError('targets只能是str、list、tuple、set。')
self._is_regex = is_regex
if isinstance(targets, str):
self._targets = {targets}
@ -1076,20 +1081,25 @@ class NetworkListener(object):
else:
self._targets = set(targets)
self._single = False
self._page.run_cdp('Network.enable')
if targets is not None:
self._page.driver.Network.requestWillBeSent = self._requestWillBeSent
self._page.driver.Network.responseReceived = self._response_received
self._page.driver.Network.loadingFinished = self._loading_finished
else:
self.stop()
if count is None:
self._count = len(self._targets)
def start(self):
    """Start capturing: attach the CDP event handlers, then enable the domains.

    Handlers are registered on the driver for ``Fetch.requestPaused``,
    ``Network.requestWillBeSent``, ``Network.responseReceived`` and
    ``Network.loadingFinished``. Afterwards ``Network.enable`` and
    ``Fetch.enable`` are called, the latter with one pattern for the request
    stage and one for the response stage.

    :return: None
    """
    # (event name, callback) pairs, registered in this order.
    handlers = (
        ('Fetch.requestPaused', self._request_paused),
        ('Network.requestWillBeSent', self._requestWillBeSent),
        ('Network.responseReceived', self._response_received),
        ('Network.loadingFinished', self._loading_finished),
    )
    for event, callback in handlers:
        self._driver.set_listener(event, callback)

    self._driver.call_method('Network.enable')
    stages = [{'requestStage': 'Request'}, {'requestStage': 'Response'}]
    self._driver.call_method('Fetch.enable', patterns=stages)
def stop(self):
"""停止监听数据包"""
self._page.run_cdp('Network.disable')
self._page.driver.Network.requestWillBeSent = None
self._page.driver.Network.responseReceived = None
self._page.driver.Network.loadingFinished = None
self._driver.set_listener('Fetch.requestPaused', None)
self._driver.set_listener('Network.requestWillBeSent', None)
self._driver.set_listener('Network.responseReceived', None)
self._driver.set_listener('Network.loadingFinished', None)
self._driver.call_method('Fetch.disable')
self._driver.call_method('Network.disable')
def listen(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
@ -1149,6 +1159,34 @@ class NetworkListener(object):
'request_headers': kwargs['request']['headers']}
break
def _request_paused(self, **kwargs):
    """Handle a ``Fetch.requestPaused`` event by resuming the paused request.

    Packet capturing is not implemented here yet (see the draft below); every
    paused request is simply continued unchanged. The event is treated as a
    response-stage pause when ``responseStatusCode`` is present, otherwise as
    a request-stage pause.

    :param kwargs: parameters of the CDP event; ``requestId`` is required
    :return: None
    """
    request_id = kwargs['requestId']

    # TODO: work in progress - draft of the capture logic, kept for reference.
    # Events without 'networkId' currently get no special handling.
    # for target in self._targets:
    #     if (self._is_regex and search(target, kwargs['request']['url'])) or (
    #             not self._is_regex and target in kwargs['request']['url']):
    #         dp = DataPacket(self._page.tab_id, target, kwargs)
    #         body = self._driver.call_method('Fetch.getResponseBody', requestId=request_id)
    #         dp._raw_body = body['body']
    #         dp._base64_body = body['base64Encoded']
    #         if 'networkId' in kwargs and kwargs['request'].get('hasPostData', None) \
    #                 and not kwargs['request'].get('postData', None):
    #             pd = self._driver.call_method('Network.getRequestPostData', requestId=kwargs['networkId'])
    #             if 'postData' in pd:
    #                 dp._raw_post_data = pd['postData']
    #
    #         if target in self._results:
    #             self._results[target].append(dp)
    #         else:
    #             self._results[target] = [dp]
    #
    #         self._caught += 1
    #         break

    stage = 'Response' if 'responseStatusCode' in kwargs else 'Request'
    self._driver.call_method(f'Fetch.continue{stage}', requestId=request_id)
class ChromiumPageScroll(ChromiumScroll):
def __init__(self, page):

View File

@ -7,7 +7,6 @@ from pathlib import Path
from typing import Union, Tuple, List, Any, Dict
from DataRecorder import Recorder
from FlowViewer.listener import ResponseData
from requests import Session
from requests.cookies import RequestsCookieJar
@ -16,6 +15,7 @@ from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement, ChromiumScroll
from .chromium_frame import ChromiumFrame
from .commons.constants import NoneElement
from .commons.web import DataPacket
from .session_element import SessionElement
@ -135,31 +135,24 @@ class ChromiumBase(BasePage):
def run_async_js(self, script: str, *args: Any, as_expr: bool = False) -> None: ...
def get(self,
url: str,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
timeout: float = None) -> Union[None, bool]: ...
def get(self, url: str, show_errmsg: bool = False, retry: int = None,
interval: float = None, timeout: float = None) -> Union[None, bool]: ...
def get_cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[
list, dict]: ...
def get_cookies(self, as_dict: bool = False, all_domains: bool = False,
all_info: bool = False) -> Union[list, dict]: ...
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None) -> ChromiumElement: ...
def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None) -> Union[ChromiumElement, str]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[ChromiumElement]: ...
def eles(self, loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[Union[ChromiumElement, str]]: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) \
-> Union[SessionElement, str, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \
-> Union[ChromiumElement, ChromiumFrame, NoneElement, List[Union[ChromiumElement, ChromiumFrame]]]: ...
@ -231,31 +224,36 @@ class ChromiumBaseWaiter(object):
def load_complete(self, timeout: float = None) -> bool: ...
def upload_paths_inputted(self) -> None: ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop_listening(self) -> None: ...
def data_packets(self, timeout: float = None,
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def upload_paths_inputted(self) -> None: ...
any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
class NetworkListener(object):
def __init__(self, page):
def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ...
self._count: int = ...
self._caught: int = ...
self._targets: Union[str, dict] = ...
self._single: bool = ...
self._results: Union[ResponseData, Dict[str, ResponseData], False] = ...
self._results: Union[DataPacket, Dict[str, List[DataPacket]], False] = ...
self._is_regex: bool = ...
self._driver: ChromiumDriver = ...
self._requests: dict = ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False, count: int = None) -> None: ...
def start(self) -> None: ...
def stop(self) -> None: ...
def listen(self, timeout: float = None,
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
def _response_received(self, **kwargs) -> None: ...
@ -263,6 +261,8 @@ class NetworkListener(object):
def _requestWillBeSent(self, **kwargs) -> None: ...
def _request_paused(self, **kwargs) -> None: ...
class ChromiumPageScroll(ChromiumScroll):
def __init__(self, page: ChromiumBase): ...
@ -366,4 +366,4 @@ class ScreencastMode(object):
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...

View File

@ -3,15 +3,150 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from base64 import b64decode
from html import unescape
from http.cookiejar import Cookie
from json import JSONDecodeError, loads
from re import sub
from urllib.parse import urlparse, urljoin, urlunparse
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from tldextract import extract
class DataPacket(object):
    """Container for one captured network data packet."""

    def __init__(self, tab, target, raw_info):
        """
        :param tab: id of the tab that produced this packet
        :param target: the listening target this packet belongs to
        :param raw_info: raw request data obtained from CDP
        """
        self.tab = tab
        self.target = target
        self._raw_info = raw_info

        # Raw payloads; these start empty and are presumably assigned by the
        # listener after construction - TODO confirm against the caller.
        self._raw_post_data = None
        self._raw_body = None
        self._base64_body = False

        # Lazily built wrappers, see the ``request`` / ``response`` properties.
        self._request = None
        self._response = None

    def __repr__(self):
        return f'<DataPacket target={self.target} request_id={self.requestId}>'

    @property
    def requestId(self):
        """The ``requestId`` entry of the raw data."""
        return self._raw_info['requestId']

    @property
    def frameId(self):
        """The ``frameId`` entry of the raw data."""
        return self._raw_info['frameId']

    @property
    def resourceType(self):
        """The ``resourceType`` entry of the raw data."""
        return self._raw_info['resourceType']

    @property
    def url(self):
        """Shortcut for ``self.request.url``."""
        return self.request.url

    @property
    def method(self):
        """Shortcut for ``self.request.method``."""
        return self.request.method

    @property
    def request(self):
        """The Request wrapper, created on first access and then reused."""
        cached = self._request
        if cached is None:
            cached = self._request = Request(self._raw_info['request'], self._raw_post_data)
        return cached

    @property
    def response(self):
        """The Response wrapper, created on first access and then reused."""
        cached = self._response
        if cached is None:
            cached = self._response = Response(self._raw_info, self._raw_body, self._base64_body)
        return cached
class Request(object):
    """Read-only view over the raw ``request`` dict of a captured packet.

    Keys of the raw dict (``url``, ``method``, ...) are exposed as attributes
    through ``__getattr__``; a key that is absent yields None.
    """

    # The public names are declared as slots but never assigned, so reading
    # them falls through to ``__getattr__`` and is served from the raw dict.
    __slots__ = ('url', 'urlFragment', 'postDataEntries', 'mixedContentType', 'initialPriority',
                 'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite',
                 '_request', '_raw_post_data', '_postData')

    def __init__(self, raw_request, post_data):
        """
        :param raw_request: the raw request dict
        :param post_data: separately obtained post data, or None
        """
        self._request = raw_request
        self._raw_post_data = post_data
        self._postData = None

    def __getattr__(self, item):
        """Return the value stored under ``item`` in the raw dict, or None.

        :param item: attribute name that normal lookup did not find
        :raises AttributeError: for names starting with an underscore
        """
        # Private/dunder names never live in the raw dict. Refusing them here
        # prevents unbounded recursion when ``_request`` itself is not set yet
        # (e.g. while the object is being copied or unpickled).
        if item.startswith('_'):
            raise AttributeError(item)
        return self._request.get(item, None)

    @property
    def headers(self):
        """Return the request headers as a case-insensitive dict."""
        # ``_request`` already is the request dict, so headers sit directly in it.
        return CaseInsensitiveDict(self._request.get('headers', {}))

    @property
    def postData(self):
        """Return the post data, parsed as JSON when possible.

        The separately supplied post data takes precedence over the
        ``postData`` entry of the raw dict. Text that is not valid JSON is
        returned unchanged; False is returned when there is no post data.
        """
        if self._postData is None:
            raw = self._raw_post_data or self._request.get('postData', None)
            if not raw:
                # Nothing to parse; False marks "looked up, nothing there".
                self._postData = False
            else:
                try:
                    self._postData = loads(raw)
                except (JSONDecodeError, TypeError):
                    self._postData = raw
        return self._postData
class Response(object):
    """Read-only view over the response part of a captured packet.

    Keys of the raw dict (``responseStatusCode``, ...) are exposed as
    attributes through ``__getattr__``; a key that is absent yields None.
    """

    # The public names are declared as slots but never assigned, so reading
    # them falls through to ``__getattr__`` and is served from the raw dict.
    __slots__ = ('responseErrorReason', 'responseStatusCode', 'responseStatusText',
                 '_response', '_raw_body', '_is_base64_body', '_body', '_headers')

    def __init__(self, raw_response, raw_body, base64_body):
        """
        :param raw_response: raw dict holding the response fields
        :param raw_body: body text as received, or None if it was not fetched
        :param base64_body: whether ``raw_body`` is base64 encoded
        """
        self._response = raw_response
        self._raw_body = raw_body
        self._is_base64_body = base64_body
        self._body = None
        self._headers = None

    def __getattr__(self, item):
        """Return the value stored under ``item`` in the raw dict, or None.

        :param item: attribute name that normal lookup did not find
        :raises AttributeError: for names starting with an underscore
        """
        # Private/dunder names never live in the raw dict. Refusing them here
        # prevents unbounded recursion when ``_response`` itself is not set yet
        # (e.g. while the object is being copied or unpickled).
        if item.startswith('_'):
            raise AttributeError(item)
        return self._response.get(item, None)

    @property
    def headers(self):
        """Return the response headers as a case-insensitive dict.

        False is returned when the raw dict has no ``responseHeaders`` entry.
        """
        if self._headers is None:
            if 'responseHeaders' in self._response:
                headers = {i['name']: i['value'] for i in self._response['responseHeaders']}
                self._headers = CaseInsensitiveDict(headers)
            else:
                self._headers = False
        return self._headers

    @property
    def body(self):
        """Return the response body.

        A base64-encoded body is decoded to bytes. Otherwise the text is
        parsed as JSON when possible and returned unchanged when it is not
        valid JSON. None is returned when no body was stored.
        """
        # Without the ``_raw_body`` check a missing body would hit
        # ``loads(None)`` / ``b64decode(None)`` and raise TypeError.
        if self._body is None and self._raw_body is not None:
            if self._is_base64_body:
                self._body = b64decode(self._raw_body)
            else:
                try:
                    self._body = loads(self._raw_body)
                except (JSONDecodeError, TypeError):
                    self._body = self._raw_body
        return self._body
def get_ele_txt(e):
"""获取元素内所有文本
:param e: 元素对象