改进ChromiumFrame,未完成

This commit is contained in:
g1879 2022-12-22 16:06:27 +08:00
parent 517da31d30
commit 59f4adb1b2
4 changed files with 96 additions and 36 deletions

View File

@ -263,10 +263,10 @@ class DrissionElement(BaseElement):
class BasePage(BaseParser):
"""页面类的基类"""
def __init__(self, timeout=10):
def __init__(self, timeout=None):
"""初始化函数"""
self._url = None
self.timeout = timeout
self.timeout = timeout if timeout is not None else 10
self.retry_times = 3
self.retry_interval = 2
self._url_available = None

View File

@ -14,6 +14,8 @@ from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js
from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions
from .session_element import make_session_ele
__FRAME_ELEMENT__ = ('iframe', 'frame')
class ChromiumElement(DrissionElement):
"""ChromePage页面对象中的元素对象"""
@ -38,7 +40,8 @@ class ChromiumElement(DrissionElement):
self._node_id = self._get_node_id(obj_id)
self._obj_id = obj_id
self._doc_id = self.run_script('return this.ownerDocument;')['objectId']
doc = self.run_script('return this.ownerDocument;')
self._doc_id = doc['objectId'] if doc else None
def __repr__(self):
attrs = self.attrs
@ -738,7 +741,6 @@ class ChromiumElement(DrissionElement):
"""返获取css路径或xpath路径"""
if mode == 'xpath':
txt1 = 'var tag = el.nodeName.toLowerCase();'
# txt2 = '''return '//' + tag + '[@id="' + el.id + '"]' + path;'''
txt3 = ''' && sib.nodeName.toLowerCase()==tag'''
txt4 = '''
if(nth>1){path = '/' + tag + '[' + nth + ']' + path;}
@ -747,7 +749,6 @@ class ChromiumElement(DrissionElement):
elif mode == 'css':
txt1 = ''
# txt2 = '''return '#' + el.id + path;'''
txt3 = ''
txt4 = '''path = '>' + ":nth-child(" + nth + ")" + path;'''
txt5 = '''return path.substr(1);'''
@ -1116,7 +1117,6 @@ def _find_by_xpath(ele, xpath, single, timeout, relative=True):
if r['result']['subtype'] == 'null':
return None
else:
# return ChromiumElement(ele.page, obj_id=r['result']['objectId'])
return make_chromium_ele(ele.page, obj_id=r['result']['objectId'])
else:
@ -1237,9 +1237,6 @@ def run_script(page_or_ele, script, as_expr=False, timeout=None, args=None, not_
if isinstance(page_or_ele, (ChromiumElement, ChromiumShadowRootElement)):
page = page_or_ele.page
obj_id = page_or_ele.obj_id
# todo:
# elif isinstance(page_or_ele, ChromiumFrame):
# pass
else:
page = page_or_ele
obj_id = page_or_ele._root_id

View File

@ -4,7 +4,6 @@
@Contact : g1879@qq.com
"""
from re import search
from urllib.parse import urlparse
from .chromium_base import ChromiumBase
from .chromium_element import ChromiumElement
@ -23,20 +22,22 @@ class ChromiumFrame(object):
"""
self.page = page
self.frame_ele = ele
self.frame_id = page.run_cdp('DOM.describeNode', nodeId=ele.node_id)['node'].get('frameId', None)
node = page.run_cdp('DOM.describeNode', nodeId=ele.node_id, not_change=True)['node']
self.frame_id = node.get('frameId', None)
# 有src属性且域名和主框架不一样为异域frame
src = ele.attr('src')
if src and urlparse(src).netloc != urlparse(page.url).netloc:
if self.frame_id in str(self.page.run_cdp('Page.getFrameTree', not_change=True)['frameTree']):
self._is_diff_domain = False
self.frame_page = None
backend_id = node.get('contentDocument', None).get('backendNodeId', None)
obj_id = self.page.driver.DOM.resolveNode(backendNodeId=backend_id)['object']['objectId']
self._doc_ele = ChromiumElement(page, obj_id=obj_id)
else: # 若frame_id不在frame_tree中为异域frame
self._is_diff_domain = True
self._doc_ele = None
self.frame_page = ChromiumBase(page.address, self.frame_id)
self.frame_page.set_page_load_strategy(self.page.page_load_strategy)
self.frame_page.timeouts = self.page.timeouts
self.frame_page._debug = True
else:
self.frame_page = None
self._is_diff_domain = False
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素 \n
@ -59,7 +60,7 @@ class ChromiumFrame(object):
@property
def url(self):
""""""
"""返回frame当前访问的url"""
if self._is_diff_domain:
return self.frame_page.url
else:
@ -71,7 +72,8 @@ class ChromiumFrame(object):
"""返回元素outerHTML文本"""
if self._is_diff_domain:
tag = self.tag
out_html = self.page.run_cdp('DOM.getOuterHTML', nodeId=self.frame_ele.node_id)['outerHTML']
out_html = self.page.run_cdp('DOM.getOuterHTML',
nodeId=self.frame_ele.node_id, not_change=True)['outerHTML']
in_html = self.frame_page.html
sign = search(rf'<{tag}.*?>', out_html).group(0)
return f'{sign}{in_html}</{tag}>'
@ -88,6 +90,7 @@ class ChromiumFrame(object):
@property
def cookies(self):
    """Return the cookies as a dict.

    Cross-domain frames read them from their own ``frame_page``; same-domain
    frames read them from the owning ``page``.
    """
    if self._is_diff_domain:
        source = self.frame_page
    else:
        source = self.page
    return source.cookies
@property
@ -135,6 +138,16 @@ class ChromiumFrame(object):
"""返回frame元素是否显示"""
return self.frame_ele.is_displayed
@property
def xpath(self):
    """Return the absolute XPath of the frame, as reported by the frame element."""
    frame_element = self.frame_ele
    return frame_element.xpath
@property
def css_path(self):
    """Return the absolute CSS selector path of the frame, as reported by the frame element."""
    frame_element = self.frame_ele
    return frame_element.css_path
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None):
"""访问目标网页 \n
:param url: 目标url
@ -144,22 +157,55 @@ class ChromiumFrame(object):
:param timeout: 连接超时时间
:return: 目标url是否可用
"""
# todo: 处理跳转到异域的情况
if self._is_diff_domain:
return self.page.get(url, show_errmsg, retry, interval, timeout)
return self.frame_page.get(url, show_errmsg, retry, interval, timeout)
else:
# todo:
pass
self.frame_ele.run_script(f'this.contentWindow.location="{url}";')
def refresh(self):
    """Reload the document currently shown in the frame.

    Only same-domain frames are supported.

    :return: None
    :raises RuntimeError: if the frame is cross-domain, or if the reload script fails
    """
    if self._is_diff_domain:
        raise RuntimeError('refresh()仅支持同域frame。')

    # Page-level equivalent of the script below:
    # document.getElementById('some_frame_id').contentWindow.location.reload();
    try:
        self.frame_ele.run_script('this.contentWindow.location.reload();')
    except RuntimeError as err:
        # Raise (not return) so the caller actually sees the failure.
        raise RuntimeError('非同源域名无法执行refresh()。') from err
def forward(self, steps=1):
    """Move forward in the frame's browsing history.

    Only same-domain frames are supported.

    :param steps: number of history entries to move forward
    :return: None
    :raises RuntimeError: if the frame is cross-domain, or if the history script fails
    """
    if self._is_diff_domain:
        raise RuntimeError('forward()仅支持同域frame。')

    try:
        self.frame_ele.run_script(f'this.contentWindow.history.go({steps});')
    except RuntimeError as err:
        # Raise (not return) so the caller actually sees the failure.
        raise RuntimeError('非同源域名无法执行forward()。') from err
def back(self, steps=1):
    """Move backward in the frame's browsing history.

    Only same-domain frames are supported.

    :param steps: number of history entries to move back
    :return: None
    :raises RuntimeError: if the frame is cross-domain, or if the history script fails
    """
    if self._is_diff_domain:
        raise RuntimeError('back()仅支持同域frame。')

    try:
        # history.go() takes a signed offset, so going back is a negative step count.
        self.frame_ele.run_script(f'this.contentWindow.history.go({-steps});')
    except RuntimeError as err:
        # Raise (not return) so the caller actually sees the failure.
        raise RuntimeError('非同源域名无法执行back()。') from err
def ele(self, loc_or_str, timeout=None):
"""在frame内查找单个元素
"""在frame内查找单个元素 \n
:param loc_or_str: 定位符或元素对象
:param timeout: 查找超时时间
:return: ChromiumElement对象
"""
d = self.frame_page if self._is_diff_domain else self.frame_ele
d = self.frame_page if self._is_diff_domain else self._doc_ele
return d.ele(loc_or_str, timeout)
def eles(self, loc_or_str, timeout=None):
@ -168,7 +214,7 @@ class ChromiumFrame(object):
:param timeout: 查找超时时间
:return: ChromiumElement对象组成的列表
"""
d = self.frame_page if self._is_diff_domain else self.frame_ele
d = self.frame_page if self._is_diff_domain else self._doc_ele
return d.eles(loc_or_str, timeout)
def s_ele(self, loc_or_str=None):
@ -210,6 +256,10 @@ class ChromiumFrame(object):
"""
self.frame_ele.remove_attr(attr)
def run_script(self, script, as_expr=False, *args):
    """Placeholder for running a script through the frame; not implemented yet.

    The body is empty, so calling this currently does nothing and returns None
    regardless of the arguments passed.
    """
    # todo:
    pass
def parent(self, level_or_loc=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位 \n
:param level_or_loc: 第几级父元素或定位符

View File

@ -3,7 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, Tuple, List
from typing import Union, Tuple, List, Any
from session_element import SessionElement
from .chromium_element import ChromiumElement
@ -19,6 +19,7 @@ class ChromiumFrame(object):
self.frame_ele: ChromiumElement = ...
self.frame_page: ChromiumBase = ...
self.page: ChromiumBase = ...
self._doc_ele: ChromiumElement =...
self.frame_id: str = ...
self._is_diff_domain: bool = ...
self.is_loading: bool = ...
@ -68,6 +69,12 @@ class ChromiumFrame(object):
@property
def is_displayed(self) -> bool: ...
@property
def xpath(self) -> str: ...
@property
def css_path(self) -> str: ...
def get(self,
url: str,
show_errmsg: bool = ...,
@ -77,6 +84,10 @@ class ChromiumFrame(object):
def refresh(self) -> None: ...
def forward(self, steps: int = ...) -> None: ...
def back(self, steps: int = ...) -> None: ...
def ele(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'],
timeout: float = ...): ...
@ -96,36 +107,38 @@ class ChromiumFrame(object):
def remove_attr(self, attr: str) -> None: ...
def run_script(self, script: str, as_expr: bool = ..., *args: Any) -> Any: ...
def parent(self, level_or_loc: Union[tuple, str, int] = ...) -> Union[ChromiumElement, None]: ...
def prev(self,
filter_loc: Union[tuple, str] = ...,
index: int = ...,
timeout: float = ...) -> Union[ChromiumElement, str, None]: ...
timeout: float = ...) -> Union[ChromiumElement, ChromiumFrame, str, None]: ...
def next(self,
filter_loc: Union[tuple, str] = ...,
index: int = ...,
timeout: float = ...) -> Union[ChromiumElement, str, None]: ...
timeout: float = ...) -> Union[ChromiumElement, ChromiumFrame, str, None]: ...
def before(self,
filter_loc: Union[tuple, str] = ...,
index: int = ...,
timeout: float = ...) -> Union[ChromiumElement, str, None]: ...
timeout: float = ...) -> Union[ChromiumElement, ChromiumFrame, str, None]: ...
def after(self,
filter_loc: Union[tuple, str] = ...,
index: int = ...,
timeout: float = ...) -> Union[ChromiumElement, str, None]: ...
timeout: float = ...) -> Union[ChromiumElement, ChromiumFrame, str, None]: ...
def prevs(self,
filter_loc: Union[tuple, str] = ...,
timeout: float = ...) -> List[Union[ChromiumElement, str]]: ...
timeout: float = ...) -> List[Union[ChromiumElement, ChromiumFrame, str]]: ...
def nexts(self,
filter_loc: Union[tuple, str] = ...,
timeout: float = ...) -> List[Union[ChromiumElement, str]]: ...
timeout: float = ...) -> List[Union[ChromiumElement, ChromiumFrame, str]]: ...
def befores(self,
filter_loc: Union[tuple, str] = ...,
timeout: float = ...) -> List[Union[ChromiumElement, str]]: ...
timeout: float = ...) -> List[Union[ChromiumElement, ChromiumFrame, str]]: ...