ChromiumOption增加new_env();auto_port()删除tmp_path参数;ini增加new_env

This commit is contained in:
g1879 2024-07-04 23:58:01 +08:00
parent 4114e6826c
commit c6e3e0b71f
10 changed files with 55 additions and 32 deletions

View File

@ -41,12 +41,13 @@ class Browser(object):
:param session_options: 使用双模Tab时使用的默认Session配置为True使用ini文件配置
"""
opt = handle_options(addr_or_opts)
is_headless, browser_id = run_browser(opt)
is_headless, browser_id, is_exists = run_browser(opt)
if browser_id in cls._BROWSERS:
return cls._BROWSERS[browser_id]
r = object.__new__(cls)
r._chromium_options = opt
r.is_headless = is_headless
r._is_exists = is_exists
r.id = browser_id
cls._BROWSERS[browser_id] = r
return r
@ -74,11 +75,11 @@ class Browser(object):
self._download_path = str(Path(self._chromium_options.download_path).absolute())
self.retry_times = self._chromium_options.retry_times
self.retry_interval = self._chromium_options.retry_interval
self.user_data_path = self._chromium_options.user_data_path
self.address = self._chromium_options.address
self._driver = BrowserDriver(self.id, 'browser', self.address, self)
if self.is_headless != self._chromium_options.is_headless:
if self.is_headless != self._chromium_options.is_headless or (
self._is_exists and self._chromium_options._new_env):
self.quit(3, True)
connect_browser(self._chromium_options)
s = Session()
@ -111,6 +112,11 @@ class Browser(object):
self._session_options = SessionOptions() if session_options is True else session_options
@property
def user_data_path(self):
"""返回用户文件夹路径"""
return self._chromium_options.user_data_path
@property
def process_id(self):
"""返回浏览器进程id"""
@ -537,7 +543,7 @@ def handle_options(addr_or_opts):
def run_browser(chromium_options):
"""连接浏览器"""
connect_browser(chromium_options)
is_exists = connect_browser(chromium_options)
try:
s = Session()
s.trust_env = False
@ -553,4 +559,4 @@ def run_browser(chromium_options):
raise BrowserConnectError('浏览器版本太旧或此浏览器不支持接管。')
except:
raise BrowserConnectError('\n浏览器连接失败,请确认浏览器是否启动。')
return is_headless, browser_id
return is_headless, browser_id, is_exists

View File

@ -26,7 +26,6 @@ class Browser(object):
retry_times: int = ...
retry_interval: float = ...
is_headless: bool = ...
user_data_path: str = ...
_BROWSERS: dict = ...
_chromium_options: ChromiumOptions = ...
@ -44,6 +43,7 @@ class Browser(object):
_timeouts: Timeout = ...
_load_mode: str = ...
_download_path: str = ...
_is_exists: bool = ...
def __new__(cls,
addr_or_opts: Union[str, int, ChromiumOptions] = None,
@ -56,6 +56,9 @@ class Browser(object):
def _run_cdp(self, cmd, **cmd_args) -> dict: ...
@property
def user_data_path(self) -> str: ...
@property
def process_id(self) -> Optional[int]: ...

View File

@ -192,7 +192,7 @@ class Driver(object):
if 'result' not in result and 'error' in result:
kwargs['_timeout'] = timeout
return {'error': result['error']['message'], 'type': result.get('type', 'call_method_error'),
'method': _method, 'args': kwargs}
'method': _method, 'args': kwargs, 'data': result['error']['data']}
else:
return result['result']

View File

@ -33,8 +33,8 @@ class ChromiumOptions(object):
self.ini_path = str(ini_path)
else:
self.ini_path = str(Path(__file__).parent / 'configs.ini')
om = OptionsManager(ini_path)
om = OptionsManager(ini_path)
options = om.chromium_options
self._download_path = om.paths.get('download_path', '.') or '.'
self._tmp_path = om.paths.get('tmp_path', None) or None
@ -47,6 +47,7 @@ class ChromiumOptions(object):
self._load_mode = options.get('load_mode', 'normal')
self._system_user_path = options.get('system_user_path', False)
self._existing_only = options.get('existing_only', False)
self._new_env = options.get('new_env', False)
for i in self._arguments:
if i.startswith('--headless'):
self._is_headless = True
@ -364,6 +365,14 @@ class ChromiumOptions(object):
on_off = None if on_off else False
return self.set_argument('--incognito', on_off)
def new_env(self, on_off=True):
"""设置是否使用全新浏览器环境
:param on_off: 开或关
:return: 当前对象
"""
self._new_env = on_off
return self
def ignore_certificate_errors(self, on_off=True):
"""设置是否忽略证书错误
:param on_off: 开或关
@ -504,17 +513,14 @@ class ChromiumOptions(object):
self._system_user_path = on_off
return self
def auto_port(self, on_off=True, tmp_path=None, scope=None):
def auto_port(self, on_off=True, scope=None):
"""自动获取可用端口
:param on_off: 是否开启自动获取端口号
:param tmp_path: 临时文件保存路径为None时保存到系统临时文件夹on_off为False时此参数无效
:param scope: 指定端口范围不含最后的数字为None则使用[9600-19600)
:param scope: 指定端口范围不含最后的数字为None则使用[9600-59600)
:return: 当前对象
"""
if on_off:
self._auto_port = scope if scope else (9600, 19600)
if tmp_path:
self._tmp_path = str(tmp_path)
self._auto_port = scope if scope else (9600, 59600)
else:
self._auto_port = False
return self
@ -553,7 +559,7 @@ class ChromiumOptions(object):
# 设置chromium_options
attrs = ('address', 'browser_path', 'arguments', 'extensions', 'user', 'load_mode',
'auto_port', 'system_user_path', 'existing_only', 'flags')
'auto_port', 'system_user_path', 'existing_only', 'flags', 'new_env')
for i in attrs:
om.set_item('chromium_options', i, self.__getattribute__(f'_{i}'))
# 设置代理

View File

@ -26,6 +26,7 @@ class ChromiumOptions(object):
_prefs: dict = ...
_flags: dict = ...
_prefs_to_del: list = ...
_new_env: bool = ...
clear_file_flags: bool = ...
_auto_port: Union[Tuple[int, int], False] = ...
_system_user_path: bool = ...
@ -136,6 +137,8 @@ class ChromiumOptions(object):
def incognito(self, on_off: bool = True) -> ChromiumOptions: ...
def new_env(self, on_off: bool = True) -> ChromiumOptions: ...
def set_user_agent(self, user_agent: str) -> ChromiumOptions: ...
def set_proxy(self, proxy: str) -> ChromiumOptions: ...
@ -166,7 +169,6 @@ class ChromiumOptions(object):
def auto_port(self,
on_off: bool = True,
tmp_path: Union[str, Path] = None,
scope: Tuple[int, int] = None) -> ChromiumOptions: ...
def existing_only(self, on_off: bool = True) -> ChromiumOptions: ...

View File

@ -14,6 +14,7 @@ user = Default
auto_port = False
system_user_path = False
existing_only = False
new_env = False
[session_options]
headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'connection': 'keep-alive', 'accept-charset': 'GB2312,utf-8;q=0.7,*;q=0.7'}

View File

@ -64,6 +64,7 @@ class OptionsManager(object):
self.set_item('chromium_options', 'auto_port', 'False')
self.set_item('chromium_options', 'system_user_path', 'False')
self.set_item('chromium_options', 'existing_only', 'False')
self.set_item('chromium_options', 'new_env', 'False')
self.set_item('session_options', 'headers', "{'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X "
"10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10."
"1.2 Safari/603.3.8', 'accept': 'text/html,application/xhtml"

View File

@ -8,6 +8,7 @@
from json import load, dump, JSONDecodeError
from os import environ
from pathlib import Path
from shutil import rmtree
from subprocess import Popen, DEVNULL
from tempfile import gettempdir
from time import perf_counter, sleep
@ -33,7 +34,9 @@ def connect_browser(option):
return True
# ----------创建浏览器进程----------
args = get_launch_args(option)
args, user_path = get_launch_args(option)
if option._new_env:
rmtree(user_path, ignore_errors=True)
set_prefs(option)
set_flags(option)
try:
@ -42,10 +45,8 @@ def connect_browser(option):
# 传入的路径找不到主动在ini文件、注册表、系统变量中找
except FileNotFoundError:
browser_path = get_chrome_path(option.ini_path)
if not browser_path:
raise FileNotFoundError('无法找到浏览器可执行文件路径,请手动配置。')
_run_browser(port, browser_path, args)
test_connect(ip, port)
@ -59,23 +60,24 @@ def get_launch_args(opt):
"""
# ----------处理arguments-----------
result = set()
has_user_path = False
user_path = False
for i in opt.arguments:
if i.startswith(('--load-extension=', '--remote-debugging-port=')):
continue
elif i.startswith('--user-data-dir') and not opt.system_user_path:
result.add(f'--user-data-dir={Path(i[16:]).absolute()}')
has_user_path = True
user_path = f'--user-data-dir={Path(i[16:]).absolute()}'
result.add(user_path)
continue
result.add(i)
if not has_user_path and not opt.system_user_path:
if not user_path and not opt.system_user_path:
port = opt.address.split(':')[-1] if opt.address else '0'
p = Path(opt.tmp_path) if opt.tmp_path else Path(gettempdir()) / 'DrissionPage'
path = p / f'userData_{port}'
path = p / 'userData' / port
path.mkdir(parents=True, exist_ok=True)
opt.set_user_data_path(path)
result.add(f'--user-data-dir={path}')
user_path = path.absolute()
opt.set_user_data_path(user_path)
result.add(f'--user-data-dir={user_path}')
result = list(result)
@ -86,7 +88,7 @@ def get_launch_args(opt):
ext = f'--load-extension={ext}'
result.append(ext)
return result
return result, user_path
def set_prefs(opt):

View File

@ -28,17 +28,17 @@ class PortFinder(object):
:param path: 临时文件保存路径为None时使用系统临时文件夹
"""
tmp = Path(path) if path else Path(gettempdir()) / 'DrissionPage'
self.tmp_dir = tmp / 'UserTempFolder'
self.tmp_dir = tmp / 'autoPortData'
self.tmp_dir.mkdir(parents=True, exist_ok=True)
if str(self.tmp_dir.absolute()) not in PortFinder.checked_paths:
for i in self.tmp_dir.iterdir():
if i.is_dir() and i.stem.startswith('AutoPort') and not port_is_using('127.0.0.1', i.name[8:]):
if i.is_dir() and not port_is_using('127.0.0.1', i.name):
rmtree(i, ignore_errors=True)
PortFinder.checked_paths.add(str(self.tmp_dir.absolute()))
def get_port(self, scope=None):
"""查找一个可用端口
:param scope: 指定端口范围不含最后的数字为None则使用[9600-19600)
:param scope: 指定端口范围不含最后的数字为None则使用[9600-59600)
:return: 可以使用的端口和用户文件夹路径组成的元组
"""
from random import randint
@ -47,7 +47,7 @@ class PortFinder(object):
PortFinder.used_port.clear()
PortFinder.prev_time = perf_counter()
if scope in (True, None):
scope = (9600, 19600)
scope = (9600, 59600)
msx_times = scope[1] - scope[0]
times = 0
while times < msx_times:
@ -55,7 +55,7 @@ class PortFinder(object):
port = randint(*scope)
if port in PortFinder.used_port or port_is_using('127.0.0.1', port):
continue
path = self.tmp_dir / f'AutoPort{port}'
path = self.tmp_dir / str(port)
if path.exists():
try:
rmtree(path)

View File

@ -283,6 +283,8 @@ class Actions:
:return: self
"""
modifiers = []
if not isinstance(keys, (str, tuple, list)):
keys = str(keys)
for i in keys:
for character in i:
if character in ('\ue009', '\ue008', '\ue00a', '\ue03d'):