diff --git a/DrissionPage/common.py b/DrissionPage/common.py index c062606..d0e950c 100644 --- a/DrissionPage/common.py +++ b/DrissionPage/common.py @@ -96,7 +96,12 @@ def str_to_loc(loc: str) -> tuple: text:search_text - 文本含有search_text的元素 \n text=search_text - 文本等于search_text的元素 \n xpath://div[@class="ele_class"] - 用xpath查找 \n - css:div.ele_class - 用css selector查找 + css:div.ele_class - 用css selector查找 \n + xpath://div[@class="ele_class"] - 等同于 x://div[@class="ele_class"] \n + css:div.ele_class - 等同于 c:div.ele_class \n + tag:div - 等同于 t:div \n + text:search_text - 等同于 tx:search_text \n + text=search_text - 等同于 tx=search_text \n """ loc_by = 'xpath' @@ -107,12 +112,18 @@ def str_to_loc(loc: str) -> tuple: else: loc = loc.replace('.', '@class=', 1) - if loc.startswith('#'): + elif loc.startswith('#'): if loc.startswith(('#=', '#:',)): loc = loc.replace('#', '@id', 1) else: loc = loc.replace('#', '@id=', 1) + elif loc.startswith(('t:', 't=')): + loc = f'tag:{loc[2:]}' + + elif loc.startswith(('tx:', 'tx=')): + loc = f'text{loc[2:]}' + # 根据属性查找 if loc.startswith('@'): r = re_SPLIT(r'([:=])', loc[1:], maxsplit=1) @@ -123,7 +134,7 @@ def str_to_loc(loc: str) -> tuple: loc_str = f'//*[@{loc[1:]}]' # 根据tag name查找 - elif loc.startswith(('tag=', 'tag:')): + elif loc.startswith(('tag:', 'tag=')): if '@' not in loc[4:]: loc_str = f'//*[name()="{loc[4:]}"]' else: @@ -131,13 +142,13 @@ def str_to_loc(loc: str) -> tuple: r = re_SPLIT(r'([:=])', at_lst[1], maxsplit=1) if len(r) == 3: mode = 'exact' if r[1] == '=' else 'fuzzy' - arg_str = r[0] if r[0] == 'text()' else f'@{r[0]}' + arg_str = 'text()' if r[0] in ('text()', 'tx()') else f'@{r[0]}' loc_str = _make_xpath_str(at_lst[0], arg_str, r[2], mode) else: loc_str = f'//*[name()="{at_lst[0]}" and @{r[0]}]' # 根据文本查找 - elif loc.startswith(('text=', 'text:')): + elif loc.startswith(('text:', 'text=')): if len(loc) > 5: mode = 'exact' if loc[4] == '=' else 'fuzzy' loc_str = _make_xpath_str('*', 'text()', loc[5:], mode) @@ -145,13 +156,18 @@ def str_to_loc(loc: str) -> tuple: loc_str = '//*[not(text())]' # 用xpath查找 - elif loc.startswith(('xpath=', 'xpath:')): + elif loc.startswith(('xpath:', 'xpath=')): loc_str = loc[6:] + elif loc.startswith(('x:', 'x=')): + loc_str = loc[2:] # 用css selector查找 - elif loc.startswith(('css=', 'css:')): + elif loc.startswith(('css:', 'css=')): loc_by = 'css selector' loc_str = loc[4:] + elif loc.startswith(('c:', 'c=')): + loc_by = 'css selector' + loc_str = loc[2:] # 根据文本模糊查找 else: @@ -177,14 +193,18 @@ def _make_xpath_str(tag: str, arg: str, val: str, mode: str = 'fuzzy') -> str: return f'//*[{tag_name}{arg}={_make_search_str(val)}]' elif mode == 'fuzzy': - return f"//*[{tag_name}contains({arg},{_make_search_str(val)})]" + if arg == 'text()': + tag_name = '' if tag == '*' else f'{tag}/' + return f'//{tag_name}text()[contains(., {_make_search_str(val)})]/..' + else: + return f"//*[{tag_name}contains({arg},{_make_search_str(val)})]" else: raise ValueError("Argument mode can only be 'exact' or 'fuzzy'.") def _make_search_str(search_str: str) -> str: - """将"转义,不知何故不能直接用\来转义 \n + """将"转义,不知何故不能直接用 \ 来转义 \n :param search_str: 查询字符串 :return: 把"转义后的字符串 """ @@ -201,9 +221,15 @@ def _make_search_str(search_str: str) -> str: return search_str -def format_html(text: str) -> str: +def format_html(text: str, trans: bool = True) -> str: """处理html编码字符""" - return unescape(text).replace('\xa0', ' ') if text else text + if not text: + return text + + if trans: + text = unescape(text) + + return text.replace('\xa0', ' ') def translate_loc(loc: tuple) -> tuple: @@ -291,3 +317,25 @@ def unzip(zip_path: str, to_path: str) -> Union[list, None]: with ZipFile(zip_path, 'r') as f: return [f.extract(f.namelist()[0], path=to_path)] + + +def get_exe_path_from_port(port: Union[str, int]) -> Union[str, None]: + """获取端口号第一条进程的可执行文件路径 \n + :param port: 端口号 + :return: 可执行文件的绝对路径 + """ + from os import popen + from time import perf_counter + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + t = perf_counter() + + while not process and perf_counter() - t < 10: + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + processid = process[process.rfind(' ') + 1:] + + if not processid: + return + else: + file_lst = popen(f'wmic process where processid={processid} get executablepath').read().split('\n') + return file_lst[2].strip() if len(file_lst) > 2 else None diff --git a/DrissionPage/config.py b/DrissionPage/config.py index 17fccd0..06c1f6a 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -8,11 +8,10 @@ from configparser import RawConfigParser, NoSectionError, NoOptionError from http.cookiejar import Cookie from pathlib import Path -from typing import Any, Union - from requests.cookies import RequestsCookieJar from selenium import webdriver from selenium.webdriver.chrome.options import Options +from typing import Any, Union class OptionsManager(object): @@ -22,7 +21,7 @@ class OptionsManager(object): """初始化,读取配置文件,如没有设置临时文件夹,则设置并新建 \n :param path: ini文件的路径,默认读取模块文件夹下的 """ - self.ini_path = path or str(Path(__file__).parent / 'configs.ini') + self.ini_path = str(Path(__file__).parent / 'configs.ini') if path == 'default' or path is None else path self._conf = RawConfigParser() self._conf.read(self.ini_path, encoding='utf-8') @@ -399,7 +398,7 @@ class SessionOptions(object): path = path / 'config.ini' if path.is_dir() else path if path.exists(): - om = OptionsManager(path) + om = OptionsManager(str(path)) else: om = OptionsManager(self.ini_path or str(Path(__file__).parent / 'configs.ini')) @@ -466,7 +465,7 @@ class DriverOptions(Options): path = path / 'config.ini' if path.is_dir() else path if path.exists(): - om = OptionsManager(path) + om = OptionsManager(str(path)) else: om = OptionsManager(self.ini_path or str(Path(__file__).parent / 'configs.ini')) @@ -667,8 +666,7 @@ def _dict_to_chrome_options(options: dict) -> Options: for i in options['experimental_options']: chrome_options.add_experimental_option(i, options['experimental_options'][i]) - # if options.get('capabilities' ,None): - # pass # 未知怎么用 + return chrome_options @@ -683,8 +681,9 @@ def _chrome_options_to_dict(options: Union[dict, DriverOptions, Options, None]) re_dict = dict() attrs = ['debugger_address', 'binary_location', 'arguments', 'extensions', 'experimental_options', 'driver_path'] + options_dir = options.__dir__() for attr in attrs: - re_dict[attr] = options.__getattribute__(f'_{attr}') + re_dict[attr] = options.__getattribute__(f'_{attr}') if attr in options_dir else None return re_dict @@ -739,10 +738,11 @@ def _cookie_to_dict(cookie: Union[Cookie, str, dict]) -> dict: attr_val = attr.lstrip().split('=') if key == 0: + # TODO: 检查 cookie_dict['name'] = attr_val[0] - cookie_dict['value'] = attr_val[1] + cookie_dict['value'] = attr_val[1] if len(attr_val) == 2 else '' else: - cookie_dict[attr_val[0]] = attr_val[1] + cookie_dict[attr_val[0]] = attr_val[1] if len(attr_val) == 2 else '' return cookie_dict diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index 6930023..2b4904c 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -4,16 +4,15 @@ @Contact : g1879@qq.com @File : drission.py """ -from sys import exit -from typing import Union - from requests import Session from requests.cookies import RequestsCookieJar from selenium import webdriver from selenium.common.exceptions import SessionNotCreatedException, WebDriverException from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.webdriver import WebDriver +from sys import exit from tldextract import extract +from typing import Union from .config import (_dict_to_chrome_options, _session_options_to_dict, SessionOptions, DriverOptions, _chrome_options_to_dict, OptionsManager, _cookies_to_tuple) @@ -35,6 +34,7 @@ class Drission(object): """ self._session = None self._driver = None + self._debugger = None self._proxy = proxy om = OptionsManager(ini_path) if session_or_options is None or driver_or_options is None else None @@ -83,37 +83,38 @@ class Drission(object): 如设置了本地调试浏览器,可自动接入或打开浏览器进程。 """ if self._driver is None: - if isinstance(self._driver_options, dict): - options = _dict_to_chrome_options(self._driver_options) - else: + if not isinstance(self._driver_options, dict): raise TypeError('Driver options invalid') - if self._proxy: + options = _dict_to_chrome_options(self._driver_options) + + if not self._driver_options.get('debugger_address', None) and self._proxy: options.add_argument(f'--proxy-server={self._proxy["http"]}') driver_path = self._driver_options.get('driver_path', None) or 'chromedriver' + chrome_path = self._driver_options.get('binary_location', None) or 'chrome.exe' + # -----------若指定debug端口且该端口未在使用中,则先启动浏览器进程----------- + if options.debugger_address and _check_port(options.debugger_address) is False: + from subprocess import Popen + port = options.debugger_address[options.debugger_address.rfind(':') + 1:] + + # 启动浏览器进程,同时返回该进程使用的 chrome.exe 路径 + chrome_path, self._debugger = _create_chrome(chrome_path, port, + self._driver_options['arguments'], self._proxy) + + # -----------创建WebDriver对象----------- + self._driver = _create_driver(chrome_path, driver_path, options) + + # 反反爬设置 try: - if options.debugger_address and _check_port(options.debugger_address) is False: - from subprocess import Popen - port = options.debugger_address.split(':')[-1] - chrome_path = self._driver_options.get('binary_location', None) or 'chrome.exe' - Popen(f'{chrome_path} --remote-debugging-port={port}', shell=False) + self._driver.execute_script('Object.defineProperty(navigator,"webdriver",{get:() => Chrome,});') + except: + pass - self._driver = webdriver.Chrome(driver_path, options=options) - - except (WebDriverException, SessionNotCreatedException): - print('未指定chromedriver路径或版本与Chrome不匹配,可执行easy_set.get_match_driver()自动下载匹配的版本。') - exit(0) - - # 反爬设置,似乎没用 - self._driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { - "source": """ - Object.defineProperty(navigator, 'webdriver', { - get: () => Chrome - }) - """ - }) + # self._driver.execute_cdp_cmd( + # 'Page.addScriptToEvaluateOnNewDocument', + # {'source': 'Object.defineProperty(navigator,"webdriver",{get:() => Chrome,});'}) return self._driver @@ -163,6 +164,41 @@ class Drission(object): for cookie in cookies: self.set_cookies(cookie, set_driver=True) + @property + def debugger_progress(self): + """调试浏览器进程""" + return self._debugger + + def kill_browser(self) -> None: + """关闭浏览器进程(如果可以)""" + if self.debugger_progress: + self.debugger_progress.kill() + return + + address = self.driver_options.get('debugger_address', '').split(':') + if len(address) == 1: + self.close_driver() + + elif len(address) == 2: + ip, port = address + if ip not in ('127.0.0.1', 'localhost') or not port.isdigit(): + return + + from os import popen + progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n') + txt = '' + for progress in progresses: + if 'LISTENING' in progress: + txt = progress + break + + if not txt: + return + + pid = txt[txt.rfind(' ') + 1:] + if popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'): + popen(f'taskkill /pid {pid} /F') + def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict], set_session: bool = False, @@ -181,7 +217,8 @@ class Drission(object): # 添加cookie到session if set_session: - kwargs = {x: cookie[x] for x in cookie if x not in ('name', 'value', 'httpOnly', 'expiry')} + kwargs = {x: cookie[x] for x in cookie + if x.lower() not in ('name', 'value', 'httponly', 'expiry', 'samesite')} if 'expiry' in cookie: kwargs['expires'] = cookie['expiry'] @@ -214,9 +251,19 @@ class Drission(object): self.driver.get(cookie_domain if cookie_domain.startswith('http://') else f'http://{cookie_domain}') + # 避免selenium自动添加.后无法正确覆盖已有cookie + if cookie['domain'][0] != '.': + c = self.driver.get_cookie(cookie['name']) + if c and c['domain'] == cookie['domain']: + self.driver.delete_cookie(cookie['name']) + self.driver.add_cookie(cookie) def _set_session(self, data: dict) -> None: + """根据传入字典对session进行设置 \n + :param data: session配置字典 + :return: None + """ if self._session is None: self._session = Session() @@ -303,7 +350,7 @@ class Drission(object): def _check_port(debugger_address: str) -> Union[bool, None]: - """检查端口是否可用 \n + """检查端口是否被占用 \n :param debugger_address: 浏览器地址及端口 :return: bool """ @@ -322,3 +369,89 @@ def _check_port(debugger_address: str) -> Union[bool, None]: return True except socket.error: return False + finally: + if s: + s.close() + + +def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tuple: + """创建 chrome 进程 \n + :param chrome_path: chrome.exe 路径 + :param port: 进程运行的端口号 + :param args: chrome 配置参数 + :return: chrome.exe 路径和进程对象组成的元组 + """ + from subprocess import Popen + + # ----------为路径加上双引号,避免路径中的空格产生异常---------- + args1 = [] + for arg in args: + if arg.startswith(('--user-data-dir', '--disk-cache-dir')): + index = arg.find('=') + 1 + args1.append(f'{arg[:index]}"{arg[index:].strip()}"') + else: + args1.append(arg) + + args = ' '.join(set(args1)) + + if proxy: + args = f'{args} --proxy-server={proxy["http"]}' + + # ----------创建浏览器进程---------- + try: + debugger = Popen(f'{chrome_path} --remote-debugging-port={port} {args}', shell=False) + + if chrome_path == 'chrome.exe': + from common import get_exe_path_from_port + chrome_path = get_exe_path_from_port(port) + + # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 + except FileNotFoundError: + from DrissionPage.easy_set import _get_chrome_path + chrome_path = _get_chrome_path(show_msg=False) + + if not chrome_path: + raise FileNotFoundError('无法找到chrome.exe路径,请手动配置。') + + debugger = Popen(f'"{chrome_path}" --remote-debugging-port={port} {args}', shell=False) + + return chrome_path, debugger + + +def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver: + """创建 WebDriver 对象 \n + :param chrome_path: chrome.exe 路径 + :param driver_path: chromedriver.exe 路径 + :param options: Options 对象 + :return: WebDriver 对象 + """ + try: + return webdriver.Chrome(driver_path, options=options) + + # 若版本不对,获取对应 chromedriver 再试 + except (WebDriverException, SessionNotCreatedException): + from .easy_set import get_match_driver + chrome_path = None if chrome_path == 'chrome.exe' else chrome_path + driver_path = get_match_driver(chrome_path=chrome_path, check_version=False, show_msg=False) + + if driver_path: + try: + return webdriver.Chrome(driver_path, options=options) + except: + pass + + # 当找不到 driver 且 chrome_path 为 None 时,说明安装的版本过高,改在系统路径中查找 + elif chrome_path is None and driver_path is None: + from DrissionPage.easy_set import _get_chrome_path + chrome_path = _get_chrome_path(show_msg=False, from_ini=False, from_regedit=False) + driver_path = get_match_driver(chrome_path=chrome_path, check_version=False, show_msg=False) + + if driver_path: + options.binary_location = chrome_path + try: + return webdriver.Chrome(driver_path, options=options) + except: + pass + + print('无法启动,请检查chromedriver版本与Chrome是否匹配,并手动设置。') + exit(0) diff --git a/DrissionPage/driver_element.py b/DrissionPage/driver_element.py index 68f5a9c..f8c6e1c 100644 --- a/DrissionPage/driver_element.py +++ b/DrissionPage/driver_element.py @@ -4,15 +4,15 @@ @Contact : g1879@qq.com @File : driver_element.py """ +import re from pathlib import Path -from time import sleep -from typing import Union, List, Any, Tuple - from selenium.common.exceptions import TimeoutException, JavascriptException, InvalidElementStateException from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait +from time import sleep +from typing import Union, List, Any, Tuple from .common import DrissionElement, str_to_loc, get_available_file_name, translate_loc, format_html @@ -22,6 +22,7 @@ class DriverElement(DrissionElement): def __init__(self, ele: WebElement, page=None): super().__init__(ele, page) + self._select = None def __repr__(self): attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] @@ -54,7 +55,7 @@ class DriverElement(DrissionElement): @property def tag(self) -> str: """返回元素类型""" - return self._inner_ele.tag_name + return self._inner_ele.tag_name.lower() @property def attrs(self) -> dict: @@ -78,7 +79,17 @@ class DriverElement(DrissionElement): @property def text(self) -> str: """返回元素内所有文本""" - return self.attr('innerText') + # return format_html(self.inner_ele.get_attribute('innerText'), False) + re_str = self.inner_ele.get_attribute('innerText') + re_str = re.sub(r'\n{2,}', '\n', re_str) + re_str = re.sub(r' {2,}', ' ', re_str) + + return format_html(re_str.strip('\n '), False) + + @property + def raw_text(self) -> str: + """返回未格式化处理的元素内文本""" + return self.inner_ele.get_attribute('innerText') @property def link(self) -> str: @@ -109,6 +120,11 @@ class DriverElement(DrissionElement): """返回前一个兄弟元素""" return self._get_brother(1, 'ele', 'prev') + @property + def comments(self) -> list: + """返回元素注释文本组成的列表""" + return self.eles('xpath:.//comment()') + # -----------------driver独占属性------------------- @property def size(self) -> dict: @@ -128,6 +144,11 @@ class DriverElement(DrissionElement): from .shadow_root_element import ShadowRootElement return ShadowRootElement(shadow, self) + @property + def sr(self): + """返回当前元素的shadow_root元素对象""" + return self.shadow_root + @property def before(self) -> str: """返回当前元素的::before伪元素内容""" @@ -138,16 +159,30 @@ class DriverElement(DrissionElement): """返回当前元素的::after伪元素内容""" return self.get_style_property('content', 'after') + @property + def select(self): + """返回专门处理下拉列表的Select类,非下拉列表元素返回False""" + if self._select is None: + if self.tag != 'select': + self._select = False + else: + self._select = Select(self) + + return self._select + # -----------------共有函数------------------- + def texts(self, text_node_only: bool = False) -> list: """返回元素内所有直接子节点的文本,包括元素和文本节点 \n :param text_node_only: 是否只返回文本节点 :return: 文本列表 """ if text_node_only: - return self.eles('xpath:./text()') + texts = self.eles('xpath:/text()') else: - return [x if isinstance(x, str) else x.text for x in self.eles('xpath:./node()')] + texts = [x if isinstance(x, str) else x.text for x in self.eles('xpath:./text() | *')] + + return [x.strip(' ') for x in texts if x and x.replace('\n', '').replace('\t', '').replace(' ', '') != ''] def parents(self, num: int = 1): """返回上面第num级父元素 \n @@ -155,7 +190,7 @@ class DriverElement(DrissionElement): :return: DriverElement对象 """ loc = 'xpath', f'.{"/.." * num}' - return self.ele(loc, timeout=0.1) + return self.ele(loc, timeout=0) def nexts(self, num: int = 1, mode: str = 'ele'): """返回后面第num个兄弟元素或节点文本 \n @@ -178,7 +213,10 @@ class DriverElement(DrissionElement): :param attr: 属性名 :return: 属性值文本 """ - attr = 'innerText' if attr == 'text' else attr + # attr = 'innerText' if attr == 'text' else attr + if attr in ('text', 'innerText'): + return self.text + return format_html(self.inner_ele.get_attribute(attr)) def ele(self, @@ -188,27 +226,34 @@ class DriverElement(DrissionElement): """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n 示例: \n - 用loc元组查找: \n - ele.ele((By.CLASS_NAME, 'ele_class')) - 返回第一个class为ele_class的子元素 \n + ele.ele((By.CLASS_NAME, 'ele_class')) - 返回第一个class为ele_class的子元素 \n - 用查询字符串查找: \n - 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n - @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n + 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n + @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n ele.ele('.ele_class') - 返回第一个 class 为 ele_class 的子元素 \n ele.ele('.:ele_class') - 返回第一个 class 中含有 ele_class 的子元素 \n - ele.ele('#ele_id') - 返回第一个 id 为 ele_id 的子元素 \n + ele.ele('#ele_id') - 返回第一个 id 为 ele_id 的子元素 \n ele.ele('#:ele_id') - 返回第一个 id 中含有 ele_id 的子元素 \n - ele.ele('@class:ele_class') - 返回第一个class含有ele_class的子元素 \n - ele.ele('@name=ele_name') - 返回第一个name等于ele_name的子元素 \n - ele.ele('@placeholder') - 返回第一个带placeholder属性的子元素 \n - ele.ele('tag:p') - 返回第一个
子元素 \n - ele.ele('tag:div@class:ele_class') - 返回第一个class含有ele_class的div子元素 \n - ele.ele('tag:div@class=ele_class') - 返回第一个class等于ele_class的div子元素 \n - ele.ele('tag:div@text():some_text') - 返回第一个文本含有some_text的div子元素 \n - ele.ele('tag:div@text()=some_text') - 返回第一个文本等于some_text的div子元素 \n - ele.ele('text:some_text') - 返回第一个文本含有some_text的子元素 \n - ele.ele('some_text') - 返回第一个文本含有some_text的子元素(等价于上一行) \n - ele.ele('text=some_text') - 返回第一个文本等于some_text的子元素 \n - ele.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的子元素 \n - ele.ele('css:div.ele_class') - 返回第一个符合css selector的子元素 \n + ele.ele('@class:ele_class') - 返回第一个class含有ele_class的子元素 \n + ele.ele('@name=ele_name') - 返回第一个name等于ele_name的子元素 \n + ele.ele('@placeholder') - 返回第一个带placeholder属性的子元素 \n + ele.ele('tag:p') - 返回第一个
子元素 \n + ele.ele('tag:div@class:ele_class') - 返回第一个class含有ele_class的div子元素 \n + ele.ele('tag:div@class=ele_class') - 返回第一个class等于ele_class的div子元素 \n + ele.ele('tag:div@text():some_text') - 返回第一个文本含有some_text的div子元素 \n + ele.ele('tag:div@text()=some_text') - 返回第一个文本等于some_text的div子元素 \n + ele.ele('text:some_text') - 返回第一个文本含有some_text的子元素 \n + ele.ele('some_text') - 返回第一个文本含有some_text的子元素(等价于上一行) \n + ele.ele('text=some_text') - 返回第一个文本等于some_text的子元素 \n + ele.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的子元素 \n + ele.ele('css:div.ele_class') - 返回第一个符合css selector的子元素 \n + - 查询字符串还有最精简模式,用x代替xpath、c代替css、t代替tag、tx代替text: \n + ele.ele('x://div[@class="ele_class"]') - 等同于 ele.ele('xpath://div[@class="ele_class"]') \n + ele.ele('c:div.ele_class') - 等同于 ele.ele('css:div.ele_class') \n + ele.ele('t:div') - 等同于 ele.ele('tag:div') \n + ele.ele('t:div@tx()=some_text') - 等同于 ele.ele('tag:div@text()=some_text') \n + ele.ele('tx:some_text') - 等同于 ele.ele('text:some_text') \n + ele.ele('tx=some_text') - 等同于 ele.ele('text=some_text') :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param mode: 'single' 或 'all',对应查找一个或全部 :param timeout: 查找元素超时时间 @@ -241,30 +286,37 @@ class DriverElement(DrissionElement): def eles(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None): - """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n - 示例: \n - - 用loc元组查找: \n - ele.eles((By.CLASS_NAME, 'ele_class')) - 返回所有class为ele_class的子元素 \n - - 用查询字符串查找: \n - 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n - @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n - ele.eles('.ele_class') - 返回所有 class 为 ele_class 的子元素 \n - ele.eles('.:ele_class') - 返回所有 class 中含有 ele_class 的子元素 \n - ele.eles('#ele_id') - 返回所有 id 为 ele_id 的子元素 \n - ele.eles('#:ele_id') - 返回所有 id 中含有 ele_id 的子元素 \n - ele.eles('@class:ele_class') - 返回所有class含有ele_class的子元素 \n - ele.eles('@name=ele_name') - 返回所有name等于ele_name的子元素 \n - ele.eles('@placeholder') - 返回所有带placeholder属性的子元素 \n - ele.eles('tag:p') - 返回所有
子元素 \n - ele.eles('tag:div@class:ele_class') - 返回所有class含有ele_class的div子元素 \n - ele.eles('tag:div@class=ele_class') - 返回所有class等于ele_class的div子元素 \n - ele.eles('tag:div@text():some_text') - 返回所有文本含有some_text的div子元素 \n - ele.eles('tag:div@text()=some_text') - 返回所有文本等于some_text的div子元素 \n - ele.eles('text:some_text') - 返回所有文本含有some_text的子元素 \n - ele.eles('some_text') - 返回所有文本含有some_text的子元素(等价于上一行) \n - ele.eles('text=some_text') - 返回所有文本等于some_text的子元素 \n - ele.eles('xpath://div[@class="ele_class"]') - 返回所有符合xpath的子元素 \n - ele.eles('css:div.ele_class') - 返回所有符合css selector的子元素 \n + """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n + 示例: \n + - 用loc元组查找: \n + ele.eles((By.CLASS_NAME, 'ele_class')) - 返回所有class为ele_class的子元素 \n + - 用查询字符串查找: \n + 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n + @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n + ele.eles('.ele_class') - 返回所有 class 为 ele_class 的子元素 \n + ele.eles('.:ele_class') - 返回所有 class 中含有 ele_class 的子元素 \n + ele.eles('#ele_id') - 返回所有 id 为 ele_id 的子元素 \n + ele.eles('#:ele_id') - 返回所有 id 中含有 ele_id 的子元素 \n + ele.eles('@class:ele_class') - 返回所有class含有ele_class的子元素 \n + ele.eles('@name=ele_name') - 返回所有name等于ele_name的子元素 \n + ele.eles('@placeholder') - 返回所有带placeholder属性的子元素 \n + ele.eles('tag:p') - 返回所有
子元素 \n
+ ele.eles('tag:div@class:ele_class') - 返回所有class含有ele_class的div子元素 \n
+ ele.eles('tag:div@class=ele_class') - 返回所有class等于ele_class的div子元素 \n
+ ele.eles('tag:div@text():some_text') - 返回所有文本含有some_text的div子元素 \n
+ ele.eles('tag:div@text()=some_text') - 返回所有文本等于some_text的div子元素 \n
+ ele.eles('text:some_text') - 返回所有文本含有some_text的子元素 \n
+ ele.eles('some_text') - 返回所有文本含有some_text的子元素(等价于上一行) \n
+ ele.eles('text=some_text') - 返回所有文本等于some_text的子元素 \n
+ ele.eles('xpath://div[@class="ele_class"]') - 返回所有符合xpath的子元素 \n
+ ele.eles('css:div.ele_class') - 返回所有符合css selector的子元素 \n
+ - 查询字符串还有最精简模式,用x代替xpath、c代替css、t代替tag、tx代替text: \n
+ ele.eles('x://div[@class="ele_class"]') - 等同于 ele.eles('xpath://div[@class="ele_class"]') \n
+ ele.eles('c:div.ele_class') - 等同于 ele.eles('css:div.ele_class') \n
+ ele.eles('t:div') - 等同于 ele.eles('tag:div') \n
+ ele.eles('t:div@tx()=some_text') - 等同于 ele.eles('tag:div@text()=some_text') \n
+ ele.eles('tx:some_text') - 等同于 ele.eles('text:some_text') \n
+ ele.eles('tx=some_text') - 等同于 ele.eles('text=some_text')
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
:param timeout: 查找元素超时时间
:return: DriverElement对象组成的列表
@@ -285,14 +337,14 @@ class DriverElement(DrissionElement):
return None if r == 'none' else r
- def click(self, by_js=None) -> bool:
+ def click(self, by_js: bool = None) -> bool:
"""点击元素 \n
- 尝试点击10次,若都失败就改用js点击 \n
+ 尝试点击3次,若都失败就改用js点击 \n
:param by_js: 是否用js点击,为True时直接用js点击,为False时重试失败也不会改用js
:return: 是否点击成功
"""
if not by_js:
- for _ in range(10):
+ for _ in range(3):
try:
self.inner_ele.click()
return True
@@ -306,17 +358,45 @@ class DriverElement(DrissionElement):
return False
- def input(self, value: str, clear: bool = True) -> bool:
- """输入文本 \n
- :param value: 文本值
+ def click_at(self, x: Union[int, str] = None, y: Union[int, str] = None, by_js=False) -> None:
+ """带偏移量点击本元素,相对于左上角坐标。不传入x或y值时点击元素中点 \n
+ :param x: 相对元素左上角坐标的x轴偏移量
+ :param y: 相对元素左上角坐标的y轴偏移量
+ :param by_js: 是否用js点击
+ :return: None
+ """
+ if by_js:
+ x = self.location['x'] + int(x) if x is not None else self.location['x'] + self.size['width'] // 2
+ y = self.location['y'] + int(y) if y is not None else self.location['y'] + self.size['height'] // 2
+ js = f"""
+ var ev = document.createEvent('HTMLEvents');
+ ev.clientX = {x};
+ ev.clientY = {y};
+ ev.initEvent('click', false, true);
+ arguments[0].dispatchEvent(ev);
+ """
+ self.run_script(js)
+
+ else:
+ x = int(x) if x is not None else self.size['width'] // 2
+ y = int(y) if y is not None else self.size['height'] // 2
+
+ from selenium.webdriver import ActionChains
+ ActionChains(self.page.driver).move_to_element_with_offset(self.inner_ele, x, y).click().perform()
+
+ def input(self, value: Union[str, tuple], clear: bool = True) -> bool:
+ """输入文本或组合键 \n
+ :param value: 文本值或按键组合
:param clear: 输入前是否清空文本框
:return: 是否输入成功
"""
try:
if clear:
self.clear()
- self.inner_ele.send_keys(value)
+
+ self.inner_ele.send_keys(*value)
return True
+
except Exception as e:
print(e)
return False
@@ -367,7 +447,8 @@ class DriverElement(DrissionElement):
name = filename or self.tag
path = Path(path).absolute()
path.mkdir(parents=True, exist_ok=True)
- name = get_available_file_name(str(path), f'{name}.png')
+ name = f'{name}.png' if not name.endswith('.png') else name
+ name = get_available_file_name(str(path), name)
# 等待元素加载完成
if self.tag == 'img':
@@ -381,21 +462,6 @@ class DriverElement(DrissionElement):
return img_path
- def select(self, text: str) -> bool:
- """选择下拉列表中子元素 \n
- :param text: 要选择的文本
- :return: 是否选择成功
- """
- from selenium.webdriver.support.select import Select
- ele = Select(self.inner_ele)
-
- try:
- ele.select_by_visible_text(text)
- return True
- except Exception as e:
- print(e)
- return False
-
def set_attr(self, attr: str, value: str) -> bool:
"""设置元素属性 \n
:param attr: 属性名
@@ -403,7 +469,7 @@ class DriverElement(DrissionElement):
:return: 是否设置成功
"""
try:
- self.run_script(f"arguments[0].{attr} = '{value}';")
+ self.run_script(f"arguments[0].setAttribute(arguments[1], arguments[2]);", attr, value)
return True
except:
return False
@@ -493,12 +559,14 @@ class DriverElement(DrissionElement):
if(nth>1){path = '/' + tag + '[' + nth + ']' + path;}
else{path = '/' + tag + path;}'''
txt5 = '''return path;'''
+
elif mode == 'css':
txt1 = ''
# txt2 = '''return '#' + el.id + path;'''
txt3 = ''
txt4 = '''path = '>' + ":nth-child(" + nth + ")" + path;'''
txt5 = '''return path.substr(1);'''
+
else:
raise ValueError(f"Argument mode can only be 'xpath' or 'css', not '{mode}'.")
@@ -514,7 +582,6 @@ class DriverElement(DrissionElement):
sib = sib.previousSibling;
}
''' + txt4 + '''
-
el = el.parentNode;
}
''' + txt5 + '''
@@ -548,13 +615,15 @@ class DriverElement(DrissionElement):
else:
raise ValueError(f"Argument direction can only be 'next' or 'prev', not '{direction}'.")
+ timeout = 0 if direction == 'prev' else .5
+
# 获取节点
- ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=0.1)
+ ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=timeout)
# 跳过元素间的换行符
- while ele_or_node == '\n':
+ while isinstance(ele_or_node, str) and ele_or_node.replace('\n', '').replace('\t', '').replace(' ', '') == '':
num += 1
- ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=0.1)
+ ele_or_node = self.ele(f'xpath:./{direction_txt}-sibling::{node_txt}[{num}]', timeout=timeout)
return ele_or_node
@@ -572,7 +641,7 @@ def execute_driver_find(page_or_ele,
:return: 返回DriverElement元素或它们组成的列表
"""
mode = mode or 'single'
- if mode not in ['single', 'all']:
+ if mode not in ('single', 'all'):
raise ValueError(f"Argument mode can only be 'single' or 'all', not '{mode}'.")
if isinstance(page_or_ele, DrissionElement):
@@ -582,15 +651,19 @@ def execute_driver_find(page_or_ele,
page = page_or_ele
driver = page_or_ele.driver
- try:
- if timeout and timeout != page.timeout:
- wait = WebDriverWait(driver, timeout=timeout)
- else:
- page.wait._driver = driver
- wait = page.wait
+ # 设置等待对象
+ if timeout is not None and timeout != page.timeout:
+ wait = WebDriverWait(driver, timeout=timeout)
+ else:
+ page.wait._driver = driver
+ wait = page.wait
+ try:
+ # 使用xpath查找
if loc[0] == 'xpath':
return wait.until(ElementsByXpath(page, loc[1], mode, timeout))
+
+ # 使用css selector查找
else:
if mode == 'single':
return DriverElement(wait.until(ec.presence_of_element_located(loc)), page)
@@ -627,7 +700,7 @@ class ElementsByXpath(object):
"""用js通过xpath获取元素、节点或属性
:param node: 'document' 或 元素对象
:param xpath_txt: xpath语句
- :param type_txt: resultType,参考https://developer.mozilla.org/zh-CN/docs/Web/API/Document/evaluate
+ :param type_txt: resultType,参考 https://developer.mozilla.org/zh-CN/docs/Web/API/Document/evaluate
:return: 元素对象或属性、文本字符串
"""
node_txt = 'document' if not node or node == 'document' else 'arguments[0]'
@@ -638,6 +711,7 @@ class ElementsByXpath(object):
return_txt = '''
if(e.singleNodeValue.constructor.name=="Text"){return e.singleNodeValue.data;}
else if(e.singleNodeValue.constructor.name=="Attr"){return e.singleNodeValue.nodeValue;}
+ else if(e.singleNodeValue.constructor.name=="Comment"){return e.singleNodeValue.nodeValue;}
else{return e.singleNodeValue;}
'''
@@ -648,6 +722,7 @@ class ElementsByXpath(object):
for(var i = 0; i 元素 \n
- page.ele('tag:div@class:ele_class') - 返回第一个class含有ele_class的div元素 \n
- page.ele('tag:div@class=ele_class') - 返回第一个class等于ele_class的div元素 \n
- page.ele('tag:div@text():some_text') - 返回第一个文本含有some_text的div元素 \n
- page.ele('tag:div@text()=some_text') - 返回第一个文本等于some_text的div元素 \n
- page.ele('text:some_text') - 返回第一个文本含有some_text的元素 \n
- page.ele('some_text') - 返回第一个文本含有some_text的元素(等价于上一行) \n
- page.ele('text=some_text') - 返回第一个文本等于some_text的元素 \n
- page.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的元素 \n
- page.ele('css:div.ele_class') - 返回第一个符合css selector的元素 \n
+ """返回页面中符合条件的元素、属性或节点文本,默认返回第一个 \n
+ 示例: \n
+ - 接收到元素对象时: \n
+ 返回元素对象对象 \n
+ - 用loc元组查找: \n
+ ele.ele((By.CLASS_NAME, 'ele_class')) - 返回第一个class为ele_class的子元素 \n
+ - 用查询字符串查找: \n
+ 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
+ @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
+ page.ele('.ele_class') - 返回第一个 class 为 ele_class 的元素 \n
+ page.ele('.:ele_class') - 返回第一个 class 中含有 ele_class 的元素 \n
+ page.ele('#ele_id') - 返回第一个 id 为 ele_id 的元素 \n
+ page.ele('#:ele_id') - 返回第一个 id 中含有 ele_id 的元素 \n
+ page.ele('@class:ele_class') - 返回第一个class含有ele_class的元素 \n
+ page.ele('@name=ele_name') - 返回第一个name等于ele_name的元素 \n
+ page.ele('@placeholder') - 返回第一个带placeholder属性的元素 \n
+ page.ele('tag:p') - 返回第一个 元素 \n
+ page.ele('tag:div@class:ele_class') - 返回第一个class含有ele_class的div元素 \n
+ page.ele('tag:div@class=ele_class') - 返回第一个class等于ele_class的div元素 \n
+ page.ele('tag:div@text():some_text') - 返回第一个文本含有some_text的div元素 \n
+ page.ele('tag:div@text()=some_text') - 返回第一个文本等于some_text的div元素 \n
+ page.ele('text:some_text') - 返回第一个文本含有some_text的元素 \n
+ page.ele('some_text') - 返回第一个文本含有some_text的元素(等价于上一行) \n
+ page.ele('text=some_text') - 返回第一个文本等于some_text的元素 \n
+ page.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的元素 \n
+ page.ele('css:div.ele_class') - 返回第一个符合css selector的元素 \n
+ - 查询字符串还有最精简模式,用x代替xpath、c代替css、t代替tag、tx代替text: \n
+ page.ele('x://div[@class="ele_class"]') - 等同于 page.ele('xpath://div[@class="ele_class"]') \n
+ page.ele('c:div.ele_class') - 等同于 page.ele('css:div.ele_class') \n
+ page.ele('t:div') - 等同于 page.ele('tag:div') \n
+ page.ele('t:div@tx()=some_text') - 等同于 page.ele('tag:div@text()=some_text') \n
+ page.ele('tx:some_text') - 等同于 page.ele('text:some_text') \n
+ page.ele('tx=some_text') - 等同于 page.ele('text=some_text')
:param loc_or_ele: 元素的定位信息,可以是元素对象,loc元组,或查询字符串
:param mode: 'single' 或 'all‘,对应查找一个或全部
:param timeout: 查找元素超时时间,d模式专用
@@ -376,36 +392,42 @@ class MixPage(Null, SessionPage, DriverPage):
if self._mode == 's':
return super().ele(loc_or_ele, mode=mode)
elif self._mode == 'd':
- timeout = timeout or self.timeout
return super(SessionPage, self).ele(loc_or_ele, mode=mode, timeout=timeout)
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> Union[List[DriverElement], List[SessionElement]]:
- """返回页面中所有符合条件的元素、属性或节点文本 \n
- 示例: \n
- - 用loc元组查找: \n
- page.eles((By.CLASS_NAME, 'ele_class')) - 返回所有class为ele_class的元素 \n
- - 用查询字符串查找: \n
- 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
- @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
- page.eles('.ele_class') - 返回所有 class 为 ele_class 的元素 \n
- page.eles('.:ele_class') - 返回所有 class 中含有 ele_class 的元素 \n
- page.eles('#ele_id') - 返回所有 id 为 ele_id 的元素 \n
- page.eles('#:ele_id') - 返回所有 id 中含有 ele_id 的元素 \n
- page.eles('@class:ele_class') - 返回所有class含有ele_class的元素 \n
- page.eles('@name=ele_name') - 返回所有name等于ele_name的元素 \n
- page.eles('@placeholder') - 返回所有带placeholder属性的元素 \n
- page.eles('tag:p') - 返回所有 元素 \n
- page.eles('tag:div@class:ele_class') - 返回所有class含有ele_class的div元素 \n
- page.eles('tag:div@class=ele_class') - 返回所有class等于ele_class的div元素 \n
- page.eles('tag:div@text():some_text') - 返回所有文本含有some_text的div元素 \n
- page.eles('tag:div@text()=some_text') - 返回所有文本等于some_text的div元素 \n
- page.eles('text:some_text') - 返回所有文本含有some_text的元素 \n
- page.eles('some_text') - 返回所有文本含有some_text的元素(等价于上一行) \n
- page.eles('text=some_text') - 返回所有文本等于some_text的元素 \n
- page.eles('xpath://div[@class="ele_class"]') - 返回所有符合xpath的元素 \n
- page.eles('css:div.ele_class') - 返回所有符合css selector的元素 \n
+ """返回页面中所有符合条件的元素、属性或节点文本 \n
+ 示例: \n
+ - 用loc元组查找: \n
+ page.eles((By.CLASS_NAME, 'ele_class')) - 返回所有class为ele_class的元素 \n
+ - 用查询字符串查找: \n
+ 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
+ @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
+ page.eles('.ele_class') - 返回所有 class 为 ele_class 的元素 \n
+ page.eles('.:ele_class') - 返回所有 class 中含有 ele_class 的元素 \n
+ page.eles('#ele_id') - 返回所有 id 为 ele_id 的元素 \n
+ page.eles('#:ele_id') - 返回所有 id 中含有 ele_id 的元素 \n
+ page.eles('@class:ele_class') - 返回所有class含有ele_class的元素 \n
+ page.eles('@name=ele_name') - 返回所有name等于ele_name的元素 \n
+ page.eles('@placeholder') - 返回所有带placeholder属性的元素 \n
+ page.eles('tag:p') - 返回所有 元素 \n
+ page.eles('tag:div@class:ele_class') - 返回所有class含有ele_class的div元素 \n
+ page.eles('tag:div@class=ele_class') - 返回所有class等于ele_class的div元素 \n
+ page.eles('tag:div@text():some_text') - 返回所有文本含有some_text的div元素 \n
+ page.eles('tag:div@text()=some_text') - 返回所有文本等于some_text的div元素 \n
+ page.eles('text:some_text') - 返回所有文本含有some_text的元素 \n
+ page.eles('some_text') - 返回所有文本含有some_text的元素(等价于上一行) \n
+ page.eles('text=some_text') - 返回所有文本等于some_text的元素 \n
+ page.eles('xpath://div[@class="ele_class"]') - 返回所有符合xpath的元素 \n
+ page.eles('css:div.ele_class') - 返回所有符合css selector的元素 \n
+ - 查询字符串还有最精简模式,用x代替xpath、c代替css、t代替tag、tx代替text: \n
+ page.eles('x://div[@class="ele_class"]') - 等同于 page.eles('xpath://div[@class="ele_class"]') \n
+ page.eles('c:div.ele_class') - 等同于 page.eles('css:div.ele_class') \n
+ page.eles('t:div') - 等同于 page.eles('tag:div') \n
+ page.eles('t:div@tx()=some_text') - 等同于 page.eles('tag:div@text()=some_text') \n
+ page.eles('tx:some_text') - 等同于 page.eles('text:some_text') \n
+ page.eles('tx=some_text') - 等同于 page.eles('text=some_text')
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
:param timeout: 查找元素超时时间,d模式专用
:return: 元素对象或属性、文本节点文本组成的列表
diff --git a/DrissionPage/session_element.py b/DrissionPage/session_element.py
index 47b3fe5..0bcf345 100644
--- a/DrissionPage/session_element.py
+++ b/DrissionPage/session_element.py
@@ -36,9 +36,8 @@ class SessionElement(DrissionElement):
@property
def html(self) -> str:
"""返回元素outerHTML文本"""
- # tostring()会把跟紧元素的文本节点也带上,因此要去掉
html = format_html(tostring(self._inner_ele, method="html").decode())
- return html[:html.rfind('>') + 1]
+ return html[:html.rfind('>') + 1] # tostring()会把跟紧元素的文本节点也带上,因此要去掉
@property
def inner_html(self) -> str:
@@ -46,6 +45,48 @@ class SessionElement(DrissionElement):
r = re.match(r'<.*?>(.*)', self.html, flags=re.DOTALL)
return '' if not r else r.group(1)
+ @property
+ def text(self) -> str:
+ """返回元素内所有文本"""
+
+ # 为尽量保证与浏览器结果一致,弄得比较复杂
+ def get_node(ele, pre: bool = False):
+ str_list = []
+ if ele.tag == 'pre':
+ pre = True
+
+ current_tag = None
+ for el in ele.eles('xpath:./text() | *'):
+ if current_tag in ('br', 'p') and str_list and str_list[-1] != '\n':
+ str_list.append('\n')
+
+ if isinstance(el, str):
+ if el.replace(' ', '').replace('\n', '') != '':
+ if pre:
+ str_list.append(el)
+ else:
+ str_list.append(el.replace('\n', ' ').strip(' \t'))
+
+ elif '\n' in el and str_list and str_list[-1] != '\n':
+ str_list.append('\n')
+ else:
+ str_list.append(' ')
+ current_tag = None
+ else:
+ str_list.extend(get_node(el, pre))
+ current_tag = el.tag
+
+ return str_list
+
+ re_str = ''.join(get_node(self))
+ re_str = re.sub(r' {2,}', ' ', re_str)
+ return format_html(re_str, False)
+
+ @property
+ def raw_text(self) -> str:
+ """返回未格式化处理的元素内文本"""
+ return str(self._inner_ele.text_content())
+
@property
def tag(self) -> str:
"""返回元素类型"""
@@ -56,11 +97,6 @@ class SessionElement(DrissionElement):
"""返回元素所有属性及值"""
return {attr: self.attr(attr) for attr, val in self.inner_ele.items()}
- @property
- def text(self) -> str:
- """返回元素内所有文本"""
- return str(self._inner_ele.text_content())
-
@property
def link(self) -> str:
"""返回href或src绝对url"""
@@ -91,26 +127,22 @@ class SessionElement(DrissionElement):
"""返回前一个兄弟元素"""
return self._get_brother(1, 'ele', 'prev')
+ @property
+ def comments(self):
+ return self.eles('xpath:.//comment()')
+
def texts(self, text_node_only: bool = False) -> list:
"""返回元素内所有直接子节点的文本,包括元素和文本节点 \n
:param text_node_only: 是否只返回文本节点
:return: 文本列表
"""
if text_node_only:
- return self.eles('xpath:/text()')
+ texts = self.eles('xpath:/text()')
else:
- texts = []
+ texts = [x if isinstance(x, str) else x.text for x in self.eles('xpath:./text() | *')]
- for node in self.eles('xpath:/node()'):
- if isinstance(node, str):
- text = node
- else:
- text = node.text
-
- if text:
- texts.append(text)
-
- return texts
+ return [format_html(x.strip(' ')) for x in texts if
+ x and x.replace('\n', '').replace('\t', '').replace(' ', '') != '']
def parents(self, num: int = 1):
"""返回上面第num级父元素 \n
@@ -155,7 +187,7 @@ class SessionElement(DrissionElement):
elif attr == 'src':
return self._make_absolute(self.inner_ele.get('src'))
- elif attr in ['text', 'innerText']:
+ elif attr in ('text', 'innerText'):
return self.text
elif attr == 'outerHTML':
@@ -168,30 +200,37 @@ class SessionElement(DrissionElement):
return self.inner_ele.get(attr)
def ele(self, loc_or_str: Union[Tuple[str, str], str], mode: str = None):
- """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n
- 示例: \n
+ """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n
+ 示例: \n
- 用loc元组查找: \n
- ele.ele((By.CLASS_NAME, 'ele_class')) - 返回第一个class为ele_class的子元素 \n
+ ele.ele((By.CLASS_NAME, 'ele_class')) - 返回第一个class为ele_class的子元素 \n
- 用查询字符串查找: \n
- 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
- @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
+ 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
+ @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
ele.ele('.ele_class') - 返回第一个 class 为 ele_class 的子元素 \n
ele.ele('.:ele_class') - 返回第一个 class 中含有 ele_class 的子元素 \n
- ele.ele('#ele_id') - 返回第一个 id 为 ele_id 的子元素 \n
+ ele.ele('#ele_id') - 返回第一个 id 为 ele_id 的子元素 \n
ele.ele('#:ele_id') - 返回第一个 id 中含有 ele_id 的子元素 \n
- ele.ele('@class:ele_class') - 返回第一个class含有ele_class的子元素 \n
- ele.ele('@name=ele_name') - 返回第一个name等于ele_name的子元素 \n
- ele.ele('@placeholder') - 返回第一个带placeholder属性的子元素 \n
- ele.ele('tag:p') - 返回第一个 子元素 \n
- ele.ele('tag:div@class:ele_class') - 返回第一个class含有ele_class的div子元素 \n
- ele.ele('tag:div@class=ele_class') - 返回第一个class等于ele_class的div子元素 \n
- ele.ele('tag:div@text():some_text') - 返回第一个文本含有some_text的div子元素 \n
- ele.ele('tag:div@text()=some_text') - 返回第一个文本等于some_text的div子元素 \n
- ele.ele('text:some_text') - 返回第一个文本含有some_text的子元素 \n
- ele.ele('some_text') - 返回第一个文本含有some_text的子元素(等价于上一行) \n
- ele.ele('text=some_text') - 返回第一个文本等于some_text的子元素 \n
- ele.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的子元素 \n
- ele.ele('css:div.ele_class') - 返回第一个符合css selector的子元素 \n
+ ele.ele('@class:ele_class') - 返回第一个class含有ele_class的子元素 \n
+ ele.ele('@name=ele_name') - 返回第一个name等于ele_name的子元素 \n
+ ele.ele('@placeholder') - 返回第一个带placeholder属性的子元素 \n
+ ele.ele('tag:p') - 返回第一个 子元素 \n
+ ele.ele('tag:div@class:ele_class') - 返回第一个class含有ele_class的div子元素 \n
+ ele.ele('tag:div@class=ele_class') - 返回第一个class等于ele_class的div子元素 \n
+ ele.ele('tag:div@text():some_text') - 返回第一个文本含有some_text的div子元素 \n
+ ele.ele('tag:div@text()=some_text') - 返回第一个文本等于some_text的div子元素 \n
+ ele.ele('text:some_text') - 返回第一个文本含有some_text的子元素 \n
+ ele.ele('some_text') - 返回第一个文本含有some_text的子元素(等价于上一行) \n
+ ele.ele('text=some_text') - 返回第一个文本等于some_text的子元素 \n
+ ele.ele('xpath://div[@class="ele_class"]') - 返回第一个符合xpath的子元素 \n
+ ele.ele('css:div.ele_class') - 返回第一个符合css selector的子元素 \n
+ - 查询字符串还有最精简模式,用x代替xpath、c代替css、t代替tag、tx代替text: \n
+ ele.ele('x://div[@class="ele_class"]') - 等同于 ele.ele('xpath://div[@class="ele_class"]') \n
+ ele.ele('c:div.ele_class') - 等同于 ele.ele('css:div.ele_class') \n
+ ele.ele('t:div') - 等同于 ele.ele('tag:div') \n
+ ele.ele('t:div@tx()=some_text') - 等同于 ele.ele('tag:div@text()=some_text') \n
+ ele.ele('tx:some_text') - 等同于 ele.ele('text:some_text') \n
+ ele.ele('tx=some_text') - 等同于 ele.ele('text=some_text')
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
:param mode: 'single' 或 'all‘,对应查找一个或全部
:return: SessionElement对象
@@ -222,30 +261,37 @@ class SessionElement(DrissionElement):
return execute_session_find(element, loc_or_str, mode)
def eles(self, loc_or_str: Union[Tuple[str, str], str]):
- """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n
- 示例: \n
- - 用loc元组查找: \n
- ele.eles((By.CLASS_NAME, 'ele_class')) - 返回所有class为ele_class的子元素 \n
- - 用查询字符串查找: \n
- 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
- @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
- ele.eles('.ele_class') - 返回所有 class 为 ele_class 的子元素 \n
- ele.eles('.:ele_class') - 返回所有 class 中含有 ele_class 的子元素 \n
- ele.eles('#ele_id') - 返回所有 id 为 ele_id 的子元素 \n
- ele.eles('#:ele_id') - 返回所有 id 中含有 ele_id 的子元素 \n
- ele.eles('@class:ele_class') - 返回所有class含有ele_class的子元素 \n
- ele.eles('@name=ele_name') - 返回所有name等于ele_name的子元素 \n
- ele.eles('@placeholder') - 返回所有带placeholder属性的子元素 \n
- ele.eles('tag:p') - 返回所有 子元素 \n
- ele.eles('tag:div@class:ele_class') - 返回所有class含有ele_class的div子元素 \n
- ele.eles('tag:div@class=ele_class') - 返回所有class等于ele_class的div子元素 \n
- ele.eles('tag:div@text():some_text') - 返回所有文本含有some_text的div子元素 \n
- ele.eles('tag:div@text()=some_text') - 返回所有文本等于some_text的div子元素 \n
- ele.eles('text:some_text') - 返回所有文本含有some_text的子元素 \n
- ele.eles('some_text') - 返回所有文本含有some_text的子元素(等价于上一行) \n
- ele.eles('text=some_text') - 返回所有文本等于some_text的子元素 \n
- ele.eles('xpath://div[@class="ele_class"]') - 返回所有符合xpath的子元素 \n
- ele.eles('css:div.ele_class') - 返回所有符合css selector的子元素 \n
+ """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n
+ 示例: \n
+ - 用loc元组查找: \n
+ ele.eles((By.CLASS_NAME, 'ele_class')) - 返回所有class为ele_class的子元素 \n
+ - 用查询字符串查找: \n
+ 查找方式:属性、tag name和属性、文本、xpath、css selector、id、class \n
+ @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n
+ ele.eles('.ele_class') - 返回所有 class 为 ele_class 的子元素 \n
+ ele.eles('.:ele_class') - 返回所有 class 中含有 ele_class 的子元素 \n
+ ele.eles('#ele_id') - 返回所有 id 为 ele_id 的子元素 \n
+ ele.eles('#:ele_id') - 返回所有 id 中含有 ele_id 的子元素 \n
+ ele.eles('@class:ele_class') - 返回所有class含有ele_class的子元素 \n
+ ele.eles('@name=ele_name') - 返回所有name等于ele_name的子元素 \n
+ ele.eles('@placeholder') - 返回所有带placeholder属性的子元素 \n
+ ele.eles('tag:p') - 返回所有 子元素 \n
+ ele.eles('tag:div@class:ele_class') - 返回所有class含有ele_class的div子元素 \n
+ ele.eles('tag:div@class=ele_class') - 返回所有class等于ele_class的div子元素 \n
+ ele.eles('tag:div@text():some_text') - 返回所有文本含有some_text的div子元素 \n
+ ele.eles('tag:div@text()=some_text') - 返回所有文本等于some_text的div子元素 \n
+ ele.eles('text:some_text') - 返回所有文本含有some_text的子元素 \n
+ ele.eles('some_text') - 返回所有文本含有some_text的子元素(等价于上一行) \n
+ ele.eles('text=some_text') - 返回所有文本等于some_text的子元素 \n
+ ele.eles('xpath://div[@class="ele_class"]') - 返回所有符合xpath的子元素 \n
+ ele.eles('css:div.ele_class') - 返回所有符合css selector的子元素 \n
+ - 查询字符串还有最精简模式,用x代替xpath、c代替css、t代替tag、tx代替text: \n
+ ele.eles('x://div[@class="ele_class"]') - 等同于 ele.eles('xpath://div[@class="ele_class"]') \n
+ ele.eles('c:div.ele_class') - 等同于 ele.eles('css:div.ele_class') \n
+ ele.eles('t:div') - 等同于 ele.eles('tag:div') \n
+ ele.eles('t:div@tx()=some_text') - 等同于 ele.eles('tag:div@text()=some_text') \n
+ ele.eles('tx:some_text') - 等同于 ele.eles('text:some_text') \n
+ ele.eles('tx=some_text') - 等同于 ele.eles('text=some_text')
:param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串
:return: SessionElement对象组成的列表
"""
@@ -284,12 +330,6 @@ class SessionElement(DrissionElement):
ele = self
while ele:
- # ele_id = ele.attr('id')
-
- # if ele_id:
- # return f'#{ele_id}{path_str}' if mode == 'css' else f'//{ele.tag}[@id="{ele_id}"]{path_str}'
- # else:
-
if mode == 'css':
brothers = len(ele.eles(f'xpath:./preceding-sibling::*'))
path_str = f'>:nth-child({brothers + 1}){path_str}'
@@ -302,7 +342,7 @@ class SessionElement(DrissionElement):
return path_str[1:] if mode == 'css' else path_str
def _get_brother(self, num: int = 1, mode: str = 'ele', direction: str = 'next'):
- """返回前面第num个兄弟元素或节点 \n
+ """返回前面或后面第num个兄弟元素或节点 \n
:param num: 前面第几个兄弟元素或节点
:param mode: 'ele', 'node' 或 'text',匹配元素、节点、或文本节点
:param direction: 'next' 或 'prev',查找的方向
@@ -348,7 +388,7 @@ def execute_session_find(page_or_ele,
:return: 返回SessionElement元素或列表
"""
mode = mode or 'single'
- if mode not in ['single', 'all']:
+ if mode not in ('single', 'all'):
raise ValueError(f"Argument mode can only be 'single' or 'all', not '{mode}'.")
# 根据传入对象类型获取页面对象和lxml元素对象
@@ -357,7 +397,7 @@ def execute_session_find(page_or_ele,
page_or_ele = page_or_ele.inner_ele
else: # 传入的是SessionPage对象
page = page_or_ele
- page_or_ele = fromstring(page_or_ele.html)
+ page_or_ele = fromstring(re.sub(r' ?', ' ', page_or_ele.response.text))
try:
# 用lxml内置方法获取lxml的元素对象列表
@@ -368,6 +408,10 @@ def execute_session_find(page_or_ele,
else:
ele = page_or_ele.cssselect(loc[1])
+ # 结果不是列表,如数字
+ if not isinstance(ele, list):
+ return ele
+
# 把lxml元素对象包装成SessionElement对象并按需要返回第一个或全部
if mode == 'single':
ele = ele[0] if ele else None
diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py
index 060e50d..f4c5959 100644
--- a/DrissionPage/session_page.py
+++ b/DrissionPage/session_page.py
@@ -10,11 +10,11 @@ from pathlib import Path
from random import randint
from re import search as re_SEARCH
from re import sub as re_SUB
-from time import time, sleep
from typing import Union, List, Tuple
from urllib.parse import urlparse, quote, unquote
from requests import Session, Response
+from time import time, sleep
from tldextract import extract
from .common import str_to_loc, translate_loc, get_available_file_name, format_html
@@ -33,6 +33,9 @@ class SessionPage(object):
self._url_available = None
self._response = None
+ self.retry_times = 3
+ self.retry_interval = 2
+
@property
def session(self) -> Session:
"""返回session对象"""
@@ -194,17 +197,25 @@ class SessionPage(object):
:param kwargs: 连接参数
:return: HTMLResponse对象
"""
- r = self._make_response(to_url, mode=mode, show_errmsg=show_errmsg, **kwargs)[0]
+ err = None
+ r = None
- while times and (not r or r.content == b''):
- if r is not None and r.status_code in (403, 404):
+ for _ in range(times + 1):
+ try:
+ r = self._make_response(to_url, mode=mode, show_errmsg=True, **kwargs)[0]
+ except Exception as e:
+ err = e
+ r = None
+
+ if r and (r.content != b'' or r.status_code in (403, 404)):
break
- print('重试', to_url)
- sleep(interval)
+ if _ < times:
+ sleep(interval)
+ print(f'重试 {to_url}')
- r = self._make_response(to_url, mode=mode, show_errmsg=show_errmsg, **kwargs)[0]
- times -= 1
+ if not r and show_errmsg:
+ raise err if err is not None else ConnectionError('Connect error.')
return r
@@ -212,8 +223,8 @@ class SessionPage(object):
url: str,
go_anyway: bool = False,
show_errmsg: bool = False,
- retry: int = 0,
- interval: float = 1,
+ retry: int = None,
+ interval: float = None,
**kwargs) -> Union[bool, None]:
"""用get方式跳转到url \n
:param url: 目标url
@@ -224,7 +235,9 @@ class SessionPage(object):
:param kwargs: 连接参数
:return: url是否可用
"""
- to_url = quote(url, safe='/:&?=%;#@+')
+ to_url = quote(url, safe='/:&?=%;#@+!')
+ retry = int(retry) if retry is not None else int(self.retry_times)
+ interval = int(interval) if interval is not None else int(self.retry_interval)
if not url or (not go_anyway and self.url == to_url):
return
@@ -252,8 +265,8 @@ class SessionPage(object):
data: dict = None,
go_anyway: bool = True,
show_errmsg: bool = False,
- retry: int = 0,
- interval: float = 1,
+ retry: int = None,
+ interval: float = None,
**kwargs) -> Union[bool, None]:
"""用post方式跳转到url \n
:param url: 目标url
@@ -265,7 +278,9 @@ class SessionPage(object):
:param kwargs: 连接参数
:return: url是否可用
"""
- to_url = quote(url, safe='/:&?=%;#@')
+ to_url = quote(url, safe='/:&?=%;#@+!')
+ retry = int(retry) if retry is not None else int(self.retry_times)
+ interval = int(interval) if interval is not None else int(self.retry_interval)
if not url or (not go_anyway and self._url == to_url):
return
@@ -295,6 +310,8 @@ class SessionPage(object):
post_data: dict = None,
show_msg: bool = False,
show_errmsg: bool = False,
+ retry: int = None,
+ interval: float = None,
**kwargs) -> tuple:
"""下载一个文件 \n
:param file_url: 文件url
@@ -304,158 +321,189 @@ class SessionPage(object):
:param post_data: post方式的数据
:param show_msg: 是否显示下载信息
:param show_errmsg: 是否抛出和显示异常
+ :param retry: 重试次数
+ :param interval: 重试间隔时间
:param kwargs: 连接参数
:return: 下载是否成功(bool)和状态信息(成功时信息为文件路径)的元组
"""
- # 生成的response不写入self._response,是临时的
- kwargs['stream'] = True
-
- if 'timeout' not in kwargs:
- kwargs['timeout'] = 20
-
- mode = 'post' if post_data else 'get'
- r, info = self._make_response(file_url, mode=mode, data=post_data, show_errmsg=show_errmsg, **kwargs)
-
- if r is None:
+ if file_exists == 'skip' and Path(f'{goal_path}\\{rename}').exists():
if show_msg:
- print(info)
+ print(f'{file_url}\n{goal_path}\\{rename}\nSkipped.\n')
- return False, info
-
- if not r.ok:
- if show_errmsg:
- raise ConnectionError(f'Status code: {r.status_code}.')
-
- return False, f'Status code: {r.status_code}.'
-
- # -------------------获取文件名-------------------
- file_name = ''
- content_disposition = r.headers.get('content-disposition')
-
- # 使用header里的文件名
- if content_disposition:
- file_name = r.headers[content_disposition[0]].encode('ISO-8859-1').decode('utf-8')
- file_name = re.search(r'filename *= *"?([^";]+)', file_name)
- if file_name:
- file_name = file_name.group(1)
-
- if file_name[0] == file_name[-1] == "'":
- file_name = file_name[1:-1]
-
- # 在url里获取文件名
- if not file_name and os_PATH.basename(file_url):
- file_name = os_PATH.basename(file_url).split("?")[0]
-
- # 找不到则用时间和随机数生成文件名
- if not file_name:
- file_name = f'untitled_{time()}_{randint(0, 100)}'
-
- # 去除非法字符
- file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip()
- file_name = unquote(file_name)
-
- # -------------------重命名,不改变扩展名-------------------
- if rename:
- rename = re_SUB(r'[\\/*:|<>?"]', '', rename).strip()
- ext_name = file_name.split('.')[-1]
-
- if '.' in rename or ext_name == file_name:
- full_name = rename
- else:
- full_name = f'{rename}.{ext_name}'
-
- else:
- full_name = file_name
-
- # -------------------生成路径-------------------
- goal_Path = Path(goal_path)
- goal_path = ''
- skip = False
-
- for key, i in enumerate(goal_Path.parts): # 去除路径中的非法字符
- goal_path += goal_Path.drive if key == 0 and goal_Path.drive else re_SUB(r'[*:|<>?"]', '', i).strip()
- goal_path += '\\' if i != '\\' and key < len(goal_Path.parts) - 1 else ''
-
- goal_Path = Path(goal_path)
- goal_Path.mkdir(parents=True, exist_ok=True)
- goal_path = goal_Path.absolute()
- full_path = Path(f'{goal_path}\\{full_name}')
-
- if full_path.exists():
- if file_exists == 'rename':
- full_name = get_available_file_name(goal_path, full_name)
- full_path = Path(f'{goal_path}\\{full_name}')
-
- elif file_exists == 'skip':
- skip = True
-
- elif file_exists == 'overwrite':
- pass
-
- else:
- raise ValueError("Argument file_exists can only be 'skip', 'overwrite', 'rename'.")
-
- # -------------------打印要下载的文件-------------------
- if show_msg:
- print(file_url)
- print(full_name if file_name == full_name else f'{file_name} -> {full_name}')
- print(f'Downloading to: {goal_path}')
-
- if skip:
- print('Skipped.\n')
-
- # -------------------开始下载-------------------
- if skip:
return False, 'Skipped because a file with the same name already exists.'
- # 获取远程文件大小
- content_length = r.headers.get('content-length')
- file_size = int(content_length) if content_length else None
+ def do(url: str,
+ goal: str,
+ new_name: str = None,
+ exists: str = 'rename',
+ data: dict = None,
+ msg: bool = False,
+ errmsg: bool = False,
+ **args) -> tuple:
+ args['stream'] = True
- # 已下载文件大小和下载状态
- downloaded_size, download_status = 0, False
+ if 'timeout' not in args:
+ args['timeout'] = 20
- try:
- with open(str(full_path), 'wb') as tmpFile:
- for chunk in r.iter_content(chunk_size=1024):
- if chunk:
- tmpFile.write(chunk)
+ mode = 'post' if data else 'get'
+ # 生成的response不写入self._response,是临时的
+ r, info = self._make_response(url, mode=mode, data=data, show_errmsg=errmsg, **args)
- # 如表头有返回文件大小,显示进度
- if show_msg and file_size:
- downloaded_size += 1024
- rate = downloaded_size / file_size if downloaded_size < file_size else 1
- print('\r {:.0%} '.format(rate), end="")
+ if r is None:
+ if msg:
+ print(info)
- except Exception as e:
- if show_errmsg:
- raise ConnectionError(e)
+ return False, info
- download_status, info = False, f'Download failed.\n{e}'
+ if not r.ok:
+ if errmsg:
+ raise ConnectionError(f'Status code: {r.status_code}.')
- else:
- if full_path.stat().st_size == 0:
- if show_errmsg:
- raise ValueError('File size is 0.')
+ return False, f'Status code: {r.status_code}.'
- download_status, info = False, 'File size is 0.'
+ # -------------------获取文件名-------------------
+ file_name = ''
+ content_disposition = r.headers.get('content-disposition')
+
+ # 使用header里的文件名
+ if content_disposition:
+ file_name = content_disposition.encode('ISO-8859-1').decode('utf-8')
+ file_name = re.search(r'filename *= *"?([^";]+)', file_name)
+
+ if file_name:
+ file_name = file_name.group(1)
+
+ if file_name[0] == file_name[-1] == "'":
+ file_name = file_name[1:-1]
+
+ # 在url里获取文件名
+ if not file_name and os_PATH.basename(url):
+ file_name = os_PATH.basename(url).split("?")[0]
+
+ # 找不到则用时间和随机数生成文件名
+ if not file_name:
+ file_name = f'untitled_{time()}_{randint(0, 100)}'
+
+ # 去除非法字符
+ file_name = re_SUB(r'[\\/*:|<>?"]', '', file_name).strip()
+ file_name = unquote(file_name)
+
+ # -------------------重命名,不改变扩展名-------------------
+ if new_name:
+ new_name = re_SUB(r'[\\/*:|<>?"]', '', new_name).strip()
+ ext_name = file_name.split('.')[-1]
+
+ if '.' in new_name or ext_name == file_name:
+ full_name = new_name
+ else:
+ full_name = f'{new_name}.{ext_name}'
else:
- download_status, info = True, 'Success.'
+ full_name = file_name
- finally:
- # 删除下载出错文件
- if not download_status and full_path.exists():
- full_path.unlink()
+ # -------------------生成路径-------------------
+ goal_Path = Path(goal)
+ goal = ''
+ skip = False
- r.close()
+ for key, i in enumerate(goal_Path.parts): # 去除路径中的非法字符
+ goal += goal_Path.drive if key == 0 and goal_Path.drive else re_SUB(r'[*:|<>?"]', '', i).strip()
+ goal += '\\' if i != '\\' and key < len(goal_Path.parts) - 1 else ''
- # -------------------显示并返回值-------------------
- if show_msg:
- print(info, '\n')
+ goal_Path = Path(goal).absolute()
+ goal_Path.mkdir(parents=True, exist_ok=True)
+ full_path = Path(f'{goal}\\{full_name}')
- info = f'{goal_path}\\{full_name}' if download_status else info
- return download_status, info
+ if full_path.exists():
+ if file_exists == 'rename':
+ full_name = get_available_file_name(goal, full_name)
+ full_path = Path(f'{goal}\\{full_name}')
+
+ elif exists == 'skip':
+ skip = True
+
+ elif exists == 'overwrite':
+ pass
+
+ else:
+ raise ValueError("Argument file_exists can only be 'skip', 'overwrite', 'rename'.")
+
+ # -------------------打印要下载的文件-------------------
+ if msg:
+ print(file_url)
+ print(full_name if file_name == full_name else f'{file_name} -> {full_name}')
+ print(f'Downloading to: {goal}')
+
+ if skip:
+ print('Skipped.\n')
+
+ # -------------------开始下载-------------------
+ if skip:
+ return False, 'Skipped because a file with the same name already exists.'
+
+ # 获取远程文件大小
+ content_length = r.headers.get('content-length')
+ file_size = int(content_length) if content_length else None
+
+ # 已下载文件大小和下载状态
+ downloaded_size, download_status = 0, False
+
+ try:
+ with open(str(full_path), 'wb') as tmpFile:
+ for chunk in r.iter_content(chunk_size=1024):
+ if chunk:
+ tmpFile.write(chunk)
+
+ # 如表头有返回文件大小,显示进度
+ if msg and file_size:
+ downloaded_size += 1024
+ rate = downloaded_size / file_size if downloaded_size < file_size else 1
+ print('\r {:.0%} '.format(rate), end="")
+
+ except Exception as e:
+ if errmsg:
+ raise ConnectionError(e)
+
+ download_status, info = False, f'Download failed.\n{e}'
+
+ else:
+ if full_path.stat().st_size == 0:
+ if errmsg:
+ raise ValueError('File size is 0.')
+
+ download_status, info = False, 'File size is 0.'
+
+ else:
+ download_status, info = True, str(full_path)
+
+ finally:
+ # 删除下载出错文件
+ if not download_status and full_path.exists():
+ full_path.unlink()
+
+ r.close()
+
+ # -------------------显示并返回值-------------------
+ if msg:
+ print(info, '\n')
+
+ info = f'{goal}\\{full_name}' if download_status else info
+ return download_status, info
+
+ retry_times = retry or self.retry_times
+ retry_interval = interval or self.retry_interval
+ result = do(file_url, goal_path, rename, file_exists, post_data, show_msg, show_errmsg, **kwargs)
+
+ if not result[0] and not str(result[1]).startswith('Skipped'):
+ for i in range(retry_times):
+ sleep(retry_interval)
+
+ print(f'重试 {file_url}')
+ result = do(file_url, goal_path, rename, file_exists, post_data, show_msg, show_errmsg, **kwargs)
+ if result[0]:
+ break
+
+ return result
def _make_response(self,
url: str,
@@ -469,17 +517,17 @@ class SessionPage(object):
:param data: post方式要提交的数据
:param show_errmsg: 是否显示和抛出异常
:param kwargs: 其它参数
- :return: tuple,第一位为Response或None,第二位为出错信息或'Sussess'
+ :return: tuple,第一位为Response或None,第二位为出错信息或'Success'
"""
if not url:
if show_errmsg:
raise ValueError('url is empty.')
return None, 'url is empty.'
- if mode not in ['get', 'post']:
+ if mode not in ('get', 'post'):
raise ValueError("Argument mode can only be 'get' or 'post'.")
- url = quote(url, safe='/:&?=%;#@+')
+ url = quote(url, safe='/:&?=%;#@+!')
# 设置referer和host值
kwargs_set = set(x.lower() for x in kwargs)
@@ -520,14 +568,14 @@ class SessionPage(object):
else:
# ----------------获取并设置编码开始-----------------
# 在headers中获取编码
- content_type = r.headers.get('content-type').lower()
+ content_type = r.headers.get('content-type', '').lower()
charset = re.search(r'charset[=: ]*(.*)?[;]', content_type)
if charset:
r.encoding = charset.group(1)
# 在headers中获取不到编码,且如果是网页
- elif content_type.replace(' ', '').lower().startswith('text/html'):
+ elif content_type.replace(' ', '').startswith('text/html'):
re_result = re_SEARCH(b'