diff --git a/DrissionPage/__init__.py b/DrissionPage/__init__.py index d5f2850..7f71de5 100644 --- a/DrissionPage/__init__.py +++ b/DrissionPage/__init__.py @@ -3,16 +3,24 @@ @Author : g1879 @Contact : g1879@qq.com """ +# 常用页面类 +from .chromium_page import ChromiumPage +from .session_page import SessionPage +from .web_page import WebPage + +# 启动配置类 +from .configs.chromium_options import ChromiumOptions +from .configs.session_options import SessionOptions + +# 常用工具 +from .action_chains import ActionChains +from .keys import Keys + +# 旧版页面类和启动配置类 +from .mix_page import MixPage +from .drission import Drission +from .configs.driver_options import DriverOptions from warnings import filterwarnings filterwarnings('ignore') -from .mix_page import MixPage -from .web_page import WebPage -from .chromium_page import ChromiumPage -from .session_page import SessionPage -from .drission import Drission -from .configs.driver_options import DriverOptions -from .configs.chromium_options import ChromiumOptions -from .configs.session_options import SessionOptions -from .action_chains import ActionChains diff --git a/DrissionPage/action_chains.py b/DrissionPage/action_chains.py index a9f5a71..0210b40 100644 --- a/DrissionPage/action_chains.py +++ b/DrissionPage/action_chains.py @@ -5,7 +5,7 @@ """ from time import sleep -from .common import location_in_viewport +from .functions.web import location_in_viewport from .keys import _modifierBit, _keyDescriptionForString diff --git a/DrissionPage/base.py b/DrissionPage/base.py index 2455697..008051a 100644 --- a/DrissionPage/base.py +++ b/DrissionPage/base.py @@ -7,7 +7,8 @@ from abc import abstractmethod from re import sub from urllib.parse import quote -from .common import format_html, get_loc +from .functions.web import format_html +from .functions.locator import get_loc class BaseParser(object): diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index 2f3856d..7b66964 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -10,7 +10,8 @@ from requests import Session from .base import BasePage from .chromium_element import ChromiumElementWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele -from .common import get_loc, offset_scroll, cookies_to_tuple +from .functions.locator import get_loc +from .functions.web import offset_scroll, cookies_to_tuple from .session_element import make_session_ele from .chromium_driver import ChromiumDriver diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index bd67512..bfb1200 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -9,8 +9,8 @@ from pathlib import Path from time import perf_counter, sleep from .base import DrissionElement, BaseElement -from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func, location_in_viewport, \ - offset_scroll +from .functions.locator import get_loc +from .functions.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions from .session_element import make_session_ele diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index 8e3184c..56e1ee8 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -13,7 +13,7 @@ from requests import Session from .chromium_base import Timeout, ChromiumBase from .chromium_driver import ChromiumDriver from .chromium_tab import ChromiumTab -from .common import connect_browser +from .functions.browser import connect_browser from .configs.driver_options import DriverOptions from .session_page import DownloadSetter diff --git a/DrissionPage/common.py b/DrissionPage/common.py deleted file mode 100644 index acd76ac..0000000 --- a/DrissionPage/common.py +++ /dev/null @@ -1,768 +0,0 @@ -# -*- coding:utf-8 -*- -""" -@Author : g1879 -@Contact : g1879@qq.com -""" -from html import unescape -from http.cookiejar import Cookie -from pathlib import Path -from platform import system -from re import split, search, sub -from shutil import rmtree -from subprocess import Popen -from time import perf_counter, sleep -from urllib.parse import urlparse, urljoin, urlunparse -from zipfile import ZipFile - -from requests import get as requests_get -from requests.cookies import RequestsCookieJar - -# from .configs.chromium_options import ChromiumOptions -from .configs.driver_options import DriverOptions - - -def get_ele_txt(e): - """获取元素内所有文本 - :param e: 元素对象 - :return: 元素内所有文本 - """ - # 前面无须换行的元素 - nowrap_list = ('br', 'sub', 'sup', 'em', 'strong', 'a', 'font', 'b', 'span', 's', 'i', 'del', 'ins', 'img', 'td', - 'th', 'abbr', 'bdi', 'bdo', 'cite', 'code', 'data', 'dfn', 'kbd', 'mark', 'q', 'rp', 'rt', 'ruby', - 'samp', 'small', 'time', 'u', 'var', 'wbr', 'button', 'slot', 'content') - # 后面添加换行的元素 - wrap_after_list = ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ol', 'li', 'blockquote', 'header', - 'footer', 'address' 'article', 'aside', 'main', 'nav', 'section', 'figcaption', 'summary') - # 不获取文本的元素 - noText_list = ('script', 'style', 'video', 'audio', 'iframe', 'embed', 'noscript', 'canvas', 'template') - # 用/t分隔的元素 - tab_list = ('td', 'th') - - if e.tag in noText_list: - return e.raw_text - - def get_node_txt(ele, pre: bool = False): - tag = ele.tag - if tag == 'br': - return [True] - if not pre and tag == 'pre': - pre = True - - str_list = [] - if tag in noText_list and not pre: # 标签内的文本不返回 - return str_list - - nodes = ele.eles('xpath:./text() | *') - prev_ele = '' - for el in nodes: - if isinstance(el, str): # 字符节点 - if pre: - str_list.append(el) - - else: - if sub('[ \n\t\r]', '', el) != '': # 字符除了回车和空格还有其它内容 - txt = el - if not pre: - txt = txt.replace('\n', ' ').strip(' ') - txt = sub(r' {2,}', ' ', txt) - str_list.append(txt) - - else: # 元素节点 - if el.tag not in nowrap_list and str_list and str_list[-1] != '\n': # 元素间换行的情况 - str_list.append('\n') - if el.tag in tab_list and prev_ele in tab_list: # 表格的行 - str_list.append('\t') - - str_list.extend(get_node_txt(el, pre)) - prev_ele = el.tag - - if tag in wrap_after_list and str_list and str_list[-1] not in ('\n', True): # 有些元素后面要添加回车 - str_list.append('\n') - - return str_list - - re_str = get_node_txt(e) - if re_str and re_str[-1] == '\n': - re_str.pop() - re_str = ''.join([i if i is not True else '\n' for i in re_str]) - return format_html(re_str) - - -def get_loc(loc, translate_css=False): - """接收selenium定位元组或本库定位语法,转换为标准定位元组,可翻译css selector为xpath \n - :param loc: selenium定位元组或本库定位语法 - :param translate_css: 是否翻译css selector为xpath - :return: DrissionPage定位元组 - """ - if isinstance(loc, tuple): - loc = translate_loc(loc) - - elif isinstance(loc, str): - loc = str_to_loc(loc) - - else: - raise TypeError('loc参数只能是tuple或str。') - - if loc[0] == 'css selector' and translate_css: - from lxml.cssselect import CSSSelector, ExpressionError - try: - path = str(CSSSelector(loc[1], translator='html').path) - path = path[20:] if path.startswith('descendant-or-self::') else path - loc = 'xpath', path - except ExpressionError: - pass - - return loc - - -def str_to_loc(loc): - """处理元素查找语句 \n - 查找方式:属性、tag name及属性、文本、xpath、css selector、id、class \n - @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n - """ - loc_by = 'xpath' - - if loc.startswith('.'): - if loc.startswith(('.=', '.:',)): - loc = loc.replace('.', '@class', 1) - else: - loc = loc.replace('.', '@class=', 1) - - elif loc.startswith('#'): - if loc.startswith(('#=', '#:',)): - loc = loc.replace('#', '@id', 1) - else: - loc = loc.replace('#', '@id=', 1) - - elif loc.startswith(('t:', 't=')): - loc = f'tag:{loc[2:]}' - - elif loc.startswith(('tx:', 'tx=')): - loc = f'text{loc[2:]}' - - # ------------------------------------------------------------------ - # 多属性查找 - if loc.startswith('@@') and loc != '@@': - loc_str = _make_multi_xpath_str('*', loc) - - # 单属性查找 - elif loc.startswith('@') and loc != '@': - loc_str = _make_single_xpath_str('*', loc) - - # 根据tag name查找 - elif loc.startswith(('tag:', 'tag=')) and loc not in ('tag:', 'tag='): - at_ind = loc.find('@') - if at_ind == -1: - loc_str = f'//*[name()="{loc[4:]}"]' - else: - if loc[at_ind:].startswith('@@'): - loc_str = _make_multi_xpath_str(loc[4:at_ind], loc[at_ind:]) - else: - loc_str = _make_single_xpath_str(loc[4:at_ind], loc[at_ind:]) - - # 根据文本查找 - elif loc.startswith('text='): - loc_str = f'//*[text()={_make_search_str(loc[5:])}]' - elif loc.startswith('text:') and loc != 'text:': - loc_str = f'//*/text()[contains(., {_make_search_str(loc[5:])})]/..' - - # 用xpath查找 - elif loc.startswith(('xpath:', 'xpath=')) and loc not in ('xpath:', 'xpath='): - loc_str = loc[6:] - elif loc.startswith(('x:', 'x=')) and loc not in ('x:', 'x='): - loc_str = loc[2:] - - # 用css selector查找 - elif loc.startswith(('css:', 'css=')) and loc not in ('css:', 'css='): - loc_by = 'css selector' - loc_str = loc[4:] - elif loc.startswith(('c:', 'c=')) and loc not in ('c:', 'c='): - loc_by = 'css selector' - loc_str = loc[2:] - - # 根据文本模糊查找 - elif loc: - loc_str = f'//*/text()[contains(., {_make_search_str(loc)})]/..' - else: - loc_str = '//*' - - return loc_by, loc_str - - -def _make_single_xpath_str(tag: str, text: str) -> str: - """生成xpath语句 \n - :param tag: 标签名 - :param text: 待处理的字符串 - :return: xpath字符串 - """ - arg_list = [] if tag == '*' else [f'name()="{tag}"'] - arg_str = txt_str = '' - - if text == '@': - arg_str = 'not(@*)' - - else: - r = split(r'([:=])', text, maxsplit=1) - len_r = len(r) - len_r0 = len(r[0]) - if len_r != 3 and len_r0 > 1: - arg_str = 'normalize-space(text())' if r[0] in ('@text()', '@tx()') else f'{r[0]}' - - elif len_r == 3 and len_r0 > 1: - if r[1] == '=': # 精确查找 - arg = '.' if r[0] in ('@text()', '@tx()') else r[0] - arg_str = f'{arg}={_make_search_str(r[2])}' - - else: # 模糊查找 - if r[0] in ('@text()', '@tx()'): - txt_str = f'/text()[contains(., {_make_search_str(r[2])})]/..' - arg_str = '' - else: - arg_str = f"contains({r[0]},{_make_search_str(r[2])})" - - if arg_str: - arg_list.append(arg_str) - arg_str = ' and '.join(arg_list) - return f'//*[{arg_str}]{txt_str}' if arg_str else f'//*{txt_str}' - - -def _make_multi_xpath_str(tag: str, text: str) -> str: - """生成多属性查找的xpath语句 \n - :param tag: 标签名 - :param text: 待处理的字符串 - :return: xpath字符串 - """ - arg_list = [] if tag == '*' else [f'name()="{tag}"'] - args = text.split('@@') - - for arg in args[1:]: - r = split(r'([:=])', arg, maxsplit=1) - arg_str = '' - len_r = len(r) - - if not r[0]: # 不查询任何属性 - arg_str = 'not(@*)' - - else: - r[0], ignore = (r[0][1:], True) if r[0][0] == '-' else (r[0], None) # 是否去除某个属性 - - if len_r != 3: # 只有属性名没有属性内容,查询是否存在该属性 - arg_str = 'normalize-space(text())' if r[0] in ('text()', 'tx()') else f'@{r[0]}' - - elif len_r == 3: # 属性名和内容都有 - arg = '.' if r[0] in ('text()', 'tx()') else f'@{r[0]}' - if r[1] == '=': - arg_str = f'{arg}={_make_search_str(r[2])}' - else: - arg_str = f'contains({arg},{_make_search_str(r[2])})' - - if arg_str and ignore: - arg_str = f'not({arg_str})' - - if arg_str: - arg_list.append(arg_str) - - arg_str = ' and '.join(arg_list) - return f'//*[{arg_str}]' if arg_str else f'//*' - - -def _make_search_str(search_str: str) -> str: - """将"转义,不知何故不能直接用 \ 来转义 \n - :param search_str: 查询字符串 - :return: 把"转义后的字符串 - """ - parts = search_str.split('"') - parts_num = len(parts) - search_str = 'concat(' - - for key, i in enumerate(parts): - search_str += f'"{i}"' - search_str += ',' + '\'"\',' if key < parts_num - 1 else '' - - search_str += ',"")' - return search_str - - -def translate_loc(loc): - """把By类型的loc元组转换为css selector或xpath类型的 \n - :param loc: By类型的loc元组 - :return: css selector或xpath类型的loc元组 - """ - if len(loc) != 2: - raise ValueError('定位符长度必须为2。') - - loc_by = 'xpath' - loc_0 = loc[0].lower() - - if loc_0 == 'xpath': - loc_str = loc[1] - - elif loc_0 == 'css selector': - loc_by = loc_0 - loc_str = loc[1] - - elif loc_0 == 'id': - loc_str = f'//*[@id="{loc[1]}"]' - - elif loc_0 == 'class name': - loc_str = f'//*[@class="{loc[1]}"]' - - elif loc_0 == 'link text': - loc_str = f'//a[text()="{loc[1]}"]' - - elif loc_0 == 'name': - loc_str = f'//*[@name="{loc[1]}"]' - - elif loc_0 == 'tag name': - loc_str = f'//{loc[1]}' - - elif loc_0 == 'partial link text': - loc_str = f'//a[contains(text(),"{loc[1]}")]' - - else: - raise ValueError('无法识别的定位符。') - - return loc_by, loc_str - - -def format_html(text): - """处理html编码字符 \n - :param text: html文本 - :return: 格式化后的html文本 - """ - return unescape(text).replace('\xa0', ' ') if text else text - - -def cookie_to_dict(cookie): - """把Cookie对象转为dict格式 \n - :param cookie: Cookie对象 - :return: cookie字典 - """ - if isinstance(cookie, Cookie): - cookie_dict = cookie.__dict__.copy() - cookie_dict.pop('rfc2109') - cookie_dict.pop('_rest') - return cookie_dict - - elif isinstance(cookie, dict): - cookie_dict = cookie - - elif isinstance(cookie, str): - cookie = cookie.split(',' if ',' in cookie else ';') - cookie_dict = {} - - for key, attr in enumerate(cookie): - attr_val = attr.lstrip().split('=') - - if key == 0: - cookie_dict['name'] = attr_val[0] - cookie_dict['value'] = attr_val[1] if len(attr_val) == 2 else '' - else: - cookie_dict[attr_val[0]] = attr_val[1] if len(attr_val) == 2 else '' - - return cookie_dict - - else: - raise TypeError('cookie参数必须为Cookie、str或dict类型。') - - return cookie_dict - - -def cookies_to_tuple(cookies): - """把cookies转为tuple格式 \n - :param cookies: cookies信息,可为CookieJar, list, tuple, str, dict - :return: 返回tuple形式的cookies - """ - if isinstance(cookies, (list, tuple, RequestsCookieJar)): - cookies = tuple(cookie_to_dict(cookie) for cookie in cookies) - - elif isinstance(cookies, str): - cookies = tuple(cookie_to_dict(cookie.lstrip()) for cookie in cookies.split(";")) - - elif isinstance(cookies, dict): - cookies = tuple({'name': cookie, 'value': cookies[cookie]} for cookie in cookies) - - else: - raise TypeError('cookies参数必须为RequestsCookieJar、list、tuple、str或dict类型。') - - return cookies - - -def clean_folder(folder_path, ignore=None): - """清空一个文件夹,除了ignore里的文件和文件夹 \n - :param folder_path: 要清空的文件夹路径 - :param ignore: 忽略列表 - :return: None - """ - ignore = [] if not ignore else ignore - p = Path(folder_path) - - for f in p.iterdir(): - if f.name not in ignore: - if f.is_file(): - f.unlink() - elif f.is_dir(): - rmtree(f, True) - - -def unzip(zip_path, to_path): - """解压下载的chromedriver.zip文件""" - if not zip_path: - return - - with ZipFile(zip_path, 'r') as f: - return [f.extract(f.namelist()[0], path=to_path)] - - -def get_exe_from_port(port): - """获取端口号第一条进程的可执行文件路径 \n - :param port: 端口号 - :return: 可执行文件的绝对路径 - """ - from os import popen - - pid = get_pid_from_port(port) - if not pid: - return - else: - file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n') - return file_lst[2].strip() if len(file_lst) > 2 else None - - -def get_pid_from_port(port): - """获取端口号第一条进程的pid \n - :param port: 端口号 - :return: 进程id - """ - from platform import system - if system().lower() != 'windows' or port is None: - return None - - from os import popen - from time import perf_counter - - try: # 避免Anaconda中可能产生的报错 - process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] - - t = perf_counter() - while not process and perf_counter() - t < 5: - process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] - - return process.split(' ')[-1] or None - - except AttributeError: - return None - - -def get_usable_path(path): - """检查文件或文件夹是否有重名,并返回可以使用的路径 \n - :param path: 文件或文件夹路径 - :return: 可用的路径,Path对象 - """ - path = Path(path) - parent = path.parent - path = parent / make_valid_name(path.name) - name = path.stem if path.is_file() else path.name - ext = path.suffix if path.is_file() else '' - - first_time = True - - while path.exists(): - r = search(r'(.*)_(\d+)$', name) - - if not r or (r and first_time): - src_name, num = name, '1' - else: - src_name, num = r.group(1), int(r.group(2)) + 1 - - name = f'{src_name}_{num}' - path = parent / f'{name}{ext}' - first_time = None - - return path - - -def make_valid_name(full_name): - """获取有效的文件名 \n - :param full_name: 文件名 - :return: 可用的文件名 - """ - # ----------------去除前后空格---------------- - full_name = full_name.strip() - - # ----------------使总长度不大于255个字符(一个汉字是2个字符)---------------- - r = search(r'(.*)(\.[^.]+$)', full_name) # 拆分文件名和后缀名 - if r: - name, ext = r.group(1), r.group(2) - ext_long = len(ext) - else: - name, ext = full_name, '' - ext_long = 0 - - while get_long(name) > 255 - ext_long: - name = name[:-1] - - full_name = f'{name}{ext}' - - # ----------------去除不允许存在的字符---------------- - return sub(r'[<>/\\|:*?\n]', '', full_name) - - -def get_long(txt): - """返回字符串中字符个数(一个汉字是2个字符) \n - :param txt: 字符串 - :return: 字符个数 - """ - txt_len = len(txt) - return int((len(txt.encode('utf-8')) - txt_len) / 2 + txt_len) - - -def make_absolute_link(link, page=None): - """获取绝对url - :param link: 超链接 - :param page: 页面对象 - :return: 绝对链接 - """ - if not link: - return link - - parsed = urlparse(link)._asdict() - - # 是相对路径,与页面url拼接并返回 - if not parsed['netloc']: - return urljoin(page.url, link) if page else link - - # 是绝对路径但缺少协议,从页面url获取协议并修复 - if not parsed['scheme'] and page: - parsed['scheme'] = urlparse(page.url).scheme - parsed = tuple(v for v in parsed.values()) - return urlunparse(parsed) - - # 绝对路径且不缺协议,直接返回 - return link - - -def is_js_func(func): - """检查文本是否js函数""" - func = func.strip() - if func.startswith('function') or func.startswith('async '): - return True - elif '=>' in func: - return True - return False - - -def port_is_using(ip, port): - """检查端口是否被占用 \n - :param ip: 浏览器地址 - :param port: 浏览器端口 - :return: bool - """ - from socket import socket, AF_INET, SOCK_STREAM - s = socket(AF_INET, SOCK_STREAM) - result = s.connect_ex((ip, int(port))) - s.close() - return result == 0 - - -def connect_browser(option): - """连接或启动浏览器 \n - :param option: DriverOptions对象 - :return: chrome 路径和进程对象组成的元组 - """ - system_type = system().lower() - debugger_address = option.debugger_address - chrome_path = option.browser_path - - debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address - ip, port = debugger_address.split(':') - if ip not in ('127.0.0.1', 'localhost'): - return None, None - - if port_is_using(ip, port): - chrome_path = get_exe_from_port(port) if chrome_path == 'chrome' and system_type == 'windows' else chrome_path - return chrome_path, None - - args = get_launch_args(option) - set_prefs(option) - - # ----------创建浏览器进程---------- - try: - debugger = _run_browser(port, chrome_path, args) - if chrome_path == 'chrome' and system_type == 'windows': - chrome_path = get_exe_from_port(port) - - # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 - except FileNotFoundError: - from DrissionPage.easy_set import get_chrome_path - chrome_path = get_chrome_path(show_msg=False) - - if not chrome_path: - raise FileNotFoundError('无法找到chrome路径,请手动配置。') - - debugger = _run_browser(port, chrome_path, args) - - return chrome_path, debugger - - -def get_launch_args(opt): - """从DriverOptions获取命令行启动参数""" - sys = system().lower() - result = [] - - # ----------处理arguments----------- - args = opt.arguments - for arg in args: - index = arg.find('=') + 1 - if index == 0: - result.append(arg) - else: - a = arg[index:].strip() - if a.startswith('"') and a.endswith('"'): - result.append(arg) - else: - result.append(f'{arg[:index]}"{a}"') - - # ----------处理插件extensions------------- - ext = opt._extension_files if isinstance(opt, DriverOptions) else opt.extensions - if ext: - ext = set(ext) - if sys == 'windows': - ext = '","'.join(ext) - ext = f'"{ext}"' - else: - ext = ','.join(ext) - ext = f'--load-extension={ext}' - result.append(ext) - - return result - - -def set_prefs(opt): - """处理启动配置中的prefs项,目前只能对已存在文件夹配置""" - # todo: 支持删除pref项 - prefs = opt.experimental_options.get('prefs', None) if isinstance(opt, DriverOptions) else opt.preferences - if prefs and opt.user_data_path: - args = opt.arguments - user = 'Default' - for arg in args: - if arg.startswith('--profile-directory'): - user = arg.split('=')[-1].strip() - break - - prefs_file = Path(opt.user_data_path) / user / 'Preferences' - if not prefs_file.exists(): - prefs_file.parent.mkdir(parents=True, exist_ok=True) - with open(prefs_file, 'w') as f: - f.write('{}') - - from json import load, dump - with open(prefs_file, "r", encoding='utf-8') as f: - j = load(f) - - for pref in prefs: - value = prefs[pref] - pref = pref.split('.') - _make_leave_in_dict(j, pref, 0, len(pref)) - _set_value_to_dict(j, pref, value) - - with open(prefs_file, 'w', encoding='utf-8') as f: - dump(j, f) - - -def _run_browser(port, path: str, args) -> Popen: - """创建chrome进程 \n - :param port: 端口号 - :param path: 浏览器地址 - :param args: 启动参数 - :return: 进程对象 - """ - sys = system().lower() - if sys == 'windows': - args = ' '.join(args) - debugger = Popen(f'"{path}" --remote-debugging-port={port} {args}', shell=False) - else: - arguments = [path, f'--remote-debugging-port={port}'] + list(args) - debugger = Popen(arguments, shell=False) - - t1 = perf_counter() - while perf_counter() - t1 < 10: - try: - tabs = requests_get(f'http://127.0.0.1:{port}/json').json() - for tab in tabs: - if tab['type'] == 'page': - return debugger - except Exception: - sleep(.2) - - raise ConnectionError('无法连接浏览器。') - - -def _make_leave_in_dict(target_dict: dict, src: list, num: int, end: int) -> None: - """把prefs中a.b.c形式的属性转为a['b']['c']形式 - :param target_dict: 要处理的dict - :param src: 属性层级列表[a, b, c] - :param num: 当前处理第几个 - :param end: src长度 - :return: None - """ - if num == end: - return - if src[num] not in target_dict: - target_dict[src[num]] = {} - num += 1 - _make_leave_in_dict(target_dict[src[num - 1]], src, num, end) - - -def _set_value_to_dict(target_dict: dict, src: list, value) -> None: - """把a.b.c形式的属性的值赋值到a['b']['c']形式的字典中 - :param target_dict: 要处理的dict - :param src: 属性层级列表[a, b, c] - :param value: 属性值 - :return: None - """ - src = "']['".join(src) - src = f"target_dict['{src}']=value" - exec(src) - - -def location_in_viewport(page, loc_x, loc_y): - """判断给定的坐标是否在视口中 |n - :param page: ChromePage对象 - :param loc_x: 页面绝对坐标x - :param loc_y: 页面绝对坐标y - :return: - """ - js = f'''function(){{var x = {loc_x}; var y = {loc_y}; - const scrollLeft = document.documentElement.scrollLeft; - const scrollTop = document.documentElement.scrollTop; - const vWidth = document.documentElement.clientWidth; - const vHeight = document.documentElement.clientHeight; - if (x< scrollLeft || y < scrollTop || x > vWidth + scrollLeft || y > vHeight + scrollTop){{return false;}} - return true;}}''' - return page.run_js(js) - # const vWidth = window.innerWidth || document.documentElement.clientWidth; - # const vHeight = window.innerHeight || document.documentElement.clientHeight; - - -def offset_scroll(ele, offset_x, offset_y): - """接收元素及偏移坐标,把坐标滚动到页面中间,返回该点在视口中的坐标 \n - 有偏移量时以元素左上角坐标为基准,没有时以_click_point为基准 - :param ele: 元素对象 - :param offset_x: 偏移量x - :param offset_y: 偏移量y - :return: 视口中的坐标 - """ - loc_x, loc_y = ele.location - cp_x, cp_y = ele._click_point - lx = loc_x + offset_x if offset_x else cp_x - ly = loc_y + offset_y if offset_y else cp_y - if not location_in_viewport(ele.page, lx, ly): - clientWidth = ele.page.run_js('return document.body.clientWidth;') - clientHeight = ele.page.run_js('return document.body.clientHeight;') - ele.page.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2) - cl_x, cl_y = ele.client_location - ccp_x, ccp_y = ele._client_click_point - cx = cl_x + offset_x if offset_x else ccp_x - cy = cl_y + offset_y if offset_y else ccp_y - return cx, cy diff --git a/DrissionPage/common.pyi b/DrissionPage/common.pyi deleted file mode 100644 index 09236f0..0000000 --- a/DrissionPage/common.pyi +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding:utf-8 -*- -""" -@Author : g1879 -@Contact : g1879@qq.com -""" -from http.cookiejar import Cookie -from pathlib import Path -from typing import Union - -from requests.cookies import RequestsCookieJar - -from .base import BasePage, DrissionElement -from .chromium_element import ChromiumElement -from .config import DriverOptions -from .configs.chromium_options import ChromiumOptions - - -def get_ele_txt(e: DrissionElement) -> str: ... - - -def get_loc(loc: Union[tuple, str], translate_css: bool = False) -> tuple: ... - - -def str_to_loc(loc: str) -> tuple: ... - - -def translate_loc(loc: tuple) -> tuple: ... - - -def format_html(text: str) -> str: ... - - -def cookie_to_dict(cookie: Union[Cookie, str, dict]) -> dict: ... - - -def cookies_to_tuple(cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> tuple: ... - - -def clean_folder(folder_path: str, ignore: list = None) -> None: ... - - -def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... - - -def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ... - - -def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ... - - -def get_usable_path(path: Union[str, Path]) -> Path: ... - - -def make_valid_name(full_name: str) -> str: ... - - -def get_long(txt) -> int: ... - - -def make_absolute_link(link, page: BasePage = None) -> str: ... - - -def is_js_func(func: str) -> bool: ... - - -def port_is_using(ip: str, port: Union[str, int]) -> bool: ... - - -def connect_browser(option: Union[ChromiumOptions, DriverOptions]) -> tuple: ... - - -def get_launch_args(opt: Union[ChromiumOptions, DriverOptions]) -> list: ... - - -def set_prefs(opt: Union[ChromiumOptions, DriverOptions]) -> None: ... - - -def location_in_viewport(page, loc_x: int, loc_y: int) -> bool: ... - - -def offset_scroll(ele: ChromiumElement, offset_x: int, offset_y: int) -> tuple: ... diff --git a/DrissionPage/configs/chromium_options.py b/DrissionPage/configs/chromium_options.py index cfee57a..8982edc 100644 --- a/DrissionPage/configs/chromium_options.py +++ b/DrissionPage/configs/chromium_options.py @@ -2,7 +2,7 @@ from pathlib import Path from shutil import rmtree -from DrissionPage.common import port_is_using +from DrissionPage.functions.tools import port_is_using from .options_manage import OptionsManager diff --git a/DrissionPage/configs/session_options.py b/DrissionPage/configs/session_options.py index e2b4525..d2b16f0 100644 --- a/DrissionPage/configs/session_options.py +++ b/DrissionPage/configs/session_options.py @@ -2,7 +2,7 @@ from pathlib import Path -from DrissionPage.common import cookies_to_tuple +from DrissionPage.functions.web import cookies_to_tuple from .options_manage import OptionsManager diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index 9230acb..c4afc4f 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -14,7 +14,9 @@ from selenium.webdriver.chrome.options import Options from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver from tldextract import extract -from .common import get_pid_from_port, connect_browser, cookies_to_tuple +from .functions.tools import get_pid_from_port +from .functions.browser import connect_browser +from .functions.web import cookies_to_tuple from .configs.session_options import SessionOptions, session_options_to_dict from .configs.driver_options import DriverOptions diff --git a/DrissionPage/driver_element.py b/DrissionPage/driver_element.py index 0c5c8ec..2a0cc96 100644 --- a/DrissionPage/driver_element.py +++ b/DrissionPage/driver_element.py @@ -15,7 +15,9 @@ from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait from .base import DrissionElement, BaseElement -from .common import str_to_loc, get_usable_path, format_html, get_ele_txt, get_loc +from .functions.locator import str_to_loc, get_loc +from .functions.tools import get_usable_path +from .functions.web import format_html, get_ele_txt from .session_element import make_session_ele diff --git a/DrissionPage/driver_page.py b/DrissionPage/driver_page.py index 68d6b26..02350a7 100644 --- a/DrissionPage/driver_page.py +++ b/DrissionPage/driver_page.py @@ -13,7 +13,7 @@ from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.support.wait import WebDriverWait from .base import BasePage -from .common import get_usable_path +from .functions.tools import get_usable_path from .driver_element import DriverElement, make_driver_ele, Scroll, ElementWaiter from .session_element import make_session_ele diff --git a/DrissionPage/functions/browser.py b/DrissionPage/functions/browser.py new file mode 100644 index 0000000..a0d0e3f --- /dev/null +++ b/DrissionPage/functions/browser.py @@ -0,0 +1,156 @@ +# -*- coding:utf-8 -*- +from pathlib import Path +from platform import system +from subprocess import Popen +from time import perf_counter, sleep + +from requests import get as requests_get + +from DrissionPage.configs.driver_options import DriverOptions +from .tools import port_is_using, get_exe_from_port + + +def connect_browser(option): + """连接或启动浏览器 \n + :param option: DriverOptions对象 + :return: chrome 路径和进程对象组成的元组 + """ + system_type = system().lower() + debugger_address = option.debugger_address + chrome_path = option.browser_path + + debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address + ip, port = debugger_address.split(':') + if ip not in ('127.0.0.1', 'localhost'): + return None, None + + if port_is_using(ip, port): + chrome_path = get_exe_from_port(port) if chrome_path == 'chrome' and system_type == 'windows' else chrome_path + return chrome_path, None + + args = get_launch_args(option) + set_prefs(option) + + # ----------创建浏览器进程---------- + try: + debugger = _run_browser(port, chrome_path, args) + if chrome_path == 'chrome' and system_type == 'windows': + chrome_path = get_exe_from_port(port) + + # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 + except FileNotFoundError: + from DrissionPage.easy_set import get_chrome_path + chrome_path = get_chrome_path(show_msg=False) + + if not chrome_path: + raise FileNotFoundError('无法找到chrome路径,请手动配置。') + + debugger = _run_browser(port, chrome_path, args) + + return chrome_path, debugger + + +def get_launch_args(opt): + """从DriverOptions获取命令行启动参数 + :param opt: DriverOptions或ChromiumOptions + :return: 启动参数列表 + """ + # ----------处理arguments----------- + result = set(i for i in opt.arguments if not i.startswith(('--load-extension=', '--remote-debugging-port='))) + result = list(result) + + # ----------处理插件extensions------------- + ext = opt._extension_files if isinstance(opt, DriverOptions) else opt.extensions + if ext: + ext = ','.join(set(ext)) + ext = f'--load-extension={ext}' + result.append(ext) + + return result + + +def set_prefs(opt): + """处理启动配置中的prefs项,目前只能对已存在文件夹配置 + :param opt: DriverOptions或ChromiumOptions + :return: None + """ + # todo: 支持删除pref项 + prefs = opt.experimental_options.get('prefs', None) if isinstance(opt, DriverOptions) else opt.preferences + if prefs and opt.user_data_path: + args = opt.arguments + user = 'Default' + for arg in args: + if arg.startswith('--profile-directory'): + user = arg.split('=')[-1].strip() + break + + prefs_file = Path(opt.user_data_path) / user / 'Preferences' + if not prefs_file.exists(): + prefs_file.parent.mkdir(parents=True, exist_ok=True) + with open(prefs_file, 'w') as f: + f.write('{}') + + from json import load, dump + with open(prefs_file, "r", encoding='utf-8') as f: + j = load(f) + + for pref in prefs: + value = prefs[pref] + pref = pref.split('.') + _make_leave_in_dict(j, pref, 0, len(pref)) + _set_value_to_dict(j, pref, value) + + with open(prefs_file, 'w', encoding='utf-8') as f: + dump(j, f) + + +def _run_browser(port, path: str, args) -> Popen: + """创建chrome进程 \n + :param port: 端口号 + :param path: 浏览器地址 + :param args: 启动参数 + :return: 进程对象 + """ + arguments = [path, f'--remote-debugging-port={port}'] + arguments.extend(args) + debugger = Popen(arguments, shell=False) + + end_time = perf_counter() + 10 + while perf_counter() < end_time: + try: + tabs = requests_get(f'http://127.0.0.1:{port}/json').json() + for tab in tabs: + if tab['type'] == 'page': + return debugger + except Exception: + sleep(.2) + + raise ConnectionError('无法连接浏览器。') + + +def _make_leave_in_dict(target_dict: dict, src: list, num: int, end: int) -> None: + """把prefs中a.b.c形式的属性转为a['b']['c']形式 + :param target_dict: 要处理的dict + :param src: 属性层级列表[a, b, c] + :param num: 当前处理第几个 + :param end: src长度 + :return: None + """ + if num == end: + return + if src[num] not in target_dict: + target_dict[src[num]] = {} + num += 1 + _make_leave_in_dict(target_dict[src[num - 1]], src, num, end) + + +def _set_value_to_dict(target_dict: dict, src: list, value) -> None: + """把a.b.c形式的属性的值赋值到a['b']['c']形式的字典中 + :param target_dict: 要处理的dict + :param src: 属性层级列表[a, b, c] + :param value: 属性值 + :return: None + """ + src = "']['".join(src) + src = f"target_dict['{src}']=value" + exec(src) diff --git a/DrissionPage/functions/browser.pyi b/DrissionPage/functions/browser.pyi new file mode 100644 index 0000000..f233715 --- /dev/null +++ b/DrissionPage/functions/browser.pyi @@ -0,0 +1,14 @@ +# -*- coding:utf-8 -*- +from typing import Union + +from DrissionPage.configs.chromium_options import ChromiumOptions +from DrissionPage.configs.driver_options import DriverOptions + + +def connect_browser(option: Union[ChromiumOptions, DriverOptions]) -> tuple: ... + + +def get_launch_args(opt: Union[ChromiumOptions, DriverOptions]) -> list: ... + + +def set_prefs(opt: Union[ChromiumOptions, DriverOptions]) -> None: ... diff --git a/DrissionPage/functions/locator.py b/DrissionPage/functions/locator.py new file mode 100644 index 0000000..1547e11 --- /dev/null +++ b/DrissionPage/functions/locator.py @@ -0,0 +1,243 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from re import split + + +def get_loc(loc, translate_css=False): + """接收selenium定位元组或本库定位语法,转换为标准定位元组,可翻译css selector为xpath \n + :param loc: selenium定位元组或本库定位语法 + :param translate_css: 是否翻译css selector为xpath + :return: DrissionPage定位元组 + """ + if isinstance(loc, tuple): + loc = translate_loc(loc) + + elif isinstance(loc, str): + loc = str_to_loc(loc) + + else: + raise TypeError('loc参数只能是tuple或str。') + + if loc[0] == 'css selector' and translate_css: + from lxml.cssselect import CSSSelector, ExpressionError + try: + path = str(CSSSelector(loc[1], translator='html').path) + path = path[20:] if path.startswith('descendant-or-self::') else path + loc = 'xpath', path + except ExpressionError: + pass + + return loc + + +def str_to_loc(loc): + """处理元素查找语句 \n + 查找方式:属性、tag name及属性、文本、xpath、css selector、id、class \n + @表示属性,.表示class,#表示id,=表示精确匹配,:表示模糊匹配,无控制字符串时默认搜索该字符串 \n + """ + loc_by = 'xpath' + + if loc.startswith('.'): + if loc.startswith(('.=', '.:',)): + loc = loc.replace('.', '@class', 1) + else: + loc = loc.replace('.', '@class=', 1) + + elif loc.startswith('#'): + if loc.startswith(('#=', '#:',)): + loc = loc.replace('#', '@id', 1) + else: + loc = loc.replace('#', '@id=', 1) + + elif loc.startswith(('t:', 't=')): + loc = f'tag:{loc[2:]}' + + elif loc.startswith(('tx:', 'tx=')): + loc = f'text{loc[2:]}' + + # ------------------------------------------------------------------ + # 多属性查找 + if loc.startswith('@@') and loc != '@@': + loc_str = _make_multi_xpath_str('*', loc) + + # 单属性查找 + elif loc.startswith('@') and loc != '@': + loc_str = _make_single_xpath_str('*', loc) + + # 根据tag name查找 + elif loc.startswith(('tag:', 'tag=')) and loc not in ('tag:', 'tag='): + at_ind = loc.find('@') + if at_ind == -1: + loc_str = f'//*[name()="{loc[4:]}"]' + else: + if loc[at_ind:].startswith('@@'): + loc_str = _make_multi_xpath_str(loc[4:at_ind], loc[at_ind:]) + else: + loc_str = _make_single_xpath_str(loc[4:at_ind], loc[at_ind:]) + + # 根据文本查找 + elif loc.startswith('text='): + loc_str = f'//*[text()={_make_search_str(loc[5:])}]' + elif loc.startswith('text:') and loc != 'text:': + loc_str = f'//*/text()[contains(., {_make_search_str(loc[5:])})]/..' + + # 用xpath查找 + elif loc.startswith(('xpath:', 'xpath=')) and loc not in ('xpath:', 'xpath='): + loc_str = loc[6:] + elif loc.startswith(('x:', 'x=')) and loc not in ('x:', 'x='): + loc_str = loc[2:] + + # 用css selector查找 + elif loc.startswith(('css:', 'css=')) and loc not in ('css:', 'css='): + loc_by = 'css selector' + loc_str = loc[4:] + elif loc.startswith(('c:', 'c=')) and loc not in ('c:', 'c='): + loc_by = 'css selector' + loc_str = loc[2:] + + # 根据文本模糊查找 + elif loc: + loc_str = f'//*/text()[contains(., {_make_search_str(loc)})]/..' + else: + loc_str = '//*' + + return loc_by, loc_str + + +def _make_single_xpath_str(tag: str, text: str) -> str: + """生成xpath语句 \n + :param tag: 标签名 + :param text: 待处理的字符串 + :return: xpath字符串 + """ + arg_list = [] if tag == '*' else [f'name()="{tag}"'] + arg_str = txt_str = '' + + if text == '@': + arg_str = 'not(@*)' + + else: + r = split(r'([:=])', text, maxsplit=1) + len_r = len(r) + len_r0 = len(r[0]) + if len_r != 3 and len_r0 > 1: + arg_str = 'normalize-space(text())' if r[0] in ('@text()', '@tx()') else f'{r[0]}' + + elif len_r == 3 and len_r0 > 1: + if r[1] == '=': # 精确查找 + arg = '.' if r[0] in ('@text()', '@tx()') else r[0] + arg_str = f'{arg}={_make_search_str(r[2])}' + + else: # 模糊查找 + if r[0] in ('@text()', '@tx()'): + txt_str = f'/text()[contains(., {_make_search_str(r[2])})]/..' + arg_str = '' + else: + arg_str = f"contains({r[0]},{_make_search_str(r[2])})" + + if arg_str: + arg_list.append(arg_str) + arg_str = ' and '.join(arg_list) + return f'//*[{arg_str}]{txt_str}' if arg_str else f'//*{txt_str}' + + +def _make_multi_xpath_str(tag: str, text: str) -> str: + """生成多属性查找的xpath语句 \n + :param tag: 标签名 + :param text: 待处理的字符串 + :return: xpath字符串 + """ + arg_list = [] if tag == '*' else [f'name()="{tag}"'] + args = text.split('@@') + + for arg in args[1:]: + r = split(r'([:=])', arg, maxsplit=1) + arg_str = '' + len_r = len(r) + + if not r[0]: # 不查询任何属性 + arg_str = 'not(@*)' + + else: + r[0], ignore = (r[0][1:], True) if r[0][0] == '-' else (r[0], None) # 是否去除某个属性 + + if len_r != 3: # 只有属性名没有属性内容,查询是否存在该属性 + arg_str = 'normalize-space(text())' if r[0] in ('text()', 'tx()') else f'@{r[0]}' + + elif len_r == 3: # 属性名和内容都有 + arg = '.' if r[0] in ('text()', 'tx()') else f'@{r[0]}' + if r[1] == '=': + arg_str = f'{arg}={_make_search_str(r[2])}' + else: + arg_str = f'contains({arg},{_make_search_str(r[2])})' + + if arg_str and ignore: + arg_str = f'not({arg_str})' + + if arg_str: + arg_list.append(arg_str) + + arg_str = ' and '.join(arg_list) + return f'//*[{arg_str}]' if arg_str else f'//*' + + +def _make_search_str(search_str: str) -> str: + """将"转义,不知何故不能直接用 \ 来转义 \n + :param search_str: 查询字符串 + :return: 把"转义后的字符串 + """ + parts = search_str.split('"') + parts_num = len(parts) + search_str = 'concat(' + + for key, i in enumerate(parts): + search_str += f'"{i}"' + search_str += ',' + '\'"\',' if key < parts_num - 1 else '' + + search_str += ',"")' + return search_str + + +def translate_loc(loc): + """把By类型的loc元组转换为css selector或xpath类型的 \n + :param loc: By类型的loc元组 + :return: css selector或xpath类型的loc元组 + """ + if len(loc) != 2: + raise ValueError('定位符长度必须为2。') + + loc_by = 'xpath' + loc_0 = loc[0].lower() + + if loc_0 == 'xpath': + loc_str = loc[1] + + elif loc_0 == 'css selector': + loc_by = loc_0 + loc_str = loc[1] + + elif loc_0 == 'id': + loc_str = f'//*[@id="{loc[1]}"]' + + elif loc_0 == 'class name': + loc_str = f'//*[@class="{loc[1]}"]' + + elif loc_0 == 'link text': + loc_str = f'//a[text()="{loc[1]}"]' + + elif loc_0 == 'name': + loc_str = f'//*[@name="{loc[1]}"]' + + elif loc_0 == 'tag name': + loc_str = f'//{loc[1]}' + + elif loc_0 == 'partial link text': + loc_str = f'//a[contains(text(),"{loc[1]}")]' + + else: + raise ValueError('无法识别的定位符。') + + return loc_by, loc_str diff --git a/DrissionPage/functions/locator.pyi b/DrissionPage/functions/locator.pyi new file mode 100644 index 0000000..1038890 --- /dev/null +++ b/DrissionPage/functions/locator.pyi @@ -0,0 +1,15 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from typing import Union + + +def get_loc(loc: Union[tuple, str], translate_css: bool = False) -> tuple: ... + + +def str_to_loc(loc: str) -> tuple: ... + + +def translate_loc(loc: tuple) -> tuple: ... diff --git a/DrissionPage/functions/tools.py b/DrissionPage/functions/tools.py new file mode 100644 index 0000000..a14aad2 --- /dev/null +++ b/DrissionPage/functions/tools.py @@ -0,0 +1,151 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from pathlib import Path +from re import search, sub +from shutil import rmtree +from zipfile import ZipFile + + +def get_exe_from_port(port): + """获取端口号第一条进程的可执行文件路径 \n + :param port: 端口号 + :return: 可执行文件的绝对路径 + """ + from os import popen + + pid = get_pid_from_port(port) + if not pid: + return + else: + file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n') + return file_lst[2].strip() if len(file_lst) > 2 else None + + +def get_pid_from_port(port): + """获取端口号第一条进程的pid \n + :param port: 端口号 + :return: 进程id + """ + from platform import system + if system().lower() != 'windows' or port is None: + return None + + from os import popen + from time import perf_counter + + try: # 避免Anaconda中可能产生的报错 + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + t = perf_counter() + while not process and perf_counter() - t < 5: + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + return process.split(' ')[-1] or None + + except AttributeError: + return None + + +def get_usable_path(path): + """检查文件或文件夹是否有重名,并返回可以使用的路径 \n + :param path: 文件或文件夹路径 + :return: 可用的路径,Path对象 + """ + path = Path(path) + parent = path.parent + path = parent / make_valid_name(path.name) + name = path.stem if path.is_file() else path.name + ext = path.suffix if path.is_file() else '' + + first_time = True + + while path.exists(): + r = search(r'(.*)_(\d+)$', name) + + if not r or (r and first_time): + src_name, num = name, '1' + else: + src_name, num = r.group(1), int(r.group(2)) + 1 + + name = f'{src_name}_{num}' + path = parent / f'{name}{ext}' + first_time = None + + return path + + +def make_valid_name(full_name): + """获取有效的文件名 \n + :param full_name: 文件名 + :return: 可用的文件名 + """ + # ----------------去除前后空格---------------- + full_name = full_name.strip() + + # ----------------使总长度不大于255个字符(一个汉字是2个字符)---------------- + r = search(r'(.*)(\.[^.]+$)', full_name) # 拆分文件名和后缀名 + if r: + name, ext = r.group(1), r.group(2) + ext_long = len(ext) + else: + name, ext = full_name, '' + ext_long = 0 + + while get_long(name) > 255 - ext_long: + name = name[:-1] + + full_name = f'{name}{ext}' + + # ----------------去除不允许存在的字符---------------- + return sub(r'[<>/\\|:*?\n]', '', full_name) + + +def get_long(txt): + """返回字符串中字符个数(一个汉字是2个字符) \n + :param txt: 字符串 + :return: 字符个数 + """ + txt_len = len(txt) + return int((len(txt.encode('utf-8')) - txt_len) / 2 + txt_len) + + +def port_is_using(ip, port): + """检查端口是否被占用 \n + :param ip: 浏览器地址 + :param port: 浏览器端口 + :return: bool + """ + from socket import socket, AF_INET, SOCK_STREAM + s = socket(AF_INET, SOCK_STREAM) + result = s.connect_ex((ip, int(port))) + s.close() + return result == 0 + + +def clean_folder(folder_path, ignore=None): + """清空一个文件夹,除了ignore里的文件和文件夹 \n + :param folder_path: 要清空的文件夹路径 + :param ignore: 忽略列表 + :return: None + """ + ignore = [] if not ignore else ignore + p = Path(folder_path) + + for f in p.iterdir(): + if f.name not in ignore: + if f.is_file(): + f.unlink() + elif f.is_dir(): + rmtree(f, True) + + +def unzip(zip_path, to_path): + """解压下载的chromedriver.zip文件""" + if not zip_path: + return + + with ZipFile(zip_path, 'r') as f: + return [f.extract(f.namelist()[0], path=to_path)] diff --git a/DrissionPage/functions/tools.pyi b/DrissionPage/functions/tools.pyi new file mode 100644 index 0000000..21584a0 --- /dev/null +++ b/DrissionPage/functions/tools.pyi @@ -0,0 +1,31 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from pathlib import Path +from typing import Union + + +def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ... + + +def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ... + + +def get_usable_path(path: Union[str, Path]) -> Path: ... + + +def make_valid_name(full_name: str) -> str: ... + + +def get_long(txt) -> int: ... + + +def port_is_using(ip: str, port: Union[str, int]) -> bool: ... + + +def clean_folder(folder_path: str, ignore: list = None) -> None: ... + + +def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... diff --git a/DrissionPage/functions/web.py b/DrissionPage/functions/web.py new file mode 100644 index 0000000..95a2907 --- /dev/null +++ b/DrissionPage/functions/web.py @@ -0,0 +1,218 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from html import unescape +from http.cookiejar import Cookie +from re import sub +from urllib.parse import urlparse, urljoin, urlunparse + +from requests.cookies import RequestsCookieJar + + +def get_ele_txt(e): + """获取元素内所有文本 + :param e: 元素对象 + :return: 元素内所有文本 + """ + # 前面无须换行的元素 + nowrap_list = ('br', 'sub', 'sup', 'em', 'strong', 'a', 'font', 'b', 'span', 's', 'i', 'del', 'ins', 'img', 'td', + 'th', 'abbr', 'bdi', 'bdo', 'cite', 'code', 'data', 'dfn', 'kbd', 'mark', 'q', 'rp', 'rt', 'ruby', + 'samp', 'small', 'time', 'u', 'var', 'wbr', 'button', 'slot', 'content') + # 后面添加换行的元素 + wrap_after_list = ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ol', 'li', 'blockquote', 'header', + 'footer', 'address' 'article', 'aside', 'main', 'nav', 'section', 'figcaption', 'summary') + # 不获取文本的元素 + noText_list = ('script', 'style', 'video', 'audio', 'iframe', 'embed', 'noscript', 'canvas', 'template') + # 用/t分隔的元素 + tab_list = ('td', 'th') + + if e.tag in noText_list: + return e.raw_text + + def get_node_txt(ele, pre: bool = False): + tag = ele.tag + if tag == 'br': + return [True] + if not pre and tag == 'pre': + pre = True + + str_list = [] + if tag in noText_list and not pre: # 标签内的文本不返回 + return str_list + + nodes = ele.eles('xpath:./text() | *') + prev_ele = '' + for el in nodes: + if isinstance(el, str): # 字符节点 + if pre: + str_list.append(el) + + else: + if sub('[ \n\t\r]', '', el) != '': # 字符除了回车和空格还有其它内容 + txt = el + if not pre: + txt = txt.replace('\n', ' ').strip(' ') + txt = sub(r' {2,}', ' ', txt) + str_list.append(txt) + + else: # 元素节点 + if el.tag not in nowrap_list and str_list and str_list[-1] != '\n': # 元素间换行的情况 + str_list.append('\n') + if el.tag in tab_list and prev_ele in tab_list: # 表格的行 + str_list.append('\t') + + str_list.extend(get_node_txt(el, pre)) + prev_ele = el.tag + + if tag in wrap_after_list and str_list and str_list[-1] not in ('\n', True): # 有些元素后面要添加回车 + str_list.append('\n') + + return str_list + + re_str = get_node_txt(e) + if re_str and re_str[-1] == '\n': + re_str.pop() + re_str = ''.join([i if i is not True else '\n' for i in re_str]) + return format_html(re_str) + + +def format_html(text): + """处理html编码字符 \n + :param text: html文本 + :return: 格式化后的html文本 + """ + return unescape(text).replace('\xa0', ' ') if text else text + + +def location_in_viewport(page, loc_x, loc_y): + """判断给定的坐标是否在视口中 |n + :param page: ChromePage对象 + :param loc_x: 页面绝对坐标x + :param loc_y: 页面绝对坐标y + :return: + """ + js = f'''function(){{var x = {loc_x}; var y = {loc_y}; + const scrollLeft = document.documentElement.scrollLeft; + const scrollTop = document.documentElement.scrollTop; + const vWidth = document.documentElement.clientWidth; + const vHeight = document.documentElement.clientHeight; + if (x< scrollLeft || y < scrollTop || x > vWidth + scrollLeft || y > vHeight + scrollTop){{return false;}} + return true;}}''' + return page.run_js(js) + # const vWidth = window.innerWidth || document.documentElement.clientWidth; + # const vHeight = window.innerHeight || document.documentElement.clientHeight; + + +def offset_scroll(ele, offset_x, offset_y): + """接收元素及偏移坐标,把坐标滚动到页面中间,返回该点在视口中的坐标 \n + 有偏移量时以元素左上角坐标为基准,没有时以_click_point为基准 + :param ele: 元素对象 + :param offset_x: 偏移量x + :param offset_y: 偏移量y + :return: 视口中的坐标 + """ + loc_x, loc_y = ele.location + cp_x, cp_y = ele._click_point + lx = loc_x + offset_x if offset_x else cp_x + ly = loc_y + offset_y if offset_y else cp_y + if not location_in_viewport(ele.page, lx, ly): + clientWidth = ele.page.run_js('return document.body.clientWidth;') + clientHeight = ele.page.run_js('return document.body.clientHeight;') + ele.page.scroll.to_location(lx - clientWidth // 2, ly - clientHeight // 2) + cl_x, cl_y = ele.client_location + ccp_x, ccp_y = ele._client_click_point + cx = cl_x + offset_x if offset_x else ccp_x + cy = cl_y + offset_y if offset_y else ccp_y + return cx, cy + + +def make_absolute_link(link, page=None): + """获取绝对url + :param link: 超链接 + :param page: 页面对象 + :return: 绝对链接 + """ + if not link: + return link + + parsed = urlparse(link)._asdict() + + # 是相对路径,与页面url拼接并返回 + if not parsed['netloc']: + return urljoin(page.url, link) if page else link + + # 是绝对路径但缺少协议,从页面url获取协议并修复 + if not parsed['scheme'] and page: + parsed['scheme'] = urlparse(page.url).scheme + parsed = tuple(v for v in parsed.values()) + return urlunparse(parsed) + + # 绝对路径且不缺协议,直接返回 + return link + + +def is_js_func(func): + """检查文本是否js函数""" + func = func.strip() + if func.startswith('function') or func.startswith('async '): + return True + elif '=>' in func: + return True + return False + + +def cookie_to_dict(cookie): + """把Cookie对象转为dict格式 \n + :param cookie: Cookie对象 + :return: cookie字典 + """ + if isinstance(cookie, Cookie): + cookie_dict = cookie.__dict__.copy() + cookie_dict.pop('rfc2109') + cookie_dict.pop('_rest') + return cookie_dict + + elif isinstance(cookie, dict): + cookie_dict = cookie + + elif isinstance(cookie, str): + cookie = cookie.split(',' if ',' in cookie else ';') + cookie_dict = {} + + for key, attr in enumerate(cookie): + attr_val = attr.lstrip().split('=') + + if key == 0: + cookie_dict['name'] = attr_val[0] + cookie_dict['value'] = attr_val[1] if len(attr_val) == 2 else '' + else: + cookie_dict[attr_val[0]] = attr_val[1] if len(attr_val) == 2 else '' + + return cookie_dict + + else: + raise TypeError('cookie参数必须为Cookie、str或dict类型。') + + return cookie_dict + + +def cookies_to_tuple(cookies): + """把cookies转为tuple格式 \n + :param cookies: cookies信息,可为CookieJar, list, tuple, str, dict + :return: 返回tuple形式的cookies + """ + if isinstance(cookies, (list, tuple, RequestsCookieJar)): + cookies = tuple(cookie_to_dict(cookie) for cookie in cookies) + + elif isinstance(cookies, str): + cookies = tuple(cookie_to_dict(cookie.lstrip()) for cookie in cookies.split(";")) + + elif isinstance(cookies, dict): + cookies = tuple({'name': cookie, 'value': cookies[cookie]} for cookie in cookies) + + else: + raise TypeError('cookies参数必须为RequestsCookieJar、list、tuple、str或dict类型。') + + return cookies diff --git a/DrissionPage/functions/web.pyi b/DrissionPage/functions/web.pyi new file mode 100644 index 0000000..7c49332 --- /dev/null +++ b/DrissionPage/functions/web.pyi @@ -0,0 +1,36 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from http.cookiejar import Cookie +from typing import Union + +from requests.cookies import RequestsCookieJar + +from DrissionPage.base import DrissionElement, BasePage +from DrissionPage.chromium_element import ChromiumElement + + +def get_ele_txt(e: DrissionElement) -> str: ... + + +def format_html(text: str) -> str: ... + + +def location_in_viewport(page, loc_x: int, loc_y: int) -> bool: ... + + +def offset_scroll(ele: ChromiumElement, offset_x: int, offset_y: int) -> tuple: ... + + +def make_absolute_link(link, page: BasePage = None) -> str: ... + + +def is_js_func(func: str) -> bool: ... + + +def cookie_to_dict(cookie: Union[Cookie, str, dict]) -> dict: ... + + +def cookies_to_tuple(cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> tuple: ... diff --git a/DrissionPage/session_element.py b/DrissionPage/session_element.py index 449793d..57c7d66 100644 --- a/DrissionPage/session_element.py +++ b/DrissionPage/session_element.py @@ -10,7 +10,8 @@ from lxml.etree import tostring from lxml.html import HtmlElement, fromstring from .base import DrissionElement, BasePage, BaseElement -from .common import get_ele_txt, get_loc, make_absolute_link +from .functions.web import get_ele_txt, make_absolute_link +from .functions.locator import get_loc class SessionElement(DrissionElement): diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py index e0dd769..52abdb1 100644 --- a/DrissionPage/session_page.py +++ b/DrissionPage/session_page.py @@ -14,7 +14,7 @@ from tldextract import extract from .base import BasePage from .configs.session_options import SessionOptions -from .common import cookies_to_tuple, cookie_to_dict +from .functions.web import cookies_to_tuple, cookie_to_dict from .session_element import SessionElement, make_session_ele diff --git a/DrissionPage/shadow_root_element.py b/DrissionPage/shadow_root_element.py index 728b566..ed3c29e 100644 --- a/DrissionPage/shadow_root_element.py +++ b/DrissionPage/shadow_root_element.py @@ -9,7 +9,7 @@ from typing import Union from selenium.webdriver.remote.webelement import WebElement from .base import BaseElement -from .common import get_loc +from .functions.locator import get_loc from .driver_element import make_driver_ele from .session_element import make_session_ele, SessionElement diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 0906dd9..36c6d6f 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -15,7 +15,7 @@ from .chromium_driver import ChromiumDriver from .chromium_page import ChromiumPage, ChromiumDownloadSetter from .configs.session_options import SessionOptions from .configs.driver_options import DriverOptions -from .common import cookies_to_tuple +from .functions.web import cookies_to_tuple from .session_page import SessionPage