mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
完善driver设计
This commit is contained in:
parent
ea8321990c
commit
fab61bff85
@ -45,8 +45,8 @@ class ChromiumPage(BasePage):
|
|||||||
self._is_loading = False
|
self._is_loading = False
|
||||||
self._root_id = None
|
self._root_id = None
|
||||||
self.timeouts = Timeout(self)
|
self.timeouts = Timeout(self)
|
||||||
self._ss = Session()
|
self._control_session = Session()
|
||||||
self._ss.keep_alive = False
|
self._control_session.keep_alive = False
|
||||||
self._alert = Alert()
|
self._alert = Alert()
|
||||||
self._first_run = True
|
self._first_run = True
|
||||||
|
|
||||||
@ -56,7 +56,7 @@ class ChromiumPage(BasePage):
|
|||||||
self.address = self.options.debugger_address
|
self.address = self.options.debugger_address
|
||||||
self.process = connect_chrome(self.options)[1]
|
self.process = connect_chrome(self.options)[1]
|
||||||
self._set_options()
|
self._set_options()
|
||||||
json = loads(self._ss.get(f'http://{self.address}/json').text)
|
json = loads(self._control_session.get(f'http://{self.address}/json').text)
|
||||||
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
||||||
self._init_page(tab_id)
|
self._init_page(tab_id)
|
||||||
self._get_document()
|
self._get_document()
|
||||||
@ -70,7 +70,7 @@ class ChromiumPage(BasePage):
|
|||||||
self.process = connect_chrome(self.options)[1]
|
self.process = connect_chrome(self.options)[1]
|
||||||
self._set_options()
|
self._set_options()
|
||||||
if not tab_id:
|
if not tab_id:
|
||||||
json = loads(self._ss.get(f'http://{self.address}/json').text)
|
json = loads(self._control_session.get(f'http://{self.address}/json').text)
|
||||||
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
|
||||||
self._init_page(tab_id)
|
self._init_page(tab_id)
|
||||||
self._get_document()
|
self._get_document()
|
||||||
@ -144,7 +144,7 @@ class ChromiumPage(BasePage):
|
|||||||
|
|
||||||
def _onLoadEventFired(self, **kwargs):
|
def _onLoadEventFired(self, **kwargs):
|
||||||
"""在页面刷新、变化后重新读取页面内容"""
|
"""在页面刷新、变化后重新读取页面内容"""
|
||||||
# print('load complete')
|
print('load complete')
|
||||||
if self._first_run is False and self._is_loading:
|
if self._first_run is False and self._is_loading:
|
||||||
self._get_document()
|
self._get_document()
|
||||||
|
|
||||||
@ -152,11 +152,11 @@ class ChromiumPage(BasePage):
|
|||||||
"""页面跳转时触发"""
|
"""页面跳转时触发"""
|
||||||
# todo: 考虑frame的情况,修改别的判断方式
|
# todo: 考虑frame的情况,修改别的判断方式
|
||||||
if not kwargs['frame'].get('parentId', None):
|
if not kwargs['frame'].get('parentId', None):
|
||||||
# print('nav')
|
print('nav')
|
||||||
self._is_loading = True
|
self._is_loading = True
|
||||||
|
|
||||||
def _onDocumentUpdated(self, **kwargs):
|
def _onDocumentUpdated(self, **kwargs):
|
||||||
# print('doc')
|
print('doc')
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _set_options(self) -> None:
|
def _set_options(self) -> None:
|
||||||
@ -179,23 +179,28 @@ class ChromiumPage(BasePage):
|
|||||||
@property
|
@property
|
||||||
def driver(self) -> Tab:
|
def driver(self) -> Tab:
|
||||||
"""返回用于控制浏览器的Tab对象"""
|
"""返回用于控制浏览器的Tab对象"""
|
||||||
|
return self._driver
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _wait_driver(self) -> Tab:
|
||||||
|
"""返回用于控制浏览器的Tab对象,会先等待页面加载完毕"""
|
||||||
while self._is_loading:
|
while self._is_loading:
|
||||||
# print('loading')
|
print('loading')
|
||||||
sleep(.1)
|
sleep(.1)
|
||||||
return self._driver
|
return self._driver
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def url(self) -> str:
|
def url(self) -> str:
|
||||||
"""返回当前页面url"""
|
"""返回当前页面url"""
|
||||||
tab_id = self.driver.id # 用于WebPage时激活浏览器
|
tab_id = self._wait_driver.id # 用于WebPage时激活浏览器
|
||||||
json = loads(self._ss.get(f'http://{self.address}/json').text)
|
json = loads(self._control_session.get(f'http://{self.address}/json').text)
|
||||||
return [i['url'] for i in json if i['id'] == tab_id][0]
|
return [i['url'] for i in json if i['id'] == tab_id][0]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def html(self) -> str:
|
def html(self) -> str:
|
||||||
"""返回当前页面html文本"""
|
"""返回当前页面html文本"""
|
||||||
node_id = self.driver.DOM.getDocument()['root']['nodeId']
|
node_id = self._wait_driver.DOM.getDocument()['root']['nodeId']
|
||||||
return self.driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML']
|
return self._wait_driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML']
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def json(self) -> dict:
|
def json(self) -> dict:
|
||||||
@ -210,14 +215,14 @@ class ChromiumPage(BasePage):
|
|||||||
@property
|
@property
|
||||||
def tabs(self) -> list:
|
def tabs(self) -> list:
|
||||||
"""返回所有标签页id"""
|
"""返回所有标签页id"""
|
||||||
d = self.driver
|
d = self._wait_driver
|
||||||
json = loads(self._ss.get(f'http://{self.address}/json').text)
|
json = loads(self._control_session.get(f'http://{self.address}/json').text)
|
||||||
return [i['id'] for i in json if i['type'] == 'page']
|
return [i['id'] for i in json if i['type'] == 'page']
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def tab_id(self) -> str:
|
def tab_id(self) -> str:
|
||||||
"""返回当前标签页id"""
|
"""返回当前标签页id"""
|
||||||
return self.driver.id
|
return self._wait_driver.id
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ready_state(self) -> str:
|
def ready_state(self) -> str:
|
||||||
@ -245,7 +250,7 @@ class ChromiumPage(BasePage):
|
|||||||
def process_id(self) -> Union[None, int]:
|
def process_id(self) -> Union[None, int]:
|
||||||
"""返回浏览器进程id"""
|
"""返回浏览器进程id"""
|
||||||
try:
|
try:
|
||||||
return self.driver.SystemInfo.getProcessInfo()['id']
|
return self._wait_driver.SystemInfo.getProcessInfo()['id']
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@ -334,7 +339,7 @@ class ChromiumPage(BasePage):
|
|||||||
:param as_dict: 为True时返回由{name: value}键值对组成的dict
|
:param as_dict: 为True时返回由{name: value}键值对组成的dict
|
||||||
:return: cookies信息
|
:return: cookies信息
|
||||||
"""
|
"""
|
||||||
cookies = self.driver.Network.getCookies()['cookies']
|
cookies = self._wait_driver.Network.getCookies()['cookies']
|
||||||
if as_dict:
|
if as_dict:
|
||||||
return {cookie['name']: cookie['value'] for cookie in cookies}
|
return {cookie['name']: cookie['value'] for cookie in cookies}
|
||||||
else:
|
else:
|
||||||
@ -354,7 +359,7 @@ class ChromiumPage(BasePage):
|
|||||||
'name': cookie['name'],
|
'name': cookie['name'],
|
||||||
'domain': cookie['domain']}
|
'domain': cookie['domain']}
|
||||||
result_cookies.append(c)
|
result_cookies.append(c)
|
||||||
self.driver.Network.setCookies(cookies=result_cookies)
|
self._wait_driver.Network.setCookies(cookies=result_cookies)
|
||||||
|
|
||||||
def ele(self,
|
def ele(self,
|
||||||
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement],
|
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement],
|
||||||
@ -412,12 +417,12 @@ class ChromiumPage(BasePage):
|
|||||||
raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。')
|
raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。')
|
||||||
|
|
||||||
timeout = timeout if timeout is not None else self.timeout
|
timeout = timeout if timeout is not None else self.timeout
|
||||||
search_result = self.driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
|
search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
|
||||||
count = search_result['resultCount']
|
count = search_result['resultCount']
|
||||||
|
|
||||||
end_time = perf_counter() + timeout
|
end_time = perf_counter() + timeout
|
||||||
while count == 0 and perf_counter() < end_time:
|
while count == 0 and perf_counter() < end_time:
|
||||||
search_result = self.driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
|
search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
|
||||||
count = search_result['resultCount']
|
count = search_result['resultCount']
|
||||||
|
|
||||||
if count == 0:
|
if count == 0:
|
||||||
@ -425,7 +430,8 @@ class ChromiumPage(BasePage):
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
count = 1 if single else count
|
count = 1 if single else count
|
||||||
nodeIds = self.driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, toIndex=count)
|
nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0,
|
||||||
|
toIndex=count)
|
||||||
if count == 1:
|
if count == 1:
|
||||||
return ChromiumElement(self, node_id=nodeIds['nodeIds'][0])
|
return ChromiumElement(self, node_id=nodeIds['nodeIds'][0])
|
||||||
else:
|
else:
|
||||||
@ -474,16 +480,17 @@ class ChromiumPage(BasePage):
|
|||||||
hw = self.size
|
hw = self.size
|
||||||
if full_page:
|
if full_page:
|
||||||
vp = {'x': 0, 'y': 0, 'width': hw['width'], 'height': hw['height'], 'scale': 1}
|
vp = {'x': 0, 'y': 0, 'width': hw['width'], 'height': hw['height'], 'scale': 1}
|
||||||
png = self.driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data']
|
png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data']
|
||||||
else:
|
else:
|
||||||
if left_top and right_bottom:
|
if left_top and right_bottom:
|
||||||
x, y = left_top
|
x, y = left_top
|
||||||
w = right_bottom[0] - x
|
w = right_bottom[0] - x
|
||||||
h = right_bottom[1] - y
|
h = right_bottom[1] - y
|
||||||
vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1}
|
vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1}
|
||||||
png = self.driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data']
|
png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)[
|
||||||
|
'data']
|
||||||
else:
|
else:
|
||||||
png = self.driver.Page.captureScreenshot(format=pic_type)['data']
|
png = self._wait_driver.Page.captureScreenshot(format=pic_type)['data']
|
||||||
|
|
||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
png = b64decode(png)
|
png = b64decode(png)
|
||||||
@ -503,7 +510,7 @@ class ChromiumPage(BasePage):
|
|||||||
"""
|
"""
|
||||||
node_id = self.ele(loc_or_ele).node_id
|
node_id = self.ele(loc_or_ele).node_id
|
||||||
try:
|
try:
|
||||||
self.driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id)
|
self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.ele(loc_or_ele).run_script("this.scrollIntoView();")
|
self.ele(loc_or_ele).run_script("this.scrollIntoView();")
|
||||||
|
|
||||||
@ -546,7 +553,7 @@ class ChromiumPage(BasePage):
|
|||||||
:param ua: user agent字符串
|
:param ua: user agent字符串
|
||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
self.driver.Network.setUserAgentOverride(userAgent=ua)
|
self._wait_driver.Network.setUserAgentOverride(userAgent=ua)
|
||||||
|
|
||||||
def get_session_storage(self, item: str = None) -> Union[str, dict, None]:
|
def get_session_storage(self, item: str = None) -> Union[str, dict, None]:
|
||||||
"""获取sessionStorage信息,不设置item则获取全部 \n
|
"""获取sessionStorage信息,不设置item则获取全部 \n
|
||||||
@ -584,7 +591,7 @@ class ChromiumPage(BasePage):
|
|||||||
|
|
||||||
def to_front(self) -> None:
|
def to_front(self) -> None:
|
||||||
"""激活当前标签页使其处于最前面"""
|
"""激活当前标签页使其处于最前面"""
|
||||||
self._ss.get(f'http://{self.address}/json/activate/{self.tab_id}')
|
self._control_session.get(f'http://{self.address}/json/activate/{self.tab_id}')
|
||||||
|
|
||||||
def set_main_tab(self, tab_id: str = None) -> None:
|
def set_main_tab(self, tab_id: str = None) -> None:
|
||||||
"""设置某个标签页为住标签页"""
|
"""设置某个标签页为住标签页"""
|
||||||
@ -597,7 +604,7 @@ class ChromiumPage(BasePage):
|
|||||||
"""
|
"""
|
||||||
begin_len = len(self.tabs)
|
begin_len = len(self.tabs)
|
||||||
url = f'?{url}' if url else ''
|
url = f'?{url}' if url else ''
|
||||||
self._ss.get(f'http://{self.address}/json/new{url}')
|
self._control_session.get(f'http://{self.address}/json/new{url}')
|
||||||
while len(self.tabs) < begin_len:
|
while len(self.tabs) < begin_len:
|
||||||
pass
|
pass
|
||||||
self.to_tab()
|
self.to_tab()
|
||||||
@ -621,7 +628,7 @@ class ChromiumPage(BasePage):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if activate:
|
if activate:
|
||||||
self._ss.get(f'http://{self.address}/json/activate/{tab_id}')
|
self._control_session.get(f'http://{self.address}/json/activate/{tab_id}')
|
||||||
|
|
||||||
self._driver.stop()
|
self._driver.stop()
|
||||||
self._init_page(tab_id)
|
self._init_page(tab_id)
|
||||||
@ -648,7 +655,7 @@ class ChromiumPage(BasePage):
|
|||||||
self._driver.stop()
|
self._driver.stop()
|
||||||
|
|
||||||
for tab in tabs:
|
for tab in tabs:
|
||||||
self._ss.get(f'http://{self.address}/json/close/{tab}')
|
self._control_session.get(f'http://{self.address}/json/close/{tab}')
|
||||||
while len(self.tabs) != end_len:
|
while len(self.tabs) != end_len:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -678,9 +685,9 @@ class ChromiumPage(BasePage):
|
|||||||
if local_storage:
|
if local_storage:
|
||||||
self.run_script('localStorage.clear();', as_expr=True)
|
self.run_script('localStorage.clear();', as_expr=True)
|
||||||
if cache:
|
if cache:
|
||||||
self.driver.Network.clearBrowserCache()
|
self._wait_driver.Network.clearBrowserCache()
|
||||||
if cookies:
|
if cookies:
|
||||||
self.driver.Network.clearBrowserCookies()
|
self._wait_driver.Network.clearBrowserCookies()
|
||||||
|
|
||||||
def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]:
|
def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]:
|
||||||
"""处理提示框,可以自动等待提示框出现 \n
|
"""处理提示框,可以自动等待提示框出现 \n
|
||||||
|
@ -44,7 +44,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
|
|||||||
self._response = None
|
self._response = None
|
||||||
|
|
||||||
if self._mode == 'd':
|
if self._mode == 'd':
|
||||||
self.driver
|
self._driver
|
||||||
|
|
||||||
def __call__(self,
|
def __call__(self,
|
||||||
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
|
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
|
||||||
@ -126,13 +126,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
|
|||||||
def _driver(self, tab):
|
def _driver(self, tab):
|
||||||
self._tab_obj = tab
|
self._tab_obj = tab
|
||||||
|
|
||||||
@property
|
|
||||||
def driver(self) -> Tab:
|
|
||||||
"""返回Tab对象,如未初始化则按配置信息创建。 \n
|
|
||||||
如设置了本地调试浏览器,可自动接入或打开浏览器进程。
|
|
||||||
"""
|
|
||||||
return super().driver
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def _session_url(self) -> str:
|
def _session_url(self) -> str:
|
||||||
"""返回 session 保存的url"""
|
"""返回 session 保存的url"""
|
||||||
@ -281,7 +274,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
|
|||||||
return self._get_driver_cookies(as_dict)
|
return self._get_driver_cookies(as_dict)
|
||||||
|
|
||||||
def _get_driver_cookies(self, as_dict: bool = False):
|
def _get_driver_cookies(self, as_dict: bool = False):
|
||||||
cookies = super(SessionPage, self).driver.Network.getCookies()['cookies']
|
cookies = super(SessionPage, self)._wait_driver.Network.getCookies()['cookies']
|
||||||
if as_dict:
|
if as_dict:
|
||||||
return {cookie['name']: cookie['value'] for cookie in cookies}
|
return {cookie['name']: cookie['value'] for cookie in cookies}
|
||||||
else:
|
else:
|
||||||
@ -299,7 +292,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
|
|||||||
'name': cookie['name'],
|
'name': cookie['name'],
|
||||||
'domain': cookie['domain']}
|
'domain': cookie['domain']}
|
||||||
result_cookies.append(c)
|
result_cookies.append(c)
|
||||||
super(SessionPage, self).driver.Network.setCookies(cookies=result_cookies)
|
super(SessionPage, self)._wait_driver.Network.setCookies(cookies=result_cookies)
|
||||||
|
|
||||||
# 添加cookie到session
|
# 添加cookie到session
|
||||||
if set_session:
|
if set_session:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user