Tab默认单例;增加TargetNotFoundError、Settings.singleton_tab_obj

This commit is contained in:
g1879 2024-01-11 21:22:20 +08:00
parent 1eaa56efdb
commit b19e62bac5
11 changed files with 67 additions and 30 deletions

View File

@ -63,12 +63,13 @@ class Browser(object):
self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed) self._driver.set_callback('Target.targetDestroyed', self._onTargetDestroyed)
self._driver.set_callback('Target.targetCreated', self._onTargetCreated) self._driver.set_callback('Target.targetCreated', self._onTargetCreated)
def _get_driver(self, tab_id): def _get_driver(self, tab_id, owner=None):
"""获取对应tab id的Driver """获取对应tab id的Driver
:param tab_id: 标签页id :param tab_id: 标签页id
:param owner: 使用该驱动的对象
:return: Driver对象 :return: Driver对象
""" """
return self._drivers.pop(tab_id, Driver(tab_id, 'page', self.address)) return self._drivers.pop(tab_id, Driver(tab_id, 'page', self.address, owner))
def _onTargetCreated(self, **kwargs): def _onTargetCreated(self, **kwargs):
"""标签页创建时执行""" """标签页创建时执行"""
@ -201,8 +202,8 @@ class Browser(object):
except TypeError: except TypeError:
pass pass
def _on_quit(self): def _on_disconnect(self):
self.page._on_quit() self.page._on_disconnect()
Browser.BROWSERS.pop(self.id, None) Browser.BROWSERS.pop(self.id, None)
if self.page._chromium_options.is_auto_port and self.page._chromium_options.user_data_path: if self.page._chromium_options.is_auto_port and self.page._chromium_options.user_data_path:
path = Path(self.page._chromium_options.user_data_path) path = Path(self.page._chromium_options.user_data_path)

View File

@ -28,7 +28,7 @@ class Browser(object):
def __init__(self, address: str, browser_id: str, page: ChromiumPage): ... def __init__(self, address: str, browser_id: str, page: ChromiumPage): ...
def _get_driver(self, tab_id: str) -> Driver: ... def _get_driver(self, tab_id: str, owner=None) -> Driver: ...
def run_cdp(self, cmd, **cmd_args) -> dict: ... def run_cdp(self, cmd, **cmd_args) -> dict: ...
@ -61,4 +61,4 @@ class Browser(object):
def quit(self, timeout: float = 5, force: bool = False) -> None: ... def quit(self, timeout: float = 5, force: bool = False) -> None: ...
def _on_quit(self) -> None: ... def _on_disconnect(self) -> None: ...

View File

@ -12,21 +12,23 @@ from time import perf_counter, sleep
from requests import get from requests import get
from websocket import (WebSocketTimeoutException, WebSocketConnectionClosedException, create_connection, from websocket import (WebSocketTimeoutException, WebSocketConnectionClosedException, create_connection,
WebSocketException) WebSocketException, WebSocketBadStatusException)
from ..errors import PageDisconnectedError from ..errors import PageDisconnectedError, TargetNotFoundError
class Driver(object): class Driver(object):
def __init__(self, tab_id, tab_type, address): def __init__(self, tab_id, tab_type, address, owner=None):
""" """
:param tab_id: 标签页id :param tab_id: 标签页id
:param tab_type: 标签页类型 :param tab_type: 标签页类型
:param address: 浏览器连接地址 :param address: 浏览器连接地址
:param owner: 创建这个驱动的对象
""" """
self.id = tab_id self.id = tab_id
self.address = address self.address = address
self.type = tab_type self.type = tab_type
self.owner = owner
self._debug = False self._debug = False
self.alert_flag = False # 标记alert出现跳过一条请求后复原 self.alert_flag = False # 标记alert出现跳过一条请求后复原
@ -195,7 +197,10 @@ class Driver(object):
def start(self): def start(self):
"""启动连接""" """启动连接"""
self._stopped.clear() self._stopped.clear()
self._ws = create_connection(self._websocket_url, enable_multithread=True, suppress_origin=True) try:
self._ws = create_connection(self._websocket_url, enable_multithread=True, suppress_origin=True)
except WebSocketBadStatusException as e:
raise TargetNotFoundError(f'找不到页面:{self.id}') if 'No such target id' in str(e) else e
self._recv_th.start() self._recv_th.start()
self._handle_event_th.start() self._handle_event_th.start()
return True return True
@ -230,6 +235,9 @@ class Driver(object):
self.method_results.clear() self.method_results.clear()
self.event_queue.queue.clear() self.event_queue.queue.clear()
if hasattr(self.owner, '_on_disconnect'):
self.owner._on_disconnect()
def set_callback(self, event, callback, immediate=False): def set_callback(self, event, callback, immediate=False):
"""绑定cdp event和回调方法 """绑定cdp event和回调方法
:param event: cdp event :param event: cdp event
@ -247,18 +255,17 @@ class Driver(object):
class BrowserDriver(Driver): class BrowserDriver(Driver):
BROWSERS = {} BROWSERS = {}
def __new__(cls, tab_id, tab_type, address, browser): def __new__(cls, tab_id, tab_type, address, owner):
if tab_id in cls.BROWSERS: if tab_id in cls.BROWSERS:
return cls.BROWSERS[tab_id] return cls.BROWSERS[tab_id]
return object.__new__(cls) return object.__new__(cls)
def __init__(self, tab_id, tab_type, address, browser): def __init__(self, tab_id, tab_type, address, owner):
if hasattr(self, '_created'): if hasattr(self, '_created'):
return return
self._created = True self._created = True
BrowserDriver.BROWSERS[tab_id] = self BrowserDriver.BROWSERS[tab_id] = self
super().__init__(tab_id, tab_type, address) super().__init__(tab_id, tab_type, address, owner)
self.browser = browser
def __repr__(self): def __repr__(self):
return f'<BrowserDriver {self.id}>' return f'<BrowserDriver {self.id}>'
@ -267,7 +274,3 @@ class BrowserDriver(Driver):
r = get(url, headers={'Connection': 'close'}) r = get(url, headers={'Connection': 'close'})
r.close() r.close()
return r return r
def _stop(self):
super()._stop()
self.browser._on_quit()

View File

@ -27,7 +27,7 @@ class Driver(object):
id: str id: str
address: str address: str
type: str type: str
# _debug: bool owner = ...
alert_flag: bool alert_flag: bool
_websocket_url: str _websocket_url: str
_cur_id: int _cur_id: int
@ -42,7 +42,7 @@ class Driver(object):
event_queue: Queue event_queue: Queue
immediate_event_queue: Queue immediate_event_queue: Queue
def __init__(self, tab_id: str, tab_type: str, address: str): ... def __init__(self, tab_id: str, tab_type: str, address: str, owner=None): ...
def _send(self, message: dict, timeout: float = None) -> dict: ... def _send(self, message: dict, timeout: float = None) -> dict: ...
@ -67,10 +67,10 @@ class Driver(object):
class BrowserDriver(Driver): class BrowserDriver(Driver):
BROWSERS: Dict[str, Driver] = ... BROWSERS: Dict[str, Driver] = ...
browser: Browser = ... owner: Browser = ...
def __new__(cls, tab_id: str, tab_type: str, address: str, browser: Browser): ... def __new__(cls, tab_id: str, tab_type: str, address: str, owner: Browser): ...
def __init__(self, tab_id: str, tab_type: str, address: str, browser: Browser): ... def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser): ...
def get(self, url) -> Response: ... def get(self, url) -> Response: ...

View File

@ -11,3 +11,4 @@ class Settings(object):
raise_when_ele_not_found = False raise_when_ele_not_found = False
raise_when_click_failed = False raise_when_click_failed = False
raise_when_wait_failed = False raise_when_wait_failed = False
singleton_tab_obj = True

View File

@ -124,7 +124,7 @@ class ChromiumBase(BasePage):
:return: None :return: None
""" """
self._is_loading = True self._is_loading = True
self._driver = self.browser._get_driver(tab_id) self._driver = self.browser._get_driver(tab_id, self)
self._alert = Alert() self._alert = Alert()
self._driver.set_callback('Page.javascriptDialogOpening', self._on_alert_open, immediate=True) self._driver.set_callback('Page.javascriptDialogOpening', self._on_alert_open, immediate=True)

View File

@ -245,7 +245,7 @@ class ChromiumPage(ChromiumBase):
""" """
self.browser.quit(timeout, force) self.browser.quit(timeout, force)
def _on_quit(self): def _on_disconnect(self):
"""浏览器退出时执行""" """浏览器退出时执行"""
ChromiumPage.PAGES.pop(self._browser_id, None) ChromiumPage.PAGES.pop(self._browser_id, None)

View File

@ -20,6 +20,11 @@ from .._units.waiter import PageWaiter
class ChromiumPage(ChromiumBase): class ChromiumPage(ChromiumBase):
PAGES: dict = ... PAGES: dict = ...
def __new__(cls,
addr_or_opts: Union[str, int, ChromiumOptions] = None,
tab_id: str = None,
timeout: float = None): ...
def __init__(self, def __init__(self,
addr_or_opts: Union[str, int, ChromiumOptions] = None, addr_or_opts: Union[str, int, ChromiumOptions] = None,
tab_id: str = None, tab_id: str = None,
@ -28,7 +33,7 @@ class ChromiumPage(ChromiumBase):
self._browser: Browser = ... self._browser: Browser = ...
self._browser_id: str = ... self._browser_id: str = ...
self._rect: Optional[TabRect] = ... self._rect: Optional[TabRect] = ...
self._is_exist:bool = ... self._is_exist: bool = ...
def _handle_options(self, addr_or_opts: Union[str, ChromiumOptions]) -> str: ... def _handle_options(self, addr_or_opts: Union[str, ChromiumOptions]) -> str: ...
@ -98,7 +103,7 @@ class ChromiumPage(ChromiumBase):
def quit(self, timeout: float = 5, force: bool = True) -> None: ... def quit(self, timeout: float = 5, force: bool = True) -> None: ...
def _on_quit(self) -> None: ... def _on_disconnect(self) -> None: ...
def get_rename(original: str, rename: str) -> str: ... def get_rename(original: str, rename: str) -> str: ...

View File

@ -9,6 +9,7 @@ from copy import copy
from .._base.base import BasePage from .._base.base import BasePage
from .._configs.session_options import SessionOptions from .._configs.session_options import SessionOptions
from .._functions.settings import Settings
from .._functions.web import set_session_cookies, set_browser_cookies from .._functions.web import set_session_cookies, set_browser_cookies
from .._pages.chromium_base import ChromiumBase, get_mhtml, get_pdf from .._pages.chromium_base import ChromiumBase, get_mhtml, get_pdf
from .._pages.session_page import SessionPage from .._pages.session_page import SessionPage
@ -18,12 +19,28 @@ from .._units.waiter import TabWaiter
class ChromiumTab(ChromiumBase): class ChromiumTab(ChromiumBase):
"""实现浏览器标签页的类""" """实现浏览器标签页的类"""
TABS = {}
def __init__(self, page, tab_id=None): def __new__(cls, page, tab_id):
""" """
:param page: ChromiumPage对象 :param page: ChromiumPage对象
:param tab_id: 要控制的标签页id不指定默认为激活的 :param tab_id: 要控制的标签页id
""" """
if Settings.singleton_tab_obj and tab_id in cls.TABS:
return cls.TABS[tab_id]
r = object.__new__(cls)
cls.TABS[tab_id] = r
return r
def __init__(self, page, tab_id):
"""
:param page: ChromiumPage对象
:param tab_id: 要控制的标签页id
"""
if Settings.singleton_tab_obj and hasattr(self, '_created'):
return
self._created = True
self._page = page self._page = page
self._browser = page.browser self._browser = page.browser
super().__init__(page.address, tab_id, page.timeout) super().__init__(page.address, tab_id, page.timeout)
@ -73,6 +90,9 @@ class ChromiumTab(ChromiumBase):
def __repr__(self): def __repr__(self):
return f'<ChromiumTab browser_id={self.browser.id} tab_id={self.tab_id}>' return f'<ChromiumTab browser_id={self.browser.id} tab_id={self.tab_id}>'
def _on_disconnect(self):
ChromiumTab.TABS.pop(self.tab_id, None)
class WebPageTab(SessionPage, ChromiumTab, BasePage): class WebPageTab(SessionPage, ChromiumTab, BasePage):
def __init__(self, page, tab_id): def __init__(self, page, tab_id):

View File

@ -25,8 +25,11 @@ from .._units.waiter import TabWaiter
class ChromiumTab(ChromiumBase): class ChromiumTab(ChromiumBase):
TABS: dict = ...
def __init__(self, page: ChromiumPage, tab_id: str = None): def __new__(cls, page: ChromiumPage, tab_id: str): ...
def __init__(self, page: ChromiumPage, tab_id: str):
self._page: ChromiumPage = ... self._page: ChromiumPage = ...
self._browser: Browser = ... self._browser: Browser = ...
self._rect: Optional[TabRect] = ... self._rect: Optional[TabRect] = ...

View File

@ -89,3 +89,7 @@ class StorageError(BaseError):
class CookieFormatError(BaseError): class CookieFormatError(BaseError):
_info = 'cookie格式不正确。' _info = 'cookie格式不正确。'
class TargetNotFoundError(BaseError):
_info = '找不到指定页面。'