g1879 adcc564997 3.2.35修复问题(详)
接管浏览器无须'--remote-allow-origins=*'参数
tabs属性忽略隐私声明
修复 8x 版浏览器选择下拉列表时报错问题
修复某些情况下下拉框不触发联动的问题
修复配置文件损坏时出现的问题
修复get()方法url含某些特殊字符时连接失败的问题
2023-11-30 14:54:49 +08:00

236 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from json import load, dump, JSONDecodeError
from pathlib import Path
from platform import system
from subprocess import Popen, DEVNULL
from tempfile import gettempdir
from time import perf_counter, sleep
from requests import get as requests_get
from .tools import port_is_using
from ..configs.chromium_options import ChromiumOptions
from ..errors import BrowserConnectError
def connect_browser(option):
"""连接或启动浏览器
:param option: DriverOptions对象
:return: chrome 路径和进程对象组成的元组
"""
debugger_address = option.debugger_address.replace('localhost', '127.0.0.1').lstrip('http://').lstrip('https://')
chrome_path = option.browser_path
ip, port = debugger_address.split(':')
if ip != '127.0.0.1':
test_connect(ip, port)
return None, None
if port_is_using(ip, port):
test_connect(ip, port)
return None, None
args = get_launch_args(option)
set_prefs(option)
# ----------创建浏览器进程----------
try:
debugger = _run_browser(port, chrome_path, args)
# 传入的路径找不到主动在ini文件、注册表、系统变量中找
except FileNotFoundError:
from ..easy_set import get_chrome_path
chrome_path = get_chrome_path(show_msg=False)
if not chrome_path:
raise FileNotFoundError('无法找到chrome路径请手动配置。')
debugger = _run_browser(port, chrome_path, args)
test_connect(ip, port)
return chrome_path, debugger
def get_launch_args(opt):
"""从DriverOptions获取命令行启动参数
:param opt: DriverOptions或ChromiumOptions
:return: 启动参数列表
"""
# ----------处理arguments-----------
result = set()
has_user_path = False
headless = False
for i in opt.arguments:
if i.startswith(('--load-extension=', '--remote-debugging-port=')):
continue
elif i.startswith('--user-data-dir') and not opt.system_user_path:
result.add(f'--user-data-dir={Path(i[16:]).absolute()}')
has_user_path = True
continue
elif i.startswith('--headless'):
headless = True
result.add(i)
if not has_user_path and not opt.system_user_path:
port = opt.debugger_address.split(':')[-1] if opt.debugger_address else '0'
path = Path(gettempdir()) / 'DrissionPage' / f'userData_{port}'
path.mkdir(parents=True, exist_ok=True)
result.add(f'--user-data-dir={path}')
if not headless and system().lower() == 'linux':
from os import popen
r = popen('systemctl list-units | grep graphical.target')
if 'graphical.target' not in r.read():
result.add('--headless=new')
result = list(result)
# ----------处理插件extensions-------------
ext = opt.extensions if isinstance(opt, ChromiumOptions) else opt._extension_files
if ext:
ext = ','.join(set(ext))
ext = f'--load-extension={ext}'
result.append(ext)
return result
def set_prefs(opt):
"""处理启动配置中的prefs项目前只能对已存在文件夹配置
:param opt: DriverOptions或ChromiumOptions
:return: None
"""
if isinstance(opt, ChromiumOptions):
prefs = opt.preferences
del_list = opt._prefs_to_del
else:
prefs = opt.experimental_options.get('prefs', [])
del_list = []
if not opt.user_data_path:
return
args = opt.arguments
user = 'Default'
for arg in args:
if arg.startswith('--profile-directory'):
user = arg.split('=')[-1].strip()
break
prefs_file = Path(opt.user_data_path) / user / 'Preferences'
if not prefs_file.exists():
prefs_file.parent.mkdir(parents=True, exist_ok=True)
with open(prefs_file, 'w') as f:
f.write('{}')
with open(prefs_file, "r", encoding='utf-8') as f:
try:
prefs_dict = load(f)
except JSONDecodeError:
prefs_dict = {}
for pref in prefs:
value = prefs[pref]
pref = pref.split('.')
_make_leave_in_dict(prefs_dict, pref, 0, len(pref))
_set_value_to_dict(prefs_dict, pref, value)
for pref in del_list:
_remove_arg_from_dict(prefs_dict, pref)
with open(prefs_file, 'w', encoding='utf-8') as f:
dump(prefs_dict, f)
def test_connect(ip, port):
"""测试浏览器是否可用
:param ip: 浏览器ip
:param port: 浏览器端口
:return: None
"""
end_time = perf_counter() + 30
while perf_counter() < end_time:
try:
tabs = requests_get(f'http://{ip}:{port}/json', timeout=10).json()
for tab in tabs:
if tab['type'] == 'page':
return
except Exception:
sleep(.2)
if ip in ('127.0.0.1', 'localhost'):
raise BrowserConnectError(f'\n连接浏览器失败,可能原因:\n1、浏览器未启动\n2、{port}端口不是Chromium内核浏览器\n'
f'3、该浏览器未允许控制\n4、和已打开的浏览器冲突\n'
f'请尝试用ChromiumOptions指定别的端口和指定浏览器路径')
raise BrowserConnectError(f'{ip}:{port}浏览器无法链接。')
def _run_browser(port, path: str, args) -> Popen:
"""创建chrome进程
:param port: 端口号
:param path: 浏览器路径
:param args: 启动参数
:return: 进程对象
"""
p = Path(path)
p = str(p / 'chrome') if p.is_dir() else str(path)
arguments = [p, f'--remote-debugging-port={port}']
arguments.extend(args)
try:
return Popen(arguments, shell=False, stdout=DEVNULL, stderr=DEVNULL)
except FileNotFoundError:
raise FileNotFoundError('未找到浏览器,请手动指定浏览器可执行文件路径。')
def _make_leave_in_dict(target_dict: dict, src: list, num: int, end: int) -> None:
"""把prefs中a.b.c形式的属性转为a['b']['c']形式
:param target_dict: 要处理的字典
:param src: 属性层级列表[a, b, c]
:param num: 当前处理第几个
:param end: src长度
:return: None
"""
if num == end:
return
if src[num] not in target_dict:
target_dict[src[num]] = {}
num += 1
_make_leave_in_dict(target_dict[src[num - 1]], src, num, end)
def _set_value_to_dict(target_dict: dict, src: list, value) -> None:
"""把a.b.c形式的属性的值赋值到a['b']['c']形式的字典中
:param target_dict: 要处理的字典
:param src: 属性层级列表[a, b, c]
:param value: 属性值
:return: None
"""
src = "']['".join(src)
src = f"target_dict['{src}']=value"
exec(src)
def _remove_arg_from_dict(target_dict: dict, arg: str) -> None:
"""把a.b.c形式的属性从字典中删除
:param target_dict: 要处理的字典
:param arg: 层级属性,形式'a.b.c'
:return: None
"""
args = arg.split('.')
args = [f"['{i}']" for i in args]
src = ''.join(args)
src = f"target_dict{src}"
try:
exec(src)
src = ''.join(args[:-1])
src = f"target_dict{src}.pop({args[-1][1:-1]})"
exec(src)
except:
pass