mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
继续开发新版,未完成
This commit is contained in:
parent
2ec02f965f
commit
4b43798a9e
@ -5,12 +5,12 @@
|
||||
@File : chrome_element.py
|
||||
"""
|
||||
from pathlib import Path
|
||||
from typing import Union, Tuple, List
|
||||
from typing import Union, Tuple, List, Any
|
||||
from time import perf_counter, sleep
|
||||
|
||||
from .session_element import make_session_ele
|
||||
from .base import DrissionElement
|
||||
from .common import make_absolute_link, get_loc, get_ele_txt, format_html
|
||||
from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func
|
||||
|
||||
|
||||
class ChromeElement(DrissionElement):
|
||||
@ -52,8 +52,7 @@ class ChromeElement(DrissionElement):
|
||||
@property
|
||||
def inner_html(self) -> str:
|
||||
"""返回元素innerHTML文本"""
|
||||
return self.page.driver.Runtime.callFunctionOn(functionDeclaration='function(){return this.innerHTML;}',
|
||||
objectId=self._obj_id)['result']['value']
|
||||
return self.run_script('this.innerHTML;')['result']['value']
|
||||
|
||||
@property
|
||||
def attrs(self) -> dict:
|
||||
@ -70,9 +69,8 @@ class ChromeElement(DrissionElement):
|
||||
@property
|
||||
def client_location(self) -> dict:
|
||||
"""返回元素左上角坐标"""
|
||||
js = '''function(){
|
||||
return this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();}'''
|
||||
xy = self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)['result']['value']
|
||||
js = 'this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();'
|
||||
xy = self.run_script(js)['result']['value']
|
||||
x, y = xy.split(' ')
|
||||
return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])}
|
||||
|
||||
@ -96,7 +94,7 @@ function getElementPagePosition(element){
|
||||
return actualLeft.toString() +' '+actualTop.toString();
|
||||
}
|
||||
return getElementPagePosition(this);}'''
|
||||
xy = self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)['result']['value']
|
||||
xy = self.run_script(js)['result']['value']
|
||||
x, y = xy.split(' ')
|
||||
return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])}
|
||||
|
||||
@ -118,12 +116,16 @@ const vWidth = window.innerWidth || document.documentElement.clientWidth;
|
||||
const vHeight = window.innerHeight || document.documentElement.clientHeight;
|
||||
if (x< 0 || y < 0 || x > vWidth || y > vHeight){return false;}
|
||||
return true;}"""
|
||||
return self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self.obj_id)['result']['value']
|
||||
return self.run_script(js)['result']['value']
|
||||
|
||||
def run_script(self, arg: 'ChromeElement'):
|
||||
js = 'function(){alert(arguments[0].value);}'
|
||||
return self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self.obj_id,
|
||||
arguments=[{'objectId': arg.obj_id}])
|
||||
def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param script: js文本
|
||||
:param as_expr: 是否作为表达式运行,为True时args无效
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return: 运行的结果
|
||||
"""
|
||||
return run_script(self, script, as_expr, *args)
|
||||
|
||||
def ele(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
@ -211,8 +213,7 @@ return true;}"""
|
||||
:return: 是否设置成功
|
||||
"""
|
||||
value = value.replace("'", "\\'")
|
||||
r = self.page.driver.Runtime.callFunctionOn(functionDeclaration=f"function(){{this.{prop}='{value}';}}",
|
||||
objectId=self._obj_id)
|
||||
r = self.run_script(f'this.{prop}="{value}";')
|
||||
if 'exceptionDetails' in r:
|
||||
raise SyntaxError(r['result']['description'])
|
||||
|
||||
@ -225,7 +226,7 @@ return true;}"""
|
||||
if pseudo_ele:
|
||||
pseudo_ele = f', "{pseudo_ele}"' if pseudo_ele.startswith(':') else f', "::{pseudo_ele}"'
|
||||
js = f'function(){{return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");}}'
|
||||
return self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)
|
||||
return self.run_script(js)
|
||||
|
||||
def get_screenshot(self, path: [str, Path] = None,
|
||||
as_bytes: [bool, str] = None) -> Union[str, bytes]:
|
||||
@ -241,7 +242,57 @@ return true;}"""
|
||||
return self.page.get_screenshot(path, as_bytes=as_bytes, full_page=False,
|
||||
left_top=left_top, right_bottom=right_bottom)
|
||||
|
||||
def click(self, by_js: bool = None, timeout: float = None) -> None:
|
||||
# def input(self,
|
||||
# vals: Union[str, tuple],
|
||||
# clear: bool = True,
|
||||
# insure: bool = True,
|
||||
# timeout: float = None) -> bool:
|
||||
# """输入文本或组合键,也可用于输入文件路径到input元素(文件间用\n间隔) \n
|
||||
# :param vals: 文本值或按键组合
|
||||
# :param clear: 输入前是否清空文本框
|
||||
# :param insure: 确保输入正确,解决文本框有时输入失效的问题,不能用于输入组合键
|
||||
# :param timeout: 尝试输入的超时时间,不指定则使用父页面的超时时间,只在insure为True时生效
|
||||
# :return: bool
|
||||
# """
|
||||
# if not insure or self.tag != 'input' or self.prop('type') != 'text': # 普通输入
|
||||
# if not isinstance(vals, (str, tuple)):
|
||||
# vals = str(vals)
|
||||
# if clear:
|
||||
# self.inner_ele.clear()
|
||||
#
|
||||
# self.inner_ele.send_keys(*vals)
|
||||
# return True
|
||||
#
|
||||
# else: # 确保输入正确
|
||||
# if not isinstance(vals, str):
|
||||
# vals = str(vals)
|
||||
# enter = '\n' if vals.endswith('\n') else None
|
||||
# full_txt = vals if clear else f'{self.attr("value")}{vals}'
|
||||
# full_txt = full_txt.rstrip('\n')
|
||||
#
|
||||
# self.click(by_js=True)
|
||||
# timeout = timeout if timeout is not None else self.page.timeout
|
||||
# t1 = perf_counter()
|
||||
# while self.is_valid() and self.attr('value') != full_txt and perf_counter() - t1 <= timeout:
|
||||
# try:
|
||||
# if clear:
|
||||
# self.inner_ele.send_keys(u'\ue009', 'a', u'\ue017') # 有些ui下clear()不生效,用CTRL+a代替
|
||||
# self.inner_ele.send_keys(vals)
|
||||
#
|
||||
# except Exception:
|
||||
# pass
|
||||
#
|
||||
# if not self.is_valid():
|
||||
# return False
|
||||
# else:
|
||||
# if self.attr('value') != full_txt:
|
||||
# return False
|
||||
# else:
|
||||
# if enter:
|
||||
# self.inner_ele.send_keys(enter)
|
||||
# return True
|
||||
|
||||
def click(self, by_js: bool = None, timeout: float = None) -> bool:
|
||||
"""点击元素 \n
|
||||
尝试点击直到超时,若都失败就改用js点击 \n
|
||||
:param by_js: 是否用js点击,为True时直接用js点击,为False时重试失败也不会改用js
|
||||
@ -249,13 +300,13 @@ return true;}"""
|
||||
:return: 是否点击成功
|
||||
"""
|
||||
|
||||
def do_it(x, y) -> bool:
|
||||
r = self.page.driver.DOM.getNodeForLocation(x=x, y=y, includeUserAgentShadowDOM=True)
|
||||
def do_it(cx, cy, lx, ly) -> bool:
|
||||
r = self.page.driver.DOM.getNodeForLocation(x=lx + 1, y=ly + 1)
|
||||
if r.get('nodeId') != self._node_id:
|
||||
return False
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=x, y=y, button='left', clickCount=1)
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=cx, y=cy, button='left', clickCount=1)
|
||||
sleep(.1)
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button='left')
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=cx, y=cy, button='left')
|
||||
return True
|
||||
|
||||
if not by_js:
|
||||
@ -263,22 +314,24 @@ return true;}"""
|
||||
if self.is_in_view:
|
||||
timeout = timeout if timeout is not None else self.page.timeout
|
||||
xy = self.client_location
|
||||
location = self.location
|
||||
size = self.size
|
||||
ele_x = xy['x'] + size['width'] // 2
|
||||
ele_y = xy['y'] + size['height'] // 2
|
||||
client_x = xy['x'] + size['width'] // 2
|
||||
client_y = xy['y'] + size['height'] // 2
|
||||
loc_x = location['x'] + size['width'] // 2
|
||||
loc_y = location['y'] + size['height'] // 2
|
||||
|
||||
t1 = perf_counter()
|
||||
click = do_it(ele_x, ele_y)
|
||||
click = do_it(client_x, client_y, loc_x, loc_y)
|
||||
while not click and perf_counter() - t1 <= timeout:
|
||||
print('ss')
|
||||
click = do_it(ele_x, ele_y)
|
||||
click = do_it(client_x, client_y, location['x'], location['y'])
|
||||
|
||||
if click:
|
||||
return True
|
||||
|
||||
if by_js is not False:
|
||||
js = 'function(){this.click();}'
|
||||
self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)
|
||||
self.run_script(js)
|
||||
return True
|
||||
|
||||
return False
|
||||
@ -344,7 +397,7 @@ return true;}"""
|
||||
}
|
||||
return e(this);}
|
||||
'''
|
||||
t = self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)['result']['value']
|
||||
t = self.run_script(js)['result']['value']
|
||||
return f':root{t}' if mode == 'css' else t
|
||||
|
||||
|
||||
@ -386,14 +439,14 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
|
||||
type_txt = '9' if single else '7'
|
||||
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this'
|
||||
js = _make_js(xpath, type_txt, node_txt)
|
||||
r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele.obj_id)
|
||||
r = ele.run_script(js)
|
||||
if r['result']['type'] == 'string':
|
||||
return r['result']['value']
|
||||
|
||||
if 'exceptionDetails' in r:
|
||||
if 'The result is not a node set' in r['result']['description']:
|
||||
js = _make_js(xpath, '1', node_txt)
|
||||
r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele.obj_id)
|
||||
r = ele.run_script(js)
|
||||
return r['result']['value']
|
||||
else:
|
||||
raise SyntaxError(f'查询语句错误:\n{r}')
|
||||
@ -401,7 +454,7 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
|
||||
t1 = perf_counter()
|
||||
while (r['result']['subtype'] == 'null'
|
||||
or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout:
|
||||
r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele.obj_id)
|
||||
r = ele.run_script(js)
|
||||
|
||||
if single:
|
||||
if r['result']['subtype'] == 'null':
|
||||
@ -422,15 +475,15 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
|
||||
def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float):
|
||||
selector = selector.replace('"', r'\"')
|
||||
find_all = '' if single else 'All'
|
||||
js = f'function(){{return this.querySelector{find_all}("{selector}");}}'
|
||||
r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele.obj_id)
|
||||
js = f'this.querySelector{find_all}("{selector}");'
|
||||
r = ele.run_script(js)
|
||||
if 'exceptionDetails' in r:
|
||||
raise SyntaxError(f'查询语句错误:\n{r}')
|
||||
|
||||
t1 = perf_counter()
|
||||
while (r['result']['subtype'] == 'null'
|
||||
or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout:
|
||||
r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele.obj_id)
|
||||
r = ele.run_script(js)
|
||||
|
||||
if single:
|
||||
if r['result']['subtype'] == 'null':
|
||||
@ -481,6 +534,106 @@ else{a.push(e.snapshotItem(i));}}"""
|
||||
return js
|
||||
|
||||
|
||||
def run_script(page_or_ele, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param page_or_ele: 页面对象或元素对象
|
||||
:param script: js文本
|
||||
:param as_expr: 是否作为表达式运行,为True时args无效
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return:
|
||||
"""
|
||||
if isinstance(page_or_ele, ChromeElement):
|
||||
page = page_or_ele.page
|
||||
obj_id = page_or_ele.obj_id
|
||||
else:
|
||||
page = page_or_ele
|
||||
obj_id = page_or_ele.root.obj_id
|
||||
|
||||
if as_expr:
|
||||
res = page.run_cdp('Runtime.evaluate',
|
||||
expression=script,
|
||||
# contextId=self._contextId,
|
||||
returnByValue=False,
|
||||
awaitPromise=True,
|
||||
userGesture=True)
|
||||
else:
|
||||
if not is_js_func(script):
|
||||
script = f'function(){{return {script}}}'
|
||||
res = page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=script,
|
||||
objectId=obj_id,
|
||||
# 'executionContextId': self._contextId,
|
||||
arguments=[_convert_argument(arg) for arg in args],
|
||||
returnByValue=False,
|
||||
awaitPromise=True,
|
||||
userGesture=True)
|
||||
|
||||
exceptionDetails = res.get('exceptionDetails')
|
||||
if exceptionDetails:
|
||||
raise RuntimeError(f'Evaluation failed: {exceptionDetails}')
|
||||
return _parse_js_result(page, res.get('result'))
|
||||
|
||||
|
||||
def _parse_js_result(page, result: dict):
|
||||
"""解析js返回的结果"""
|
||||
if 'unserializableValue' in result:
|
||||
return result['unserializableValue']
|
||||
|
||||
the_type = result['type']
|
||||
|
||||
if the_type == 'object':
|
||||
sub_type = result['subtype']
|
||||
if sub_type == 'null':
|
||||
return None
|
||||
|
||||
elif sub_type == 'node':
|
||||
return ChromeElement(page, obj_id=result['objectId'])
|
||||
|
||||
elif sub_type == 'array':
|
||||
r = page.driver.Runtime.getProperties(objectId=result['result']['objectId'], ownProperties=True)['result']
|
||||
return [_parse_js_result(page, result=i['value']) for i in r]
|
||||
|
||||
else:
|
||||
return result['value']
|
||||
|
||||
elif the_type == 'undefined':
|
||||
return None
|
||||
|
||||
# elif the_type in ('string', 'number', 'boolean'):
|
||||
# return result['value']
|
||||
|
||||
else:
|
||||
return result['value']
|
||||
|
||||
|
||||
def _convert_argument(arg: Any) -> dict:
|
||||
"""把参数转换成js能够接收的形式"""
|
||||
if isinstance(arg, ChromeElement):
|
||||
return {'objectId': arg.obj_id}
|
||||
|
||||
elif isinstance(arg, (int, float, str, bool)):
|
||||
return {'value': arg}
|
||||
|
||||
from math import inf
|
||||
if arg == inf:
|
||||
return {'unserializableValue': 'Infinity'}
|
||||
if arg == -inf:
|
||||
return {'unserializableValue': '-Infinity'}
|
||||
|
||||
# objectHandle = arg if isinstance(arg, JSHandle) else None
|
||||
# if objectHandle:
|
||||
# if objectHandle._context != self:
|
||||
# raise ElementHandleError('JSHandles can be evaluated only in the context they were created!')
|
||||
# if objectHandle._disposed:
|
||||
# raise ElementHandleError('JSHandle is disposed!')
|
||||
# if objectHandle._remoteObject.get('unserializableValue'):
|
||||
# return {'unserializableValue': objectHandle._remoteObject.get('unserializableValue')} # noqa: E501
|
||||
# if not objectHandle._remoteObject.get('objectId'):
|
||||
# return {'value': objectHandle._remoteObject.get('value')}
|
||||
# return {'objectId': objectHandle._remoteObject.get('objectId')}
|
||||
# return {'value': arg}
|
||||
|
||||
|
||||
class ChromeScroll(object):
|
||||
"""用于滚动的对象"""
|
||||
|
||||
@ -501,8 +654,7 @@ class ChromeScroll(object):
|
||||
def _run_script(self, js: str):
|
||||
js = js.format(self.t1, self.t2, self.t2)
|
||||
if self.obj_id:
|
||||
js = f'function(){{{js}}}'
|
||||
self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self.obj_id)
|
||||
self.page.run_script(js)
|
||||
else:
|
||||
self.page.driver.Runtime.evaluate(expression=js)
|
||||
|
||||
|
@ -8,14 +8,14 @@ from requests import get as requests_get
|
||||
from json import loads
|
||||
|
||||
from .base import BasePage
|
||||
from .common import get_loc, is_js_func
|
||||
from .common import get_loc
|
||||
from .drission import connect_chrome
|
||||
from .chrome_element import ChromeElement, ChromeScroll
|
||||
from .chrome_element import ChromeElement, ChromeScroll, run_script
|
||||
|
||||
|
||||
class ChromePage(BasePage):
|
||||
|
||||
def __init__(self, address: str,
|
||||
def __init__(self, address: str = '127.0.0.1:9222',
|
||||
path: str = 'chrome',
|
||||
tab_handle: str = None,
|
||||
args: list = None,
|
||||
@ -103,33 +103,14 @@ class ChromePage(BasePage):
|
||||
h = self.driver.Runtime.evaluate(expression='document.body.scrollHeight;')['result']['value']
|
||||
return {'height': h, 'width': w}
|
||||
|
||||
def run_script(self, script: str, *args: Any) -> Any:
|
||||
def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param script: js文本
|
||||
:param as_expr: 是否作为表达式运行,为True时args无效
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return:
|
||||
:return: 运行的结果
|
||||
"""
|
||||
if not args and not is_js_func(script):
|
||||
res = self.run_cdp('Runtime.evaluate',
|
||||
expression=script,
|
||||
returnByValue=False,
|
||||
awaitPromise=True,
|
||||
userGesture=True)
|
||||
|
||||
else:
|
||||
res = self.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=script,
|
||||
objectId=self.root.obj_id,
|
||||
# 'executionContextId': self._contextId,
|
||||
arguments=[convert_argument(arg) for arg in args],
|
||||
returnByValue=False,
|
||||
awaitPromise=True,
|
||||
userGesture=True)
|
||||
|
||||
exceptionDetails = res.get('exceptionDetails')
|
||||
if exceptionDetails:
|
||||
raise RuntimeError(f'Evaluation failed: {exceptionDetails}')
|
||||
return _parse_js_result(self, res.get('result'))
|
||||
return run_script(self, script, as_expr, *args)
|
||||
|
||||
def get(self,
|
||||
url: str,
|
||||
@ -516,63 +497,3 @@ def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set])
|
||||
raise TypeError('num_or_handle参数只能是int、str、list、set 或 tuple类型。')
|
||||
|
||||
return set(i if isinstance(i, str) else handles[i] for i in num_or_handles)
|
||||
|
||||
|
||||
def _parse_js_result(page: ChromePage, result: dict):
|
||||
"""解析js返回的结果"""
|
||||
if 'unserializableValue' in result:
|
||||
return result['unserializableValue']
|
||||
|
||||
the_type = result['type']
|
||||
|
||||
if the_type == 'object':
|
||||
sub_type = result['subtype']
|
||||
if sub_type == 'null':
|
||||
return None
|
||||
|
||||
elif sub_type == 'node':
|
||||
return ChromeElement(page, obj_id=result['objectId'])
|
||||
|
||||
elif sub_type == 'array':
|
||||
r = page.driver.Runtime.getProperties(objectId=result['result']['objectId'], ownProperties=True)['result']
|
||||
return [_parse_js_result(page, result=i['value']) for i in r]
|
||||
|
||||
else:
|
||||
return result['value']
|
||||
|
||||
elif the_type == 'undefined':
|
||||
return None
|
||||
|
||||
# elif the_type in ('string', 'number', 'boolean'):
|
||||
# return result['value']
|
||||
|
||||
else:
|
||||
return result['value']
|
||||
|
||||
|
||||
def convert_argument(arg: Any) -> dict:
|
||||
"""把参数转换成js能够接收的形式"""
|
||||
if isinstance(arg, ChromeElement):
|
||||
return {'objectId': arg.obj_id}
|
||||
|
||||
elif isinstance(arg, int, float, str, bool):
|
||||
return {'value': arg}
|
||||
|
||||
from math import inf
|
||||
if arg == inf:
|
||||
return {'unserializableValue': 'Infinity'}
|
||||
if arg == -inf:
|
||||
return {'unserializableValue': '-Infinity'}
|
||||
|
||||
# objectHandle = arg if isinstance(arg, JSHandle) else None
|
||||
# if objectHandle:
|
||||
# if objectHandle._context != self:
|
||||
# raise ElementHandleError('JSHandles can be evaluated only in the context they were created!')
|
||||
# if objectHandle._disposed:
|
||||
# raise ElementHandleError('JSHandle is disposed!')
|
||||
# if objectHandle._remoteObject.get('unserializableValue'):
|
||||
# return {'unserializableValue': objectHandle._remoteObject.get('unserializableValue')} # noqa: E501
|
||||
# if not objectHandle._remoteObject.get('objectId'):
|
||||
# return {'value': objectHandle._remoteObject.get('value')}
|
||||
# return {'objectId': objectHandle._remoteObject.get('objectId')}
|
||||
# return {'value': arg}
|
||||
|
Loading…
x
Reference in New Issue
Block a user