mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
继续开发新版,未完成
This commit is contained in:
parent
fdfa9a778c
commit
48c0428600
@ -5,10 +5,11 @@
|
||||
@File : chrome_element.py
|
||||
"""
|
||||
from pathlib import Path
|
||||
from re import search
|
||||
from typing import Union, Tuple, List, Any
|
||||
from time import perf_counter, sleep
|
||||
|
||||
from .session_element import make_session_ele
|
||||
from .session_element import make_session_ele, SessionElement
|
||||
from .base import DrissionElement
|
||||
from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func
|
||||
|
||||
@ -32,17 +33,26 @@ class ChromeElement(DrissionElement):
|
||||
attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs]
|
||||
return f'<ChromeElement {self.tag} {" ".join(attrs)}>'
|
||||
|
||||
@property
|
||||
def obj_id(self) -> str:
|
||||
return self._obj_id
|
||||
|
||||
@property
|
||||
def node_id(self) -> str:
|
||||
return self._node_id
|
||||
def __call__(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None) -> Union['ChromeElement', str, None]:
|
||||
"""在内部查找元素 \n
|
||||
例:ele2 = ele1('@id=ele_id') \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 超时时间
|
||||
:return: ChromeElement对象或属性、文本
|
||||
"""
|
||||
return self.ele(loc_or_str, timeout)
|
||||
|
||||
@property
|
||||
def html(self) -> str:
|
||||
"""返回元素outerHTML文本"""
|
||||
tag = self.tag
|
||||
if tag in ('iframe', 'frame'):
|
||||
out_html = self.page.driver.DOM.getOuterHTML(nodeId=self._node_id)['outerHTML']
|
||||
in_html = self.inner_html
|
||||
sign = search(rf'<{tag}.*?>', out_html).group(0)
|
||||
return f'{sign}{in_html}</{tag}>'
|
||||
return self.page.driver.DOM.getOuterHTML(nodeId=self._node_id)['outerHTML']
|
||||
|
||||
@property
|
||||
@ -52,7 +62,10 @@ class ChromeElement(DrissionElement):
|
||||
@property
|
||||
def inner_html(self) -> str:
|
||||
"""返回元素innerHTML文本"""
|
||||
return self.run_script('this.innerHTML;')['result']['value']
|
||||
if self.tag in ('iframe', 'frame'):
|
||||
return _run_script(self, 'this.contentDocument.documentElement;').html
|
||||
# return _run_script(self, 'this.contentDocument.body;').html
|
||||
return self.run_script('this.innerHTML;')
|
||||
|
||||
@property
|
||||
def attrs(self) -> dict:
|
||||
@ -60,6 +73,25 @@ class ChromeElement(DrissionElement):
|
||||
attrs_len = len(attrs)
|
||||
return {attrs[i]: attrs[i + 1] for i in range(0, attrs_len, 2)}
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
"""返回元素内所有文本"""
|
||||
return get_ele_txt(make_session_ele(self.html))
|
||||
|
||||
@property
|
||||
def raw_text(self):
|
||||
"""返回未格式化处理的元素内文本"""
|
||||
return self.prop('innerText')
|
||||
|
||||
# -----------------driver独有属性-------------------
|
||||
@property
|
||||
def obj_id(self) -> str:
|
||||
return self._obj_id
|
||||
|
||||
@property
|
||||
def node_id(self) -> str:
|
||||
return self._node_id
|
||||
|
||||
@property
|
||||
def size(self) -> dict:
|
||||
"""返回元素宽和高"""
|
||||
@ -70,7 +102,7 @@ class ChromeElement(DrissionElement):
|
||||
def client_location(self) -> dict:
|
||||
"""返回元素左上角坐标"""
|
||||
js = 'this.getBoundingClientRect().left.toString()+" "+this.getBoundingClientRect().top.toString();'
|
||||
xy = self.run_script(js)['result']['value']
|
||||
xy = self.run_script(js)
|
||||
x, y = xy.split(' ')
|
||||
return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])}
|
||||
|
||||
@ -94,10 +126,33 @@ function getElementPagePosition(element){
|
||||
return actualLeft.toString() +' '+actualTop.toString();
|
||||
}
|
||||
return getElementPagePosition(this);}'''
|
||||
xy = self.run_script(js)['result']['value']
|
||||
xy = self.run_script(js)
|
||||
x, y = xy.split(' ')
|
||||
return {'x': int(x.split('.')[0]), 'y': int(y.split('.')[0])}
|
||||
|
||||
# @property
|
||||
# def shadow_root(self):
|
||||
# """返回当前元素的shadow_root元素对象"""
|
||||
# shadow = self.run_script('return arguments[0].shadowRoot')
|
||||
# if shadow:
|
||||
# from .shadow_root_element import ShadowRootElement
|
||||
# return ShadowRootElement(shadow, self)
|
||||
#
|
||||
# @property
|
||||
# def sr(self):
|
||||
# """返回当前元素的shadow_root元素对象"""
|
||||
# return self.shadow_root
|
||||
|
||||
@property
|
||||
def pseudo_before(self) -> str:
|
||||
"""返回当前元素的::before伪元素内容"""
|
||||
return self.style('content', 'before')
|
||||
|
||||
@property
|
||||
def pseudo_after(self) -> str:
|
||||
"""返回当前元素的::after伪元素内容"""
|
||||
return self.style('content', 'after')
|
||||
|
||||
@property
|
||||
def scroll(self) -> 'ChromeScroll':
|
||||
"""用于滚动滚动条的对象"""
|
||||
@ -105,6 +160,116 @@ function getElementPagePosition(element){
|
||||
self._scroll = ChromeScroll(self)
|
||||
return self._scroll
|
||||
|
||||
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['ChromeElement', None]:
|
||||
"""返回上面某一级父元素,可指定层数或用查询语法定位 \n
|
||||
:param level_or_loc: 第几级父元素,或定位符
|
||||
:return: 上级元素对象
|
||||
"""
|
||||
return super().parent(level_or_loc)
|
||||
|
||||
def prev(self,
|
||||
index: int = 1,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = 0) -> Union['ChromeElement', str, None]:
|
||||
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n
|
||||
:param index: 前面第几个查询结果元素
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 兄弟元素
|
||||
"""
|
||||
return super().prev(index, filter_loc, timeout)
|
||||
|
||||
def next(self,
|
||||
index: int = 1,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = 0) -> Union['ChromeElement', str, None]:
|
||||
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n
|
||||
:param index: 后面第几个查询结果元素
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 兄弟元素
|
||||
"""
|
||||
return super().next(index, filter_loc, timeout)
|
||||
|
||||
def before(self,
|
||||
index: int = 1,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = None) -> Union['ChromeElement', str, None]:
|
||||
"""返回当前元素前面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元,而是整个DOM文档 \n
|
||||
:param index: 前面第几个查询结果元素
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 本元素前面的某个元素或节点
|
||||
"""
|
||||
return super().before(index, filter_loc, timeout)
|
||||
|
||||
def after(self,
|
||||
index: int = 1,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = None) -> Union['ChromeElement', str, None]:
|
||||
"""返回当前元素后面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元,而是整个DOM文档 \n
|
||||
:param index: 后面第几个查询结果元素
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 本元素后面的某个元素或节点
|
||||
"""
|
||||
return super().after(index, filter_loc, timeout)
|
||||
|
||||
def prevs(self,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = 0) -> List[Union['ChromeElement', str]]:
|
||||
"""返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 兄弟元素或节点文本组成的列表
|
||||
"""
|
||||
return super().prevs(filter_loc, timeout)
|
||||
|
||||
def nexts(self,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = 0) -> List[Union['ChromeElement', str]]:
|
||||
"""返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 兄弟元素或节点文本组成的列表
|
||||
"""
|
||||
return super().nexts(filter_loc, timeout)
|
||||
|
||||
def befores(self,
|
||||
filter_loc: Union[tuple, str] = '',
|
||||
timeout: float = None) -> List[Union['ChromeElement', str]]:
|
||||
"""返回当前元素后面符合条件的全部兄弟元素或节点组成的列表,可用查询语法筛选。查找范围不限兄弟元,而是整个DOM文档 \n
|
||||
:param filter_loc: 用于筛选元素的查询语法
|
||||
:param timeout: 查找元素的超时时间
|
||||
:return: 本元素前面的元素或节点组成的列表
|
||||
"""
|
||||
return super().befores(filter_loc, timeout)
|
||||
|
||||
# def wait_ele(self,
|
||||
# loc_or_ele: Union[str, tuple, 'ChromeElement'],
|
||||
# timeout: float = None) -> 'ElementWaiter':
|
||||
# """等待子元素从dom删除、显示、隐藏 \n
|
||||
# :param loc_or_ele: 可以是元素、查询字符串、loc元组
|
||||
# :param timeout: 等待超时时间
|
||||
# :return: 等待是否成功
|
||||
# """
|
||||
# return ElementWaiter(self, loc_or_ele, timeout)
|
||||
|
||||
@property
|
||||
def select(self):
|
||||
"""返回专门处理下拉列表的Select类,非下拉列表元素返回False"""
|
||||
if self._select is None:
|
||||
if self.tag != 'select':
|
||||
self._select = False
|
||||
else:
|
||||
self._select = ChromeSelect(self)
|
||||
|
||||
return self._select
|
||||
|
||||
@property
|
||||
def is_selected(self):
|
||||
return self.run_script('this.selected;')
|
||||
|
||||
@property
|
||||
def is_in_view(self) -> bool:
|
||||
"""返回元素是否出现在视口中,已元素中点为判断"""
|
||||
@ -116,48 +281,7 @@ const vWidth = window.innerWidth || document.documentElement.clientWidth;
|
||||
const vHeight = window.innerHeight || document.documentElement.clientHeight;
|
||||
if (x< 0 || y < 0 || x > vWidth || y > vHeight){return false;}
|
||||
return true;}"""
|
||||
return self.run_script(js)['result']['value']
|
||||
|
||||
def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param script: js文本
|
||||
:param as_expr: 是否作为表达式运行,为True时args无效
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return: 运行的结果
|
||||
"""
|
||||
return run_script(self, script, as_expr, *args)
|
||||
|
||||
def ele(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None) -> Union['ChromeElement', str, None]:
|
||||
"""返回当前元素下级符合条件的第一个元素、属性或节点文本 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致
|
||||
:return: DriverElement对象或属性、文本
|
||||
"""
|
||||
return self._ele(loc_or_str, timeout)
|
||||
|
||||
def eles(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None) -> List[Union['ChromeElement', str]]:
|
||||
"""返回当前元素下级所有符合条件的子元素、属性或节点文本 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致
|
||||
:return: DriverElement对象或属性、文本组成的列表
|
||||
"""
|
||||
return self._ele(loc_or_str, timeout=timeout, single=False)
|
||||
|
||||
def _ele(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None,
|
||||
single: bool = True) -> Union['ChromeElement', str, None, List[Union['ChromeElement', str]]]:
|
||||
"""返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间
|
||||
:param single: True则返回第一个,False则返回全部
|
||||
:return: DriverElement对象
|
||||
"""
|
||||
return make_chrome_ele(self, loc_or_str, single, timeout)
|
||||
return self.run_script(js)
|
||||
|
||||
def attr(self, attr: str) -> Union[str, None]:
|
||||
"""返回attribute属性值 \n
|
||||
@ -168,11 +292,9 @@ return true;}"""
|
||||
attrs = self.attrs
|
||||
if attr == 'href':
|
||||
link = attrs['href']
|
||||
# 若为链接为None、js或邮件,直接返回
|
||||
if not link or link.lower().startswith(('javascript:', 'mailto:')):
|
||||
return link
|
||||
|
||||
else: # 其它情况直接返回绝对url
|
||||
else:
|
||||
return make_absolute_link(link, self.page)
|
||||
|
||||
elif attr == 'src':
|
||||
@ -191,7 +313,66 @@ return true;}"""
|
||||
return self.inner_html
|
||||
|
||||
else:
|
||||
return attrs[attr]
|
||||
return attrs.get(attr, None)
|
||||
|
||||
def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param script: js文本
|
||||
:param as_expr: 是否作为表达式运行,为True时args无效
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return: 运行的结果
|
||||
"""
|
||||
return _run_script(self, script, as_expr, *args)
|
||||
|
||||
def ele(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None) -> Union['ChromeElement', str, None]:
|
||||
"""返回当前元素下级符合条件的第一个元素、属性或节点文本 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致
|
||||
:return: ChromeElement对象或属性、文本
|
||||
"""
|
||||
return self._ele(loc_or_str, timeout)
|
||||
|
||||
def eles(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None) -> List[Union['ChromeElement', str]]:
|
||||
"""返回当前元素下级所有符合条件的子元素、属性或节点文本 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致
|
||||
:return: ChromeElement对象或属性、文本组成的列表
|
||||
"""
|
||||
return self._ele(loc_or_str, timeout=timeout, single=False)
|
||||
|
||||
def s_ele(self, loc_or_str: Union[Tuple[str, str], str] = None) -> Union[SessionElement, str, None]:
|
||||
"""查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:return: SessionElement对象或属性、文本
|
||||
"""
|
||||
if self.tag in ('iframe', 'frame'):
|
||||
return make_session_ele(self.inner_html, loc_or_str)
|
||||
return make_session_ele(self, loc_or_str)
|
||||
|
||||
def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]:
|
||||
"""查找所有符合条件的元素以SessionElement列表形式返回 \n
|
||||
:param loc_or_str: 定位符
|
||||
:return: SessionElement或属性、文本组成的列表
|
||||
"""
|
||||
if self.tag in ('iframe', 'frame'):
|
||||
return make_session_ele(self.inner_html, loc_or_str, single=False)
|
||||
return make_session_ele(self, loc_or_str, single=False)
|
||||
|
||||
def _ele(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None,
|
||||
single: bool = True) -> Union['ChromeElement', str, None, List[Union['ChromeElement', str]]]:
|
||||
"""返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间
|
||||
:param single: True则返回第一个,False则返回全部
|
||||
:return: ChromeElement对象
|
||||
"""
|
||||
return make_chrome_ele(self, loc_or_str, single, timeout)
|
||||
|
||||
def prop(self, prop: str) -> Union[str, int, None]:
|
||||
"""获取property属性值 \n
|
||||
@ -213,9 +394,22 @@ return true;}"""
|
||||
:return: 是否设置成功
|
||||
"""
|
||||
value = value.replace("'", "\\'")
|
||||
r = self.run_script(f'this.{prop}="{value}";')
|
||||
if 'exceptionDetails' in r:
|
||||
raise SyntaxError(r['result']['description'])
|
||||
self.run_script(f'this.{prop}="{value}";')
|
||||
|
||||
def set_attr(self, attr: str, value: str) -> None:
|
||||
"""设置元素attribute属性 \n
|
||||
:param attr: 属性名
|
||||
:param value: 属性值
|
||||
:return: 是否设置成功
|
||||
"""
|
||||
self.run_script(f'this.setAttribute(arguments[0], arguments[1]);', False, attr, str(value))
|
||||
|
||||
def remove_attr(self, attr: str) -> None:
|
||||
"""删除元素attribute属性 \n
|
||||
:param attr: 属性名
|
||||
:return: 是否删除成功
|
||||
"""
|
||||
self.run_script(f'this.removeAttribute("{attr}");')
|
||||
|
||||
def style(self, style: str, pseudo_ele: str = '') -> str:
|
||||
"""返回元素样式属性值,可获取伪元素属性值 \n
|
||||
@ -367,15 +561,6 @@ return true;}"""
|
||||
def is_valid(self):
|
||||
return True
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
"""返回元素内所有文本"""
|
||||
return get_ele_txt(make_session_ele(self.html))
|
||||
|
||||
@property
|
||||
def raw_text(self):
|
||||
return
|
||||
|
||||
def _get_ele_path(self, mode) -> str:
|
||||
"""返获取css路径或xpath路径"""
|
||||
if mode == 'xpath':
|
||||
@ -415,7 +600,7 @@ return true;}"""
|
||||
}
|
||||
return e(this);}
|
||||
'''
|
||||
t = self.run_script(js)['result']['value']
|
||||
t = self.run_script(js)
|
||||
return f':root{t}' if mode == 'css' else t
|
||||
|
||||
|
||||
@ -428,7 +613,7 @@ def make_chrome_ele(ele: ChromeElement,
|
||||
:param loc: 元素定位元组
|
||||
:param single: True则返回第一个,False则返回全部
|
||||
:param timeout: 查找元素超时时间
|
||||
:return: 返回DriverElement元素或它们组成的列表
|
||||
:return: 返回ChromeElement元素或它们组成的列表
|
||||
"""
|
||||
# ---------------处理定位符---------------
|
||||
if isinstance(loc, (str, tuple)):
|
||||
@ -457,14 +642,18 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
|
||||
type_txt = '9' if single else '7'
|
||||
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this'
|
||||
js = _make_js(xpath, type_txt, node_txt)
|
||||
r = ele.run_script(js)
|
||||
r = ele.page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
|
||||
userGesture=True)
|
||||
if r['result']['type'] == 'string':
|
||||
return r['result']['value']
|
||||
|
||||
if 'exceptionDetails' in r:
|
||||
if 'The result is not a node set' in r['result']['description']:
|
||||
js = _make_js(xpath, '1', node_txt)
|
||||
r = ele.run_script(js)
|
||||
r = ele.page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
|
||||
userGesture=True)
|
||||
return r['result']['value']
|
||||
else:
|
||||
raise SyntaxError(f'查询语句错误:\n{r}')
|
||||
@ -472,7 +661,9 @@ def _find_by_xpath(ele: ChromeElement, xpath: str, single: bool, timeout: float)
|
||||
t1 = perf_counter()
|
||||
while (r['result']['subtype'] == 'null'
|
||||
or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout:
|
||||
r = ele.run_script(js)
|
||||
r = ele.page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
|
||||
userGesture=True)
|
||||
|
||||
if single:
|
||||
if r['result']['subtype'] == 'null':
|
||||
@ -494,14 +685,18 @@ def _find_by_css(ele: ChromeElement, selector: str, single: bool, timeout: float
|
||||
selector = selector.replace('"', r'\"')
|
||||
find_all = '' if single else 'All'
|
||||
js = f'this.querySelector{find_all}("{selector}");'
|
||||
r = ele.run_script(js)
|
||||
r = ele.page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
|
||||
userGesture=True)
|
||||
if 'exceptionDetails' in r:
|
||||
raise SyntaxError(f'查询语句错误:\n{r}')
|
||||
|
||||
t1 = perf_counter()
|
||||
while (r['result']['subtype'] == 'null'
|
||||
or r['result']['description'] == 'NodeList(0)') and perf_counter() - t1 < timeout:
|
||||
r = ele.run_script(js)
|
||||
r = ele.page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
|
||||
userGesture=True)
|
||||
|
||||
if single:
|
||||
if r['result']['subtype'] == 'null':
|
||||
@ -552,7 +747,7 @@ else{a.push(e.snapshotItem(i));}}"""
|
||||
return js
|
||||
|
||||
|
||||
def run_script(page_or_ele, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
def _run_script(page_or_ele, script: str, as_expr: bool = False, *args: Any) -> Any:
|
||||
"""运行javascript代码 \n
|
||||
:param page_or_ele: 页面对象或元素对象
|
||||
:param script: js文本
|
||||
@ -570,7 +765,6 @@ def run_script(page_or_ele, script: str, as_expr: bool = False, *args: Any) -> A
|
||||
if as_expr:
|
||||
res = page.run_cdp('Runtime.evaluate',
|
||||
expression=script,
|
||||
# contextId=self._contextId,
|
||||
returnByValue=False,
|
||||
awaitPromise=True,
|
||||
userGesture=True)
|
||||
@ -581,7 +775,6 @@ def run_script(page_or_ele, script: str, as_expr: bool = False, *args: Any) -> A
|
||||
res = page.run_cdp('Runtime.callFunctionOn',
|
||||
functionDeclaration=script,
|
||||
objectId=obj_id,
|
||||
# 'executionContextId': self._contextId,
|
||||
arguments=[_convert_argument(arg) for arg in args],
|
||||
returnByValue=False,
|
||||
awaitPromise=True,
|
||||
@ -734,3 +927,274 @@ class ChromeScroll(object):
|
||||
:return: None
|
||||
"""
|
||||
self._run_script(f'{{}}.scrollBy({pixel},0);')
|
||||
|
||||
|
||||
class ChromeSelect(object):
|
||||
"""ChromeSelect 类专门用于处理 d 模式下 select 标签"""
|
||||
|
||||
def __init__(self, ele: ChromeElement):
|
||||
"""初始化 \n
|
||||
:param ele: select 元素对象
|
||||
"""
|
||||
if ele.tag != 'select':
|
||||
raise TypeError(f"select方法只能在<select>元素使用,现在是:{ele.tag}。")
|
||||
|
||||
self._ele = ele
|
||||
|
||||
def __call__(self, text_or_index: Union[str, int, list, tuple], timeout=None) -> bool:
|
||||
"""选定下拉列表中子元素 \n
|
||||
:param text_or_index: 根据文本、值选或序号择选项,若允许多选,传入list或tuple可多选
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
i = 'index' if isinstance(text_or_index, int) else 'text'
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(text_or_index, timeout=timeout)
|
||||
|
||||
@property
|
||||
def is_multi(self) -> bool:
|
||||
"""返回是否多选表单"""
|
||||
multi = self._ele.attr('multiple')
|
||||
return multi and multi.lower() != "false"
|
||||
|
||||
@property
|
||||
def options(self) -> List[ChromeElement]:
|
||||
"""返回所有选项元素组成的列表"""
|
||||
return self._ele.eles('tag:option')
|
||||
|
||||
@property
|
||||
def selected_option(self) -> Union[ChromeElement, None]:
|
||||
"""返回第一个被选中的option元素 \n
|
||||
:return: ChromeElement对象或None
|
||||
"""
|
||||
ele = self._ele.run_script('this.options[this.selectedIndex];')
|
||||
return ele
|
||||
|
||||
@property
|
||||
def selected_options(self) -> List[ChromeElement]:
|
||||
"""返回所有被选中的option元素列表 \n
|
||||
:return: ChromeElement对象组成的列表
|
||||
"""
|
||||
return [x for x in self.options if x.is_selected]
|
||||
|
||||
def clear(self) -> None:
|
||||
"""清除所有已选项"""
|
||||
self.select_ele.deselect_all()
|
||||
|
||||
def select_by_text(self, text: Union[str, list, tuple], timeout=None) -> bool:
|
||||
"""此方法用于根据text值选择项。当元素是多选列表时,可以接收list或tuple \n
|
||||
:param text: text属性值,传入list或tuple可选择多项
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(text, 'text', False, timeout)
|
||||
|
||||
def select_by_value(self, value: Union[str, list, tuple], timeout=None) -> bool:
|
||||
"""此方法用于根据value值选择项。当元素是多选列表时,可以接收list或tuple \n
|
||||
:param value: value属性值,传入list或tuple可选择多项
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(value, 'value', False, timeout)
|
||||
|
||||
def select_by_index(self, index: Union[int, list, tuple], timeout=None) -> bool:
|
||||
"""此方法用于根据index值选择项。当元素是多选列表时,可以接收list或tuple \n
|
||||
:param index: index属性值,传入list或tuple可选择多项
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(index, 'index', False, timeout)
|
||||
|
||||
def deselect_by_text(self, text: Union[str, list, tuple], timeout=None) -> bool:
|
||||
"""此方法用于根据text值取消选择项。当元素是多选列表时,可以接收list或tuple \n
|
||||
:param text: text属性值,传入list或tuple可取消多项
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(text, 'text', True, timeout)
|
||||
|
||||
def deselect_by_value(self, value: Union[str, list, tuple], timeout=None) -> bool:
|
||||
"""此方法用于根据value值取消选择项。当元素是多选列表时,可以接收list或tuple \n
|
||||
:param value: value属性值,传入list或tuple可取消多项
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(value, 'value', True, timeout)
|
||||
|
||||
def deselect_by_index(self, index: Union[int, list, tuple], timeout=None) -> bool:
|
||||
"""此方法用于根据index值取消选择项。当元素是多选列表时,可以接收list或tuple \n
|
||||
:param index: value属性值,传入list或tuple可取消多项
|
||||
:param timeout: 超时时间,不输入默认实用页面超时时间
|
||||
:return: None
|
||||
"""
|
||||
timeout = timeout if timeout is not None else self._ele.page.timeout
|
||||
return self._select(index, 'index', True, timeout)
|
||||
|
||||
def invert(self) -> None:
|
||||
"""反选"""
|
||||
if not self.is_multi:
|
||||
raise NotImplementedError("只能对多项选框执行反选。")
|
||||
|
||||
for i in self.options:
|
||||
i.click(by_js=True)
|
||||
|
||||
def _select(self,
|
||||
text_value_index: Union[str, int, list, tuple] = None,
|
||||
para_type: str = 'text',
|
||||
deselect: bool = False,
|
||||
timeout=None) -> bool:
|
||||
"""选定或取消选定下拉列表中子元素 \n
|
||||
:param text_value_index: 根据文本、值选或序号择选项,若允许多选,传入list或tuple可多选
|
||||
:param para_type: 参数类型,可选 'text'、'value'、'index'
|
||||
:param deselect: 是否取消选择
|
||||
:return: 是否选择成功
|
||||
"""
|
||||
if not self.is_multi and isinstance(text_value_index, (list, tuple)):
|
||||
raise TypeError('单选下拉列表不能传入list和tuple')
|
||||
|
||||
def do_select():
|
||||
if para_type == 'text':
|
||||
ele = self._ele(f'tx={text_value_index}', timeout=0)
|
||||
elif para_type == 'value':
|
||||
ele = self._ele(f'@value={text_value_index}', timeout=0)
|
||||
elif para_type == 'index':
|
||||
ele = self._ele(f'x:.//option[{int(text_value_index)}]', timeout=0)
|
||||
else:
|
||||
raise ValueError('para_type参数只能传入"text"、"value"或"index"。')
|
||||
|
||||
if not ele:
|
||||
return False
|
||||
|
||||
js = 'false' if deselect else 'true'
|
||||
ele.run_script(f'this.selected={js};')
|
||||
|
||||
return True
|
||||
|
||||
if isinstance(text_value_index, (str, int)):
|
||||
t1 = perf_counter()
|
||||
ok = do_select()
|
||||
while not ok and perf_counter() - t1 < timeout:
|
||||
sleep(.2)
|
||||
ok = do_select()
|
||||
return ok
|
||||
|
||||
elif isinstance(text_value_index, (list, tuple)):
|
||||
return self._select_multi(text_value_index, para_type, deselect)
|
||||
|
||||
else:
|
||||
raise TypeError('只能传入str、int、list和tuple类型。')
|
||||
|
||||
def _select_multi(self,
|
||||
text_value_index: Union[list, tuple] = None,
|
||||
para_type: str = 'text',
|
||||
deselect: bool = False) -> bool:
|
||||
"""选定或取消选定下拉列表中多个子元素 \n
|
||||
:param text_value_index: 根据文本、值选或序号择选多项
|
||||
:param para_type: 参数类型,可选 'text'、'value'、'index'
|
||||
:param deselect: 是否取消选择
|
||||
:return: 是否选择成功
|
||||
"""
|
||||
if para_type not in ('text', 'value', 'index'):
|
||||
raise ValueError('para_type参数只能传入“text”、“value”或“index”')
|
||||
|
||||
if not isinstance(text_value_index, (list, tuple)):
|
||||
raise TypeError('只能传入list或tuple类型。')
|
||||
|
||||
success = True
|
||||
for i in text_value_index:
|
||||
if not isinstance(i, (int, str)):
|
||||
raise TypeError('列表只能由str或int组成')
|
||||
|
||||
p = 'index' if isinstance(i, int) else para_type
|
||||
if not self._select(i, p, deselect):
|
||||
success = False
|
||||
|
||||
return success
|
||||
|
||||
# class ElementWaiter(object):
|
||||
# """等待元素在dom中某种状态,如删除、显示、隐藏"""
|
||||
#
|
||||
# def __init__(self,
|
||||
# page_or_ele,
|
||||
# loc_or_ele: Union[str, tuple, ChromeElement],
|
||||
# timeout: float = None):
|
||||
# """等待元素在dom中某种状态,如删除、显示、隐藏 \n
|
||||
# :param page_or_ele: 页面或父元素
|
||||
# :param loc_or_ele: 要等待的元素,可以是已有元素、定位符
|
||||
# :param timeout: 超时时间,默认读取页面超时时间
|
||||
# """
|
||||
# if isinstance(page_or_ele, ChromeElement):
|
||||
# page = page_or_ele.page
|
||||
# self.driver = page_or_ele.inner_ele
|
||||
# else:
|
||||
# page = page_or_ele
|
||||
# self.driver = page_or_ele.driver
|
||||
#
|
||||
# if isinstance(loc_or_ele, ChromeElement):
|
||||
# self.target = loc_or_ele.inner_ele
|
||||
#
|
||||
# elif isinstance(loc_or_ele, str):
|
||||
# self.target = str_to_loc(loc_or_ele)
|
||||
#
|
||||
# elif isinstance(loc_or_ele, tuple):
|
||||
# self.target = loc_or_ele
|
||||
#
|
||||
# else:
|
||||
# raise TypeError('loc_or_ele参数只能是str、tuple、DriverElement 或 WebElement类型。')
|
||||
#
|
||||
# self.timeout = timeout if timeout is not None else page.timeout
|
||||
#
|
||||
# def delete(self) -> bool:
|
||||
# """等待元素从dom删除"""
|
||||
# return self._wait_ele('del')
|
||||
#
|
||||
# def display(self) -> bool:
|
||||
# """等待元素从dom显示"""
|
||||
# return self._wait_ele('display')
|
||||
#
|
||||
# def hidden(self) -> bool:
|
||||
# """等待元素从dom隐藏"""
|
||||
# return self._wait_ele('hidden')
|
||||
#
|
||||
# def _wait_ele(self, mode: str) -> bool:
|
||||
# """执行等待
|
||||
# :param mode: 等待模式
|
||||
# :return: 是否等待成功
|
||||
# """
|
||||
# if isinstance(self.target, WebElement):
|
||||
# end_time = time() + self.timeout
|
||||
# while time() < end_time:
|
||||
# if mode == 'del':
|
||||
# try:
|
||||
# self.target.is_enabled()
|
||||
# except Exception:
|
||||
# return True
|
||||
#
|
||||
# elif mode == 'display' and self.target.is_displayed():
|
||||
# return True
|
||||
#
|
||||
# elif mode == 'hidden' and not self.target.is_displayed():
|
||||
# return True
|
||||
#
|
||||
# return False
|
||||
#
|
||||
# else:
|
||||
# try:
|
||||
# if mode == 'del':
|
||||
# WebDriverWait(self.driver, self.timeout).until_not(ec.presence_of_element_located(self.target))
|
||||
#
|
||||
# elif mode == 'display':
|
||||
# WebDriverWait(self.driver, self.timeout).until(ec.visibility_of_element_located(self.target))
|
||||
#
|
||||
# elif mode == 'hidden':
|
||||
# WebDriverWait(self.driver, self.timeout).until_not(ec.visibility_of_element_located(self.target))
|
||||
#
|
||||
# return True
|
||||
#
|
||||
# except Exception:
|
||||
# return False
|
||||
|
@ -14,7 +14,7 @@ from .config import DriverOptions, _cookies_to_tuple
|
||||
from .base import BasePage
|
||||
from .common import get_loc
|
||||
from .drission import connect_chrome
|
||||
from .chrome_element import ChromeElement, ChromeScroll, run_script
|
||||
from .chrome_element import ChromeElement, ChromeScroll, _run_script
|
||||
|
||||
|
||||
class ChromePage(BasePage):
|
||||
@ -55,7 +55,7 @@ class ChromePage(BasePage):
|
||||
return self.ele(loc_or_str, timeout)
|
||||
|
||||
@property
|
||||
def driver(self):
|
||||
def driver(self) -> Tab:
|
||||
return self._driver
|
||||
|
||||
@property
|
||||
@ -79,10 +79,7 @@ class ChromePage(BasePage):
|
||||
@property
|
||||
def tabs_count(self) -> int:
|
||||
"""返回标签页数量"""
|
||||
try:
|
||||
return len(self.tab_handles)
|
||||
except Exception:
|
||||
return 0
|
||||
return len(self.tab_handles)
|
||||
|
||||
@property
|
||||
def tab_handles(self) -> list:
|
||||
@ -126,7 +123,7 @@ class ChromePage(BasePage):
|
||||
:param args: 参数,按顺序在js文本中对应argument[0]、argument[2]...
|
||||
:return: 运行的结果
|
||||
"""
|
||||
return run_script(self, script, as_expr, *args)
|
||||
return _run_script(self, script, as_expr, *args)
|
||||
|
||||
def get(self,
|
||||
url: str,
|
||||
|
@ -562,7 +562,7 @@ def connect_chrome(option: DriverOptions) -> tuple:
|
||||
chrome_path = _get_chrome_path(show_msg=False)
|
||||
|
||||
if not chrome_path:
|
||||
raise FileNotFoundError('无法找到chrome.exe路径,请手动配置。')
|
||||
raise FileNotFoundError('无法找到chrome路径,请手动配置。')
|
||||
|
||||
debugger = _run_browser(port, chrome_path, args)
|
||||
|
||||
|
@ -4,7 +4,7 @@ tmp_path =
|
||||
|
||||
[chrome_options]
|
||||
debugger_address = 127.0.0.1:9222
|
||||
binary_location = chrome
|
||||
binary_location = chromium-browser
|
||||
arguments = ['--no-sandbox', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking']
|
||||
extensions = []
|
||||
experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}, 'plugins.plugins_list': [{'enabled': False, 'name': 'Chrome PDF Viewer'}]}, 'useAutomationExtension': False, 'excludeSwitches': ['enable-automation']}
|
||||
|
@ -9,6 +9,7 @@ from typing import Union, List, Tuple
|
||||
|
||||
from lxml.etree import tostring
|
||||
from lxml.html import HtmlElement, fromstring
|
||||
# from lxml.etree import Element as HtmlElement, fromstring
|
||||
|
||||
from .base import DrissionElement, BasePage, BaseElement
|
||||
from .common import get_ele_txt, get_loc, make_absolute_link
|
||||
|
@ -1,6 +1,7 @@
|
||||
# -*- coding:utf-8 -*-
|
||||
from typing import Union, Tuple
|
||||
from typing import Union, Tuple, List
|
||||
|
||||
from DownloadKit import DownloadKit
|
||||
from pychrome import Tab
|
||||
from requests import Session, Response
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
@ -125,6 +126,7 @@ class WebPage(SessionPage, ChromePage, BasePage):
|
||||
"""返回Tab对象,如未初始化则按配置信息创建。 \n
|
||||
如设置了本地调试浏览器,可自动接入或打开浏览器进程。
|
||||
"""
|
||||
self.change_mode('d')
|
||||
if self._driver is None:
|
||||
self._connect_debugger(self._driver_options, self._setting_handle)
|
||||
|
||||
@ -156,6 +158,32 @@ class WebPage(SessionPage, ChromePage, BasePage):
|
||||
elif self._mode == 's':
|
||||
return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)
|
||||
|
||||
def ele(self,
|
||||
loc_or_ele: Union[Tuple[str, str], str, ChromeElement, SessionElement],
|
||||
timeout: float = None) -> Union[ChromeElement, SessionElement, str, None]:
|
||||
"""返回第一个符合条件的元素、属性或节点文本 \n
|
||||
:param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,默认与页面等待时间一致
|
||||
:return: 元素对象或属性、文本节点文本
|
||||
"""
|
||||
if self._mode == 's':
|
||||
return super().ele(loc_or_ele)
|
||||
elif self._mode == 'd':
|
||||
return super(SessionPage, self).ele(loc_or_ele, timeout=timeout)
|
||||
|
||||
def eles(self,
|
||||
loc_or_str: Union[Tuple[str, str], str],
|
||||
timeout: float = None) -> List[Union[ChromeElement, SessionElement, str]]:
|
||||
"""返回页面中所有符合条件的元素、属性或节点文本 \n
|
||||
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,默认与页面等待时间一致
|
||||
:return: 元素对象或属性、文本组成的列表
|
||||
"""
|
||||
if self._mode == 's':
|
||||
return super().eles(loc_or_str)
|
||||
elif self._mode == 'd':
|
||||
return super(SessionPage, self).eles(loc_or_str, timeout=timeout)
|
||||
|
||||
def change_mode(self, mode: str = None, go: bool = True) -> None:
|
||||
"""切换模式,接收's'或'd',除此以外的字符串会切换为 d 模式 \n
|
||||
切换时会把当前模式的cookies复制到目标模式 \n
|
||||
@ -249,6 +277,48 @@ class WebPage(SessionPage, ChromePage, BasePage):
|
||||
|
||||
self.session.cookies.set(cookie['name'], cookie['value'], **kwargs)
|
||||
|
||||
# ----------------重写SessionPage的函数-----------------------
|
||||
def post(self,
|
||||
url: str,
|
||||
data: Union[dict, str] = None,
|
||||
show_errmsg: bool = False,
|
||||
retry: int = None,
|
||||
interval: float = None,
|
||||
**kwargs) -> bool:
|
||||
"""用post方式跳转到url,会切换到s模式 \n
|
||||
:param url: 目标url
|
||||
:param data: post方式时提交的数据
|
||||
:param show_errmsg: 是否显示和抛出异常
|
||||
:param retry: 重试次数
|
||||
:param interval: 重试间隔(秒)
|
||||
:param kwargs: 连接参数
|
||||
:return: url是否可用
|
||||
"""
|
||||
self.change_mode('s', go=False)
|
||||
return super().post(url, data, show_errmsg, retry, interval, **kwargs)
|
||||
|
||||
@property
|
||||
def download(self) -> DownloadKit:
|
||||
if self.mode == 'd':
|
||||
self.cookies_to_session()
|
||||
return super().download
|
||||
|
||||
def _ele(self,
|
||||
loc_or_ele: Union[Tuple[str, str], str, ChromeElement, SessionElement],
|
||||
timeout: float = None, single: bool = True) \
|
||||
-> Union[ChromeElement, SessionElement, str, None, List[Union[SessionElement, str]], List[
|
||||
Union[ChromeElement, str]]]:
|
||||
"""返回页面中符合条件的元素、属性或节点文本,默认返回第一个 \n
|
||||
:param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串
|
||||
:param timeout: 查找元素超时时间,d模式专用
|
||||
:param single: True则返回第一个,False则返回全部
|
||||
:return: 元素对象或属性、文本节点文本
|
||||
"""
|
||||
if self._mode == 's':
|
||||
return super()._ele(loc_or_ele, single=single)
|
||||
elif self._mode == 'd':
|
||||
return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single)
|
||||
|
||||
def _set_session(self, data: dict) -> None:
|
||||
"""根据传入字典对session进行设置 \n
|
||||
:param data: session配置字典
|
||||
@ -306,4 +376,4 @@ class WebPage(SessionPage, ChromePage, BasePage):
|
||||
self._session_options = Session_or_Options
|
||||
|
||||
else:
|
||||
raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')
|
||||
raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')
|
Loading…
x
Reference in New Issue
Block a user