diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index 80e1c7a..a78685d 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -7,7 +7,6 @@ from base64 import b64decode from json import loads, JSONDecodeError from os import sep from pathlib import Path -from re import search from threading import Thread from time import perf_counter, sleep, time @@ -19,9 +18,10 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement from .commons.locator import get_loc from .commons.tools import get_usable_path, clean_folder -from .commons.web import set_browser_cookies, DataPacket +from .commons.web import set_browser_cookies from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \ NoRectError, BrowserConnectError +from .network_listener import NetworkListener from .session_element import make_session_ele @@ -966,7 +966,6 @@ class ChromiumBaseWaiter(object): :param page_or_ele: 页面对象或元素对象 """ self._driver = page_or_ele - self._listener = None def ele_delete(self, loc_or_ele, timeout=None): """等待元素从DOM中删除 @@ -1033,197 +1032,6 @@ class ChromiumBaseWaiter(object): return False -class NetworkListener(object): - def __init__(self, page): - self._page = page - self._targets = None - self._is_regex = False - self._results = {} - self._single = False - self._method = None - self._requests = {} - - self.is_listening = False - self._count = None - self._caught = 0 # 已获取到的数量 - self._driver = self._page.driver - - def set_targets(self, targets=None, is_regex=False, count=None, method=None): - """指定要等待的数据包 - :param targets: 要匹配的数据包url特征,可用list等传入多个,为None时获取所有 - :param is_regex: 设置的target是否正则表达式 - :param count: 设置总共等待多少个数据包,为None时每个目标等待1个 - :param method: 设置监听的请求类型,可用list等指定多个,为None时监听全部 - :return: None - """ - if not isinstance(targets, (str, list, tuple, set)) and targets is not None: - raise TypeError('targets只能是str、list、tuple、set、None。') - if targets is None: - targets = '' - - self._is_regex = is_regex - if isinstance(targets, str): - self._targets = {targets} - else: - self._targets = set(targets) - - self._count = len(self._targets) if not count else count - self._single = self._count == 1 - if method is not None: - if isinstance(method, str): - self._method = {method.upper()} - elif isinstance(method, (list, tuple, set)): - self._method = set(i.upper() for i in method) - else: - raise TypeError('method参数只能是str、list、tuple、set类型。') - self.start() - - def start(self): - self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent) - self._driver.set_listener('Network.responseReceived', self._response_received) - self._driver.set_listener('Network.loadingFinished', self._loading_finished) - self._driver.set_listener('Network.loadingFailed', self._loading_failed) - self._driver.call_method('Network.enable') - self._requests = {} - # self._driver.set_listener('Fetch.requestPaused', self._request_paused) - # self._driver.call_method('Fetch.enable', patterns=[{'requestStage': 'Request'}, {'requestStage': 'Response'}]) - - def stop(self): - """停止监听数据包""" - self._driver.call_method('Network.disable') - self._driver.set_listener('Network.requestWillBeSent', None) - self._driver.set_listener('Network.responseReceived', None) - self._driver.set_listener('Network.loadingFinished', None) - self._driver.set_listener('Network.loadingFailed', None) - # self._driver.call_method('Fetch.disable') - # self._driver.set_listener('Fetch.requestPaused', None) - - def listen(self, timeout=None, any_one=False, asyn=False): - if asyn: - pass - else: - r = self._listen(timeout, any_one) - self._results = {} - return r - - def _listen(self, timeout=None, any_one=False): - """等待指定数据包加载完成 - :param timeout: 超时时间,为None则使用页面对象timeout - :param any_one: 多个target时,是否全部监听到才结束,为True时监听到一个目标就结束 - :return: ResponseData对象或监听结果字典 - """ - if self._targets is None: - raise RuntimeError('必须先用set_targets()设置等待目标。') - - self.is_listening = True - timeout = timeout if timeout is not None else self._page.timeout - end_time = perf_counter() + timeout - while perf_counter() < end_time and not ((any_one and self._caught) or self._caught >= self._count): - sleep(.1) - - self._requests = {} - self.is_listening = False - return self.results() - - @property - def results(self): - """返沪监听到的数据""" - return list(self._results.values())[0][0] if self._results and self._single else self._results - - def clear(self): - """清空已监听到的数据""" - self._results = {} - - def _requestWillBeSent(self, **kwargs): - """接收到请求时的回调函数""" - for target in self._targets: - if ((self._is_regex and search(target, kwargs['request']['url'])) or - (not self._is_regex and target in kwargs['request']['url'])) and ( - not self._method or kwargs['request']['method'] in self._method): - self._requests[kwargs['requestId']] = DataPacket(self._page.tab_id, target, kwargs) - - if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None): - self._requests[kwargs['requestId']]._raw_post_data = \ - self._page.run_cdp('Network.getRequestPostData', requestId=kwargs['requestId'])['postData'] - - break - - def _response_received(self, **kwargs): - """接收到返回信息时处理方法""" - request_id = kwargs['requestId'] - if request_id in self._requests: - self._requests[request_id]._raw_response = kwargs['response'] - self._requests[request_id]._resource_type = kwargs['type'] - - def _loading_finished(self, **kwargs): - """请求完成时处理方法""" - request_id = kwargs['requestId'] - if request_id in self._requests: - try: - r = self._page.run_cdp('Network.getResponseBody', requestId=request_id) - body = r['body'] - is_base64 = r['base64Encoded'] - except CallMethodError: - body = '' - is_base64 = False - - dp = self._requests[request_id] - target = dp.target - dp._raw_body = body - dp._base64_body = is_base64 - - if target in self._results: - self._results[target].append(dp) - else: - self._results[target] = [dp] - - self._caught += 1 - - def _loading_failed(self, **kwargs): - """请求失败时的回调方法""" - request_id = kwargs['requestId'] - if request_id in self._requests: - dp = self._requests[request_id] - target = dp.target - dp.errorText = kwargs['errorText'] - dp._resource_type = kwargs['type'] - - if target in self._results: - self._results[target].append(dp) - else: - self._results[target] = [dp] - - self._caught += 1 - - def _request_paused(self, **kwargs): - i = kwargs['requestId'] - if 'networkId' not in kwargs: - pass - # for target in self._targets: - # if (self._is_regex and search(target, kwargs['request']['url'])) or ( - # not self._is_regex and target in kwargs['request']['url']): - # dp = DataPacket(self._page.tab_id, target, kwargs) - # body = self._driver.call_method('Fetch.getResponseBody', requestId=i) - # dp._raw_body = body['body'] - # dp._base64_body = body['base64Encoded'] - # if 'networkId' in kwargs and kwargs['request'].get('hasPostData', None) \ - # and not kwargs['request'].get('postData', None): - # pd = self._driver.call_method('Network.getRequestPostData', requestId=kwargs['networkId']) - # if 'postData' in pd: - # dp._raw_post_data = pd['postData'] - # - # if target in self._results: - # self._results[target].append(dp) - # else: - # self._results[target] = [dp] - # - # self._caught += 1 - # break - - method = 'Request' if 'responseStatusCode' not in kwargs else 'Response' - self._driver.call_method(f'Fetch.continue{method}', requestId=i) - - class ChromiumPageScroll(ChromiumScroll): def __init__(self, page): """ diff --git a/DrissionPage/chromium_base.pyi b/DrissionPage/chromium_base.pyi index 3511318..aca0400 100644 --- a/DrissionPage/chromium_base.pyi +++ b/DrissionPage/chromium_base.pyi @@ -4,7 +4,7 @@ @Contact : g1879@qq.com """ from pathlib import Path -from typing import Union, Tuple, List, Any, Dict +from typing import Union, Tuple, List, Any from DataRecorder import Recorder from requests import Session @@ -15,7 +15,7 @@ from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumElement, ChromiumScroll from .chromium_frame import ChromiumFrame from .commons.constants import NoneElement -from .commons.web import DataPacket +from .network_listener import NetworkListener from .session_element import SessionElement @@ -214,7 +214,6 @@ class ChromiumBase(BasePage): class ChromiumBaseWaiter(object): def __init__(self, page: ChromiumBase): self._driver: ChromiumBase = ... - self._listener: NetworkListener = ... def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... @@ -231,49 +230,6 @@ class ChromiumBaseWaiter(object): def upload_paths_inputted(self) -> None: ... -class NetworkListener(object): - def __init__(self, page: ChromiumBase): - self._page: ChromiumBase = ... - self._count: int = ... - self._caught: int = ... - self._targets: Union[str, dict] = ... - self._single: bool = ... - self._method: set = ... - self._results: Union[DataPacket, Dict[str, List[DataPacket]], False] = ... - self._is_regex: bool = ... - self._driver: ChromiumDriver = ... - self._requests: dict = ... - self.is_listening: bool = ... - - def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False, - count: int = None, method: Union[str, list, tuple, set] = None) -> None: ... - - def start(self) -> None: ... - - def stop(self) -> None: ... - - @property - def results(self) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... - - def clear(self) -> None: ... - - def listen(self, timeout: float = None, any_one: bool = False, - asyn: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... - - def _listen(self, timeout: float = None, - any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... - - def _requestWillBeSent(self, **kwargs) -> None: ... - - def _response_received(self, **kwargs) -> None: ... - - def _loading_finished(self, **kwargs) -> None: ... - - def _loading_failed(self, **kwargs) -> None: ... - - def _request_paused(self, **kwargs) -> None: ... - - class ChromiumPageScroll(ChromiumScroll): def __init__(self, page: ChromiumBase): ... diff --git a/DrissionPage/commons/web.py b/DrissionPage/commons/web.py index cd5b16e..b9dd703 100644 --- a/DrissionPage/commons/web.py +++ b/DrissionPage/commons/web.py @@ -3,139 +3,15 @@ @Author : g1879 @Contact : g1879@qq.com """ -from base64 import b64decode from html import unescape from http.cookiejar import Cookie -from json import JSONDecodeError, loads from re import sub from urllib.parse import urlparse, urljoin, urlunparse from requests.cookies import RequestsCookieJar -from requests.structures import CaseInsensitiveDict from tldextract import extract -class DataPacket(object): - """返回的数据包管理类""" - - def __init__(self, tab, target, raw_request): - """ - :param tab: 产生这个数据包的tab的id - :param target: 监听目标 - :param raw_request: 原始request数据,从cdp获得 - """ - self.tab = tab - self.target = target - - self._raw_request = raw_request - self._raw_post_data = None - - self._raw_response = None - self._raw_body = None - self._base64_body = False - - self._request = None - self._response = None - self.errorText = None - self._resource_type = None - - @property - def url(self): - return self.request.url - - @property - def method(self): - return self.request.method - - @property - def frameId(self): - return self._raw_request.get('frameId') - - @property - def resourceType(self): - return self._resource_type - - @property - def request(self): - if self._request is None: - self._request = Request(self._raw_request['request'], self._raw_post_data) - return self._request - - @property - def response(self): - if self._response is None: - self._response = Response(self._raw_response, self._raw_body, self._base64_body) - return self._response - - -class Request(object): - def __init__(self, raw_request, post_data): - self._request = raw_request - self._raw_post_data = post_data - self._postData = None - self._headers = None - - def __getattr__(self, item): - return self._request.get(item, None) - - @property - def headers(self): - """以大小写不敏感字典返回headers数据""" - if self._headers is None: - self._headers = CaseInsensitiveDict(self._request['headers']) - return self._headers - - @property - def postData(self): - """返回postData数据""" - if self._postData is None: - if self._raw_post_data: - postData = self._raw_post_data - elif self._request.get('postData', None): - postData = self._request['postData'] - else: - postData = False - try: - self._postData = loads(postData) - except (JSONDecodeError, TypeError): - self._postData = postData - return self._postData - - -class Response(object): - def __init__(self, raw_response, raw_body, base64_body): - self._response = raw_response - self._raw_body = raw_body - self._is_base64_body = base64_body - self._body = None - self._headers = None - - def __getattr__(self, item): - return self._response.get(item, None) - - @property - def headers(self): - """以大小写不敏感字典返回headers数据""" - if self._headers is None: - self._headers = CaseInsensitiveDict(self._response['headers']) - return self._headers - - @property - def body(self): - """返回body内容,如果是json格式,自动进行转换,如果时图片格式,进行base64转换,其它格式直接返回文本""" - if self._body is None: - if self._is_base64_body: - self._body = b64decode(self._raw_body) - - else: - try: - self._body = loads(self._raw_body) - except (JSONDecodeError, TypeError): - self._body = self._raw_body - - return self._body - - def get_ele_txt(e): """获取元素内所有文本 :param e: 元素对象 diff --git a/DrissionPage/commons/web.pyi b/DrissionPage/commons/web.pyi index 9c87f1a..b91ba71 100644 --- a/DrissionPage/commons/web.pyi +++ b/DrissionPage/commons/web.pyi @@ -8,93 +8,12 @@ from typing import Union from requests import Session from requests.cookies import RequestsCookieJar -from requests.structures import CaseInsensitiveDict from DrissionPage.base import DrissionElement, BasePage from DrissionPage.chromium_element import ChromiumElement from DrissionPage.chromium_base import ChromiumBase -class DataPacket(object): - """返回的数据包管理类""" - - def __init__(self, tab: str, target: str, raw_info: dict): - self.tab: str = ... - self.target: str = ... - self._raw_request: dict = ... - self._raw_response: dict = ... - self._raw_post_data: str = ... - self._raw_body: str = ... - self._base64_body: bool = ... - self._request: Request = ... - self._response: Response = ... - self.errorText: str = ... - self._resource_type: str = ... - - @property - def url(self) -> str: ... - - @property - def method(self) -> str: ... - - @property - def frameId(self) -> str: ... - - @property - def resourceType(self) -> str: ... - - @property - def request(self) -> Request: ... - - @property - def response(self) -> Response: ... - - -class Request(object): - url: str = ... - _headers: Union[CaseInsensitiveDict, None] = ... - method: str = ... - - # urlFragment: str = ... - # postDataEntries: list = ... - # mixedContentType: str = ... - # initialPriority: str = ... - # referrerPolicy: str = ... - # isLinkPreload: bool = ... - # trustTokenParams: dict = ... - # isSameSite: bool = ... - - def __init__(self, raw_request: dict, post_data: str): - self._request: dict = ... - self._raw_post_data: str = ... - self._postData: str = ... - - @property - def headers(self) -> dict: ... - - @property - def postData(self) -> Union[str, dict]: ... - - -class Response(object): - status: str = ... - statusText: int = ... - mimeType: str = ... - - def __init__(self, raw_response: dict, raw_body: str, base64_body: bool): - self._response: dict = ... - self._raw_body: str = ... - self._is_base64_body: bool = ... - self._body: Union[str, dict] = ... - self._headers: dict = ... - - @property - def headers(self) -> CaseInsensitiveDict: ... - - @property - def body(self) -> Union[str, dict, bool]: ... - - def get_ele_txt(e: DrissionElement) -> str: ... diff --git a/DrissionPage/network_listener.py b/DrissionPage/network_listener.py new file mode 100644 index 0000000..f1e8a34 --- /dev/null +++ b/DrissionPage/network_listener.py @@ -0,0 +1,330 @@ +# -*- coding:utf-8 -*- +from base64 import b64decode +from json import JSONDecodeError, loads +from queue import Queue +from re import search +from threading import Thread +from time import perf_counter, sleep + +from requests.structures import CaseInsensitiveDict + +from .errors import CallMethodError + + +class NetworkListener(object): + """监听器基类""" + + def __init__(self, page): + """ + :param page: ChromiumBase对象 + """ + self._page = page + self._driver = self._page.driver + + self._tmp = None # 临存捕捉到的数据 + self._request_ids = None # 暂存须要拦截的请求id + + self._total_count = None # 当次监听的数量上限 + self._caught_count = None # 当次已监听到的数量 + self._begin_time = None # 当次监听开始时间 + self._timeout = None # 当次监听超时时间 + + self.listening = False + self._targets = None # 默认监听所有 + self.tab_id = None # 当前tab的id + self._results = [] + + self._is_regex = False + self._method = None + + def set_targets(self, targets=True, is_regex=False, method=None): + """指定要等待的数据包 + :param targets: 要匹配的数据包url特征,可用list等传入多个,为True时获取所有 + :param is_regex: 设置的target是否正则表达式 + :param method: 设置监听的请求类型,可用list等指定多个,为None时监听全部 + :return: None + """ + if targets is not None: + if not isinstance(targets, (str, list, tuple, set)) and targets is not True: + raise TypeError('targets只能是str、list、tuple、set、True。') + if targets is True: + targets = '' + + if isinstance(targets, str): + self._targets = {targets} + else: + self._targets = set(targets) + + self._is_regex = is_regex + + if method is not None: + if isinstance(method, str): + self._method = {method.upper()} + elif isinstance(method, (list, tuple, set)): + self._method = set(i.upper() for i in method) + else: + raise TypeError('method参数只能是str、list、tuple、set类型。') + + def listen(self, targets=None, count=None, timeout=None, asyn=True): + """拦截目标请求,直到超时或达到拦截个数,每次拦截前清空结果 + 可监听多个目标,请求url包含这些字符串就会被记录 + :param targets: 要监听的目标字符串或其组成的列表,True监听所有,None则保留之前的目标不变 + :param count: 要记录的个数,到达个数停止监听 + :param timeout: 监听最长时间,到时间即使未达到记录个数也停止,None为无限长 + :param asyn: 是否异步监听 + :return: None + """ + if targets: + self.set_targets(targets) + + self.listening = True + self._results = [] + self._request_ids = {} + self._tmp = Queue(maxsize=0) + + self._caught_count = 0 + self._begin_time = perf_counter() + self._timeout = timeout + + self._set_callback_func() + + if asyn: + self._total_count = count + Thread(target=self._wait_to_stop).start() + else: + self._total_count = len(self._targets) if not count else count + self._wait_to_stop() + + def stop(self): + """停止监听""" + self._stop() + if self.listening: + self.listening = False + + def wait(self): + """等等监听结束""" + while self.listening: + sleep(.5) + + def get_results(self, target=None): + """获取结果列表 + :param target: 要获取的目标,为None时获取全部 + :return: 结果数据组成的列表 + """ + return self._results if target is None else [i for i in self._results if i.target == target] + + def _wait_to_stop(self): + """当收到停止信号、到达须获取结果数、到时间就停止""" + while self._is_continue(): + sleep(.5) + + self.stop() + + def _is_continue(self): + """是否继续当前监听""" + return self.listening \ + and (self._total_count is None or self._caught_count < self._total_count) \ + and (self._timeout is None or perf_counter() - self._begin_time < self._timeout) + + def steps(self, gap=1): + """用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页) + :param gap: 每接收到多少个数据包触发 + :return: 用于在接收到监听目标时触发动作的可迭代对象 + """ + if not isinstance(gap, int) or gap < 1: + raise ValueError('gap参数必须为大于0的整数。') + while self.listening or not self._tmp.empty(): + while self._tmp.qsize() >= gap: + yield self._tmp.get(False) if gap == 1 else [self._tmp.get(False) for _ in range(gap)] + + sleep(.1) + + def _set_callback_func(self): + """设置监听请求的回调函数""" + self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent) + self._driver.set_listener('Network.responseReceived', self._response_received) + self._driver.set_listener('Network.loadingFinished', self._loading_finished) + self._driver.set_listener('Network.loadingFailed', self._loading_failed) + self._driver.call_method('Network.enable') + + def _stop(self) -> None: + """停止监听前要做的工作""" + self._driver.set_listener('Network.requestWillBeSent', None) + self._driver.set_listener('Network.responseReceived', None) + self._driver.set_listener('Network.loadingFinished', None) + self._driver.set_listener('Network.loadingFailed', None) + self._driver.call_method('Network.disable') + + def _requestWillBeSent(self, **kwargs): + """接收到请求时的回调函数""" + for target in self._targets: + if ((self._is_regex and search(target, kwargs['request']['url'])) or + (not self._is_regex and target in kwargs['request']['url'])) and ( + not self._method or kwargs['request']['method'] in self._method): + self._request_ids[kwargs['requestId']] = DataPacket(self._page.tab_id, target, kwargs) + + if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None): + self._request_ids[kwargs['requestId']]._raw_post_data = \ + self._page.run_cdp('Network.getRequestPostData', requestId=kwargs['requestId'])['postData'] + + break + + def _response_received(self, **kwargs): + """接收到返回信息时处理方法""" + request_id = kwargs['requestId'] + if request_id in self._request_ids: + self._request_ids[request_id]._raw_response = kwargs['response'] + self._request_ids[request_id]._resource_type = kwargs['type'] + + def _loading_finished(self, **kwargs): + """请求完成时处理方法""" + request_id = kwargs['requestId'] + if request_id in self._request_ids: + try: + r = self._page.run_cdp('Network.getResponseBody', requestId=request_id) + body = r['body'] + is_base64 = r['base64Encoded'] + except CallMethodError: + body = '' + is_base64 = False + + dp = self._request_ids[request_id] + dp._raw_body = body + dp._base64_body = is_base64 + + self._tmp.put(dp) + self._results.append(dp) + self._caught_count += 1 + + def _loading_failed(self, **kwargs): + """请求失败时的回调方法""" + request_id = kwargs['requestId'] + if request_id in self._request_ids: + dp = self._request_ids[request_id] + dp.errorText = kwargs['errorText'] + dp._resource_type = kwargs['type'] + + self._tmp.put(dp) + self._results.append(dp) + self._caught_count += 1 + + +class DataPacket(object): + """返回的数据包管理类""" + + def __init__(self, tab, target, raw_request): + """ + :param tab: 产生这个数据包的tab的id + :param target: 监听目标 + :param raw_request: 原始request数据,从cdp获得 + """ + self.tab = tab + self.target = target + + self._raw_request = raw_request + self._raw_post_data = None + + self._raw_response = None + self._raw_body = None + self._base64_body = False + + self._request = None + self._response = None + self.errorText = None + self._resource_type = None + + @property + def url(self): + return self.request.url + + @property + def method(self): + return self.request.method + + @property + def frameId(self): + return self._raw_request.get('frameId') + + @property + def resourceType(self): + return self._resource_type + + @property + def request(self): + if self._request is None: + self._request = Request(self._raw_request['request'], self._raw_post_data) + return self._request + + @property + def response(self): + if self._response is None: + self._response = Response(self._raw_response, self._raw_body, self._base64_body) + return self._response + + +class Request(object): + def __init__(self, raw_request, post_data): + self._request = raw_request + self._raw_post_data = post_data + self._postData = None + self._headers = None + + def __getattr__(self, item): + return self._request.get(item, None) + + @property + def headers(self): + """以大小写不敏感字典返回headers数据""" + if self._headers is None: + self._headers = CaseInsensitiveDict(self._request['headers']) + return self._headers + + @property + def postData(self): + """返回postData数据""" + if self._postData is None: + if self._raw_post_data: + postData = self._raw_post_data + elif self._request.get('postData', None): + postData = self._request['postData'] + else: + postData = False + try: + self._postData = loads(postData) + except (JSONDecodeError, TypeError): + self._postData = postData + return self._postData + + +class Response(object): + def __init__(self, raw_response, raw_body, base64_body): + self._response = raw_response + self._raw_body = raw_body + self._is_base64_body = base64_body + self._body = None + self._headers = None + + def __getattr__(self, item): + return self._response.get(item, None) + + @property + def headers(self): + """以大小写不敏感字典返回headers数据""" + if self._headers is None: + self._headers = CaseInsensitiveDict(self._response['headers']) + return self._headers + + @property + def body(self): + """返回body内容,如果是json格式,自动进行转换,如果时图片格式,进行base64转换,其它格式直接返回文本""" + if self._body is None: + if self._is_base64_body: + self._body = b64decode(self._raw_body) + + else: + try: + self._body = loads(self._raw_body) + except (JSONDecodeError, TypeError): + self._body = self._raw_body + + return self._body diff --git a/DrissionPage/network_listener.pyi b/DrissionPage/network_listener.pyi new file mode 100644 index 0000000..ddfe480 --- /dev/null +++ b/DrissionPage/network_listener.pyi @@ -0,0 +1,142 @@ +from queue import Queue +from typing import Union, Dict, List, Iterable, Tuple + +from requests.structures import CaseInsensitiveDict + +from chromium_base import ChromiumBase +from chromium_driver import ChromiumDriver + + +class NetworkListener(object): + def __init__(self, page: ChromiumBase): + self._page: ChromiumBase = ... + self._total_count: int = ... + self._caught_count: int = ... + self._targets: Union[str, dict] = ... + self._results: list = ... + self._method: set = ... + self._tmp: Queue = ... + self._is_regex: bool = ... + self._driver: ChromiumDriver = ... + self._request_ids: dict = ... + self.listening: bool = ... + self._timeout: float = ... + self._begin_time: float = ... + + def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False, + count: int = None, method: Union[str, list, tuple, set] = None) -> None: ... + + def start(self) -> None: ... + + def stop(self) -> None: ... + + @property + def results(self) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... + + def clear(self) -> None: ... + + def listen(self, targets: Union[str, List[str], Tuple, bool, None] = ..., count: int = ..., timeout: float = ..., + asyn: bool = ...) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... + + def _listen(self, timeout: float = None, + any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... + + def _requestWillBeSent(self, **kwargs) -> None: ... + + def _response_received(self, **kwargs) -> None: ... + + def _loading_finished(self, **kwargs) -> None: ... + + def _loading_failed(self, **kwargs) -> None: ... + + def _request_paused(self, **kwargs) -> None: ... + + def _wait_to_stop(self) -> None: ... + + def _is_continue(self) -> bool: ... + + def steps(self, gap=1) -> Iterable[Union[DataPacket, List[DataPacket]]]: ... + + def _set_callback_func(self) -> None: ... + + def _stop(self) -> None: ... + + +class DataPacket(object): + """返回的数据包管理类""" + + def __init__(self, tab: str, target: str, raw_info: dict): + self.tab: str = ... + self.target: str = ... + self._raw_request: dict = ... + self._raw_response: dict = ... + self._raw_post_data: str = ... + self._raw_body: str = ... + self._base64_body: bool = ... + self._request: Request = ... + self._response: Response = ... + self.errorText: str = ... + self._resource_type: str = ... + + @property + def url(self) -> str: ... + + @property + def method(self) -> str: ... + + @property + def frameId(self) -> str: ... + + @property + def resourceType(self) -> str: ... + + @property + def request(self) -> Request: ... + + @property + def response(self) -> Response: ... + + +class Request(object): + url: str = ... + _headers: Union[CaseInsensitiveDict, None] = ... + method: str = ... + + # urlFragment: str = ... + # postDataEntries: list = ... + # mixedContentType: str = ... + # initialPriority: str = ... + # referrerPolicy: str = ... + # isLinkPreload: bool = ... + # trustTokenParams: dict = ... + # isSameSite: bool = ... + + def __init__(self, raw_request: dict, post_data: str): + self._request: dict = ... + self._raw_post_data: str = ... + self._postData: str = ... + + @property + def headers(self) -> dict: ... + + @property + def postData(self) -> Union[str, dict]: ... + + +class Response(object): + status: str = ... + statusText: int = ... + mimeType: str = ... + + def __init__(self, raw_response: dict, raw_body: str, base64_body: bool): + self._response: dict = ... + self._raw_body: str = ... + self._is_base64_body: bool = ... + self._body: Union[str, dict] = ... + self._headers: dict = ... + + @property + def headers(self) -> CaseInsensitiveDict: ... + + @property + def body(self) -> Union[str, dict, bool]: ...