继续开发新版,未完成

This commit is contained in:
g1879 2022-11-08 18:08:51 +08:00
parent 8c02db63f2
commit b4a775ce4b
4 changed files with 231 additions and 188 deletions

View File

@ -1,5 +1,8 @@
# -*- coding:utf-8 -*-
from warnings import filterwarnings
filterwarnings('ignore')
from .mix_page import MixPage
from .web_page import WebPage
from .config import DriverOptions, SessionOptions

View File

@ -63,9 +63,9 @@ class ChromeElement(DrissionElement):
def inner_html(self) -> str:
"""返回元素innerHTML文本"""
if self.tag in ('iframe', 'frame'):
return _run_script(self, 'this.contentDocument.documentElement;').html
return self.run_script('return this.contentDocument.documentElement;').html
# return _run_script(self, 'this.contentDocument.body;').html
return self.run_script('this.innerHTML;')
return self.run_script('return this.innerHTML;')
@property
def attrs(self) -> dict:
@ -101,7 +101,7 @@ class ChromeElement(DrissionElement):
@property
def client_location(self) -> dict:
"""返回元素左上角坐标"""
js = 'this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();'
js = 'return this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();'
xy = self.run_script(js)
x, y = xy.split(' ')
return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])}
@ -245,18 +245,18 @@ function getElementPagePosition(element){
"""
return super().befores(filter_loc, timeout)
# def wait_ele(self,
# loc_or_ele: Union[str, tuple, 'ChromeElement'],
# timeout: float = None) -> 'ElementWaiter':
# """等待子元素从dom删除、显示、隐藏 \n
# :param loc_or_ele: 可以是元素、查询字符串、loc元组
# :param timeout: 等待超时时间
# :return: 等待是否成功
# """
# return ElementWaiter(self, loc_or_ele, timeout)
def wait_ele(self,
loc_or_ele: Union[str, tuple, 'ChromeElement'],
timeout: float = None) -> 'ChromeElementWaiter':
"""等待子元素从dom删除、显示、隐藏 \n
:param loc_or_ele: 可以是元素查询字符串loc元组
:param timeout: 等待超时时间
:return: 等待是否成功
"""
return ChromeElementWaiter(self, loc_or_ele, timeout)
@property
def select(self):
def select(self) -> 'ChromeSelect':
"""返回专门处理下拉列表的Select类非下拉列表元素返回False"""
if self._select is None:
if self.tag != 'select':
@ -267,8 +267,30 @@ function getElementPagePosition(element){
return self._select
@property
def is_selected(self):
return self.run_script('this.selected;')
def is_selected(self) -> bool:
"""返回元素是否被选择"""
return self.run_script('return this.selected;')
@property
def is_displayed(self) -> bool:
"""返回元素是否显示"""
return not (self.style('visibility') == 'hidden'
or self.run_script('return this.offsetParent === null;')
or self.style('display') == 'none')
@property
def is_enabled(self) -> bool:
"""返回元素是否可用"""
return not self.run_script('return this.disabled;')
@property
def is_alive(self) -> bool:
"""返回元素是否仍在DOM中"""
try:
self.tag
return True
except Exception:
return False
@property
def is_in_view(self) -> bool:
@ -419,7 +441,7 @@ return true;}"""
"""
if pseudo_ele:
pseudo_ele = f', "{pseudo_ele}"' if pseudo_ele.startswith(':') else f', "::{pseudo_ele}"'
js = f'function(){{return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");}}'
js = f'return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");'
return self.run_script(js)
def get_screenshot(self, path: [str, Path] = None,
@ -430,12 +452,12 @@ return true;}"""
:return: 图片完整路径或字节文本
"""
if self.tag == 'img': # 等待图片加载完成
js = ('arguments[0].complete && typeof arguments[0].naturalWidth != "undefined" '
'&& arguments[0].naturalWidth > 0 && typeof arguments[0].naturalHeight != "undefined" '
'&& arguments[0].naturalHeight > 0')
t1 = perf_counter()
while not self.run_script(js) and perf_counter() - t1 < self.page.timeout:
pass
js = ('return this.complete && typeof this.naturalWidth != "undefined" '
'&& this.naturalWidth > 0 && typeof this.naturalHeight != "undefined" '
'&& this.naturalHeight > 0')
end_time = perf_counter() + self.page.timeout
while not self.run_script(js) and perf_counter() < end_time:
sleep(.1)
left, top = self.location.values()
height, width = self.size.values()
@ -454,55 +476,16 @@ return true;}"""
self.page.driver.Input.dispatchKeyEvent(type='char',
modifiers=2, code='KeyA')
# def input(self,
# vals: Union[str, tuple],
# clear: bool = True,
# insure: bool = True,
# timeout: float = None) -> bool:
# """输入文本或组合键也可用于输入文件路径到input元素文件间用\n间隔 \n
# :param vals: 文本值或按键组合
# :param clear: 输入前是否清空文本框
# :param insure: 确保输入正确,解决文本框有时输入失效的问题,不能用于输入组合键
# :param timeout: 尝试输入的超时时间不指定则使用父页面的超时时间只在insure为True时生效
# :return: bool
# """
# if not insure or self.tag != 'input' or self.prop('type') != 'text': # 普通输入
# if not isinstance(vals, (str, tuple)):
# vals = str(vals)
# if clear:
# self.inner_ele.clear()
#
# self.inner_ele.send_keys(*vals)
# return True
#
# else: # 确保输入正确
# if not isinstance(vals, str):
# vals = str(vals)
# enter = '\n' if vals.endswith('\n') else None
# full_txt = vals if clear else f'{self.attr("value")}{vals}'
# full_txt = full_txt.rstrip('\n')
#
# self.click(by_js=True)
# timeout = timeout if timeout is not None else self.page.timeout
# t1 = perf_counter()
# while self.is_valid() and self.attr('value') != full_txt and perf_counter() - t1 <= timeout:
# try:
# if clear:
# self.inner_ele.send_keys(u'\ue009', 'a', u'\ue017') # 有些ui下clear()不生效用CTRL+a代替
# self.inner_ele.send_keys(vals)
#
# except Exception:
# pass
#
# if not self.is_valid():
# return False
# else:
# if self.attr('value') != full_txt:
# return False
# else:
# if enter:
# self.inner_ele.send_keys(enter)
# return True
def input(self) -> bool:
try:
self.page.driver.DOM.focus(nodeId=self._node_id)
except Exception:
self.click(by_js=True)
self.page.driver.Input.dispatchKeyEvent(type='keyDown', keyIdentifier="\ue009", )
self.page.driver.Input.dispatchKeyEvent(type='keyDown', code="KeyA", )
# self.page.driver.Input.dispatchKeyEvent(type='keyDown',
# commands=["\ue009",'KeyA'])
def click(self, by_js: bool = None, timeout: float = None) -> bool:
"""点击元素 \n
@ -516,15 +499,13 @@ return true;}"""
r = self.page.driver.DOM.getNodeForLocation(x=lx + 1, y=ly + 1)
if r.get('nodeId') != self._node_id:
return False
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=cx, y=cy, button='left', clickCount=1)
sleep(.1)
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=cx, y=cy, button='left')
self._click(cx, cy)
return True
if not by_js:
self.page.scroll_to_see(self)
if self.is_in_view:
timeout = timeout if timeout is not None else self.page.timeout
xy = self.client_location
location = self.location
size = self.size
@ -533,23 +514,66 @@ return true;}"""
loc_x = location['x'] + size['width'] // 2
loc_y = location['y'] + size['height'] // 2
t1 = perf_counter()
timeout = timeout if timeout is not None else self.page.timeout
end_time = perf_counter() + timeout
click = do_it(client_x, client_y, loc_x, loc_y)
while not click and perf_counter() - t1 <= timeout:
while not click and perf_counter() < end_time:
click = do_it(client_x, client_y, location['x'], location['y'])
if click:
return True
if by_js is not False:
js = 'function(){this.click();}'
js = 'this.click();'
self.run_script(js)
return True
return False
def click_at(self):
pass
def click_at(self,
x: Union[int, str] = None,
y: Union[int, str] = None,
button: str = 'left') -> None:
"""带偏移量点击本元素相对于左上角坐标。不传入x或y值时点击元素中点 \n
:param x: 相对元素左上角坐标的x轴偏移量
:param y: 相对元素左上角坐标的y轴偏移量
:param button: 左键还是右键
:return: None
"""
x, y = _offset_scroll(self, x, y)
self._click(x, y, button)
def r_click(self) -> None:
"""右键单击"""
self.page.scroll_to_see(self)
xy = self.client_location
size = self.size
cx = xy['x'] + size['width'] // 2
cy = xy['y'] + size['height'] // 2
self._click(cx, cy, 'right')
def r_click_at(self, x: Union[int, str], y: Union[int, str]) -> None:
"""带偏移量右键单击本元素相对于左上角坐标。不传入x或y值时点击元素中点 \n
:param x: 相对元素左上角坐标的x轴偏移量
:param y: 相对元素左上角坐标的y轴偏移量
:return: None
"""
self.click_at(x, y, 'right')
def _click(self, x: int, y: int, button: str = 'left') -> None:
"""实施点击"""
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=x, y=y, button=button, clickCount=1)
sleep(.1)
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button=button)
def hover(self, x: int = None, y: int = None) -> None:
"""鼠标悬停可接受偏移量偏移量相对于元素左上角坐标。不传入x或y值时悬停在元素中点 \n
:param x: 相对元素左上角坐标的x轴偏移量
:param y: 相对元素左上角坐标的y轴偏移量
:return: None
"""
x, y = _offset_scroll(self, x, y)
self.page.driver.Input.dispatchMouseEvent(type='mouseMoved', x=x, y=y)
def _get_obj_id(self, node_id) -> str:
return self.page.driver.DOM.resolveNode(nodeId=node_id)['object']['objectId']
@ -557,10 +581,6 @@ return true;}"""
def _get_node_id(self, obj_id) -> str:
return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId']
@property
def is_valid(self):
return True
def _get_ele_path(self, mode) -> str:
"""返获取css路径或xpath路径"""
if mode == 'xpath':
@ -658,9 +678,9 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
else:
raise SyntaxError(f'查询语句错误:\n{r}')
t1 = perf_counter()
end_time = perf_counter() + timeout
while (r['result']['subtype'] == 'null'
or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout:
or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time:
r = ele.page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
@ -691,9 +711,9 @@ def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float
if 'exceptionDetails' in r:
raise SyntaxError(f'查询语句错误:\n{r}')
t1 = perf_counter()
end_time = perf_counter() + timeout
while (r['result']['subtype'] == 'null'
or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout:
or r['result']['description'] == 'NodeList(0)') and perf_counter() < end_time:
r = ele.page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
@ -771,7 +791,6 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N
else:
args = args or ()
if not is_js_func(script):
script = script if script.strip().startswith('return') else f'return {script}'
script = f'function(){{{script}}}'
res = page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=script,
@ -784,6 +803,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N
exceptionDetails = res.get('exceptionDetails')
if exceptionDetails:
raise RuntimeError(f'Evaluation failed: {exceptionDetails}')
# print(res.get('result'))
return _parse_js_result(page, res.get('result'))
@ -847,6 +867,19 @@ def _convert_argument(arg: Any) -> dict:
# return {'value': arg}
def _offset_scroll(ele: ChromeElement, x: int, y: int):
location = ele.location
size = ele.size
lx = location['x'] + int(x) if x is not None else location['x'] + size['width'] // 2
ly = location['y'] + int(y) if y is not None else location['y'] + size['height'] // 2
ele.page.scroll.to_location(lx - 5, ly - 5)
cl = ele.client_location
x = cl['x'] + int(x) if x is not None else cl['x'] + size['width'] // 2
y = cl['y'] + int(y) if y is not None else cl['y'] + size['height'] // 2
return x, y
class ChromeScroll(object):
"""用于滚动的对象"""
@ -968,7 +1001,7 @@ class ChromeSelect(object):
"""返回第一个被选中的option元素 \n
:return: ChromeElement对象或None
"""
ele = self._ele.run_script('this.options[this.selectedIndex];')
ele = self._ele.run_script('return this.options[this.selectedIndex];')
return ele
@property
@ -1081,9 +1114,9 @@ class ChromeSelect(object):
return True
if isinstance(text_value_index, (str, int)):
t1 = perf_counter()
ok = do_select()
while not ok and perf_counter() - t1 < timeout:
end_time = perf_counter() + timeout
while not ok and perf_counter() < end_time:
sleep(.2)
ok = do_select()
return ok
@ -1121,85 +1154,71 @@ class ChromeSelect(object):
return success
# class ElementWaiter(object):
# """等待元素在dom中某种状态如删除、显示、隐藏"""
#
# def __init__(self,
# page_or_ele,
# loc_or_ele: Union[str, tuple, ChromeElement],
# timeout: float = None):
# """等待元素在dom中某种状态如删除、显示、隐藏 \n
# :param page_or_ele: 页面或父元素
# :param loc_or_ele: 要等待的元素,可以是已有元素、定位符
# :param timeout: 超时时间,默认读取页面超时时间
# """
# if isinstance(page_or_ele, ChromeElement):
# page = page_or_ele.page
# self.driver = page_or_ele.inner_ele
# else:
# page = page_or_ele
# self.driver = page_or_ele.driver
#
# if isinstance(loc_or_ele, ChromeElement):
# self.target = loc_or_ele.inner_ele
#
# elif isinstance(loc_or_ele, str):
# self.target = str_to_loc(loc_or_ele)
#
# elif isinstance(loc_or_ele, tuple):
# self.target = loc_or_ele
#
# else:
# raise TypeError('loc_or_ele参数只能是str、tuple、DriverElement 或 WebElement类型。')
#
# self.timeout = timeout if timeout is not None else page.timeout
#
# def delete(self) -> bool:
# """等待元素从dom删除"""
# return self._wait_ele('del')
#
# def display(self) -> bool:
# """等待元素从dom显示"""
# return self._wait_ele('display')
#
# def hidden(self) -> bool:
# """等待元素从dom隐藏"""
# return self._wait_ele('hidden')
#
# def _wait_ele(self, mode: str) -> bool:
# """执行等待
# :param mode: 等待模式
# :return: 是否等待成功
# """
# if isinstance(self.target, WebElement):
# end_time = time() + self.timeout
# while time() < end_time:
# if mode == 'del':
# try:
# self.target.is_enabled()
# except Exception:
# return True
#
# elif mode == 'display' and self.target.is_displayed():
# return True
#
# elif mode == 'hidden' and not self.target.is_displayed():
# return True
#
# return False
#
# else:
# try:
# if mode == 'del':
# WebDriverWait(self.driver, self.timeout).until_not(ec.presence_of_element_located(self.target))
#
# elif mode == 'display':
# WebDriverWait(self.driver, self.timeout).until(ec.visibility_of_element_located(self.target))
#
# elif mode == 'hidden':
# WebDriverWait(self.driver, self.timeout).until_not(ec.visibility_of_element_located(self.target))
#
# return True
#
# except Exception:
# return False
class ChromeElementWaiter(object):
"""等待元素在dom中某种状态如删除、显示、隐藏"""
def __init__(self,
page_or_ele,
loc_or_ele: Union[str, tuple, ChromeElement],
timeout: float = None):
"""等待元素在dom中某种状态如删除、显示、隐藏 \n
:param page_or_ele: 页面或父元素
:param loc_or_ele: 要等待的元素可以是已有元素定位符
:param timeout: 超时时间默认读取页面超时时间
"""
if not isinstance(loc_or_ele, (str, tuple, ChromeElement)):
raise TypeError('loc_or_ele只能接收定位符或元素对象。')
self.driver = page_or_ele
self.loc_or_ele = loc_or_ele
if timeout is not None:
self.timeout = timeout
else:
self.timeout = page_or_ele.page.timeout if isinstance(page_or_ele, ChromeElement) else page_or_ele.timeout
def delete(self) -> bool:
"""等待元素从dom删除"""
if isinstance(self.loc_or_ele, ChromeElement):
end_time = perf_counter() + self.timeout
while perf_counter() < end_time:
if not self.loc_or_ele.is_alive:
return True
ele = self.driver(self.loc_or_ele, timeout=.5)
if ele is None:
return True
end_time = perf_counter() + self.timeout
while perf_counter() < end_time:
if not self.loc_or_ele.is_alive:
return True
return False
def display(self) -> bool:
"""等待元素从dom显示"""
return self._wait_ele('display')
def hidden(self) -> bool:
"""等待元素从dom隐藏"""
return self._wait_ele('hidden')
def _wait_ele(self, mode: str) -> bool:
"""执行等待
:param mode: 等待模式
:return: 是否等待成功
"""
target = self.driver(self.loc_or_ele)
if target is None:
return None
end_time = perf_counter() + self.timeout
while perf_counter() < end_time:
if mode == 'display' and target.is_displayed:
return True
elif mode == 'hidden' and not target.is_displayed:
return True
return False

View File

@ -14,7 +14,7 @@ from .config import DriverOptions, _cookies_to_tuple
from .base import BasePage
from .common import get_loc
from .drission import connect_chrome
from .chrome_element import ChromeElement, ChromeScroll, _run_script
from .chrome_element import ChromeElement, ChromeScroll, _run_script, ChromeElementWaiter
class ChromePage(BasePage):
@ -25,10 +25,10 @@ class ChromePage(BasePage):
super().__init__(timeout)
self._connect_debugger(Tab_or_Options, tab_handle)
def _ready(self):
self._alert = Alert()
self.driver.Page.javascriptDialogOpening = self._on_alert_open
self.driver.Page.javascriptDialogClosed = self._on_alert_close
# def _ready(self):
# self._alert = Alert()
# self.driver.Page.javascriptDialogOpening = self._on_alert_open
# self.driver.Page.javascriptDialogClosed = self._on_alert_close
def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None):
if isinstance(Tab_or_Options, Tab):
@ -50,6 +50,10 @@ class ChromePage(BasePage):
root = self._driver.DOM.getDocument()
self.root = ChromeElement(self, node_id=root['root']['nodeId'])
self._alert = Alert()
self.driver.Page.javascriptDialogOpening = self._on_alert_open
self.driver.Page.javascriptDialogClosed = self._on_alert_close
def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromeElement'],
timeout: float = None) -> Union['ChromeElement', str, None]:
"""在内部查找元素 \n
@ -204,8 +208,8 @@ class ChromePage(BasePage):
search_result = self.driver.DOM.performSearch(query=loc)
count = search_result['resultCount']
t1 = perf_counter()
while count == 0 and perf_counter() - t1 < timeout:
end_time = perf_counter() + timeout
while count == 0 and perf_counter() < end_time:
search_result = self.driver.DOM.performSearch(query=loc)
count = search_result['resultCount']
@ -220,6 +224,16 @@ class ChromePage(BasePage):
else:
return [ChromeElement(self, node_id=i) for i in nodeIds['nodeIds']]
def wait_ele(self,
loc_or_ele: Union[str, tuple, ChromeElement],
timeout: float = None) -> ChromeElementWaiter:
"""等待元素从dom删除、显示、隐藏 \n
:param loc_or_ele: 可以是元素查询字符串loc元组
:param timeout: 等待超时时间
:return: 用于等待的ElementWaiter对象
"""
return ChromeElementWaiter(self, loc_or_ele, timeout)
def get_screenshot(self, path: [str, Path] = None,
as_bytes: [bool, str] = None,
full_page: bool = False,
@ -476,8 +490,8 @@ class ChromePage(BasePage):
for _ in range(times + 1):
try:
result = self.driver.Page.navigate(url=to_url)
t1 = perf_counter()
while self.ready_state != 'complete' and perf_counter() - t1 < timeout:
end_time = perf_counter() + timeout
while self.ready_state != 'complete' and perf_counter() < end_time:
sleep(.5)
if self.ready_state != 'complete':
raise TimeoutError
@ -508,12 +522,16 @@ class ChromePage(BasePage):
self._alert.text = None
self._alert.type = None
self._alert.defaultPrompt = None
self._alert.response_accept = kwargs.get['result']
self._alert.response_text = kwargs['userInput']
def _on_alert_open(self, **kwargs):
self._alert.activated = True
self._alert.text = kwargs['message']
self._alert.type = kwargs['message']
self._alert.defaultPrompt = kwargs.get('defaultPrompt', None)
self._alert.response_accept = None
self._alert.response_text = None
def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]:
"""处理提示框 \n
@ -523,8 +541,8 @@ class ChromePage(BasePage):
:return: 提示框内容文本未等到提示框则返回None
"""
timeout = timeout or self.timeout
t1 = perf_counter()
while not self._alert.activated and perf_counter() - t1 < timeout:
end_time = perf_counter() + timeout
while not self._alert.activated and perf_counter() < end_time:
sleep(.1)
if not self._alert.activated:
return None
@ -543,6 +561,8 @@ class Alert(object):
self.text = None
self.type = None
self.defaultPrompt = None
self.response_accept = None
self.response_text = None
def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set:

View File

@ -42,7 +42,8 @@ class WebPage(SessionPage, ChromePage, BasePage):
self._response = None
if self._mode == 'd':
self._ready()
self.driver
# self._ready()
# if self._mode == 'd':
# try:
@ -199,8 +200,8 @@ class WebPage(SessionPage, ChromePage, BasePage):
# s模式转d模式
if self._mode == 'd':
if not self._has_driver:
self._ready()
# if not self._has_driver:
# self._ready()
self._has_driver = True
self._url = None if not self._has_driver else super(SessionPage, self).url