ChromiumOptions和Browser增加is_headless属性;接管浏览器时如无头状态和设置不一致,会按设置重启浏览器

This commit is contained in:
g1879 2024-06-28 23:34:30 +08:00
parent ac3a8ec27c
commit 61dce186c6
5 changed files with 87 additions and 73 deletions

View File

@ -40,13 +40,12 @@ class Browser(object):
:param session_options: 使用双模Tab时使用的默认Session配置为True使用ini文件配置 :param session_options: 使用双模Tab时使用的默认Session配置为True使用ini文件配置
""" """
opt = handle_options(addr_or_opts) opt = handle_options(addr_or_opts)
is_exist, browser_id = run_browser(opt) is_headless, browser_id = run_browser(opt)
if browser_id in cls._BROWSERS: if browser_id in cls._BROWSERS:
r = cls._BROWSERS[browser_id] return cls._BROWSERS[browser_id]
return r
r = object.__new__(cls) r = object.__new__(cls)
r._chromium_options = opt r._chromium_options = opt
r._is_exist = is_exist r.is_headless = is_headless
r.id = browser_id r.id = browser_id
r.address = opt.address r.address = opt.address
cls._BROWSERS[browser_id] = r cls._BROWSERS[browser_id] = r
@ -63,7 +62,6 @@ class Browser(object):
self._type = 'Browser' self._type = 'Browser'
self._driver = BrowserDriver(self.id, 'browser', self.address, self) self._driver = BrowserDriver(self.id, 'browser', self.address, self)
self.version = self.run_cdp('Browser.getVersion')['product']
self._frames = {} self._frames = {}
self._drivers = {} self._drivers = {}
@ -82,9 +80,25 @@ class Browser(object):
self.retry_times = self._chromium_options.retry_times self.retry_times = self._chromium_options.retry_times
self.retry_interval = self._chromium_options.retry_interval self.retry_interval = self._chromium_options.retry_interval
if self.is_headless != self._chromium_options.is_headless:
self.quit(3, True)
connect_browser(self._chromium_options)
s = Session()
s.trust_env = False
ws = s.get(f'http://{self._chromium_options.address}/json/version', headers={'Connection': 'close'})
self.id = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
self._driver = BrowserDriver(self.id, 'browser', self.address, self)
ws.close()
s.close()
self._frames = {}
self._drivers = {}
self._all_drivers = {}
self.version = self._run_cdp('Browser.getVersion')['product']
self._process_id = None self._process_id = None
try: try:
r = self.run_cdp('SystemInfo.getProcessInfo') r = self._run_cdp('SystemInfo.getProcessInfo')
for i in r.get('processInfo', []): for i in r.get('processInfo', []):
if i['type'] == 'browser': if i['type'] == 'browser':
self._process_id = i['id'] self._process_id = i['id']
@ -92,7 +106,7 @@ class Browser(object):
except: except:
pass pass
self.run_cdp('Target.setDiscoverTargets', discover=True) self._run_cdp('Target.setDiscoverTargets', discover=True)
self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed) self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
self._driver.set_callback('Target.targetCreated', self._onTargetCreated) self._driver.set_callback('Target.targetCreated', self._onTargetCreated)
self._dl_mgr = DownloadManager(self) self._dl_mgr = DownloadManager(self)
@ -137,7 +151,7 @@ class Browser(object):
self._drivers.pop(tab_id, None) self._drivers.pop(tab_id, None)
self._all_drivers.pop(tab_id, None) self._all_drivers.pop(tab_id, None)
def run_cdp(self, cmd, **cmd_args): def _run_cdp(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句 """执行Chrome DevTools Protocol语句
:param cmd: 协议项目 :param cmd: 协议项目
:param cmd_args: 参数 :param cmd_args: 参数
@ -183,7 +197,7 @@ class Browser(object):
@property @property
def tabs_count(self): def tabs_count(self):
"""返回标签页数量""" """返回标签页数量"""
j = self.run_cdp('Target.getTargets')['targetInfos'] # 不要改用get避免卡死 j = self._run_cdp('Target.getTargets')['targetInfos'] # 不要改用get避免卡死
return len([i for i in j if i['type'] in ('page', 'webview') and not i['url'].startswith('devtools://')]) return len([i for i in j if i['type'] in ('page', 'webview') and not i['url'].startswith('devtools://')])
@property @property
@ -235,7 +249,7 @@ class Browser(object):
""" """
tab = None tab = None
if new_context: if new_context:
tab = self.run_cdp('Target.createBrowserContext')['browserContextId'] tab = self._run_cdp('Target.createBrowserContext')['browserContextId']
kwargs = {'url': ''} kwargs = {'url': ''}
if new_window: if new_window:
@ -245,7 +259,7 @@ class Browser(object):
if tab: if tab:
kwargs['browserContextId'] = tab kwargs['browserContextId'] = tab
tab = self.run_cdp('Target.createTarget', **kwargs)['targetId'] tab = self._run_cdp('Target.createTarget', **kwargs)['targetId']
while tab not in self._drivers: while tab not in self._drivers:
sleep(.1) sleep(.1)
tab = obj(self, tab) tab = obj(self, tab)
@ -401,14 +415,14 @@ class Browser(object):
:param tab_id: 标签页id :param tab_id: 标签页id
:return: None :return: None
""" """
self.run_cdp('Target.activateTarget', targetId=tab_id) self._run_cdp('Target.activateTarget', targetId=tab_id)
def reconnect(self): def reconnect(self):
"""断开重连""" """断开重连"""
self._driver.stop() self._driver.stop()
BrowserDriver.BROWSERS.pop(self.id) BrowserDriver.BROWSERS.pop(self.id)
self._driver = BrowserDriver(self.id, 'browser', self.address, self) self._driver = BrowserDriver(self.id, 'browser', self.address, self)
self.run_cdp('Target.setDiscoverTargets', discover=True) self._run_cdp('Target.setDiscoverTargets', discover=True)
self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed) self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
self._driver.set_callback('Target.targetCreated', self._onTargetCreated) self._driver.set_callback('Target.targetCreated', self._onTargetCreated)
@ -419,7 +433,7 @@ class Browser(object):
:return: None :return: None
""" """
try: try:
self.run_cdp('Browser.close') self._run_cdp('Browser.close')
except PageDisconnectedError: except PageDisconnectedError:
pass pass
self._driver.stop() self._driver.stop()
@ -433,7 +447,7 @@ class Browser(object):
return return
try: try:
pids = [pid['id'] for pid in self.run_cdp('SystemInfo.getProcessInfo')['processInfo']] pids = [pid['id'] for pid in self._run_cdp('SystemInfo.getProcessInfo')['processInfo']]
except: except:
return return
@ -516,18 +530,20 @@ def handle_options(addr_or_opts):
def run_browser(chromium_options): def run_browser(chromium_options):
"""连接浏览器""" """连接浏览器"""
is_exist = connect_browser(chromium_options) connect_browser(chromium_options)
try: try:
s = Session() s = Session()
s.trust_env = False s.trust_env = False
ws = s.get(f'http://{chromium_options.address}/json/version', headers={'Connection': 'close'}) ws = s.get(f'http://{chromium_options.address}/json/version', headers={'Connection': 'close'})
if not ws: if not ws:
raise BrowserConnectError('\n浏览器连接失败如使用全局代理须设置不代理127.0.0.1地址。') raise BrowserConnectError('\n浏览器连接失败,请确认浏览器是否启动。')
browser_id = ws.json()['webSocketDebuggerUrl'].split('/')[-1] json = ws.json()
browser_id = json['webSocketDebuggerUrl'].split('/')[-1]
is_headless = 'headless' in json['User-Agent'].lower()
ws.close() ws.close()
s.close() s.close()
except KeyError: except KeyError:
raise BrowserConnectError('浏览器版本太旧或此浏览器不支持接管。') raise BrowserConnectError('浏览器版本太旧或此浏览器不支持接管。')
except: except:
raise BrowserConnectError('\n浏览器连接失败,如使用全局代理须设置不代理127.0.0.1地址') raise BrowserConnectError('\n浏览器连接失败,请确认浏览器是否启动')
return is_exist, browser_id return is_headless, browser_id

View File

@ -24,6 +24,7 @@ class Browser(object):
version: str = ... version: str = ...
retry_times: int = ... retry_times: int = ...
retry_interval: float = ... retry_interval: float = ...
is_headless:bool = ...
_BROWSERS: dict = ... _BROWSERS: dict = ...
_chromium_options: ChromiumOptions = ... _chromium_options: ChromiumOptions = ...
@ -51,7 +52,7 @@ class Browser(object):
def _get_driver(self, tab_id: str, owner=None) -> Driver: ... def _get_driver(self, tab_id: str, owner=None) -> Driver: ...
def run_cdp(self, cmd, **cmd_args) -> dict: ... def _run_cdp(self, cmd, **cmd_args) -> dict: ...
@property @property
def process_id(self) -> Optional[int]: ... def process_id(self) -> Optional[int]: ...

View File

@ -21,7 +21,7 @@ class ChromiumOptions(object):
self._user = 'Default' self._user = 'Default'
self._prefs_to_del = [] self._prefs_to_del = []
self.clear_file_flags = False self.clear_file_flags = False
self._headless = None self._is_headless = False
if read_file is False: if read_file is False:
ini_path = False ini_path = False
@ -47,6 +47,10 @@ class ChromiumOptions(object):
self._load_mode = options.get('load_mode', 'normal') self._load_mode = options.get('load_mode', 'normal')
self._system_user_path = options.get('system_user_path', False) self._system_user_path = options.get('system_user_path', False)
self._existing_only = options.get('existing_only', False) self._existing_only = options.get('existing_only', False)
for i in self._arguments:
if i.startswith('--headless'):
self._is_headless = True
break
self._proxy = om.proxies.get('http', None) or om.proxies.get('https', None) self._proxy = om.proxies.get('http', None) or om.proxies.get('https', None)
@ -164,6 +168,11 @@ class ChromiumOptions(object):
"""返回连接失败时的重试间隔(秒)""" """返回连接失败时的重试间隔(秒)"""
return self._retry_interval return self._retry_interval
@property
def is_headless(self):
"""返回是否无头模式"""
return self._is_headless
def set_retry(self, times=None, interval=None): def set_retry(self, times=None, interval=None):
"""设置连接失败时的重试操作 """设置连接失败时的重试操作
:param times: 重试次数 :param times: 重试次数
@ -184,11 +193,19 @@ class ChromiumOptions(object):
""" """
self.remove_argument(arg) self.remove_argument(arg)
if value is not False: if value is not False:
if arg == '--headless' and value is None: if arg == '--headless':
self._arguments.append('--headless=new') if value == 'false':
self._is_headless = False
else:
if value is None:
value = 'new'
self._arguments.append(f'--headless={value}')
self._is_headless = True
else: else:
arg_str = arg if value is None else f'{arg}={value}' arg_str = arg if value is None else f'{arg}={value}'
self._arguments.append(arg_str) self._arguments.append(arg_str)
elif arg == '--headless':
self._is_headless = False
return self return self
def remove_argument(self, value): def remove_argument(self, value):
@ -312,7 +329,7 @@ class ChromiumOptions(object):
:param on_off: 开或关 :param on_off: 开或关
:return: 当前对象 :return: 当前对象
""" """
on_off = 'new' if on_off else 'false' on_off = 'new' if on_off else on_off
return self.set_argument('--headless', on_off) return self.set_argument('--headless', on_off)
def no_imgs(self, on_off=True): def no_imgs(self, on_off=True):

View File

@ -10,30 +10,31 @@ from typing import Union, Any, Literal, Optional, Tuple
class ChromiumOptions(object): class ChromiumOptions(object):
def __init__(self, read_file: [bool, None] = True, ini_path: Union[str, Path] = None): ini_path: Optional[str] = ...
self.ini_path: str = ... _driver_path: str = ...
self._driver_path: str = ... _user_data_path: Optional[str] = ...
self._user_data_path: str = ... _download_path: str = ...
self._download_path: str = ... _tmp_path: str = ...
self._tmp_path: str = ... _arguments: list = ...
self._arguments: list = ... _browser_path: str = ...
self._browser_path: str = ... _user: str = ...
self._user: str = ... _load_mode: str = ...
self._load_mode: str = ... _timeouts: dict = ...
self._timeouts: dict = ... _proxy: str = ...
self._proxy: str = ... _address: str = ...
self._address: str = ... _extensions: list = ...
self._extensions: list = ... _prefs: dict = ...
self._prefs: dict = ... _flags: dict = ...
self._flags: dict = ... _prefs_to_del: list = ...
self._prefs_to_del: list = ... clear_file_flags: bool = ...
self.clear_file_flags: bool = ... _auto_port: bool = ...
self._auto_port: bool = ... _system_user_path: bool = ...
self._system_user_path: bool = ... _existing_only: bool = ...
self._existing_only: bool = ... _retry_times: int = ...
self._headless: bool = ... _retry_interval: float = ...
self._retry_times: int = ... _is_headless: bool = ...
self._retry_interval: float = ...
def __init__(self, read_file: [bool, None] = True, ini_path: Union[str, Path] = None): ...
@property @property
def download_path(self) -> str: ... def download_path(self) -> str: ...
@ -89,6 +90,9 @@ class ChromiumOptions(object):
@property @property
def retry_interval(self) -> float: ... def retry_interval(self) -> float: ...
@property
def is_headless(self) -> bool: ...
def set_retry(self, times: int = None, interval: float = None) -> ChromiumOptions: ... def set_retry(self, times: int = None, interval: float = None) -> ChromiumOptions: ...
def set_argument(self, arg: str, value: Union[str, None, bool] = None) -> ChromiumOptions: ... def set_argument(self, arg: str, value: Union[str, None, bool] = None) -> ChromiumOptions: ...

View File

@ -30,11 +30,6 @@ def connect_browser(option):
ip, port = address.split(':') ip, port = address.split(':')
if ip != '127.0.0.1' or port_is_using(ip, port) or option.is_existing_only: if ip != '127.0.0.1' or port_is_using(ip, port) or option.is_existing_only:
test_connect(ip, port) test_connect(ip, port)
option._headless = False
for i in option.arguments:
if i.startswith('--headless') and not i.endswith('=false'):
option._headless = True
break
return True return True
# ----------创建浏览器进程---------- # ----------创建浏览器进程----------
@ -65,7 +60,6 @@ def get_launch_args(opt):
# ----------处理arguments----------- # ----------处理arguments-----------
result = set() result = set()
has_user_path = False has_user_path = False
headless = None
for i in opt.arguments: for i in opt.arguments:
if i.startswith(('--load-extension=', '--remote-debugging-port=')): if i.startswith(('--load-extension=', '--remote-debugging-port=')):
continue continue
@ -73,16 +67,6 @@ def get_launch_args(opt):
result.add(f'--user-data-dir={Path(i[16:]).absolute()}') result.add(f'--user-data-dir={Path(i[16:]).absolute()}')
has_user_path = True has_user_path = True
continue continue
elif i.startswith('--headless'):
if i == '--headless=false':
headless = False
continue
elif i == '--headless':
i = '--headless=new'
headless = True
else:
headless = True
result.add(i) result.add(i)
if not has_user_path and not opt.system_user_path: if not has_user_path and not opt.system_user_path:
@ -93,15 +77,7 @@ def get_launch_args(opt):
opt.set_user_data_path(path) opt.set_user_data_path(path)
result.add(f'--user-data-dir={path}') result.add(f'--user-data-dir={path}')
# if headless is None and system().lower() == 'linux': # 无界面Linux自动加入无头
# from os import popen
# r = popen('systemctl list-units | grep graphical.target')
# if 'graphical.target' not in r.read():
# headless = True
# result.add('--headless=new')
result = list(result) result = list(result)
opt._headless = headless
# ----------处理插件extensions------------- # ----------处理插件extensions-------------
ext = [str(Path(e).absolute()) for e in opt.extensions] ext = [str(Path(e).absolute()) for e in opt.extensions]