From 13c3cf01018ea5ee9ff8e2bd867643210e8e71ac Mon Sep 17 00:00:00 2001 From: g1879 Date: Wed, 3 May 2023 23:51:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=8A=93=E5=8C=85=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E6=9C=AA=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/chromium_base.py | 66 ++++++++++++---- DrissionPage/chromium_base.pyi | 50 ++++++------ DrissionPage/commons/web.py | 135 +++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 39 deletions(-) diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index 33274ab..ff55aee 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -11,7 +11,6 @@ from re import search from threading import Thread from time import perf_counter, sleep, time -from FlowViewer.listener import ResponseData from requests import Session from .base import BasePage @@ -20,7 +19,7 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement from .commons.locator import get_loc from .commons.tools import get_usable_path, clean_folder -from .commons.web import set_browser_cookies +from .commons.web import set_browser_cookies, DataPacket from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \ NoRectError, BrowserConnectError from .session_element import make_session_ele @@ -1061,14 +1060,20 @@ class NetworkListener(object): self._single = False self._requests = {} - def set_targets(self, targets, is_regex=False): + self._count = None + self._caught = 0 # 已获取到的数量 + self._driver = self._page.driver + + def set_targets(self, targets, is_regex=False, count=None): """指定要等待的数据包 :param targets: 要匹配的数据包url特征,可用list等传入多个 :param is_regex: 设置的target是否正则表达式 + :param count: 设置总共等待多少个数据包,为None时每个目标等待1个 :return: None """ if not isinstance(targets, (str, list, tuple, set)): raise TypeError('targets只能是str、list、tuple、set。') + self._is_regex = is_regex if isinstance(targets, str): self._targets = {targets} @@ -1076,20 +1081,25 @@ class NetworkListener(object): else: self._targets = set(targets) self._single = False - self._page.run_cdp('Network.enable') - if targets is not None: - self._page.driver.Network.requestWillBeSent = self._requestWillBeSent - self._page.driver.Network.responseReceived = self._response_received - self._page.driver.Network.loadingFinished = self._loading_finished - else: - self.stop() + if count is None: + self._count = len(self._targets) + + def start(self): + self._driver.set_listener('Fetch.requestPaused', self._request_paused) + self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent) + self._driver.set_listener('Network.responseReceived', self._response_received) + self._driver.set_listener('Network.loadingFinished', self._loading_finished) + self._driver.call_method('Network.enable') + self._driver.call_method('Fetch.enable', patterns=[{'requestStage': 'Request'}, {'requestStage': 'Response'}]) def stop(self): """停止监听数据包""" - self._page.run_cdp('Network.disable') - self._page.driver.Network.requestWillBeSent = None - self._page.driver.Network.responseReceived = None - self._page.driver.Network.loadingFinished = None + self._driver.set_listener('Fetch.requestPaused', None) + self._driver.set_listener('Network.requestWillBeSent', None) + self._driver.set_listener('Network.responseReceived', None) + self._driver.set_listener('Network.loadingFinished', None) + self._driver.call_method('Fetch.disable') + self._driver.call_method('Network.disable') def listen(self, timeout=None, any_one=False): """等待指定数据包加载完成 @@ -1149,6 +1159,34 @@ class NetworkListener(object): 'request_headers': kwargs['request']['headers']} break + def _request_paused(self, **kwargs): + i = kwargs['requestId'] + if 'networkId' not in kwargs: + pass + # for target in self._targets: + # if (self._is_regex and search(target, kwargs['request']['url'])) or ( + # not self._is_regex and target in kwargs['request']['url']): + # dp = DataPacket(self._page.tab_id, target, kwargs) + # body = self._driver.call_method('Fetch.getResponseBody', requestId=i) + # dp._raw_body = body['body'] + # dp._base64_body = body['base64Encoded'] + # if 'networkId' in kwargs and kwargs['request'].get('hasPostData', None) \ + # and not kwargs['request'].get('postData', None): + # pd = self._driver.call_method('Network.getRequestPostData', requestId=kwargs['networkId']) + # if 'postData' in pd: + # dp._raw_post_data = pd['postData'] + # + # if target in self._results: + # self._results[target].append(dp) + # else: + # self._results[target] = [dp] + # + # self._caught += 1 + # break + + method = 'Request' if 'responseStatusCode' not in kwargs else 'Response' + self._driver.call_method(f'Fetch.continue{method}', requestId=i) + class ChromiumPageScroll(ChromiumScroll): def __init__(self, page): diff --git a/DrissionPage/chromium_base.pyi b/DrissionPage/chromium_base.pyi index 2723ebc..809617d 100644 --- a/DrissionPage/chromium_base.pyi +++ b/DrissionPage/chromium_base.pyi @@ -7,7 +7,6 @@ from pathlib import Path from typing import Union, Tuple, List, Any, Dict from DataRecorder import Recorder -from FlowViewer.listener import ResponseData from requests import Session from requests.cookies import RequestsCookieJar @@ -16,6 +15,7 @@ from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumElement, ChromiumScroll from .chromium_frame import ChromiumFrame from .commons.constants import NoneElement +from .commons.web import DataPacket from .session_element import SessionElement @@ -135,31 +135,24 @@ class ChromiumBase(BasePage): def run_async_js(self, script: str, *args: Any, as_expr: bool = False) -> None: ... - def get(self, - url: str, - show_errmsg: bool = False, - retry: int = None, - interval: float = None, - timeout: float = None) -> Union[None, bool]: ... + def get(self, url: str, show_errmsg: bool = False, retry: int = None, + interval: float = None, timeout: float = None) -> Union[None, bool]: ... - def get_cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[ - list, dict]: ... + def get_cookies(self, as_dict: bool = False, all_domains: bool = False, + all_info: bool = False) -> Union[list, dict]: ... - def ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], - timeout: float = None) -> ChromiumElement: ... + def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], + timeout: float = None) -> Union[ChromiumElement, str]: ... - def eles(self, - loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> List[ChromiumElement]: ... + def eles(self, loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> List[Union[ChromiumElement, str]]: ... def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) \ -> Union[SessionElement, str, NoneElement]: ... def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ... - def _find_elements(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], + def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \ -> Union[ChromiumElement, ChromiumFrame, NoneElement, List[Union[ChromiumElement, ChromiumFrame]]]: ... @@ -231,31 +224,36 @@ class ChromiumBaseWaiter(object): def load_complete(self, timeout: float = None) -> bool: ... + def upload_paths_inputted(self) -> None: ... + def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... def stop_listening(self) -> None: ... def data_packets(self, timeout: float = None, - any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... - - def upload_paths_inputted(self) -> None: ... + any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... class NetworkListener(object): - def __init__(self, page): + def __init__(self, page: ChromiumBase): self._page: ChromiumBase = ... + self._count: int = ... + self._caught: int = ... self._targets: Union[str, dict] = ... self._single: bool = ... - self._results: Union[ResponseData, Dict[str, ResponseData], False] = ... + self._results: Union[DataPacket, Dict[str, List[DataPacket]], False] = ... self._is_regex: bool = ... + self._driver: ChromiumDriver = ... self._requests: dict = ... - def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... + def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False, count: int = None) -> None: ... + + def start(self) -> None: ... def stop(self) -> None: ... def listen(self, timeout: float = None, - any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... + any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... def _response_received(self, **kwargs) -> None: ... @@ -263,6 +261,8 @@ class NetworkListener(object): def _requestWillBeSent(self, **kwargs) -> None: ... + def _request_paused(self, **kwargs) -> None: ... + class ChromiumPageScroll(ChromiumScroll): def __init__(self, page: ChromiumBase): ... @@ -366,4 +366,4 @@ class ScreencastMode(object): def frugal_imgs_mode(self) -> None: ... - def imgs_mode(self) -> None: ... \ No newline at end of file + def imgs_mode(self) -> None: ... diff --git a/DrissionPage/commons/web.py b/DrissionPage/commons/web.py index b9dd703..c782f88 100644 --- a/DrissionPage/commons/web.py +++ b/DrissionPage/commons/web.py @@ -3,15 +3,150 @@ @Author : g1879 @Contact : g1879@qq.com """ +from base64 import b64decode from html import unescape from http.cookiejar import Cookie +from json import JSONDecodeError, loads from re import sub from urllib.parse import urlparse, urljoin, urlunparse from requests.cookies import RequestsCookieJar +from requests.structures import CaseInsensitiveDict from tldextract import extract +class DataPacket(object): + """返回的数据包管理类""" + + def __init__(self, tab, target, raw_info): + """ + :param tab: 产生这个数据包的tab的id + :param target: 监听目标 + :param raw_info: 原始request数据,从cdp获得 + """ + self.tab = tab + self.target = target + + self._raw_info = raw_info + self._raw_post_data = None + + self._raw_body = None + self._base64_body = False + + self._request = None + self._response = None + + def __repr__(self): + return f'' + + @property + def requestId(self): + return self._raw_info['requestId'] + + @property + def url(self): + return self.request.url + + @property + def method(self): + return self.request.method + + @property + def frameId(self): + return self._raw_info['frameId'] + + @property + def resourceType(self): + return self._raw_info['resourceType'] + + @property + def request(self): + if self._request is None: + self._request = Request(self._raw_info['request'], self._raw_post_data) + return self._request + + @property + def response(self): + if self._response is None: + self._response = Response(self._raw_info, self._raw_body, self._base64_body) + return self._response + + +class Request(object): + __slots__ = ('url', 'urlFragment', 'postDataEntries', 'mixedContentType', 'initialPriority', + 'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite', + '_request', '_raw_post_data', '_postData') + + def __init__(self, raw_request, post_data): + self._request = raw_request + self._raw_post_data = post_data + self._postData = None + + def __getattr__(self, item): + return self._request.get(item, None) + + @property + def headers(self): + """以大小写不敏感字典返回headers数据""" + return CaseInsensitiveDict(self._request['request']['headers']) + + @property + def postData(self): + """返回postData数据""" + if self._postData is None: + if self._raw_post_data: + postData = self._raw_post_data + elif self._request.get('postData', None): + postData = self._request['postData'] + else: + postData = False + try: + self._postData = loads(postData) + except JSONDecodeError: + self._postData = postData + return self._postData + + +class Response(object): + __slots__ = ('responseErrorReason', 'responseStatusCode', 'responseStatusText', + '_response', '_raw_body', '_is_base64_body', '_body', '_headers') + + def __init__(self, raw_response, raw_body, base64_body): + self._response = raw_response + self._raw_body = raw_body + self._is_base64_body = base64_body + self._body = None + self._headers = None + + def __getattr__(self, item): + return self._response.get(item, None) + + @property + def headers(self): + if self._headers is None: + if 'responseHeaders' in self._response: + headers = {i['name']: i['value'] for i in self._response['responseHeaders']} + self._headers = CaseInsensitiveDict(headers) + else: + self._headers = False + return self._headers + + @property + def body(self): + """返回body内容,如果是json格式,自动进行转换,如果时图片格式,进行base64转换,其它格式直接返回文本""" + if self._body is None: + if self._is_base64_body: + self._body = b64decode(self._raw_body) + + else: + try: + self._body = loads(self._raw_body) + except JSONDecodeError: + self._body = self._raw_body + + return self._body + + def get_ele_txt(e): """获取元素内所有文本 :param e: 元素对象