Merge pull request !3 from g1879/dev
This commit is contained in:
g1879 2020-09-07 10:01:43 +08:00 committed by Gitee
commit 224c4642a6
6 changed files with 402 additions and 56 deletions

View File

@ -28,9 +28,9 @@ class DrissionElement(object):
def is_valid(self):
return True
@property
def text(self):
return
# @property
# def text(self):
# return
@property
def html(self):
@ -52,13 +52,13 @@ class DrissionElement(object):
def prev(self):
return
@property
def css_path(self):
return
@property
def xpath(self):
return
# @property
# def css_path(self):
# return
#
# @property
# def xpath(self):
# return
@abstractmethod
def ele(self, loc: Union[tuple, str], mode: str = None, show_errmsg: bool = True):
@ -68,9 +68,9 @@ class DrissionElement(object):
def eles(self, loc: Union[tuple, str], show_errmsg: bool = True):
pass
@abstractmethod
def attr(self, attr: str):
pass
# @abstractmethod
# def attr(self, attr: str):
# pass
def get_loc_from_str(loc: str) -> tuple:

View File

@ -123,6 +123,15 @@ class DriverElement(DrissionElement):
'''
return self.run_script(js)
@property
def shadow_root(self):
    """Return the shadow root attached to this element.

    :return: a ShadowRootElement wrapping the script result, or None when the script returns a falsy value
    """
    root = self.run_script('return arguments[0].shadowRoot')
    if not root:
        return None
    # Imported at call time; presumably this avoids a circular import with
    # shadow_root_element - TODO confirm.
    from .shadow_root_element import ShadowRootElement
    return ShadowRootElement(root, self)
@property
def parent(self):
"""返回父级元素"""
@ -199,7 +208,7 @@ class DriverElement(DrissionElement):
ele.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的子元素 \n
ele.ele('css:div.ele_class') - 返回第一个符合css selector的子元素 \n
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param mode: 'single' 'all对应查找一个或全部
:param mode: 'single' 'all'对应查找一个或全部
:param timeout: 查找元素超时时间
:param show_errmsg: 出现异常时是否打印信息
:return: DriverElement对象

View File

@ -6,7 +6,7 @@
"""
from glob import glob
from pathlib import Path
from time import time
from time import time, sleep
from typing import Union, List, Any
from urllib.parse import quote
@ -60,21 +60,49 @@ class DriverPage(object):
"""返回网页title"""
return self.driver.title
def get(self, url: str, go_anyway: bool = False, show_errmsg: bool = False) -> Union[None, bool]:
def _try_to_get(self,
                to_url: str,
                times: int = 0,
                interval: float = 1,
                show_errmsg: bool = False, ):
    """Load a url in the browser, retrying while the page check fails.

    :param to_url: url to visit
    :param times: maximum number of retries after the first attempt; values <= 0 mean no retry
    :param interval: seconds to sleep before each retry
    :param show_errmsg: whether to raise when the page is still unavailable after the last attempt
    :return: result of the last check_page() call
    :raises ConnectionError: if the last check returned False and show_errmsg is true
    """
    self.driver.get(to_url)
    is_ok = self.check_page()

    # Bound the loop with "> 0": a plain truthiness test never terminates for a
    # negative (or fractional) retry count because the countdown skips past 0.
    remaining = times or 0
    while remaining > 0 and is_ok is False:
        sleep(interval)
        self.driver.get(to_url)
        is_ok = self.check_page()
        remaining -= 1

    if is_ok is False and show_errmsg:
        raise ConnectionError('Connect error.')
    return is_ok
def get(self,
        url: str,
        go_anyway: bool = False,
        show_errmsg: bool = False,
        retry: int = 0,
        interval: float = 1,
        ) -> Union[None, bool]:
    """Visit a url.

    :param url: target url
    :param go_anyway: whether to navigate even when the target url equals the current url
    :param show_errmsg: whether to show and raise exceptions
    :param retry: number of retries
    :param interval: seconds between retries
    :return: whether the target url is available; None when no navigation is attempted
    """
    # Reject an empty url before quoting: quote() raises TypeError on None.
    if not url:
        return
    to_url = quote(url, safe='/:&?=%;#@')
    if not go_anyway and self.url == to_url:
        return

    self._url = to_url
    # _try_to_get performs the navigation, the page check and the optional raise,
    # so the page must not be loaded a second time here.
    self._url_available = self._try_to_get(to_url, times=retry, interval=interval, show_errmsg=show_errmsg)
    return self._url_available
def ele(self,

View File

@ -11,6 +11,7 @@ from requests_html import HTMLSession, Element
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from .config import DriverOptions
from .drission import Drission
from .driver_element import DriverElement
from .driver_page import DriverPage
@ -32,16 +33,23 @@ class MixPage(Null, SessionPage, DriverPage):
这些功能由DriverPage和SessionPage类实现
"""
def __init__(self, drission: Union[Drission, str] = None, mode: str = 'd', timeout: float = 10):
"""初始化函数 \n
def __init__(self,
drission: Union[Drission, str] = None,
mode: str = 'd',
timeout: float = 10,
driver_options: Union[dict, DriverOptions] = None,
session_options: dict = None):
"""初始化函数 \n
:param drission: 整合了driver和session的类传入's''d'时快速配置相应模式
:param mode: 默认使用selenium的d模式
:param driver_options: 浏览器设置没有传入drission参数时会用这个设置新建Drission对象
:param session_options: requests设置没有传入drission参数时会用这个设置新建Drission对象
"""
super().__init__()
if drission in ['s', 'd', 'S', 'D']:
mode = drission.lower()
drission = None
self._drission = drission or Drission()
self._drission = drission or Drission(driver_options, session_options)
self._session = None
self._driver = None
self._url = None
@ -227,20 +235,46 @@ class MixPage(Null, SessionPage, DriverPage):
return super().chrome_downloading(path)
# ----------------以下为共用函数-----------------------
def _try_to_get(self,
                to_url: str,
                times: int = 0,
                interval: float = 1,
                show_errmsg: bool = False,
                **kwargs):
    """Dispatch the retrying connection attempt to the implementation for the current mode.

    :param to_url: url to visit
    :param times: number of retries
    :param interval: seconds between retries
    :param show_errmsg: whether to raise exceptions
    :param kwargs: connection arguments, forwarded in s mode only
    :return: HTMLResponse object in s mode, bool in d mode; None for any other mode
    """
    current_mode = self._mode
    if current_mode == 's':
        return super()._try_to_get(to_url, times, interval, show_errmsg, **kwargs)
    if current_mode == 'd':
        # Start the lookup after SessionPage in the MRO.
        return super(SessionPage, self)._try_to_get(to_url, times, interval, show_errmsg)
    return None
def get(self,
        url: str,
        go_anyway=False,
        show_errmsg: bool = False,
        retry: int = 2,
        interval: float = 1,
        **kwargs) -> Union[bool, None]:
    """Navigate to a url using the implementation for the current mode.

    :param url: target url
    :param go_anyway: whether to navigate even when the target url equals the current url
    :param show_errmsg: whether to show and raise exceptions
    :param retry: number of retries
    :param interval: seconds between retries
    :param kwargs: connection arguments, used in s mode only
    :return: whether the url is available, as reported by the mode-specific get(); None for any other mode
    """
    # retry and interval must reach the mode-specific get(); returning the older
    # three-argument call first would silently drop them.
    if self._mode == 'd':
        # Start the lookup after SessionPage in the MRO.
        return super(SessionPage, self).get(url, go_anyway, show_errmsg, retry, interval)
    elif self._mode == 's':
        return super().get(url, go_anyway, show_errmsg, retry, interval, **kwargs)
def ele(self,
loc_or_ele: Union[tuple, str, DriverElement, SessionElement, Element, WebElement],

View File

@ -9,7 +9,7 @@ from pathlib import Path
from random import randint
from re import search as re_SEARCH
from re import sub as re_SUB
from time import time
from time import time, sleep
from typing import Union, List
from urllib.parse import urlparse, quote
@ -142,30 +142,60 @@ class SessionPage(object):
raise TypeError('Type of loc_or_str can only be tuple or str.')
return self.ele(loc_or_str, mode='all', show_errmsg=True)
def _try_to_get(self,
                to_url: str,
                times: int = 0,
                interval: float = 1,
                show_errmsg: bool = False,
                **kwargs) -> HTMLResponse:
    """Request a url, retrying while no usable response comes back.

    :param to_url: url to visit
    :param times: maximum number of retries after the first attempt; values <= 0 mean no retry
    :param interval: seconds to sleep before each retry
    :param show_errmsg: whether to raise exceptions (forwarded to _make_response)
    :param kwargs: connection arguments (forwarded to _make_response)
    :return: first item of the last _make_response() result
    """
    r = self._make_response(to_url, show_errmsg=show_errmsg, **kwargs)[0]

    # Bound the loop with "> 0": a plain truthiness test never terminates for a
    # negative (or fractional) retry count because the countdown skips past 0.
    remaining = times or 0
    # Retry while the response is missing/falsy or its body is empty.
    while remaining > 0 and (not r or r.content == b''):
        # A 403 or 404 answer is not retried.
        if r is not None and r.status_code in (403, 404):
            break
        print('重试', to_url)
        sleep(interval)
        r = self._make_response(to_url, show_errmsg=show_errmsg, **kwargs)[0]
        remaining -= 1
    return r
def get(self,
url: str,
go_anyway: bool = False,
show_errmsg: bool = False,
retry: int = 0,
interval: float = 1,
**kwargs) -> Union[bool, None]:
"""用get方式跳转到url \n
:param url: 目标url
:param go_anyway: 若目标url与当前url一致是否强制跳转
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param kwargs: 连接参数
:return: url是否可用
"""
to_url = quote(url, safe='/:&?=%;#@')
to_url = quote(url, safe='/:&?=%;#@+')
if not url or (not go_anyway and self.url == to_url):
return
self._url = to_url
self._response = self._make_response(to_url, show_errmsg=show_errmsg, **kwargs)[0]
self._response = self._try_to_get(to_url, times=retry, interval=interval, show_errmsg=show_errmsg, **kwargs)
if self._response is None:
self._url_available = False
else:
try:
self._response.html.encoding = self._response.encoding # 修复requests_html丢失编码方式的bug
except:
pass
stream = tuple(x for x in kwargs if x.lower() == 'stream')
if (not stream or not kwargs[stream[0]]) and not self.session.stream:
try:
self._response.html.encoding = self._response.encoding # 修复requests_html丢失编码方式的bug
except:
pass
if self._response.ok:
self._url_available = True
@ -247,31 +277,29 @@ class SessionPage(object):
raise ConnectionError(f'Status code: {r.status_code}.')
return False, f'Status code: {r.status_code}.'
# -------------------获取文件名-------------------
# header里有文件名则使用它否则在url里截取但不能保证url包含文件名
if 'Content-disposition' in r.headers:
if 'Content-disposition' in r.headers: # header里有文件名则使用它
file_name = r.headers['Content-disposition'].split('"')[1].encode('ISO-8859-1').decode('utf-8')
elif os_PATH.basename(file_url):
elif os_PATH.basename(file_url): # 在url里获取文件名
file_name = os_PATH.basename(file_url).split("?")[0]
else:
else: # 找不到则用时间和随机数生成文件名
file_name = f'untitled_{time()}_{randint(0, 100)}'
file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip()
file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip() # 去除非法字符
# -------------------重命名文件名-------------------
if rename: # 重命名文件,不改变扩展名
rename = re_SUB(r'[\\/*:|<>?"]', '', rename).strip()
ext_name = file_name.split('.')[-1]
if rename.lower().endswith(f'.{ext_name}'.lower()) or ext_name == file_name:
if '.' in rename or ext_name == file_name:
full_name = rename
else:
full_name = f'{rename}.{ext_name}'
else:
full_name = file_name
# -------------------生成路径-------------------
goal_Path = Path(goal_path)
goal_path = ''
for key, i in enumerate(goal_Path.parts): # 去除路径中的非法字符
goal_path += goal_Path.drive if key == 0 and goal_Path.drive else re_SUB(r'[*:|<>?"]', '', i).strip()
goal_path += '\\' if i != '\\' and key < len(goal_Path.parts) - 1 else ''
goal_Path = Path(goal_path)
goal_Path.mkdir(parents=True, exist_ok=True)
goal_path = goal_Path.absolute()
@ -287,8 +315,8 @@ class SessionPage(object):
full_path = Path(f'{goal_path}\\{full_name}')
else:
raise ValueError("Argument file_exists can only be 'skip', 'overwrite', 'rename'.")
if show_msg: # 打印要下载的文件
# -------------------打印要下载的文件-------------------
if show_msg:
print(full_name if file_name == full_name else f'{file_name} -> {full_name}')
print(f'Downloading to: {goal_path}')
@ -317,9 +345,8 @@ class SessionPage(object):
else:
download_status, info = True, 'Success.'
finally:
# 删除下载出错文件
if not download_status and full_path.exists():
full_path.unlink()
full_path.unlink() # 删除下载出错文件
r.close()
# -------------------显示并返回值-------------------
if show_msg:
@ -343,7 +370,7 @@ class SessionPage(object):
"""
if mode not in ['get', 'post']:
raise ValueError("Argument mode can only be 'get' or 'post'.")
url = quote(url, safe='/:&?=%;#@')
url = quote(url, safe='/:&?=%;#@+')
# 设置referer和host值
kwargs_set = set(x.lower() for x in kwargs)
@ -374,15 +401,27 @@ class SessionPage(object):
return None, e
else:
headers = dict(r.headers)
if 'Content-Type' not in headers or 'charset' not in headers['Content-Type']:
re_result = re_SEARCH(r'<meta.*?charset=[ \'"]*([^"\' />]+).*?>', r.text)
try:
charset = re_result.group(1)
except:
charset = r.apparent_encoding
content_type = tuple(x for x in headers if x.lower() == 'content-type')
stream = tuple(x for x in kwargs if x.lower() == 'stream')
charset = None
if not content_type or 'charset' not in headers[content_type[0]].lower():
if (not stream or not kwargs[stream[0]]) and not self.session.stream:
# ========================
re_result = None
for chunk in r.iter_content(chunk_size=512):
re_result = re_SEARCH(r'<meta.*?charset=[ \'"]*([^"\' />]+).*?>', chunk.decode())
break
# ========================
# re_result = re_SEARCH(r'<meta.*?charset=[ \'"]*([^"\' />]+).*?>', r.text)
try:
charset = re_result.group(1)
except:
charset = r.apparent_encoding
else:
charset = headers['Content-Type'].split('=')[1]
charset = headers[content_type[0]].split('=')[1]
# 避免存在退格符导致乱码或解析出错
r._content = r.content if 'stream' in kwargs and kwargs['stream'] else r.content.replace(b'\x08', b'\\b')
r.encoding = charset
if (not stream or not kwargs[stream[0]]) and not self.session.stream:
r._content = r.content.replace(b'\x08', b'\\b')
if charset:
r.encoding = charset
return r, 'Success'

View File

@ -0,0 +1,236 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from html import unescape
from re import split as re_SPLIT
from typing import Union, Any
from selenium.webdriver.remote.webelement import WebElement
from .common import DrissionElement
from .driver_element import execute_driver_find
class ShadowRootElement(DrissionElement):
    """Wrapper around the shadow root of an element.

    Elements inside the shadow root are located with css selectors or by text;
    xpath locators are rejected.
    """

    def __init__(self, inner_ele: WebElement, parent_ele, timeout: float = 10):
        """
        :param inner_ele: object returned by the browser for the host's ``shadowRoot``
        :param parent_ele: element hosting this shadow root
        :param timeout: default timeout for element lookups
        """
        super().__init__(inner_ele)
        self.parent_ele = parent_ele
        self.timeout = timeout
        self._driver = inner_ele.parent

    def __repr__(self):
        return f'<ShadowRootElement in {self.parent_ele} >'

    @property
    def driver(self):
        """Return the WebDriver object controlling this element."""
        return self._driver

    @property
    def tag(self):
        """Return the fixed pseudo tag name 'shadow-root'."""
        return 'shadow-root'

    @property
    def html(self):
        """Return the unescaped innerHTML, with non-breaking spaces replaced by plain spaces."""
        return unescape(self.inner_ele.get_attribute('innerHTML')).replace('\xa0', ' ')

    @property
    def parent(self):
        """Return the element hosting this shadow root."""
        return self.parent_ele

    def parents(self, num: int = 1):
        """Return the num-th level ancestor; level 1 is the host element itself.

        :param num: how many levels up
        :return: result of the host element's ele() lookup
        """
        loc = 'xpath', f'.{"/.." * (num - 1)}'
        return self.parent_ele.ele(loc, timeout=0.01, show_errmsg=False)

    @property
    def next(self):
        """Return the result of nexts() with its default argument."""
        return self.nexts()

    def nexts(self, num: int = 1):
        """Look up ``:nth-child(num)`` under the host element.

        NOTE(review): named after "following sibling" lookups on other element
        classes - confirm this selector gives the intended element.

        :param num: index passed to ``:nth-child``
        :return: result of the host element's ele() lookup
        """
        loc = 'css selector', f':nth-child({num})'
        return self.parent_ele.ele(loc)

    @staticmethod
    def _loc_to_css(loc: tuple) -> tuple:
        """Rewrite simple selenium strategies as the equivalent css selector locator.

        Strategies without a css equivalent are returned unchanged.
        """
        by, value = loc
        if by == 'id':
            return 'css selector', f'[id="{value}"]'
        if by == 'name':
            return 'css selector', f'[name="{value}"]'
        if by == 'class name':
            return 'css selector', f'.{value}'
        if by == 'tag name':
            return 'css selector', value
        return loc

    def ele(self,
            loc_or_str: Union[tuple, str],
            mode: str = 'single',
            timeout: float = None,
            show_errmsg: bool = False):
        """Return matching element(s) inside the shadow root; the first match by default.

        Examples:
        - With a loc tuple:
            ele.ele((By.CLASS_NAME, 'ele_class'))   - first element with class ele_class
        - With a query string ('@' marks an attribute, '=' is an exact match,
          ':' is a fuzzy match, a bare string is a fuzzy text search):
            ele.ele('@class:ele_class')             - first element whose class contains ele_class
            ele.ele('@name=ele_name')               - first element whose name equals ele_name
            ele.ele('@placeholder')                 - first element having a placeholder attribute
            ele.ele('tag:p')                        - first <p> element
            ele.ele('tag:div@class:ele_class')      - first div whose class contains ele_class
            ele.ele('tag:div@class=ele_class')      - first div whose class equals ele_class
            ele.ele('tag:div@text():some_text')     - first div whose text contains some_text
            ele.ele('tag:div@text()=some_text')     - first div whose text equals some_text
            ele.ele('text:some_text')               - first element whose text contains some_text
            ele.ele('some_text')                    - same as the previous line
            ele.ele('text=some_text')               - first element whose text equals some_text
            ele.ele('css:div.ele_class')            - first element matching the css selector

        :param loc_or_str: locator, either a (by, value) tuple or a query string
        :param mode: 'single' or 'all', to find one element or all of them
        :param timeout: lookup timeout; falls back to self.timeout when falsy
        :param show_errmsg: whether to print a message when an exception occurs
        :return: matching element(s); None when the locator strategy is not supported
        :raises ValueError: for xpath locators, or when loc_or_str is neither a str nor a 2-tuple
        """
        if isinstance(loc_or_str, str):
            loc = get_css_from_str(loc_or_str)
        elif isinstance(loc_or_str, tuple) and len(loc_or_str) == 2:
            if loc_or_str[0] == 'xpath':
                raise ValueError('不支持xpath')
            # Only css selectors are handled below, so express id / name /
            # class name / tag name tuples as css instead of returning None.
            loc = self._loc_to_css(loc_or_str)
        else:
            raise ValueError('Argument loc_or_str can only be tuple or str.')

        timeout = timeout or self.timeout

        if loc[0] == 'css selector':
            return execute_driver_find(self.inner_ele, loc, mode, show_errmsg, timeout)
        # Text locators are the 4-tuples produced by get_css_from_str.
        if loc[0] == 'text' and len(loc) == 4:
            return self._find_eles_by_text(loc[1], loc[2], loc[3], mode)
        return None

    def eles(self,
             loc_or_str: Union[tuple, str],
             timeout: float = None,
             show_errmsg: bool = False):
        """Return all matching elements inside the shadow root.

        Accepts the same locators as ele(); equivalent to ele(..., mode='all').

        :param loc_or_str: locator, either a (by, value) tuple or a query string
        :param timeout: lookup timeout
        :param show_errmsg: whether to print a message when an exception occurs
        :return: result of ele() in 'all' mode
        """
        return self.ele(loc_or_str, mode='all', show_errmsg=show_errmsg, timeout=timeout)

    def run_script(self, script: str, *args) -> Any:
        """Execute javascript with this shadow root passed as the first argument.

        :param script: javascript text
        :param args: extra arguments passed to the script
        :return: result of the script
        """
        return self.inner_ele.parent.execute_script(script, self.inner_ele, *args)

    def is_enabled(self) -> bool:
        """Return whether the wrapped object reports itself as enabled."""
        return self.inner_ele.is_enabled()

    def is_valid(self) -> bool:
        """Return whether the element can still be used, e.g. after the page navigated away."""
        try:
            self.is_enabled()
            return True
        except Exception:
            return False

    def _find_eles_by_text(self, text: str, tag: str = '', match: str = 'exact', mode: str = 'single'):
        """Find elements inside the shadow root by the value of their first child node.

        :param text: text to look for
        :param tag: tag name filter; empty means any tag
        :param match: 'exact' or 'fuzzy'
        :param mode: 'single' or 'all', to return one element or all of them
        :return: DriverElement object (None if nothing matched) in 'single' mode, otherwise a list
        """
        from .driver_element import DriverElement

        nodes = self.run_script('return arguments[0].querySelectorAll("*")')  # every element in the root
        results = []
        for node in nodes:
            if tag and tag != node.tag_name:
                continue
            txt = self.driver.execute_script(
                'if(arguments[0].firstChild!=null){return arguments[0].firstChild.nodeValue}', node)
            txt = txt or ''

            if text == '' or match == 'exact':  # empty text always requires an exact (empty) match
                matched = text == txt
            elif match == 'fuzzy':
                matched = text in txt
            else:
                matched = False
            if not matched:
                continue

            if mode == 'single':
                return DriverElement(node)
            elif mode == 'all':
                results.append(DriverElement(node))
        return None if mode == 'single' else results
def get_css_from_str(loc: str) -> tuple:
    """Parse a query string into a locator tuple.

    Supported lookups: attribute, tag name (optionally with an attribute), text
    and css selector. '@' marks an attribute, '=' is an exact match, ':' is a
    fuzzy match; a string without a control prefix is a fuzzy text search.

    Examples:
        @class:ele_class            - elements whose class contains ele_class
        @class=ele_class            - elements whose class equals ele_class
        @class                      - elements having a class attribute
        @text():search_text         - elements whose text contains search_text
        tag:div                     - div elements
        tag:div@class:ele_class     - divs whose class contains ele_class
        tag:div@class=ele_class     - divs whose class equals ele_class
        tag:div@text():search_text  - divs whose text contains search_text
        tag:div@text()=search_text  - divs whose text equals search_text
        text:search_text            - elements whose text contains search_text
        text=search_text            - elements whose text equals search_text
        css:div.ele_class           - css selector, used as is

    :param loc: query string
    :return: ('css selector', selector) or ('text', text, tag, 'exact' | 'fuzzy')
    :raises ValueError: for xpath queries
    """
    loc_by = 'css selector'

    if loc.startswith('@'):  # attribute lookup
        r = re_SPLIT(r'([:=])', loc[1:], maxsplit=1)
        if len(r) == 3:
            if r[0] == 'text()':
                # text() is not a css attribute; hand it to the text search instead.
                match = 'exact' if r[1] == '=' else 'fuzzy'
                return 'text', r[2], '', match
            mode = '=' if r[1] == '=' else '*='
            # Quote the value, as the tag branch does, so values with spaces stay valid css.
            loc_str = f'*[{r[0]}{mode}"{r[2]}"]'
        else:
            loc_str = f'*[{loc[1:]}]'

    elif loc.startswith(('tag=', 'tag:')):  # tag name lookup
        if '@' not in loc[4:]:
            loc_str = f'{loc[4:]}'
        else:
            at_lst = loc[4:].split('@', maxsplit=1)
            r = re_SPLIT(r'([:=])', at_lst[1], maxsplit=1)
            if len(r) == 3:
                if r[0] == 'text()':
                    match = 'exact' if r[1] == '=' else 'fuzzy'
                    return 'text', r[2], at_lst[0], match
                mode = '=' if r[1] == '=' else '*='
                loc_str = f'{at_lst[0]}[{r[0]}{mode}"{r[2]}"]'
            else:
                loc_str = f'{at_lst[0]}[{r[0]}]'

    elif loc.startswith(('css=', 'css:')):  # css selector lookup
        loc_str = loc[4:]

    elif loc.startswith(('xpath=', 'xpath:')):  # xpath is rejected
        raise ValueError('不支持xpath')

    elif loc.startswith(('text=', 'text:')):  # text lookup
        match = 'exact' if loc[4] == '=' else 'fuzzy'
        return 'text', loc[5:], '', match

    else:  # bare string: fuzzy text lookup
        return 'text', loc, '', 'fuzzy'

    return loc_by, loc_str