diff --git a/DrissionPage/action_chains.py b/DrissionPage/action_chains.py index 6f320e2..1d938b3 100644 --- a/DrissionPage/action_chains.py +++ b/DrissionPage/action_chains.py @@ -1,90 +1,194 @@ # -*- coding:utf-8 -*- -# from chrome_element import ChromeElement +from time import sleep +from typing import Union, Tuple + +from .common import _location_in_viewport +from .base import DrissionElement +from .keys import _modifierBit, _keyDescriptionForString class ActionChains: - """ - ActionChains are a way to automate low level interactions such as - mouse movements, mouse button actions, key press, and context menu interactions. - This is useful for doing more complex actions like hover over and drag and drop. - - Generate user actions. - When you call methods for actions on the ActionChains object, - the actions are stored in a queue in the ActionChains object. - When you call perform(), the events are fired in the order they - are queued up. - - ActionChains can be used in a chain pattern:: - - menu = driver.find_element(By.CSS_SELECTOR, ".nav") - hidden_submenu = driver.find_element(By.CSS_SELECTOR, ".nav #submenu1") - - ActionChains(driver).move_to_element(menu).click(hidden_submenu).perform() - - Or actions can be queued up one by one, then performed.:: - - menu = driver.find_element(By.CSS_SELECTOR, ".nav") - hidden_submenu = driver.find_element(By.CSS_SELECTOR, ".nav #submenu1") - - actions = ActionChains(driver) - actions.move_to_element(menu) - actions.click(hidden_submenu) - actions.perform() - - Either way, the actions are performed in the order they are called, one after - another. - """ + """用于实现动作链的类""" def __init__(self, page): + """初始化 \n + :param page: ChromePage对象 """ - Creates a new ActionChains. - - :Args: - - driver: The WebDriver instance which performs user actions. - - duration: override the default 250 msecs of DEFAULT_MOVE_DURATION in PointerInput - """ + self.page = page self._dr = page.driver - self.curr_x = 0 + self.modifier = 0 # 修饰符,Alt=1, Ctrl=2, Meta/Command=4, Shift=8 + self.curr_x = 0 # 视口坐标 self.curr_y = 0 - def move_to_element(self, to_element): - cl = to_element.client_location - size = to_element.size - x = cl['x'] + size['width'] // 2 - y = cl['y'] + size['height'] // 2 - self._dr.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) - self.curr_x = x - self.curr_y = y + def move_to(self, ele_or_loc: Union[DrissionElement, Tuple[int, int]], + offset_x: int = 0, offset_y: int = 0) -> 'ActionChains': + """鼠标移动到元素中点,或页面上的某个绝对坐标。可设置偏移量 \n + 当带偏移量时,偏移量相对于元素左上角坐标 + :param ele_or_loc: 元素对象或绝对坐标,坐标为tuple(int, int)形式 + :param offset_x: 偏移量x + :param offset_y: 偏移量y + :return: self + """ + if isinstance(ele_or_loc, (tuple, list)): + lx = ele_or_loc[0] + offset_x + ly = ele_or_loc[1] + offset_y + elif isinstance(ele_or_loc, DrissionElement): + ele_loc = ele_or_loc.location if offset_x or offset_y else ele_or_loc.midpoint + lx = ele_loc['x'] + offset_x + ly = ele_loc['y'] + offset_y + + if not _location_in_viewport(self.page, lx, ly): + self.page.scroll.to_location(lx, ly) + + cx, cy = _location_to_client(self.page, lx, ly) + self._dr.Input.dispatchMouseEvent(type='mouseMoved', x=cx, y=cy, modifiers=self.modifier) + self.curr_x = cx + self.curr_y = cy return self - def move_to_element_with_offset(self, to_element, offset_x=0, offset_y=0): - cl = to_element.client_location - size = to_element.size - x = int(offset_x) + cl['x'] + size['width'] // 2 - y = int(offset_y) + cl['y'] + size['height'] // 2 - self._dr.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) - self.curr_x = x - self.curr_y = y + def move(self, offset_x: int = 0, offset_y: int = 0) -> 'ActionChains': + """鼠标相对当前位置移动若干位置 \n + :param offset_x: 偏移量x + :param offset_y: 偏移量y + :return: self + """ + self.curr_x += offset_x + self.curr_y += offset_y + self._dr.Input.dispatchMouseEvent(type='mouseMoved', x=self.curr_x, y=self.curr_y, modifiers=self.modifier) return self - def click_and_hold(self, on_element=None): - if on_element: - self.move_to_element(on_element) - self._dr.Input.dispatchMouseEvent(type='mousePressed', button='left', clickCount=1, - x=self.curr_x, y=self.curr_y) - # self.key_down() - + def hold(self, on_ele=None) -> 'ActionChains': + """点击并按住当前坐标或指定元素 \n + :param on_ele: ChromeElement对象 + :return: self + """ + if on_ele: + self.move_to(on_ele) + self._dr.Input.dispatchMouseEvent(type='mousePressed', button='left', + x=self.curr_x, y=self.curr_y, modifiers=self.modifier) return self - def release(self, on_element=None): - if on_element: - self.move_to_element(on_element) + def click(self, on_ele=None) -> 'ActionChains': + """点击鼠标左键,可先移动到元素上 \n + :param on_ele: ChromeElement元素 + :return: self + """ + if on_ele: + self.move_to(on_ele) + self._dr.Input.dispatchMouseEvent(type='mousePressed', button='left', + x=self.curr_x, y=self.curr_y, modifiers=self.modifier) self._dr.Input.dispatchMouseEvent(type='mouseReleased', button='left', - x=self.curr_x, y=self.curr_y) - # self.key_down() + x=self.curr_x, y=self.curr_y, modifiers=self.modifier) return self - def key_down(self): - data = {'type': 'rawKeyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 19, 'code': 'Pause', 'key': 'Pause', - 'text': '', 'autoRepeat': False, 'unmodifiedText': '', 'location': 0, 'isKeypad': False} - self._dr.call_method('Input.dispatchKeyEvent', **data) \ No newline at end of file + def r_click(self, on_ele=None) -> 'ActionChains': + """点击鼠标右键,可先移动到元素上 \n + :param on_ele: ChromeElement元素 + :return: self + """ + if on_ele: + self.move_to(on_ele) + self._dr.Input.dispatchMouseEvent(type='mousePressed', button='right', + x=self.curr_x, y=self.curr_y, modifiers=self.modifier) + self._dr.Input.dispatchMouseEvent(type='mouseReleased', button='right', + x=self.curr_x, y=self.curr_y, modifiers=self.modifier) + return self + + def release(self, on_ele=None) -> 'ActionChains': + """释放鼠标左键,可先移动到元素再释放 \n + :param on_ele: ChromeElement对象 + :return: self + """ + if on_ele: + self.move_to(on_ele) + self._dr.Input.dispatchMouseEvent(type='mouseReleased', button='left', + x=self.curr_x, y=self.curr_y, modifiers=self.modifier) + return self + + def scroll(self, delta_x: int = 0, delta_y: int = 0, on_ele=None) -> 'ActionChains': + """滚动鼠标滚轮,可先移动到元素上 \n + :param delta_x: 滚轮变化值x + :param delta_y: 滚轮变化值y + :param on_ele: ChromeElement元素 + :return: self + """ + if on_ele: + self.move_to(on_ele) + self._dr.Input.dispatchMouseEvent(type='mouseWheel', x=self.curr_x, y=self.curr_y, + deltaX=delta_x, deltaY=delta_y, modifiers=self.modifier) + return self + + def up(self, pixel: int) -> 'ActionChains': + """鼠标向上移动若干像素""" + return self.move(0, -pixel) + + def down(self, pixel: int) -> 'ActionChains': + """鼠标向下移动若干像素""" + return self.move(0, pixel) + + def left(self, pixel: int) -> 'ActionChains': + """鼠标向左移动若干像素""" + return self.move(-pixel, 0) + + def right(self, pixel: int) -> 'ActionChains': + """鼠标向右移动若干像素""" + return self.move(pixel, 0) + + def key_down(self, key) -> 'ActionChains': + """按下键盘上的按键 \n + :param key: 按键,特殊字符见Keys + :return: self + """ + if key in ('\ue009', '\ue008', '\ue00a', '\ue03d'): # 如果上修饰符,添加到变量 + self.modifier |= _modifierBit.get(key, 0) + return self + + data = self._get_key_data(key, 'keyDown') + self.page.run_cdp('Input.dispatchKeyEvent', **data) + return self + + def key_up(self, key) -> 'ActionChains': + """提起键盘上的按键 \n + :param key: 按键,特殊字符见Keys + :return: self + """ + if key in ('\ue009', '\ue008', '\ue00a', '\ue03d'): # 如果上修饰符,添加到变量 + self.modifier ^= _modifierBit.get(key, 0) + return self + + data = self._get_key_data(key, 'keyUp') + self.page.run_cdp('Input.dispatchKeyEvent', **data) + return self + + def wait(self, second: float) -> 'ActionChains': + """等待若干秒""" + sleep(second) + return self + + def _get_key_data(self, key, action: str) -> dict: + """获取用于发送的按键信息 \n + :param key: 按键 + :param action: 'keyDown' 或 'keyUp' + :return: 按键信息 + """ + description = _keyDescriptionForString(self.modifier, key) + text = description['text'] + if action != 'keyUp': + action = 'keyDown' if text else 'rawKeyDown' + return {'type': action, + 'modifiers': self.modifier, + 'windowsVirtualKeyCode': description['keyCode'], + 'code': description['code'], + 'key': description['key'], + 'text': text, + 'autoRepeat': False, + 'unmodifiedText': text, + 'location': description['location'], + 'isKeypad': description['location'] == 3} + + +def _location_to_client(page, lx: int, ly: int) -> tuple: + """绝对坐标转换为视口坐标""" + scrool_x = page.run_script('return document.documentElement.scrollLeft;') + scrool_y = page.run_script('return document.documentElement.scrollTop;') + return lx + scrool_x, ly + scrool_y diff --git a/DrissionPage/chrome_element.py b/DrissionPage/chrome_element.py index 4b721ba..62a0822 100644 --- a/DrissionPage/chrome_element.py +++ b/DrissionPage/chrome_element.py @@ -14,11 +14,18 @@ from time import perf_counter, sleep from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions from .session_element import make_session_ele, SessionElement from .base import DrissionElement, BaseElement -from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func +from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func, _location_in_viewport class ChromeElement(DrissionElement): + """ChromePage页面对象中的元素对象""" + def __init__(self, page, node_id: str = None, obj_id: str = None): + """初始化,node_id和obj_id必须至少传入一个 \n + :param page: 元素所在ChromePage页面对象 + :param node_id: cdp中的node id + :param obj_id: js中的object id + """ super().__init__(page) self._select = None self._scroll = None @@ -34,9 +41,8 @@ class ChromeElement(DrissionElement): self._obj_id = obj_id def __repr__(self) -> str: - # attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] - # return f'' - return f'' + attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + return f'' def __call__(self, loc_or_str: Union[Tuple[str, str], str], @@ -77,46 +83,41 @@ class ChromeElement(DrissionElement): @property def attrs(self) -> dict: + """返回元素所有attribute属性""" attrs = self.page.driver.DOM.getAttributes(nodeId=self._node_id)['attributes'] attrs_len = len(attrs) return {attrs[i]: attrs[i + 1] for i in range(0, attrs_len, 2)} @property def text(self) -> str: - """返回元素内所有文本""" + """返回元素内所有文本,文本已格式化""" return get_ele_txt(make_session_ele(self.html)) @property - def raw_text(self): + def raw_text(self) -> str: """返回未格式化处理的元素内文本""" return self.prop('innerText') # -----------------driver独有属性------------------- @property def obj_id(self) -> str: + """返回js中的object id""" return self._obj_id @property def node_id(self) -> str: + """返回cdp中的node id""" return self._node_id @property def size(self) -> dict: """返回元素宽和高""" model = self.page.driver.DOM.getBoxModel(nodeId=self._node_id)['model'] - return {"height": model['height'], "width": model['width']} - - @property - def client_location(self) -> dict: - """返回元素左上角坐标""" - js = 'return this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();' - xy = self.run_script(js) - x, y = xy.split(' ') - return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])} + return {'height': model['height'], 'width': model['width']} @property def location(self) -> dict: - """返回元素左上角坐标""" + """返回元素左上角的绝对坐标""" js = '''function(){ function getElementPagePosition(element){ var actualLeft = element.offsetLeft; @@ -139,13 +140,36 @@ class ChromeElement(DrissionElement): return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])} @property - def shadow_root(self): + def client_location(self) -> dict: + """返回元素左上角在视口中的坐标""" + js = 'return this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();' + xy = self.run_script(js) + x, y = xy.split(' ') + return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])} + + @property + def midpoint(self) -> dict: + """返回元素中间点的绝对坐标""" + loc = self.location + size = self.size + lx = loc['x'] + size['width'] // 2 + ly = loc['y'] + size['height'] // 2 + return {'x': lx, 'y': ly} + + @property + def client_midpoint(self) -> dict: + """返回元素中间点在视口中的坐标""" + loc = self.client_location + size = self.size + cx = loc['x'] + size['width'] // 2 + cy = loc['y'] + size['height'] // 2 + return {'x': cx, 'y': cy} + + @property + def shadow_root(self) -> Union[None, 'ChromeShadowRootElement']: """返回当前元素的shadow_root元素对象""" shadow = self.run_script('return this.shadowRoot;') return shadow - # if shadow: - # from .shadow_root_element import ShadowRootElement - # return ShadowRootElement(shadow, self) @property def sr(self): @@ -204,7 +228,7 @@ class ChromeElement(DrissionElement): index: int = 1, filter_loc: Union[tuple, str] = '', timeout: float = None) -> Union['ChromeElement', str, None]: - """返回当前元素前面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元,而是整个DOM文档 \n + """返回当前元素前面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n :param index: 前面第几个查询结果元素 :param filter_loc: 用于筛选元素的查询语法 :param timeout: 查找元素的超时时间 @@ -216,7 +240,7 @@ class ChromeElement(DrissionElement): index: int = 1, filter_loc: Union[tuple, str] = '', timeout: float = None) -> Union['ChromeElement', str, None]: - """返回当前元素后面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元,而是整个DOM文档 \n + """返回当前元素后面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n :param index: 后面第几个查询结果元素 :param filter_loc: 用于筛选元素的查询语法 :param timeout: 查找元素的超时时间 @@ -247,7 +271,7 @@ class ChromeElement(DrissionElement): def befores(self, filter_loc: Union[tuple, str] = '', timeout: float = None) -> List[Union['ChromeElement', str]]: - """返回当前元素后面符合条件的全部兄弟元素或节点组成的列表,可用查询语法筛选。查找范围不限兄弟元,而是整个DOM文档 \n + """返回当前元素后面符合条件的全部兄弟元素或节点组成的列表,可用查询语法筛选。查找范围不限兄弟元素,而是整个DOM文档 \n :param filter_loc: 用于筛选元素的查询语法 :param timeout: 查找元素的超时时间 :return: 本元素前面的元素或节点组成的列表 @@ -257,10 +281,10 @@ class ChromeElement(DrissionElement): def wait_ele(self, loc_or_ele: Union[str, tuple, 'ChromeElement'], timeout: float = None) -> 'ChromeElementWaiter': - """等待子元素从dom删除、显示、隐藏 \n + """返回用于等待子元素到达某个状态的等待器对象 \n :param loc_or_ele: 可以是元素、查询字符串、loc元组 :param timeout: 等待超时时间 - :return: 等待是否成功 + :return: 用于等待的ElementWaiter对象 """ return ChromeElementWaiter(self, loc_or_ele, timeout) @@ -296,32 +320,24 @@ class ChromeElement(DrissionElement): def is_alive(self) -> bool: """返回元素是否仍在DOM中""" try: - self.tag + self.attrs return True except Exception: return False @property - def is_in_view(self) -> bool: - """返回元素是否出现在视口中,已元素中点为判断""" - js = """function(){ - const rect = this.getBoundingClientRect(); - x = rect.left+(rect.right-rect.left)/2; - y = rect.top+(rect.bottom-rect.top)/2; - const vWidth = window.innerWidth || document.documentElement.clientWidth; - const vHeight = window.innerHeight || document.documentElement.clientHeight; - if (x< 0 || y < 0 || x > vWidth || y > vHeight){return false;} - return true;}""" - return self.run_script(js) + def is_in_viewport(self) -> bool: + """返回元素是否出现在视口中,以元素中点为判断""" + loc = self.midpoint + return _location_in_viewport(self.page, loc['x'], loc['y']) def attr(self, attr: str) -> Union[str, None]: """返回attribute属性值 \n :param attr: 属性名 :return: 属性值文本,没有该属性返回None """ - # 获取href属性时返回绝对url attrs = self.attrs - if attr == 'href': + if attr == 'href': # 获取href属性时返回绝对url link = attrs.get('href', None) if not link or link.lower().startswith(('javascript:', 'mailto:')): return link @@ -411,7 +427,7 @@ class ChromeElement(DrissionElement): :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间 :param single: True则返回第一个,False则返回全部 - :return: ChromeElement对象 + :return: ChromeElement对象或文本、属性或其组成的列表 """ return make_chrome_ele(self, loc_or_str, single, timeout) @@ -432,7 +448,7 @@ class ChromeElement(DrissionElement): """设置元素property属性 \n :param prop: 属性名 :param value: 属性值 - :return: 是否设置成功 + :return: None """ value = value.replace("'", "\\'") self.run_script(f'this.{prop}="{value}";') @@ -441,14 +457,14 @@ class ChromeElement(DrissionElement): """设置元素attribute属性 \n :param attr: 属性名 :param value: 属性值 - :return: 是否设置成功 + :return: None """ self.run_script(f'this.setAttribute(arguments[0], arguments[1]);', False, attr, str(value)) def remove_attr(self, attr: str) -> None: """删除元素attribute属性 \n :param attr: 属性名 - :return: 是否删除成功 + :return: None """ self.run_script(f'this.removeAttribute("{attr}");') @@ -472,11 +488,19 @@ class ChromeElement(DrissionElement): src = self.attr('src') if not src: return False + if self.tag == 'img': # 等待图片加载完成 + js = ('return this.complete && typeof this.naturalWidth != "undefined" ' + '&& this.naturalWidth > 0 && typeof this.naturalHeight != "undefined" ' + '&& this.naturalHeight > 0') + end_time = perf_counter() + self.page.timeout + while not self.run_script(js) and perf_counter() < end_time: + sleep(.1) + path = path or '.' node = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node'] frame = node.get('frameId', None) - frame = frame or self.page.current_tab_handle + frame = frame or self.page.current_tab_id result = self.page.driver.Page.getResourceContent(frameId=frame, url=src) if result['base64Encoded']: from base64 import b64decode @@ -539,7 +563,7 @@ class ChromeElement(DrissionElement): vals = (str(vals),) modifier, vals = _keys_to_typing(vals) - if modifier != 0: # 包含组合键 + if modifier != 0: # 包含修饰符 for key in vals: _send_key(self, modifier, key) return @@ -551,12 +575,15 @@ class ChromeElement(DrissionElement): self.page.run_cdp('Input.insertText', text=vals) def _set_file_input(self, files: Union[str, list, tuple]) -> None: - """设置上传控件值""" + """设置上传控件值 + :param files: 文件路径列表或字符串,字符串时多个文件用回车分隔 + :return: None + """ if isinstance(files, str): files = files.split('\n') self.page.driver.DOM.setFileInputFiles(files=files, nodeId=self._node_id) - def clear(self, by_js: bool = True) -> None: + def clear(self, by_js: bool = False) -> None: """清空元素文本 \n :param by_js: 是否用js方式清空 :return: None @@ -567,9 +594,9 @@ class ChromeElement(DrissionElement): else: self.input(('\ue009', 'a', '\ue017'), clear=False) - def click(self, by_js: bool = None, timeout: float = None) -> bool: + def click(self, by_js: bool = None, timeout: float = .2) -> bool: """点击元素 \n - 尝试点击直到超时,若都失败就改用js点击 \n + 如果遇到遮挡,会重新尝试点击直到超时,若都失败就改用js点击 \n :param by_js: 是否用js点击,为True时直接用js点击,为False时重试失败也不会改用js :param timeout: 尝试点击的超时时间,不指定则使用父页面的超时时间 :return: 是否点击成功 @@ -586,19 +613,18 @@ class ChromeElement(DrissionElement): if not by_js: self.page.scroll_to_see(self) if self.is_in_view: - xy = self.client_location - location = self.location - size = self.size - client_x = xy['x'] + size['width'] // 2 - client_y = xy['y'] + size['height'] // 2 - loc_x = location['x'] + size['width'] // 2 - loc_y = location['y'] + size['height'] // 2 + midpoint = self.midpoint + client_midpoint = self.client_midpoint + client_x = client_midpoint['x'] + client_y = client_midpoint['y'] + loc_x = midpoint['x'] + loc_y = midpoint['y'] timeout = timeout if timeout is not None else self.page.timeout end_time = perf_counter() + timeout click = do_it(client_x, client_y, loc_x, loc_y) while not click and perf_counter() < end_time: - click = do_it(client_x, client_y, location['x'], location['y']) + click = do_it(client_x, client_y, loc_x, loc_y) if click: return True @@ -611,40 +637,43 @@ class ChromeElement(DrissionElement): return False def click_at(self, - x: Union[int, str] = None, - y: Union[int, str] = None, + offset_x: Union[int, str] = None, + offset_y: Union[int, str] = None, button: str = 'left') -> None: """带偏移量点击本元素,相对于左上角坐标。不传入x或y值时点击元素中点 \n - :param x: 相对元素左上角坐标的x轴偏移量 - :param y: 相对元素左上角坐标的y轴偏移量 + :param offset_x: 相对元素左上角坐标的x轴偏移量 + :param offset_y: 相对元素左上角坐标的y轴偏移量 :param button: 左键还是右键 :return: None """ - x, y = _offset_scroll(self, x, y) + x, y = _offset_scroll(self, offset_x, offset_y) self._click(x, y, button) def r_click(self) -> None: """右键单击""" self.page.scroll_to_see(self) - xy = self.client_location - size = self.size - cx = xy['x'] + size['width'] // 2 - cy = xy['y'] + size['height'] // 2 - self._click(cx, cy, 'right') + xy = self.client_midpoint + self._click(xy['x'], xy['y'], 'right') - def r_click_at(self, x: Union[int, str], y: Union[int, str]) -> None: + def r_click_at(self, offset_x: Union[int, str], offset_y: Union[int, str]) -> None: """带偏移量右键单击本元素,相对于左上角坐标。不传入x或y值时点击元素中点 \n - :param x: 相对元素左上角坐标的x轴偏移量 - :param y: 相对元素左上角坐标的y轴偏移量 + :param offset_x: 相对元素左上角坐标的x轴偏移量 + :param offset_y: 相对元素左上角坐标的y轴偏移量 :return: None """ - self.click_at(x, y, 'right') + self.click_at(offset_x, offset_y, 'right') - def _click(self, x: int, y: int, button: str = 'left') -> None: - """实施点击""" - self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=x, y=y, button=button, clickCount=1) + def _click(self, client_x: int, client_y: int, button: str = 'left') -> None: + """实施点击 \n + :param client_x: 视口中的x坐标 + :param client_y: 视口中的y坐标 + :param button: 'left'或'right' + :return: None + """ + self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=client_x, y=client_y, button=button, + clickCount=1) sleep(.1) - self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button=button) + self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=client_x, y=client_y, button=button) def hover(self, offset_x: int = None, offset_y: int = None) -> None: """鼠标悬停,可接受偏移量,偏移量相对于元素左上角坐标。不传入x或y值时悬停在元素中点 \n @@ -655,10 +684,76 @@ class ChromeElement(DrissionElement): x, y = _offset_scroll(self, offset_x, offset_y) self.page.driver.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) + def drag(self, offset_x: int = 0, offset_y: int = 0, speed: int = 40, shake: bool = True) -> None: + """拖拽当前元素到相对位置 \n + :param offset_x: x变化值 + :param offset_y: y变化值 + :param speed: 拖动的速度,传入0即瞬间到达 + :param shake: 是否随机抖动 + :return: None + """ + curr_xy = self.midpoint + offset_x += curr_xy['x'] + offset_y += curr_xy['y'] + self.drag_to((offset_x, offset_y), speed, shake) + + def drag_to(self, + ele_or_loc: Union[tuple, 'ChromeElement'], + speed: int = 40, + shake: bool = True) -> None: + """拖拽当前元素,目标为另一个元素或坐标元组 \n + :param ele_or_loc: 另一个元素或坐标元组,坐标为元素中点的坐标 + :param speed: 拖动的速度,传入0即瞬间到达 + :param shake: 是否随机抖动 + :return: None + """ + # x, y:目标点坐标 + if isinstance(ele_or_loc, ChromeElement): + midpoint = ele_or_loc.midpoint + target_x = midpoint['x'] + target_y = midpoint['y'] + elif isinstance(ele_or_loc, (list, tuple)): + target_x, target_y = ele_or_loc + else: + raise TypeError('需要ChromeElement对象或坐标。') + + curr_xy = self.midpoint + current_x = curr_xy['x'] + current_y = curr_xy['y'] + width = target_x - current_x + height = target_y - current_y + num = 0 if not speed else int(((abs(width) ** 2 + abs(height) ** 2) ** .5) // speed) + + # 将要经过的点存入列表 + points = [(int(current_x + i * (width / num)), int(current_y + i * (height / num))) for i in range(1, num)] + points.append((target_x, target_y)) + + from .action_chains import ActionChains + from random import randint + actions = ActionChains(self.page) + actions.hold(self) + + # 逐个访问要经过的点 + for x, y in points: + if shake: + x += randint(-3, 4) + y += randint(-3, 4) + actions.move(x - current_x, y - current_y) + current_x, current_y = x, y + actions.release() + def _get_obj_id(self, node_id) -> str: + """根据传入node id获取js中的object id \n + :param node_id: cdp中的node id + :return: js中的object id + """ return self.page.driver.DOM.resolveNode(nodeId=node_id)['object']['objectId'] def _get_node_id(self, obj_id) -> str: + """根据传入object id获取cdp中的node id \n + :param obj_id: js中的object id + :return: cdp中的node id + """ return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId'] def _get_ele_path(self, mode) -> str: @@ -718,7 +813,7 @@ class ChromeShadowRootElement(BaseElement): def __call__(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> Union[ChromeElement, str, None]: + timeout: float = None) -> Union[ChromeElement, None]: """在内部查找元素 \n 例:ele2 = ele1('@id=ele_id') \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 @@ -742,23 +837,23 @@ class ChromeShadowRootElement(BaseElement): return False @property - def node_id(self): + def node_id(self) -> str: + """返回元素cdp中的node id""" return self._node_id @property - def obj_id(self): + def obj_id(self) -> str: + """返回元素js中的obect id""" return self._obj_id - def _get_node_id(self, obj_id) -> str: - return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId'] - @property def tag(self) -> str: - """元素标签名""" + """返回元素标签名""" return 'shadow-root' @property def html(self) -> str: + """返回outerHTML文本""" return f'{self.inner_html}' @property @@ -876,21 +971,21 @@ class ChromeShadowRootElement(BaseElement): def ele(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> Union[ChromeElement, str, None]: - """返回当前元素下级符合条件的第一个元素,默认返回 \n + timeout: float = None) -> Union[ChromeElement, None]: + """返回当前元素下级符合条件的第一个元素 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 - :return: ChromeElement对象或属性、文本 + :return: ChromeElement对象 """ return self._ele(loc_or_str, timeout) def eles(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> List[Union[ChromeElement, str]]: + timeout: float = None) -> List[ChromeElement]: """返回当前元素下级所有符合条件的子元素 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 - :return: ChromeElement对象或属性、文本组成的列表 + :return: ChromeElement对象组成的列表 """ return self._ele(loc_or_str, timeout=timeout, single=False) @@ -901,22 +996,22 @@ class ChromeShadowRootElement(BaseElement): """ return make_session_ele(self, loc_or_ele) - def s_eles(self, loc_or_ele) -> List[Union[SessionElement, str]]: + def s_eles(self, loc_or_ele) -> List[SessionElement]: """查找所有符合条件的元素以SessionElement列表形式返回,处理复杂页面时效率很高 \n :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本 + :return: SessionElement对象 """ return make_session_ele(self, loc_or_ele, single=False) def _ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None, - single: bool = True) -> Union['ChromeElement', str, None, List[Union['ChromeElement', str]]]: - """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n + single: bool = True) -> Union['ChromeElement', None, List[ChromeElement]]: + """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间 :param single: True则返回第一个,False则返回全部 - :return: ChromeElement对象 + :return: ChromeElement对象或其组成的列表 """ loc = get_loc(loc_or_str) if loc[0] == 'css selector' and str(loc[1]).startswith(':root'): @@ -944,6 +1039,10 @@ class ChromeShadowRootElement(BaseElement): results.append(ChromeElement(self.page, node_id)) return results + def _get_node_id(self, obj_id) -> str: + """返回元素node id""" + return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId'] + def make_chrome_ele(ele: ChromeElement, loc: Union[str, Tuple[str, str]], @@ -979,11 +1078,20 @@ def make_chrome_ele(ele: ChromeElement, return _find_by_css(ele, loc[1], single, timeout) -def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float): +def _find_by_xpath(ele: ChromeElement, + xpath: str, + single: bool, + timeout: float) -> Union[ChromeElement, List[ChromeElement]]: + """执行用xpath在元素中查找元素 + :param ele: 在此元素中查找 + :param xpath: 查找语句 + :param single: 是否只返回第一个结果 + :param timeout: 超时时间 + :return: ChromeElement或其组成的列表 + """ type_txt = '9' if single else '7' node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this' - js = _make_js(xpath, type_txt, node_txt) - # print(js) + js = _make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt) r = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, userGesture=True) @@ -992,7 +1100,7 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float) if 'exceptionDetails' in r: if 'The result is not a node set' in r['result']['description']: - js = _make_js(xpath, '1', node_txt) + js = _make_js_for_find_ele_by_xpath(xpath, '1', node_txt) r = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, userGesture=True) @@ -1023,7 +1131,17 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float) for i in r[:-1]] -def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float): +def _find_by_css(ele: ChromeElement, + selector: str, + single: bool, + timeout: float) -> Union[ChromeElement, List[ChromeElement]]: + """执行用css selector在元素中查找元素 + :param ele: 在此元素中查找 + :param selector: 查找语句 + :param single: 是否只返回第一个结果 + :param timeout: 超时时间 + :return: ChromeElement或其组成的列表 + """ selector = selector.replace('"', r'\"') find_all = '' if single else 'All' node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame', 'shadow-root') else 'this' @@ -1034,8 +1152,6 @@ def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float if 'exceptionDetails' in r: raise SyntaxError(f'查询语句错误:\n{r}') - print(js) - print(r) end_time = perf_counter() + timeout while (r['result']['subtype'] == 'null' or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time: @@ -1057,7 +1173,13 @@ def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float return [ChromeElement(ele.page, obj_id=i['value']['objectId']) for i in r] -def _make_js(xpath: str, type_txt: str, node_txt: str): +def _make_js_for_find_ele_by_xpath(xpath: str, type_txt: str, node_txt: str) -> str: + """生成用xpath在元素中查找元素的js文本 + :param xpath: xpath文本 + :param type_txt: 查找类型 + :param node_txt: 节点类型 + :return: js文本 + """ for_txt = '' # 获取第一个元素、节点或属性 @@ -1098,7 +1220,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float :param script: js文本 :param as_expr: 是否作为表达式运行,为True时args无效 :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: + :return: js执行结果 """ if isinstance(page_or_ele, (ChromeElement, ChromeShadowRootElement)): page = page_or_ele.page @@ -1118,7 +1240,6 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float args = args or () if not is_js_func(script): script = f'function(){{{script}}}' - # print(script) res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id, @@ -1131,7 +1252,6 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float if exceptionDetails: raise RuntimeError(f'Evaluation failed: {exceptionDetails}') - # print(res) return _parse_js_result(page, page_or_ele, res.get('result')) @@ -1163,9 +1283,6 @@ def _parse_js_result(page, ele, result: dict): elif the_type == 'undefined': return None - # elif the_type in ('string', 'number', 'boolean'): - # return result['value'] - else: return result['value'] @@ -1184,34 +1301,9 @@ def _convert_argument(arg: Any) -> dict: if arg == -inf: return {'unserializableValue': '-Infinity'} - # objectHandle = arg if isinstance(arg, JSHandle) else None - # if objectHandle: - # if objectHandle._context != self: - # raise ElementHandleError('JSHandles can be evaluated only in the context they were created!') - # if objectHandle._disposed: - # raise ElementHandleError('JSHandle is disposed!') - # if objectHandle._remoteObject.get('unserializableValue'): - # return {'unserializableValue': objectHandle._remoteObject.get('unserializableValue')} # noqa: E501 - # if not objectHandle._remoteObject.get('objectId'): - # return {'value': objectHandle._remoteObject.get('value')} - # return {'objectId': objectHandle._remoteObject.get('objectId')} - # return {'value': arg} - -def _offset_scroll(ele: ChromeElement, x: int, y: int): - location = ele.location - size = ele.size - lx = location['x'] + int(x) if x is not None else location['x'] + size['width'] // 2 - ly = location['y'] + int(y) if y is not None else location['y'] + size['height'] // 2 - - ele.page.scroll.to_location(lx - 5, ly - 5) - cl = ele.client_location - x = cl['x'] + int(x) if x is not None else cl['x'] + size['width'] // 2 - y = cl['y'] + int(y) if y is not None else cl['y'] + size['height'] // 2 - return x, y - - -def _send_enter(ele: ChromeElement): +def _send_enter(ele: ChromeElement) -> None: + """发送回车""" # todo:windows系统回车是否不一样 data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter', 'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False} @@ -1222,6 +1314,7 @@ def _send_enter(ele: ChromeElement): def _send_key(ele: ChromeElement, modifier: int, key: str) -> None: + """发送一个字,在键盘中的字符触发按键,其它直接发送文本""" if key not in _keyDefinitions: ele.page.run_cdp('Input.insertText', text=key) @@ -1244,6 +1337,27 @@ def _send_key(ele: ChromeElement, modifier: int, key: str) -> None: ele.page.run_cdp('Input.dispatchKeyEvent', **data) +def _offset_scroll(ele, offset_x: int, offset_y: int) -> tuple: + """接收元素及偏移坐标,滚动到偏移坐标,返回该点在视口中的坐标 + :param ele: 元素对象 + :param offset_x: 偏移量x + :param offset_y: 偏移量y + :return: 视口中的坐标 + """ + location = ele.location + midpoint = ele.midpoint + lx = location['x'] + offset_x if offset_x else midpoint['x'] + ly = location['y'] + offset_y if offset_y else midpoint['y'] + + if not _location_in_viewport(ele.page, lx, ly): + ele.page.scroll.to_location(lx, ly) + cl = ele.client_location + cm = ele.client_midpoint + cx = cl['x'] + offset_x if offset_x else cm['x'] + cy = cl['y'] + offset_y if offset_y else cm['y'] + return cx, cy + + class ChromeScroll(object): """用于滚动的对象""" diff --git a/DrissionPage/chrome_page.py b/DrissionPage/chrome_page.py index de99237..159b642 100644 --- a/DrissionPage/chrome_page.py +++ b/DrissionPage/chrome_page.py @@ -20,14 +20,25 @@ from .chrome_element import ChromeElement, ChromeScroll, _run_script, ChromeElem class ChromePage(BasePage): + """用于管理浏览器的类""" def __init__(self, Tab_or_Options: Union[Tab, DriverOptions] = None, - tab_handle: str = None, + tab_id: str = None, timeout: float = 10): + """初始化 \n + :param Tab_or_Options: Tab对象或DriverOptions对象 + :param tab_id: 要控制的标签页id,不指定默认为激活的 + :param timeout: 超时时间 + """ super().__init__(timeout) - self._connect_debugger(Tab_or_Options, tab_handle) + self._connect_browser(Tab_or_Options, tab_id) - def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None): + def _connect_browser(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_id: str = None) -> None: + """连接浏览器 \n + :param Tab_or_Options: Tab对象或DriverOptions对象 + :param tab_id: 要控制的标签页id,不指定默认为激活的 + :return: None + """ self.timeouts = Timeout(self) self._page_load_strategy = 'normal' if isinstance(Tab_or_Options, Tab): @@ -43,9 +54,11 @@ class ChromePage(BasePage): self._page_load_strategy = self.options.page_load_strategy self.process = connect_chrome(self.options)[1] self.address = self.options.debugger_address - tab_handle = self.tab_handles[0] if not tab_handle else tab_handle - self._driver = Tab(id=tab_handle, type='page', - webSocketDebuggerUrl=f'ws://{self.options.debugger_address}/devtools/page/{tab_handle}') + if not tab_id: + json = loads(requests_get(f'http://{self.address}/json').text) + tab_id = [i['id'] for i in json if i['type'] == 'page'][0] + self._driver = Tab(id=tab_id, type='page', + webSocketDebuggerUrl=f'ws://{self.options.debugger_address}/devtools/page/{tab_id}') else: raise TypeError('只能接收Tab或DriverOptions类型参数。') @@ -61,17 +74,18 @@ class ChromePage(BasePage): self.driver.Page.javascriptDialogClosed = self._on_alert_close def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromeElement'], - timeout: float = None) -> Union['ChromeElement', str, None]: - """在内部查找元素 \n - 例:ele = page('@id=ele_id') \n + timeout: float = None) -> Union['ChromeElement', None]: + """在内部查找元素 \n + 例:ele = page('@id=ele_id') \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 超时时间 - :return: DriverElement对象或属性、文本 + :return: ChromeElement对象 """ return self.ele(loc_or_str, timeout) @property def driver(self) -> Tab: + """返回用于控制浏览器的Tab对象""" return self._driver @property @@ -95,38 +109,40 @@ class ChromePage(BasePage): @property def tabs_count(self) -> int: """返回标签页数量""" - return len(self.tab_handles) + return len(self.tab_ids) @property - def tab_handles(self) -> list: + def tab_ids(self) -> list: """返回所有标签页id""" + self.driver json = loads(requests_get(f'http://{self.address}/json').text) return [i['id'] for i in json if i['type'] == 'page'] @property - def current_tab_handle(self) -> str: - """返回当前标签页handle""" + def current_tab_id(self) -> str: + """返回当前标签页id""" return self.driver.id @property def current_tab_index(self) -> int: """返回当前标签页序号""" - return self.tab_handles.index(self.current_tab_handle) + return self.tab_ids.index(self.current_tab_id) @property def ready_state(self) -> str: - """返回当前页面加载状态,""" + """返回当前页面加载状态,'loading' 'interactive' 'complete'""" return self.run_script('document.readyState;', as_expr=True) @property def size(self) -> dict: - """返回页面总长宽""" + """返回页面总长宽,{'height': int, 'width': int}""" w = self.run_script('document.body.scrollWidth;', as_expr=True) h = self.run_script('document.body.scrollHeight;', as_expr=True) return {'height': h, 'width': w} @property def active_ele(self) -> ChromeElement: + """返回当前焦点所在元素""" return self.run_script('return document.activeElement;') @property @@ -136,7 +152,7 @@ class ChromePage(BasePage): @property def process_id(self) -> Union[None, int]: - """获取浏览器进程id""" + """返回浏览器进程id""" try: return self.driver.SystemInfo.getProcessInfo()['id'] except Exception: @@ -144,25 +160,29 @@ class ChromePage(BasePage): @property def scroll(self) -> ChromeScroll: - """用于滚动滚动条的对象""" + """返回用于滚动滚动条的对象""" if not hasattr(self, '_scroll'): self._scroll = ChromeScroll(self) return self._scroll @property def set_window(self) -> 'WindowSizeSetter': + """返回用于设置窗口大小的对象""" if not hasattr(self, '_window_setter'): self._window_setter = WindowSizeSetter(self) return self._window_setter def set_page_load_strategy(self, value: str) -> None: - """设置页面加载策略,可选'normal', 'eager', 'none'""" + """设置页面加载策略 \n + :param value: 可选'normal', 'eager', 'none' + :return: None + """ if value not in ('normal', 'eager', 'none'): raise ValueError("只能选择'normal', 'eager', 'none'。") self._page_load_strategy = value def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: - """设置超时时间,单位为秒,selenium4以上版本有效 \n + """设置超时时间,单位为秒 \n :param implicit: 查找元素超时时间 :param page_load: 页面加载超时时间 :param script: 脚本运行超时时间 @@ -220,13 +240,21 @@ class ChromePage(BasePage): return self._url_available def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: + """获取cookies信息 \n + :param as_dict: 为True时返回由{name: value}键值对组成的dict + :return: cookies信息 + """ cookies = self.driver.Network.getCookies()['cookies'] if as_dict: return {cookie['name']: cookie['value'] for cookie in cookies} else: return cookies - def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]): + def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: + """设置cookies值 \n + :param cookies: cookies信息 + :return: None + """ cookies = _cookies_to_tuple(cookies) result_cookies = [] for cookie in cookies: @@ -240,12 +268,22 @@ class ChromePage(BasePage): def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], - timeout: float = None) -> Union[ChromeElement, str, None]: + timeout: float = None) -> Union[ChromeElement, None]: + """获取第一个符合条件的元素对象 \n + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :return: ChromeElement对象 + """ return self._ele(loc_or_ele, timeout=timeout) def eles(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], - timeout: float = None) -> List[Union[ChromeElement, str]]: + timeout: float = None) -> List[ChromeElement]: + """获取所有符合条件的元素对象 \n + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :return: ChromeElement对象组成的列表 + """ return self._ele(loc_or_ele, timeout=timeout, single=False) def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement] = None) -> Union[SessionElement, str, None]: @@ -268,7 +306,13 @@ class ChromePage(BasePage): def _ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], timeout: float = None, - single: bool = True) -> Union[ChromeElement, str, None, List[Union[ChromeElement, str]]]: + single: bool = True) -> Union[ChromeElement, None, List[ChromeElement]]: + """执行元素查找 + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :param single: 是否只返回第一个 + :return: ChromeElement对象或元素对象组成的列表 + """ if isinstance(loc_or_ele, (str, tuple)): loc = get_loc(loc_or_ele)[1] elif isinstance(loc_or_ele, ChromeElement): @@ -299,7 +343,7 @@ class ChromePage(BasePage): def wait_ele(self, loc_or_ele: Union[str, tuple, ChromeElement], timeout: float = None) -> ChromeElementWaiter: - """等待元素从dom删除、显示、隐藏 \n + """返回用于等待元素到达某个状态的等待器对象 \n :param loc_or_ele: 可以是元素、查询字符串、loc元组 :param timeout: 等待超时时间 :return: 用于等待的ElementWaiter对象 @@ -311,7 +355,7 @@ class ChromePage(BasePage): full_page: bool = False, left_top: Tuple[int, int] = None, right_bottom: Tuple[int, int] = None) -> Union[str, bytes]: - """对页面进行截图,可对整个网页、可见网页、指定范围截图。对可视范围外截图需要新版浏览器支持 \n + """对页面进行截图,可对整个网页、可见网页、指定范围截图。对可视范围外截图需要90以上版本浏览器支持 \n :param path: 完整路径,后缀可选'jpg','jpeg','png','webp' :param as_bytes: 是否已字节形式返回图片,可选'jpg','jpeg','png','webp',生效时path参数无效 :param full_page: 是否整页截图,为True截取整个网页,为False截取可视窗口 @@ -378,14 +422,14 @@ class ChromePage(BasePage): def forward(self, steps: int = 1) -> None: """在浏览历史中前进若干步 \n - :param steps: 次数 + :param steps: 前进步数 :return: None """ self.run_script(f'window.history.go({steps});', as_expr=True) def back(self, steps: int = 1) -> None: """在浏览历史中后退若干步 \n - :param steps: 次数 + :param steps: 后退步数 :return: None """ self.run_script(f'window.history.go({-steps});', as_expr=True) @@ -394,7 +438,7 @@ class ChromePage(BasePage): """页面停止加载""" self.driver.Page.stopLoading() - def run_cdp(self, cmd: str, **cmd_args): + def run_cdp(self, cmd: str, **cmd_args) -> dict: """执行Chrome DevTools Protocol语句 \n :param cmd: 协议项目 :param cmd_args: 参数 @@ -448,51 +492,50 @@ class ChromePage(BasePage): :param url: 新标签页跳转到的网址 :return: None """ + self.driver url = f'?{url}' if url else '' requests_get(f'http://{self.address}/json/new{url}') - def to_tab(self, num_or_handle: Union[int, str] = 0, activate: bool = True) -> None: + def to_tab(self, num_or_id: Union[int, str] = 0, activate: bool = True) -> None: """跳转到标签页 \n - 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致 \n - :param num_or_handle: 标签页序号或handle字符串,序号第一个为0,最后为-1 + 注意:当程序使用的是接管的浏览器,获取到的 id 顺序和视觉效果不一致 \n + :param num_or_id: 标签页序号或id字符串,序号第一个为0,最后为-1 :param activate: 切换后是否变为活动状态 :return: None """ try: - tab = int(num_or_handle) + tab = int(num_or_id) except (ValueError, TypeError): - tab = num_or_handle + tab = num_or_id - if not self.tab_handles: - return - - tab = self.tab_handles[tab] if isinstance(tab, int) else tab + tab = self.tab_ids[tab] if isinstance(tab, int) else tab self.driver.stop() - self._connect_debugger(tab) + self._connect_browser(tab) if activate: requests_get(f'http://{self.address}/json/activate/{tab}') def to_front(self) -> None: """激活当前标签页使其处于最前面""" - requests_get(f'http://{self.address}/json/activate/{self.current_tab_handle}') + self.driver + requests_get(f'http://{self.address}/json/activate/{self.current_tab_id}') - def close_tabs(self, num_or_handles: Union[int, str, list, tuple, set] = None, others: bool = False) -> None: + def close_tabs(self, num_or_ids: Union[int, str, list, tuple, set] = None, others: bool = False) -> None: """关闭传入的标签页,默认关闭当前页。可传入多个 \n - 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致,不能按序号关闭。 \n - :param num_or_handles:要关闭的标签页序号或handle,可传入handle和序号组成的列表或元组,为None时关闭当前页 + 注意:当程序使用的是接管的浏览器,获取到的 id 顺序和视觉效果不一致,不能按序号关闭。 \n + :param num_or_ids:要关闭的标签页序号或id,可传入id和序号组成的列表或元组,为None时关闭当前页 :param others: 是否关闭指定标签页之外的 :return: None """ if others: - all_tabs = self.tab_handles - reserve_tabs = {self.current_tab_handle} if num_or_handles is None else _get_tabs(all_tabs, num_or_handles) + all_tabs = self.tab_ids + reserve_tabs = {self.current_tab_id} if num_or_ids is None else _get_tabs(all_tabs, num_or_ids) tabs = set(all_tabs) - reserve_tabs else: - tabs = (self.current_tab_handle,) if num_or_handles is None else _get_tabs(self.tab_handles, num_or_handles) + tabs = (self.current_tab_id,) if num_or_ids is None else _get_tabs(self.tab_ids, num_or_ids) tabs_len = len(tabs) - all_len = len(self.tab_handles) + all_len = len(self.tab_ids) if tabs_len > all_len: raise ValueError('要关闭的页面数量不能大于总数量。') @@ -507,13 +550,13 @@ class ChromePage(BasePage): if is_alive: self.to_tab(0) - def close_other_tabs(self, num_or_handles: Union[int, str, list, tuple] = None) -> None: + def close_other_tabs(self, num_or_ids: Union[int, str, list, tuple] = None) -> None: """关闭传入的标签页以外标签页,默认保留当前页。可传入多个 \n - 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致,不能按序号关闭。 \n - :param num_or_handles: 要保留的标签页序号或handle,可传入handle和序号组成的列表或元组,为None时保存当前页 + 注意:当程序使用的是接管的浏览器,获取到的 id 顺序和视觉效果不一致,不能按序号关闭。 \n + :param num_or_ids: 要保留的标签页序号或id,可传入id和序号组成的列表或元组,为None时保存当前页 :return: None """ - self.close_tabs(num_or_handles, True) + self.close_tabs(num_or_ids, True) def clear_cache(self, session_storage: bool = True, @@ -537,10 +580,10 @@ class ChromePage(BasePage): self.driver.Network.clearBrowserCookies() def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: - """处理提示框 \n + """处理提示框,可以自动等待提示框出现 \n :param accept: True表示确认,False表示取消,其它值不会按按钮但依然返回文本值 :param send: 处理prompt提示框时可输入文本 - :param timeout: 等待提示框出现的超时时间 + :param timeout: 等待提示框出现的超时时间,为None则使用self.timeout属性的值 :return: 提示框内容文本,未等到提示框则返回None """ timeout = timeout or self.timeout @@ -643,7 +686,7 @@ class ChromePage(BasePage): class Alert(object): - """用于保存alert信息""" + """用于保存alert信息的类""" def __init__(self): self.activated = False @@ -655,7 +698,7 @@ class Alert(object): class Timeout(object): - """用于保存d模式timeout信息""" + """用于保存d模式timeout信息的类""" def __init__(self, page: ChromePage): self.page = page @@ -674,26 +717,20 @@ class WindowSizeSetter(object): self.driver = page.driver self.window_id = self._get_info()['windowId'] - def _get_info(self): - return self.driver.Browser.getWindowBounds() - - def _perform(self, bounds: dict): - self.driver.Browser.setWindowBounds(windowId=self.window_id, bounds=bounds) - def maximized(self) -> None: - """最大化""" + """窗口最大化""" self._perform({'windowState': 'maximized'}) def minimized(self) -> None: - """最小化""" + """窗口最小化""" self._perform({'windowState': 'minimized'}) def fullscreen(self) -> None: - """全屏""" + """设置窗口为全屏""" self._perform({'windowState': 'fullscreen'}) def normal(self) -> None: - """常规""" + """设置窗口为常规模式""" self._perform({'windowState': 'normal'}) def new_size(self, width: int = None, height: int = None) -> None: @@ -709,7 +746,7 @@ class WindowSizeSetter(object): self._perform({'width': width, 'height': height}) def to_location(self, x: int = None, y: int = None) -> None: - """设置在屏幕中的位置,相对左上角坐标 \n + """设置窗口在屏幕中的位置,相对左上角坐标 \n :param x: 距离顶部距离 :param y: 距离左边距离 :return: None @@ -721,22 +758,38 @@ class WindowSizeSetter(object): y = y or info['top'] self._perform({'left': x, 'top': y}) + def _get_info(self) -> dict: + """获取窗口位置及大小信息""" + return self.driver.Browser.getWindowBounds() -def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set: - """返回指定标签页handle组成的set \n - :param handles: handles列表 - :param num_or_handles: 指定的标签页,可以是多个 + def _perform(self, bounds: dict) -> None: + """执行改变窗口大小操作 + :param bounds: 控制数据 + :return: None + """ + self.driver.Browser.setWindowBounds(windowId=self.window_id, bounds=bounds) + + +def _get_tabs(ids: list, num_or_ids: Union[int, str, list, tuple, set]) -> set: + """返回指定标签页id组成的set + :param ids: 所有页面id组成的列表 + :param num_or_ids: 指定的标签页,可以是多个 :return: 指定标签页组成的set """ - if isinstance(num_or_handles, (int, str)): - num_or_handles = (num_or_handles,) - elif not isinstance(num_or_handles, (list, tuple, set)): - raise TypeError('num_or_handle参数只能是int、str、list、set 或 tuple类型。') + if isinstance(num_or_ids, (int, str)): + num_or_ids = (num_or_ids,) + elif not isinstance(num_or_ids, (list, tuple, set)): + raise TypeError('num_or_id参数只能是int、str、list、set 或 tuple类型。') - return set(i if isinstance(i, str) else handles[i] for i in num_or_handles) + return set(i if isinstance(i, str) else ids[i] for i in num_or_ids) def _show_or_hide_browser(page: ChromePage, hide: bool = True) -> None: + """执行显示或隐藏浏览器窗口 + :param page: ChromePage对象 + :param hide: 是否隐藏 + :return: None + """ if not page.address.startswith(('localhost', '127.0.0.1')): return @@ -759,7 +812,11 @@ def _show_or_hide_browser(page: ChromePage, hide: bool = True) -> None: def _get_browser_progress_id(progress, address: str) -> Union[str, None]: - """获取浏览器进程id""" + """获取浏览器进程id + :param progress: 已知的进程对象,没有时传入None + :param address: 浏览器管理地址,含端口 + :return: 进程id + """ if progress: return progress.pid @@ -778,7 +835,11 @@ def _get_browser_progress_id(progress, address: str) -> Union[str, None]: def _get_chrome_hwnds_from_pid(pid, title) -> list: - """通过PID查询句柄ID""" + """通过PID查询句柄ID + :param pid: 进程id + :param title: 窗口标题 + :return: 进程句柄组成的列表 + """ try: from win32gui import IsWindow, GetWindowText, EnumWindows from win32process import GetWindowThreadProcessId diff --git a/DrissionPage/common.py b/DrissionPage/common.py index 0a21db1..99e0d27 100644 --- a/DrissionPage/common.py +++ b/DrissionPage/common.py @@ -524,7 +524,6 @@ def connect_chrome(option: DriverOptions) -> tuple: system_type = system().lower() debugger_address = option.debugger_address chrome_path = option.chrome_path - args = option.arguments debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address ip, port = debugger_address.split(':') @@ -536,19 +535,7 @@ def connect_chrome(option: DriverOptions) -> tuple: else chrome_path return chrome_path, None - args = [] if args is None else args - args1 = [] - for arg in args: - if arg.startswith(('--user-data-dir', '--disk-cache-dir', '--user-agent')) and system().lower() == 'windows': - index = arg.find('=') + 1 - args1.append(f'{arg[:index]}"{arg[index:].strip()}"') - else: - args1.append(arg) - - args = set(args1) - - # if proxy: - # args.add(f'--proxy-server={proxy["http"]}') + args = _get_running_args(option) # ----------创建浏览器进程---------- try: @@ -569,7 +556,7 @@ def connect_chrome(option: DriverOptions) -> tuple: return chrome_path, debugger -def _run_browser(port, path: str, args: set) -> Popen: +def _run_browser(port, path: str, args) -> Popen: """创建chrome进程 \n :param port: 端口号 :param path: 浏览器地址 @@ -595,3 +582,52 @@ def _run_browser(port, path: str, args: set) -> Popen: pass raise ConnectionError('无法连接浏览器。') + + +def _get_running_args(opt: DriverOptions) -> list: + """从DriverOptions获取命令行启动参数""" + sys = system().lower() + result = [] + + # ----------处理arguments----------- + args = opt.arguments + for arg in args: + if arg.startswith(('--user-data-dir', '--disk-cache-dir', '--user-agent')) and sys == 'windows': + index = arg.find('=') + 1 + result.append(f'{arg[:index]}"{arg[index:].strip()}"') + else: + result.append(arg) + + # ----------处理extensions------------- + ext = opt.extensions + if ext: + ext = set(ext) + if sys == 'windows': + ext = '","'.join(ext) + ext = f'"{ext}"' + else: + ext = ','.join(ext) + ext = f'--load-extension={ext}' + result.append(ext) + + # ----------处理experimental_options------------- + + return result + + + +def _location_in_viewport(page, loc_x: int, loc_y: int) -> bool: + """判断给定的坐标是否在视口中 |n + :param page: ChromePage对象 + :param loc_x: 页面绝对坐标x + :param loc_y: 页面绝对坐标y + :return: + """ + js = f''' + function(){{var x = {loc_x};var y = {loc_y}; + const vWidth = window.innerWidth || document.documentElement.clientWidth + const vHeight = window.innerHeight || document.documentElement.clientHeight + if (x< document.documentElement.scrollLeft || y < document.documentElement.scrollTop + || x > vWidth || y > vHeight){{return false;}} + return true;}}''' + return page.run_script(js) diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index fdec08b..b408370 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -16,17 +16,19 @@ from .session_page import SessionPage class WebPage(SessionPage, ChromePage, BasePage): + """整合浏览器和request的页面类""" + def __init__(self, mode: str = 'd', timeout: float = 10, - tab_handle: str = None, + tab_id: str = None, driver_or_options: Union[Tab, DriverOptions, bool] = None, - session_or_options: Union[SessionOptions, SessionOptions, bool] = None) -> None: - """初始化函数 \n + session_or_options: Union[Session, SessionOptions, bool] = None) -> None: + """初始化函数 \n :param mode: 'd' 或 's',即driver模式和session模式 :param timeout: 超时时间,d模式时为寻找元素时间,s模式时为连接时间,默认10秒 - :param driver_or_options: Tab对象或浏览器设置,只使用s模式时应传入False - :param session_or_options: Session对象或requests设置,只使用d模式时应传入False + :param driver_or_options: Tab对象或DriverOptions对象,只使用s模式时应传入False + :param session_or_options: Session对象或SessionOptions对象,只使用d模式时应传入False """ self._mode = mode.lower() if self._mode not in ('s', 'd'): @@ -37,7 +39,7 @@ class WebPage(SessionPage, ChromePage, BasePage): self._driver = None self._set_session_options(session_or_options) self._set_driver_options(driver_or_options) - self._setting_handle = tab_handle + self._setting_tab_id = tab_id self._has_driver, self._has_session = (None, True) if self._mode == 's' else (True, None) self._response = None @@ -46,12 +48,12 @@ class WebPage(SessionPage, ChromePage, BasePage): def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromeElement, SessionElement], - timeout: float = None) -> Union[ChromeElement, SessionElement, str, None]: + timeout: float = None) -> Union[ChromeElement, SessionElement, None]: """在内部查找元素 \n 例:ele = page('@id=ele_id') \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 超时时间 - :return: 子元素对象或属性文本 + :return: 子元素对象 """ if self._mode == 's': return super().__call__(loc_or_str) @@ -119,7 +121,7 @@ class WebPage(SessionPage, ChromePage, BasePage): """ self.change_mode('d') if self._driver is None: - self._connect_debugger(self._driver_options, self._setting_handle) + self._connect_browser(self._driver_options, self._setting_tab_id) return self._driver @@ -395,7 +397,7 @@ class WebPage(SessionPage, ChromePage, BasePage): elif isinstance(Tab_or_Options, Tab): self._driver = Tab_or_Options - self._connect_debugger(Tab_or_Options.id) + self._connect_browser(Tab_or_Options.id) self._has_driver = True elif isinstance(Tab_or_Options, DriverOptions): diff --git a/setup.py b/setup.py index 70a2478..27e4693 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh: setup( name="DrissionPage", - version="2.7.3", + version="3.0.0", author="g1879", author_email="g1879@qq.com", description="A module that integrates selenium and requests session, encapsulates common page operations.",