console加上wait();优化MixTab逻辑;Chromium只获取MixTab

This commit is contained in:
g1879 2024-08-29 10:06:42 +08:00
parent e10030e23f
commit bbf1bc7e28
11 changed files with 180 additions and 249 deletions

View File

@ -13,4 +13,4 @@ from ._pages.mix_page import MixPage
from ._pages.mix_page import MixPage as WebPage
from ._pages.session_page import SessionPage
__version__ = '4.1.0.0b20'
__version__ = '4.1.0.0b21'

View File

@ -174,29 +174,15 @@ class Chromium(object):
return CookiesList(r)
def new_tab(self, url=None, new_window=False, background=False, new_context=False):
return self._new_tab(ChromiumTab, url=url, new_window=new_window,
background=background, new_context=new_context)
return self._new_tab(True, url=url, new_window=new_window, background=background, new_context=new_context)
def new_mix_tab(self, url=None, new_window=False, background=False, new_context=False):
return self._new_tab(MixTab, url=url, new_window=new_window,
background=background, new_context=new_context)
def get_tab(self, id_or_num=None, title=None, url=None, tab_type='page', as_id=False):
t = self._get_tab(id_or_num=id_or_num, title=title, url=url, tab_type=tab_type, as_id=as_id)
if t._type == 'MixTab':
raise RuntimeError('该标签页已有MixTab版本如需多对象公用请用Settings设置singleton_tab_obj为False。')
return t
def get_tabs(self, title=None, url=None, tab_type='page', as_id=False):
return self._get_tabs(title=title, url=url, tab_type=tab_type, as_id=as_id)
def get_mix_tab(self, id_or_num=None, title=None, url=None, tab_type='page'):
def get_tab(self, id_or_num=None, title=None, url=None, tab_type='page'):
t = self._get_tab(id_or_num=id_or_num, title=title, url=url, tab_type=tab_type, mix=True, as_id=False)
if t._type != 'MixTab':
raise RuntimeError('该标签页已有非MixTab版本如需多对象公用请用Settings设置singleton_tab_obj为False。')
return t
def get_mix_tabs(self, title=None, url=None, tab_type='page'):
def get_tabs(self, title=None, url=None, tab_type='page'):
return self._get_tabs(title=title, url=url, tab_type=tab_type, mix=True, as_id=False)
def close_tabs(self, tabs_or_ids=None, others=False):
@ -301,7 +287,8 @@ class Chromium(object):
path = Path(self._chromium_options.user_data_path)
rmtree(path, True)
def _new_tab(self, obj, url=None, new_window=False, background=False, new_context=False):
def _new_tab(self, mix=True, url=None, new_window=False, background=False, new_context=False):
obj = MixTab if mix else ChromiumTab
tab = None
if new_context:
tab = self._run_cdp('Target.createBrowserContext')['browserContextId']
@ -329,7 +316,7 @@ class Chromium(object):
tab.get(url)
return tab
def _get_tab(self, id_or_num=None, title=None, url=None, tab_type='page', mix=False, as_id=False):
def _get_tab(self, id_or_num=None, title=None, url=None, tab_type='page', mix=True, as_id=False):
if id_or_num is not None:
if isinstance(id_or_num, int):
id_or_num = self.tab_ids[id_or_num - 1 if id_or_num > 0 else id_or_num]
@ -353,7 +340,7 @@ class Chromium(object):
with self._lock:
return MixTab(self, id_or_num) if mix else ChromiumTab(self, id_or_num)
def _get_tabs(self, title=None, url=None, tab_type='page', mix=False, as_id=False):
def _get_tabs(self, title=None, url=None, tab_type='page', mix=True, as_id=False):
tabs = self._driver.get(f'http://{self.address}/json').json() # 不要改用cdp
if isinstance(tab_type, str):

View File

@ -122,7 +122,7 @@ class Chromium(object):
...
@property
def latest_tab(self) -> Union[ChromiumTab, MixTab, str]:
def latest_tab(self) -> Union[MixTab, str]:
"""返回最新的标签页,最新标签页指最后创建或最后被激活的
当Settings.singleton_tab_obj==True时返回Tab对象否则返回tab id"""
...
@ -138,21 +138,7 @@ class Chromium(object):
url: str = None,
new_window: bool = False,
background: bool = False,
new_context: bool = False) -> ChromiumTab:
"""新建一个标签页
:param url: 新标签页跳转到的网址为None时新建空标签页
:param new_window: 是否在新窗口打开标签页隐身模式下无效
:param background: 是否不激活新标签页隐身模式和访客模式及new_window为True时无效
:param new_context: 是否创建独立环境隐身模式和访客模式下无效
:return: 新标签页对象
"""
...
def new_mix_tab(self,
url: str = None,
new_window: bool = False,
background: bool = False,
new_context: bool = False) -> MixTab:
new_context: bool = False) -> MixTab:
"""新建一个标签页
:param url: 新标签页跳转到的网址为None时新建空标签页
:param new_window: 是否在新窗口打开标签页隐身模式下无效
@ -167,7 +153,7 @@ class Chromium(object):
title: str = None,
url: str = None,
tab_type: Union[str, list, tuple] = 'page',
as_id: bool = False) -> Union[ChromiumTab, str]:
as_id: bool = False) -> Union[MixTab, str]:
"""获取一个标签页对象id_or_num不为None时后面几个参数无效
:param id_or_num: 要获取的标签页id或序号序号从1开始可传入负数获取倒数第几个不是视觉排列顺序而是激活顺序
:param title: 要匹配title的文本模糊匹配为None则匹配所有
@ -182,7 +168,7 @@ class Chromium(object):
title: str = None,
url: str = None,
tab_type: Union[str, list, tuple] = 'page',
as_id: bool = False) -> List[ChromiumTab, str]:
as_id: bool = False) -> List[MixTab, str]:
"""查找符合条件的tab返回它们组成的列表title和url是与关系
:param title: 要匹配title的文本
:param url: 要匹配url的文本
@ -192,32 +178,6 @@ class Chromium(object):
"""
...
def get_mix_tab(self,
id_or_num: Union[str, int] = None,
title: str = None,
url: str = None,
tab_type: Union[str, list, tuple] = 'page') -> MixTab:
"""获取一个标签页对象id_or_num不为None时后面几个参数无效
:param id_or_num: 要获取的标签页id或序号序号从1开始可传入负数获取倒数第几个不是视觉排列顺序而是激活顺序为None时获取最后的
:param title: 要匹配title的文本模糊匹配为None则匹配所有
:param url: 要匹配url的文本模糊匹配为None则匹配所有
:param tab_type: tab类型可用列表输入多个 'page', 'iframe' 为None则匹配所有
:return: Tab对象
"""
...
def get_mix_tabs(self,
title: str = None,
url: str = None,
tab_type: Union[str, list, tuple] = 'page') -> List[MixTab]:
"""查找符合条件的tab返回它们组成的列表title和url是与关系
:param title: 要匹配title的文本
:param url: 要匹配url的文本
:param tab_type: tab类型可用列表输入多个
:return: Tab对象列表
"""
...
def close_tabs(self,
tabs_or_ids: Union[str, ChromiumTab, List[Union[str, ChromiumTab]],
Tuple[Union[str, ChromiumTab]]],
@ -258,7 +218,7 @@ class Chromium(object):
...
def _new_tab(self,
obj,
mix: bool = True,
url: str = None,
new_window: bool = False,
background: bool = False,
@ -278,7 +238,7 @@ class Chromium(object):
title: str = None,
url: str = None,
tab_type: Union[str, list, tuple] = 'page',
mix: bool = False,
mix: bool = True,
as_id: bool = False) -> Union[ChromiumTab, str]:
"""获取一个标签页对象id_or_num不为None时后面几个参数无效
:param id_or_num: 要获取的标签页id或序号序号从1开始可传入负数获取倒数第几个不是视觉排列顺序而是激活顺序
@ -295,7 +255,7 @@ class Chromium(object):
title: str = None,
url: str = None,
tab_type: Union[str, list, tuple] = 'page',
mix: bool = False,
mix: bool = True,
as_id: bool = False) -> List[ChromiumTab, str]:
"""查找符合条件的tab返回它们组成的列表title和url是与关系
:param title: 要匹配title的文本

View File

@ -8,6 +8,7 @@
from time import sleep
from .._base.chromium import Chromium
from .._functions.settings import Settings
from .._functions.web import save_page
from .._pages.chromium_base import ChromiumBase
from .._units.setter import ChromiumPageSetter
@ -77,7 +78,7 @@ class ChromiumPage(ChromiumBase):
@property
def latest_tab(self):
return self.browser.latest_tab
return self.browser._get_tab(id_or_num=self.tab_ids[0], as_id=not Settings.singleton_tab_obj)
@property
def process_id(self):
@ -95,13 +96,15 @@ class ChromiumPage(ChromiumBase):
return save_page(self, path, name, as_pdf, kwargs)
def get_tab(self, id_or_num=None, title=None, url=None, tab_type='page', as_id=False):
return self.browser.get_tab(id_or_num=id_or_num, title=title, url=url, tab_type=tab_type, as_id=as_id)
return self.browser._get_tab(id_or_num=id_or_num, title=title, url=url,
tab_type=tab_type, mix=False, as_id=as_id)
def get_tabs(self, title=None, url=None, tab_type='page', as_id=False):
return self.browser.get_tabs(title=title, url=url, tab_type=tab_type, as_id=as_id)
return self.browser._get_tabs(title=title, url=url, tab_type=tab_type, mix=False, as_id=as_id)
def new_tab(self, url=None, new_window=False, background=False, new_context=False):
return self.browser.new_tab(url=url, new_window=new_window, background=background, new_context=new_context)
return self.browser._new_tab(False, url=url, new_window=new_window,
background=background, new_context=new_context)
def activate_tab(self, id_ind_tab):
self.browser.activate_tab(id_ind_tab)

View File

@ -30,9 +30,10 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
if hasattr(self, '_created'):
return
self._mode = mode.lower()
if self._mode not in ('s', 'd'):
mode = mode.lower()
if mode not in ('s', 'd'):
raise ValueError('mode参数只能是s或d。')
self._d_mode = mode == 'd'
self._has_driver = True
self._has_session = True
@ -42,13 +43,12 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
chromium_options.set_timeouts(base=self._timeout).set_paths(download_path=self.download_path)
super(SessionPage, self).__init__(addr_or_opts=chromium_options, timeout=timeout)
self._type = 'MixPage'
self.change_mode(self._mode, go=False, copy_cookies=False)
self.change_mode(mode, go=False, copy_cookies=False)
def __call__(self, locator, index=1, timeout=None):
if self._mode == 'd':
if self._d_mode:
return super(SessionPage, self).__call__(locator, index=index, timeout=timeout)
elif self._mode == 's':
return super().__call__(locator, index=index)
return super().__call__(locator, index=index)
@property
def latest_tab(self):
@ -62,10 +62,7 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
@property
def url(self):
if self._mode == 'd':
return self._browser_url
elif self._mode == 's':
return self._session_url
return self._browser_url if self._d_mode else self._session_url
@property
def _browser_url(self):
@ -73,31 +70,23 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
@property
def title(self):
if self._mode == 's':
return super().title
elif self._mode == 'd':
return super(SessionPage, self).title
return super(SessionPage, self).title if self._d_mode else super().title
@property
def raw_data(self):
if self._mode == 's':
return super().raw_data
elif self._mode == 'd':
if self._d_mode:
return super(SessionPage, self).html if self._has_driver else ''
return super().raw_data
@property
def html(self):
if self._mode == 's':
return super().html
elif self._mode == 'd':
if self._d_mode:
return super(SessionPage, self).html if self._has_driver else ''
return super().html
@property
def json(self):
if self._mode == 's':
return super().json
elif self._mode == 'd':
return super(SessionPage, self).json
return super(SessionPage, self).json if self._d_mode else super().json
@property
def response(self):
@ -105,14 +94,11 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
@property
def mode(self):
return self._mode
return 'd' if self._d_mode else 's'
@property
def user_agent(self):
if self._mode == 's':
return super().user_agent
elif self._mode == 'd':
return super(SessionPage, self).user_agent
return super(SessionPage, self).user_agent if self._d_mode else super().user_agent
@property
def session(self):
@ -126,15 +112,15 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
@property
def timeout(self):
return self._timeout if self._mode == 's' else self.timeouts.base
return self.timeouts.base if self._d_mode else self._timeout
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs):
if self._mode == 'd':
if self._d_mode:
return super(SessionPage, self).get(url, show_errmsg, retry, interval, timeout)
elif self._mode == 's':
if timeout is None:
timeout = self.timeouts.page_load if self._has_driver else self.timeout
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
if timeout is None:
timeout = self.timeouts.page_load if self._has_driver else self.timeout
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
if self.mode == 'd':
@ -144,61 +130,51 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
return super().post(url, show_errmsg, retry, interval, **kwargs)
def ele(self, locator, index=1, timeout=None):
if self._mode == 's':
return super().ele(locator, index=index)
elif self._mode == 'd':
return super(SessionPage, self).ele(locator, index=index, timeout=timeout)
return super(SessionPage, self).ele(locator, index=index, timeout=timeout) if self._d_mode \
else super().ele(locator, index=index)
def eles(self, locator, timeout=None):
if self._mode == 's':
return super().eles(locator)
elif self._mode == 'd':
return super(SessionPage, self).eles(locator, timeout=timeout)
return super(SessionPage, self).eles(locator, timeout=timeout) if self._d_mode else super().eles(locator)
def s_ele(self, locator=None, index=1):
if self._mode == 's':
return super().s_ele(locator, index=index)
elif self._mode == 'd':
return super(SessionPage, self).s_ele(locator, index=index)
return super(SessionPage, self).s_ele(locator,
index=index) if self._d_mode else super().s_ele(locator, index=index)
def s_eles(self, locator):
if self._mode == 's':
return super().s_eles(locator)
elif self._mode == 'd':
return super(SessionPage, self).s_eles(locator)
return super(SessionPage, self).s_eles(locator) if self._d_mode else super().s_eles(locator)
def change_mode(self, mode=None, go=True, copy_cookies=True):
if mode is not None and mode.lower() == self._mode:
if mode:
mode = mode.lower()
if mode is not None and ((mode == 'd' and self._d_mode) or (mode == 's' and not self._d_mode)):
return
self._mode = 's' if self._mode == 'd' else 'd'
self._d_mode = not self._d_mode
# s模式转d模式
if self._mode == 'd':
if self._d_mode:
if self._driver is None:
self._connect_browser(self._chromium_options)
self._url = None if not self._has_driver else super(SessionPage, self).url
self._has_driver = True
if self._session_url:
if copy_cookies:
self.cookies_to_browser()
if go:
self.get(self._session_url)
return
# d模式转s模式
elif self._mode == 's':
self._has_session = True
self._url = self._session_url
self._has_session = True
self._url = self._session_url
if self._has_driver:
if copy_cookies:
self.cookies_to_session()
if go and not self.get(super(SessionPage, self).url):
raise ConnectionError('s模式访问失败请设置go=False自行构造连接参数进行访问。')
if self._has_driver:
if copy_cookies:
self.cookies_to_session()
if go and not self.get(super(SessionPage, self).url):
raise ConnectionError('s模式访问失败请设置go=False自行构造连接参数进行访问。')
def cookies_to_session(self, copy_user_agent=True):
if not self._has_session:
@ -216,10 +192,8 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
set_tab_cookies(self, super().cookies())
def cookies(self, all_domains=False, all_info=False):
if self._mode == 's':
return super().cookies(all_domains, all_info)
elif self._mode == 'd':
return super(SessionPage, self).cookies(all_domains, all_info)
return super(SessionPage, self).cookies(all_domains, all_info) if self._d_mode \
else super().cookies(all_domains, all_info)
def get_tab(self, id_or_num=None, title=None, url=None, tab_type='page', as_id=False):
return self.browser._get_tab(id_or_num=id_or_num, title=title, url=url,
@ -261,10 +235,9 @@ class MixPage(SessionPage, ChromiumPage, BasePage):
self._response.close()
def _find_elements(self, locator, timeout=None, index=1, relative=False, raise_err=None):
if self._mode == 's':
return super()._find_elements(locator, index=index)
elif self._mode == 'd':
if self._d_mode:
return super(SessionPage, self)._find_elements(locator, timeout=timeout, index=index, relative=relative)
return super()._find_elements(locator, index=index)
def quit(self, timeout=5, force=True, del_data=False):
if self._has_session:

View File

@ -24,7 +24,7 @@ from .._units.waiter import MixPageWaiter
class MixPage(SessionPage, ChromiumPage, BasePage):
_mode: str = ...
_d_mode: bool = ...
_set: MixPageSetter = ...
_has_driver: Optional[bool] = ...
_has_session: Optional[bool] = ...

View File

@ -20,7 +20,7 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
if Settings.singleton_tab_obj and hasattr(self, '_created'):
return
self._mode = 'd'
self._d_mode = True
self._has_driver = True
self._has_session = True
super().__init__(session_or_options=browser._session_options or SessionOptions(
@ -29,10 +29,8 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
self._type = 'MixTab'
def __call__(self, locator, index=1, timeout=None):
if self._mode == 'd':
return super(SessionPage, self).__call__(locator, index=index, timeout=timeout)
elif self._mode == 's':
return super().__call__(locator, index=index)
return super(SessionPage, self).__call__(locator, index=index, timeout=timeout) if self._d_mode \
else super().__call__(locator, index=index)
@property
def set(self):
@ -42,10 +40,7 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
@property
def url(self):
if self._mode == 'd':
return self._browser_url
elif self._mode == 's':
return self._session_url
return self._browser_url if self._d_mode else self._session_url
@property
def _browser_url(self):
@ -53,31 +48,23 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
@property
def title(self):
if self._mode == 's':
return super().title
elif self._mode == 'd':
return super(SessionPage, self).title
return super(SessionPage, self).title if self._d_mode else super().title
@property
def raw_data(self):
if self._mode == 's':
return super().raw_data
elif self._mode == 'd':
if self._d_mode:
return super(SessionPage, self).html if self._has_driver else ''
return super().raw_data
@property
def html(self):
if self._mode == 's':
return super().html
elif self._mode == 'd':
if self._d_mode:
return super(SessionPage, self).html if self._has_driver else ''
return super().html
@property
def json(self):
if self._mode == 's':
return super().json
elif self._mode == 'd':
return super(SessionPage, self).json
return super(SessionPage, self).json if self._d_mode else super().json
@property
def response(self):
@ -85,14 +72,11 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
@property
def mode(self):
return self._mode
return 'd' if self._d_mode else 's'
@property
def user_agent(self):
if self._mode == 's':
return super().user_agent
elif self._mode == 'd':
return super(SessionPage, self).user_agent
return super(SessionPage, self).user_agent if self._d_mode else super().user_agent
@property
def session(self):
@ -106,17 +90,17 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
@property
def timeout(self):
return self._timeout if self._mode == 's' else self.timeouts.base
return self.timeouts.base if self._d_mode else self._timeout
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs):
if self._mode == 'd':
if self._d_mode:
if kwargs:
raise ValueError(f'以下参数在s模式下才会生效{" ".join(kwargs.keys())}')
return super(SessionPage, self).get(url, show_errmsg, retry, interval, timeout)
elif self._mode == 's':
if timeout is None:
timeout = self.timeouts.page_load if self._has_driver else self.timeout
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
if timeout is None:
timeout = self.timeouts.page_load if self._has_driver else self.timeout
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
if self.mode == 'd':
@ -126,65 +110,55 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
return super().post(url, show_errmsg, retry, interval, **kwargs)
def ele(self, locator, index=1, timeout=None):
if self._mode == 's':
return super().ele(locator, index=index)
elif self._mode == 'd':
return super(SessionPage, self).ele(locator, index=index, timeout=timeout)
return super(SessionPage, self).ele(locator, index=index, timeout=timeout) if self._d_mode \
else super().ele(locator, index=index)
def eles(self, locator, timeout=None):
if self._mode == 's':
return super().eles(locator)
elif self._mode == 'd':
return super(SessionPage, self).eles(locator, timeout=timeout)
return super(SessionPage, self).eles(locator, timeout=timeout) if self._d_mode else super().eles(locator)
def s_ele(self, locator=None, index=1):
if self._mode == 's':
return super().s_ele(locator, index=index)
elif self._mode == 'd':
return super(SessionPage, self).s_ele(locator, index=index)
return super(SessionPage, self).s_ele(locator,
index=index) if self._d_mode else super().s_ele(locator, index=index)
def s_eles(self, locator):
if self._mode == 's':
return super().s_eles(locator)
elif self._mode == 'd':
return super(SessionPage, self).s_eles(locator)
return super(SessionPage, self).s_eles(locator) if self._d_mode else super().s_eles(locator)
def change_mode(self, mode=None, go=True, copy_cookies=True):
if mode is not None and mode.lower() == self._mode:
if mode:
mode = mode.lower()
if mode is not None and ((mode == 'd' and self._d_mode) or (mode == 's' and not self._d_mode)):
return
self._mode = 's' if self._mode == 'd' else 'd'
self._d_mode = not self._d_mode
# s模式转d模式
if self._mode == 'd':
if self._driver is None:
if self._d_mode:
if self._driver is None: # todo: 优化这里的逻辑
tabs = self.browser.tab_ids
tid = self.tab_id if self.tab_id in tabs else tabs[0]
self._connect_browser(tid)
self._url = None if not self._has_driver else super(SessionPage, self).url
self._has_driver = True
if self._session_url:
if copy_cookies:
self.cookies_to_browser()
if go:
self.get(self._session_url)
return
# d模式转s模式
elif self._mode == 's':
self._has_session = True
self._url = self._session_url
self._has_session = True
self._url = self._session_url
if self._has_driver:
if copy_cookies:
self.cookies_to_session()
if self._has_driver:
if copy_cookies:
self.cookies_to_session()
if go:
url = super(SessionPage, self).url
if url.startswith('http'):
self.get(url)
if go:
url = super(SessionPage, self).url
if url.startswith('http'):
self.get(url)
def cookies_to_session(self, copy_user_agent=True):
if not self._has_session:
@ -202,10 +176,8 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
set_tab_cookies(self, super().cookies())
def cookies(self, all_domains=False, all_info=False):
if self._mode == 's':
return super().cookies(all_domains, all_info)
elif self._mode == 'd':
return super(SessionPage, self).cookies(all_domains, all_info)
return super(SessionPage, self).cookies(all_domains, all_info) if self._d_mode \
else super().cookies(all_domains, all_info)
def close(self, others=False):
self.browser.close_tabs(self.tab_id, others=others)
@ -214,10 +186,8 @@ class MixTab(SessionPage, ChromiumTab, BasePage):
self._response.close()
def _find_elements(self, locator, timeout=None, index=1, relative=False, raise_err=None):
if self._mode == 's':
return super()._find_elements(locator, index=index)
elif self._mode == 'd':
return super(SessionPage, self)._find_elements(locator, timeout=timeout, index=index, relative=relative)
return super(SessionPage, self)._find_elements(locator, timeout=timeout, index=index, relative=relative) \
if self._d_mode else super()._find_elements(locator, index=index)
def __repr__(self):
return f'<MixTab browser_id={self.browser.id} tab_id={self.tab_id}>'

View File

@ -23,7 +23,7 @@ from .._units.waiter import MixTabWaiter
class MixTab(SessionPage, ChromiumTab):
_tab: MixTab = ...
_mode: str = ...
_d_mode: bool = ...
_has_driver: bool = ...
_has_session: bool = ...
_set: MixTabSetter = ...

View File

@ -5,7 +5,7 @@ from time import perf_counter, sleep
class Console(object):
def __init__(self, owner):
self.owner = owner
self._owner = owner
self.listening = False
self._caught = None
@ -20,27 +20,49 @@ class Console(object):
def start(self):
self._caught = Queue(maxsize=0)
self.owner._driver.set_callback("Console.messageAdded", self._console)
self.owner._run_cdp("Console.enable")
self._owner._driver.set_callback("Console.messageAdded", self._console)
self._owner._run_cdp("Console.enable")
self.listening = True
def stop(self):
if self.listening:
self.owner._run_cdp("Console.disable")
self.owner._driver.set_callback('Console.messageAdded', None)
self._owner._run_cdp("Console.disable")
self._owner._driver.set_callback('Console.messageAdded', None)
self.listening = False
def clear(self):
self._caught = Queue(maxsize=0)
def wait(self, timeout=None):
if not self.listening:
raise RuntimeError('监听未启动。')
if timeout is None:
while self._owner._driver.is_running and self.listening and not self._caught.qsize():
sleep(.03)
return self._caught.get_nowait() if self._caught.qsize() else None
else:
end = perf_counter() + timeout
while self._owner._driver.is_running and self.listening and perf_counter() < end:
if self._caught.qsize():
return self._caught.get_nowait()
sleep(0.05)
return False
def steps(self, timeout=None):
end = perf_counter() + timeout if timeout else None
while self.owner._driver.is_running:
if timeout and perf_counter() > end:
return
if self._caught.qsize():
yield self._caught.get_nowait()
sleep(0.05)
if timeout is None:
while self._owner._driver.is_running and self.listening:
if self._caught.qsize():
yield self._caught.get_nowait()
sleep(0.05)
else:
end = perf_counter() + timeout
while self._owner._driver.is_running and self.listening and perf_counter() < end:
if self._caught.qsize():
yield self._caught.get_nowait()
sleep(0.05)
return False
def _console(self, **kwargs):
self._caught.put(ConsoleData(kwargs['message']))

View File

@ -1,13 +1,13 @@
# -*- coding:utf-8 -*-
from queue import Queue
from typing import Optional, Iterable, List
from typing import Optional, Iterable, List, Union
from .._pages.chromium_base import ChromiumBase
class Console(object):
listening: bool = ...
owner: ChromiumBase = ...
_owner: ChromiumBase = ...
_caught: Optional[Queue] = ...
def __init__(self, owner: ChromiumBase) -> None:
@ -33,6 +33,13 @@ class Console(object):
"""清空已获取但未返回的信息"""
...
def wait(self, timeout: float = None) -> Union[ConsoleData, False]:
"""等待一条信息
:param timeout: 超时时间
:return: ConsoleData对象
"""
...
def steps(self, timeout: Optional[float] = None) -> Iterable[ConsoleData]:
"""每监听到一个信息就返回用于for循环
:param timeout: 等待一个信息的超时时间为None无限等待

View File

@ -99,13 +99,13 @@ class Listener(object):
if not self.listening:
raise RuntimeError('监听未启动或已暂停。')
if not timeout:
while self._driver.is_running and self._caught.qsize() < count:
while self._driver.is_running and self.listening and self._caught.qsize() < count:
sleep(.03)
fail = False
else:
end = perf_counter() + timeout
while self._driver.is_running:
while self._driver.is_running and self.listening:
if perf_counter() > end:
fail = True
break
@ -132,19 +132,28 @@ class Listener(object):
if not self.listening:
raise RuntimeError('监听未启动或已暂停。')
caught = 0
end = perf_counter() + timeout if timeout else None
while self._driver.is_running:
if timeout and perf_counter() > end:
return
if self._caught.qsize() >= gap:
yield self._caught.get_nowait() if gap == 1 else [self._caught.get_nowait() for _ in range(gap)]
if timeout:
if timeout is None:
while self._driver.is_running and self.listening:
if self._caught.qsize() >= gap:
yield self._caught.get_nowait() if gap == 1 else [self._caught.get_nowait() for _ in range(gap)]
if count:
caught += gap
if caught >= count:
return
sleep(.03)
else:
end = perf_counter() + timeout
while self._driver.is_running and self.listening and perf_counter() < end:
if self._caught.qsize() >= gap:
yield self._caught.get_nowait() if gap == 1 else [self._caught.get_nowait() for _ in range(gap)]
end = perf_counter() + timeout
if count:
caught += gap
if caught >= count:
return
sleep(.03)
if count:
caught += gap
if caught >= count:
return
sleep(.03)
return False
def stop(self):
if self.listening: