优化auto_port()逻辑

This commit is contained in:
g1879 2024-06-29 17:22:22 +08:00
parent a877f646e0
commit fddc472ad5
4 changed files with 33 additions and 24 deletions

View File

@ -512,7 +512,7 @@ class ChromiumOptions(object):
:return: 当前对象 :return: 当前对象
""" """
if on_off: if on_off:
self._auto_port = scope if scope else True self._auto_port = scope if scope else (9600, 19600)
if tmp_path: if tmp_path:
self._tmp_path = str(tmp_path) self._tmp_path = str(tmp_path)
else: else:

View File

@ -27,7 +27,7 @@ class ChromiumOptions(object):
_flags: dict = ... _flags: dict = ...
_prefs_to_del: list = ... _prefs_to_del: list = ...
clear_file_flags: bool = ... clear_file_flags: bool = ...
_auto_port: bool = ... _auto_port: Union[Tuple[int, int], False] = ...
_system_user_path: bool = ... _system_user_path: bool = ...
_existing_only: bool = ... _existing_only: bool = ...
_retry_times: int = ... _retry_times: int = ...

View File

@ -8,7 +8,7 @@
from pathlib import Path from pathlib import Path
from platform import system from platform import system
from shutil import rmtree from shutil import rmtree
from tempfile import gettempdir, TemporaryDirectory from tempfile import gettempdir
from threading import Lock from threading import Lock
from time import perf_counter, sleep from time import perf_counter, sleep
@ -18,8 +18,10 @@ from ..errors import (ContextLostError, ElementLostError, CDPError, PageDisconne
class PortFinder(object): class PortFinder(object):
used_port = {} used_port = set()
prev_time = None
lock = Lock() lock = Lock()
checked_paths = set()
def __init__(self, path=None): def __init__(self, path=None):
""" """
@ -28,34 +30,39 @@ class PortFinder(object):
tmp = Path(path) if path else Path(gettempdir()) / 'DrissionPage' tmp = Path(path) if path else Path(gettempdir()) / 'DrissionPage'
self.tmp_dir = tmp / 'UserTempFolder' self.tmp_dir = tmp / 'UserTempFolder'
self.tmp_dir.mkdir(parents=True, exist_ok=True) self.tmp_dir.mkdir(parents=True, exist_ok=True)
if not PortFinder.used_port: if str(self.tmp_dir.absolute()) not in PortFinder.checked_paths:
clean_folder(self.tmp_dir) for i in self.tmp_dir.iterdir():
if i.is_dir() and i.stem.startswith('AutoPort') and not port_is_using('127.0.0.1', i.name[8:]):
rmtree(i, ignore_errors=True)
PortFinder.checked_paths.add(str(self.tmp_dir.absolute()))
def get_port(self, scope=None): def get_port(self, scope=None):
"""查找一个可用端口 """查找一个可用端口
:param scope: 指定端口范围不含最后的数字为None则使用[9600-19600) :param scope: 指定端口范围不含最后的数字为None则使用[9600-19600)
:return: 可以使用的端口和用户文件夹路径组成的元组 :return: 可以使用的端口和用户文件夹路径组成的元组
""" """
from random import randint
with PortFinder.lock: with PortFinder.lock:
if PortFinder.prev_time and perf_counter() - PortFinder.prev_time > 30:
PortFinder.used_port.clear()
PortFinder.prev_time = perf_counter()
if scope in (True, None): if scope in (True, None):
scope = (9600, 19600) scope = (9600, 19600)
for i in range(scope[0], scope[1]): msx_times = scope[1] - scope[0]
if i in PortFinder.used_port: times = 0
while times < msx_times:
times += 1
port = randint(*scope)
if port in PortFinder.used_port or port_is_using('127.0.0.1', port):
continue continue
elif port_is_using('127.0.0.1', i): path = self.tmp_dir / f'AutoPort{port}'
PortFinder.used_port[i] = None if path.exists():
continue try:
path = TemporaryDirectory(dir=self.tmp_dir).name rmtree(path)
PortFinder.used_port[i] = path except:
return i, path continue
return port, str(path)
for i in range(scope[0], scope[1]): raise OSError('未找到可用端口。')
if port_is_using('127.0.0.1', i):
continue
rmtree(PortFinder.used_port[i], ignore_errors=True)
return i, TemporaryDirectory(dir=self.tmp_dir).name
raise OSError('未找到可用端口。')
def port_is_using(ip, port): def port_is_using(ip, port):
@ -95,7 +102,7 @@ def show_or_hide_browser(page, hide=True):
:param hide: 是否隐藏 :param hide: 是否隐藏
:return: None :return: None
""" """
if not page.address.startswith(('127.0.0.1', 'localhost')): if not page.browser.address.startswith(('127.0.0.1', 'localhost')):
return return
if system().lower() != 'windows': if system().lower() != 'windows':

View File

@ -14,9 +14,11 @@ from .._pages.chromium_base import ChromiumBase
class PortFinder(object): class PortFinder(object):
used_port: dict = ... used_port: set = ...
prev_time: float = ...
lock: Lock = ... lock: Lock = ...
tmp_dir: Path = ... tmp_dir: Path = ...
checked_paths: set = ...
def __init__(self, path: Union[str, Path] = None): ... def __init__(self, path: Union[str, Path] = None): ...