Pre Merge pull request !16 from kedaji/master

This commit is contained in:
kedaji 2023-01-13 07:03:31 +00:00 committed by Gitee
commit 2bcf254eb4
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
7 changed files with 187 additions and 1 deletions

4
.gitignore vendored
View File

@ -6,6 +6,9 @@ __pycache__/
# C extensions
*.so
# Intellij IDEA
.idea/
# Distribution / packaging
.Python
build/
@ -20,6 +23,7 @@ parts/
sdist/
var/
wheels/
test/
share/python-wheels/
*.egg-info/
.installed.cfg

View File

@ -14,3 +14,4 @@ from .session_page import SessionPage
from .drission import Drission
from .config import DriverOptions, SessionOptions
from .action_chains import ActionChains
from .dynamic_port_allocator import DynamicPortAllocator

View File

@ -3,12 +3,17 @@
@Author : g1879
@Contact : g1879@qq.com
"""
import os
import shutil
import sys
from configparser import RawConfigParser, NoSectionError, NoOptionError
from http.cookiejar import Cookie
from pathlib import Path
from requests.cookies import RequestsCookieJar
from selenium.webdriver.chrome.options import Options
from .dynamic_port_allocator import DynamicPortAllocator
class OptionsManager(object):
@ -475,6 +480,8 @@ class DriverOptions(Options):
"""
super().__init__()
self._user_data_path = None
self.multiple_open_browser = None
self.default_user_data_parent = None
if read_file:
self.ini_path = ini_path or str(Path(__file__).parent / 'configs.ini')
@ -486,7 +493,14 @@ class DriverOptions(Options):
self._arguments = options_dict.get('arguments', [])
self._extensions = options_dict.get('extensions', [])
self._experimental_options = options_dict.get('experimental_options', {})
self.default_user_data_parent = options_dict.get("default_user_data_parent", os.path.dirname(sys.argv[0]))
# 使用中间值避免重复分配的问题
tmp_bool = options_dict.get("multiple_open_browser", True)
if tmp_bool is True:
self.set_multiple_open_browser()
else:
self._debugger_address = options_dict.get('debugger_address', None)
self.page_load_strategy = options_dict.get('page_load_strategy', 'normal')
for arg in self._arguments:
@ -502,6 +516,13 @@ class DriverOptions(Options):
self.timeouts = {'implicit': 10, 'pageLoad': 30, 'script': 30}
self._debugger_address = '127.0.0.1:9222'
def __del__(self):
"""清理生成的目录"""
print("调用del")
for key in DynamicPortAllocator.mapping:
if key is str:
shutil.rmtree(key)
@property
def driver_path(self):
"""chromedriver文件路径"""
@ -732,6 +753,28 @@ class DriverOptions(Options):
self.page_load_strategy = value.lower()
return self
def set_multiple_open_browser(self, default_user_data_parent=None):
"""设置是否支持多开浏览器
开启后系统将自动分配浏览器端口号与用户数据目录不会产生浏览器端口冲突的问题但是浏览器接管功能将会失效
:param default_user_data_parent 浏览器用户数据目录的父目录如果不设置将使用配置文件中的预设的目录如果配置文件也没有设置默认数据父目录系统将在脚本执行的路径设置为默认数据父目录
"""
if self.multiple_open_browser is not True:
if default_user_data_parent is not None:
self.default_user_data_parent = default_user_data_parent
# 从配置文件中读取默认用户数据父级目录
DynamicPortAllocator.default_user_data_parent = self.default_user_data_parent
if not os.path.exists(DynamicPortAllocator.default_user_data_parent):
os.makedirs(DynamicPortAllocator.default_user_data_parent)
# 端口和调试地址将不再从配置文件中读取,而是直接动态分配
port_and_user_data_path = DynamicPortAllocator.allocate()
self._user_data_path = port_and_user_data_path[1]
self.set_argument('--user-data-dir', self._user_data_path)
port = port_and_user_data_path[0]
self.debugger_address = f"127.0.0.1:{port}"
self.multiple_open_browser = True
return self
def set_paths(self, driver_path=None, chrome_path=None, browser_path=None, local_port=None,
debugger_address=None, download_path=None, user_data_path=None, cache_path=None):
"""快捷的路径设置函数 \n
@ -754,6 +797,7 @@ class DriverOptions(Options):
if browser_path is not None:
self.binary_location = str(browser_path)
# TODO 允许用户自定义端口
if local_port is not None:
self.debugger_address = '' if local_port == '' else f'127.0.0.1:{local_port}'

View File

@ -158,6 +158,8 @@ class SessionOptions(object):
class DriverOptions(Options):
def __init__(self, read_file: bool = True, ini_path: str = None):
self.default_user_data_parent = None
self.multiple_open_browser = False
self.ini_path: str = ...
self._driver_path: str = ...
self._user_data_path: str = ...
@ -215,6 +217,8 @@ class DriverOptions(Options):
def set_page_load_strategy(self, value: str) -> DriverOptions: ...
def set_multiple_open_browser(self, default_user_data_parent: str=None) -> 'DriverOptions': ...
def set_paths(self,
driver_path: Union[str, Path] = None,
chrome_path: Union[str, Path] = None,

View File

@ -10,6 +10,9 @@ extensions = []
experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}, 'plugins.plugins_list': [{'enabled': False, 'name': 'Chrome PDF Viewer'}]}, 'useAutomationExtension': False, 'excludeSwitches': ['enable-automation']}
timeouts = {'implicit': 10.0, 'pageLoad': 30.0, 'script': 30.0}
page_load_strategy = normal
multiple_open_browser = True
default_user_data_parent = D:/tmp
[session_options]
headers = {

View File

@ -0,0 +1,109 @@
"""
@author: kedaji
"""
import os
import socket
import shutil
class DynamicPortAllocator:
"""动态端口分配器"""
mapping = {}
default_user_data_parent = None
def __init__(self):
"""没有什么用的初始化方法只是为了复合Python代码规范"""
pass
@staticmethod
def allocate(port=None, user_data_dir=None):
"""将端口与用户数据目录做个映射,如果没有设置端口和数据目录,系统将动态分配端口和用户数据目录"""
# 是否已指定端口
if port is None:
# 动态分配端口从min_port开始向后查询系统未使用的端口号遇到空闲端口就进行分配查到max_port停止
# TODO 动态分配端口的范围从配置文件中读取
min_port = 8080
max_port = 10086
allocated = False
for tmp_port in range(min_port, max_port + 1):
if DynamicPortAllocator.port_allocated(tmp_port):
continue
if not DynamicPortAllocator.port_occupied(tmp_port):
port = tmp_port
allocated = True
break
if not allocated:
raise BlockingIOError("未发现可使用的端口,当前的端口选择范围:" + str(min_port) + " ~ " + str(max_port))
else:
# 查询端口是否已被本程序使用
if DynamicPortAllocator.port_allocated(port):
raise BlockingIOError(str(port) + " 端口已被分配")
# 查询端口是否已被操作系统的某程序所占用
if DynamicPortAllocator.port_occupied(port):
raise BlockingIOError(str(port) + " 端口已被占用")
# 是否已指定用户数据目录
if user_data_dir is None:
# 自动分配用户数据目录
if DynamicPortAllocator.default_user_data_parent is None:
raise ValueError("请设置默认的用户数据父级目录")
user_data_dir = DynamicPortAllocator.default_user_data_parent + os.path.sep + "chrome_" + str(port)
# 用户数据目录是否已被使用
if user_data_dir in DynamicPortAllocator.mapping.keys():
raise RecursionError("当前用户数据路径:" + user_data_dir + "\n已被端口为 " + str(port) + "的托管浏览器使用")
# 创建用户数据目录
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
else:
# 清空目录中的内容
shutil.rmtree(user_data_dir)
os.mkdir(user_data_dir)
# 映射
DynamicPortAllocator.mapping[port] = user_data_dir
DynamicPortAllocator.mapping[user_data_dir] = port
return port, user_data_dir
@staticmethod
def get_mapped_user_data_path(port):
"""获取端口对应的用户数据路径
:param port: 端口号
"""
return DynamicPortAllocator.mapping[port]
@staticmethod
def get_mapped_port(user_data_path):
"""获取对应用户数据路径对应的端口号
:param user_data_path: 用户数据目录
"""
return DynamicPortAllocator.mapping[user_data_path]
@staticmethod
def port_allocated(port):
"""查询端口是否已被分配至某个被托管的浏览器
:param port: 端口号
"""
if port in DynamicPortAllocator.mapping.keys():
return True
return False
@staticmethod
def port_occupied(port):
"""查询端口是否被操作系统中的某个进程所占用
:param port:端口号
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('127.0.0.1', port))
if result == 0:
return True
return False

View File

@ -0,0 +1,21 @@
"""
@author: kedaji
"""
class DynamicPortAllocator:
@staticmethod
def allocate(port: int, user_data_path: str) -> tuple: ...
@staticmethod
def get_mapped_user_data_path(port: int) -> str: ...
@staticmethod
def get_mapped_port(user_data_path: str) -> int: ...
@staticmethod
def port_allocated(port: int) -> bool: ...
@staticmethod
def port_occupied(port: int) -> bool: ...