From 0b3fca70e4acb84648cff6fa0b7cd8d6a26bf343 Mon Sep 17 00:00:00 2001 From: g1879 Date: Thu, 10 Nov 2022 00:11:45 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=AE=8C=E5=96=84=EF=BC=8C?= =?UTF-8?q?=E5=8D=B3=E5=B0=86=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/chrome_driver.py | 6 -- DrissionPage/chrome_element.py | 42 +++++++++++-- DrissionPage/chrome_page.py | 104 ++++++++++++++++++++++++++++++++- DrissionPage/keys.py | 2 + DrissionPage/web_page.py | 49 +++++++++++----- 5 files changed, 176 insertions(+), 27 deletions(-) delete mode 100644 DrissionPage/chrome_driver.py diff --git a/DrissionPage/chrome_driver.py b/DrissionPage/chrome_driver.py deleted file mode 100644 index 2b00289..0000000 --- a/DrissionPage/chrome_driver.py +++ /dev/null @@ -1,6 +0,0 @@ -# -*- coding:utf-8 -*- -class ChromeDriver(object): - def __init__(self, - address: str = 'localhost:9222', - path: str = 'chrome'): - self.address = address[7:] if address.startswith('http://') else address diff --git a/DrissionPage/chrome_element.py b/DrissionPage/chrome_element.py index 1eaacbf..5bdcc01 100644 --- a/DrissionPage/chrome_element.py +++ b/DrissionPage/chrome_element.py @@ -4,6 +4,8 @@ @Contact : g1879@qq.com @File : chrome_element.py """ +from os import sep +from os.path import basename from pathlib import Path from re import search from typing import Union, Tuple, List, Any @@ -320,14 +322,14 @@ class ChromeElement(DrissionElement): # 获取href属性时返回绝对url attrs = self.attrs if attr == 'href': - link = attrs['href'] + link = attrs.get('href', None) if not link or link.lower().startswith(('javascript:', 'mailto:')): return link else: return make_absolute_link(link, self.page) elif attr == 'src': - return make_absolute_link(attrs['src'], self.page) + return make_absolute_link(attrs.get('src', None), self.page) elif attr == 'text': return self.text @@ -451,6 +453,37 @@ class ChromeElement(DrissionElement): js = f'return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");' return self.run_script(js) + def save(self, path: [str, bool] = None, rename: str = None) -> Union[bytes, str, bool]: + """保存图片或其它有src属性的元素的资源 \n + :param path: 文件保存路径,为None时保存到当前文件夹,为False时不保存 + :param rename: 文件名称,为None时从资源url获取 + :return: 资源内容文本 + """ + src = self.attr('src') + if not src: + return False + path = path or '.' + + node = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node'] + frame = node.get('frameId', None) + frame = frame or self.page.current_tab_handle + result = self.page.driver.Page.getResourceContent(frameId=frame, url=src) + if result['base64Encoded']: + from base64 import b64decode + data = b64decode(result['content']) + write_type = 'wb' + else: + data = result['content'] + write_type = 'w' + + if path: + rename = rename or basename(src) + Path(path).mkdir(parents=True, exist_ok=True) + with open(f'{path}{sep}{rename}', write_type) as f: + f.write(data) + + return data + def get_screenshot(self, path: [str, Path] = None, as_bytes: [bool, str] = None) -> Union[str, bytes]: """对当前元素截图 \n @@ -509,7 +542,7 @@ class ChromeElement(DrissionElement): def _set_file_input(self, files: Union[str, list, tuple]) -> None: """设置上传控件值""" - if isinstance(files): + if isinstance(files, str): files = files.split('\n') self.page.driver.DOM.setFileInputFiles(files=files, nodeId=self._node_id) @@ -821,7 +854,6 @@ class ChromeShadowRootElement(BaseElement): xpath = f'xpath:./following::{loc}' return eles1 + self.parent_ele.eles(xpath, timeout=0.1) - def ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None) -> Union[ChromeElement, str, None]: @@ -1048,7 +1080,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... :return: """ - if isinstance(page_or_ele, BaseElement): + if isinstance(page_or_ele, (ChromeElement, ChromeShadowRootElement)): page = page_or_ele.page obj_id = page_or_ele.obj_id else: diff --git a/DrissionPage/chrome_page.py b/DrissionPage/chrome_page.py index 4177019..490fe30 100644 --- a/DrissionPage/chrome_page.py +++ b/DrissionPage/chrome_page.py @@ -1,5 +1,6 @@ # -*- coding:utf-8 -*- from pathlib import Path +from platform import system from re import search from time import perf_counter, sleep from typing import Union, Tuple, List, Any @@ -33,13 +34,14 @@ class ChromePage(BasePage): self._driver = Tab_or_Options self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1) self.options = None + self.process = None elif isinstance(Tab_or_Options, DriverOptions): self.options = Tab_or_Options or DriverOptions() # 从ini文件读取 self.set_timeouts(page_load=self.options.timeouts['pageLoad'], script=self.options.timeouts['script']) self._page_load_strategy = self.options.page_load_strategy - connect_chrome(self.options) + self.process = connect_chrome(self.options)[1] self.address = self.options.debugger_address tab_handle = self.tab_handles[0] if not tab_handle else tab_handle self._driver = Tab(id=tab_handle, type='page', @@ -489,6 +491,29 @@ class ChromePage(BasePage): """ self.close_tabs(num_or_handles, True) + def set_window_size(self, width: int = None, height: int = None) -> None: + """设置浏览器窗口大小,默认最大化,任一参数为0最小化 \n + :param width: 浏览器窗口高 + :param height: 浏览器窗口宽 + :return: None + """ + self.driver.Emulation.setDeviceMetricsOverride(width=500, height=500, + deviceScaleFactor=0, mobile=False, + ) + # if width is None and height is None: + # self.driver.maximize_window() + # + # elif width == 0 or height == 0: + # self.driver.minimize_window() + # + # else: + # if width < 0 or height < 0: + # raise ValueError('x 和 y参数必须大于0。') + # + # new_x = width or self.driver.get_window_size()['width'] + # new_y = height or self.driver.get_window_size()['height'] + # self.driver.set_window_size(new_x, new_y) + def clear_cache(self, session_storage: bool = True, local_storage: bool = True, @@ -531,6 +556,14 @@ class ChromePage(BasePage): self.driver.Page.handleJavaScriptDialog(accept=accept) return res_text + def hide_browser(self) -> None: + """隐藏浏览器窗口,只在Windows系统可用""" + _show_or_hide_browser(self, hide=True) + + def show_browser(self) -> None: + """显示浏览器窗口,只在Windows系统可用""" + _show_or_hide_browser(self, hide=False) + def check_page(self) -> Union[bool, None]: """检查页面是否符合预期 \n 由子类自行实现各页面的判定规则 @@ -553,7 +586,7 @@ class ChromePage(BasePage): """ err = None is_ok = False - timeout = timeout if timeout is not None else self.timeout + timeout = timeout if timeout is not None else self.timeouts.page_load for _ in range(times + 1): result = self.driver.Page.navigate(url=to_url) @@ -595,7 +628,7 @@ class ChromePage(BasePage): self._alert.text = None self._alert.type = None self._alert.defaultPrompt = None - self._alert.response_accept = kwargs.get['result'] + self._alert.response_accept = kwargs.get('result') self._alert.response_text = kwargs['userInput'] def _on_alert_open(self, **kwargs): @@ -645,3 +678,68 @@ def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) raise TypeError('num_or_handle参数只能是int、str、list、set 或 tuple类型。') return set(i if isinstance(i, str) else handles[i] for i in num_or_handles) + + +def _show_or_hide_browser(page: ChromePage, hide: bool = True) -> None: + if system().lower() != 'windows': + raise OSError('该方法只能在Windows系统使用。') + + try: + from win32gui import ShowWindow + from win32con import SW_HIDE, SW_SHOW + except ImportError: + raise ImportError('请先安装:pip install pypiwin32') + + pid = _get_browser_progress_id(page.process, page.address) + if not pid: + return None + hds = _get_chrome_hwnds_from_pid(pid, page.title) + sw = SW_HIDE if hide else SW_SHOW + for hd in hds: + ShowWindow(hd, sw) + + +def _get_browser_progress_id(progress, address: str) -> Union[str, None]: + """获取浏览器进程id""" + if progress: + return progress.pid + + address = address.split(':') + if len(address) != 2: + return None + + ip, port = address + if ip not in ('127.0.0.1', 'localhost') or not port.isdigit(): + return None + + from os import popen + txt = '' + progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n') + for progress in progresses: + if 'LISTENING' in progress: + txt = progress + break + if not txt: + return None + + return txt.split(' ')[-1] + + +def _get_chrome_hwnds_from_pid(pid, title) -> list: + """通过PID查询句柄ID""" + try: + from win32gui import IsWindow, GetWindowText, EnumWindows + from win32process import GetWindowThreadProcessId + except ImportError: + raise ImportError('请先安装win32gui,pip install pypiwin32') + + def callback(hwnd, hds): + if IsWindow(hwnd) and title in GetWindowText(hwnd): + _, found_pid = GetWindowThreadProcessId(hwnd) + if str(found_pid) == str(pid): + hds.append(hwnd) + return True + + hwnds = [] + EnumWindows(callback, hwnds) + return hwnds diff --git a/DrissionPage/keys.py b/DrissionPage/keys.py index 1a9ea1e..34a45df 100644 --- a/DrissionPage/keys.py +++ b/DrissionPage/keys.py @@ -334,6 +334,8 @@ _modifierBit = {'\ue00a': 1, def _keys_to_typing(value) -> Tuple[int, str]: + """把要输入的内容连成字符串,去掉其中 ctrl 等键。 + 返回的modifier表示是否有按下组合键""" typing: List[str] = [] modifier = 0 for val in value: diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index cda6a9b..fdec08b 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -43,16 +43,6 @@ class WebPage(SessionPage, ChromePage, BasePage): if self._mode == 'd': self.driver - # self._ready() - - # if self._mode == 'd': - # try: - # timeouts = self.drission.driver_options.timeouts - # t = timeout if timeout is not None else timeouts['implicit'] / 1000 - # self.set_timeouts(t, timeouts['pageLoad'] / 1000, timeouts['script'] / 1000) - # - # except Exception: - # self.timeout = timeout if timeout is not None else 10 def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromeElement, SessionElement], @@ -221,10 +211,10 @@ class WebPage(SessionPage, ChromePage, BasePage): # s模式转d模式 if self._mode == 'd': - # if not self._has_driver: - # self._ready() - self._has_driver = True + if not self._has_driver: + self.driver self._url = None if not self._has_driver else super(SessionPage, self).url + self._has_driver = True if self._session_url: self.cookies_to_driver() @@ -301,6 +291,39 @@ class WebPage(SessionPage, ChromePage, BasePage): self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) + def check_page(self, by_requests: bool = False) -> Union[bool, None]: + """d模式时检查网页是否符合预期 \n + 默认由response状态检查,可重载实现针对性检查 \n + :param by_requests: 是否用内置response检查 + :return: bool或None,None代表不知道结果 + """ + if self._session_url and self._session_url == self.url: + return self._response.ok + + # 使用requests访问url并判断可用性 + if by_requests: + self.cookies_to_session() + r = self._make_response(self.url, retry=0)[0] + return r.ok if r else False + + def close_driver(self) -> None: + """关闭driver及浏览器""" + if self._has_driver: + self.change_mode('s') + try: + self.driver.Browser.close() + except Exception: + pass + self._has_driver = None + + def close_session(self) -> None: + """关闭session""" + if self._has_session: + self.change_mode('d') + self._session = None + self._response = None + self._has_session = None + # ----------------重写SessionPage的函数----------------------- def post(self, url: str,