mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
继续开发新版,未完成
This commit is contained in:
parent
2d9e101c69
commit
2ec02f965f
@ -6,7 +6,7 @@
|
||||
"""
|
||||
from pathlib import Path
|
||||
from typing import Union, Tuple, List
|
||||
from time import perf_counter
|
||||
from time import perf_counter, sleep
|
||||
|
||||
from .session_element import make_session_ele
|
||||
from .base import DrissionElement
|
||||
@ -107,6 +107,19 @@ function getElementPagePosition(element){
|
||||
self._scroll = ChromeScroll(self)
|
||||
return self._scroll
|
||||
|
||||
@property
|
||||
def is_in_view(self) -> bool:
|
||||
"""返回元素是否出现在视口中,已元素中点为判断"""
|
||||
js = """function(){
|
||||
const rect = this.getBoundingClientRect();
|
||||
x = rect.left+(rect.right-rect.left)/2;
|
||||
y = rect.top+(rect.bottom-rect.top)/2;
|
||||
const vWidth = window.innerWidth || document.documentElement.clientWidth;
|
||||
const vHeight = window.innerHeight || document.documentElement.clientHeight;
|
||||
if (x< 0 || y < 0 || x > vWidth || y > vHeight){return false;}
|
||||
return true;}"""
|
||||
return self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self.obj_id)['result']['value']
|
||||
|
||||
def run_script(self, arg: 'ChromeElement'):
|
||||
js = 'function(){alert(arguments[0].value);}'
|
||||
return self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self.obj_id,
|
||||
@ -228,28 +241,50 @@ function getElementPagePosition(element){
|
||||
return self.page.get_screenshot(path, as_bytes=as_bytes, full_page=False,
|
||||
left_top=left_top, right_bottom=right_bottom)
|
||||
|
||||
def click(self, by_js: bool = False) -> None:
|
||||
def click(self, by_js: bool = None, timeout: float = None) -> None:
|
||||
"""点击元素 \n
|
||||
尝试点击直到超时,若都失败就改用js点击 \n
|
||||
:param by_js: 是否用js点击,为True时直接用js点击,为False时重试失败也不会改用js
|
||||
:param timeout: 尝试点击的超时时间,不指定则使用父页面的超时时间
|
||||
:return: 是否点击成功
|
||||
"""
|
||||
if by_js:
|
||||
|
||||
def do_it(x, y) -> bool:
|
||||
r = self.page.driver.DOM.getNodeForLocation(x=x, y=y, includeUserAgentShadowDOM=True)
|
||||
if r.get('nodeId') != self._node_id:
|
||||
return False
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=x, y=y, button='left', clickCount=1)
|
||||
sleep(.1)
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button='left')
|
||||
return True
|
||||
|
||||
if not by_js:
|
||||
self.page.scroll_to_see(self)
|
||||
if self.is_in_view:
|
||||
timeout = timeout if timeout is not None else self.page.timeout
|
||||
xy = self.client_location
|
||||
size = self.size
|
||||
ele_x = xy['x'] + size['width'] // 2
|
||||
ele_y = xy['y'] + size['height'] // 2
|
||||
|
||||
t1 = perf_counter()
|
||||
click = do_it(ele_x, ele_y)
|
||||
while not click and perf_counter() - t1 <= timeout:
|
||||
print('ss')
|
||||
click = do_it(ele_x, ele_y)
|
||||
|
||||
if click:
|
||||
return True
|
||||
|
||||
if by_js is not False:
|
||||
js = 'function(){this.click();}'
|
||||
self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)
|
||||
return
|
||||
return True
|
||||
|
||||
self.page.driver.DOM.scrollIntoViewIfNeeded(nodeId=self._node_id)
|
||||
xy = self.client_location
|
||||
size = self.size
|
||||
x = xy['x'] + size['width'] // 2
|
||||
y = xy['y'] + size['height'] // 2
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mousePressed', x=x, y=y, button='left', clickCount=1)
|
||||
self.page.driver.Input.dispatchMouseEvent(type='mouseReleased', x=x, y=y, button='left')
|
||||
return False
|
||||
|
||||
# js = """function(){const event=new MouseEvent('click',{view:window, bubbles:true, cancelable:true});
|
||||
# this.dispatchEvent(event);}"""
|
||||
# self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)
|
||||
def click_at(self):
|
||||
pass
|
||||
|
||||
def _get_obj_id(self, node_id) -> str:
|
||||
return self.page.driver.DOM.resolveNode(nodeId=node_id)['object']['objectId']
|
||||
|
@ -1,6 +1,4 @@
|
||||
# -*- coding:utf-8 -*-
|
||||
from base64 import b64decode
|
||||
from math import inf
|
||||
from pathlib import Path
|
||||
from time import perf_counter, sleep
|
||||
from typing import Union, Tuple, List, Any
|
||||
@ -106,6 +104,11 @@ class ChromePage(BasePage):
|
||||
return {'height': h, 'width': w}
|
||||
|
||||
def run_script(self, script: str, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param script: js文本
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return:
|
||||
"""
|
||||
if not args and not is_js_func(script):
|
||||
res = self.run_cdp('Runtime.evaluate',
|
||||
expression=script,
|
||||
@ -116,6 +119,7 @@ class ChromePage(BasePage):
|
||||
else:
|
||||
res = self.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=script,
|
||||
objectId=self.root.obj_id,
|
||||
# 'executionContextId': self._contextId,
|
||||
arguments=[convert_argument(arg) for arg in args],
|
||||
returnByValue=False,
|
||||
@ -252,6 +256,7 @@ class ChromePage(BasePage):
|
||||
else:
|
||||
png = self.driver.Page.captureScreenshot(format=pic_type)['data']
|
||||
|
||||
from base64 import b64decode
|
||||
png = b64decode(png)
|
||||
|
||||
if as_bytes:
|
||||
@ -448,7 +453,8 @@ class ChromePage(BasePage):
|
||||
webSocketDebuggerUrl=f'ws://{self.debugger_address}/devtools/page/{tab_handle}')
|
||||
self.driver.start()
|
||||
self.driver.DOM.enable()
|
||||
self.driver.DOM.getDocument()
|
||||
root = self.driver.DOM.getDocument()
|
||||
self.root = ChromeElement(self, node_id=root['root']['nodeId'])
|
||||
|
||||
def _d_connect(self,
|
||||
to_url: str,
|
||||
@ -514,27 +520,50 @@ def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set])
|
||||
|
||||
def _parse_js_result(page: ChromePage, result: dict):
|
||||
"""解析js返回的结果"""
|
||||
if 'unserializableValue' in result:
|
||||
return result['unserializableValue']
|
||||
|
||||
the_type = result['type']
|
||||
if the_type in ('string', 'number', 'boolean'):
|
||||
return result['value']
|
||||
elif the_type == 'undefined':
|
||||
return None
|
||||
elif the_type == 'object':
|
||||
|
||||
if the_type == 'object':
|
||||
sub_type = result['subtype']
|
||||
if sub_type == 'null':
|
||||
return None
|
||||
|
||||
elif sub_type == 'node':
|
||||
return ChromeElement(page, obj_id=result['objectId'])
|
||||
|
||||
elif sub_type == 'array':
|
||||
r = page.driver.Runtime.getProperties(objectId=result['result']['objectId'], ownProperties=True)['result']
|
||||
return [_parse_js_result(page, result=i['value']) for i in r]
|
||||
|
||||
else:
|
||||
return result['value']
|
||||
|
||||
elif the_type == 'undefined':
|
||||
return None
|
||||
|
||||
# elif the_type in ('string', 'number', 'boolean'):
|
||||
# return result['value']
|
||||
|
||||
else:
|
||||
return result['value']
|
||||
|
||||
|
||||
def convert_argument(arg: Any) -> dict:
|
||||
pass
|
||||
"""把参数转换成js能够接收的形式"""
|
||||
# if arg == inf:
|
||||
# return {'unserializableValue': 'Infinity'}
|
||||
# if arg == -inf:
|
||||
# return {'unserializableValue': '-Infinity'}
|
||||
if isinstance(arg, ChromeElement):
|
||||
return {'objectId': arg.obj_id}
|
||||
|
||||
elif isinstance(arg, int, float, str, bool):
|
||||
return {'value': arg}
|
||||
|
||||
from math import inf
|
||||
if arg == inf:
|
||||
return {'unserializableValue': 'Infinity'}
|
||||
if arg == -inf:
|
||||
return {'unserializableValue': '-Infinity'}
|
||||
|
||||
# objectHandle = arg if isinstance(arg, JSHandle) else None
|
||||
# if objectHandle:
|
||||
# if objectHandle._context != self:
|
||||
|
@ -483,8 +483,10 @@ def _run_browser(port, path: str, args: set) -> Popen:
|
||||
t1 = perf_counter()
|
||||
while perf_counter() - t1 < 10:
|
||||
try:
|
||||
requests_get(f'http://127.0.0.1:{port}/json')
|
||||
return debugger
|
||||
tabs = requests_get(f'http://127.0.0.1:{port}/json').json()
|
||||
for tab in tabs:
|
||||
if tab['type'] == 'page':
|
||||
return debugger
|
||||
except requests_connection_err:
|
||||
pass
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user