diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index e5eeb66..33274ab 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -11,6 +11,7 @@ from re import search from threading import Thread from time import perf_counter, sleep, time +from FlowViewer.listener import ResponseData from requests import Session from .base import BasePage @@ -19,7 +20,6 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement from .commons.locator import get_loc from .commons.tools import get_usable_path, clean_folder -from .commons.web import DataPacket from .commons.web import set_browser_cookies from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \ NoRectError, BrowserConnectError @@ -1034,7 +1034,6 @@ class ChromiumBaseWaiter(object): if not self._listener: self._listener = NetworkListener(self._driver) self._listener.set_targets(targets, is_regex) - self._listener.start() def data_packets(self, timeout=None, any_one=False): """等待指定数据包加载完成 @@ -1060,21 +1059,16 @@ class NetworkListener(object): self._is_regex = False self._results = {} self._single = False + self._requests = {} - self._count = None - self._caught = 0 # 已获取到的数量 - self._driver = self._page.driver - - def set_targets(self, targets, is_regex=False, count=None): + def set_targets(self, targets, is_regex=False): """指定要等待的数据包 :param targets: 要匹配的数据包url特征,可用list等传入多个 :param is_regex: 设置的target是否正则表达式 - :param count: 设置总共等待多少个数据包,为None时每个目标等待1个 :return: None """ if not isinstance(targets, (str, list, tuple, set)): raise TypeError('targets只能是str、list、tuple、set。') - self._is_regex = is_regex if isinstance(targets, str): self._targets = {targets} @@ -1082,19 +1076,20 @@ class NetworkListener(object): else: self._targets = set(targets) self._single = False - if count is None: - self._count = len(self._targets) - - def start(self): - self._driver.set_listener('Fetch.requestPaused', self._request_paused) - self._driver.call_method('Network.enable') - self._driver.call_method('Fetch.enable') + self._page.run_cdp('Network.enable') + if targets is not None: + self._page.driver.Network.requestWillBeSent = self._requestWillBeSent + self._page.driver.Network.responseReceived = self._response_received + self._page.driver.Network.loadingFinished = self._loading_finished + else: + self.stop() def stop(self): """停止监听数据包""" - self._driver.set_listener('Fetch.requestPaused', None) - self._driver.call_method('Fetch.disable') - self._driver.call_method('Network.disable') + self._page.run_cdp('Network.disable') + self._page.driver.Network.requestWillBeSent = None + self._page.driver.Network.responseReceived = None + self._page.driver.Network.loadingFinished = None def listen(self, timeout=None, any_one=False): """等待指定数据包加载完成 @@ -1108,47 +1103,51 @@ class NetworkListener(object): timeout = timeout if timeout is not None else self._page.timeout end_time = perf_counter() + timeout while perf_counter() < end_time: - if self._caught >= self._count or (any_one and self._caught): + if self._results and (any_one or set(self._results) == self._targets): break sleep(.1) - if self._caught == 0: - r = False - else: - r = list(self._results.values())[0] if self._single else self._results - + self._requests = {} + if not self._results: + return False + r = list(self._results.values())[0] if self._single else self._results self._results = {} - self._caught = 0 return r - def _request_paused(self, **kwargs): - i = kwargs['requestId'] - if 'responseStatusCode' in kwargs: - for target in self._targets: - if (self._is_regex and search(target, kwargs['request']['url'])) or ( - not self._is_regex and target in kwargs['request']['url']): - dp = DataPacket(self._page.tab_id, target, kwargs) - body = self._driver.call_method('Fetch.getResponseBody', requestId=i) - dp._raw_body = body['body'] - dp._base64_body = body['base64Encoded'] - if 'networkId' in kwargs and kwargs['request'].get('hasPostData', None) \ - and not kwargs['request'].get('postData', None): - pd = self._driver.call_method('Network.getRequestPostData', requestId=kwargs['networkId']) - if 'postData' in pd: - dp._raw_post_data = pd['postData'] + def _response_received(self, **kwargs): + """接收到返回信息时处理方法""" + if kwargs['requestId'] in self._requests: + self._requests[kwargs['requestId']]['response'] = kwargs['response'] - if target in self._results: - self._results[target].append(dp) - else: - self._results[target] = [dp] + def _loading_finished(self, **kwargs): + """请求完成时处理方法""" + request_id = kwargs['requestId'] + if request_id in self._requests: + try: + r = self._page.run_cdp('Network.getResponseBody', requestId=request_id) + body = r['body'] + is_base64 = r['base64Encoded'] + except CallMethodError: + body = '' + is_base64 = False - break + request = self._requests[request_id] + target = request['target'] + rd = ResponseData(request_id, request['response'], body, self._page.tab_id, target) + rd.postData = request['post_data'] + rd._base64_body = is_base64 + rd.requestHeaders = request['request_headers'] + self._results[target] = rd - self._driver.call_method('Fetch.continueResponse', requestId=i) - self._caught += 1 - - else: # request - self._driver.call_method('Fetch.continueRequest', requestId=i) + def _requestWillBeSent(self, **kwargs): + """接收到请求时的回调函数""" + for target in self._targets: + if (self._is_regex and search(target, kwargs['request']['url'])) or ( + not self._is_regex and target in kwargs['request']['url']): + self._requests[kwargs['requestId']] = {'target': target, + 'post_data': kwargs['request'].get('postData', None), + 'request_headers': kwargs['request']['headers']} + break class ChromiumPageScroll(ChromiumScroll): @@ -1307,7 +1306,7 @@ class Screencast(object): DrissionPage_Screencast_blob_ok = true; }) mediaRecorder.start() - + mediaRecorder.addEventListener('stop', function(){ while(DrissionPage_Screencast_blob_ok==false){} DrissionPage_Screencast_blob = new Blob(DrissionPage_Screencast_chunks, diff --git a/DrissionPage/chromium_base.pyi b/DrissionPage/chromium_base.pyi index 3e2ab36..2723ebc 100644 --- a/DrissionPage/chromium_base.pyi +++ b/DrissionPage/chromium_base.pyi @@ -7,6 +7,7 @@ from pathlib import Path from typing import Union, Tuple, List, Any, Dict from DataRecorder import Recorder +from FlowViewer.listener import ResponseData from requests import Session from requests.cookies import RequestsCookieJar @@ -15,7 +16,6 @@ from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumElement, ChromiumScroll from .chromium_frame import ChromiumFrame from .commons.constants import NoneElement -from .commons.web import DataPacket from .session_element import SessionElement @@ -231,37 +231,37 @@ class ChromiumBaseWaiter(object): def load_complete(self, timeout: float = None) -> bool: ... - def upload_paths_inputted(self) -> None: ... - def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... def stop_listening(self) -> None: ... def data_packets(self, timeout: float = None, - any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... + any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... + + def upload_paths_inputted(self) -> None: ... class NetworkListener(object): - def __init__(self, page: ChromiumBase): + def __init__(self, page): self._page: ChromiumBase = ... - self._count: int = ... - self._caught: int = ... self._targets: Union[str, dict] = ... self._single: bool = ... - self._results: Union[DataPacket, Dict[str, List[DataPacket]], False] = ... + self._results: Union[ResponseData, Dict[str, ResponseData], False] = ... self._is_regex: bool = ... - self._driver: ChromiumDriver = ... + self._requests: dict = ... - def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False, count: int = None) -> None: ... - - def start(self) -> None: ... + def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... def stop(self) -> None: ... def listen(self, timeout: float = None, - any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... + any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... - def _request_paused(self, **kwargs) -> None: ... + def _response_received(self, **kwargs) -> None: ... + + def _loading_finished(self, **kwargs) -> None: ... + + def _requestWillBeSent(self, **kwargs) -> None: ... class ChromiumPageScroll(ChromiumScroll): @@ -366,4 +366,4 @@ class ScreencastMode(object): def frugal_imgs_mode(self) -> None: ... - def imgs_mode(self) -> None: ... + def imgs_mode(self) -> None: ... \ No newline at end of file diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index 18a19d4..16d7a7b 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -5,14 +5,13 @@ """ from pathlib import Path from platform import system -from re import search from threading import Thread from time import perf_counter, sleep from warnings import warn from requests import Session -from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter, NetworkListener +from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter from .chromium_driver import ChromiumDriver from .chromium_tab import ChromiumTab from .commons.browser import connect_browser @@ -401,32 +400,6 @@ class ChromiumPageWaiter(ChromiumBaseWaiter): while self._driver.tab_id == self._driver.latest_tab and perf_counter() < end_time: sleep(.01) - def set_targets(self, targets, is_regex=False): - """指定要等待的数据包 - :param targets: 要匹配的数据包url特征,可用list等传入多个 - :param is_regex: 设置的target是否正则表达式 - :return: None - """ - if not self._listener: - self._listener = NetworkListener(self._driver) - self._listener.set_targets(targets, is_regex) - - def data_packets(self, timeout=None, any_one=False): - """等待指定数据包加载完成 - :param timeout: 超时时间,为None则使用页面对象timeout - :param any_one: 多个target时,是否全部监听到才结束,为True时监听到一个目标就结束 - :return: ResponseData对象或监听结果字典 - """ - if not self._listener: - self._listener = NetworkListener(self._driver) - return self._listener.listen(timeout, any_one) - - def stop_listening(self): - """停止监听数据包""" - if not self._listener: - self._listener = NetworkListener(self._driver) - self._listener.stop() - class ChromiumTabRect(object): def __init__(self, page): diff --git a/DrissionPage/commons/web.py b/DrissionPage/commons/web.py index 13255b5..b9dd703 100644 --- a/DrissionPage/commons/web.py +++ b/DrissionPage/commons/web.py @@ -3,151 +3,15 @@ @Author : g1879 @Contact : g1879@qq.com """ -from base64 import b64decode from html import unescape from http.cookiejar import Cookie -from json import JSONDecodeError, loads from re import sub from urllib.parse import urlparse, urljoin, urlunparse from requests.cookies import RequestsCookieJar -from requests.structures import CaseInsensitiveDict from tldextract import extract -class DataPacket(object): - """返回的数据包管理类""" - - def __init__(self, tab, target, raw_info): - """ - :param request_id: request id - :param tab: 产生这个数据包的tab的id - :param target: 监听目标 - :param raw_request: 原始request数据,从cdp获得 - """ - self.tab = tab - self.target = target - - self._raw_info = raw_info - self._raw_post_data = None - - self._raw_body = None - self._base64_body = False - - self._request = None - self._response = None - - def __repr__(self): - return f'' - - @property - def requestId(self): - return self._raw_info['requestId'] - - @property - def url(self): - return self.request.url - - @property - def method(self): - return self.request.method - - @property - def frameId(self): - return self._raw_info['frameId'] - - @property - def resourceType(self): - return self._raw_info['resourceType'] - - @property - def request(self): - if self._request is None: - self._request = Request(self._raw_info['request'], self._raw_post_data) - return self._request - - @property - def response(self): - if self._response is None: - self._response = Response(self._raw_info, self._raw_body, self._base64_body) - return self._response - - -class Request(object): - __slots__ = ('url', 'urlFragment', 'postDataEntries', 'mixedContentType', 'initialPriority', - 'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite', - '_request', '_raw_post_data', '_postData') - - def __init__(self, raw_request, post_data): - self._request = raw_request - self._raw_post_data = post_data - self._postData = None - - def __getattr__(self, item): - return self._request.get(item, None) - - @property - def headers(self): - """以大小写不敏感字典返回headers数据""" - return CaseInsensitiveDict(self._request['request']['headers']) - - @property - def postData(self): - """返回postData数据""" - if self._postData is None: - if self._raw_post_data: - postData = self._raw_post_data - elif self._request.get('postData', None): - postData = self._request['postData'] - else: - postData = False - try: - self._postData = loads(postData) - except JSONDecodeError: - self._postData = postData - return self._postData - - -class Response(object): - __slots__ = ('responseErrorReason', 'responseStatusCode', 'responseStatusText', - '_response', '_raw_body', '_is_base64_body', '_body', '_headers') - - def __init__(self, raw_response, raw_body, base64_body): - self._response = raw_response - self._raw_body = raw_body - self._is_base64_body = base64_body - self._body = None - self._headers = None - - def __getattr__(self, item): - return self._response.get(item, None) - - @property - def headers(self): - if self._headers is None: - if 'responseHeaders' in self._response: - headers = {i['name']: i['value'] for i in self._response['responseHeaders']} - self._headers = CaseInsensitiveDict(headers) - else: - self._headers = False - return self._headers - - @property - def body(self): - """返回body内容,如果是json格式,自动进行转换,如果时图片格式,进行base64转换,其它格式直接返回文本""" - if self._body is None: - if self._is_base64_body: - self._body = b64decode(self._raw_body) - - else: - try: - self._body = loads(self._raw_body) - except JSONDecodeError: - self._body = self._raw_body - - return self._body - - def get_ele_txt(e): """获取元素内所有文本 :param e: 元素对象 @@ -293,7 +157,7 @@ def make_absolute_link(link, page=None): def is_js_func(func): """检查文本是否js函数""" func = func.strip() - if func.startswith('function') or func.startswith('async '): + if (func.startswith('function') or func.startswith('async ')) and func.endswith('}'): return True elif '=>' in func: return True diff --git a/DrissionPage/configs/chromium_options.py b/DrissionPage/configs/chromium_options.py index 49f395b..e31d97e 100644 --- a/DrissionPage/configs/chromium_options.py +++ b/DrissionPage/configs/chromium_options.py @@ -417,7 +417,7 @@ class PortFinder(object): """查找一个可用端口 :return: 可以使用的端口和用户文件夹路径组成的元组 """ - for i in range(9600, 9800): + for i in range(9600, 19800): if i in PortFinder.used_port or port_is_using('127.0.0.1', i): continue diff --git a/setup.py b/setup.py index dc49e70..cff1fbd 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh: setup( name="DrissionPage", - version="3.2.26", + version="3.2.27", author="g1879", author_email="g1879@qq.com", description="Python based web automation tool. It can control the browser and send and receive data packets.", @@ -25,7 +25,7 @@ setup( 'DownloadKit>=0.5.3', 'FlowViewer>=0.3.0', 'websocket-client', - 'click~=8.1.3', + 'click', 'tldextract' ], classifiers=[