diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index 1825e80..cae0381 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -3,7 +3,7 @@ @Author : g1879 @Contact : g1879@qq.com """ -from json import loads +from json import loads, JSONDecodeError from pathlib import Path from time import perf_counter, sleep from warnings import warn @@ -109,10 +109,10 @@ class ChromiumBase(BasePage): while True: try: - root_id = self._tab_obj.DOM.getDocument()['root']['nodeId'] + root_id = self.run_cdp('DOM.getDocument')['root']['nodeId'] if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '信息', f'root_id:{root_id}')) - self._root_id = self._tab_obj.DOM.resolveNode(nodeId=root_id)['object']['objectId'] + self._root_id = self.run_cdp('DOM.resolveNode', nodeId=root_id)['object']['objectId'] break except Exception: @@ -203,7 +203,7 @@ class ChromiumBase(BasePage): """文件选择框打开时触发""" if self._upload_list: files = self._upload_list if kwargs['mode'] == 'selectMultiple' else self._upload_list[:1] - self._tab_obj.DOM.setFileInputFiles(files=files, backendNodeId=kwargs['backendNodeId']) + self.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=kwargs['backendNodeId']) self._upload_list = [] def set_upload_files(self, files): @@ -213,7 +213,7 @@ class ChromiumBase(BasePage): """ if self._upload_list is None: self._tab_obj.Page.fileChooserOpened = self._onFileChooserOpened - self._tab_obj.Page.setInterceptFileChooserDialog(enabled=True) + self.run_cdp('Page.setInterceptFileChooserDialog', enabled=True) if isinstance(files, str): files = files.split('\n') @@ -228,12 +228,6 @@ class ChromiumBase(BasePage): """ return self.ele(loc_or_str, timeout) - @property - def title(self): - """返回当前页面title""" - self._wait_loaded() - return self._tab_obj.Target.getTargetInfo(targetId=self.tab_id)['targetInfo']['title'] - @property def driver(self): """返回用于控制浏览器的ChromiumDriver对象""" @@ -253,25 +247,28 @@ class ChromiumBase(BasePage): """返回页面是否正在加载状态""" return self._is_loading + @property + def title(self): + """返回当前页面title""" + return self.run_cdp_loaded('Target.getTargetInfo', targetId=self.tab_id)['targetInfo']['title'] + @property def url(self): """返回当前页面url""" - self._wait_loaded() - return self._tab_obj.Target.getTargetInfo(targetId=self.tab_id)['targetInfo']['url'] + return self.run_cdp_loaded('Target.getTargetInfo', targetId=self.tab_id)['targetInfo']['url'] @property def html(self): """返回当前页面html文本""" - self._wait_loaded() - return self._wait_driver.DOM.getOuterHTML(objectId=self._root_id)['outerHTML'] + return self.run_cdp_loaded('DOM.getOuterHTML', objectId=self._root_id)['outerHTML'] @property def json(self): """当返回内容是json格式时,返回对应的字典,非json格式时返回None""" try: return loads(self('t:pre', timeout=.5).text) - except Exception: - return None + except JSONDecodeError: + raise RuntimeError('非json格式或格式不正确。') @property def tab_id(self): @@ -282,7 +279,7 @@ class ChromiumBase(BasePage): def ready_state(self): """返回当前页面加载状态,'loading' 'interactive' 'complete'""" try: - return self._tab_obj.Runtime.evaluate(expression='document.readyState;')['result']['value'] + return self.run_cdp('Runtime.evaluate', expression='document.readyState;')['result']['value'] except KeyError: raise ConnectionError('标签页或连接已关闭。') @@ -292,7 +289,7 @@ class ChromiumBase(BasePage): # w = self.run_js('document.body.scrollWidth;', as_expr=True) # h = self.run_js('document.body.scrollHeight;', as_expr=True) # return w, h - r = self.run_cdp('Page.getLayoutMetrics')['contentSize'] + r = self.run_cdp_loaded('Page.getLayoutMetrics')['contentSize'] return r['width'], r['height'] @property @@ -308,7 +305,7 @@ class ChromiumBase(BasePage): @property def scroll(self): """返回用于滚动滚动条的对象""" - self.wait.load_complete() # todo: 用run_js()负责等待,这里删除 + self.wait.load_complete() if not hasattr(self, '_scroll'): self._scroll = ChromiumPageScroll(self) return self._scroll @@ -390,7 +387,7 @@ class ChromiumBase(BasePage): :param as_dict: 为True时返回由{name: value}键值对组成的dict :return: cookies信息 """ - cookies = self._wait_driver.Network.getCookies()['cookies'] + cookies = self.run_cdp_loaded('Network.getCookies')['cookies'] if as_dict: return {cookie['name']: cookie['value'] for cookie in cookies} else: @@ -410,7 +407,7 @@ class ChromiumBase(BasePage): 'name': cookie['name'], 'domain': cookie['domain']} result_cookies.append(c) - self._wait_driver.Network.setCookies(cookies=result_cookies) + self.run_cdp_loaded('Network.setCookies', cookies=result_cookies) def set_headers(self, headers: dict) -> None: """设置固定发送的headers @@ -464,7 +461,7 @@ class ChromiumBase(BasePage): raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。') timeout = timeout if timeout is not None else self.timeout - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) + search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True) count = search_result['resultCount'] nodeIds = None @@ -485,7 +482,7 @@ class ChromiumBase(BasePage): if ok or perf_counter() >= end_time: break - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) + search_result = self.run_cdp_loaded('DOM.performSearch', query=loc, includeUserAgentShadowDOM=True) count = search_result['resultCount'] if not nodeIds: @@ -502,7 +499,7 @@ class ChromiumBase(BasePage): :return: None """ self._is_loading = True - self.driver.Page.reload(ignoreCache=ignore_cache) + self.run_cdp('Page.reload', ignoreCache=ignore_cache) self.wait.load_start() def forward(self, steps=1): @@ -552,7 +549,7 @@ class ChromiumBase(BasePage): if self._debug_recorder: self._debug_recorder.add_data((perf_counter(), '操作', '停止页面加载')) - self._tab_obj.Page.stopLoading() + self.run_cdp('Page.stopLoading') while self.ready_state != 'complete': sleep(.1) @@ -575,6 +572,16 @@ class ChromiumBase(BasePage): else: raise RuntimeError(r) + def run_cdp_loaded(self, cmd, **cmd_args): + """执行Chrome DevTools Protocol语句,执行前等待页面加载完毕 + :param cmd: 协议项目 + :param cmd_args: 参数 + :return: 执行的结果 + """ + while self.is_loading: + sleep(.1) + return self.run_cdp(cmd, **cmd_args) + def set_user_agent(self, ua, platform=None): """为当前tab设置user agent,只在当前tab有效 :param ua: user agent字符串 @@ -584,7 +591,7 @@ class ChromiumBase(BasePage): keys = {'userAgent': ua} if platform: keys['platform'] = platform - self._wait_driver.Emulation.setUserAgentOverride(**keys) + self.run_cdp('Emulation.setUserAgentOverride', **keys) def get_session_storage(self, item=None): """获取sessionStorage信息,不设置item则获取全部 @@ -650,17 +657,18 @@ class ChromiumBase(BasePage): width, height = self.size if full_page: vp = {'x': 0, 'y': 0, 'width': width, 'height': height, 'scale': 1} - png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data'] + png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type, + captureBeyondViewport=True, clip=vp)['data'] else: if left_top and right_bottom: x, y = left_top w = right_bottom[0] - x h = right_bottom[1] - y vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1} - png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)[ - 'data'] + png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type, + captureBeyondViewport=True, clip=vp)['data'] else: - png = self._wait_driver.Page.captureScreenshot(format=pic_type)['data'] + png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type)['data'] from base64 import b64decode png = b64decode(png) @@ -686,9 +694,9 @@ class ChromiumBase(BasePage): if local_storage: self.run_js('localStorage.clear();', as_expr=True) if cache: - self._wait_driver.Network.clearBrowserCache() + self.run_cdp_loaded('Network.clearBrowserCache') if cookies: - self._wait_driver.Network.clearBrowserCookies() + self.run_cdp_loaded('Network.clearBrowserCookies') def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None): """尝试连接,重试若干次 @@ -704,7 +712,7 @@ class ChromiumBase(BasePage): for t in range(times + 1): err = None - result = self.driver.Page.navigate(url=to_url) + result = self.run_cdp('Page.navigate', url=to_url) is_timeout = not self._wait_loaded(timeout) while self.is_loading: @@ -814,7 +822,7 @@ class ChromiumPageScroll(ChromiumScroll): ele = self._driver.ele(loc_or_ele) node_id = ele.node_id try: - self._driver._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) + self._driver.run_cdp_loaded('DOM.scrollIntoViewIfNeeded', nodeId=node_id) except Exception: ele.run_js("this.scrollIntoView();") diff --git a/DrissionPage/chromium_base.pyi b/DrissionPage/chromium_base.pyi index b449e4d..ecb28f3 100644 --- a/DrissionPage/chromium_base.pyi +++ b/DrissionPage/chromium_base.pyi @@ -180,6 +180,8 @@ class ChromiumBase(BasePage): def run_cdp(self, cmd: str, **cmd_args) -> dict: ... + def run_cdp_loaded(self, cmd: str, **cmd_args) -> dict: ... + def set_user_agent(self, ua: str, platform: str = None) -> None: ... def get_session_storage(self, item: str = None) -> Union[str, dict, None]: ... diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index 43ea562..a30b353 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -485,7 +485,7 @@ class ChromiumElement(DrissionElement): frame = node.get('frameId', None) frame = frame or self.page.tab_id try: - result = self.page.driver.Page.getResourceContent(frameId=frame, url=src) + result = self.page.run_cdp('Page.getResourceContent', frameId=frame, url=src) except Exception: return None @@ -565,10 +565,10 @@ class ChromiumElement(DrissionElement): return if vals.endswith('\n'): - self.page.driver.Input.insertText(text=vals[:-1]) + self.page.run_cdp('Input.insertText', text=vals[:-1]) _send_key(self, modifier, '\n') else: - self.page.driver.Input.insertText(text=vals) + self.page.run_cdp('Input.insertText', text=vals) def _set_file_input(self, files): """往上传控件写入路径 @@ -604,7 +604,7 @@ class ChromiumElement(DrissionElement): def do_it(cx, cy, lx, ly): """无遮挡返回True,有遮挡返回False,无元素返回None""" try: - r = self.page.driver.DOM.getNodeForLocation(x=lx, y=ly) + r = self.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly) except Exception: return None @@ -680,10 +680,11 @@ class ChromiumElement(DrissionElement): :param button: 'left' 或 'right' :return: None """ - self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=client_x, y=client_y, button=button, - clickCount=1) + self.page.run_cdp('Input.dispatchMouseEvent', type='mousePressed', + x=client_x, y=client_y, button=button, clickCount=1) sleep(.05) - self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=client_x, y=client_y, button=button) + self.page.run_cdp('Input.dispatchMouseEvent', type='mouseReleased', + x=client_x, y=client_y, button=button) def hover(self, offset_x=None, offset_y=None): """鼠标悬停,可接受偏移量,偏移量相对于元素左上角坐标。不传入x或y值时悬停在元素中点 @@ -693,7 +694,7 @@ class ChromiumElement(DrissionElement): """ self.page.scroll.to_see(self) x, y = offset_scroll(self, offset_x, offset_y) - self.page.driver.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) + self.page.run_cdp('Input.dispatchMouseEvent', type='mouseMoved', x=x, y=y) def drag(self, offset_x=0, offset_y=0, speed=40, shake=True): """拖拽当前元素到相对位置 @@ -1172,7 +1173,7 @@ def _find_by_xpath(ele, xpath, single, timeout, relative=True): if r['result']['description'] == 'NodeList(0)': return [] else: - r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'], ownProperties=True)['result'] + r = ele.page.run_cdp('Runtime.getProperties', objectId=r['result']['objectId'], ownProperties=True)['result'] return [make_chromium_ele(ele.page, obj_id=i['value']['objectId']) if i['value']['type'] == 'object' else i['value']['value'] for i in r[:-1]] @@ -1209,7 +1210,7 @@ def _find_by_css(ele, selector, single, timeout): if r['result']['description'] == 'NodeList(0)': return [] else: - r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'], ownProperties=True)['result'] + r = ele.page.run_cdp('Runtime.getProperties', objectId=r['result']['objectId'], ownProperties=True)['result'] return [make_chromium_ele(ele.page, obj_id=i['value']['objectId']) for i in r] @@ -1337,7 +1338,7 @@ def _parse_js_result(page, ele, result): return make_chromium_ele(page, obj_id=result['objectId']) elif sub_type == 'array': - r = page.driver.Runtime.getProperties(objectId=result['result']['objectId'], ownProperties=True)['result'] + r = page.run_cdp('Runtime.getProperties', objectId=result['result']['objectId'], ownProperties=True)['result'] return [_parse_js_result(page, ele, result=i['value']) for i in r] else: @@ -1370,15 +1371,15 @@ def _send_enter(ele): data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter', 'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False} - ele.page.driver.Input.dispatchKeyEvent(**data) + ele.page.run_cdp('Input.dispatchKeyEvent', **data) data['type'] = 'keyUp' - ele.page.driver.Input.dispatchKeyEvent(**data) + ele.page.run_cdp('Input.dispatchKeyEvent', **data) def _send_key(ele, modifier, key): """发送一个字,在键盘中的字符触发按键,其它直接发送文本""" if key not in _keyDefinitions: - ele.page.driver.Input.insertText(text=key) + ele.page.run_cdp('Input.insertText', text=key) else: description = _keyDescriptionForString(modifier, key) @@ -1394,9 +1395,9 @@ def _send_key(ele, modifier, key): 'location': description['location'], 'isKeypad': description['location'] == 3} - ele.page.driver.Input.dispatchKeyEvent(**data) + ele.page.run_cdp('Input.dispatchKeyEvent', **data) data['type'] = 'keyUp' - ele.page.driver.Input.dispatchKeyEvent(**data) + ele.page.run_cdp('Input.dispatchKeyEvent', **data) class ChromiumScroll(object): diff --git a/DrissionPage/chromium_frame.py b/DrissionPage/chromium_frame.py index 53448c0..7b31d50 100644 --- a/DrissionPage/chromium_frame.py +++ b/DrissionPage/chromium_frame.py @@ -78,7 +78,7 @@ class ChromiumFrame(ChromiumBase): self._reload() try: - self._tab_obj.DOM.describeNode(nodeId=self.node_id) + self.run_cdp('DOM.describeNode', nodeId=self.node_id) except Exception: self._reload() # sleep(2) @@ -99,7 +99,7 @@ class ChromiumFrame(ChromiumBase): self.doc_ele = ChromiumElement(self.page, backend_id=node['contentDocument']['backendNodeId']) else: - b_id = self._tab_obj.DOM.getDocument()['root']['backendNodeId'] + b_id = self.run_cdp('DOM.getDocument')['root']['backendNodeId'] self.doc_ele = ChromiumElement(self, backend_id=b_id) break diff --git a/DrissionPage/configs/configs.ini b/DrissionPage/configs/configs.ini index 935c6ba..9d8421c 100644 --- a/DrissionPage/configs/configs.ini +++ b/DrissionPage/configs/configs.ini @@ -4,8 +4,8 @@ download_path = [chrome_options] debugger_address = 127.0.0.1:9222 -binary_location = chrome -arguments = ['--no-first-run', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking'] +binary_location = D:\python\Chrome109\chrome.exe +arguments = ['--no-first-run', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking', '--user-data-dir=D:\\python\\Chrome109\\user_data'] extensions = [] experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}, 'plugins.plugins_list': [{'enabled': False, 'name': 'Chrome PDF Viewer'}]}, 'useAutomationExtension': False, 'excludeSwitches': ['enable-automation']} page_load_strategy = normal diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 7c37ac9..03da12d 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -342,7 +342,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): :return: None """ if copy_user_agent: - selenium_user_agent = self._tab_obj.Runtime.evaluate(expression='navigator.userAgent;')['result']['value'] + selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] self.session.headers.update({"User-Agent": selenium_user_agent}) self.set_cookies(self._get_driver_cookies(as_dict=True), set_session=True) @@ -381,7 +381,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): :param as_dict: 以dict形式返回 :return: cookies信息 """ - cookies = self._tab_obj.Network.getCookies()['cookies'] + cookies = self.run_cdp('Network.getCookies')['cookies'] if as_dict: return {cookie['name']: cookie['value'] for cookie in cookies} else: @@ -405,7 +405,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): 'name': cookie['name'], 'domain': cookie['domain']} result_cookies.append(c) - self._tab_obj.Network.setCookies(cookies=result_cookies) + self.run_cdp('Network.setCookies', cookies=result_cookies) # 添加cookie到session if set_session: