基本完成

This commit is contained in:
g1879 2022-11-09 17:36:37 +08:00
parent 9020d9a781
commit 569ab2e542
7 changed files with 541 additions and 207 deletions

View File

@ -0,0 +1,228 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@File : shadow_root_element.py
"""
from time import perf_counter
from typing import Union, Any, Tuple, List
from selenium.webdriver.remote.webelement import WebElement
from .base import BaseElement
from .common import get_loc
from .driver_element import make_driver_ele, DriverElement
from .session_element import make_session_ele, SessionElement
class ChromeShadowRootElement(BaseElement):
"""ChromeShadowRootElement是用于处理ShadowRoot的类使用方法和ChromeElement基本一致"""
def __init__(self, parent_ele: DriverElement):
super().__init__(parent_ele.page)
self.parent_ele = parent_ele
def __repr__(self) -> str:
return f'<ShadowRootElement in {self.parent_ele} >'
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> Union[DriverElement, str, None]:
"""在内部查找元素 \n
ele2 = ele1('@id=ele_id') \n
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 超时时间
:return: DriverElement对象或属性文本
"""
return self.ele(loc_or_str, timeout)
@property
def tag(self) -> str:
"""元素标签名"""
return 'shadow-root'
# @property
# def html(self) -> str:
# return f'<shadow_root>{self.inner_html}</shadow_root>'
#
# @property
# def inner_html(self) -> str:
# """返回内部的html文本"""
# shadow_root = WebElement(self.page.driver, self.inner_ele._id)
# return shadow_root.get_attribute('innerHTML')
#
# def parent(self, level_or_loc: Union[str, int] = 1) -> DriverElement:
# """返回上面某一级父元素,可指定层数或用查询语法定位 \n
# :param level_or_loc: 第几级父元素,或定位符
# :return: DriverElement对象
# """
# if isinstance(level_or_loc, int):
# loc = f'xpath:./ancestor-or-self::*[{level_or_loc}]'
#
# elif isinstance(level_or_loc, (tuple, str)):
# loc = get_loc(level_or_loc, True)
#
# if loc[0] == 'css selector':
# raise ValueError('此css selector语法不受支持请换成xpath。')
#
# loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}'
#
# else:
# raise TypeError('level_or_loc参数只能是tuple、int或str。')
#
# return self.parent_ele.ele(loc, timeout=0)
#
# def next(self,
# index: int = 1,
# filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]:
# """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n
# :param index: 第几个查询结果元素
# :param filter_loc: 用于筛选元素的查询语法
# :return: DriverElement对象
# """
# nodes = self.nexts(filter_loc=filter_loc)
# return nodes[index - 1] if nodes else None
#
# def before(self,
# index: int = 1,
# filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]:
# """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n
# :param index: 前面第几个查询结果元素
# :param filter_loc: 用于筛选元素的查询语法
# :return: 本元素前面的某个元素或节点
# """
# nodes = self.befores(filter_loc=filter_loc)
# return nodes[index - 1] if nodes else None
#
# def after(self, index: int = 1,
# filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]:
# """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n
# :param index: 后面第几个查询结果元素
# :param filter_loc: 用于筛选元素的查询语法
# :return: 本元素后面的某个元素或节点
# """
# nodes = self.afters(filter_loc=filter_loc)
# return nodes[index - 1] if nodes else None
#
# def nexts(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]:
# """返回后面所有兄弟元素或节点组成的列表 \n
# :param filter_loc: 用于筛选元素的查询语法
# :return: DriverElement对象组成的列表
# """
# loc = get_loc(filter_loc, True)
# if loc[0] == 'css selector':
# raise ValueError('此css selector语法不受支持请换成xpath。')
#
# loc = loc[1].lstrip('./')
# xpath = f'xpath:./{loc}'
# return self.parent_ele.eles(xpath, timeout=0.1)
#
# def befores(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]:
# """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n
# :param filter_loc: 用于筛选元素的查询语法
# :return: 本元素前面的元素或节点组成的列表
# """
# loc = get_loc(filter_loc, True)
# if loc[0] == 'css selector':
# raise ValueError('此css selector语法不受支持请换成xpath。')
#
# loc = loc[1].lstrip('./')
# xpath = f'xpath:./preceding::{loc}'
# return self.parent_ele.eles(xpath, timeout=0.1)
#
# def afters(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]:
# """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n
# :param filter_loc: 用于筛选元素的查询语法
# :return: 本元素后面的元素或节点组成的列表
# """
# eles1 = self.nexts(filter_loc)
# loc = get_loc(filter_loc, True)[1].lstrip('./')
# xpath = f'xpath:./following::{loc}'
# return eles1 + self.parent_ele.eles(xpath, timeout=0.1)
#
# def ele(self,
# loc_or_str: Union[Tuple[str, str], str],
# timeout: float = None) -> Union[DriverElement, str, None]:
# """返回当前元素下级符合条件的第一个元素,默认返回 \n
# :param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
# :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致
# :return: DriverElement对象或属性、文本
# """
# return self._ele(loc_or_str, timeout)
#
# def eles(self,
# loc_or_str: Union[Tuple[str, str], str],
# timeout: float = None) -> List[Union[DriverElement, str]]:
# """返回当前元素下级所有符合条件的子元素 \n
# :param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
# :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致
# :return: DriverElement对象或属性、文本组成的列表
# """
# return self._ele(loc_or_str, timeout=timeout, single=False)
#
# def s_ele(self, loc_or_ele=None) -> Union[SessionElement, str, None]:
# """查找第一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高 \n
# :param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
# :return: SessionElement对象或属性、文本
# """
# return make_session_ele(self, loc_or_ele)
#
# def s_eles(self, loc_or_ele) -> List[Union[SessionElement, str]]:
# """查找所有符合条件的元素以SessionElement列表形式返回处理复杂页面时效率很高 \n
# :param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
# :return: SessionElement对象或属性、文本
# """
# return make_session_ele(self, loc_or_ele, single=False)
#
# def _ele(self,
# loc_or_str: Union[Tuple[str, str], str],
# timeout: float = None,
# single: bool = True) -> Union[DriverElement, str, None, List[Union[DriverElement, str]]]:
# """返回当前元素下级符合条件的子元素,默认返回第一个 \n
# :param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
# :param timeout: 查找元素超时时间
# :param single: True则返回第一个False则返回全部
# :return: DriverElement对象
# """
# # 先转换为sessionElement再获取所有元素获取它们的css selector路径再用路径在页面上执行查找
# loc = get_loc(loc_or_str)
# if loc[0] == 'css selector' and str(loc[1]).startswith(':root'):
# loc = loc[0], loc[1][5:]
#
# timeout = timeout if timeout is not None else self.page.timeout
# t1 = perf_counter()
# eles = make_session_ele(self.html).eles(loc)
# while not eles and perf_counter() - t1 <= timeout:
# eles = make_session_ele(self.html).eles(loc)
#
# if not eles:
# return None if single else eles
#
# css_paths = [i.css_path[47:] for i in eles]
#
# if single:
# return make_driver_ele(self, f'css:{css_paths[0]}', single, timeout)
# else:
# return [make_driver_ele(self, f'css:{css}', True, timeout) for css in css_paths]
#
# def run_script(self, script: str, *args) -> Any:
# """执行js代码传入自己为第一个参数 \n
# :param script: js文本
# :param args: 传入的参数
# :return: js执行结果
# """
# shadow_root = WebElement(self.page.driver, self.inner_ele._id)
# return shadow_root.parent.execute_script(script, shadow_root, *args)
#
# def is_enabled(self) -> bool:
# """是否可用"""
# return self.inner_ele.is_enabled()
#
# def is_valid(self) -> bool:
# """用于判断元素是否还能用,应对页面跳转元素不能用的情况"""
# try:
# self.is_enabled()
# return True
#
# except Exception:
# return False

View File

@ -20,6 +20,7 @@ class ChromeElement(DrissionElement):
super().__init__(page)
self._select = None
self._scroll = None
self._tag = None
if not node_id and not obj_id:
raise TypeError('node_id或obj_id必须传入一个。')
@ -31,8 +32,9 @@ class ChromeElement(DrissionElement):
self._obj_id = obj_id
def __repr__(self) -> str:
attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs]
return f'<ChromeElement {self.tag} {" ".join(attrs)}>'
# attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs]
# return f'<ChromeElement {self.tag} {" ".join(attrs)}>'
return f'<ChromeElement {self.tag} >'
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
@ -58,14 +60,18 @@ class ChromeElement(DrissionElement):
@property
def tag(self) -> str:
return self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName']
"""返回元素tag"""
# print(self.page.driver.DOM.describeNode(nodeId=self._node_id))
if self._tag is None:
self._tag = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName'].lower()
return self._tag
@property
def inner_html(self) -> str:
"""返回元素innerHTML文本"""
if self.tag in ('iframe', 'frame'):
return self.run_script('return this.contentDocument.documentElement;').html
# return _run_script(self, 'this.contentDocument.body;').html
# return run_script(self, 'this.contentDocument.body;').html
return self.run_script('return this.innerHTML;')
@property
@ -111,38 +117,40 @@ class ChromeElement(DrissionElement):
def location(self) -> dict:
"""返回元素左上角坐标"""
js = '''function(){
function getElementPagePosition(element){
var actualLeft = element.offsetLeft;
var current = element.offsetParent;
while (current !== null){
actualLeft += current.offsetLeft;
current = current.offsetParent;
}
var actualTop = element.offsetTop;
var current = element.offsetParent;
while (current !== null){
actualTop += (current.offsetTop+current.clientTop);
current = current.offsetParent;
}
return actualLeft.toString() +' '+actualTop.toString();
}
return getElementPagePosition(this);}'''
function getElementPagePosition(element){
var actualLeft = element.offsetLeft;
var current = element.offsetParent;
while (current !== null){
actualLeft += current.offsetLeft;
current = current.offsetParent;
}
var actualTop = element.offsetTop;
var current = element.offsetParent;
while (current !== null){
actualTop += (current.offsetTop+current.clientTop);
current = current.offsetParent;
}
return actualLeft.toString() +' '+actualTop.toString();
}
return getElementPagePosition(this);}'''
xy = self.run_script(js)
x, y = xy.split(' ')
return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])}
# @property
# def shadow_root(self):
# """返回当前元素的shadow_root元素对象"""
# shadow = self.run_script('return arguments[0].shadowRoot')
# if shadow:
# from .shadow_root_element import ShadowRootElement
# return ShadowRootElement(shadow, self)
#
# @property
# def sr(self):
# """返回当前元素的shadow_root元素对象"""
# return self.shadow_root
@property
def shadow_root(self):
"""返回当前元素的shadow_root元素对象"""
shadow = self.run_script('return this.shadowRoot;')
return shadow
# if shadow:
# from .shadow_root_element import ShadowRootElement
# return ShadowRootElement(shadow, self)
@property
def sr(self):
"""返回当前元素的shadow_root元素对象"""
return self.shadow_root
@property
def pseudo_before(self) -> str:
@ -345,7 +353,7 @@ function getElementPagePosition(element){
:param args: 参数按顺序在js文本中对应argument[0]argument[2]...
:return: 运行的结果
"""
return _run_script(self, script, as_expr, args)
return _run_script(self, script, as_expr, self.page.timeouts.script, args)
def ele(self,
loc_or_str: Union[Tuple[str, str], str],
@ -474,55 +482,38 @@ function getElementPagePosition(element){
:param clear: 输入前是否清空文本框
:return: None
"""
combination_key = False
if not isinstance(vals, (str, tuple, list)):
vals = str(vals)
if isinstance(vals, str):
if '\n' in vals:
combination_key = True
vals = (vals,)
if self.tag == 'input' and self.attr('type') == 'file':
return self._set_file_input(vals)
try:
self.page.driver.DOM.focus(nodeId=self._node_id)
except Exception:
self.click(by_js=False)
self.click(by_js=True)
if clear:
self.clear(by_js=True)
if not combination_key:
for i in ('\ue008', '\ue009', '\ue00a', '\ue03d'): # ctrl alt shift command 四键
if i in vals:
combination_key = True
break
# ------------处理字符-------------
if not isinstance(vals, (tuple, list)):
vals = (str(vals),)
modifier, vals = _keys_to_typing(vals)
if not combination_key:
self.page.run_cdp('Input.insertText', text=''.join(vals))
if modifier != 0: # 包含组合键
for key in vals:
_send_key(self, modifier, key)
return
modifier, typing = _keys_to_typing(vals)
for key in typing:
print([key])
if key not in _keyDefinitions:
self.page.run_cdp('Input.insertText', text=key)
if vals.endswith('\n'):
self.page.run_cdp('Input.insertText', text=vals[:-1])
_send_key(self, modifier, '\n')
else:
self.page.run_cdp('Input.insertText', text=vals)
else:
description = _keyDescriptionForString(modifier, key)
text = description['text']
data = {'type': 'keyDown' if text else 'rawKeyDown',
'modifiers': modifier,
'windowsVirtualKeyCode': description['keyCode'],
'code': description['code'],
'key': description['key'],
'text': text,
'autoRepeat': False,
'unmodifiedText': text,
'location': description['location'],
'isKeypad': description['location'] == 3}
self.page.run_cdp('Input.dispatchKeyEvent', **data)
# data['type'] = 'keyUp'
# self.page.run_cdp('Input.dispatchKeyEvent', **data)
def _set_file_input(self, files: Union[str, list, tuple]) -> None:
"""设置上传控件值"""
if isinstance(files):
files = files.split('\n')
self.page.driver.DOM.setFileInputFiles(files=files, nodeId=self._node_id)
def clear(self, by_js: bool = True) -> None:
"""清空元素文本 \n
@ -752,7 +743,8 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float):
selector = selector.replace('"', r'\"')
find_all = '' if single else 'All'
js = f'this.querySelector{find_all}("{selector}");'
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this'
js = f'function(){{return {node_txt}.querySelector{find_all}("{selector}");}}'
r = ele.page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True)
@ -815,7 +807,7 @@ else{a.push(e.snapshotItem(i));}}"""
return js
def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = None) -> Any:
def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float = None, args: tuple = None) -> Any:
"""运行javascript代码 \n
:param page_or_ele: 页面对象或元素对象
:param script: js文本
@ -835,7 +827,8 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N
expression=script,
returnByValue=False,
awaitPromise=True,
userGesture=True)
userGesture=True,
timeout=timeout * 1000)
else:
args = args or ()
if not is_js_func(script):
@ -851,7 +844,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, args: tuple = N
exceptionDetails = res.get('exceptionDetails')
if exceptionDetails:
raise RuntimeError(f'Evaluation failed: {exceptionDetails}')
# print(res.get('result'))
return _parse_js_result(page, res.get('result'))
@ -928,6 +921,39 @@ def _offset_scroll(ele: ChromeElement, x: int, y: int):
return x, y
def _send_enter(ele: ChromeElement):
# todo:windows系统回车是否不一样
data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter',
'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False}
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
data['type'] = 'keyUp'
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
def _send_key(ele: ChromeElement, modifier: int, key: str) -> None:
if key not in _keyDefinitions:
ele.page.run_cdp('Input.insertText', text=key)
else:
description = _keyDescriptionForString(modifier, key)
text = description['text']
data = {'type': 'keyDown' if text else 'rawKeyDown',
'modifiers': modifier,
'windowsVirtualKeyCode': description['keyCode'],
'code': description['code'],
'key': description['key'],
'text': text,
'autoRepeat': False,
'unmodifiedText': text,
'location': description['location'],
'isKeypad': description['location'] == 3}
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
data['type'] = 'keyUp'
ele.page.run_cdp('Input.dispatchKeyEvent', **data)
class ChromeScroll(object):
"""用于滚动的对象"""

View File

@ -10,6 +10,7 @@ from json import loads
from requests.cookies import RequestsCookieJar
from .session_element import SessionElement, make_session_ele
from .config import DriverOptions, _cookies_to_tuple
from .base import BasePage
from .common import get_loc
@ -25,24 +26,27 @@ class ChromePage(BasePage):
super().__init__(timeout)
self._connect_debugger(Tab_or_Options, tab_handle)
# def _ready(self):
# self._alert = Alert()
# self.driver.Page.javascriptDialogOpening = self._on_alert_open
# self.driver.Page.javascriptDialogClosed = self._on_alert_close
def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None):
self.timeouts = Timeout(self)
self._page_load_strategy = 'normal'
if isinstance(Tab_or_Options, Tab):
self._driver = Tab_or_Options
self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1)
self.options = None
else:
if Tab_or_Options is None:
Tab_or_Options = DriverOptions() # 从ini文件读取
connect_chrome(Tab_or_Options)
self.address = Tab_or_Options.debugger_address
elif isinstance(Tab_or_Options, DriverOptions):
self.options = Tab_or_Options or DriverOptions() # 从ini文件读取
self.set_timeouts(page_load=self.options.timeouts['pageLoad'],
script=self.options.timeouts['script'])
self._page_load_strategy = self.options.page_load_strategy
connect_chrome(self.options)
self.address = self.options.debugger_address
tab_handle = self.tab_handles[0] if not tab_handle else tab_handle
self._driver = Tab(id=tab_handle, type='page',
webSocketDebuggerUrl=f'ws://{Tab_or_Options.debugger_address}/devtools/page/{tab_handle}')
webSocketDebuggerUrl=f'ws://{self.options.debugger_address}/devtools/page/{tab_handle}')
else:
raise TypeError('只能接收Tab或DriverOptions类型参数。')
self._driver.start()
self._driver.DOM.enable()
@ -110,7 +114,7 @@ class ChromePage(BasePage):
@property
def ready_state(self) -> str:
"""返回当前页面加载状态,"""
return self.driver.Runtime.evaluate(expression='document.readyState;')['result']['value']
return self.run_script('document.readyState;', as_expr=True)
@property
def scroll(self) -> ChromeScroll:
@ -122,10 +126,41 @@ class ChromePage(BasePage):
@property
def size(self) -> dict:
"""返回页面总长宽"""
w = self.driver.Runtime.evaluate(expression='document.body.scrollWidth;')['result']['value']
h = self.driver.Runtime.evaluate(expression='document.body.scrollHeight;')['result']['value']
w = self.run_script('document.body.scrollWidth;', as_expr=True)
h = self.run_script('document.body.scrollHeight;', as_expr=True)
return {'height': h, 'width': w}
@property
def active_ele(self) -> ChromeElement:
return self.run_script('return document.activeElement;')
@property
def page_load_strategy(self) -> str:
"""返回页面加载策略"""
return self._page_load_strategy
def set_page_load_strategy(self, value: str) -> None:
"""设置页面加载策略,可选'normal', 'eager', 'none'"""
if value not in ('normal', 'eager', 'none'):
raise ValueError("只能选择'normal', 'eager', 'none'")
self._page_load_strategy = value
def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None:
"""设置超时时间单位为秒selenium4以上版本有效 \n
:param implicit: 查找元素超时时间
:param page_load: 页面加载超时时间
:param script: 脚本运行超时时间
:return: None
"""
if implicit is not None:
self.timeout = implicit
if page_load is not None:
self.timeouts.page_load = page_load
if script is not None:
self.timeouts.script = script
def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any:
"""运行javascript代码 \n
:param script: js文本
@ -133,7 +168,7 @@ class ChromePage(BasePage):
:param args: 参数按顺序在js文本中对应argument[0]argument[2]...
:return: 运行的结果
"""
return _run_script(self, script, as_expr, args)
return _run_script(self, script, as_expr, self.timeouts.script, args)
def get(self,
url: str,
@ -187,11 +222,22 @@ class ChromePage(BasePage):
timeout: float = None) -> List[Union[ChromeElement, str]]:
return self._ele(loc_or_ele, timeout=timeout, single=False)
# def s_ele(self):
# pass
#
# def s_eles(self):
# pass
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement] = None) -> Union[SessionElement, str, None]:
"""查找第一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高 \n
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象或属性文本
"""
if isinstance(loc_or_ele, ChromeElement):
return make_session_ele(loc_or_ele)
else:
return make_session_ele(self, loc_or_ele)
def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]:
"""查找所有符合条件的元素以SessionElement列表形式返回 \n
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象组成的列表
"""
return make_session_ele(self, loc_or_str, single=False)
def _ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromeElement],
@ -205,12 +251,12 @@ class ChromePage(BasePage):
raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。')
timeout = timeout if timeout is not None else self.timeout
search_result = self.driver.DOM.performSearch(query=loc)
search_result = self.driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
end_time = perf_counter() + timeout
while count == 0 and perf_counter() < end_time:
search_result = self.driver.DOM.performSearch(query=loc)
search_result = self.driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True)
count = search_result['resultCount']
if count == 0:
@ -309,16 +355,17 @@ class ChromePage(BasePage):
:param steps: 次数
:return: None
"""
self.driver.Runtime.evaluate(expression=f'window.history.go({steps});')
self.run_script(f'window.history.go({steps});', as_expr=True)
def back(self, steps: int = 1) -> None:
"""在浏览历史中后退若干步 \n
:param steps: 次数
:return: None
"""
self.driver.Runtime.evaluate(expression=f'window.history.go({-steps});')
self.run_script(f'window.history.go({-steps});', as_expr=True)
def stop_loading(self) -> None:
"""页面停止加载"""
self.driver.Page.stopLoading()
def run_cdp(self, cmd: str, **cmd_args):
@ -342,7 +389,7 @@ class ChromePage(BasePage):
:return: sessionStorage一个或所有项内容
"""
js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;'
return self.driver.Runtime.evaluate(js)
return self.run_script(js, as_expr=True)
def get_local_storage(self, item: str = None) -> Union[str, dict, None]:
"""获取localStorage信息不设置item则获取全部 \n
@ -350,7 +397,7 @@ class ChromePage(BasePage):
:return: localStorage一个或所有项内容
"""
js = f'localStorage.getItem("{item}");' if item else 'localStorage;'
return self.driver.Runtime.evaluate(js)
return self.run_script(js, as_expr=True)
def set_session_storage(self, item: str, value: Union[str, bool]) -> None:
"""设置或删除某项sessionStorage信息 \n
@ -358,8 +405,8 @@ class ChromePage(BasePage):
:param value: 项的值设置为False时删除该项
:return: None
"""
s = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");'
return self.driver.Runtime.evaluate(s)
js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");'
return self.run_script(js, as_expr=True)
def set_local_storage(self, item: str, value: Union[str, bool]) -> None:
"""设置或删除某项localStorage信息 \n
@ -367,8 +414,8 @@ class ChromePage(BasePage):
:param value: 项的值设置为False时删除该项
:return: None
"""
s = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self.driver.Runtime.evaluate(s)
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self.run_script(js, as_expr=True)
def create_tab(self, url: str = None) -> None:
"""新建并定位到一个标签页,该标签页在最后面 \n
@ -442,7 +489,7 @@ class ChromePage(BasePage):
"""
self.close_tabs(num_or_handles, True)
def clean_cache(self,
def clear_cache(self,
session_storage: bool = True,
local_storage: bool = True,
cache: bool = True,
@ -455,84 +502,14 @@ class ChromePage(BasePage):
:return: None
"""
if session_storage:
self.driver.Runtime.evaluate(expression='sessionStorage.clear();')
self.run_script('sessionStorage.clear();', as_expr=True)
if local_storage:
self.driver.Runtime.evaluate(expression='localStorage.clear();')
self.run_script('localStorage.clear();', as_expr=True)
if cache:
self.driver.Network.clearBrowserCache()
if cookies:
self.driver.Network.clearBrowserCookies()
def check_page(self):
pass
# @property
# def active_ele(self):
# pass
def _d_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
show_errmsg: bool = False,
timeout: float = None) -> Union[bool, None]:
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:return: 是否成功返回None表示不确定
"""
err = None
is_ok = False
timeout = timeout if timeout is not None else self.timeout
for _ in range(times + 1):
try:
result = self.driver.Page.navigate(url=to_url)
end_time = perf_counter() + timeout
while self.ready_state != 'complete' and perf_counter() < end_time:
sleep(.5)
if self.ready_state != 'complete':
raise TimeoutError
if 'errorText' in result:
raise ConnectionError(result['errorText'])
go_ok = True
except Exception as e:
err = e
go_ok = False
is_ok = self.check_page() if go_ok else False
if is_ok is not False:
break
if _ < times:
sleep(interval)
if show_errmsg:
print(f'重试 {to_url}')
if is_ok is False and show_errmsg:
raise err if err is not None else ConnectionError('连接异常。')
return is_ok
def _on_alert_close(self, **kwargs):
self._alert.activated = False
self._alert.text = None
self._alert.type = None
self._alert.defaultPrompt = None
self._alert.response_accept = kwargs.get['result']
self._alert.response_text = kwargs['userInput']
def _on_alert_open(self, **kwargs):
self._alert.activated = True
self._alert.text = kwargs['message']
self._alert.type = kwargs['message']
self._alert.defaultPrompt = kwargs.get('defaultPrompt', None)
self._alert.response_accept = None
self._alert.response_text = None
def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]:
"""处理提示框 \n
:param accept: True表示确认False表示取消其它值不会按按钮但依然返回文本值
@ -554,8 +531,86 @@ class ChromePage(BasePage):
self.driver.Page.handleJavaScriptDialog(accept=accept)
return res_text
def check_page(self) -> Union[bool, None]:
"""检查页面是否符合预期 \n
由子类自行实现各页面的判定规则
"""
return None
def _d_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
show_errmsg: bool = False,
timeout: float = None) -> Union[bool, None]:
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:param timeout: 连接超时时间
:return: 是否成功返回None表示不确定
"""
err = None
is_ok = False
timeout = timeout if timeout is not None else self.timeout
for _ in range(times + 1):
result = self.driver.Page.navigate(url=to_url)
is_timeout = True
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if ((self.page_load_strategy == 'normal' and self.ready_state == 'complete')
or (self.page_load_strategy == 'eager' and self.ready_state in ('interactive', 'complete'))
or (self.page_load_strategy == 'none' and self.ready_state
in ('loading', 'interactive', 'complete'))):
self.stop_loading()
is_timeout = False
break
if is_timeout:
raise TimeoutError('页面连接超时。')
if 'errorText' in result:
raise ConnectionError(result['errorText'])
is_ok = self.check_page()
if is_ok is not False:
break
if _ < times:
sleep(interval)
if show_errmsg:
print(f'重试 {to_url}')
if is_ok is False and show_errmsg:
raise err if err is not None else ConnectionError('连接异常。')
return is_ok
def _on_alert_close(self, **kwargs):
"""alert关闭时触发的方法"""
self._alert.activated = False
self._alert.text = None
self._alert.type = None
self._alert.defaultPrompt = None
self._alert.response_accept = kwargs.get['result']
self._alert.response_text = kwargs['userInput']
def _on_alert_open(self, **kwargs):
"""alert出现时触发的方法"""
self._alert.activated = True
self._alert.text = kwargs['message']
self._alert.type = kwargs['message']
self._alert.defaultPrompt = kwargs.get('defaultPrompt', None)
self._alert.response_accept = None
self._alert.response_text = None
class Alert(object):
"""用于保存alert信息"""
def __init__(self):
self.activated = False
self.text = None
@ -565,6 +620,19 @@ class Alert(object):
self.response_text = None
class Timeout(object):
"""用于保存d模式timeout信息"""
def __init__(self, page: ChromePage):
self.page = page
self.page_load = 30
self.script = 30
@property
def implicit(self):
return self.page.timeout
def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set:
"""返回指定标签页handle组成的set \n
:param handles: handles列表

View File

@ -703,6 +703,8 @@ class DriverOptions(Options):
:param value: 可接收 'normal', 'eager', 'none'
:return: 当前对象
"""
if value not in ('normal', 'eager', 'none'):
raise ValueError("只能选择'normal', 'eager', 'none'")
self.page_load_strategy = value.lower()
return self

View File

@ -512,9 +512,9 @@ class DriverPage(BasePage):
"""
return glob(f'{download_path}{sep}*.crdownload')
def handle_alert(self, accept: bool = True, send: str = None, timeout: float = None) -> Union[str, None]:
def process_alert(self, ok: bool = True, send: str = None, timeout: float = None) -> Union[str, None]:
"""处理提示框 \n
:param accept: True表示确认False表示取消其它值不会按按钮但依然返回文本值
:param ok: True表示确认False表示取消其它值不会按按钮但依然返回文本值
:param send: 处理prompt提示框时可输入文本
:param timeout: 等待提示框出现的超时时间
:return: 提示框内容文本未等到提示框则返回None
@ -540,9 +540,9 @@ class DriverPage(BasePage):
if send is not None:
alert.send_keys(send)
if accept is True:
if ok is True:
alert.accept()
elif accept is False:
elif ok is False:
alert.dismiss()
return res_text

View File

@ -3,9 +3,7 @@ from typing import List, Tuple, Dict
class Keys:
"""
Set of special keys codes.
"""
"""特殊按键"""
NULL = '\ue000'
CANCEL = '\ue001' # ^break
@ -329,14 +327,18 @@ _keyDefinitions = {
'}': {'keyCode': 221, 'key': '}', 'code': 'BracketRight'},
'"': {'keyCode': 222, 'key': '"', 'code': 'Quote'},
}
_modifierBit = {'\ue00a': 1,
'\ue009': 2,
'\ue03d': 4,
'\ue008': 8}
def _keys_to_typing(value) -> Tuple[int, list]:
def _keys_to_typing(value) -> Tuple[int, str]:
typing: List[str] = []
modifier = 0
for val in value:
if val in ('\ue008', '\ue009', '\ue00a', '\ue03d'):
modifier |= _modifierBit(val)
if val in ('\ue009', '\ue008', '\ue00a', '\ue03d'):
modifier |= _modifierBit.get(val, 0)
continue
if isinstance(val, (int, float)):
val = str(val)
@ -345,30 +347,17 @@ def _keys_to_typing(value) -> Tuple[int, list]:
else:
for i in range(len(val)):
typing.append(val[i])
return modifier, typing
def _modifierBit(key: str) -> int:
if key == '\ue00a':
return 1
if key == '\ue009':
return 2
if key == '\ue03d':
return 4
if key == '\ue008':
return 8
return 0
return modifier, ''.join(typing)
def _keyDescriptionForString(_modifiers: int, keyString: str) -> Dict: # noqa: C901
shift = _modifiers & 8
description = {
'key': '',
'keyCode': 0,
'code': '',
'text': '',
'location': 0,
}
description = {'key': '',
'keyCode': 0,
'code': '',
'text': '',
'location': 0}
definition: Dict = _keyDefinitions.get(keyString) # type: ignore
if not definition:

View File

@ -185,6 +185,27 @@ class WebPage(SessionPage, ChromePage, BasePage):
elif self._mode == 'd':
return super(SessionPage, self).eles(loc_or_str, timeout=timeout)
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement, SessionElement] = None) \
-> Union[SessionElement, str, None]:
"""查找第一个符合条件的元素以SessionElement形式返回d模式处理复杂页面时效率很高 \n
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象或属性文本
"""
if self._mode == 's':
return super().s_ele(loc_or_ele)
elif self._mode == 'd':
return super(SessionPage, self).s_ele(loc_or_ele)
def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]:
"""查找所有符合条件的元素以SessionElement形式返回d模式处理复杂页面时效率很高 \n
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象或属性文本组成的列表
"""
if self._mode == 's':
return super().s_eles(loc_or_str)
elif self._mode == 'd':
return super(SessionPage, self).s_eles(loc_or_str)
def change_mode(self, mode: str = None, go: bool = True) -> None:
"""切换模式,接收's''d',除此以外的字符串会切换为 d 模式 \n
切换时会把当前模式的cookies复制到目标模式 \n
@ -379,4 +400,4 @@ class WebPage(SessionPage, ChromePage, BasePage):
self._session_options = Session_or_Options
else:
raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')
raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')