继续完善ChromiumPage

This commit is contained in:
g1879 2022-11-16 18:33:01 +08:00
parent 60930597df
commit a27e4c8fb6
2 changed files with 107 additions and 118 deletions

View File

@ -6,7 +6,7 @@ from time import perf_counter, sleep
from typing import Union, Tuple, List, Any
from pychrome import Tab
from requests import get as requests_get
from requests import Session
from json import loads
from requests.cookies import RequestsCookieJar
@ -45,6 +45,10 @@ class ChromiumPage(BasePage):
self._is_loading = False
self._root_id = None
self.timeouts = Timeout(self)
self._ss = Session()
self._ss.keep_alive = False
self._alert = Alert()
self._first_run = True
# 接管或启动浏览器
if addr_tab_opts is None or isinstance(addr_tab_opts, DriverOptions):
@ -52,20 +56,25 @@ class ChromiumPage(BasePage):
self.address = self.options.debugger_address
self.process = connect_chrome(self.options)[1]
self._set_options()
json = loads(requests_get(f'http://{self.address}/json').text)
json = loads(self._ss.get(f'http://{self.address}/json').text)
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
self._init_page(tab_id)
self._get_document()
self._first_run = False
# 接收浏览器地址和端口
elif isinstance(addr_tab_opts, str):
self.address = addr_tab_opts
self.options = DriverOptions(read_file=False)
self.options.debugger_address = addr_tab_opts
self.process = connect_chrome(self.options)[1]
self._set_options()
if not tab_id:
json = loads(requests_get(f'http://{self.address}/json').text)
json = loads(self._ss.get(f'http://{self.address}/json').text)
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
self._init_page(tab_id)
self._get_document()
self._first_run = False
# 接收传递过来的Tab浏览器
elif isinstance(addr_tab_opts, Tab):
@ -75,11 +84,26 @@ class ChromiumPage(BasePage):
self.options = DriverOptions(read_file=False)
self._set_options()
self._init_page(None)
self._get_document()
self._first_run = False
else:
raise TypeError('只能接收Tab或DriverOptions类型参数。')
self._alert = Alert()
def _init_page(self, tab_id: str = None) -> None:
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
:param tab_id: 要跳转到的标签页id
:return: None
"""
self._is_loading = True
if tab_id:
self._driver = Tab(id=tab_id, type='page',
webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}')
self._driver.start()
self._driver.DOM.enable()
self._driver.Page.enable()
self._driver.Page.javascriptDialogOpening = self._on_alert_open
self._driver.Page.javascriptDialogClosed = self._on_alert_close
@ -87,38 +111,21 @@ class ChromiumPage(BasePage):
self._driver.Page.loadEventFired = self._onLoadEventFired
self._driver.DOM.documentUpdated = self._onDocumentUpdated
def _init_page(self, tab_id: str = None) -> None:
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
:param tab_id: 要跳转到的标签页id
:return: None
"""
print('init page')
self._is_loading = True
if tab_id:
print('new tab')
self._driver = Tab(id=tab_id, type='page',
webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}')
self._driver.start()
self._driver.DOM.enable()
self._driver.Page.enable()
def _get_document(self) -> None:
"""刷新cdp使用的document数据"""
print('get doc')
self._wait_loading()
root_id = self._driver.DOM.getDocument()['root']['nodeId']
self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId']
self._wait_loading()
self._is_loading = False
def _set_options(self) -> None:
self.set_timeouts(page_load=self.options.timeouts['pageLoad'] / 1000,
script=self.options.timeouts['script'] / 1000,
implicit=self.options.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout)
self._page_load_strategy = self.options.page_load_strategy
def _wait_loading(self, timeout: float = None) -> bool:
"""等待页面加载完成
:param timeout: 超时时间
:return: 是否成功超时返回False
"""
timeout = timeout if timeout is not None else self.timeouts.page_load
# timeout = timeout if timeout is not None else self.timeouts.page_load
timeout = 100
end_time = perf_counter() + timeout
while perf_counter() < end_time:
@ -136,9 +143,9 @@ class ChromiumPage(BasePage):
def _onLoadEventFired(self, **kwargs):
"""在页面刷新、变化后重新读取页面内容"""
# self._is_loading = True
print('load complete')
self._init_page(self._driver.id)
if self._first_run is False:
self._get_document()
def _onFrameNavigated(self, **kwargs):
print('nav')
@ -147,7 +154,14 @@ class ChromiumPage(BasePage):
self._is_loading = True
def _onDocumentUpdated(self, **kwargs):
print('doc')
# print('doc')
pass
def _set_options(self) -> None:
self.set_timeouts(page_load=self.options.timeouts['pageLoad'] / 1000,
script=self.options.timeouts['script'] / 1000,
implicit=self.options.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout)
self._page_load_strategy = self.options.page_load_strategy
def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'],
timeout: float = None) -> Union['ChromiumElement', None]:
@ -163,7 +177,7 @@ class ChromiumPage(BasePage):
def driver(self) -> Tab:
"""返回用于控制浏览器的Tab对象"""
while self._is_loading:
print('loading')
# print('loading')
sleep(.1)
return self._driver
@ -171,7 +185,7 @@ class ChromiumPage(BasePage):
def url(self) -> str:
"""返回当前页面url"""
tab_id = self.driver.id # 用于WebPage时激活浏览器
json = loads(requests_get(f'http://{self.address}/json').text)
json = loads(self._ss.get(f'http://{self.address}/json').text)
return [i['url'] for i in json if i['id'] == tab_id][0]
@property
@ -188,25 +202,20 @@ class ChromiumPage(BasePage):
@property
def tabs_count(self) -> int:
"""返回标签页数量"""
return len(self.tab_ids)
return len(self.tabs)
@property
def tab_ids(self) -> list:
def tabs(self) -> list:
"""返回所有标签页id"""
d = self.driver
json = loads(requests_get(f'http://{self.address}/json').text)
json = loads(self._ss.get(f'http://{self.address}/json').text)
return [i['id'] for i in json if i['type'] == 'page']
@property
def current_tab_id(self) -> str:
def tab_id(self) -> str:
"""返回当前标签页id"""
return self.driver.id
@property
def current_tab_index(self) -> int:
"""返回当前标签页序号"""
return self.tab_ids.index(self.current_tab_id)
@property
def ready_state(self) -> str:
"""返回当前页面加载状态,'loading' 'interactive' 'complete'"""
@ -364,8 +373,8 @@ class ChromiumPage(BasePage):
"""
return self._ele(loc_or_ele, timeout=timeout, single=False)
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) -> Union[
SessionElement, str, None]:
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \
-> Union[SessionElement, str, None]:
"""查找第一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高 \n
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象或属性文本
@ -569,6 +578,10 @@ class ChromiumPage(BasePage):
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self.run_script(js, as_expr=True)
def to_front(self) -> None:
"""激活当前标签页使其处于最前面"""
self._ss.get(f'http://{self.address}/json/activate/{self.tab_id}')
def new_tab(self, url: str = None) -> None:
"""新建并定位到一个标签页,该标签页在最后面 \n
:param url: 新标签页跳转到的网址
@ -576,70 +589,62 @@ class ChromiumPage(BasePage):
"""
d = self.driver
url = f'?{url}' if url else ''
requests_get(f'http://{self.address}/json/new{url}')
self._ss.get(f'http://{self.address}/json/new{url}')
self.to_tab()
def to_tab(self, num_or_id: Union[int, str] = 0, activate: bool = True) -> None:
"""跳转到标签页 \n
注意当程序使用的是接管的浏览器获取到的 id 顺序和视觉效果不一致 \n
:param num_or_id: 标签页序号或id字符串序号第一个为0最后为-1
def to_tab(self, tab_id: str = None, activate: bool = True) -> None:
"""跳转到标签页 \n
:param tab_id: 标签页id字符串默认跳转到活动状态的
:param activate: 切换后是否变为活动状态
:return: None
"""
try:
tab = int(num_or_id)
except (ValueError, TypeError):
tab = num_or_id
tab = self.tab_ids[tab] if isinstance(tab, int) else tab
tabs = self.tabs
if not tab_id:
tab_id = tabs[0]
if tab_id == self.tab_id or tab_id not in tabs:
return
if activate:
requests_get(f'http://{self.address}/json/activate/{tab}')
self._ss.get(f'http://{self.address}/json/activate/{tab_id}')
self.driver.stop()
self._init_page(tab)
self._driver.stop()
self._init_page(tab_id)
state = self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value']
if state == 'complete':
self._get_document()
def to_front(self) -> None:
"""激活当前标签页使其处于最前面"""
d = self.driver
requests_get(f'http://{self.address}/json/activate/{self.current_tab_id}')
def close_tabs(self, num_or_ids: Union[int, str, list, tuple, set] = None, others: bool = False) -> None:
def close_tabs(self, tab_ids: Union[str, List[str], Tuple[str]] = None, others: bool = False) -> None:
"""关闭传入的标签页,默认关闭当前页。可传入多个 \n
注意当程序使用的是接管的浏览器获取到的 id 顺序和视觉效果不一致不能按序号关闭 \n
:param num_or_ids:要关闭的标签页序号或id可传入id和序号组成的列表或元组为None时关闭当前页
:param tab_ids: 要关闭的标签页id可传入id组成的列表或元组为None时关闭当前页
:param others: 是否关闭指定标签页之外的
:return: None
"""
all_tabs = set(self.tabs)
tabs = set(tab_ids) if tab_ids else {self.tab_id}
if others:
all_tabs = self.tab_ids
reserve_tabs = {self.current_tab_id} if num_or_ids is None else _get_tabs(all_tabs, num_or_ids)
tabs = set(all_tabs) - reserve_tabs
else:
tabs = (self.current_tab_id,) if num_or_ids is None else _get_tabs(self.tab_ids, num_or_ids)
tabs = all_tabs - tabs
tabs_len = len(tabs)
all_len = len(self.tab_ids)
if tabs_len > all_len:
raise ValueError('要关闭的页面数量不能大于总数量。')
end_len = len(all_tabs) - len(tabs)
if end_len <= 0:
self.quit()
return
is_alive = True
if tabs_len == all_len:
self.driver.stop()
is_alive = False
if self.tab_id in tabs:
self._driver.stop()
for tab in tabs:
requests_get(f'http://{self.address}/json/close/{tab}')
self._ss.get(f'http://{self.address}/json/close/{tab}')
while len(self.tabs) != end_len:
pass
if is_alive:
self.to_tab(0)
self.to_tab()
def close_other_tabs(self, num_or_ids: Union[int, str, list, tuple] = None) -> None:
def close_other_tabs(self, tab_ids: Union[str, List[str], Tuple[str]] = None) -> None:
"""关闭传入的标签页以外标签页,默认保留当前页。可传入多个 \n
注意当程序使用的是接管的浏览器获取到的 id 顺序和视觉效果不一致不能按序号关闭 \n
:param num_or_ids: 要保留的标签页序号或id可传入id和序号组成的列表或元组为None时保存当前页
:param tab_ids: 要保留的标签页id可传入id组成的列表或元组为None时保存当前页
:return: None
"""
self.close_tabs(num_or_ids, True)
self.close_tabs(tab_ids, True)
def clear_cache(self,
session_storage: bool = True,
@ -692,11 +697,10 @@ class ChromiumPage(BasePage):
"""显示浏览器窗口只在Windows系统可用"""
_show_or_hide_browser(self, hide=False)
def check_page(self) -> Union[bool, None]:
"""检查页面是否符合预期 \n
由子类自行实现各页面的判定规则
"""
return None
def quit(self) -> None:
"""关闭浏览器"""
self._driver.Browser.close()
self._driver.stop()
def _d_connect(self,
to_url: str,
@ -713,22 +717,20 @@ class ChromiumPage(BasePage):
:return: 是否成功返回None表示不确定
"""
err = None
is_ok = False
timeout = timeout if timeout is not None else self.timeouts.page_load
for _ in range(times + 1):
result = self.driver.Page.navigate(url=to_url)
is_timeout = not self._wait_loading(timeout)
if is_timeout:
raise TimeoutError('页面连接超时。')
err = TimeoutError('页面连接超时。')
continue
if 'errorText' in result:
raise ConnectionError(result['errorText'])
err = ConnectionError(result['errorText'])
continue
is_ok = self.check_page()
if is_ok is not False:
if not err:
break
if _ < times:
@ -736,10 +738,10 @@ class ChromiumPage(BasePage):
if show_errmsg:
print(f'重试 {to_url}')
if is_ok is False and show_errmsg:
if err and show_errmsg:
raise err if err is not None else ConnectionError('连接异常。')
return is_ok
return False if err else True
def _on_alert_close(self, **kwargs):
"""alert关闭时触发的方法"""
@ -845,20 +847,6 @@ class WindowSizeSetter(object):
self.driver.Browser.setWindowBounds(windowId=self.window_id, bounds=bounds)
def _get_tabs(ids: list, num_or_ids: Union[int, str, list, tuple, set]) -> set:
"""返回指定标签页id组成的set
:param ids: 所有页面id组成的列表
:param num_or_ids: 指定的标签页可以是多个
:return: 指定标签页组成的set
"""
if isinstance(num_or_ids, (int, str)):
num_or_ids = (num_or_ids,)
elif not isinstance(num_or_ids, (list, tuple, set)):
raise TypeError('num_or_id参数只能是int、str、list、set 或 tuple类型。')
return set(i if isinstance(i, str) else ids[i] for i in num_or_ids)
def _show_or_hide_browser(page: ChromiumPage, hide: bool = True) -> None:
"""执行显示或隐藏浏览器窗口
:param page: ChromePage对象

View File

@ -464,6 +464,7 @@ class DriverOptions(Options):
self._driver_path = None
self._user_data_path = None
self.ini_path = None
self.timeouts = {'implicit': 10, 'pageLoad': 30, 'script': 30}
if read_file:
self.ini_path = ini_path or str(Path(__file__).parent / 'configs.ini')
@ -647,14 +648,14 @@ class DriverOptions(Options):
:param script: 脚本运行超时时间
:return: 当前对象
"""
timeouts = self._caps.get('timeouts', {'implicit': 10, 'pageLoad': 3000, 'script': 3000})
# timeouts = self._caps.get('timeouts', {'implicit': 10, 'pageLoad': 3000, 'script': 3000})
if implicit is not None:
timeouts['implicit'] = implicit
self.timeouts['implicit'] = implicit
if pageLoad is not None:
timeouts['pageLoad'] = pageLoad * 1000
self.timeouts['pageLoad'] = pageLoad * 1000
if script is not None:
timeouts['script'] = script * 1000
self.timeouts = timeouts
self.timeouts['script'] = script * 1000
# self.timeouts = timeouts
return self