去除对requests_html的依赖,改用lxml支撑s模式

This commit is contained in:
g1879 2020-11-06 18:07:14 +08:00
parent 6714ed116e
commit da849f56cc
2 changed files with 109 additions and 122 deletions

View File

@ -7,34 +7,28 @@
import re import re
from html import unescape from html import unescape
from typing import Union, List, Tuple from typing import Union, List, Tuple
from urllib.parse import urlparse, urljoin, urlunparse
from lxml import etree from lxml.etree import tostring, HTML
from lxml.etree import _Element from lxml.html import HtmlElement
from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath
# from lxml.html import HtmlElement
# from requests_html import Element, BaseParser
class SessionElement(DrissionElement): class SessionElement(DrissionElement):
"""session模式的元素对象包装了一个Element对象并封装了常用功能""" """session模式的元素对象包装了一个Element对象并封装了常用功能"""
def __init__(self, ele: _Element): def __init__(self, ele: HtmlElement, page=None):
super().__init__(ele) super().__init__(ele, page)
# def __repr__(self): def __repr__(self):
# attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs]
# return f'<SessionElement {self.tag} {" ".join(attrs)}>' return f'<SessionElement {self.tag} {" ".join(attrs)}>'
# @property @property
# def attrs(self) -> dict: def attrs(self) -> dict:
# """返回元素所有属性及值""" """返回元素所有属性及值"""
# attrs = dict() return {attr: self.attr(attr) for attr, val in self.inner_ele.items()}
# for attr in self.inner_ele.attrs:
# attrs[attr] = self.attr(attr)
# return attrs
@property @property
def text(self) -> str: def text(self) -> str:
@ -50,45 +44,42 @@ class SessionElement(DrissionElement):
@property @property
def html(self) -> str: def html(self) -> str:
"""返回元素innerHTML文本""" """返回元素innerHTML文本"""
# ee=self.ele('xpath:./*') html = unescape(tostring(self._inner_ele).decode()).replace('\xa0', ' ')
html = unescape(etree.tostring(self._inner_ele).decode()).replace('\xa0', ' ')
# html = unescape(self._inner_ele.html).replace('\xa0', ' ')
r = re.match(r'<.*?>(.*)</.*?>', html, flags=re.DOTALL) r = re.match(r'<.*?>(.*)</.*?>', html, flags=re.DOTALL)
return None if not r else r.group(1) return None if not r else r.group(1)
# return html
@property @property
def tag(self) -> str: def tag(self) -> str:
"""返回元素类型""" """返回元素类型"""
return self._inner_ele.tag return self._inner_ele.tag
# @property @property
# def css_path(self) -> str: def css_path(self) -> str:
# """返回css path路径""" """返回css path路径"""
# return self._get_ele_path('css') return self._get_ele_path('css')
# @property @property
# def xpath(self) -> str: def xpath(self) -> str:
# """返回xpath路径""" """返回xpath路径"""
# return self._get_ele_path('xpath') return self._get_ele_path('xpath')
# def _get_ele_path(self, mode): def _get_ele_path(self, mode):
# """获取css路径或xpath路径""" """获取css路径或xpath路径"""
# path_str = '' path_str = ''
# ele = self ele = self
# while ele: while ele:
# ele_id = ele.attr('id') ele_id = ele.attr('id')
# if ele_id: if ele_id:
# return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}' return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}'
# else: else:
# if mode == 'css': if mode == 'css':
# brothers = len(ele.eles(f'xpath:./preceding-sibling::*')) brothers = len(ele.eles(f'xpath:./preceding-sibling::*'))
# path_str = f'>:nth-child({brothers + 1}){path_str}' path_str = f'>:nth-child({brothers + 1}){path_str}'
# else: else:
# brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}')) brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}'))
# path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}' path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}'
# ele = ele.parent ele = ele.parent
# return path_str[1:] if mode == 'css' else path_str return path_str[1:] if mode == 'css' else path_str
@property @property
def parent(self): def parent(self):
@ -107,7 +98,6 @@ class SessionElement(DrissionElement):
def parents(self, num: int = 1): def parents(self, num: int = 1):
"""返回上面第num级父元素 \n """返回上面第num级父元素 \n
requests_html的Element打包了lxml的元素对象从lxml元素对象读取上下级关系后再重新打包 \n
:param num: 第几级父元素 :param num: 第几级父元素
:return: SessionElement对象 :return: SessionElement对象
""" """
@ -189,20 +179,22 @@ class SessionElement(DrissionElement):
else: else:
raise ValueError('Argument loc_or_str can only be tuple or str.') raise ValueError('Argument loc_or_str can only be tuple or str.')
loc_str = None element = self
if loc_or_str[0] == 'xpath': if loc_or_str[0] == 'xpath':
brackets = len(re.match(r'\(*', loc_or_str[1]).group(0)) brackets = len(re.match(r'\(*', loc_or_str[1]).group(0))
bracket, loc_str = '(' * brackets, loc_or_str[1][brackets:] bracket, loc_str = '(' * brackets, loc_or_str[1][brackets:]
loc_str = loc_str if loc_str.startswith(('.', '/')) else f'.//{loc_str}' loc_str = loc_str if loc_str.startswith(('.', '/')) else f'.//{loc_str}'
loc_str = loc_str if loc_str.startswith('.') else f'.{loc_str}' loc_str = loc_str if loc_str.startswith('.') else f'.{loc_str}'
loc_str = f'{bracket}{loc_str}' loc_str = f'{bracket}{loc_str}'
elif loc_or_str[0] == 'css selector': else: # css selector
# Element的html是包含自己的要如下处理使其只检索下级的 if loc_or_str[1][0].startswith('>'):
loc_str = loc_or_str[1] if loc_or_str[1][0] in '>, ' else f' {loc_or_str[1]}' loc_str = f'{self.css_path}{loc_or_str[1]}'
loc_str = f':root>{self.tag}{loc_str}' element = self.page
loc_or_str = loc_or_str[0], loc_str else:
loc_str = loc_or_str[1]
return execute_session_find(self.inner_ele, loc_or_str, mode, show_errmsg) loc_or_str = loc_or_str[0], loc_str
return execute_session_find(element, loc_or_str, mode, show_errmsg)
def eles(self, loc_or_str: Union[Tuple[str, str], str], show_errmsg: bool = False): def eles(self, loc_or_str: Union[Tuple[str, str], str], show_errmsg: bool = False):
"""返回当前元素下级所有符合条件的子元素 \n """返回当前元素下级所有符合条件的子元素 \n
@ -237,48 +229,61 @@ class SessionElement(DrissionElement):
:return: 属性值文本没有该属性返回None :return: 属性值文本没有该属性返回None
""" """
try: try:
if attr == 'href': if attr == 'href':
# 如直接获取attr只能获取相对地址 # 如直接获取attr只能获取相对地址
link = self.inner_ele.get('href') link = self.inner_ele.get('href')
if link.lower().startswith(('javascript:', 'mailto:')): if link.lower().startswith(('javascript:', 'mailto:')):
return link return link
elif link.startswith('#'): elif link.startswith('#'):
if '#' in self.url: if '#' in self.page.url:
return re.sub(r'#.*', link, self.url) return re.sub(r'#.*', link, self.page.url)
else: else:
return f'{self.url}{link}' return f'{self.page.url}{link}'
# elif link.startswith('?'): # 避免当相对URL以?开头时requests-html丢失参数的bug elif link.startswith('?'): # 避免当相对url以?开头时丢失参数的bug TODO:测试是否还存在
# if '?' in self.inner_ele.url: if '?' in self.page.url:
# return re.sub(r'\?.*', link, self.inner_ele.url) return re.sub(r'\?.*', link, self.page.url)
# else: else:
# return f'{self.inner_ele.url}{link}' return f'{self.page.url}{link}'
# else: else:
# for link in self._inner_ele.absolute_links: return self._make_absolute(link)
# return link elif attr == 'src':
# elif attr == 'src': return self._make_absolute(self.inner_ele.get('src'))
# return self._inner_ele._make_absolute(self._inner_ele.attrs['src']) elif attr == 'text':
# elif attr == 'class': return self.text
# return ' '.join(self._inner_ele.attrs['class']) elif attr == 'outerHTML':
# elif attr == 'text': return unescape(tostring(self._inner_ele).decode()).replace('\xa0', ' ')
# return self.text elif attr == 'innerHTML':
# elif attr == 'outerHTML': return self.html
# return self.inner_ele.html
# elif attr == 'innerHTML':
# return self.html
else: else:
return self.inner_ele.get(attr) return self.inner_ele.get(attr)
except: except:
return None return None
def _make_absolute(self, link):
"""生成绝对url"""
parsed = urlparse(link)._asdict()
def execute_session_find(page_or_ele: _Element, # 相对路径与页面url拼接并返回
if not parsed['netloc']: # 相对路径,与
return urljoin(self.page.url, link)
# 绝对路径但缺少协议从页面url获取协议并修复
if not parsed['scheme']:
parsed['scheme'] = urlparse(self.page.url).scheme
parsed = tuple(v for v in parsed.values())
return urlunparse(parsed)
# 绝对路径且不缺协议,直接返回
return link
def execute_session_find(page_or_ele,
loc: Tuple[str, str], loc: Tuple[str, str],
mode: str = 'single', mode: str = 'single',
show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str]]: show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str]]:
"""执行session模式元素的查找 \n """执行session模式元素的查找 \n
页面查找元素及元素查找下级元素皆使用此方法 \n 页面查找元素及元素查找下级元素皆使用此方法 \n
:param page_or_ele: request_html的页面或元素对象 :param page_or_ele: SessionPage对象或SessionElement对象
:param loc: 元素定位元组 :param loc: 元素定位元组
:param mode: 'single' 'all'对应获取第一个或全部 :param mode: 'single' 'all'对应获取第一个或全部
:param show_errmsg: 出现异常时是否显示错误信息 :param show_errmsg: 出现异常时是否显示错误信息
@ -288,34 +293,26 @@ def execute_session_find(page_or_ele: _Element,
if mode not in ['single', 'all']: if mode not in ['single', 'all']:
raise ValueError("Argument mode can only be 'single' or 'all'.") raise ValueError("Argument mode can only be 'single' or 'all'.")
loc_by, loc_str = loc if isinstance(page_or_ele, SessionElement):
# print(loc) page = page_or_ele.page
# ele = page_or_ele.xpath(loc_str) page_or_ele = page_or_ele.inner_ele
# print(ele) else: # 传入的是SessionPage对象
page = page_or_ele
page_or_ele = get_HtmlElement(page_or_ele.response.text)
try: try:
# ele = None if loc[0] == 'xpath':
if loc_by == 'xpath': ele = page_or_ele.xpath(loc[1])
ele = page_or_ele.xpath(loc_str)
# if 'PyQuery' in str(type(page_or_ele.element)):
# # 从页面查找。
# ele = page_or_ele.xpath(loc_str)
# elif 'HtmlElement' in str(type(page_or_ele.element)):
# # 从元素查找。这样区分是为了能找到上级元素
# try:
# elements = page_or_ele.element.xpath(loc_str)
# ele = [Element(element=e, url=page_or_ele.url) for e in elements]
# except AttributeError:
# ele = page_or_ele.xpath(loc_str)
else: # 用css selector获取 else: # 用css selector获取
ele = page_or_ele.cssselect(loc_str) ele = page_or_ele.cssselect(loc[1])
if mode == 'single': if mode == 'single':
ele = ele[0] if ele else None ele = ele[0] if ele else None
return SessionElement(ele) if isinstance(ele, _Element) else unescape(ele).replace('\xa0', ' ') return SessionElement(ele, page) if isinstance(ele, HtmlElement) else unescape(ele).replace('\xa0', ' ')
elif mode == 'all': elif mode == 'all':
ele = filter(lambda x: x != '\n', ele) # 去除元素间换行符 ele = filter(lambda x: x != '\n', ele) # 去除元素间换行符
ele = map(lambda x: unescape(x).replace('\xa0', ' ') if isinstance(x, str) else x, ele) # 替换空格 ele = map(lambda x: unescape(x).replace('\xa0', ' ') if isinstance(x, str) else x, ele) # 替换空格
return [SessionElement(e) if isinstance(e, _Element) else e for e in ele] return [SessionElement(e, page) if isinstance(e, HtmlElement) else e for e in ele]
except: except:
if show_errmsg: if show_errmsg:
print('Element(s) not found.', loc) print('Element(s) not found.', loc)
@ -323,11 +320,6 @@ def execute_session_find(page_or_ele: _Element,
return [] if mode == 'all' else None return [] if mode == 'all' else None
def get_HtmlElement(html: str) -> _Element: def get_HtmlElement(html: str) -> HtmlElement:
# html = f'<drission_root>{html}</drission_root>' """从html文本生成元素对象"""
ele_or_page = etree.HTML(html) return HTML(html)
# html = etree.tostring(ele_or_page).decode()
# if str(html).startswith('<html><body>') and str(html).endswith('</body></html>'):
# html = etree.tostring(ele_or_page)[12:-14].decode()
# ele_or_page = etree.fromstring(html)
return ele_or_page

View File

@ -12,15 +12,13 @@ from re import search as re_SEARCH
from re import sub as re_SUB from re import sub as re_SUB
from time import time, sleep from time import time, sleep
from typing import Union, List, Tuple from typing import Union, List, Tuple
from urllib import parse from urllib.parse import urlparse, quote, unquote
from urllib.parse import urlparse, quote
# from requests_html import HTMLSession, HTMLResponse, Element
from requests import Session, Response from requests import Session, Response
from .common import get_loc_from_str, translate_loc_to_xpath, get_available_file_name from .common import get_loc_from_str, translate_loc_to_xpath, get_available_file_name
from .config import OptionsManager from .config import OptionsManager
from .session_element import SessionElement, execute_session_find, get_HtmlElement from .session_element import SessionElement, execute_session_find
class SessionPage(object): class SessionPage(object):
@ -113,13 +111,10 @@ class SessionPage(object):
elif isinstance(loc_or_ele, SessionElement): elif isinstance(loc_or_ele, SessionElement):
return loc_or_ele return loc_or_ele
# elif isinstance(loc_or_ele, Element):
# return SessionElement(loc_or_ele)
else: else:
raise ValueError('Argument loc_or_str can only be tuple, str, SessionElement, Element.') raise ValueError('Argument loc_or_str can only be tuple, str, SessionElement, Element.')
return execute_session_find(get_HtmlElement(self.response.text), loc_or_ele, mode, show_errmsg) return execute_session_find(self, loc_or_ele, mode, show_errmsg)
def eles(self, def eles(self,
loc_or_str: Union[Tuple[str, str], str], loc_or_str: Union[Tuple[str, str], str],
@ -295,7 +290,7 @@ class SessionPage(object):
if not file_name: # 找不到则用时间和随机数生成文件名 if not file_name: # 找不到则用时间和随机数生成文件名
file_name = f'untitled_{time()}_{randint(0, 100)}' file_name = f'untitled_{time()}_{randint(0, 100)}'
file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip() # 去除非法字符 file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip() # 去除非法字符
file_name = parse.unquote(file_name) file_name = unquote(file_name)
# -------------------重命名文件名------------------- # -------------------重命名文件名-------------------
if rename: # 重命名文件,不改变扩展名 if rename: # 重命名文件,不改变扩展名
@ -433,10 +428,10 @@ class SessionPage(object):
else: else:
charset = headers[content_type[0]].split('=')[1] charset = headers[content_type[0]].split('=')[1]
if charset: if charset: # 指定网页编码
r.encoding = charset r.encoding = charset
if not_stream: # 加载网页时修复编码 if not_stream: # 避免存在退格符导致乱码或解析出错
r._content = r.content.replace(b'\x08', b'\\b') # 避免存在退格符导致乱码或解析出错 r._content = r.content.replace(b'\x08', b'\\b')
# r.html.encoding = r.encoding # 修复requests_html丢失编码方式的bug
return r, 'Success' return r, 'Success'