调整格式,无功能修改

This commit is contained in:
g1879 2021-08-11 11:31:12 +08:00
parent 19f8b14500
commit 94111d08a5
7 changed files with 600 additions and 621 deletions

View File

@ -27,14 +27,6 @@ class DrissionElement(object):
def inner_ele(self) -> Union[WebElement, HtmlElement]:
return self._inner_ele
@property
def is_valid(self):
return True
# @property
# def text(self):
# return
@property
def html(self):
return
@ -55,13 +47,9 @@ class DrissionElement(object):
def prev(self):
return
# @property
# def css_path(self):
# return
#
# @property
# def xpath(self):
# return
@property
def is_valid(self):
return True
@abstractmethod
def ele(self, loc: Union[tuple, str], mode: str = None):
@ -71,10 +59,6 @@ class DrissionElement(object):
def eles(self, loc: Union[tuple, str]):
pass
# @abstractmethod
# def attr(self, attr: str):
# pass
def str_to_loc(loc: str) -> tuple:
"""处理元素查找语句 \n

View File

@ -4,7 +4,6 @@
@Contact : g1879@qq.com
@File : driver_element.py
"""
from pathlib import Path
from re import sub
from time import sleep
@ -43,7 +42,12 @@ class DriverElement(DrissionElement):
"""
return self.ele(loc_or_str, mode, timeout)
# -----------------共有属性-------------------
# -----------------共有属性和方法-------------------
@property
def tag(self) -> str:
"""返回元素类型"""
return self._inner_ele.tag_name.lower()
@property
def html(self) -> str:
"""返回元素outerHTML文本"""
@ -55,9 +59,19 @@ class DriverElement(DrissionElement):
return self.attr('innerHTML')
@property
def tag(self) -> str:
"""返回元素类型"""
return self._inner_ele.tag_name.lower()
def parent(self):
"""返回父级元素"""
return self.parents()
@property
def next(self):
"""返回后一个兄弟元素"""
return self.nexts()
@property
def prev(self):
"""返回前一个兄弟元素"""
return self.prevs()
@property
def attrs(self) -> dict:
@ -108,73 +122,11 @@ class DriverElement(DrissionElement):
"""返回xpath路径"""
return self._get_ele_path('xpath')
@property
def parent(self):
"""返回父级元素"""
return self.parents()
@property
def next(self):
"""返回后一个兄弟元素"""
return self.nexts()
@property
def prev(self):
"""返回前一个兄弟元素"""
return self.prevs()
@property
def comments(self) -> list:
"""返回元素注释文本组成的列表"""
return self.eles('xpath:.//comment()')
# -----------------driver独占属性-------------------
@property
def size(self) -> dict:
"""返回元素宽和高"""
return self.inner_ele.size
@property
def location(self) -> dict:
"""返回元素左上角坐标"""
return self.inner_ele.location
@property
def shadow_root(self):
"""返回当前元素的shadow_root元素对象"""
shadow = self.run_script('return arguments[0].shadowRoot')
if shadow:
from .shadow_root_element import ShadowRootElement
return ShadowRootElement(shadow, self)
@property
def sr(self):
"""返回当前元素的shadow_root元素对象"""
return self.shadow_root
@property
def before(self) -> str:
"""返回当前元素的::before伪元素内容"""
return self.get_style_property('content', 'before')
@property
def after(self) -> str:
"""返回当前元素的::after伪元素内容"""
return self.get_style_property('content', 'after')
@property
def select(self):
"""返回专门处理下拉列表的Select类非下拉列表元素返回False"""
if self._select is None:
if self.tag != 'select':
self._select = False
else:
self._select = Select(self)
return self._select
# -----------------共有函数-------------------
def texts(self, text_node_only: bool = False) -> list:
"""返回元素内所有直接子节点的文本,包括元素和文本节点 \n
:param text_node_only: 是否只返回文本节点
@ -326,7 +278,128 @@ class DriverElement(DrissionElement):
"""
return self.ele(loc_or_str, mode='all', timeout=timeout)
# -----------------driver独占函数-------------------
def _get_ele_path(self, mode) -> str:
"""返获取css路径或xpath路径"""
if mode == 'xpath':
txt1 = 'var tag = el.nodeName.toLowerCase();'
# txt2 = '''return '//' + tag + '[@id="' + el.id + '"]' + path;'''
txt3 = ''' && sib.nodeName.toLowerCase()==tag'''
txt4 = '''
if(nth>1){path = '/' + tag + '[' + nth + ']' + path;}
else{path = '/' + tag + path;}'''
txt5 = '''return path;'''
elif mode == 'css':
txt1 = ''
# txt2 = '''return '#' + el.id + path;'''
txt3 = ''
txt4 = '''path = '>' + ":nth-child(" + nth + ")" + path;'''
txt5 = '''return path.substr(1);'''
else:
raise ValueError(f"Argument mode can only be 'xpath' or 'css', not '{mode}'.")
js = '''
function e(el) {
if (!(el instanceof Element)) return;
var path = '';
while (el.nodeType === Node.ELEMENT_NODE) {
''' + txt1 + '''
var sib = el, nth = 0;
while (sib) {
if(sib.nodeType === Node.ELEMENT_NODE''' + txt3 + '''){nth += 1;}
sib = sib.previousSibling;
}
''' + txt4 + '''
el = el.parentNode;
}
''' + txt5 + '''
}
return e(arguments[0]);
'''
return self.run_script(js)
def _get_brother(self, num: int = 1, mode: str = 'ele', direction: str = 'next'):
"""返回前面第num个兄弟节点或元素 \n
:param num: 前面第几个兄弟节点或元素
:param mode: 'ele', 'node' 'text'匹配元素节点或文本节点
:param direction: 'next' 'prev'查找的方向
:return: DriverElement对象或字符串
"""
# 查找节点的类型
if mode == 'ele':
node_txt = '*'
elif mode == 'node':
node_txt = 'node()'
elif mode == 'text':
node_txt = 'text()'
else:
raise ValueError(f"Argument mode can only be 'node' ,'ele' or 'text', not '{mode}'.")
# 查找节点的方向
if direction == 'next':
direction_txt = 'following'
elif direction == 'prev':
direction_txt = 'preceding'
else:
raise ValueError(f"Argument direction can only be 'next' or 'prev', not '{direction}'.")
timeout = 0 if direction == 'prev' else .5
# 获取节点
ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=timeout)
# 跳过元素间的换行符
while isinstance(ele_or_node, str) and ele_or_node.replace('\n', '').replace('\t', '').replace(' ', '') == '':
num += 1
ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=timeout)
return ele_or_node
# -----------------driver独有属性和方法-------------------
@property
def size(self) -> dict:
"""返回元素宽和高"""
return self.inner_ele.size
@property
def location(self) -> dict:
"""返回元素左上角坐标"""
return self.inner_ele.location
@property
def shadow_root(self):
    """Return a ShadowRootElement wrapping this element's shadowRoot, if any.

    :return: a ShadowRootElement, or None when the script reports no shadowRoot
    """
    root = self.run_script('return arguments[0].shadowRoot')
    if not root:
        return None

    # Imported here rather than at module level, as in the original code.
    from .shadow_root_element import ShadowRootElement
    return ShadowRootElement(root, self)
@property
def sr(self):
"""返回当前元素的shadow_root元素对象"""
return self.shadow_root
@property
def before(self) -> str:
"""返回当前元素的::before伪元素内容"""
return self.get_style_property('content', 'before')
@property
def after(self) -> str:
"""返回当前元素的::after伪元素内容"""
return self.get_style_property('content', 'after')
@property
def select(self):
    """Return the cached Select helper for this element.

    The value is computed on first access and stored in ``self._select``:
    a ``Select`` object when the tag is 'select', otherwise ``False``.
    """
    if self._select is None:
        self._select = Select(self) if self.tag == 'select' else False
    return self._select
def get_style_property(self, style: str, pseudo_ele: str = '') -> str:
"""返回元素样式属性值
@ -604,85 +677,6 @@ class DriverElement(DrissionElement):
from selenium.webdriver import ActionChains
ActionChains(self.page.driver).move_to_element(self.inner_ele).perform()
# -----------------私有函数-------------------
def _get_ele_path(self, mode) -> str:
"""返获取css路径或xpath路径"""
if mode == 'xpath':
txt1 = 'var tag = el.nodeName.toLowerCase();'
# txt2 = '''return '//' + tag + '[@id="' + el.id + '"]' + path;'''
txt3 = ''' && sib.nodeName.toLowerCase()==tag'''
txt4 = '''
if(nth>1){path = '/' + tag + '[' + nth + ']' + path;}
else{path = '/' + tag + path;}'''
txt5 = '''return path;'''
elif mode == 'css':
txt1 = ''
# txt2 = '''return '#' + el.id + path;'''
txt3 = ''
txt4 = '''path = '>' + ":nth-child(" + nth + ")" + path;'''
txt5 = '''return path.substr(1);'''
else:
raise ValueError(f"Argument mode can only be 'xpath' or 'css', not '{mode}'.")
js = '''
function e(el) {
if (!(el instanceof Element)) return;
var path = '';
while (el.nodeType === Node.ELEMENT_NODE) {
''' + txt1 + '''
var sib = el, nth = 0;
while (sib) {
if(sib.nodeType === Node.ELEMENT_NODE''' + txt3 + '''){nth += 1;}
sib = sib.previousSibling;
}
''' + txt4 + '''
el = el.parentNode;
}
''' + txt5 + '''
}
return e(arguments[0]);
'''
return self.run_script(js)
def _get_brother(self, num: int = 1, mode: str = 'ele', direction: str = 'next'):
"""返回前面第num个兄弟节点或元素 \n
:param num: 前面第几个兄弟节点或元素
:param mode: 'ele', 'node' 'text'匹配元素节点或文本节点
:param direction: 'next' 'prev'查找的方向
:return: DriverElement对象或字符串
"""
# 查找节点的类型
if mode == 'ele':
node_txt = '*'
elif mode == 'node':
node_txt = 'node()'
elif mode == 'text':
node_txt = 'text()'
else:
raise ValueError(f"Argument mode can only be 'node' ,'ele' or 'text', not '{mode}'.")
# 查找节点的方向
if direction == 'next':
direction_txt = 'following'
elif direction == 'prev':
direction_txt = 'preceding'
else:
raise ValueError(f"Argument direction can only be 'next' or 'prev', not '{direction}'.")
timeout = 0 if direction == 'prev' else .5
# 获取节点
ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=timeout)
# 跳过元素间的换行符
while isinstance(ele_or_node, str) and ele_or_node.replace('\n', '').replace('\t', '').replace(' ', '') == '':
num += 1
ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=timeout)
return ele_or_node
def execute_driver_find(page_or_ele,
loc: Tuple[str, str],

View File

@ -39,10 +39,7 @@ class DriverPage(object):
timeout: float = None):
return self.ele(loc_or_str, mode, timeout)
@property
def driver(self) -> WebDriver:
return self._driver
# -----------------共有属性和方法-------------------
@property
def url(self) -> Union[str, None]:
"""返回当前网页url"""
@ -51,88 +48,25 @@ class DriverPage(object):
else:
return self.driver.current_url
@property
def title(self) -> str:
"""返回网页title"""
return self.driver.title
@property
def html(self) -> str:
"""返回页面html文本"""
return format_html(self.driver.find_element_by_xpath("//*").get_attribute("outerHTML"))
@property
def url_available(self) -> bool:
"""url有效性"""
return self._url_available
@property
def cookies(self) -> list:
"""返回当前网站cookies"""
return self.get_cookies(True)
@property
def title(self) -> str:
"""返回网页title"""
return self.driver.title
@property
def timeout(self) -> float:
"""返回查找元素时等待的秒数"""
return self._timeout
@timeout.setter
def timeout(self, second: float) -> None:
"""设置查找元素时等待的秒数"""
self._timeout = second
self._wait = None
@property
def wait_object(self) -> WebDriverWait:
"""返回WebDriverWait对象重用避免每次新建对象"""
if self._wait is None:
self._wait = WebDriverWait(self.driver, timeout=self.timeout)
return self._wait
def get_cookies(self, as_dict: bool = False) -> Union[list, dict]:
"""返回当前网站cookies"""
if as_dict:
return {cookie['name']: cookie['value'] for cookie in self.driver.get_cookies()}
else:
return self.driver.get_cookies()
def _try_to_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
show_errmsg: bool = False, ):
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:return: 是否成功
"""
err = None
is_ok = False
for _ in range(times + 1):
try:
self.driver.get(to_url)
go_ok = True
except Exception as e:
err = e
go_ok = False
is_ok = self.check_page() if go_ok else False
if is_ok is not False:
break
if _ < times:
sleep(interval)
print(f'重试 {to_url}')
if is_ok is False and show_errmsg:
raise err if err is not None else ConnectionError('Connect error.')
return is_ok
def url_available(self) -> bool:
"""url有效性"""
return self._url_available
def get(self,
url: str,
@ -262,7 +196,97 @@ class DriverPage(object):
return self.ele(loc_or_str, mode='all', timeout=timeout)
# ----------------以下为独有函数-----------------------
def get_cookies(self, as_dict: bool = False) -> Union[list, dict]:
    """Return the cookies reported by the driver.

    :param as_dict: when true, return a ``{name: value}`` mapping instead of
                    the driver's raw cookie list
    :return: the raw list from ``driver.get_cookies()`` or the name/value dict
    """
    raw_cookies = self.driver.get_cookies()
    if not as_dict:
        return raw_cookies
    return {item['name']: item['value'] for item in raw_cookies}
def _try_to_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
show_errmsg: bool = False, ):
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:return: 是否成功
"""
err = None
is_ok = False
for _ in range(times + 1):
try:
self.driver.get(to_url)
go_ok = True
except Exception as e:
err = e
go_ok = False
is_ok = self.check_page() if go_ok else False
if is_ok is not False:
break
if _ < times:
sleep(interval)
print(f'重试 {to_url}')
if is_ok is False and show_errmsg:
raise err if err is not None else ConnectionError('Connect error.')
return is_ok
# ----------------driver独有属性和方法-----------------------
@property
def driver(self) -> WebDriver:
return self._driver
@property
def timeout(self) -> float:
"""返回查找元素时等待的秒数"""
return self._timeout
@timeout.setter
def timeout(self, second: float) -> None:
"""设置查找元素时等待的秒数"""
self._timeout = second
self._wait = None
@property
def wait_object(self) -> WebDriverWait:
"""返回WebDriverWait对象重用避免每次新建对象"""
if self._wait is None:
self._wait = WebDriverWait(self.driver, timeout=self.timeout)
return self._wait
@property
def tabs_count(self) -> int:
    """Return the number of open tabs.

    :return: ``len(driver.window_handles)``, or 0 if reading the handles fails
    """
    try:
        return len(self.driver.window_handles)
    except Exception:
        # Best effort: any driver error counts as "no tabs". Catching
        # Exception rather than everything lets KeyboardInterrupt and
        # SystemExit propagate instead of being turned into 0.
        return 0
@property
def tab_handles(self) -> list:
"""返回所有标签页handle列表"""
return self.driver.window_handles
@property
def current_tab_num(self) -> int:
"""返回当前标签页序号"""
return self.driver.window_handles.index(self.driver.current_window_handle)
@property
def current_tab_handle(self) -> str:
"""返回当前标签页handle"""
return self.driver.current_window_handle
def wait_ele(self,
loc_or_ele: Union[str, tuple, DriverElement, WebElement],
mode: str,
@ -348,29 +372,6 @@ class DriverPage(object):
"""
return self.driver.execute_script(script, *args)
@property
def tabs_count(self) -> int:
"""返回标签页数量"""
try:
return len(self.driver.window_handles)
except:
return 0
@property
def tab_handles(self) -> list:
"""返回所有标签页handle列表"""
return self.driver.window_handles
@property
def current_tab_num(self) -> int:
"""返回当前标签页序号"""
return self.driver.window_handles.index(self.driver.current_window_handle)
@property
def current_tab_handle(self) -> str:
"""返回当前标签页handle"""
return self.driver.current_window_handle
def create_tab(self, url: str = '') -> None:
"""新建并定位到一个标签页,该标签页在最后面 \n
:param url: 新标签页跳转到的网址

View File

@ -76,6 +76,7 @@ class MixPage(Null, SessionPage, DriverPage):
timeout: float = None):
return self.ele(loc_or_str, mode, timeout)
# -----------------共有属性和方法-------------------
@property
def url(self) -> Union[str, None]:
"""返回当前url"""
@ -85,47 +86,12 @@ class MixPage(Null, SessionPage, DriverPage):
return self._session_url
@property
def _session_url(self) -> str:
"""返回session保存的url"""
return self._response.url if self._response else None
@property
def mode(self) -> str:
"""返回当前模式,'s''d' """
return self._mode
@property
def drission(self) -> Drission:
"""返回当前使用的Dirssion对象"""
return self._drission
@property
def driver(self) -> WebDriver:
"""返回driver对象如没有则创建 \n
每次访问时切换到d模式用于独有函数及外部调用
:return: WebDriver对象
"""
self.change_mode('d')
return self._drission.driver
@property
def session(self) -> Session:
"""返回Session对象如没有则创建"""
return self._drission.session
@property
def response(self) -> Response:
"""返回s模式获取到的Response对象切换到s模式"""
self.change_mode('s')
return self._response
@property
def cookies(self) -> Union[dict, list]:
"""返回cookies"""
def title(self) -> str:
"""返回网页title"""
if self._mode == 's':
return super().cookies
return super().title
elif self._mode == 'd':
return super(SessionPage, self).cookies
return super(SessionPage, self).title
@property
def html(self) -> str:
@ -136,193 +102,12 @@ class MixPage(Null, SessionPage, DriverPage):
return super(SessionPage, self).html
@property
def title(self) -> str:
"""返回网页title"""
def cookies(self) -> Union[dict, list]:
"""返回cookies"""
if self._mode == 's':
return super().title
return super().cookies
elif self._mode == 'd':
return super(SessionPage, self).title
def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict], refresh: bool = True) -> None:
"""设置cookies \n
:param cookies: cookies信息可为CookieJar, list, tuple, str, dict
:param refresh: 设置cookies后是否刷新页面
:return: None
"""
if self._mode == 's':
self.drission.set_cookies(cookies, set_session=True)
elif self._mode == 'd':
self.drission.set_cookies(cookies, set_driver=True)
if refresh:
self.refresh()
def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]:
"""返回cookies \n
:param as_dict: 是否以字典方式返回
:param all_domains: 是否返回所有域的cookies
:return: cookies信息
"""
if self._mode == 's':
return super().get_cookies(as_dict, all_domains)
elif self._mode == 'd':
return super(SessionPage, self).get_cookies(as_dict)
def change_mode(self, mode: str = None, go: bool = True) -> None:
"""切换模式,接收's''d'除此以外的字符串会切换为d模式 \n
切换时会把当前模式的cookies复制到目标模式 \n
切换后如果go是True调用相应的get函数使访问的页面同步 \n
:param mode: 模式字符串
:param go: 是否跳转到原模式的url
"""
if mode is not None and mode.lower() == self._mode:
return
self._mode = 's' if self._mode == 'd' else 'd'
# s模式转d模式
if self._mode == 'd':
self._driver = True
self._url = None if not self._driver else self._drission.driver.current_url
if self._session_url:
self.cookies_to_driver(self._session_url)
if go:
self.get(self._session_url)
# d模式转s模式
elif self._mode == 's':
self._session = True
self._url = self._session_url
if self._driver:
self.cookies_to_session()
if go and self._drission.driver.current_url.startswith('http'):
self.get(self._drission.driver.current_url)
def cookies_to_session(self, copy_user_agent: bool = False) -> None:
"""从driver复制cookies到session \n
:param copy_user_agent : 是否复制user agent信息
"""
self._drission.cookies_to_session(copy_user_agent)
def cookies_to_driver(self, url=None) -> None:
"""从session复制cookies到driver \n
chrome需要指定域才能接收cookies \n
:param url: 目标域
:return: None
"""
url = url or self._session_url
self._drission.cookies_to_driver(url)
def check_page(self, by_requests: bool = False) -> Union[bool, None]:
"""d模式时检查网页是否符合预期 \n
默认由response状态检查可重载实现针对性检查 \n
:param by_requests: 是否用内置response检查
:return: bool或NoneNone代表不知道结果
"""
if self._session_url and self._session_url == self.url:
return self._response.ok
# 使用requests访问url并判断可用性
if by_requests:
self.cookies_to_session()
r = self._make_response(self.url, **{'timeout': 3})[0]
return r.ok if r else False
# ----------------重写SessionPage的函数-----------------------
def post(self,
url: str,
data: dict = None,
go_anyway: bool = False,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
**kwargs) -> Union[bool, None]:
"""用post方式跳转到url会切换到s模式 \n
:param url: 目标url
:param data: post方式时提交的数据
:param go_anyway: 若目标url与当前url一致是否强制跳转
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param kwargs: 连接参数
:return: url是否可用
"""
self.change_mode('s', go=False)
return super().post(url, data, go_anyway, show_errmsg, retry, interval, **kwargs)
def download(self,
file_url: str,
goal_path: str = None,
rename: str = None,
file_exists: str = 'rename',
post_data: dict = None,
show_msg: bool = False,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
**kwargs) -> Tuple[bool, str]:
"""下载一个文件 \n
d模式下下载前先同步cookies \n
:param file_url: 文件url
:param goal_path: 存放路径默认为ini文件中指定的临时文件夹
:param rename: 重命名文件可不写扩展名
:param file_exists: 若存在同名文件可选择 'rename', 'overwrite', 'skip' 方式处理
:param post_data: post方式的数据
:param show_msg: 是否显示下载信息
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔时间
:param kwargs: 连接参数
:return: 下载是否成功bool和状态信息成功时信息为文件路径的元组
"""
if self.mode == 'd':
self.cookies_to_session()
return super().download(file_url, goal_path, rename, file_exists, post_data, show_msg, show_errmsg, retry,
interval, **kwargs)
# ----------------重写DriverPage的函数-----------------------
def chrome_downloading(self, download_path: str = None) -> list:
"""返回浏览器下载中的文件列表 \n
:param download_path: 下载文件夹路径默认读取配置信息
:return: 正在下载的文件列表
"""
try:
path = download_path or self._drission.driver_options['experimental_options']['prefs'][
'download.default_directory']
if not path:
raise
except:
raise IOError('Download path not found.')
return super().chrome_downloading(path)
# ----------------以下为共用函数-----------------------
def _try_to_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
mode: str = 'get',
data: dict = None,
show_errmsg: bool = False,
**kwargs):
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:param kwargs: 连接参数
:return: s模式为Response对象d模式为bool
"""
if self._mode == 'd':
return super(SessionPage, self)._try_to_connect(to_url, times, interval, show_errmsg)
elif self._mode == 's':
return super()._try_to_connect(to_url, times, interval, mode, data, show_errmsg, **kwargs)
return super(SessionPage, self).cookies
def get(self,
url: str,
@ -437,6 +222,151 @@ class MixPage(Null, SessionPage, DriverPage):
elif self._mode == 'd':
return super(SessionPage, self).eles(loc_or_str, timeout=timeout)
def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]:
"""返回cookies \n
:param as_dict: 是否以字典方式返回
:param all_domains: 是否返回所有域的cookies
:return: cookies信息
"""
if self._mode == 's':
return super().get_cookies(as_dict, all_domains)
elif self._mode == 'd':
return super(SessionPage, self).get_cookies(as_dict)
def _try_to_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
mode: str = 'get',
data: dict = None,
show_errmsg: bool = False,
**kwargs):
"""尝试连接,重试若干次 \n
:param to_url: 要访问的url
:param times: 重试次数
:param interval: 重试间隔
:param show_errmsg: 是否抛出异常
:param kwargs: 连接参数
:return: s模式为Response对象d模式为bool
"""
if self._mode == 'd':
return super(SessionPage, self)._try_to_connect(to_url, times, interval, show_errmsg)
elif self._mode == 's':
return super()._try_to_connect(to_url, times, interval, mode, data, show_errmsg, **kwargs)
# ----------------MixPage独有属性和方法-----------------------
@property
def drission(self) -> Drission:
"""返回当前使用的Dirssion对象"""
return self._drission
@property
def driver(self) -> WebDriver:
"""返回driver对象如没有则创建 \n
每次访问时切换到d模式用于独有函数及外部调用
:return: WebDriver对象
"""
self.change_mode('d')
return self._drission.driver
@property
def session(self) -> Session:
"""返回Session对象如没有则创建"""
return self._drission.session
@property
def response(self) -> Response:
"""返回s模式获取到的Response对象切换到s模式"""
self.change_mode('s')
return self._response
@property
def mode(self) -> str:
"""返回当前模式,'s''d' """
return self._mode
@property
def _session_url(self) -> str:
"""返回session保存的url"""
return self._response.url if self._response else None
def change_mode(self, mode: str = None, go: bool = True) -> None:
    """Toggle between 's' (session) and 'd' (driver) mode.

    If ``mode`` is given and, lower-cased, equals the current mode, nothing
    happens. In every other case the current mode is flipped to the other
    one, whatever string was passed. Cookies are copied towards the new
    mode, and with ``go`` set the new mode also loads the url of the old one.

    :param mode: requested mode string, or None to simply toggle
    :param go: whether to load the previous mode's url after switching
    """
    # NOTE(review): block nesting was reconstructed from the logic because the
    # source view had lost its indentation - confirm against the real file.
    if mode is not None and mode.lower() == self._mode:
        return

    entering_driver = self._mode != 'd'
    self._mode = 'd' if entering_driver else 's'

    if entering_driver:
        # session -> driver
        self._driver = True
        self._url = self._drission.driver.current_url if self._driver else None

        if not self._session_url:
            return
        self.cookies_to_driver(self._session_url)
        if go:
            self.get(self._session_url)
        return

    # driver -> session
    self._session = True
    self._url = self._session_url

    if not self._driver:
        return
    self.cookies_to_session()
    # Only real web pages are re-requested in session mode.
    if go and self._drission.driver.current_url.startswith('http'):
        self.get(self._drission.driver.current_url)
def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict], refresh: bool = True) -> None:
"""设置cookies \n
:param cookies: cookies信息可为CookieJar, list, tuple, str, dict
:param refresh: 设置cookies后是否刷新页面
:return: None
"""
if self._mode == 's':
self.drission.set_cookies(cookies, set_session=True)
elif self._mode == 'd':
self.drission.set_cookies(cookies, set_driver=True)
if refresh:
self.refresh()
def cookies_to_session(self, copy_user_agent: bool = False) -> None:
"""从driver复制cookies到session \n
:param copy_user_agent : 是否复制user agent信息
"""
self._drission.cookies_to_session(copy_user_agent)
def cookies_to_driver(self, url=None) -> None:
    """Copy cookies from the session to the driver.

    :param url: target url; when falsy, ``self._session_url`` is used instead
    :return: None
    """
    target = url if url else self._session_url
    self._drission.cookies_to_driver(target)
def check_page(self, by_requests: bool = False) -> Union[bool, None]:
    """Report whether the current page looks as expected.

    When the session url is set and equals the current url, the stored
    response's ``ok`` flag is the answer. Otherwise, only if ``by_requests``
    is set, cookies are copied to the session and the url is requested with a
    3 second timeout.

    :param by_requests: whether to fall back to a fresh request
    :return: a bool, or None when no check could be made
    """
    session_url = self._session_url
    if session_url and session_url == self.url:
        return self._response.ok

    if not by_requests:
        return None

    # Request the url directly to judge its availability.
    self.cookies_to_session()
    reply = self._make_response(self.url, timeout=3)[0]
    if not reply:
        return False
    return reply.ok
def close_driver(self) -> None:
"""关闭driver及浏览器"""
self._driver = None
@ -447,3 +377,71 @@ class MixPage(Null, SessionPage, DriverPage):
self._session = None
self._response = None
self.drission.close_session()
# ----------------重写SessionPage的函数-----------------------
def post(self,
url: str,
data: dict = None,
go_anyway: bool = False,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
**kwargs) -> Union[bool, None]:
"""用post方式跳转到url会切换到s模式 \n
:param url: 目标url
:param data: post方式时提交的数据
:param go_anyway: 若目标url与当前url一致是否强制跳转
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param kwargs: 连接参数
:return: url是否可用
"""
self.change_mode('s', go=False)
return super().post(url, data, go_anyway, show_errmsg, retry, interval, **kwargs)
def download(self,
file_url: str,
goal_path: str = None,
rename: str = None,
file_exists: str = 'rename',
post_data: dict = None,
show_msg: bool = False,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
**kwargs) -> Tuple[bool, str]:
"""下载一个文件 \n
d模式下下载前先同步cookies \n
:param file_url: 文件url
:param goal_path: 存放路径默认为ini文件中指定的临时文件夹
:param rename: 重命名文件可不写扩展名
:param file_exists: 若存在同名文件可选择 'rename', 'overwrite', 'skip' 方式处理
:param post_data: post方式的数据
:param show_msg: 是否显示下载信息
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔时间
:param kwargs: 连接参数
:return: 下载是否成功bool和状态信息成功时信息为文件路径的元组
"""
if self.mode == 'd':
self.cookies_to_session()
return super().download(file_url, goal_path, rename, file_exists, post_data, show_msg, show_errmsg, retry,
interval, **kwargs)
# ----------------重写DriverPage的函数-----------------------
def chrome_downloading(self, download_path: str = None) -> list:
    """Return the list of files the browser is currently downloading.

    :param download_path: download folder; when falsy, the folder is read from
                          the ``download.default_directory`` pref in the
                          driver options
    :return: the result of the parent class's ``chrome_downloading`` for the path
    :raises IOError: if no download path is given and none is configured
    """
    path = download_path
    if not path:
        try:
            prefs = self._drission.driver_options['experimental_options']['prefs']
            path = prefs['download.default_directory']
        except Exception as err:
            # Missing or malformed options mean there is no usable path.
            raise IOError('Download path not found.') from err

    # An empty configured value is treated the same as a missing one. The
    # original signalled this with a bare ``raise`` that only worked because
    # a bare ``except`` caught the resulting RuntimeError.
    if not path:
        raise IOError('Download path not found.')

    return super().chrome_downloading(path)

View File

@ -33,6 +33,11 @@ class SessionElement(DrissionElement):
"""
return self.ele(loc_or_str, mode)
@property
def tag(self) -> str:
"""返回元素类型"""
return self._inner_ele.tag
@property
def html(self) -> str:
"""返回元素outerHTML文本"""
@ -45,6 +50,26 @@ class SessionElement(DrissionElement):
r = match(r'<.*?>(.*)</.*?>', self.html, flags=DOTALL)
return '' if not r else r.group(1)
@property
def parent(self):
"""返回父级元素"""
return self.parents()
@property
def next(self):
"""返回后一个兄弟元素"""
return self.nexts()
@property
def prev(self):
"""返回前一个兄弟元素"""
return self.prevs()
@property
def attrs(self) -> dict:
    """Return every attribute of the element, keyed by attribute name.

    Names come from ``inner_ele.items()``; each value is taken from
    ``self.attr(name)`` rather than from the raw item value.
    """
    collected = {}
    for name, _ in self.inner_ele.items():
        collected[name] = self.attr(name)
    return collected
@property
def text(self) -> str:
"""返回元素内所有文本"""
@ -87,16 +112,6 @@ class SessionElement(DrissionElement):
"""返回未格式化处理的元素内文本"""
return str(self._inner_ele.text_content())
@property
def tag(self) -> str:
"""返回元素类型"""
return self._inner_ele.tag
@property
def attrs(self) -> dict:
"""返回元素所有属性及值"""
return {attr: self.attr(attr) for attr, val in self.inner_ele.items()}
@property
def link(self) -> str:
"""返回href或src绝对url"""
@ -112,21 +127,6 @@ class SessionElement(DrissionElement):
"""返回xpath路径"""
return self._get_ele_path('xpath')
@property
def parent(self):
"""返回父级元素"""
return self.parents()
@property
def next(self):
"""返回后一个兄弟元素"""
return self.nexts()
@property
def prev(self):
"""返回前一个兄弟元素"""
return self.prevs()
@property
def comments(self) -> list:
"""返回元素注释文本组成的列表"""
@ -298,30 +298,6 @@ class SessionElement(DrissionElement):
"""
return self.ele(loc_or_str, mode='all')
# -----------------私有函数-------------------
def _make_absolute(self, link) -> str:
"""获取绝对url
:param link: 超链接
:return: 绝对链接
"""
if not link:
return link
parsed = urlparse(link)._asdict()
# 相对路径与页面url拼接并返回
if not parsed['netloc']: # 相对路径,与
return urljoin(self.page.url, link)
# 绝对路径但缺少协议从页面url获取协议并修复
if not parsed['scheme']:
parsed['scheme'] = urlparse(self.page.url).scheme
parsed = tuple(v for v in parsed.values())
return urlunparse(parsed)
# 绝对路径且不缺协议,直接返回
return link
def _get_ele_path(self, mode) -> str:
"""获取css路径或xpath路径
:param mode: 'css' 'xpath'
@ -377,6 +353,30 @@ class SessionElement(DrissionElement):
return ele_or_node
# ----------------session独有方法-----------------------
def _make_absolute(self, link) -> str:
"""获取绝对url
:param link: 超链接
:return: 绝对链接
"""
if not link:
return link
parsed = urlparse(link)._asdict()
# 相对路径与页面url拼接并返回
if not parsed['netloc']: # 相对路径,与
return urljoin(self.page.url, link)
# 绝对路径但缺少协议从页面url获取协议并修复
if not parsed['scheme']:
parsed['scheme'] = urlparse(self.page.url).scheme
parsed = tuple(v for v in parsed.values())
return urlunparse(parsed)
# 绝对路径且不缺协议,直接返回
return link
def execute_session_find(page_or_ele,
loc: Tuple[str, str],

View File

@ -40,31 +40,12 @@ class SessionPage(object):
timeout: float = None):
return self.ele(loc_or_str, mode)
@property
def session(self) -> Session:
"""返回session对象"""
return self._session
@property
def response(self) -> Response:
"""返回访问url得到的response对象"""
return self._response
# -----------------共有属性和方法-------------------
@property
def url(self) -> str:
"""返回当前访问url"""
return self._url
@property
def url_available(self) -> bool:
"""返回当前访问的url有效性"""
return self._url_available
@property
def cookies(self) -> dict:
"""返回session的cookies"""
return self.get_cookies(True)
@property
def title(self) -> str:
"""返回网页title"""
@ -75,26 +56,56 @@ class SessionPage(object):
"""返回页面html文本"""
return format_html(self.response.text) if self.response else ''
def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]:
"""返回cookies \n
:param as_dict: 是否以字典方式返回
:param all_domains: 是否返回所有域的cookies
:return: cookies信息
"""
if all_domains:
cookies = self.session.cookies
else:
if self.url:
url = extract(self.url)
domain = f'{url.domain}.{url.suffix}'
cookies = tuple(x for x in self.session.cookies if domain in x.domain or x.domain == '')
else:
cookies = tuple(x for x in self.session.cookies)
@property
def cookies(self) -> dict:
"""返回session的cookies"""
return self.get_cookies(True)
@property
def url_available(self) -> bool:
"""返回当前访问的url有效性"""
return self._url_available
def get(self,
        url: str,
        go_anyway: bool = False,
        show_errmsg: bool = False,
        retry: int = None,
        interval: float = None,
        **kwargs) -> Union[bool, None]:
    """Go to a url with a GET request.

    :param url: target url
    :param go_anyway: request even if the target equals the current url
    :param show_errmsg: raise on failure instead of only recording it
    :param retry: number of retries; defaults to ``self.retry_times``
    :param interval: seconds between retries; defaults to ``self.retry_interval``
    :param kwargs: connection arguments passed on to ``_try_to_connect``
    :return: whether the url is available, or None if no request was made
    :raises ConnectionError: if show_errmsg is set and the response is not ok
    """
    # Check for an empty url before quoting: quote() raises on None.
    if not url:
        return None

    to_url = quote(url, safe='/:&?=%;#@+!')
    if not go_anyway and self.url == to_url:
        return None

    retry = int(retry) if retry is not None else int(self.retry_times)
    # The interval is a float number of seconds; converting with int() would
    # silently turn e.g. 0.5 into 0 and remove the pause between retries.
    interval = float(interval) if interval is not None else float(self.retry_interval)

    self._url = to_url
    self._response = self._try_to_connect(to_url, times=retry, interval=interval, show_errmsg=show_errmsg, **kwargs)

    if self._response is None:
        self._url_available = False
    elif self._response.ok:
        self._url_available = True
    else:
        if show_errmsg:
            raise ConnectionError(f'{to_url}\nStatus code: {self._response.status_code}.')
        self._url_available = False

    return self._url_available
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, SessionElement],
@ -183,6 +194,27 @@ class SessionPage(object):
return self.ele(loc_or_str, mode='all')
def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]:
    """Return the session's cookies.

    :param as_dict: return a ``{name: value}`` dict instead of a list built
                    with ``_cookie_to_dict``
    :param all_domains: return cookies of every domain; otherwise only those
                        whose domain contains the current url's
                        ``domain.suffix`` or is empty
    :return: the selected cookies in the requested form
    """
    if all_domains:
        selected = self.session.cookies
    elif self.url:
        parts = extract(self.url)
        wanted = f'{parts.domain}.{parts.suffix}'
        selected = tuple(
            item for item in self.session.cookies
            if wanted in item.domain or item.domain == ''
        )
    else:
        # No current url to filter by: take everything.
        selected = tuple(self.session.cookies)

    if as_dict:
        return {item.name: item.value for item in selected}
    return [_cookie_to_dict(item) for item in selected]
def _try_to_connect(self,
to_url: str,
times: int = 0,
@ -223,46 +255,16 @@ class SessionPage(object):
return r
def get(self,
url: str,
go_anyway: bool = False,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
**kwargs) -> Union[bool, None]:
"""用get方式跳转到url \n
:param url: 目标url
:param go_anyway: 若目标url与当前url一致是否强制跳转
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param kwargs: 连接参数
:return: url是否可用
"""
to_url = quote(url, safe='/:&?=%;#@+!')
retry = int(retry) if retry is not None else int(self.retry_times)
interval = int(interval) if interval is not None else int(self.retry_interval)
# ----------------session独有属性和方法-----------------------
@property
def session(self) -> Session:
"""返回session对象"""
return self._session
if not url or (not go_anyway and self.url == to_url):
return
self._url = to_url
self._response = self._try_to_connect(to_url, times=retry, interval=interval, show_errmsg=show_errmsg, **kwargs)
if self._response is None:
self._url_available = False
else:
if self._response.ok:
self._url_available = True
else:
if show_errmsg:
raise ConnectionError(f'{to_url}\nStatus code: {self._response.status_code}.')
self._url_available = False
return self._url_available
@property
def response(self) -> Response:
"""返回访问url得到的response对象"""
return self._response
def post(self,
url: str,

View File

@ -1,4 +1,3 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from re import split as re_SPLIT
from typing import Union, Any, Tuple
@ -45,6 +44,11 @@ class ShadowRootElement(DrissionElement):
"""shadow-root所依赖的父元素"""
return self.parent_ele
@property
def next(self):
"""返回后一个兄弟元素"""
return self.nexts()
def parents(self, num: int = 1):
"""返回上面第num级父元素 \n
:param num: 第几级父元素
@ -53,11 +57,6 @@ class ShadowRootElement(DrissionElement):
loc = 'xpath', f'.{"/.." * (num - 1)}'
return self.parent_ele.ele(loc, timeout=0.1)
@property
def next(self):
"""返回后一个兄弟元素"""
return self.nexts()
def nexts(self, num: int = 1):
"""返回后面第num个兄弟元素 \n
:param num: 后面第几个兄弟元素
@ -175,6 +174,7 @@ class ShadowRootElement(DrissionElement):
except:
return False
# ----------------ShadowRootElement独有方法-----------------------
def _find_eles_by_text(self, text: str, tag: str = '', match: str = 'exact', mode: str = 'single'):
"""根据文本获取页面元素 \n
:param text: 文本字符串