This commit is contained in:
g1879 2023-05-02 19:45:22 +08:00
parent 561c20377a
commit 09c4e98072
6 changed files with 71 additions and 235 deletions

View File

@ -11,6 +11,7 @@ from re import search
from threading import Thread
from time import perf_counter, sleep, time
from FlowViewer.listener import ResponseData
from requests import Session
from .base import BasePage
@ -19,7 +20,6 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder
from .commons.web import DataPacket
from .commons.web import set_browser_cookies
from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \
NoRectError, BrowserConnectError
@ -1034,7 +1034,6 @@ class ChromiumBaseWaiter(object):
if not self._listener:
self._listener = NetworkListener(self._driver)
self._listener.set_targets(targets, is_regex)
self._listener.start()
def data_packets(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
@ -1060,21 +1059,16 @@ class NetworkListener(object):
self._is_regex = False
self._results = {}
self._single = False
self._requests = {}
self._count = None
self._caught = 0 # 已获取到的数量
self._driver = self._page.driver
def set_targets(self, targets, is_regex=False, count=None):
def set_targets(self, targets, is_regex=False):
"""指定要等待的数据包
:param targets: 要匹配的数据包url特征可用list等传入多个
:param is_regex: 设置的target是否正则表达式
:param count: 设置总共等待多少个数据包为None时每个目标等待1个
:return: None
"""
if not isinstance(targets, (str, list, tuple, set)):
raise TypeError('targets只能是str、list、tuple、set。')
self._is_regex = is_regex
if isinstance(targets, str):
self._targets = {targets}
@ -1082,19 +1076,20 @@ class NetworkListener(object):
else:
self._targets = set(targets)
self._single = False
if count is None:
self._count = len(self._targets)
def start(self):
self._driver.set_listener('Fetch.requestPaused', self._request_paused)
self._driver.call_method('Network.enable')
self._driver.call_method('Fetch.enable')
self._page.run_cdp('Network.enable')
if targets is not None:
self._page.driver.Network.requestWillBeSent = self._requestWillBeSent
self._page.driver.Network.responseReceived = self._response_received
self._page.driver.Network.loadingFinished = self._loading_finished
else:
self.stop()
def stop(self):
"""停止监听数据包"""
self._driver.set_listener('Fetch.requestPaused', None)
self._driver.call_method('Fetch.disable')
self._driver.call_method('Network.disable')
self._page.run_cdp('Network.disable')
self._page.driver.Network.requestWillBeSent = None
self._page.driver.Network.responseReceived = None
self._page.driver.Network.loadingFinished = None
def listen(self, timeout=None, any_one=False):
"""等待指定数据包加载完成
@ -1108,47 +1103,51 @@ class NetworkListener(object):
timeout = timeout if timeout is not None else self._page.timeout
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if self._caught >= self._count or (any_one and self._caught):
if self._results and (any_one or set(self._results) == self._targets):
break
sleep(.1)
if self._caught == 0:
r = False
else:
r = list(self._results.values())[0] if self._single else self._results
self._requests = {}
if not self._results:
return False
r = list(self._results.values())[0] if self._single else self._results
self._results = {}
self._caught = 0
return r
def _request_paused(self, **kwargs):
i = kwargs['requestId']
if 'responseStatusCode' in kwargs:
for target in self._targets:
if (self._is_regex and search(target, kwargs['request']['url'])) or (
not self._is_regex and target in kwargs['request']['url']):
dp = DataPacket(self._page.tab_id, target, kwargs)
body = self._driver.call_method('Fetch.getResponseBody', requestId=i)
dp._raw_body = body['body']
dp._base64_body = body['base64Encoded']
if 'networkId' in kwargs and kwargs['request'].get('hasPostData', None) \
and not kwargs['request'].get('postData', None):
pd = self._driver.call_method('Network.getRequestPostData', requestId=kwargs['networkId'])
if 'postData' in pd:
dp._raw_post_data = pd['postData']
def _response_received(self, **kwargs):
"""接收到返回信息时处理方法"""
if kwargs['requestId'] in self._requests:
self._requests[kwargs['requestId']]['response'] = kwargs['response']
if target in self._results:
self._results[target].append(dp)
else:
self._results[target] = [dp]
def _loading_finished(self, **kwargs):
"""请求完成时处理方法"""
request_id = kwargs['requestId']
if request_id in self._requests:
try:
r = self._page.run_cdp('Network.getResponseBody', requestId=request_id)
body = r['body']
is_base64 = r['base64Encoded']
except CallMethodError:
body = ''
is_base64 = False
break
request = self._requests[request_id]
target = request['target']
rd = ResponseData(request_id, request['response'], body, self._page.tab_id, target)
rd.postData = request['post_data']
rd._base64_body = is_base64
rd.requestHeaders = request['request_headers']
self._results[target] = rd
self._driver.call_method('Fetch.continueResponse', requestId=i)
self._caught += 1
else: # request
self._driver.call_method('Fetch.continueRequest', requestId=i)
def _requestWillBeSent(self, **kwargs):
"""接收到请求时的回调函数"""
for target in self._targets:
if (self._is_regex and search(target, kwargs['request']['url'])) or (
not self._is_regex and target in kwargs['request']['url']):
self._requests[kwargs['requestId']] = {'target': target,
'post_data': kwargs['request'].get('postData', None),
'request_headers': kwargs['request']['headers']}
break
class ChromiumPageScroll(ChromiumScroll):
@ -1307,7 +1306,7 @@ class Screencast(object):
DrissionPage_Screencast_blob_ok = true;
})
mediaRecorder.start()
mediaRecorder.addEventListener('stop', function(){
while(DrissionPage_Screencast_blob_ok==false){}
DrissionPage_Screencast_blob = new Blob(DrissionPage_Screencast_chunks,

View File

@ -7,6 +7,7 @@ from pathlib import Path
from typing import Union, Tuple, List, Any, Dict
from DataRecorder import Recorder
from FlowViewer.listener import ResponseData
from requests import Session
from requests.cookies import RequestsCookieJar
@ -15,7 +16,6 @@ from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement, ChromiumScroll
from .chromium_frame import ChromiumFrame
from .commons.constants import NoneElement
from .commons.web import DataPacket
from .session_element import SessionElement
@ -231,37 +231,37 @@ class ChromiumBaseWaiter(object):
def load_complete(self, timeout: float = None) -> bool: ...
def upload_paths_inputted(self) -> None: ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop_listening(self) -> None: ...
def data_packets(self, timeout: float = None,
any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def upload_paths_inputted(self) -> None: ...
class NetworkListener(object):
def __init__(self, page: ChromiumBase):
def __init__(self, page):
self._page: ChromiumBase = ...
self._count: int = ...
self._caught: int = ...
self._targets: Union[str, dict] = ...
self._single: bool = ...
self._results: Union[DataPacket, Dict[str, List[DataPacket]], False] = ...
self._results: Union[ResponseData, Dict[str, ResponseData], False] = ...
self._is_regex: bool = ...
self._driver: ChromiumDriver = ...
self._requests: dict = ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False, count: int = None) -> None: ...
def start(self) -> None: ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop(self) -> None: ...
def listen(self, timeout: float = None,
any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ...
def _request_paused(self, **kwargs) -> None: ...
def _response_received(self, **kwargs) -> None: ...
def _loading_finished(self, **kwargs) -> None: ...
def _requestWillBeSent(self, **kwargs) -> None: ...
class ChromiumPageScroll(ChromiumScroll):
@ -366,4 +366,4 @@ class ScreencastMode(object):
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...

View File

@ -5,14 +5,13 @@
"""
from pathlib import Path
from platform import system
from re import search
from threading import Thread
from time import perf_counter, sleep
from warnings import warn
from requests import Session
from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter, NetworkListener
from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter
from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab
from .commons.browser import connect_browser
@ -401,32 +400,6 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
while self._driver.tab_id == self._driver.latest_tab and perf_counter() < end_time:
sleep(.01)
def set_targets(self, targets, is_regex=False):
    """Specify the data packets to wait for.

    :param targets: url feature(s) of the packets to match; several can be
                    passed in a list or similar container
    :param is_regex: whether the given targets are regular expressions
    :return: None
    """
    listener = self._listener
    if not listener:
        # Create the listener lazily on first use.
        listener = self._listener = NetworkListener(self._driver)
    listener.set_targets(targets, is_regex)
def data_packets(self, timeout=None, any_one=False):
    """Wait until the specified data packets have finished loading.

    :param timeout: timeout; when None the page object's timeout is used
    :param any_one: with several targets, True ends the wait as soon as one
                    target is caught instead of waiting for all of them
    :return: a ResponseData object or a dict of listening results
    """
    listener = self._listener
    if not listener:
        # Create the listener lazily on first use.
        listener = self._listener = NetworkListener(self._driver)
    return listener.listen(timeout, any_one)
def stop_listening(self):
    """Stop listening for data packets."""
    listener = self._listener
    if not listener:
        # Create the listener lazily so that stop() can always be called.
        listener = self._listener = NetworkListener(self._driver)
    listener.stop()
class ChromiumTabRect(object):
def __init__(self, page):

View File

@ -3,151 +3,15 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from base64 import b64decode
from html import unescape
from http.cookiejar import Cookie
from json import JSONDecodeError, loads
from re import sub
from urllib.parse import urlparse, urljoin, urlunparse
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from tldextract import extract
class DataPacket(object):
    """Management class for a returned data packet."""

    def __init__(self, tab, target, raw_info):
        """
        :param tab: id of the tab that produced this data packet
        :param target: the listening target
        :param raw_info: raw data obtained from cdp
        """
        self.tab, self.target = tab, target
        self._raw_info = raw_info
        # Filled in from outside after construction.
        self._raw_post_data = self._raw_body = None
        self._base64_body = False
        # Wrapper objects are built lazily by the properties below.
        self._request = self._response = None

    def __repr__(self):
        return f'<DataPacket target={self.target} request_id={self.requestId}>'

    @property
    def requestId(self):
        """The ``requestId`` entry of the raw data."""
        return self._raw_info['requestId']

    @property
    def url(self):
        """The url of the request."""
        return self.request.url

    @property
    def method(self):
        """The method of the request."""
        return self.request.method

    @property
    def frameId(self):
        """The ``frameId`` entry of the raw data."""
        return self._raw_info['frameId']

    @property
    def resourceType(self):
        """The ``resourceType`` entry of the raw data."""
        return self._raw_info['resourceType']

    @property
    def request(self):
        """Request wrapper, created on first access and then reused."""
        cached = self._request
        if cached is not None:
            return cached
        self._request = Request(self._raw_info['request'], self._raw_post_data)
        return self._request

    @property
    def response(self):
        """Response wrapper, created on first access and then reused."""
        cached = self._response
        if cached is not None:
            return cached
        self._response = Response(self._raw_info, self._raw_body, self._base64_body)
        return self._response
class Request(object):
    """Attribute-style view over the raw ``request`` dict of a data packet.

    Keys of the raw dict that have no explicit property are reachable as
    attributes through ``__getattr__``; a missing key yields None.
    """
    __slots__ = ('url', 'urlFragment', 'postDataEntries', 'mixedContentType', 'initialPriority',
                 'referrerPolicy', 'isLinkPreload', 'trustTokenParams', 'isSameSite',
                 '_request', '_raw_post_data', '_postData')

    def __init__(self, raw_request, post_data):
        """
        :param raw_request: raw request dict; ``headers`` and ``postData``
                            are read from it, other keys via attribute access
        :param post_data: post data obtained separately, or None
        """
        self._request = raw_request
        self._raw_post_data = post_data
        self._postData = None

    def __getattr__(self, item):
        # Private names are never keys of the raw dict. Refusing them here
        # prevents unbounded recursion when ``_request`` itself is unset
        # (e.g. an instance created by copy/pickle without __init__).
        if item.startswith('_'):
            raise AttributeError(item)
        return self._request.get(item, None)

    @property
    def headers(self):
        """Return the request headers as a case-insensitive dict."""
        # ``_request`` already is the request dict, so headers sit directly on it.
        return CaseInsensitiveDict(self._request['headers'])

    @property
    def postData(self):
        """Return the post data.

        Separately obtained post data takes precedence over the ``postData``
        entry of the raw request. JSON text is parsed; anything that cannot
        be parsed is returned unchanged; False is returned when there is no
        post data.
        """
        if self._postData is None:
            raw = self._raw_post_data or self._request.get('postData', None)
            if not raw:
                # Nothing to parse; do not feed a non-string to loads().
                self._postData = False
            else:
                try:
                    self._postData = loads(raw)
                except (JSONDecodeError, TypeError):
                    self._postData = raw
        return self._postData
class Response(object):
    """Attribute-style view over the raw response data of a data packet.

    Keys of the raw dict that have no explicit property are reachable as
    attributes through ``__getattr__``; a missing key yields None.
    """
    __slots__ = ('responseErrorReason', 'responseStatusCode', 'responseStatusText',
                 '_response', '_raw_body', '_is_base64_body', '_body', '_headers')

    def __init__(self, raw_response, raw_body, base64_body):
        """
        :param raw_response: raw dict holding the response information
        :param raw_body: raw body text, or None when no body is available
        :param base64_body: whether ``raw_body`` is base64 encoded
        """
        self._response = raw_response
        self._raw_body = raw_body
        self._is_base64_body = base64_body
        self._body = None
        self._headers = None

    def __getattr__(self, item):
        # Private names are never keys of the raw dict. Refusing them here
        # prevents unbounded recursion when ``_response`` itself is unset
        # (e.g. an instance created by copy/pickle without __init__).
        if item.startswith('_'):
            raise AttributeError(item)
        return self._response.get(item, None)

    @property
    def headers(self):
        """Return the response headers as a case-insensitive dict.

        False is returned when the raw data has no ``responseHeaders`` entry.
        """
        if self._headers is None:
            if 'responseHeaders' in self._response:
                headers = {i['name']: i['value'] for i in self._response['responseHeaders']}
                self._headers = CaseInsensitiveDict(headers)
            else:
                self._headers = False
        return self._headers

    @property
    def body(self):
        """Return the body content.

        A base64 encoded body is decoded to bytes, JSON text is parsed
        automatically, and anything else is returned as it was received
        (None when no body is available).
        """
        if self._body is None:
            if self._is_base64_body:
                self._body = b64decode(self._raw_body)
            else:
                try:
                    self._body = loads(self._raw_body)
                except (JSONDecodeError, TypeError):
                    # TypeError covers a missing (None) body.
                    self._body = self._raw_body
        return self._body
def get_ele_txt(e):
"""获取元素内所有文本
:param e: 元素对象
@ -293,7 +157,7 @@ def make_absolute_link(link, page=None):
def is_js_func(func):
"""检查文本是否js函数"""
func = func.strip()
if func.startswith('function') or func.startswith('async '):
if (func.startswith('function') or func.startswith('async ')) and func.endswith('}'):
return True
elif '=>' in func:
return True

View File

@ -417,7 +417,7 @@ class PortFinder(object):
"""查找一个可用端口
:return: 可以使用的端口和用户文件夹路径组成的元组
"""
for i in range(9600, 9800):
for i in range(9600, 19800):
if i in PortFinder.used_port or port_is_using('127.0.0.1', i):
continue

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="3.2.26",
version="3.2.27",
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",
@ -25,7 +25,7 @@ setup(
'DownloadKit>=0.5.3',
'FlowViewer>=0.3.0',
'websocket-client',
'click~=8.1.3',
'click',
'tldextract'
],
classifiers=[