From b4a775ce4b021d3a18dcd860bae74c928460da97 Mon Sep 17 00:00:00 2001 From: g1879 Date: Tue, 8 Nov 2022 18:08:51 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=BC=80=E5=8F=91=E6=96=B0?= =?UTF-8?q?=E7=89=88=EF=BC=8C=E6=9C=AA=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/__init__.py | 3 + DrissionPage/chrome_element.py | 367 +++++++++++++++++---------------- DrissionPage/chrome_page.py | 42 +++- DrissionPage/web_page.py | 7 +- 4 files changed, 231 insertions(+), 188 deletions(-) diff --git a/DrissionPage/__init__.py b/DrissionPage/__init__.py index 8adc44b..13c1905 100644 --- a/DrissionPage/__init__.py +++ b/DrissionPage/__init__.py @@ -1,5 +1,8 @@ # -*- coding:utf-8 -*- +from warnings import filterwarnings + +filterwarnings('ignore') from .mix_page import MixPage from .web_page import WebPage from .config import DriverOptions, SessionOptions diff --git a/DrissionPage/chrome_element.py b/DrissionPage/chrome_element.py index 7dd980e..262298a 100644 --- a/DrissionPage/chrome_element.py +++ b/DrissionPage/chrome_element.py @@ -63,9 +63,9 @@ class ChromeElement(DrissionElement): def inner_html(self) -> str: """返回元素innerHTML文本""" if self.tag in ('iframe', 'frame'): - return _run_script(self, 'this.contentDocument.documentElement;').html + return self.run_script('return this.contentDocument.documentElement;').html # return _run_script(self, 'this.contentDocument.body;').html - return self.run_script('this.innerHTML;') + return self.run_script('return this.innerHTML;') @property def attrs(self) -> dict: @@ -101,7 +101,7 @@ class ChromeElement(DrissionElement): @property def client_location(self) -> dict: """返回元素左上角坐标""" - js = 'this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();' + js = 'return this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();' xy = self.run_script(js) x, y = xy.split(' ') return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])} @@ -245,18 +245,18 @@ function getElementPagePosition(element){ """ return super().befores(filter_loc, timeout) - # def wait_ele(self, - # loc_or_ele: Union[str, tuple, 'ChromeElement'], - # timeout: float = None) -> 'ElementWaiter': - # """等待子元素从dom删除、显示、隐藏 \n - # :param loc_or_ele: 可以是元素、查询字符串、loc元组 - # :param timeout: 等待超时时间 - # :return: 等待是否成功 - # """ - # return ElementWaiter(self, loc_or_ele, timeout) + def wait_ele(self, + loc_or_ele: Union[str, tuple, 'ChromeElement'], + timeout: float = None) -> 'ChromeElementWaiter': + """等待子元素从dom删除、显示、隐藏 \n + :param loc_or_ele: 可以是元素、查询字符串、loc元组 + :param timeout: 等待超时时间 + :return: 等待是否成功 + """ + return ChromeElementWaiter(self, loc_or_ele, timeout) @property - def select(self): + def select(self) -> 'ChromeSelect': """返回专门处理下拉列表的Select类,非下拉列表元素返回False""" if self._select is None: if self.tag != 'select': @@ -267,8 +267,30 @@ function getElementPagePosition(element){ return self._select @property - def is_selected(self): - return self.run_script('this.selected;') + def is_selected(self) -> bool: + """返回元素是否被选择""" + return self.run_script('return this.selected;') + + @property + def is_displayed(self) -> bool: + """返回元素是否显示""" + return not (self.style('visibility') == 'hidden' + or self.run_script('return this.offsetParent === null;') + or self.style('display') == 'none') + + @property + def is_enabled(self) -> bool: + """返回元素是否可用""" + return not self.run_script('return this.disabled;') + + @property + def is_alive(self) -> bool: + """返回元素是否仍在DOM中""" + try: + self.tag + return True + except Exception: + return False @property def is_in_view(self) -> bool: @@ -419,7 +441,7 @@ return true;}""" """ if pseudo_ele: pseudo_ele = f', "{pseudo_ele}"' if pseudo_ele.startswith(':') else f', "::{pseudo_ele}"' - js = f'function(){{return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");}}' + js = f'return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");' return self.run_script(js) def get_screenshot(self, path: [str, Path] = None, @@ -430,12 +452,12 @@ return true;}""" :return: 图片完整路径或字节文本 """ if self.tag == 'img': # 等待图片加载完成 - js = ('arguments[0].complete && typeof arguments[0].naturalWidth != "undefined" ' - '&& arguments[0].naturalWidth > 0 && typeof arguments[0].naturalHeight != "undefined" ' - '&& arguments[0].naturalHeight > 0') - t1 = perf_counter() - while not self.run_script(js) and perf_counter() - t1 < self.page.timeout: - pass + js = ('return this.complete && typeof this.naturalWidth != "undefined" ' + '&& this.naturalWidth > 0 && typeof this.naturalHeight != "undefined" ' + '&& this.naturalHeight > 0') + end_time = perf_counter() + self.page.timeout + while not self.run_script(js) and perf_counter() < end_time: + sleep(.1) left, top = self.location.values() height, width = self.size.values() @@ -454,55 +476,16 @@ return true;}""" self.page.driver.Input.dispatchKeyEvent(type='char', modifiers=2, code='KeyA') - # def input(self, - # vals: Union[str, tuple], - # clear: bool = True, - # insure: bool = True, - # timeout: float = None) -> bool: - # """输入文本或组合键,也可用于输入文件路径到input元素(文件间用\n间隔) \n - # :param vals: 文本值或按键组合 - # :param clear: 输入前是否清空文本框 - # :param insure: 确保输入正确,解决文本框有时输入失效的问题,不能用于输入组合键 - # :param timeout: 尝试输入的超时时间,不指定则使用父页面的超时时间,只在insure为True时生效 - # :return: bool - # """ - # if not insure or self.tag != 'input' or self.prop('type') != 'text': # 普通输入 - # if not isinstance(vals, (str, tuple)): - # vals = str(vals) - # if clear: - # self.inner_ele.clear() - # - # self.inner_ele.send_keys(*vals) - # return True - # - # else: # 确保输入正确 - # if not isinstance(vals, str): - # vals = str(vals) - # enter = '\n' if vals.endswith('\n') else None - # full_txt = vals if clear else f'{self.attr("value")}{vals}' - # full_txt = full_txt.rstrip('\n') - # - # self.click(by_js=True) - # timeout = timeout if timeout is not None else self.page.timeout - # t1 = perf_counter() - # while self.is_valid() and self.attr('value') != full_txt and perf_counter() - t1 <= timeout: - # try: - # if clear: - # self.inner_ele.send_keys(u'\ue009', 'a', u'\ue017') # 有些ui下clear()不生效,用CTRL+a代替 - # self.inner_ele.send_keys(vals) - # - # except Exception: - # pass - # - # if not self.is_valid(): - # return False - # else: - # if self.attr('value') != full_txt: - # return False - # else: - # if enter: - # self.inner_ele.send_keys(enter) - # return True + def input(self) -> bool: + try: + self.page.driver.DOM.focus(nodeId=self._node_id) + except Exception: + self.click(by_js=True) + + self.page.driver.Input.dispatchKeyEvent(type='keyDown', keyIdentifier="\ue009", ) + self.page.driver.Input.dispatchKeyEvent(type='keyDown', code="KeyA", ) + # self.page.driver.Input.dispatchKeyEvent(type='keyDown', + # commands=["\ue009",'KeyA']) def click(self, by_js: bool = None, timeout: float = None) -> bool: """点击元素 \n @@ -516,15 +499,13 @@ return true;}""" r = self.page.driver.DOM.getNodeForLocation(x=lx + 1, y=ly + 1) if r.get('nodeId') != self._node_id: return False - self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=cx, y=cy, button='left', clickCount=1) - sleep(.1) - self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=cx, y=cy, button='left') + + self._click(cx, cy) return True if not by_js: self.page.scroll_to_see(self) if self.is_in_view: - timeout = timeout if timeout is not None else self.page.timeout xy = self.client_location location = self.location size = self.size @@ -533,23 +514,66 @@ return true;}""" loc_x = location['x'] + size['width'] // 2 loc_y = location['y'] + size['height'] // 2 - t1 = perf_counter() + timeout = timeout if timeout is not None else self.page.timeout + end_time = perf_counter() + timeout click = do_it(client_x, client_y, loc_x, loc_y) - while not click and perf_counter() - t1 <= timeout: + while not click and perf_counter() < end_time: click = do_it(client_x, client_y, location['x'], location['y']) if click: return True if by_js is not False: - js = 'function(){this.click();}' + js = 'this.click();' self.run_script(js) return True return False - def click_at(self): - pass + def click_at(self, + x: Union[int, str] = None, + y: Union[int, str] = None, + button: str = 'left') -> None: + """带偏移量点击本元素,相对于左上角坐标。不传入x或y值时点击元素中点 \n + :param x: 相对元素左上角坐标的x轴偏移量 + :param y: 相对元素左上角坐标的y轴偏移量 + :param button: 左键还是右键 + :return: None + """ + x, y = _offset_scroll(self, x, y) + self._click(x, y, button) + + def r_click(self) -> None: + """右键单击""" + self.page.scroll_to_see(self) + xy = self.client_location + size = self.size + cx = xy['x'] + size['width'] // 2 + cy = xy['y'] + size['height'] // 2 + self._click(cx, cy, 'right') + + def r_click_at(self, x: Union[int, str], y: Union[int, str]) -> None: + """带偏移量右键单击本元素,相对于左上角坐标。不传入x或y值时点击元素中点 \n + :param x: 相对元素左上角坐标的x轴偏移量 + :param y: 相对元素左上角坐标的y轴偏移量 + :return: None + """ + self.click_at(x, y, 'right') + + def _click(self, x: int, y: int, button: str = 'left') -> None: + """实施点击""" + self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=x, y=y, button=button, clickCount=1) + sleep(.1) + self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button=button) + + def hover(self, x: int = None, y: int = None) -> None: + """鼠标悬停,可接受偏移量,偏移量相对于元素左上角坐标。不传入x或y值时悬停在元素中点 \n + :param x: 相对元素左上角坐标的x轴偏移量 + :param y: 相对元素左上角坐标的y轴偏移量 + :return: None + """ + x, y = _offset_scroll(self, x, y) + self.page.driver.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y) def _get_obj_id(self, node_id) -> str: return self.page.driver.DOM.resolveNode(nodeId=node_id)['object']['objectId'] @@ -557,10 +581,6 @@ return true;}""" def _get_node_id(self, obj_id) -> str: return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId'] - @property - def is_valid(self): - return True - def _get_ele_path(self, mode) -> str: """返获取css路径或xpath路径""" if mode == 'xpath': @@ -658,9 +678,9 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float) else: raise SyntaxError(f'查询语句错误:\n{r}') - t1 = perf_counter() + end_time = perf_counter() + timeout while (r['result']['subtype'] == 'null' - or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout: + or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time: r = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, userGesture=True) @@ -691,9 +711,9 @@ def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float if 'exceptionDetails' in r: raise SyntaxError(f'查询语句错误:\n{r}') - t1 = perf_counter() + end_time = perf_counter() + timeout while (r['result']['subtype'] == 'null' - or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout: + or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time: r = ele.page.run_cdp('Runtime.callFunctionOn', functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, userGesture=True) @@ -771,7 +791,6 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N else: args = args or () if not is_js_func(script): - script = script if script.strip().startswith('return') else f'return {script}' script = f'function(){{{script}}}' res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, @@ -784,6 +803,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N exceptionDetails = res.get('exceptionDetails') if exceptionDetails: raise RuntimeError(f'Evaluation failed: {exceptionDetails}') + # print(res.get('result')) return _parse_js_result(page, res.get('result')) @@ -847,6 +867,19 @@ def _convert_argument(arg: Any) -> dict: # return {'value': arg} +def _offset_scroll(ele: ChromeElement, x: int, y: int): + location = ele.location + size = ele.size + lx = location['x'] + int(x) if x is not None else location['x'] + size['width'] // 2 + ly = location['y'] + int(y) if y is not None else location['y'] + size['height'] // 2 + + ele.page.scroll.to_location(lx - 5, ly - 5) + cl = ele.client_location + x = cl['x'] + int(x) if x is not None else cl['x'] + size['width'] // 2 + y = cl['y'] + int(y) if y is not None else cl['y'] + size['height'] // 2 + return x, y + + class ChromeScroll(object): """用于滚动的对象""" @@ -968,7 +1001,7 @@ class ChromeSelect(object): """返回第一个被选中的option元素 \n :return: ChromeElement对象或None """ - ele = self._ele.run_script('this.options[this.selectedIndex];') + ele = self._ele.run_script('return this.options[this.selectedIndex];') return ele @property @@ -1081,9 +1114,9 @@ class ChromeSelect(object): return True if isinstance(text_value_index, (str, int)): - t1 = perf_counter() ok = do_select() - while not ok and perf_counter() - t1 < timeout: + end_time = perf_counter() + timeout + while not ok and perf_counter() < end_time: sleep(.2) ok = do_select() return ok @@ -1121,85 +1154,71 @@ class ChromeSelect(object): return success -# class ElementWaiter(object): -# """等待元素在dom中某种状态,如删除、显示、隐藏""" -# -# def __init__(self, -# page_or_ele, -# loc_or_ele: Union[str, tuple, ChromeElement], -# timeout: float = None): -# """等待元素在dom中某种状态,如删除、显示、隐藏 \n -# :param page_or_ele: 页面或父元素 -# :param loc_or_ele: 要等待的元素,可以是已有元素、定位符 -# :param timeout: 超时时间,默认读取页面超时时间 -# """ -# if isinstance(page_or_ele, ChromeElement): -# page = page_or_ele.page -# self.driver = page_or_ele.inner_ele -# else: -# page = page_or_ele -# self.driver = page_or_ele.driver -# -# if isinstance(loc_or_ele, ChromeElement): -# self.target = loc_or_ele.inner_ele -# -# elif isinstance(loc_or_ele, str): -# self.target = str_to_loc(loc_or_ele) -# -# elif isinstance(loc_or_ele, tuple): -# self.target = loc_or_ele -# -# else: -# raise TypeError('loc_or_ele参数只能是str、tuple、DriverElement 或 WebElement类型。') -# -# self.timeout = timeout if timeout is not None else page.timeout -# -# def delete(self) -> bool: -# """等待元素从dom删除""" -# return self._wait_ele('del') -# -# def display(self) -> bool: -# """等待元素从dom显示""" -# return self._wait_ele('display') -# -# def hidden(self) -> bool: -# """等待元素从dom隐藏""" -# return self._wait_ele('hidden') -# -# def _wait_ele(self, mode: str) -> bool: -# """执行等待 -# :param mode: 等待模式 -# :return: 是否等待成功 -# """ -# if isinstance(self.target, WebElement): -# end_time = time() + self.timeout -# while time() < end_time: -# if mode == 'del': -# try: -# self.target.is_enabled() -# except Exception: -# return True -# -# elif mode == 'display' and self.target.is_displayed(): -# return True -# -# elif mode == 'hidden' and not self.target.is_displayed(): -# return True -# -# return False -# -# else: -# try: -# if mode == 'del': -# WebDriverWait(self.driver, self.timeout).until_not(ec.presence_of_element_located(self.target)) -# -# elif mode == 'display': -# WebDriverWait(self.driver, self.timeout).until(ec.visibility_of_element_located(self.target)) -# -# elif mode == 'hidden': -# WebDriverWait(self.driver, self.timeout).until_not(ec.visibility_of_element_located(self.target)) -# -# return True -# -# except Exception: -# return False + +class ChromeElementWaiter(object): + """等待元素在dom中某种状态,如删除、显示、隐藏""" + + def __init__(self, + page_or_ele, + loc_or_ele: Union[str, tuple, ChromeElement], + timeout: float = None): + """等待元素在dom中某种状态,如删除、显示、隐藏 \n + :param page_or_ele: 页面或父元素 + :param loc_or_ele: 要等待的元素,可以是已有元素、定位符 + :param timeout: 超时时间,默认读取页面超时时间 + """ + if not isinstance(loc_or_ele, (str, tuple, ChromeElement)): + raise TypeError('loc_or_ele只能接收定位符或元素对象。') + + self.driver = page_or_ele + self.loc_or_ele = loc_or_ele + if timeout is not None: + self.timeout = timeout + else: + self.timeout = page_or_ele.page.timeout if isinstance(page_or_ele, ChromeElement) else page_or_ele.timeout + + def delete(self) -> bool: + """等待元素从dom删除""" + if isinstance(self.loc_or_ele, ChromeElement): + end_time = perf_counter() + self.timeout + while perf_counter() < end_time: + if not self.loc_or_ele.is_alive: + return True + + ele = self.driver(self.loc_or_ele, timeout=.5) + if ele is None: + return True + + end_time = perf_counter() + self.timeout + while perf_counter() < end_time: + if not self.loc_or_ele.is_alive: + return True + + return False + + def display(self) -> bool: + """等待元素从dom显示""" + return self._wait_ele('display') + + def hidden(self) -> bool: + """等待元素从dom隐藏""" + return self._wait_ele('hidden') + + def _wait_ele(self, mode: str) -> bool: + """执行等待 + :param mode: 等待模式 + :return: 是否等待成功 + """ + target = self.driver(self.loc_or_ele) + if target is None: + return None + + end_time = perf_counter() + self.timeout + while perf_counter() < end_time: + if mode == 'display' and target.is_displayed: + return True + + elif mode == 'hidden' and not target.is_displayed: + return True + + return False diff --git a/DrissionPage/chrome_page.py b/DrissionPage/chrome_page.py index cd573d0..acd152b 100644 --- a/DrissionPage/chrome_page.py +++ b/DrissionPage/chrome_page.py @@ -14,7 +14,7 @@ from .config import DriverOptions, _cookies_to_tuple from .base import BasePage from .common import get_loc from .drission import connect_chrome -from .chrome_element import ChromeElement, ChromeScroll, _run_script +from .chrome_element import ChromeElement, ChromeScroll, _run_script, ChromeElementWaiter class ChromePage(BasePage): @@ -25,10 +25,10 @@ class ChromePage(BasePage): super().__init__(timeout) self._connect_debugger(Tab_or_Options, tab_handle) - def _ready(self): - self._alert = Alert() - self.driver.Page.javascriptDialogOpening = self._on_alert_open - self.driver.Page.javascriptDialogClosed = self._on_alert_close + # def _ready(self): + # self._alert = Alert() + # self.driver.Page.javascriptDialogOpening = self._on_alert_open + # self.driver.Page.javascriptDialogClosed = self._on_alert_close def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None): if isinstance(Tab_or_Options, Tab): @@ -50,6 +50,10 @@ class ChromePage(BasePage): root = self._driver.DOM.getDocument() self.root = ChromeElement(self, node_id=root['root']['nodeId']) + self._alert = Alert() + self.driver.Page.javascriptDialogOpening = self._on_alert_open + self.driver.Page.javascriptDialogClosed = self._on_alert_close + def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromeElement'], timeout: float = None) -> Union['ChromeElement', str, None]: """在内部查找元素 \n @@ -204,8 +208,8 @@ class ChromePage(BasePage): search_result = self.driver.DOM.performSearch(query=loc) count = search_result['resultCount'] - t1 = perf_counter() - while count == 0 and perf_counter() - t1 < timeout: + end_time = perf_counter() + timeout + while count == 0 and perf_counter() < end_time: search_result = self.driver.DOM.performSearch(query=loc) count = search_result['resultCount'] @@ -220,6 +224,16 @@ class ChromePage(BasePage): else: return [ChromeElement(self, node_id=i) for i in nodeIds['nodeIds']] + def wait_ele(self, + loc_or_ele: Union[str, tuple, ChromeElement], + timeout: float = None) -> ChromeElementWaiter: + """等待元素从dom删除、显示、隐藏 \n + :param loc_or_ele: 可以是元素、查询字符串、loc元组 + :param timeout: 等待超时时间 + :return: 用于等待的ElementWaiter对象 + """ + return ChromeElementWaiter(self, loc_or_ele, timeout) + def get_screenshot(self, path: [str, Path] = None, as_bytes: [bool, str] = None, full_page: bool = False, @@ -476,8 +490,8 @@ class ChromePage(BasePage): for _ in range(times + 1): try: result = self.driver.Page.navigate(url=to_url) - t1 = perf_counter() - while self.ready_state != 'complete' and perf_counter() - t1 < timeout: + end_time = perf_counter() + timeout + while self.ready_state != 'complete' and perf_counter() < end_time: sleep(.5) if self.ready_state != 'complete': raise TimeoutError @@ -508,12 +522,16 @@ class ChromePage(BasePage): self._alert.text = None self._alert.type = None self._alert.defaultPrompt = None + self._alert.response_accept = kwargs.get['result'] + self._alert.response_text = kwargs['userInput'] def _on_alert_open(self, **kwargs): self._alert.activated = True self._alert.text = kwargs['message'] self._alert.type = kwargs['message'] self._alert.defaultPrompt = kwargs.get('defaultPrompt', None) + self._alert.response_accept = None + self._alert.response_text = None def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: """处理提示框 \n @@ -523,8 +541,8 @@ class ChromePage(BasePage): :return: 提示框内容文本,未等到提示框则返回None """ timeout = timeout or self.timeout - t1 = perf_counter() - while not self._alert.activated and perf_counter() - t1 < timeout: + end_time = perf_counter() + timeout + while not self._alert.activated and perf_counter() < end_time: sleep(.1) if not self._alert.activated: return None @@ -543,6 +561,8 @@ class Alert(object): self.text = None self.type = None self.defaultPrompt = None + self.response_accept = None + self.response_text = None def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set: diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 170df11..729e667 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -42,7 +42,8 @@ class WebPage(SessionPage, ChromePage, BasePage): self._response = None if self._mode == 'd': - self._ready() + self.driver + # self._ready() # if self._mode == 'd': # try: @@ -199,8 +200,8 @@ class WebPage(SessionPage, ChromePage, BasePage): # s模式转d模式 if self._mode == 'd': - if not self._has_driver: - self._ready() + # if not self._has_driver: + # self._ready() self._has_driver = True self._url = None if not self._has_driver else super(SessionPage, self).url