From da849f56cc16f211868e9cb7eaaadf6c918fc327 Mon Sep 17 00:00:00 2001 From: g1879 Date: Fri, 6 Nov 2020 18:07:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=AF=B9requests=5Fhtml?= =?UTF-8?q?=E7=9A=84=E4=BE=9D=E8=B5=96=EF=BC=8C=E6=94=B9=E7=94=A8lxml?= =?UTF-8?q?=E6=94=AF=E6=92=91s=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/session_element.py | 210 +++++++++++++++----------------- DrissionPage/session_page.py | 21 ++-- 2 files changed, 109 insertions(+), 122 deletions(-) diff --git a/DrissionPage/session_element.py b/DrissionPage/session_element.py index c007561..db4bde7 100644 --- a/DrissionPage/session_element.py +++ b/DrissionPage/session_element.py @@ -7,34 +7,28 @@ import re from html import unescape from typing import Union, List, Tuple +from urllib.parse import urlparse, urljoin, urlunparse -from lxml import etree -from lxml.etree import _Element +from lxml.etree import tostring, HTML +from lxml.html import HtmlElement from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath -# from lxml.html import HtmlElement -# from requests_html import Element, BaseParser - - class SessionElement(DrissionElement): """session模式的元素对象,包装了一个Element对象,并封装了常用功能""" - def __init__(self, ele: _Element): - super().__init__(ele) + def __init__(self, ele: HtmlElement, page=None): + super().__init__(ele, page) - # def __repr__(self): - # attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] - # return f'' + def __repr__(self): + attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + return f'' - # @property - # def attrs(self) -> dict: - # """返回元素所有属性及值""" - # attrs = dict() - # for attr in self.inner_ele.attrs: - # attrs[attr] = self.attr(attr) - # return attrs + @property + def attrs(self) -> dict: + """返回元素所有属性及值""" + return {attr: self.attr(attr) for attr, val in self.inner_ele.items()} @property def text(self) -> str: @@ -50,45 +44,42 @@ class SessionElement(DrissionElement): @property def html(self) -> str: """返回元素innerHTML文本""" - # ee=self.ele('xpath:./*') - html = unescape(etree.tostring(self._inner_ele).decode()).replace('\xa0', ' ') - # html = unescape(self._inner_ele.html).replace('\xa0', ' ') + html = unescape(tostring(self._inner_ele).decode()).replace('\xa0', ' ') r = re.match(r'<.*?>(.*)', html, flags=re.DOTALL) return None if not r else r.group(1) - # return html @property def tag(self) -> str: """返回元素类型""" return self._inner_ele.tag - # @property - # def css_path(self) -> str: - # """返回css path路径""" - # return self._get_ele_path('css') + @property + def css_path(self) -> str: + """返回css path路径""" + return self._get_ele_path('css') - # @property - # def xpath(self) -> str: - # """返回xpath路径""" - # return self._get_ele_path('xpath') + @property + def xpath(self) -> str: + """返回xpath路径""" + return self._get_ele_path('xpath') - # def _get_ele_path(self, mode): - # """获取css路径或xpath路径""" - # path_str = '' - # ele = self - # while ele: - # ele_id = ele.attr('id') - # if ele_id: - # return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}' - # else: - # if mode == 'css': - # brothers = len(ele.eles(f'xpath:./preceding-sibling::*')) - # path_str = f'>:nth-child({brothers + 1}){path_str}' - # else: - # brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}')) - # path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}' - # ele = ele.parent - # return path_str[1:] if mode == 'css' else path_str + def _get_ele_path(self, mode): + """获取css路径或xpath路径""" + path_str = '' + ele = self + while ele: + ele_id = ele.attr('id') + if ele_id: + return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}' + else: + if mode == 'css': + brothers = len(ele.eles(f'xpath:./preceding-sibling::*')) + path_str = f'>:nth-child({brothers + 1}){path_str}' + else: + brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}')) + path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}' + ele = ele.parent + return path_str[1:] if mode == 'css' else path_str @property def parent(self): @@ -107,7 +98,6 @@ class SessionElement(DrissionElement): def parents(self, num: int = 1): """返回上面第num级父元素 \n - requests_html的Element打包了lxml的元素对象,从lxml元素对象读取上下级关系后再重新打包 \n :param num: 第几级父元素 :return: SessionElement对象 """ @@ -189,20 +179,22 @@ class SessionElement(DrissionElement): else: raise ValueError('Argument loc_or_str can only be tuple or str.') - loc_str = None + element = self if loc_or_str[0] == 'xpath': brackets = len(re.match(r'\(*', loc_or_str[1]).group(0)) bracket, loc_str = '(' * brackets, loc_or_str[1][brackets:] loc_str = loc_str if loc_str.startswith(('.', '/')) else f'.//{loc_str}' loc_str = loc_str if loc_str.startswith('.') else f'.{loc_str}' loc_str = f'{bracket}{loc_str}' - elif loc_or_str[0] == 'css selector': - # Element的html是包含自己的,要如下处理,使其只检索下级的 - loc_str = loc_or_str[1] if loc_or_str[1][0] in '>, ' else f' {loc_or_str[1]}' - loc_str = f':root>{self.tag}{loc_str}' - loc_or_str = loc_or_str[0], loc_str + else: # css selector + if loc_or_str[1][0].startswith('>'): + loc_str = f'{self.css_path}{loc_or_str[1]}' + element = self.page + else: + loc_str = loc_or_str[1] - return execute_session_find(self.inner_ele, loc_or_str, mode, show_errmsg) + loc_or_str = loc_or_str[0], loc_str + return execute_session_find(element, loc_or_str, mode, show_errmsg) def eles(self, loc_or_str: Union[Tuple[str, str], str], show_errmsg: bool = False): """返回当前元素下级所有符合条件的子元素 \n @@ -237,48 +229,61 @@ class SessionElement(DrissionElement): :return: 属性值文本,没有该属性返回None """ try: - if attr == 'href': # 如直接获取attr只能获取相对地址 link = self.inner_ele.get('href') if link.lower().startswith(('javascript:', 'mailto:')): return link elif link.startswith('#'): - if '#' in self.url: - return re.sub(r'#.*', link, self.url) + if '#' in self.page.url: + return re.sub(r'#.*', link, self.page.url) else: - return f'{self.url}{link}' - # elif link.startswith('?'): # 避免当相对URL以?开头时requests-html丢失参数的bug - # if '?' in self.inner_ele.url: - # return re.sub(r'\?.*', link, self.inner_ele.url) - # else: - # return f'{self.inner_ele.url}{link}' - # else: - # for link in self._inner_ele.absolute_links: - # return link - # elif attr == 'src': - # return self._inner_ele._make_absolute(self._inner_ele.attrs['src']) - # elif attr == 'class': - # return ' '.join(self._inner_ele.attrs['class']) - # elif attr == 'text': - # return self.text - # elif attr == 'outerHTML': - # return self.inner_ele.html - # elif attr == 'innerHTML': - # return self.html + return f'{self.page.url}{link}' + elif link.startswith('?'): # 避免当相对url以?开头时丢失参数的bug TODO:测试是否还存在 + if '?' in self.page.url: + return re.sub(r'\?.*', link, self.page.url) + else: + return f'{self.page.url}{link}' + else: + return self._make_absolute(link) + elif attr == 'src': + return self._make_absolute(self.inner_ele.get('src')) + elif attr == 'text': + return self.text + elif attr == 'outerHTML': + return unescape(tostring(self._inner_ele).decode()).replace('\xa0', ' ') + elif attr == 'innerHTML': + return self.html else: return self.inner_ele.get(attr) except: return None + def _make_absolute(self, link): + """生成绝对url""" + parsed = urlparse(link)._asdict() -def execute_session_find(page_or_ele: _Element, + # 相对路径,与页面url拼接并返回 + if not parsed['netloc']: # 相对路径,与 + return urljoin(self.page.url, link) + + # 绝对路径但缺少协议,从页面url获取协议并修复 + if not parsed['scheme']: + parsed['scheme'] = urlparse(self.page.url).scheme + parsed = tuple(v for v in parsed.values()) + return urlunparse(parsed) + + # 绝对路径且不缺协议,直接返回 + return link + + +def execute_session_find(page_or_ele, loc: Tuple[str, str], mode: str = 'single', show_errmsg: bool = False) -> Union[SessionElement, List[SessionElement or str]]: - """执行session模式元素的查找 \n - 页面查找元素及元素查找下级元素皆使用此方法 \n - :param page_or_ele: request_html的页面或元素对象 + """执行session模式元素的查找 \n + 页面查找元素及元素查找下级元素皆使用此方法 \n + :param page_or_ele: SessionPage对象或SessionElement对象 :param loc: 元素定位元组 :param mode: 'single' 或 'all',对应获取第一个或全部 :param show_errmsg: 出现异常时是否显示错误信息 @@ -288,34 +293,26 @@ def execute_session_find(page_or_ele: _Element, if mode not in ['single', 'all']: raise ValueError("Argument mode can only be 'single' or 'all'.") - loc_by, loc_str = loc - # print(loc) - # ele = page_or_ele.xpath(loc_str) - # print(ele) + if isinstance(page_or_ele, SessionElement): + page = page_or_ele.page + page_or_ele = page_or_ele.inner_ele + else: # 传入的是SessionPage对象 + page = page_or_ele + page_or_ele = get_HtmlElement(page_or_ele.response.text) + try: - # ele = None - if loc_by == 'xpath': - ele = page_or_ele.xpath(loc_str) - # if 'PyQuery' in str(type(page_or_ele.element)): - # # 从页面查找。 - # ele = page_or_ele.xpath(loc_str) - # elif 'HtmlElement' in str(type(page_or_ele.element)): - # # 从元素查找。这样区分是为了能找到上级元素 - # try: - # elements = page_or_ele.element.xpath(loc_str) - # ele = [Element(element=e, url=page_or_ele.url) for e in elements] - # except AttributeError: - # ele = page_or_ele.xpath(loc_str) + if loc[0] == 'xpath': + ele = page_or_ele.xpath(loc[1]) else: # 用css selector获取 - ele = page_or_ele.cssselect(loc_str) + ele = page_or_ele.cssselect(loc[1]) if mode == 'single': ele = ele[0] if ele else None - return SessionElement(ele) if isinstance(ele, _Element) else unescape(ele).replace('\xa0', ' ') + return SessionElement(ele, page) if isinstance(ele, HtmlElement) else unescape(ele).replace('\xa0', ' ') elif mode == 'all': ele = filter(lambda x: x != '\n', ele) # 去除元素间换行符 ele = map(lambda x: unescape(x).replace('\xa0', ' ') if isinstance(x, str) else x, ele) # 替换空格 - return [SessionElement(e) if isinstance(e, _Element) else e for e in ele] + return [SessionElement(e, page) if isinstance(e, HtmlElement) else e for e in ele] except: if show_errmsg: print('Element(s) not found.', loc) @@ -323,11 +320,6 @@ def execute_session_find(page_or_ele: _Element, return [] if mode == 'all' else None -def get_HtmlElement(html: str) -> _Element: - # html = f'{html}' - ele_or_page = etree.HTML(html) - # html = etree.tostring(ele_or_page).decode() - # if str(html).startswith('') and str(html).endswith(''): - # html = etree.tostring(ele_or_page)[12:-14].decode() - # ele_or_page = etree.fromstring(html) - return ele_or_page +def get_HtmlElement(html: str) -> HtmlElement: + """从html文本生成元素对象""" + return HTML(html) diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py index af71a61..a1a24b5 100644 --- a/DrissionPage/session_page.py +++ b/DrissionPage/session_page.py @@ -12,15 +12,13 @@ from re import search as re_SEARCH from re import sub as re_SUB from time import time, sleep from typing import Union, List, Tuple -from urllib import parse -from urllib.parse import urlparse, quote +from urllib.parse import urlparse, quote, unquote -# from requests_html import HTMLSession, HTMLResponse, Element from requests import Session, Response from .common import get_loc_from_str, translate_loc_to_xpath, get_available_file_name from .config import OptionsManager -from .session_element import SessionElement, execute_session_find, get_HtmlElement +from .session_element import SessionElement, execute_session_find class SessionPage(object): @@ -113,13 +111,10 @@ class SessionPage(object): elif isinstance(loc_or_ele, SessionElement): return loc_or_ele - # elif isinstance(loc_or_ele, Element): - # return SessionElement(loc_or_ele) - else: raise ValueError('Argument loc_or_str can only be tuple, str, SessionElement, Element.') - return execute_session_find(get_HtmlElement(self.response.text), loc_or_ele, mode, show_errmsg) + return execute_session_find(self, loc_or_ele, mode, show_errmsg) def eles(self, loc_or_str: Union[Tuple[str, str], str], @@ -295,7 +290,7 @@ class SessionPage(object): if not file_name: # 找不到则用时间和随机数生成文件名 file_name = f'untitled_{time()}_{randint(0, 100)}' file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip() # 去除非法字符 - file_name = parse.unquote(file_name) + file_name = unquote(file_name) # -------------------重命名文件名------------------- if rename: # 重命名文件,不改变扩展名 @@ -433,10 +428,10 @@ class SessionPage(object): else: charset = headers[content_type[0]].split('=')[1] - if charset: + if charset: # 指定网页编码 r.encoding = charset - if not_stream: # 加载网页时修复编码 - r._content = r.content.replace(b'\x08', b'\\b') # 避免存在退格符导致乱码或解析出错 - # r.html.encoding = r.encoding # 修复requests_html丢失编码方式的bug + if not_stream: # 避免存在退格符导致乱码或解析出错 + r._content = r.content.replace(b'\x08', b'\\b') + return r, 'Success'