mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
未完成
This commit is contained in:
parent
062ba54118
commit
60930597df
@ -112,8 +112,11 @@ class ChromiumElement(DrissionElement):
|
||||
@property
|
||||
def size(self) -> dict:
|
||||
"""返回元素宽和高"""
|
||||
model = self.page.driver.DOM.getBoxModel(nodeId=self._node_id)['model']
|
||||
return {'height': model['height'], 'width': model['width']}
|
||||
try:
|
||||
model = self.page.driver.DOM.getBoxModel(nodeId=self._node_id)['model']
|
||||
return {'height': model['height'], 'width': model['width']}
|
||||
except Exception:
|
||||
return {'height': 0, 'width': 0}
|
||||
|
||||
@property
|
||||
def location(self) -> dict:
|
||||
|
@ -22,80 +22,133 @@ from .chromium_element import ChromiumElement, ChromeScroll, _run_script, Chrome
|
||||
class ChromiumPage(BasePage):
|
||||
"""用于管理浏览器的类"""
|
||||
|
||||
def __init__(self, Tab_or_Options: Union[Tab, DriverOptions] = None,
|
||||
def __init__(self, addr_tab_opts: Union[str, Tab, DriverOptions] = None,
|
||||
tab_id: str = None,
|
||||
timeout: float = 10):
|
||||
"""初始化 \n
|
||||
:param Tab_or_Options: Tab对象或DriverOptions对象
|
||||
timeout: float = None):
|
||||
"""初始化 \n
|
||||
:param addr_tab_opts: 浏览器地址:端口、Tab对象或DriverOptions对象
|
||||
:param tab_id: 要控制的标签页id,不指定默认为激活的
|
||||
:param timeout: 超时时间
|
||||
"""
|
||||
super().__init__(timeout)
|
||||
self._is_loading = None
|
||||
self._root_id = None
|
||||
self._connect_browser(Tab_or_Options, tab_id)
|
||||
self._connect_browser(addr_tab_opts, tab_id)
|
||||
|
||||
def _connect_browser(self, Tab_or_Options: Union[Tab, DriverOptions] = None,
|
||||
def _connect_browser(self, addr_tab_opts: Union[str, Tab, DriverOptions] = None,
|
||||
tab_id: str = None) -> None:
|
||||
"""连接浏览器 \n
|
||||
:param Tab_or_Options: Tab对象或DriverOptions对象
|
||||
"""连接浏览器,在第一次时运行 \n
|
||||
:param addr_tab_opts: Tab对象或DriverOptions对象
|
||||
:param tab_id: 要控制的标签页id,不指定默认为激活的
|
||||
:return: None
|
||||
"""
|
||||
self._is_loading = False
|
||||
self._root_id = None
|
||||
self.timeouts = Timeout(self)
|
||||
self._page_load_strategy = 'normal'
|
||||
if isinstance(Tab_or_Options, Tab):
|
||||
self._driver = Tab_or_Options
|
||||
self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1)
|
||||
self.options = None
|
||||
self.process = None
|
||||
|
||||
elif isinstance(Tab_or_Options, DriverOptions):
|
||||
self.options = Tab_or_Options or DriverOptions() # 从ini文件读取
|
||||
self.set_timeouts(page_load=self.options.timeouts['pageLoad'],
|
||||
script=self.options.timeouts['script'])
|
||||
self._page_load_strategy = self.options.page_load_strategy
|
||||
self.process = connect_chrome(self.options)[1]
|
||||
# 接管或启动浏览器
|
||||
if addr_tab_opts is None or isinstance(addr_tab_opts, DriverOptions):
|
||||
self.options = addr_tab_opts or DriverOptions() # 从ini文件读取
|
||||
self.address = self.options.debugger_address
|
||||
self.process = connect_chrome(self.options)[1]
|
||||
self._set_options()
|
||||
json = loads(requests_get(f'http://{self.address}/json').text)
|
||||
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
||||
self._init_page(tab_id)
|
||||
|
||||
# 接收浏览器地址和端口
|
||||
elif isinstance(addr_tab_opts, str):
|
||||
self.address = addr_tab_opts
|
||||
self.options = DriverOptions(read_file=False)
|
||||
self.process = connect_chrome(self.options)[1]
|
||||
self._set_options()
|
||||
if not tab_id:
|
||||
json = loads(requests_get(f'http://{self.address}/json').text)
|
||||
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
||||
self._init_page(tab_id)
|
||||
|
||||
# 接收传递过来的Tab,浏览器
|
||||
elif isinstance(addr_tab_opts, Tab):
|
||||
self._driver = addr_tab_opts
|
||||
self.address = search(r'ws://(.*?)/dev', addr_tab_opts._websocket_url).group(1)
|
||||
self.process = None
|
||||
self.options = DriverOptions(read_file=False)
|
||||
self._set_options()
|
||||
self._init_page(None)
|
||||
|
||||
else:
|
||||
raise TypeError('只能接收Tab或DriverOptions类型参数。')
|
||||
|
||||
if not tab_id:
|
||||
json = loads(requests_get(f'http://{self.address}/json').text)
|
||||
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
||||
self._driver = Tab(id=tab_id, type='page',
|
||||
webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}')
|
||||
self._alert = Alert()
|
||||
self._driver.Page.javascriptDialogOpening = self._on_alert_open
|
||||
self._driver.Page.javascriptDialogClosed = self._on_alert_close
|
||||
|
||||
self._driver.Page.frameNavigated = self._onFrameNavigated
|
||||
self._driver.Page.loadEventFired = self._onLoadEventFired
|
||||
self._driver.DOM.documentUpdated = self._onDocumentUpdated
|
||||
|
||||
def _init_page(self, tab_id: str = None) -> None:
|
||||
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
|
||||
:param tab_id: 要跳转到的标签页id
|
||||
:return: None
|
||||
"""
|
||||
print('init page')
|
||||
self._is_loading = True
|
||||
if tab_id:
|
||||
print('new tab')
|
||||
self._driver = Tab(id=tab_id, type='page',
|
||||
webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}')
|
||||
|
||||
self._driver.start()
|
||||
self._driver.DOM.enable()
|
||||
self._driver.Page.enable()
|
||||
root_id = self._driver.DOM.getDocument()['root']['nodeId']
|
||||
self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId']
|
||||
|
||||
self._alert = Alert()
|
||||
self._driver.Page.javascriptDialogOpening = self._on_alert_open
|
||||
self._driver.Page.javascriptDialogClosed = self._on_alert_close
|
||||
|
||||
self._driver.Page.frameNavigated = self.onFrameNavigated
|
||||
self._driver.Page.loadEventFired = self.onLoadEventFired
|
||||
# todo:增加onDocumentUpdated
|
||||
|
||||
def onLoadEventFired(self, **kwargs):
|
||||
"""在页面刷新、变化后重新读取页面内容"""
|
||||
self._is_loading = True
|
||||
self._driver.DOM.enable()
|
||||
self._driver.Page.enable()
|
||||
root_id = self._driver.DOM.getDocument()['root']['nodeId']
|
||||
self._root_id = self._driver.DOM.resolveNode(nodeId=root_id)['object']['objectId']
|
||||
self._wait_loading()
|
||||
self._is_loading = False
|
||||
|
||||
def onFrameNavigated(self, **kwargs):
|
||||
def _set_options(self) -> None:
|
||||
self.set_timeouts(page_load=self.options.timeouts['pageLoad'] / 1000,
|
||||
script=self.options.timeouts['script'] / 1000,
|
||||
implicit=self.options.timeouts['implicit'] / 1000 if self.timeout is None else self.timeout)
|
||||
self._page_load_strategy = self.options.page_load_strategy
|
||||
|
||||
def _wait_loading(self, timeout: float = None) -> bool:
|
||||
"""等待页面加载完成
|
||||
:param timeout: 超时时间
|
||||
:return: 是否成功,超时返回False
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self.timeouts.page_load
|
||||
|
||||
end_time = perf_counter() + timeout
|
||||
while perf_counter() < end_time:
|
||||
state = self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value']
|
||||
if state == 'complete':
|
||||
return True
|
||||
elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'):
|
||||
self._driver.Page.stopLoading()
|
||||
return True
|
||||
elif self.page_load_strategy == 'none':
|
||||
self._driver.Page.stopLoading()
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _onLoadEventFired(self, **kwargs):
|
||||
"""在页面刷新、变化后重新读取页面内容"""
|
||||
# self._is_loading = True
|
||||
print('load complete')
|
||||
self._init_page(self._driver.id)
|
||||
|
||||
def _onFrameNavigated(self, **kwargs):
|
||||
print('nav')
|
||||
# todo: 考虑frame的情况,修改别的判断方式
|
||||
if not kwargs['frame'].get('parentId', None):
|
||||
self._is_loading = True
|
||||
|
||||
def _onDocumentUpdated(self, **kwargs):
|
||||
print('doc')
|
||||
|
||||
def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'],
|
||||
timeout: float = None) -> Union['ChromiumElement', None]:
|
||||
"""在内部查找元素 \n
|
||||
@ -110,6 +163,7 @@ class ChromiumPage(BasePage):
|
||||
def driver(self) -> Tab:
|
||||
"""返回用于控制浏览器的Tab对象"""
|
||||
while self._is_loading:
|
||||
print('loading')
|
||||
sleep(.1)
|
||||
return self._driver
|
||||
|
||||
@ -537,12 +591,13 @@ class ChromiumPage(BasePage):
|
||||
tab = num_or_id
|
||||
|
||||
tab = self.tab_ids[tab] if isinstance(tab, int) else tab
|
||||
# self.driver.stop()
|
||||
self._connect_browser(self.driver, tab)
|
||||
|
||||
if activate:
|
||||
requests_get(f'http://{self.address}/json/activate/{tab}')
|
||||
|
||||
self.driver.stop()
|
||||
self._init_page(tab)
|
||||
|
||||
def to_front(self) -> None:
|
||||
"""激活当前标签页使其处于最前面"""
|
||||
d = self.driver
|
||||
@ -664,16 +719,7 @@ class ChromiumPage(BasePage):
|
||||
for _ in range(times + 1):
|
||||
result = self.driver.Page.navigate(url=to_url)
|
||||
|
||||
is_timeout = True
|
||||
end_time = perf_counter() + timeout
|
||||
while perf_counter() < end_time:
|
||||
if ((self.page_load_strategy == 'normal' and self.ready_state == 'complete')
|
||||
or (self.page_load_strategy == 'eager' and self.ready_state in ('interactive', 'complete'))
|
||||
or (self.page_load_strategy == 'none' and self.ready_state
|
||||
in ('loading', 'interactive', 'complete'))):
|
||||
self.stop_loading()
|
||||
is_timeout = False
|
||||
break
|
||||
is_timeout = not self._wait_loading(timeout)
|
||||
|
||||
if is_timeout:
|
||||
raise TimeoutError('页面连接超时。')
|
||||
|
@ -380,8 +380,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
|
||||
self._driver_options = DriverOptions(read_file=False)
|
||||
|
||||
elif isinstance(Tab_or_Options, Tab):
|
||||
self._driver = Tab_or_Options
|
||||
self._connect_browser(Tab_or_Options.id)
|
||||
self._connect_browser(Tab_or_Options)
|
||||
self._has_driver = True
|
||||
|
||||
elif isinstance(Tab_or_Options, DriverOptions):
|
||||
|
Loading…
x
Reference in New Issue
Block a user