4.0.3.4(+)

修复多线程同时创建一个页面对象时报错问题;
优化一个性能问题;
stop_loading()保证状态变成完成;
auto_port()增加scope参数
This commit is contained in:
g1879 2024-01-21 01:08:54 +08:00
parent 4db2f71d15
commit 8aae35d31b
13 changed files with 123 additions and 111 deletions

View File

@ -14,4 +14,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.3.3'
__version__ = '4.0.3.4'

View File

@ -53,11 +53,14 @@ class Browser(object):
self._connected = False
self._process_id = None
r = self.run_cdp('SystemInfo.getProcessInfo')
for i in r.get('processInfo', []):
if i['type'] == 'browser':
self._process_id = i['id']
break
try:
r = self.run_cdp('SystemInfo.getProcessInfo')
for i in r.get('processInfo', []):
if i['type'] == 'browser':
self._process_id = i['id']
break
except:
pass
self.run_cdp('Target.setDiscoverTargets', discover=True)
self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
@ -69,13 +72,15 @@ class Browser(object):
:param owner: 使用该驱动的对象
:return: Driver对象
"""
d = self._drivers.pop(tab_id, Driver(tab_id, 'page', self.address, owner))
d = self._drivers.pop(tab_id, Driver(tab_id, 'page', self.address))
d.owner = owner
self._all_drivers.setdefault(tab_id, set()).add(d)
return d
def _onTargetCreated(self, **kwargs):
"""标签页创建时执行"""
if (kwargs['targetInfo']['type'] in ('page', 'webview')
and kwargs['targetInfo']['targetId'] not in self._all_drivers
and not kwargs['targetInfo']['url'].startswith('devtools://')):
try:
tab_id = kwargs['targetInfo']['targetId']

View File

@ -7,12 +7,8 @@
"""
from pathlib import Path
from re import search
from shutil import rmtree
from tempfile import gettempdir, TemporaryDirectory
from threading import Lock
from .options_manage import OptionsManager
from .._functions.tools import port_is_using, clean_folder
class ChromiumOptions(object):
@ -64,10 +60,6 @@ class ChromiumOptions(object):
'script': timeouts['script']}
self._auto_port = options.get('auto_port', False)
if self._auto_port:
port, path = PortFinder().get_port()
self._address = f'127.0.0.1:{port}'
self.set_argument('--user-data-dir', path)
others = om.others
self._retry_times = others.get('retry_times', 3)
@ -170,7 +162,7 @@ class ChromiumOptions(object):
@property
def is_auto_port(self):
"""返回是否使用自动端口和用户文件"""
"""返回是否使用自动端口和用户文件如指定范围则返回范围tuple"""
return self._auto_port
@property
@ -493,14 +485,15 @@ class ChromiumOptions(object):
self._system_user_path = on_off
return self
def auto_port(self, on_off=True, tmp_path=None):
def auto_port(self, on_off=True, tmp_path=None, scope=None):
"""自动获取可用端口
:param on_off: 是否开启自动获取端口号
:param tmp_path: 临时文件保存路径为None时保存到系统临时文件夹on_off为False时此参数无效
:param scope: 指定端口范围不含最后的数字为None则使用[9600-19600)
:return: 当前对象
"""
if on_off:
self._auto_port = True
self._auto_port = scope if scope else True
if tmp_path:
self._tmp_path = str(tmp_path)
else:
@ -618,41 +611,3 @@ class ChromiumOptions(object):
"""
on_off = None if on_off else False
return self.set_argument('--mute-audio', on_off)
class PortFinder(object):
    """Pick a free local port and pair it with a scratch user-data folder path.

    ``used_port`` and ``lock`` are class attributes, so all instances in the
    process share the same bookkeeping and the same lock.
    """
    # port -> folder path handed out for that port, or None when the port was
    # found occupied the first time it was examined.
    used_port = {}
    # Held for the whole of get_port() so concurrent callers cannot be given
    # the same port.
    lock = Lock()

    def __init__(self, path=None):
        """
        :param path: directory for temporary files; when None, a 'DrissionPage' folder under the system temp directory is used
        """
        base = Path(path) if path else Path(gettempdir()) / 'DrissionPage'
        self.tmp_dir = base / 'UserTempFolder'
        self.tmp_dir.mkdir(parents=True, exist_ok=True)
        # No port has been recorded yet in this process: run clean_folder on
        # the scratch directory (presumably clears leftovers from earlier runs
        # -- confirm against clean_folder).
        if not PortFinder.used_port:
            clean_folder(self.tmp_dir)

    def get_port(self):
        """Find a usable port in [9600, 19600).

        :return: tuple of (port, user-data folder path)
        :raises OSError: when no port in the range is free
        """
        with PortFinder.lock:
            # First pass: only consider ports that were never examined before.
            for port in range(9600, 19600):
                if port in PortFinder.used_port:
                    continue
                if port_is_using('127.0.0.1', port):
                    PortFinder.used_port[port] = None
                    continue
                # Only the generated path is kept; the TemporaryDirectory
                # object itself is discarded right away.
                folder = TemporaryDirectory(dir=self.tmp_dir).name
                PortFinder.used_port[port] = folder
                return port, folder

            # Second pass: every port has been examined; reuse one that is
            # free again.
            for port in range(9600, 19600):
                if port_is_using('127.0.0.1', port):
                    continue
                stale = PortFinder.used_port.get(port)
                if stale:  # None means no folder was ever handed out for it
                    rmtree(stale, ignore_errors=True)
                folder = TemporaryDirectory(dir=self.tmp_dir).name
                # Record the new folder so the next reuse removes the right one.
                PortFinder.used_port[port] = folder
                return port, folder

        raise OSError('未找到可用端口。')

View File

@ -6,8 +6,7 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from threading import Lock
from typing import Union, Tuple, Any, Literal, Optional
from typing import Union, Any, Literal, Optional, Tuple
class ChromiumOptions(object):
@ -82,7 +81,7 @@ class ChromiumOptions(object):
def is_existing_only(self) -> bool: ...
@property
def is_auto_port(self) -> bool: ...
def is_auto_port(self) -> Union[bool, Tuple[int, int]]: ...
@property
def retry_times(self) -> int: ...
@ -153,21 +152,13 @@ class ChromiumOptions(object):
def use_system_user_path(self, on_off: bool = True) -> ChromiumOptions: ...
def auto_port(self, on_off: bool = True, tmp_path: Union[str, Path] = None) -> ChromiumOptions: ...
def auto_port(self,
on_off: bool = True,
tmp_path: Union[str, Path] = None,
scope: Tuple[int, int] = None) -> ChromiumOptions: ...
def existing_only(self, on_off: bool = True) -> ChromiumOptions: ...
def save(self, path: Union[str, Path] = None) -> str: ...
def save_to_default(self) -> str: ...
class PortFinder(object):
    # Class-level bookkeeping shared by every instance: port -> folder path (or None).
    used_port: dict = ...
    # Class-level lock guarding get_port().
    lock: Lock = ...
    # Per-instance scratch directory set in __init__.
    tmp_dir: Path = ...

    def __init__(self, path: Union[str, Path] = None): ...

    # Instance method: the implementation reads self.tmp_dir, so it must not
    # be declared as a staticmethod.
    def get_port(self) -> Tuple[int, str]: ...

View File

@ -6,10 +6,9 @@
@License : BSD 3-Clause.
"""
from json import load, dump, JSONDecodeError
from os import popen
from os import environ
from pathlib import Path
from platform import system
from re import search
from subprocess import Popen, DEVNULL
from tempfile import gettempdir
from time import perf_counter, sleep
@ -330,23 +329,8 @@ def get_chrome_path():
pass
# -----------从系统变量中获取--------------
try:
paths = popen('set path').read().lower()
except:
return None
r = search(r'[^;]*chrome[^;]*', paths)
if r:
path = Path(r.group(0)) if 'chrome.exe' in r.group(0) else Path(r.group(0)) / 'chrome.exe'
if path.exists():
return str(path)
paths = paths.split(';')
for path in paths:
for path in environ.get('PATH', '').split(';'):
path = Path(path) / 'chrome.exe'
try:
if path.exists():
return str(path)

View File

@ -8,7 +8,9 @@
from pathlib import Path
from platform import system
from shutil import rmtree
from time import perf_counter, sleep
from tempfile import gettempdir, TemporaryDirectory
from threading import Lock
from time import perf_counter
from psutil import process_iter, AccessDenied, NoSuchProcess, ZombieProcess
@ -17,6 +19,47 @@ from ..errors import (ContextLostError, ElementLostError, CDPError, PageDisconne
AlertExistsError, WrongURLError, StorageError, CookieFormatError, JavaScriptError)
class PortFinder(object):
    """Pick a free local port and pair it with a scratch user-data folder path.

    ``used_port`` and ``lock`` are class attributes, so all instances in the
    process share the same bookkeeping and the same lock.
    """
    # port -> folder path handed out for that port, or None when the port was
    # found occupied the first time it was examined.
    used_port = {}
    # Held for the whole of get_port() so concurrent callers cannot be given
    # the same port.
    lock = Lock()

    def __init__(self, path=None):
        """
        :param path: directory for temporary files; when None, a 'DrissionPage' folder under the system temp directory is used
        """
        base = Path(path) if path else Path(gettempdir()) / 'DrissionPage'
        self.tmp_dir = base / 'UserTempFolder'
        self.tmp_dir.mkdir(parents=True, exist_ok=True)
        # No port has been recorded yet in this process: run clean_folder on
        # the scratch directory (presumably clears leftovers from earlier runs
        # -- confirm against clean_folder).
        if not PortFinder.used_port:
            clean_folder(self.tmp_dir)

    def get_port(self, scope=None):
        """Find a usable port.

        :param scope: (start, stop) port range with stop excluded; None or True selects [9600, 19600)
        :return: tuple of (port, user-data folder path)
        :raises OSError: when no port in the range is free
        """
        with PortFinder.lock:
            if scope in (True, None):
                scope = (9600, 19600)
            start, stop = scope[0], scope[1]

            # First pass: only consider ports that were never examined before.
            for port in range(start, stop):
                if port in PortFinder.used_port:
                    continue
                if port_is_using('127.0.0.1', port):
                    PortFinder.used_port[port] = None
                    continue
                # Only the generated path is kept; the TemporaryDirectory
                # object itself is discarded right away.
                folder = TemporaryDirectory(dir=self.tmp_dir).name
                PortFinder.used_port[port] = folder
                return port, folder

            # Second pass: every port in range has been examined; reuse one
            # that is free again.
            for port in range(start, stop):
                if port_is_using('127.0.0.1', port):
                    continue
                stale = PortFinder.used_port.get(port)
                if stale:  # None means no folder was ever handed out for it
                    rmtree(stale, ignore_errors=True)
                folder = TemporaryDirectory(dir=self.tmp_dir).name
                # Record the new folder so the next reuse removes the right one.
                PortFinder.used_port[port] = folder
                return port, folder

        raise OSError('未找到可用端口。')
def port_is_using(ip, port):
"""检查端口是否被占用
:param ip: 浏览器地址

View File

@ -7,12 +7,23 @@
"""
from os import popen
from pathlib import Path
from typing import Union
from types import FunctionType
from threading import Lock
from typing import Union, Tuple
from .._pages.chromium_page import ChromiumPage
class PortFinder(object):
    # Class-level bookkeeping shared by every instance: port -> folder path (or None).
    used_port: dict = ...
    # Class-level lock guarding get_port().
    lock: Lock = ...
    # Per-instance scratch directory set in __init__.
    tmp_dir: Path = ...

    def __init__(self, path: Union[str, Path] = None): ...

    # Instance method: the implementation reads self.tmp_dir, so it must not
    # be declared as a staticmethod. scope also accepts True, which the
    # implementation treats the same as None (default range).
    def get_port(self, scope: Union[bool, Tuple[int, int]] = None) -> Tuple[int, str]: ...
def port_is_using(ip: str, port: Union[str, int]) -> bool: ...

View File

@ -651,11 +651,13 @@ class ChromiumBase(BasePage):
"""页面停止加载"""
try:
self.run_cdp('Page.stopLoading')
end_time = perf_counter() + 5
while self._ready_state != 'complete' and perf_counter() < end_time:
sleep(.1)
except (PageDisconnectedError, CDPError):
pass
end_time = perf_counter() + self.timeouts.page_load
while self._ready_state != 'complete' and perf_counter() < end_time:
sleep(.1)
finally:
self._ready_state = 'complete'
def remove_ele(self, loc_or_ele):
"""从页面上删除一个元素

View File

@ -6,13 +6,15 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from threading import Lock
from time import sleep, perf_counter
from requests import get
from .._base.browser import Browser
from .._configs.chromium_options import ChromiumOptions
from .._functions.browser import connect_browser
from .._configs.chromium_options import ChromiumOptions, PortFinder
from .._functions.tools import PortFinder
from .._pages.chromium_base import ChromiumBase, get_mhtml, get_pdf, Timeout
from .._pages.chromium_tab import ChromiumTab
from .._units.setter import ChromiumPageSetter
@ -34,7 +36,10 @@ class ChromiumPage(ChromiumBase):
opt = handle_options(addr_or_opts)
is_exist, browser_id = run_browser(opt)
if browser_id in cls.PAGES:
return cls.PAGES[browser_id]
r = cls.PAGES[browser_id]
while not hasattr(r, '_frame_id'):
sleep(.1)
return r
r = object.__new__(cls)
r._chromium_options = opt
r._is_exist = is_exist
@ -57,6 +62,7 @@ class ChromiumPage(ChromiumBase):
self._run_browser()
super().__init__(self.address, tab_id)
self._type = 'ChromiumPage'
self._lock = Lock()
self.set.timeouts(base=timeout)
self._page_init()
@ -146,16 +152,17 @@ class ChromiumPage(ChromiumBase):
:param id_or_num: 要获取的标签页id或序号为None时获取当前tab序号从1开始可传入负数获取倒数第几个不是视觉排列顺序而是激活顺序
:return: 标签页对象
"""
if isinstance(id_or_num, str):
return ChromiumTab(self, id_or_num)
elif isinstance(id_or_num, int):
return ChromiumTab(self, self.tabs[id_or_num - 1 if id_or_num > 0 else id_or_num])
elif id_or_num is None:
return ChromiumTab(self, self.tab_id)
elif isinstance(id_or_num, ChromiumTab):
return id_or_num
else:
raise TypeError(f'id_or_num需传入tab id或序号{id_or_num}')
with self._lock:
if isinstance(id_or_num, str):
return ChromiumTab(self, id_or_num)
elif isinstance(id_or_num, int):
return ChromiumTab(self, self.tabs[id_or_num - 1 if id_or_num > 0 else id_or_num])
elif id_or_num is None:
return ChromiumTab(self, self.tab_id)
elif isinstance(id_or_num, ChromiumTab):
return id_or_num
else:
raise TypeError(f'id_or_num需传入tab id或序号{id_or_num}')
def find_tabs(self, title=None, url=None, tab_type=None, single=True):
"""查找符合条件的tab返回它们的id组成的列表
@ -269,13 +276,18 @@ def handle_options(addr_or_opts):
"""
if not addr_or_opts:
_chromium_options = ChromiumOptions(addr_or_opts)
if _chromium_options.is_auto_port:
port, path = PortFinder(_chromium_options.tmp_path).get_port(_chromium_options.is_auto_port)
_chromium_options.set_address(f'127.0.0.1:{port}')
_chromium_options.set_user_data_path(path)
_chromium_options.auto_port(scope=_chromium_options.is_auto_port)
elif isinstance(addr_or_opts, ChromiumOptions):
if addr_or_opts.is_auto_port:
port, path = PortFinder(addr_or_opts.tmp_path).get_port()
port, path = PortFinder(addr_or_opts.tmp_path).get_port(addr_or_opts.is_auto_port)
addr_or_opts.set_address(f'127.0.0.1:{port}')
addr_or_opts.set_user_data_path(path)
addr_or_opts.auto_port()
addr_or_opts.auto_port(scope=addr_or_opts.is_auto_port)
_chromium_options = addr_or_opts
elif isinstance(addr_or_opts, str):

View File

@ -6,6 +6,7 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from threading import Lock
from typing import Union, Tuple, List, Optional
from .._base.browser import Browser
@ -34,6 +35,7 @@ class ChromiumPage(ChromiumBase):
self._browser_id: str = ...
self._rect: Optional[TabRect] = ...
self._is_exist: bool = ...
self._lock: Lock = ...
def _handle_options(self, addr_or_opts: Union[str, ChromiumOptions]) -> str: ...

View File

@ -6,6 +6,7 @@
@License : BSD 3-Clause.
"""
from copy import copy
from time import sleep
from .._base.base import BasePage
from .._configs.session_options import SessionOptions
@ -27,7 +28,10 @@ class ChromiumTab(ChromiumBase):
:param tab_id: 要控制的标签页id
"""
if Settings.singleton_tab_obj and tab_id in cls.TABS:
return cls.TABS[tab_id]
r = cls.TABS[tab_id]
while not hasattr(r, '_frame_id'):
sleep(.1)
return r
r = object.__new__(cls)
cls.TABS[tab_id] = r
return r

View File

@ -6,6 +6,7 @@
@License : BSD 3-Clause.
"""
from pathlib import Path
from threading import Lock
from typing import Union, Tuple, Any, List, Optional
from requests import Session, Response
@ -26,6 +27,7 @@ from .._units.waiter import TabWaiter
class ChromiumTab(ChromiumBase):
TABS: dict = ...
LOCK: Lock = ...
def __new__(cls, page: ChromiumPage, tab_id: str): ...

View File

@ -1,12 +1,13 @@
# -*- coding:utf-8 -*-
from setuptools import setup, find_packages
from DrissionPage import __version__
with open("README.md", "r", encoding='utf-8') as fh:
long_description = fh.read()
setup(
name="DrissionPage",
version="4.0.3.2",
version=__version__,
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",