增加run_cdp_loaded();cdp操作都使用run_cdp()或run_cdp_loaded()

This commit is contained in:
g1879 2023-02-13 10:35:00 +08:00
parent 2c0595e57a
commit dea209f35a
6 changed files with 69 additions and 58 deletions

View File

@ -3,7 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from json import loads
from json import loads, JSONDecodeError
from pathlib import Path
from time import perf_counter, sleep
from warnings import warn
@ -109,10 +109,10 @@ class ChromiumBase(BasePage):
while True:
try:
root_id = self._tab_obj.DOM.getDocument()['root']['nodeId']
root_id = self.run_cdp('DOM.getDocument')['root']['nodeId']
if self._debug_recorder:
self._debug_recorder.add_data((perf_counter(), '信息', f'root_id{root_id}'))
self._root_id = self._tab_obj.DOM.resolveNode(nodeId=root_id)['object']['objectId']
self._root_id = self.run_cdp('DOM.resolveNode', nodeId=root_id)['object']['objectId']
break
except Exception:
@ -203,7 +203,7 @@ class ChromiumBase(BasePage):
"""文件选择框打开时触发"""
if self._upload_list:
files = self._upload_list if kwargs['mode'] == 'selectMultiple' else self._upload_list[:1]
self._tab_obj.DOM.setFileInputFiles(files=files, backendNodeId=kwargs['backendNodeId'])
self.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=kwargs['backendNodeId'])
self._upload_list = []
def set_upload_files(self, files):
@ -213,7 +213,7 @@ class ChromiumBase(BasePage):
"""
if self._upload_list is None:
self._tab_obj.Page.fileChooserOpened = self._onFileChooserOpened
self._tab_obj.Page.setInterceptFileChooserDialog(enabled=True)
self.run_cdp('Page.setInterceptFileChooserDialog', enabled=True)
if isinstance(files, str):
files = files.split('\n')
@ -228,12 +228,6 @@ class ChromiumBase(BasePage):
"""
return self.ele(loc_or_str, timeout)
@property
def title(self):
"""返回当前页面title"""
self._wait_loaded()
return self._tab_obj.Target.getTargetInfo(targetId=self.tab_id)['targetInfo']['title']
@property
def driver(self):
"""返回用于控制浏览器的ChromiumDriver对象"""
@ -253,25 +247,28 @@ class ChromiumBase(BasePage):
"""返回页面是否正在加载状态"""
return self._is_loading
@property
def title(self):
"""返回当前页面title"""
return self.run_cdp_loaded('Target.getTargetInfo', targetId=self.tab_id)['targetInfo']['title']
@property
def url(self):
"""返回当前页面url"""
self._wait_loaded()
return self._tab_obj.Target.getTargetInfo(targetId=self.tab_id)['targetInfo']['url']
return self.run_cdp_loaded('Target.getTargetInfo', targetId=self.tab_id)['targetInfo']['url']
@property
def html(self):
"""返回当前页面html文本"""
self._wait_loaded()
return self._wait_driver.DOM.getOuterHTML(objectId=self._root_id)['outerHTML']
return self.run_cdp_loaded('DOM.getOuterHTML', objectId=self._root_id)['outerHTML']
@property
def json(self):
"""当返回内容是json格式时返回对应的字典非json格式时返回None"""
try:
return loads(self('t:pre', timeout=.5).text)
except Exception:
return None
except JSONDecodeError:
raise RuntimeError('非json格式或格式不正确。')
@property
def tab_id(self):
@ -282,7 +279,7 @@ class ChromiumBase(BasePage):
def ready_state(self):
"""返回当前页面加载状态,'loading' 'interactive' 'complete'"""
try:
return self._tab_obj.Runtime.evaluate(expression='document.readyState;')['result']['value']
return self.run_cdp('Runtime.evaluate', expression='document.readyState;')['result']['value']
except KeyError:
raise ConnectionError('标签页或连接已关闭。')
@ -292,7 +289,7 @@ class ChromiumBase(BasePage):
# w = self.run_js('document.body.scrollWidth;', as_expr=True)
# h = self.run_js('document.body.scrollHeight;', as_expr=True)
# return w, h
r = self.run_cdp('Page.getLayoutMetrics')['contentSize']
r = self.run_cdp_loaded('Page.getLayoutMetrics')['contentSize']
return r['width'], r['height']
@property
@ -308,7 +305,7 @@ class ChromiumBase(BasePage):
@property
def scroll(self):
"""返回用于滚动滚动条的对象"""
self.wait.load_complete() # todo: 用run_js()负责等待,这里删除
self.wait.load_complete()
if not hasattr(self, '_scroll'):
self._scroll = ChromiumPageScroll(self)
return self._scroll
@ -390,7 +387,7 @@ class ChromiumBase(BasePage):
:param as_dict: 为True时返回由{name: value}键值对组成的dict
:return: cookies信息
"""
cookies = self._wait_driver.Network.getCookies()['cookies']
cookies = self.run_cdp_loaded('Network.getCookies')['cookies']
if as_dict:
return {cookie['name']: cookie['value'] for cookie in cookies}
else:
@ -410,7 +407,7 @@ class ChromiumBase(BasePage):
'name': cookie['name'],
'domain': cookie['domain']}
result_cookies.append(c)
self._wait_driver.Network.setCookies(cookies=result_cookies)
self.run_cdp_loaded('Network.setCookies', cookies=result_cookies)
def set_headers(self, headers: dict) -> None:
"""设置固定发送的headers
@ -464,7 +461,7 @@ class ChromiumBase(BasePage):
raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。')
timeout = timeout if timeout is not None else self.timeout
search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
nodeIds = None
@ -485,7 +482,7 @@ class ChromiumBase(BasePage):
if ok or perf_counter() >= end_time:
break
search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
if not nodeIds:
@ -502,7 +499,7 @@ class ChromiumBase(BasePage):
:return: None
"""
self._is_loading = True
self.driver.Page.reload(ignoreCache=ignore_cache)
self.run_cdp('Page.reload', ignoreCache=ignore_cache)
self.wait.load_start()
def forward(self, steps=1):
@ -552,7 +549,7 @@ class ChromiumBase(BasePage):
if self._debug_recorder:
self._debug_recorder.add_data((perf_counter(), '操作', '停止页面加载'))
self._tab_obj.Page.stopLoading()
self.run_cdp('Page.stopLoading')
while self.ready_state != 'complete':
sleep(.1)
@ -575,6 +572,16 @@ class ChromiumBase(BasePage):
else:
raise RuntimeError(r)
def run_cdp_loaded(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句执行前等待页面加载完毕
:param cmd: 协议项目
:param cmd_args: 参数
:return: 执行的结果
"""
while self.is_loading:
sleep(.1)
return self.run_cdp(cmd, **cmd_args)
def set_user_agent(self, ua, platform=None):
"""为当前tab设置user agent只在当前tab有效
:param ua: user agent字符串
@ -584,7 +591,7 @@ class ChromiumBase(BasePage):
keys = {'userAgent': ua}
if platform:
keys['platform'] = platform
self._wait_driver.Emulation.setUserAgentOverride(**keys)
self.run_cdp('Emulation.setUserAgentOverride', **keys)
def get_session_storage(self, item=None):
"""获取sessionStorage信息不设置item则获取全部
@ -650,17 +657,18 @@ class ChromiumBase(BasePage):
width, height = self.size
if full_page:
vp = {'x': 0, 'y': 0, 'width': width, 'height': height, 'scale': 1}
png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data']
png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type,
captureBeyondViewport=True, clip=vp)['data']
else:
if left_top and right_bottom:
x, y = left_top
w = right_bottom[0] - x
h = right_bottom[1] - y
vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1}
png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)[
'data']
png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type,
captureBeyondViewport=True, clip=vp)['data']
else:
png = self._wait_driver.Page.captureScreenshot(format=pic_type)['data']
png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type)['data']
from base64 import b64decode
png = b64decode(png)
@ -686,9 +694,9 @@ class ChromiumBase(BasePage):
if local_storage:
self.run_js('localStorage.clear();', as_expr=True)
if cache:
self._wait_driver.Network.clearBrowserCache()
self.run_cdp_loaded('Network.clearBrowserCache')
if cookies:
self._wait_driver.Network.clearBrowserCookies()
self.run_cdp_loaded('Network.clearBrowserCookies')
def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None):
"""尝试连接,重试若干次
@ -704,7 +712,7 @@ class ChromiumBase(BasePage):
for t in range(times + 1):
err = None
result = self.driver.Page.navigate(url=to_url)
result = self.run_cdp('Page.navigate', url=to_url)
is_timeout = not self._wait_loaded(timeout)
while self.is_loading:
@ -814,7 +822,7 @@ class ChromiumPageScroll(ChromiumScroll):
ele = self._driver.ele(loc_or_ele)
node_id = ele.node_id
try:
self._driver._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id)
self._driver.run_cdp_loaded('DOM.scrollIntoViewIfNeeded', nodeId=node_id)
except Exception:
ele.run_js("this.scrollIntoView();")

View File

@ -180,6 +180,8 @@ class ChromiumBase(BasePage):
def run_cdp(self, cmd: str, **cmd_args) -> dict: ...
def run_cdp_loaded(self, cmd: str, **cmd_args) -> dict: ...
def set_user_agent(self, ua: str, platform: str = None) -> None: ...
def get_session_storage(self, item: str = None) -> Union[str, dict, None]: ...

View File

@ -485,7 +485,7 @@ class ChromiumElement(DrissionElement):
frame = node.get('frameId', None)
frame = frame or self.page.tab_id
try:
result = self.page.driver.Page.getResourceContent(frameId=frame, url=src)
result = self.page.run_cdp('Page.getResourceContent', frameId=frame, url=src)
except Exception:
return None
@ -565,10 +565,10 @@ class ChromiumElement(DrissionElement):
return
if vals.endswith('\n'):
self.page.driver.Input.insertText(text=vals[:-1])
self.page.run_cdp('Input.insertText', text=vals[:-1])
_send_key(self, modifier, '\n')
else:
self.page.driver.Input.insertText(text=vals)
self.page.run_cdp('Input.insertText', text=vals)
def _set_file_input(self, files):
"""往上传控件写入路径
@ -604,7 +604,7 @@ class ChromiumElement(DrissionElement):
def do_it(cx, cy, lx, ly):
"""无遮挡返回True有遮挡返回False无元素返回None"""
try:
r = self.page.driver.DOM.getNodeForLocation(x=lx, y=ly)
r = self.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly)
except Exception:
return None
@ -680,10 +680,11 @@ class ChromiumElement(DrissionElement):
:param button: 'left' 'right'
:return: None
"""
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=client_x, y=client_y, button=button,
clickCount=1)
self.page.run_cdp('Input.dispatchMouseEvent', type='mousePressed',
x=client_x, y=client_y, button=button, clickCount=1)
sleep(.05)
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=client_x, y=client_y, button=button)
self.page.run_cdp('Input.dispatchMouseEvent', type='mouseReleased',
x=client_x, y=client_y, button=button)
def hover(self, offset_x=None, offset_y=None):
"""鼠标悬停可接受偏移量偏移量相对于元素左上角坐标。不传入x或y值时悬停在元素中点
@ -693,7 +694,7 @@ class ChromiumElement(DrissionElement):
"""
self.page.scroll.to_see(self)
x, y = offset_scroll(self, offset_x, offset_y)
self.page.driver.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y)
self.page.run_cdp('Input.dispatchMouseEvent', type='mouseMoved', x=x, y=y)
def drag(self, offset_x=0, offset_y=0, speed=40, shake=True):
"""拖拽当前元素到相对位置
@ -1172,7 +1173,7 @@ def _find_by_xpath(ele, xpath, single, timeout, relative=True):
if r['result']['description'] == 'NodeList(0)':
return []
else:
r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'], ownProperties=True)['result']
r = ele.page.run_cdp('Runtime.getProperties', objectId=r['result']['objectId'], ownProperties=True)['result']
return [make_chromium_ele(ele.page, obj_id=i['value']['objectId'])
if i['value']['type'] == 'object' else i['value']['value']
for i in r[:-1]]
@ -1209,7 +1210,7 @@ def _find_by_css(ele, selector, single, timeout):
if r['result']['description'] == 'NodeList(0)':
return []
else:
r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'], ownProperties=True)['result']
r = ele.page.run_cdp('Runtime.getProperties', objectId=r['result']['objectId'], ownProperties=True)['result']
return [make_chromium_ele(ele.page, obj_id=i['value']['objectId']) for i in r]
@ -1337,7 +1338,7 @@ def _parse_js_result(page, ele, result):
return make_chromium_ele(page, obj_id=result['objectId'])
elif sub_type == 'array':
r = page.driver.Runtime.getProperties(objectId=result['result']['objectId'], ownProperties=True)['result']
r = page.run_cdp('Runtime.getProperties', objectId=result['result']['objectId'], ownProperties=True)['result']
return [_parse_js_result(page, ele, result=i['value']) for i in r]
else:
@ -1370,15 +1371,15 @@ def _send_enter(ele):
data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter',
'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False}
ele.page.driver.Input.dispatchKeyEvent(**data)
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
data['type'] = 'keyUp'
ele.page.driver.Input.dispatchKeyEvent(**data)
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
def _send_key(ele, modifier, key):
"""发送一个字,在键盘中的字符触发按键,其它直接发送文本"""
if key not in _keyDefinitions:
ele.page.driver.Input.insertText(text=key)
ele.page.run_cdp('Input.insertText', text=key)
else:
description = _keyDescriptionForString(modifier, key)
@ -1394,9 +1395,9 @@ def _send_key(ele, modifier, key):
'location': description['location'],
'isKeypad': description['location'] == 3}
ele.page.driver.Input.dispatchKeyEvent(**data)
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
data['type'] = 'keyUp'
ele.page.driver.Input.dispatchKeyEvent(**data)
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
class ChromiumScroll(object):

View File

@ -78,7 +78,7 @@ class ChromiumFrame(ChromiumBase):
self._reload()
try:
self._tab_obj.DOM.describeNode(nodeId=self.node_id)
self.run_cdp('DOM.describeNode', nodeId=self.node_id)
except Exception:
self._reload()
# sleep(2)
@ -99,7 +99,7 @@ class ChromiumFrame(ChromiumBase):
self.doc_ele = ChromiumElement(self.page, backend_id=node['contentDocument']['backendNodeId'])
else:
b_id = self._tab_obj.DOM.getDocument()['root']['backendNodeId']
b_id = self.run_cdp('DOM.getDocument')['root']['backendNodeId']
self.doc_ele = ChromiumElement(self, backend_id=b_id)
break

View File

@ -4,8 +4,8 @@ download_path =
[chrome_options]
debugger_address = 127.0.0.1:9222
binary_location = chrome
arguments = ['--no-first-run', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking']
binary_location = D:\python\Chrome109\chrome.exe
arguments = ['--no-first-run', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking', '--user-data-dir=D:\\python\\Chrome109\\user_data']
extensions = []
experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}, 'plugins.plugins_list': [{'enabled': False, 'name': 'Chrome PDF Viewer'}]}, 'useAutomationExtension': False, 'excludeSwitches': ['enable-automation']}
page_load_strategy = normal

View File

@ -342,7 +342,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
:return: None
"""
if copy_user_agent:
selenium_user_agent = self._tab_obj.Runtime.evaluate(expression='navigator.userAgent;')['result']['value']
selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": selenium_user_agent})
self.set_cookies(self._get_driver_cookies(as_dict=True), set_session=True)
@ -381,7 +381,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
:param as_dict: 以dict形式返回
:return: cookies信息
"""
cookies = self._tab_obj.Network.getCookies()['cookies']
cookies = self.run_cdp('Network.getCookies')['cookies']
if as_dict:
return {cookie['name']: cookie['value'] for cookie in cookies}
else:
@ -405,7 +405,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
'name': cookie['name'],
'domain': cookie['domain']}
result_cookies.append(c)
self._tab_obj.Network.setCookies(cookies=result_cookies)
self.run_cdp('Network.setCookies', cookies=result_cookies)
# 添加cookie到session
if set_session: