4.0.0b35(+)

优化查找浏览器方法;
监听器增加资源类型筛选;
监听器增加fail_info和is_failed属性;
调整set_targets()和start()参数默认值;
blocked_urls()可接收str
This commit is contained in:
g1879 2024-01-07 21:27:33 +08:00
parent e56995dcf0
commit bff8d6ba73
14 changed files with 174 additions and 196 deletions

View File

@ -13,4 +13,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.0b34'
__version__ = '4.0.0b35'

View File

@ -8,13 +8,14 @@ from pathlib import Path
from re import search
from time import perf_counter, sleep
from DataRecorder.tools import get_usable_path
from .none_element import NoneElement
from .session_element import make_session_ele
from .._base.base import DrissionElement, BaseElement
from .._functions.keys import input_text_or_keys
from .._functions.locator import get_loc
from .._functions.settings import Settings
from .._functions.tools import get_usable_path
from .._functions.web import make_absolute_link, get_ele_txt, format_html, is_js_func, offset_scroll
from .._units.clicker import Clicker
from .._units.rect import ElementRect

View File

@ -46,10 +46,10 @@ def connect_browser(option):
# 传入的路径找不到主动在ini文件、注册表、系统变量中找
except FileNotFoundError:
chrome_path = get_chrome_path(show_msg=False)
chrome_path = get_chrome_path()
if not chrome_path:
raise FileNotFoundError('无法找到chrome路径,请手动配置。')
raise FileNotFoundError('无法找到浏览器可执行文件路径,请手动配置。')
_run_browser(port, chrome_path, args)
@ -281,34 +281,26 @@ def _remove_arg_from_dict(target_dict: dict, arg: str) -> None:
pass
def get_chrome_path(ini_path=None, show_msg=True, from_ini=True,
from_regedit=True, from_system_path=True):
"""从ini文件或系统变量中获取chrome.exe的路径
:param ini_path: ini文件路径
:param show_msg: 是否打印信息
:param from_ini: 是否从ini文件获取
:param from_regedit: 是否从注册表获取
:param from_system_path: 是否从系统路径获取
:return: chrome.exe路径
"""
def get_chrome_path():
"""从ini文件或系统变量中获取chrome可执行文件的路径"""
# -----------从ini文件中获取--------------
if ini_path and from_ini:
try:
path = OptionsManager(ini_path).chromium_options['browser_path']
except KeyError:
path = None
else:
path = None
path = OptionsManager().chromium_options.get('browser_path', None)
if path and Path(path).is_file():
if show_msg:
print('ini文件中', end='')
return str(path)
# -----------使用which获取-----------
from shutil import which
path = (which('chrome') or which('chromium') or which('google-chrome') or which('google-chrome-stable')
or which('google-chrome-unstable') or which('google-chrome-beta'))
if path:
return path
# -----------从MAC和Linux默认路径获取-----------
from platform import system
sys = system().lower()
if sys in ('macos', 'darwin'):
return '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'
p = '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'
return p if Path(p).exists() else None
elif sys == 'linux':
paths = ('/usr/bin/google-chrome', '/opt/google/chrome/google-chrome',
@ -322,48 +314,39 @@ def get_chrome_path(ini_path=None, show_msg=True, from_ini=True,
return None
# -----------从注册表中获取--------------
if from_regedit:
import winreg
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
r'SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe',
reserved=0, access=winreg.KEY_READ)
k = winreg.EnumValue(key, 0)
winreg.CloseKey(key)
import winreg
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
r'SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe',
reserved=0, access=winreg.KEY_READ)
k = winreg.EnumValue(key, 0)
winreg.CloseKey(key)
if show_msg:
print('注册表中', end='')
return k[1]
return k[1]
except FileNotFoundError:
pass
except FileNotFoundError:
pass
# -----------从系统变量中获取--------------
if from_system_path:
try:
paths = popen('set path').read().lower()
except:
return None
r = search(r'[^;]*chrome[^;]*', paths)
if r:
path = Path(r.group(0)) if 'chrome.exe' in r.group(0) else Path(r.group(0)) / 'chrome.exe'
if path.exists():
return str(path)
paths = paths.split(';')
for path in paths:
path = Path(path) / 'chrome.exe'
try:
paths = popen('set path').read().lower()
except:
return None
r = search(r'[^;]*chrome[^;]*', paths)
if r:
path = Path(r.group(0)) if 'chrome.exe' in r.group(0) else Path(r.group(0)) / 'chrome.exe'
if path.exists():
if show_msg:
print('系统变量中', end='')
return str(path)
paths = paths.split(';')
for path in paths:
path = Path(path) / 'chrome.exe'
try:
if path.exists():
if show_msg:
print('系统变量中', end='')
return str(path)
except OSError:
pass
except OSError:
pass

View File

@ -23,8 +23,4 @@ def set_flags(opt: ChromiumOptions) -> None: ...
def test_connect(ip: str, port: Union[int, str], timeout: float = 30) -> None: ...
def get_chrome_path(ini_path: str = None,
show_msg: bool = True,
from_ini: bool = True,
from_regedit: bool = True,
from_system_path: bool = True, ) -> Union[str, None]: ...
def get_chrome_path() -> Union[str, None]: ...

View File

@ -5,7 +5,6 @@
"""
from pathlib import Path
from platform import system
from re import search, sub
from shutil import rmtree
from time import perf_counter, sleep
@ -16,73 +15,6 @@ from ..errors import (ContextLostError, ElementLostError, CDPError, PageDisconne
AlertExistsError, WrongURLError, StorageError, CookieFormatError, JavaScriptError)
def get_usable_path(path, is_file=True, parents=True):
    """Return a path that does not clash with an existing file or folder.

    The last path component is first sanitised with make_valid_name(). While
    the candidate exists and is of the requested kind, a numeric suffix
    ("_1", "_2", ...) is appended to (or incremented on) the name.

    :param path: desired file or folder path
    :param is_file: True if the target is a file, False if it is a folder
    :param parents: whether to create the parent folder when it is missing
    :return: a usable path as a Path object
    """
    target = Path(path)
    folder = target.parent
    if parents:
        folder.mkdir(parents=True, exist_ok=True)
    target = folder / make_valid_name(target.name)

    # Only an existing regular file is split into stem + suffix; anything
    # else keeps its full name and gets no extension re-attached.
    if target.is_file():
        stem, suffix = target.stem, target.suffix
    else:
        stem, suffix = target.name, ''

    # On the first clash the name is always extended with "_1", even if it
    # already ends in "_<digits>"; later clashes bump the trailing number.
    untouched = True
    while target.exists() and target.is_file() == is_file:
        numbered = search(r'(.*)_(\d+)$', stem)
        if numbered and not untouched:
            base, index = numbered.group(1), int(numbered.group(2)) + 1
        else:
            base, index = stem, '1'

        stem = f'{base}_{index}'
        target = folder / f'{stem}{suffix}'
        untouched = False

    return target
def make_valid_name(full_name):
    """Turn an arbitrary string into a usable file name.

    Steps: strip surrounding whitespace, shorten the stem so that stem plus
    extension is at most 255 wide (as measured by get_long()), then remove
    characters that are not allowed in file names.

    :param full_name: requested file name (not a full path)
    :return: sanitised file name
    """
    limit = 255

    # ---------------- strip surrounding whitespace ----------------
    full_name = full_name.strip()

    # ---------------- keep the total width within the limit ----------------
    # (?s) lets '.' match newlines; without it, search() would skip everything
    # in front of an embedded newline and silently drop that part of the name.
    r = search(r'(?s)(.*)(\.[^.]+$)', full_name)  # split stem and extension
    if r:
        name, ext = r.group(1), r.group(2)
    else:
        name, ext = full_name, ''

    ext_long = get_long(ext)
    if ext_long > limit:
        # An "extension" wider than the whole budget cannot be preserved;
        # treat the entire string as the stem so it gets truncated instead of
        # leaving the loop below with nothing left to shorten.
        name, ext, ext_long = full_name, '', 0

    budget = limit - ext_long
    # get_long() is not additive per character, so re-measure after each cut.
    while name and get_long(name) > budget:
        name = name[:-1]

    full_name = f'{name}{ext}'

    # ---------------- remove characters not allowed in file names ----------------
    return sub(r'[<>/\\|:*?\n"]', '', full_name)


def get_long(txt):
    """Return the width of a string, counting a Chinese character as 2.

    The width is the character count plus half of the extra bytes the string
    needs in UTF-8, rounded down.

    :param txt: the string to measure
    :return: width of the string
    """
    txt_len = len(txt)
    extra_bytes = len(txt.encode('utf-8')) - txt_len
    return extra_bytes // 2 + txt_len
def port_is_using(ip, port):
"""检查端口是否被占用
:param ip: 浏览器地址

View File

@ -11,15 +11,6 @@ from types import FunctionType
from .._pages.chromium_page import ChromiumPage
def get_usable_path(path: Union[str, Path], is_file: bool = True, parents: bool = True) -> Path: ...
def make_valid_name(full_name: str) -> str: ...
def get_long(txt) -> int: ...
def port_is_using(ip: str, port: Union[str, int]) -> bool: ...

View File

@ -11,13 +11,15 @@ from threading import Thread
from time import perf_counter, sleep
from urllib.parse import quote
from DataRecorder.tools import make_valid_name
from .._base.base import BasePage
from .._elements.chromium_element import ChromiumElement, run_js, make_chromium_eles
from .._elements.none_element import NoneElement
from .._elements.session_element import make_session_ele
from .._functions.locator import get_loc, is_loc
from .._functions.settings import Settings
from .._functions.tools import raise_error, make_valid_name
from .._functions.tools import raise_error
from .._functions.web import location_in_viewport
from .._units.actions import Actions
from .._units.listener import Listener

View File

@ -8,7 +8,7 @@ from pathlib import Path
from shutil import move
from time import sleep, perf_counter
from .._functions.tools import get_usable_path
from DataRecorder.tools import get_usable_path
class DownloadManager(object):

View File

@ -29,56 +29,76 @@ class Listener(object):
self._driver = None
self._running_requests = 0
self._caught = None # 临存捕捉到的数据
self._request_ids = None # 暂存须要拦截的请求id
self._caught = None
self._request_ids = None
self._extra_info_ids = None
self.listening = False
self._targets = None # 默认监听所有
self.tab_id = None # 当前tab的id
self.tab_id = None
self._targets = True
self._is_regex = False
self._method = None
self._method = ('GET', 'POST')
self._res_type = True
@property
def targets(self):
"""返回监听目标"""
return self._targets
def set_targets(self, targets=True, is_regex=False, method=('GET', 'POST')):
def set_targets(self, targets=True, is_regex=False, method=('GET', 'POST'), res_type=True):
"""指定要等待的数据包
:param targets: 要匹配的数据包url特征可用list等传入多个为True时获取所有
:param is_regex: 设置的target是否正则表达式
:param method: 设置监听的请求类型可指定多个为None时监听全部
:param method: 设置监听的请求类型可指定多个为True时监听全部
:param res_type: 设置监听的资源类型可指定多个为True时监听全部可指定的值有
Document, Stylesheet, Image, Media, Font, Script, TextTrack, XHR, Fetch, Prefetch, EventSource, WebSocket,
Manifest, SignedExchange, Ping, CSPViolationReport, Preflight, Other
:return: None
"""
if targets is not None:
if not isinstance(targets, (str, list, tuple, set)) and targets is not True:
raise TypeError('targets只能是str、list、tuple、set、True。')
if targets is True:
targets = ''
self._targets = True
else:
self._targets = {targets} if isinstance(targets, str) else set(targets)
self._targets = {targets} if isinstance(targets, str) else set(targets)
self._is_regex = is_regex
if is_regex is not None:
self._is_regex = is_regex
if method is not None:
if isinstance(method, str):
self._method = {method.upper()}
elif isinstance(method, (list, tuple, set)):
self._method = set(i.upper() for i in method)
elif method is True:
self._method = True
else:
raise TypeError('method参数只能是str、list、tuple、set类型。')
raise TypeError('method参数只能是str、list、tuple、set、True类型。')
def start(self, targets=None, is_regex=False, method=('GET', 'POST')):
if res_type is not None:
if isinstance(res_type, str):
self._res_type = {res_type.upper()}
elif isinstance(res_type, (list, tuple, set)):
self._res_type = set(i.upper() for i in res_type)
elif res_type is True:
self._res_type = True
else:
raise TypeError('res_type参数只能是str、list、tuple、set、True类型。')
def start(self, targets=None, is_regex=None, method=None, res_type=None):
"""拦截目标请求,每次拦截前清空结果
:param targets: 要匹配的数据包url特征可用list等传入多个为True时获取所有
:param is_regex: 设置的target是否正则表达式
:param method: 设置监听的请求类型可指定多个为None时监听全部
:param is_regex: 设置的target是否正则表达式为None时保持原来设置
:param method: 设置监听的请求类型可指定多个默认('GET', 'POST')为True时监听全部为None时保持原来设置
:param res_type: 设置监听的资源类型可指定多个默认为True时监听全部为None时保持原来设置可指定的值有
Document, Stylesheet, Image, Media, Font, Script, TextTrack, XHR, Fetch, Prefetch, EventSource, WebSocket,
Manifest, SignedExchange, Ping, CSPViolationReport, Preflight, Other
:return: None
"""
if targets or method:
self.set_targets(targets, is_regex, method)
if targets or is_regex is not None or method or res_type:
self.set_targets(targets, is_regex, method, res_type)
self.clear()
if self.listening:
@ -240,10 +260,11 @@ class Listener(object):
"""接收到请求时的回调函数"""
self._running_requests += 1
p = None
if not self._targets:
if not self._method or kwargs['request']['method'] in self._method:
if self._targets is True:
if ((self._method is True or kwargs['request']['method'] in self._method)
and (self._res_type is True or kwargs.get('type', '').upper() in self._res_type)):
rid = kwargs['requestId']
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, None))
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, True))
p._raw_request = kwargs
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
p._raw_post_data = self._driver.run('Network.getRequestPostData',
@ -252,9 +273,10 @@ class Listener(object):
else:
rid = kwargs['requestId']
for target in self._targets:
if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and (
not self._method or kwargs['request']['method'] in self._method):
if (((self._is_regex and search(target, kwargs['request']['url']))
or (not self._is_regex and target in kwargs['request']['url']))
and (self._method is True or kwargs['request']['method'] in self._method)
and (self._res_type is True or kwargs.get('type', '').upper() in self._res_type)):
p = self._request_ids.setdefault(rid, DataPacket(self._page.tab_id, target))
p._raw_request = kwargs
break
@ -329,8 +351,9 @@ class Listener(object):
r_id = kwargs['requestId']
dp = self._request_ids.get(r_id, None)
if dp:
dp.errorText = kwargs['errorText']
dp._raw_fail_info = kwargs
dp._resource_type = kwargs['type']
dp.is_failed = True
r = self._extra_info_ids.get(kwargs['requestId'], None)
if r:
@ -374,23 +397,25 @@ class DataPacket(object):
"""
self.tab_id = tab_id
self.target = target
self.is_failed = False
self._raw_request = None
self._raw_post_data = None
self._raw_response = None
self._raw_body = None
self._base64_body = False
self._requestExtraInfo = None
self._responseExtraInfo = None
self._raw_fail_info = None
self._request = None
self._response = None
self.errorText = None
self._fail_info = None
self._base64_body = False
self._requestExtraInfo = None
self._responseExtraInfo = None
self._resource_type = None
def __repr__(self):
t = f'"{self.target}"' if self.target is not None else None
t = f'"{self.target}"' if self.target is not True else True
return f'<DataPacket target={t} url="{self.url}">'
@property
@ -429,6 +454,12 @@ class DataPacket(object):
self._response = Response(self, self._raw_response, self._raw_body, self._base64_body)
return self._response
@property
def fail_info(self):
    """Return the FailInfo wrapper for this packet's raw failure data.

    The wrapper is built from self._raw_fail_info on first access and the
    same object is returned on later accesses.
    """
    info = self._fail_info
    if info is None:
        info = FailInfo(self, self._raw_fail_info)
        self._fail_info = info
    return info
def wait_extra_info(self, timeout=None):
"""等待额外的信息加载完成
:param timeout: 超时时间None为无限等待
@ -498,7 +529,7 @@ class Response(object):
self._headers = None
def __getattr__(self, item):
return self._response.get(item, None)
return self._response.get(item, None) if self._response else None
@property
def headers(self):
@ -551,3 +582,12 @@ class RequestExtraInfo(ExtraInfo):
class ResponseExtraInfo(ExtraInfo):
pass
class FailInfo(object):
    """Attribute-style, read-only view over a request's raw failure data.

    Every key of the raw dict can be read as an attribute; keys that are not
    present (or any key when no failure data was recorded) read as None.
    """

    def __init__(self, data_packet, fail_info):
        """
        :param data_packet: the DataPacket this failure information belongs to
        :param fail_info: raw failure data as a dict, or None if there is none
        """
        self._data_packet = data_packet
        self._fail_info = fail_info

    def __getattr__(self, item):
        """Look up ``item`` in the raw failure data.

        Only called when normal attribute lookup fails. Names starting with an
        underscore are never data keys here, so they raise AttributeError:
        answering protocol probes such as ``__setstate__`` with None, or
        reading ``_fail_info`` on an instance that has not been initialised
        yet (as copy/pickle do), would otherwise fail or recurse endlessly.

        :param item: attribute name
        :return: the stored value, or None when it is absent
        :raises AttributeError: for underscore-prefixed names
        """
        if item.startswith('_'):
            raise AttributeError(item)
        return self._fail_info.get(item, None) if self._fail_info else None

View File

@ -4,7 +4,7 @@
@Contact : g1879@qq.com
"""
from queue import Queue
from typing import Union, Dict, List, Iterable, Tuple, Optional
from typing import Union, Dict, List, Iterable, Optional, Literal
from requests.structures import CaseInsensitiveDict
@ -12,6 +12,9 @@ from .._base.driver import Driver
from .._pages.chromium_base import ChromiumBase
from .._pages.chromium_frame import ChromiumFrame
__RES_TYPE__ = Literal['Document', 'Stylesheet', 'Image', 'Media', 'Font', 'Script', 'TextTrack', 'XHR', 'Fetch',
'Prefetch', 'EventSource', 'WebSocket', 'Manifest', 'SignedExchange', 'Ping', 'CSPViolationReport', 'Preflight', 'Other']
class Listener(object):
def __init__(self, page: ChromiumBase):
@ -20,6 +23,7 @@ class Listener(object):
self._target_id: str = ...
self._targets: Union[str, dict] = ...
self._method: set = ...
self._res_type: set = ...
self._caught: Queue = ...
self._is_regex: bool = ...
self._driver: Driver = ...
@ -31,8 +35,17 @@ class Listener(object):
@property
def targets(self) -> Optional[set]: ...
def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False,
method: Union[str, list, tuple, set] = None) -> None: ...
# Configure which data packets the listener should capture.
# targets:  url feature(s) to match; True captures every url.
# is_regex: whether the targets are regular expressions.
# method:   request method(s) to listen for; True listens for all of them.
# res_type: resource type(s) to listen for; True listens for all of them.
# Passing None for an argument leaves that setting unchanged.
# Optional[] accepts exactly one type argument, so the alternatives are
# wrapped in Union[].
def set_targets(self,
                targets: Optional[Union[str, list, tuple, set, bool]] = True,
                is_regex: Optional[bool] = False,
                method: Optional[Union[str, list, tuple, set, bool]] = ('GET', 'POST'),
                res_type: Optional[Union[__RES_TYPE__, list, tuple, set, bool]] = True) -> None: ...
# Start listening for the target requests; previous results are cleared on
# every call.
# targets:  url feature(s) to match; True captures every url.
# is_regex: whether the targets are regular expressions.
# method:   request method(s) to listen for; True listens for all of them.
# res_type: resource type(s) to listen for; True listens for all of them.
# An argument left as None keeps the value that was set before.
# Optional[] accepts exactly one type argument, so the alternatives are
# wrapped in Union[].
def start(self,
          targets: Optional[Union[str, list, tuple, set, bool]] = None,
          is_regex: Optional[bool] = None,
          method: Optional[Union[str, list, tuple, set, bool]] = None,
          res_type: Optional[Union[__RES_TYPE__, list, tuple, set, bool]] = None) -> None: ...
def stop(self) -> None: ...
@ -40,7 +53,10 @@ class Listener(object):
def resume(self) -> None: ...
def wait(self, count: int = 1, timeout: float = None, fit_count: bool = True,
def wait(self,
count: int = 1,
timeout: float = None,
fit_count: bool = True,
raise_err: bool = None) -> Union[List[DataPacket], DataPacket, None]: ...
@property
@ -52,10 +68,6 @@ class Listener(object):
def _to_target(self, target_id: str, address: str, page: ChromiumBase) -> None: ...
def start(self, targets: Union[str, List[str], Tuple, bool, None] = None, is_regex: bool = False,
method: Union[str, list, tuple, set] = None) \
-> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
def _requestWillBeSent(self, **kwargs) -> None: ...
def _requestWillBeSentExtraInfo(self, **kwargs) -> None: ...
@ -68,7 +80,9 @@ class Listener(object):
def _loading_failed(self, **kwargs) -> None: ...
def steps(self, count: int = None, timeout: float = None,
def steps(self,
count: int = None,
timeout: float = None,
gap=1) -> Iterable[Union[DataPacket, List[DataPacket]]]: ...
def _set_callback(self) -> None: ...
@ -83,17 +97,19 @@ class FrameListener(Listener):
class DataPacket(object):
"""返回的数据包管理类"""
def __init__(self, tab_id: str, target: Optional[str]):
def __init__(self, tab_id: str, target: [str, bool]):
self.tab_id: str = ...
self.target: str = ...
self.is_failed: bool = ...
self._raw_request: Optional[dict] = ...
self._raw_response: Optional[dict] = ...
self._raw_post_data: str = ...
self._raw_body: str = ...
self._raw_fail_info: Optional[dict] = ...
self._base64_body: bool = ...
self._request: Request = ...
self._response: Response = ...
self.errorText: str = ...
self._fail_info: Optional[FailInfo] = ...
self._resource_type: str = ...
self._requestExtraInfo: Optional[dict] = ...
self._responseExtraInfo: Optional[dict] = ...
@ -122,6 +138,9 @@ class DataPacket(object):
@property
def response(self) -> Response: ...
@property
def fail_info(self) -> Optional[FailInfo]: ...
def wait_extra_info(self, timeout: float = None) -> bool: ...
@ -228,3 +247,15 @@ class ResponseExtraInfo(ExtraInfo):
headersText: str = ...
cookiePartitionKey: str = ...
cookiePartitionKeyOpaque: bool = ...
class FailInfo(object):
    # Attribute-style view over the raw failure data of a failed request.
    _data_packet: DataPacket
    # Raw failure data; None when no failure was recorded for the packet.
    _fail_info: Optional[dict]
    # NOTE(review): this line used to re-declare `_fail_info` as float, which
    # contradicted the dict declaration above. Presumably the `timestamp`
    # field of the CDP Network.loadingFailed event was meant - confirm.
    timestamp: float
    errorText: str
    canceled: bool
    blockedReason: Optional[str]
    corsErrorStatus: Optional[str]

    def __init__(self, data_packet: DataPacket, fail_info: Optional[dict]): ...

View File

@ -146,14 +146,16 @@ class ChromiumBaseSetter(BasePageSetter):
self._page._alert.auto = accept if on_off else None
def blocked_urls(self, urls):
"""设置要忽略的url传入None时清空已设置的内容。
:param urls:
"""设置要忽略的url
:param urls: 要忽略的url可用*通配符可输入多个传入None时清空已设置的内容
:return: None
"""
if not urls:
urls = []
elif isinstance(urls, str):
urls = (urls,)
if not isinstance(urls, (list, tuple)):
raise TypeError('urls需传入list或tuple类型。')
raise TypeError('urls需传入str、list或tuple类型。')
self._page.run_cdp('Network.enable')
self._page.run_cdp('Network.setBlockedURLs', urls=urls)

View File

@ -62,7 +62,7 @@ class ChromiumBaseSetter(BasePageSetter):
def upload_files(self, files: Union[str, list, tuple]) -> None: ...
def blocked_urls(self, urls: Optional[list, tuple]) -> None: ...
# Set the urls to be blocked. Accepts a single str or a list/tuple of them;
# None clears what was set before.
# Optional[] accepts exactly one type argument, so the alternatives are
# wrapped in Union[].
def blocked_urls(self, urls: Optional[Union[list, tuple, str]]) -> None: ...
class TabSetter(ChromiumBaseSetter):

View File

@ -1,7 +1,7 @@
requests
lxml
cssselect
DownloadKit>=2.0.0b3
DownloadKit>=2.0.0b5
websocket-client>=1.7.0
click
tldextract

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="4.0.0b34",
version="4.0.0b35",
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",
@ -22,7 +22,7 @@ setup(
'lxml',
'requests',
'cssselect',
'DownloadKit>=2.0.0b3',
'DownloadKit>=2.0.0b5',
'websocket-client>=1.7.0',
'click',
'tldextract',