From a9d5b1819456821eeb44e4846b6efed1defeffbf Mon Sep 17 00:00:00 2001 From: g1879 Date: Thu, 10 Nov 2022 18:34:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BE=85=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/action_chains.py | 90 +++++++++++++++++++++++ DrissionPage/chrome_element.py | 28 ++++++- DrissionPage/chrome_page.py | 130 +++++++++++++++++++++++---------- 3 files changed, 205 insertions(+), 43 deletions(-) create mode 100644 DrissionPage/action_chains.py diff --git a/DrissionPage/action_chains.py b/DrissionPage/action_chains.py new file mode 100644 index 0000000..6f320e2 --- /dev/null +++ b/DrissionPage/action_chains.py @@ -0,0 +1,90 @@ +# -*- coding:utf-8 -*- +# from chrome_element import ChromeElement + + +class ActionChains: + """ + ActionChains are a way to automate low level interactions such as + mouse movements, mouse button actions, key press, and context menu interactions. + This is useful for doing more complex actions like hover over and drag and drop. + + Generate user actions. + When you call methods for actions on the ActionChains object, + the actions are stored in a queue in the ActionChains object. + When you call perform(), the events are fired in the order they + are queued up. + + ActionChains can be used in a chain pattern:: + + menu = driver.find_element(By.CSS_SELECTOR, ".nav") + hidden_submenu = driver.find_element(By.CSS_SELECTOR, ".nav #submenu1") + + ActionChains(driver).move_to_element(menu).click(hidden_submenu).perform() + + Or actions can be queued up one by one, then performed.:: + + menu = driver.find_element(By.CSS_SELECTOR, ".nav") + hidden_submenu = driver.find_element(By.CSS_SELECTOR, ".nav #submenu1") + + actions = ActionChains(driver) + actions.move_to_element(menu) + actions.click(hidden_submenu) + actions.perform() + + Either way, the actions are performed in the order they are called, one after + another. + """ + + def __init__(self, page): + """ + Creates a new ActionChains. + + :Args: + - driver: The WebDriver instance which performs user actions. + - duration: override the default 250 msecs of DEFAULT_MOVE_DURATION in PointerInput + """ + self._dr = page.driver + self.curr_x = 0 + self.curr_y = 0 + + def move_to_element(self, to_element): + cl = to_element.client_location + size = to_element.size + x = cl['x'] + size['width'] // 2 + y = cl['y'] + size['height'] // 2 + self._dr.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) + self.curr_x = x + self.curr_y = y + return self + + def move_to_element_with_offset(self, to_element, offset_x=0, offset_y=0): + cl = to_element.client_location + size = to_element.size + x = int(offset_x) + cl['x'] + size['width'] // 2 + y = int(offset_y) + cl['y'] + size['height'] // 2 + self._dr.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) + self.curr_x = x + self.curr_y = y + return self + + def click_and_hold(self, on_element=None): + if on_element: + self.move_to_element(on_element) + self._dr.Input.dispatchMouseEvent(type='mousePressed', button='left', clickCount=1, + x=self.curr_x, y=self.curr_y) + # self.key_down() + + return self + + def release(self, on_element=None): + if on_element: + self.move_to_element(on_element) + self._dr.Input.dispatchMouseEvent(type='mouseReleased', button='left', + x=self.curr_x, y=self.curr_y) + # self.key_down() + return self + + def key_down(self): + data = {'type': 'rawKeyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 19, 'code': 'Pause', 'key': 'Pause', + 'text': '', 'autoRepeat': False, 'unmodifiedText': '', 'location': 0, 'isKeypad': False} + self._dr.call_method('Input.dispatchKeyEvent', **data) \ No newline at end of file diff --git a/DrissionPage/chrome_element.py b/DrissionPage/chrome_element.py index 5bdcc01..4b721ba 100644 --- a/DrissionPage/chrome_element.py +++ b/DrissionPage/chrome_element.py @@ -355,6 +355,16 @@ class ChromeElement(DrissionElement): """ return _run_script(self, script, as_expr, self.page.timeouts.script, args) + def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: + """以异步方式执行js代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: None + """ + from threading import Thread + Thread(target=_run_script, args=(self, script, as_expr, self.page.timeouts.script, args)).start() + def ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None) -> Union['ChromeElement', str, None]: @@ -636,13 +646,13 @@ class ChromeElement(DrissionElement): sleep(.1) self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button=button) - def hover(self, x: int = None, y: int = None) -> None: + def hover(self, offset_x: int = None, offset_y: int = None) -> None: """鼠标悬停,可接受偏移量,偏移量相对于元素左上角坐标。不传入x或y值时悬停在元素中点 \n - :param x: 相对元素左上角坐标的x轴偏移量 - :param y: 相对元素左上角坐标的y轴偏移量 + :param offset_x: 相对元素左上角坐标的x轴偏移量 + :param offset_y: 相对元素左上角坐标的y轴偏移量 :return: None """ - x, y = _offset_scroll(self, x, y) + x, y = _offset_scroll(self, offset_x, offset_y) self.page.driver.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) def _get_obj_id(self, node_id) -> str: @@ -765,6 +775,16 @@ class ChromeShadowRootElement(BaseElement): """ return _run_script(self, script, as_expr, self.page.timeouts.script, args) + def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: + """以异步方式执行js代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: None + """ + from threading import Thread + Thread(target=_run_script, args=(self, script, as_expr, self.page.timeouts.script, args)).start() + def parent(self, level_or_loc: Union[str, int] = 1) -> ChromeElement: """返回上面某一级父元素,可指定层数或用查询语法定位 \n :param level_or_loc: 第几级父元素,或定位符 diff --git a/DrissionPage/chrome_page.py b/DrissionPage/chrome_page.py index 490fe30..de99237 100644 --- a/DrissionPage/chrome_page.py +++ b/DrissionPage/chrome_page.py @@ -118,13 +118,6 @@ class ChromePage(BasePage): """返回当前页面加载状态,""" return self.run_script('document.readyState;', as_expr=True) - @property - def scroll(self) -> ChromeScroll: - """用于滚动滚动条的对象""" - if not hasattr(self, '_scroll'): - self._scroll = ChromeScroll(self) - return self._scroll - @property def size(self) -> dict: """返回页面总长宽""" @@ -141,6 +134,27 @@ class ChromePage(BasePage): """返回页面加载策略""" return self._page_load_strategy + @property + def process_id(self) -> Union[None, int]: + """获取浏览器进程id""" + try: + return self.driver.SystemInfo.getProcessInfo()['id'] + except Exception: + return None + + @property + def scroll(self) -> ChromeScroll: + """用于滚动滚动条的对象""" + if not hasattr(self, '_scroll'): + self._scroll = ChromeScroll(self) + return self._scroll + + @property + def set_window(self) -> 'WindowSizeSetter': + if not hasattr(self, '_window_setter'): + self._window_setter = WindowSizeSetter(self) + return self._window_setter + def set_page_load_strategy(self, value: str) -> None: """设置页面加载策略,可选'normal', 'eager', 'none'""" if value not in ('normal', 'eager', 'none'): @@ -172,6 +186,16 @@ class ChromePage(BasePage): """ return _run_script(self, script, as_expr, self.timeouts.script, args) + def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: + """以异步方式执行js代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: None + """ + from threading import Thread + Thread(target=_run_script, args=(self, script, as_expr, self.timeouts.script, args)).start() + def get(self, url: str, show_errmsg: bool = False, @@ -491,29 +515,6 @@ class ChromePage(BasePage): """ self.close_tabs(num_or_handles, True) - def set_window_size(self, width: int = None, height: int = None) -> None: - """设置浏览器窗口大小,默认最大化,任一参数为0最小化 \n - :param width: 浏览器窗口高 - :param height: 浏览器窗口宽 - :return: None - """ - self.driver.Emulation.setDeviceMetricsOverride(width=500, height=500, - deviceScaleFactor=0, mobile=False, - ) - # if width is None and height is None: - # self.driver.maximize_window() - # - # elif width == 0 or height == 0: - # self.driver.minimize_window() - # - # else: - # if width < 0 or height < 0: - # raise ValueError('x 和 y参数必须大于0。') - # - # new_x = width or self.driver.get_window_size()['width'] - # new_y = height or self.driver.get_window_size()['height'] - # self.driver.set_window_size(new_x, new_y) - def clear_cache(self, session_storage: bool = True, local_storage: bool = True, @@ -666,6 +667,61 @@ class Timeout(object): return self.page.timeout +class WindowSizeSetter(object): + """用于设置窗口大小的类""" + + def __init__(self, page: ChromePage): + self.driver = page.driver + self.window_id = self._get_info()['windowId'] + + def _get_info(self): + return self.driver.Browser.getWindowBounds() + + def _perform(self, bounds: dict): + self.driver.Browser.setWindowBounds(windowId=self.window_id, bounds=bounds) + + def maximized(self) -> None: + """最大化""" + self._perform({'windowState': 'maximized'}) + + def minimized(self) -> None: + """最小化""" + self._perform({'windowState': 'minimized'}) + + def fullscreen(self) -> None: + """全屏""" + self._perform({'windowState': 'fullscreen'}) + + def normal(self) -> None: + """常规""" + self._perform({'windowState': 'normal'}) + + def new_size(self, width: int = None, height: int = None) -> None: + """设置窗口大小 \n + :param width: 窗口宽度 + :param height: 窗口高度 + :return: None + """ + if width or height: + info = self._get_info()['bounds'] + width = width or info['width'] + height = height or info['height'] + self._perform({'width': width, 'height': height}) + + def to_location(self, x: int = None, y: int = None) -> None: + """设置在屏幕中的位置,相对左上角坐标 \n + :param x: 距离顶部距离 + :param y: 距离左边距离 + :return: None + """ + if x or y: + self.normal() + info = self._get_info()['bounds'] + x = x or info['left'] + y = y or info['top'] + self._perform({'left': x, 'top': y}) + + def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set: """返回指定标签页handle组成的set \n :param handles: handles列表 @@ -681,6 +737,9 @@ def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) def _show_or_hide_browser(page: ChromePage, hide: bool = True) -> None: + if not page.address.startswith(('localhost', '127.0.0.1')): + return + if system().lower() != 'windows': raise OSError('该方法只能在Windows系统使用。') @@ -690,7 +749,7 @@ def _show_or_hide_browser(page: ChromePage, hide: bool = True) -> None: except ImportError: raise ImportError('请先安装:pip install pypiwin32') - pid = _get_browser_progress_id(page.process, page.address) + pid = page.process_id or _get_browser_progress_id(page.process, page.address) if not pid: return None hds = _get_chrome_hwnds_from_pid(pid, page.title) @@ -704,15 +763,8 @@ def _get_browser_progress_id(progress, address: str) -> Union[str, None]: if progress: return progress.pid - address = address.split(':') - if len(address) != 2: - return None - - ip, port = address - if ip not in ('127.0.0.1', 'localhost') or not port.isdigit(): - return None - from os import popen + port = address.split(':')[-1] txt = '' progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n') for progress in progresses: