弃用requests_html,未完成

This commit is contained in:
g1879 2020-11-04 16:34:55 +08:00
parent bbea8871a0
commit 41d700a3d9
6 changed files with 154 additions and 127 deletions

View File

@ -10,18 +10,20 @@ from re import split as re_SPLIT
from shutil import rmtree from shutil import rmtree
from typing import Union from typing import Union
from requests_html import Element from lxml.etree import _Element
# from lxml.html import HtmlElement
# from requests_html import Element
from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.remote.webelement import WebElement
class DrissionElement(object): class DrissionElement(object):
"""SessionElement和DriverElement的基类""" """SessionElement和DriverElement的基类"""
def __init__(self, ele: Union[Element, WebElement]): def __init__(self, ele: Union[WebElement, _Element]):
self._inner_ele = ele self._inner_ele = ele
@property @property
def inner_ele(self) -> Union[WebElement, Element]: def inner_ele(self) -> Union[WebElement, _Element]:
return self._inner_ele return self._inner_ele
@property @property

View File

@ -8,7 +8,7 @@ from typing import Union
from urllib.parse import urlparse from urllib.parse import urlparse
from requests import Session from requests import Session
from requests_html import HTMLSession # from requests_html import HTMLSession
from selenium import webdriver from selenium import webdriver
from selenium.common.exceptions import WebDriverException from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.options import Options
@ -23,12 +23,12 @@ class Drission(object):
def __init__(self, def __init__(self,
driver_or_options: Union[WebDriver, dict, Options] = None, driver_or_options: Union[WebDriver, dict, Options] = None,
session_or_options: Union[Session, HTMLSession, dict] = None, session_or_options: Union[Session, dict] = None,
ini_path: str = None, ini_path: str = None,
proxy: dict = None): proxy: dict = None):
"""初始化可接收现成的WebDriver和Session对象或接收它们的配置信息 \n """初始化可接收现成的WebDriver和Session对象或接收它们的配置信息 \n
:param driver_or_options: driver对象或chrome设置Options类或设置字典 :param driver_or_options: driver对象或chrome设置Options类或设置字典
:param session_or_options: sessionHTMLSession对象或session设置 :param session_or_options: Session对象设置
:param ini_path: ini文件路径 :param ini_path: ini文件路径
:param proxy: 代理设置 :param proxy: 代理设置
""" """
@ -36,15 +36,15 @@ class Drission(object):
self._driver = None self._driver = None
self._driver_path = 'chromedriver' self._driver_path = 'chromedriver'
self._proxy = proxy self._proxy = proxy
if isinstance(session_or_options, HTMLSession): if isinstance(session_or_options, Session):
self._session = session_or_options self._session = session_or_options
elif isinstance(session_or_options, Session): # elif isinstance(session_or_options, Session):
self._session = HTMLSession() # self._session = HTMLSession()
for key in session_or_options.__dict__: # session对象强制升级为子类HTMLSession对象 # for key in session_or_options.__dict__: # session对象强制升级为子类HTMLSession对象
if key != 'hooks': # if key != 'hooks':
self._session.__dict__[key] = session_or_options.__dict__[key] # self._session.__dict__[key] = session_or_options.__dict__[key]
else: # else:
self._session.hooks['response'].extend(session_or_options.hooks['response']) # self._session.hooks['response'].extend(session_or_options.hooks['response'])
else: else:
if session_or_options is None: if session_or_options is None:
self._session_options = OptionsManager(ini_path).get_option('session_options') self._session_options = OptionsManager(ini_path).get_option('session_options')
@ -64,10 +64,10 @@ class Drission(object):
self._driver_path = self._driver_options['driver_path'] self._driver_path = self._driver_options['driver_path']
@property @property
def session(self) -> HTMLSession: def session(self) -> Session:
"""返回HTMLSession对象如为None则按配置信息创建""" """返回HTMLSession对象如为None则按配置信息创建"""
if self._session is None: if self._session is None:
self._session = HTMLSession() self._session = Session()
attrs = ['headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify', attrs = ['headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify',
'cert', 'adapters', 'stream', 'trust_env', 'max_redirects'] 'cert', 'adapters', 'stream', 'trust_env', 'max_redirects']
for i in attrs: for i in attrs:

View File

@ -6,8 +6,8 @@
""" """
from typing import Union, List, Tuple from typing import Union, List, Tuple
from requests import Response from requests import Response, Session
from requests_html import HTMLSession, Element # from requests_html import HTMLSession, Element
from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.remote.webelement import WebElement
@ -127,7 +127,7 @@ class MixPage(Null, SessionPage, DriverPage):
return self._drission.driver return self._drission.driver
@property @property
def session(self) -> HTMLSession: def session(self) -> Session:
"""返回session对象如没有则创建 \n """返回session对象如没有则创建 \n
:return: HTMLSession对象 :return: HTMLSession对象
""" """
@ -279,7 +279,7 @@ class MixPage(Null, SessionPage, DriverPage):
return super().get(url, go_anyway, show_errmsg, retry, interval, **kwargs) return super().get(url, go_anyway, show_errmsg, retry, interval, **kwargs)
def ele(self, def ele(self,
loc_or_ele: Union[Tuple[str, str], str, DriverElement, SessionElement, Element, WebElement], loc_or_ele: Union[Tuple[str, str], str, DriverElement, SessionElement, WebElement],
mode: str = None, mode: str = None,
timeout: float = None, timeout: float = None,
show_errmsg: bool = False) -> Union[DriverElement, SessionElement, str]: show_errmsg: bool = False) -> Union[DriverElement, SessionElement, str]:

View File

@ -8,28 +8,33 @@ import re
from html import unescape from html import unescape
from typing import Union, List, Tuple from typing import Union, List, Tuple
from requests_html import Element, BaseParser from lxml import etree
from lxml.etree import _Element
from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath
# from lxml.html import HtmlElement
# from requests_html import Element, BaseParser
class SessionElement(DrissionElement): class SessionElement(DrissionElement):
"""session模式的元素对象包装了一个Element对象并封装了常用功能""" """session模式的元素对象包装了一个Element对象并封装了常用功能"""
def __init__(self, ele: Element): def __init__(self, ele: _Element):
super().__init__(ele) super().__init__(ele)
def __repr__(self): # def __repr__(self):
attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] # attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs]
return f'<SessionElement {self.tag} {" ".join(attrs)}>' # return f'<SessionElement {self.tag} {" ".join(attrs)}>'
@property # @property
def attrs(self) -> dict: # def attrs(self) -> dict:
"""返回元素所有属性及值""" # """返回元素所有属性及值"""
attrs = dict() # attrs = dict()
for attr in self.inner_ele.attrs: # for attr in self.inner_ele.attrs:
attrs[attr] = self.attr(attr) # attrs[attr] = self.attr(attr)
return attrs # return attrs
@property @property
def text(self) -> str: def text(self) -> str:
@ -45,42 +50,45 @@ class SessionElement(DrissionElement):
@property @property
def html(self) -> str: def html(self) -> str:
"""返回元素innerHTML文本""" """返回元素innerHTML文本"""
html = unescape(self._inner_ele.html).replace('\xa0', ' ') # ee=self.ele('xpath:./*')
html = unescape(etree.tostring(self._inner_ele).decode()).replace('\xa0', ' ')
# html = unescape(self._inner_ele.html).replace('\xa0', ' ')
r = re.match(r'<.*?>(.*)</.*?>', html, flags=re.DOTALL) r = re.match(r'<.*?>(.*)</.*?>', html, flags=re.DOTALL)
return None if not r else r.group(1) return None if not r else r.group(1)
# return html
@property @property
def tag(self) -> str: def tag(self) -> str:
"""返回元素类型""" """返回元素类型"""
return self._inner_ele.tag return self._inner_ele.tag
@property # @property
def css_path(self) -> str: # def css_path(self) -> str:
"""返回css path路径""" # """返回css path路径"""
return self._get_ele_path('css') # return self._get_ele_path('css')
@property # @property
def xpath(self) -> str: # def xpath(self) -> str:
"""返回xpath路径""" # """返回xpath路径"""
return self._get_ele_path('xpath') # return self._get_ele_path('xpath')
def _get_ele_path(self, mode): # def _get_ele_path(self, mode):
"""获取css路径或xpath路径""" # """获取css路径或xpath路径"""
path_str = '' # path_str = ''
ele = self # ele = self
while ele: # while ele:
ele_id = ele.attr('id') # ele_id = ele.attr('id')
if ele_id: # if ele_id:
return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}' # return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}'
else: # else:
if mode == 'css': # if mode == 'css':
brothers = len(ele.eles(f'xpath:./preceding-sibling::*')) # brothers = len(ele.eles(f'xpath:./preceding-sibling::*'))
path_str = f'>:nth-child({brothers + 1}){path_str}' # path_str = f'>:nth-child({brothers + 1}){path_str}'
else: # else:
brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}')) # brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}'))
path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}' # path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}'
ele = ele.parent # ele = ele.parent
return path_str[1:] if mode == 'css' else path_str # return path_str[1:] if mode == 'css' else path_str
@property @property
def parent(self): def parent(self):
@ -199,47 +207,47 @@ class SessionElement(DrissionElement):
""" """
return self.ele(loc_or_str, mode='all', show_errmsg=show_errmsg) return self.ele(loc_or_str, mode='all', show_errmsg=show_errmsg)
def attr(self, attr: str) -> Union[str, None]: # def attr(self, attr: str) -> Union[str, None]:
"""返回属性值 \n # """返回属性值 \n
:param attr: 属性名 # :param attr: 属性名
:return: 属性值文本没有该属性返回None # :return: 属性值文本没有该属性返回None
""" # """
try: # try:
if attr == 'href': # if attr == 'href':
# 如直接获取attr只能获取相对地址 # # 如直接获取attr只能获取相对地址
link = self._inner_ele.attrs['href'] # link = self._inner_ele.attrs['href']
if link.lower().startswith(('javascript:', 'mailto:')): # if link.lower().startswith(('javascript:', 'mailto:')):
return link # return link
elif link.startswith('#'): # elif link.startswith('#'):
if '#' in self.inner_ele.url: # if '#' in self.inner_ele.url:
return re.sub(r'#.*', link, self.inner_ele.url) # return re.sub(r'#.*', link, self.inner_ele.url)
else: # else:
return f'{self.inner_ele.url}{link}' # return f'{self.inner_ele.url}{link}'
elif link.startswith('?'): # 避免当相对URL以?开头时requests-html丢失参数的bug # elif link.startswith('?'): # 避免当相对URL以?开头时requests-html丢失参数的bug
if '?' in self.inner_ele.url: # if '?' in self.inner_ele.url:
return re.sub(r'\?.*', link, self.inner_ele.url) # return re.sub(r'\?.*', link, self.inner_ele.url)
else: # else:
return f'{self.inner_ele.url}{link}' # return f'{self.inner_ele.url}{link}'
else: # else:
for link in self._inner_ele.absolute_links: # for link in self._inner_ele.absolute_links:
return link # return link
elif attr == 'src': # elif attr == 'src':
return self._inner_ele._make_absolute(self._inner_ele.attrs['src']) # return self._inner_ele._make_absolute(self._inner_ele.attrs['src'])
elif attr == 'class': # elif attr == 'class':
return ' '.join(self._inner_ele.attrs['class']) # return ' '.join(self._inner_ele.attrs['class'])
elif attr == 'text': # elif attr == 'text':
return self.text # return self.text
elif attr == 'outerHTML': # elif attr == 'outerHTML':
return self.inner_ele.html # return self.inner_ele.html
elif attr == 'innerHTML': # elif attr == 'innerHTML':
return self.html # return self.html
else: # else:
return self._inner_ele.attrs[attr] # return self._inner_ele.attrs[attr]
except: # except:
return None # return None
def execute_session_find(page_or_ele: BaseParser, def execute_session_find(page_or_ele: _Element,
loc: Tuple[str, str], loc: Tuple[str, str],
mode: str = 'single', mode: str = 'single',
show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str]]: show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str]]:
@ -254,32 +262,47 @@ def execute_session_find(page_or_ele: BaseParser,
mode = mode or 'single' mode = mode or 'single'
if mode not in ['single', 'all']: if mode not in ['single', 'all']:
raise ValueError("Argument mode can only be 'single' or 'all'.") raise ValueError("Argument mode can only be 'single' or 'all'.")
loc_by, loc_str = loc loc_by, loc_str = loc
# print(loc)
# ele = page_or_ele.xpath(loc_str)
# print(ele)
try: try:
ele = None # ele = None
if loc_by == 'xpath': if loc_by == 'xpath':
if 'PyQuery' in str(type(page_or_ele.element)):
# 从页面查找。
ele = page_or_ele.xpath(loc_str)
elif 'HtmlElement' in str(type(page_or_ele.element)):
# 从元素查找。这样区分是为了能找到上级元素
try:
elements = page_or_ele.element.xpath(loc_str)
ele = [Element(element=e, url=page_or_ele.url) for e in elements]
except AttributeError:
ele = page_or_ele.xpath(loc_str) ele = page_or_ele.xpath(loc_str)
# if 'PyQuery' in str(type(page_or_ele.element)):
# # 从页面查找。
# ele = page_or_ele.xpath(loc_str)
# elif 'HtmlElement' in str(type(page_or_ele.element)):
# # 从元素查找。这样区分是为了能找到上级元素
# try:
# elements = page_or_ele.element.xpath(loc_str)
# ele = [Element(element=e, url=page_or_ele.url) for e in elements]
# except AttributeError:
# ele = page_or_ele.xpath(loc_str)
else: # 用css selector获取 else: # 用css selector获取
ele = page_or_ele.find(loc_str) ele = page_or_ele.cssselect(loc_str)
if mode == 'single': if mode == 'single':
ele = ele[0] if ele else None ele = ele[0] if ele else None
return SessionElement(ele) if isinstance(ele, Element) else unescape(ele).replace('\xa0', ' ') return SessionElement(ele) if isinstance(ele, _Element) else unescape(ele).replace('\xa0', ' ')
elif mode == 'all': elif mode == 'all':
ele = filter(lambda x: x != '\n', ele) # 去除元素间换行符 ele = filter(lambda x: x != '\n', ele) # 去除元素间换行符
ele = map(lambda x: unescape(x).replace('\xa0', ' ') if isinstance(x, str) else x, ele) # 替换空格 ele = map(lambda x: unescape(x).replace('\xa0', ' ') if isinstance(x, str) else x, ele) # 替换空格
return [SessionElement(e) if isinstance(e, Element) else e for e in ele] return [SessionElement(e) if isinstance(e, _Element) else e for e in ele]
except: except:
if show_errmsg: if show_errmsg:
print('Element(s) not found.', loc) print('Element(s) not found.', loc)
raise raise
return [] if mode == 'all' else None return [] if mode == 'all' else None
def get_HtmlElement(html: str) -> _Element:
# html = f'<drission_root>{html}</drission_root>'
ele_or_page = etree.HTML(html)
# html = etree.tostring(ele_or_page).decode()
# if str(html).startswith('<html><body>') and str(html).endswith('</body></html>'):
# html = etree.tostring(ele_or_page)[12:-14].decode()
# ele_or_page = etree.fromstring(html)
return ele_or_page

View File

@ -15,17 +15,18 @@ from typing import Union, List, Tuple
from urllib import parse from urllib import parse
from urllib.parse import urlparse, quote from urllib.parse import urlparse, quote
from requests_html import HTMLSession, HTMLResponse, Element # from requests_html import HTMLSession, HTMLResponse, Element
from requests import Session, Response
from .common import get_loc_from_str, translate_loc_to_xpath, get_available_file_name from .common import get_loc_from_str, translate_loc_to_xpath, get_available_file_name
from .config import OptionsManager from .config import OptionsManager
from .session_element import SessionElement, execute_session_find from .session_element import SessionElement, execute_session_find, get_HtmlElement
class SessionPage(object): class SessionPage(object):
"""SessionPage封装了页面操作的常用功能使用requests_html来获取、解析网页。""" """SessionPage封装了页面操作的常用功能使用requests_html来获取、解析网页。"""
def __init__(self, session: HTMLSession, timeout: float = 10): def __init__(self, session: Session, timeout: float = 10):
"""初始化函数""" """初始化函数"""
self._session = session self._session = session
self.timeout = timeout self.timeout = timeout
@ -34,12 +35,12 @@ class SessionPage(object):
self._response = None self._response = None
@property @property
def session(self) -> HTMLSession: def session(self) -> Session:
"""返回session对象""" """返回session对象"""
return self._session return self._session
@property @property
def response(self) -> HTMLResponse: def response(self) -> Response:
"""返回访问url得到的response对象""" """返回访问url得到的response对象"""
return self._response return self._response
@ -66,10 +67,10 @@ class SessionPage(object):
@property @property
def html(self) -> str: def html(self) -> str:
"""返回页面html文本""" """返回页面html文本"""
return self.response.html.html return self.response.text
def ele(self, def ele(self,
loc_or_ele: Union[Tuple[str, str], str, SessionElement, Element], loc_or_ele: Union[Tuple[str, str], str, SessionElement], # , Element
mode: str = None, mode: str = None,
show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str], str, None]: show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str], str, None]:
"""返回页面中符合条件的元素,默认返回第一个 \n """返回页面中符合条件的元素,默认返回第一个 \n
@ -112,13 +113,13 @@ class SessionPage(object):
elif isinstance(loc_or_ele, SessionElement): elif isinstance(loc_or_ele, SessionElement):
return loc_or_ele return loc_or_ele
elif isinstance(loc_or_ele, Element): # elif isinstance(loc_or_ele, Element):
return SessionElement(loc_or_ele) # return SessionElement(loc_or_ele)
else: else:
raise ValueError('Argument loc_or_str can only be tuple, str, SessionElement, Element.') raise ValueError('Argument loc_or_str can only be tuple, str, SessionElement, Element.')
return execute_session_find(self.response.html, loc_or_ele, mode, show_errmsg) return execute_session_find(get_HtmlElement(self.response.text), loc_or_ele, mode, show_errmsg)
def eles(self, def eles(self,
loc_or_str: Union[Tuple[str, str], str], loc_or_str: Union[Tuple[str, str], str],
@ -156,7 +157,7 @@ class SessionPage(object):
times: int = 0, times: int = 0,
interval: float = 1, interval: float = 1,
show_errmsg: bool = False, show_errmsg: bool = False,
**kwargs) -> HTMLResponse: **kwargs) -> Response:
"""尝试连接,重试若干次 \n """尝试连接,重试若干次 \n
:param to_url: 要访问的url :param to_url: 要访问的url
:param times: 重试次数 :param times: 重试次数
@ -434,7 +435,7 @@ class SessionPage(object):
if not_stream: # 加载网页时修复编码 if not_stream: # 加载网页时修复编码
r._content = r.content.replace(b'\x08', b'\\b') # 避免存在退格符导致乱码或解析出错 r._content = r.content.replace(b'\x08', b'\\b') # 避免存在退格符导致乱码或解析出错
r.html.encoding = r.encoding # 修复requests_html丢失编码方式的bug # r.html.encoding = r.encoding # 修复requests_html丢失编码方式的bug
if charset: if charset:
r.encoding = charset r.encoding = charset
return r, 'Success' return r, 'Success'

View File

@ -1,4 +1,5 @@
selenium~=3.141.0 selenium
requests-html~=0.10.0 requests
requests~=2.23.0 tldextract
tldextract~=2.2.2 DrissionPage
lxml