初步完成ChromiumFrame,待完善

This commit is contained in:
g1879 2022-12-24 23:01:23 +08:00
parent 388e78c34c
commit b1d9666fd8
3 changed files with 205 additions and 282 deletions

View File

@ -1099,7 +1099,7 @@ def find_in_chromium_ele(ele, loc, single=True, timeout=None, relative=True):
if isinstance(loc, (str, tuple)):
loc = get_loc(loc)
else:
raise ValueError("定位符必须为str或长度为2的tuple对象。")
raise ValueError(f"定位符必须为str或长度为2的tuple对象。现在是:{loc}")
loc_str = loc[1]
if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'):

View File

@ -8,135 +8,25 @@ from time import sleep
from .chromium_base import ChromiumBase
from .chromium_element import ChromiumElement
from .session_element import make_session_ele
class cc(ChromiumBase):
def __init__(self, address: str, tab_id=None, timeout=None, frame_id=None, page=None, frame_ele=None):
self.frame_id = frame_id
self.frame_ele = frame_ele
self.ppp = page
super().__init__(address, tab_id, timeout)
self.backend_id = frame_ele.backend_id
self.doc_ele = ChromiumElement(self.ppp, backend_id=self.backend_id)
def _get_new_document(self):
"""刷新cdp使用的document数据"""
if not self._is_reading:
self._is_reading = True
if self._debug:
print('---获取document')
while True:
try:
self.doc_ele = ChromiumElement(self.ppp, backend_id=self.backend_id)
break
except Exception:
# raise
pass
if self._debug:
print('---获取document结束')
self._is_loading = False
self._is_reading = False
def _onFrameStartedLoading(self, **kwargs):
"""页面开始加载时触发"""
# pass
if kwargs['frameId'] == self.frame_id:
self._is_loading = True
if self._debug:
print('页面开始加载 FrameStartedLoading')
def _onFrameStoppedLoading(self, **kwargs):
"""页面加载完成后触发"""
# pass
if kwargs['frameId'] == self.frame_id and self._first_run is False and self._is_loading:
if self._debug:
print('页面停止加载 FrameStoppedLoading')
self._get_new_document()
def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None):
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:param timeout: 连接超时时间
:return: 是否成功返回None表示不确定
"""
err = None
timeout = timeout if timeout is not None else self.timeouts.page_load
for t in range(times + 1):
err = None
result = self._driver.Page.navigate(url=to_url, frameId=self.frame_id)
is_timeout = not self._wait_loaded(timeout)
while self.is_loading:
sleep(.1)
if is_timeout:
err = TimeoutError('页面连接超时。')
if 'errorText' in result:
err = ConnectionError(result['errorText'])
if not err:
break
if t < times:
sleep(interval)
while self.ready_state != 'complete':
sleep(.1)
if self._debug:
print('重试')
if show_errmsg:
print(f'重试 {to_url}')
if err:
if show_errmsg:
raise err if err is not None else ConnectionError('连接异常。')
return False
return True
class ChromiumFrame(object):
"""frame元素的类。
frame既是元素也是页面可以获取元素属性和定位周边元素也能跳转到网址
同域和异域的frame处理方式不一样同域的当作元素看待异域的当作页面看待"""
class ChromiumFrame(ChromiumBase):
def __init__(self, page, ele):
"""
:param page: frame所在页面对象
:param ele: frame容器元素对象
"""
self.page = page
self.frame_ele = ele
node = page.run_cdp('DOM.describeNode', nodeId=ele.node_id, not_change=True)['node']
self.frame_id = node.get('frameId', None)
self.frame_id = node['frameId']
self._backend_id = ele.backend_id
self._frame_ele = ele
if self._is_inner_frame():
self._is_diff_domain = False
self.frame_page = None
# self.backend_id = node.get('contentDocument', None).get('backendNodeId', None)
# obj_id = self.page.driver.DOM.resolveNode(backendNodeId=self.backend_id)['object']['objectId']
self.cc = cc(page.address, page.tab_id, page.timeout, self.frame_id, self.page, self.frame_ele)
self.cc._debug = True
# self._doc_ele = ChromiumElement(page, obj_id=obj_id)
self._doc_ele = self.cc.doc_ele
else: # 若frame_id不在frame_tree中为异域frame
super().__init__(page.address, page.tab_id, page.timeout)
self.doc_ele = ChromiumElement(self.page, backend_id=node['contentDocument']['backendNodeId'])
else:
self._is_diff_domain = True
self._doc_ele = None
self.frame_page = ChromiumBase(page.address, self.frame_id)
self.frame_page.set_page_load_strategy(self.page.page_load_strategy)
self.frame_page.timeouts = self.page.timeouts
super().__init__(page.address, self.frame_id, page.timeout)
obj_id = super().run_script('document;', as_expr=True)['objectId']
self.doc_ele = ChromiumElement(self, obj_id=obj_id)
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素 \n
@ -152,74 +42,61 @@ class ChromiumFrame(object):
attrs = [f"{attr}='{attrs[attr]}'" for attr in attrs]
return f'<ChromiumFrame {self.frame_ele.tag} {" ".join(attrs)}>'
def _is_inner_frame(self):
"""返回当前frame是否同域"""
return self.frame_id in str(self.page.run_cdp('Page.getFrameTree', not_change=True)['frameTree'])
def _get_new_document(self):
"""刷新cdp使用的document数据"""
if not self._is_reading:
self._is_reading = True
if self._debug:
print('---获取document')
while True:
try:
if self._is_inner_frame():
node = self.page.run_cdp('DOM.describeNode', backendNodeId=self.backend_id, not_change=True)[
'node']
self.doc_ele = ChromiumElement(self.page, backend_id=node['contentDocument']['backendNodeId'])
else:
b_id = self._tab_obj.DOM.getDocument()['root']['backendNodeId']
self.doc_ele = ChromiumElement(self, backend_id=b_id)
break
except Exception:
raise
pass
if self._debug:
print('---获取document结束')
self._is_loading = False
self._is_reading = False
def _onFrameStartedLoading(self, **kwargs):
"""页面开始加载时触发"""
# print('开始', kwargs['frameId'])
if kwargs['frameId'] == self.frame_id:
self._is_loading = True
if self._debug:
print('页面开始加载 FrameStartedLoading')
def _onFrameStoppedLoading(self, **kwargs):
"""页面加载完成后触发"""
# print('停止', kwargs['frameId'])
if kwargs['frameId'] == self.frame_id and self._first_run is False and self._is_loading:
if self._debug:
print('页面停止加载 FrameStoppedLoading')
self._get_new_document()
@property
def tag(self):
"""返回元素tag"""
return self.frame_ele.tag
def tab_id(self):
"""返回当前标签页id"""
return self.page.tab_id
@property
def url(self):
"""返回frame当前访问的url"""
if self._is_diff_domain:
return self.frame_page.url
else:
r = self.page.run_cdp('DOM.describeNode', nodeId=self.frame_ele.node_id)
return r['node']['contentDocument']['documentURL']
@property
def html(self):
"""返回元素outerHTML文本"""
if self._is_diff_domain:
tag = self.tag
out_html = self.page.run_cdp('DOM.getOuterHTML',
nodeId=self.frame_ele.node_id, not_change=True)['outerHTML']
in_html = self.frame_page.html
sign = search(rf'<{tag}.*?>', out_html).group(0)
return f'{sign}{in_html}</{tag}>'
else:
return self.frame_ele.html
@property
def title(self):
"""返回frame内网页title"""
d = self.frame_page if self._is_diff_domain else self.frame_ele
ele = d.ele('xpath://title')
return ele.text if ele else None
@property
def cookies(self):
"""以dict格式返回cookies"""
return self.frame_page.cookies if self._is_diff_domain else self.page.cookies
@property
def inner_html(self):
"""返回元素innerHTML文本"""
return self.frame_page.html if self._is_diff_domain else self.frame_ele.inner_html
@property
def attrs(self):
"""返回frame元素所有attribute属性"""
return self.frame_ele.attrs
@property
def frame_size(self):
"""返回frame内页面尺寸格式(长, 高)"""
if self._is_diff_domain:
return self.frame_page.size
else:
h = self.frame_ele.run_script('return this.contentDocument.body.scrollHeight;')
w = self.frame_ele.run_script('return this.contentDocument.body.scrollWidth;')
return w, h
@property
def size(self):
"""返回frame元素大小"""
return self.frame_ele.size
def backend_id(self):
"""返回cdp中的node id"""
return self._backend_id
@property
def obj_id(self):
@ -232,9 +109,65 @@ class ChromiumFrame(object):
return self.frame_ele.node_id
@property
def backend_id(self):
"""返回cdp中的node id"""
return self.frame_ele.backend_id
def frame_ele(self):
"""返回总页面上的frame元素"""
return self._frame_ele
@property
def tag(self):
"""返回元素tag"""
return self.frame_ele.tag
@property
def url(self):
"""返回frame当前访问的url"""
return self.doc_ele.run_script('return this.location.href;')
@property
def html(self):
"""返回元素outerHTML文本"""
tag = self.tag
out_html = self.page.run_cdp('DOM.getOuterHTML',
nodeId=self.frame_ele.node_id, not_change=True)['outerHTML']
sign = search(rf'<{tag}.*?>', out_html).group(0)
return f'{sign}{self.inner_html}</{tag}>'
@property
def inner_html(self):
"""返回元素innerHTML文本"""
return self.doc_ele.run_script('return this.documentElement.outerHTML;')
@property
def title(self):
"""返回页面title"""
return self.ele('t:title').text
@property
def cookies(self):
"""以dict格式返回cookies"""
return super().cookies if self._is_diff_domain else self.doc_ele.run_script('return this.cookie;')
@property
def attrs(self):
"""返回frame元素所有attribute属性"""
return self.frame_ele.attrs
@property
def frame_size(self):
"""返回frame内页面尺寸格式(长, 高)"""
w = self.doc_ele.run_script('return this.body.scrollWidth')
h = self.doc_ele.run_script('return this.body.scrollHeight')
return w, h
@property
def size(self):
"""返回frame元素大小"""
return self.frame_ele.size
@property
def active_ele(self):
"""返回当前焦点所在元素"""
return self.doc_ele.run_script('return this.activeElement;')
@property
def location(self):
@ -256,94 +189,9 @@ class ChromiumFrame(object):
"""返回frame的css selector绝对路径"""
return self.frame_ele.css_path
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None):
"""访问目标网页url为同域名时只有url参数生效 \n
:param url: 目标url
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param timeout: 连接超时时间
:return: 目标url是否可用
"""
# todo: 处理同域名跳转到异域,及同域跳转到异域的情况
if self._is_diff_domain:
r = self.frame_page.get(url, show_errmsg, retry, interval, timeout)
else:
r = self.cc.get(url, show_errmsg, retry, interval, timeout)
# self.frame_ele.run_script(f'this.contentWindow.location="{url}";')
return r
def refresh(self):
"""刷新frame页面"""
if self._is_diff_domain:
raise RuntimeError('refresh()仅支持同域frame。')
else:
try:
self.frame_ele.run_script('this.contentWindow.location.reload();')
except RuntimeError:
return RuntimeError('非同源域名无法执行refresh()。')
def forward(self, steps=1):
"""在浏览历史中前进若干步 \n
:param steps: 前进步数
:return: None
"""
if self._is_diff_domain:
raise RuntimeError('forward()仅支持同域frame。')
else:
try:
self.frame_ele.run_script(f'this.contentWindow.history.go({steps});')
except RuntimeError:
return RuntimeError('非同源域名无法执行forward()。')
def back(self, steps=1):
"""在浏览历史中后退若干步 \n
:param steps: 后退步数
:return: None
"""
if self._is_diff_domain:
raise RuntimeError('back()仅支持同域frame。')
else:
try:
self.frame_ele.run_script(f'this.contentWindow.history.go({-steps});')
except RuntimeError:
return RuntimeError('非同源域名无法执行back()。')
def ele(self, loc_or_str, timeout=None):
"""在frame内查找单个元素 \n
:param loc_or_str: 定位符或元素对象
:param timeout: 查找超时时间
:return: ChromiumElement对象
"""
d = self.frame_page if self._is_diff_domain else self.cc.doc_ele
return d.ele(loc_or_str, timeout)
def eles(self, loc_or_str, timeout=None):
"""获取所有符合条件的元素对象 \n
:param loc_or_str: 定位符或元素对象
:param timeout: 查找超时时间
:return: ChromiumElement对象组成的列表
"""
d = self.frame_page if self._is_diff_domain else self.cc.doc_ele
return d.eles(loc_or_str, timeout)
def s_ele(self, loc_or_str=None):
"""查找第一个符合条件的元素以SessionElement形式返回处理复杂页面时效率很高 \n
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象或属性文本
"""
if isinstance(loc_or_str, ChromiumElement):
return make_session_ele(loc_or_str)
else:
return make_session_ele(self, loc_or_str)
def s_eles(self, loc_or_str=None):
"""查找所有符合条件的元素以SessionElement列表形式返回 \n
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象组成的列表
"""
return make_session_ele(self, loc_or_str, single=False)
self.doc_ele.run_script('this.location.reload();')
def attr(self, attr):
"""返回frame元素attribute属性值 \n
@ -368,8 +216,13 @@ class ChromiumFrame(object):
self.frame_ele.remove_attr(attr)
def run_script(self, script, as_expr=False, *args):
# todo:
pass
"""运行javascript代码 \n
:param script: js文本
:param as_expr: 是否作为表达式运行为True时args无效
:param args: 参数按顺序在js文本中对应argument[0]argument[2]...
:return: 运行的结果
"""
return self.doc_ele.run_script(script, as_expr=as_expr, *args)
def parent(self, level_or_loc=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位 \n
@ -437,3 +290,65 @@ class ChromiumFrame(object):
:return: 本元素前面的元素或节点组成的列表
"""
return self.frame_ele.befores(filter_loc, timeout)
def _ele(self, loc_or_ele, timeout=None, single=True, relative=False):
"""在frame内查找单个元素 \n
:param loc_or_ele: 定位符或元素对象
:param timeout: 查找超时时间
:return: ChromiumElement对象
"""
if isinstance(loc_or_ele, ChromiumElement):
return loc_or_ele
while self.is_loading:
sleep(.05)
return self.doc_ele.ele(loc_or_ele, timeout) if single else self.doc_ele.eles(loc_or_ele, timeout)
def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None):
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:param timeout: 连接超时时间
:return: 是否成功返回None表示不确定
"""
err = None
timeout = timeout if timeout is not None else self.timeouts.page_load
for t in range(times + 1):
err = None
result = self._driver.Page.navigate(url=to_url, frameId=self.frame_id)
is_timeout = not self._wait_loaded(timeout)
while self.is_loading:
sleep(.1)
if is_timeout:
err = TimeoutError('页面连接超时。')
if 'errorText' in result:
err = ConnectionError(result['errorText'])
if not err:
break
if t < times:
sleep(interval)
while self.ready_state != 'complete':
sleep(.1)
if self._debug:
print('重试')
if show_errmsg:
print(f'重试 {to_url}')
if err:
if show_errmsg:
raise err if err is not None else ConnectionError('连接异常。')
return False
return True
def _is_inner_frame(self):
"""返回当前frame是否同域"""
return self.frame_id in str(self.page.run_cdp('Page.getFrameTree', not_change=True)['frameTree'])

View File

@ -10,19 +10,21 @@ from .chromium_element import ChromiumElement
from .chromium_base import ChromiumBase
class ChromiumFrame(object):
class ChromiumFrame(ChromiumBase):
"""frame元素的类。
frame既是元素也是页面可以获取元素属性和定位周边元素也能跳转到网址
同域和异域的frame处理方式不一样同域的当作元素看待异域的当作页面看待"""
def __init__(self, page: ChromiumBase, ele: ChromiumElement):
self.frame_ele: ChromiumElement = ...
self.frame_page: ChromiumBase = ...
self.page: ChromiumBase = ...
self._doc_ele: ChromiumElement = ...
self.frame_id: str = ...
self._frame_ele: ChromiumElement = ...
self._backend_id: str = ...
self.frame_page: ChromiumBase = ...
self._doc_ele: ChromiumElement = ...
self._is_diff_domain: bool = ...
self.is_loading: bool = ...
self.doc_ele: ChromiumElement = ...
# self.is_loading: bool = ...
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
@ -59,6 +61,9 @@ class ChromiumFrame(object):
@property
def size(self) -> tuple: ...
@property
def active_ele(self) -> ChromiumElement: ...
@property
def obj_id(self) -> str: ...
@ -68,6 +73,9 @@ class ChromiumFrame(object):
@property
def backend_id(self) -> str: ...
@property
def frame_ele(self) -> ChromiumElement: ...
@property
def location(self) -> dict: ...
@ -94,7 +102,7 @@ class ChromiumFrame(object):
def back(self, steps: int = ...) -> None: ...
def ele(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'],
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'],
timeout: float = ...): ...
def eles(self,