3.2.31 set_argument('--headless')自动改为新写法;优化find_tabs()用法

This commit is contained in:
g1879 2023-07-27 15:56:40 +08:00
parent e46f068218
commit 3a416ec061
23 changed files with 5964 additions and 18 deletions

View File

@ -194,22 +194,25 @@ class ChromiumPage(ChromiumBase):
tab_id = tab_id or self.tab_id
return ChromiumTab(self, tab_id)
def find_tabs(self, text=None, by_title=True, by_url=None, special=False):
def find_tabs(self, title=None, url=None, tab_type=None, single=True):
"""查找符合条件的tab返回它们的id组成的列表
:param text: 查询条件
:param by_title: 是否匹配title
:param by_url: 是否匹配url
:param special: 是否匹配特殊tab如打印页
:return: tab id组成的列表
:param title: 要匹配title的文本
:param url: 要匹配url的文本
:param tab_type: tab类型可用列表输入多个
:param single: 是否返回首个结果的id为False返回所有信息
:return: tab id或tab dict
"""
tabs = self._control_session.get(f'http://{self.address}/json').json() # 不要改用cdp
if text is None or not (by_title or by_url):
return [i['id'] for i in tabs if (not special and i['type'] == 'page')
or (special and i['type'] not in ('page', 'iframe'))]
if isinstance(tab_type, str):
tab_type = {tab_type}
elif isinstance(tab_type, (list, tuple, set)):
tab_type = set(tab_type)
elif tab_type is not None:
raise TypeError('tab_type只能是set、list、tuple、str、None。')
return [i['id'] for i in tabs if ((not special and i['type'] == 'page')
or (special and i['type'] not in ('page', 'iframe')))
and ((by_url and text in i['url']) or (by_title and text in i['title']))]
r = [i for i in tabs if ((title is None or title in i['title']) and (url is None or url in i['url'])
and (tab_type is None or i['type'] in tab_type))]
return r[0]['id'] if r and single else r
def new_tab(self, url=None, switch_to=True):
"""新建一个标签页,该标签页在最后面

View File

@ -81,8 +81,8 @@ class ChromiumPage(ChromiumBase):
def get_tab(self, tab_id: str = None) -> ChromiumTab: ...
def find_tabs(self, text: str = None, by_title: bool = True, by_url: bool = None,
special: bool = False) -> List[str]: ...
def find_tabs(self, title: str = None, url: str = None,
tab_type: Union[str, list, tuple, set] = None, single: bool = True) -> Union[str, List[str]]: ...
def new_tab(self, url: str = None, switch_to: bool = True) -> str: ...

View File

@ -146,8 +146,11 @@ class ChromiumOptions(object):
"""
self.remove_argument(arg)
if value is not False:
arg_str = arg if value is None else f'{arg}={value}'
self._arguments.append(arg_str)
if arg == '--headless' and value is None:
self._arguments.append('--headless=new')
else:
arg_str = arg if value is None else f'{arg}={value}'
self._arguments.append(arg_str)
return self
def remove_argument(self, value):

View File

@ -0,0 +1,364 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from selenium.webdriver.chrome.options import Options
from .options_manage import OptionsManager
class DriverOptions(Options):
"""chrome浏览器配置类继承自selenium.webdriver.chrome.options的Options类
增加了删除配置和保存到文件方法
"""
def __init__(self, read_file=True, ini_path=None):
"""初始化,默认从文件读取设置
:param read_file: 是否从默认ini文件中读取配置信息
:param ini_path: ini文件路径为None则读取默认ini文件
"""
super().__init__()
self._user_data_path = None
if read_file:
self.ini_path = str(ini_path) if ini_path else str(Path(__file__).parent / 'configs.ini')
om = OptionsManager(self.ini_path)
options_dict = om.chrome_options
self._driver_path = om.paths.get('chromedriver_path', None)
self._download_path = om.paths.get('download_path', None)
self._binary_location = options_dict.get('binary_location', '')
self._arguments = options_dict.get('arguments', [])
self._extensions = options_dict.get('extensions', [])
self._experimental_options = options_dict.get('experimental_options', {})
self._debugger_address = options_dict.get('debugger_address', None)
self.page_load_strategy = options_dict.get('page_load_strategy', 'normal')
self.system_user_path = options_dict.get('system_user_path', False)
for arg in self._arguments:
if arg.startswith('--user-data-dir='):
self.set_paths(user_data_path=arg[16:])
break
self.timeouts = options_dict.get('timeouts', {'implicit': 10, 'pageLoad': 30, 'script': 30})
return
self._driver_path = None
self._download_path = None
self.ini_path = None
self.timeouts = {'implicit': 10, 'pageLoad': 30, 'script': 30}
self._debugger_address = '127.0.0.1:9222'
self.system_user_path = False
@property
def driver_path(self):
"""chromedriver文件路径"""
return self._driver_path
@property
def download_path(self):
"""默认下载路径文件路径"""
return self._download_path
@property
def chrome_path(self):
"""浏览器启动文件路径"""
return self.browser_path
@property
def browser_path(self):
"""浏览器启动文件路径"""
return self.binary_location or 'chrome'
@property
def user_data_path(self):
"""返回用户文件夹路径"""
return self._user_data_path
# -------------重写父类方法,实现链式操作-------------
def add_argument(self, argument):
"""添加一个配置项
:param argument: 配置项内容
:return: 当前对象
"""
super().add_argument(argument)
return self
def set_capability(self, name, value):
"""设置一个capability
:param name: capability名称
:param value: capability值
:return: 当前对象
"""
super().set_capability(name, value)
return self
def add_extension(self, extension):
"""添加插件
:param extension: crx文件路径
:return: 当前对象
"""
super().add_extension(extension)
return self
def add_encoded_extension(self, extension):
"""将带有扩展数据的 Base64 编码字符串添加到将用于将其提取到 ChromeDriver 的列表中
:param extension: 带有扩展数据的 Base64 编码字符串
:return: 当前对象
"""
super().add_encoded_extension(extension)
return self
def add_experimental_option(self, name, value):
"""添加一个实验选项到浏览器
:param name: 选项名称
:param value: 选项值
:return: 当前对象
"""
super().add_experimental_option(name, value)
return self
# -------------重写父类方法结束-------------
def save(self, path=None):
    """Write the current settings to an ini file.

    :param path: ini file path; ``None`` saves to the ini file this object was read from
                 (or the default ini file when there is none), ``'default'`` saves to the
                 default ini file; a directory gets ``config.ini`` appended
    :return: absolute path of the saved file, as a string
    """
    default_ini = Path(__file__).parent / 'configs.ini'

    if path == 'default':
        target = default_ini.absolute()
    elif path is None:
        target = Path(self.ini_path).absolute() if self.ini_path else default_ini.absolute()
    else:
        target = Path(path).absolute()

    if target.is_dir():
        target = target / 'config.ini'

    # Start from the target file when it already exists, otherwise from the
    # ini file this object was loaded from (or the default one).
    if target.exists():
        om = OptionsManager(str(target))
    else:
        om = OptionsManager(self.ini_path or str(default_ini))

    # The two path-like entries live in the 'paths' section; everything else
    # goes to 'chrome_options' under its own name.
    path_items = {'driver_path': 'chromedriver_path', 'download_path': 'download_path'}
    settings = self.as_dict()
    for name in settings:
        if name in path_items:
            om.set_item('paths', path_items[name], settings[name])
        else:
            om.set_item('chrome_options', name, settings[name])

    target = str(target)
    om.save(target)
    return target
def save_to_default(self):
"""保存当前配置到默认ini文件"""
return self.save('default')
def remove_argument(self, value):
    """Remove an argument from the argument list.

    An entry is removed when it equals ``value`` exactly or has the form
    ``value=<something>``. A bare prefix is no longer enough, so removing
    ``--headless`` leaves an unrelated ``--headless-extra`` in place.
    A ``value`` that already ends with ``=`` still matches by prefix.

    :param value: argument name; for arguments that carry a value the name alone is enough
    :return: the current object
    """
    prefix = value if value.endswith('=') else f'{value}='
    # Slice-assign so the list object itself is kept (other references stay valid).
    self._arguments[:] = [arg for arg in self._arguments
                          if arg != value and not arg.startswith(prefix)]
    return self
def remove_experimental_option(self, key):
    """Drop one experimental option by its key; a missing key is ignored.

    :param key: name of the experimental option
    :return: the current object
    """
    self._experimental_options.pop(key, None)
    return self
def remove_all_extensions(self):
"""移除所有插件
:return: 当前对象
"""
# 因插件是以整个文件储存,难以移除其中一个,故如须设置则全部移除再重设
self._extensions = []
return self
def set_argument(self, arg, value):
    """Set one browser argument, replacing any earlier setting of it.

    The argument is always removed first. It is then re-added only when
    ``value`` is truthy: as a bare flag for a bool, as ``arg=value`` otherwise.

    :param arg: argument name
    :param value: the value for arguments that take one, a bool for plain flags
    :return: the current object
    """
    self.remove_argument(arg)
    if not value:
        return self

    if isinstance(value, bool):
        self.add_argument(arg)
    else:
        self.add_argument(f'{arg}={value}')
    return self
def set_timeouts(self, implicit=None, pageLoad=None, script=None):
    """Update the stored timeouts (in seconds); arguments left as ``None`` are not changed.

    :param implicit: timeout for locating elements
    :param pageLoad: timeout for page loading
    :param script: timeout for script execution
    :return: the current object
    """
    requested = (('implicit', implicit), ('pageLoad', pageLoad), ('script', script))
    for key, seconds in requested:
        if seconds is not None:
            self.timeouts[key] = seconds
    return self
def set_headless(self, on_off=True):
    """Switch the ``--headless`` argument on or off.

    :param on_off: truthy for on, falsy for off
    :return: the current object
    """
    return self.set_argument('--headless', bool(on_off))
def set_no_imgs(self, on_off=True):
    """Switch the ``--blink-settings=imagesEnabled=false`` argument on or off.

    :param on_off: truthy for on, falsy for off
    :return: the current object
    """
    return self.set_argument('--blink-settings=imagesEnabled=false', bool(on_off))
def set_no_js(self, on_off=True):
    """Switch the ``--disable-javascript`` argument on or off.

    :param on_off: truthy for on, falsy for off
    :return: the current object
    """
    return self.set_argument('--disable-javascript', bool(on_off))
def set_mute(self, on_off=True):
    """Switch the ``--mute-audio`` argument on or off.

    :param on_off: truthy for on, falsy for off
    :return: the current object
    """
    return self.set_argument('--mute-audio', bool(on_off))
def set_user_agent(self, user_agent):
"""设置user agent
:param user_agent: user agent文本
:return: 当前对象
"""
return self.set_argument('--user-agent', user_agent)
def set_proxy(self, proxy):
"""设置代理
:param proxy: 代理url和端口
:return: 当前对象
"""
return self.set_argument('--proxy-server', proxy)
def set_page_load_strategy(self, value):
    """Set ``page_load_strategy``; accepts 'normal', 'eager' or 'none' in any letter case.

    Per the original author's notes (selenium 4+ only):
    normal - the default, wait for all resources to finish downloading;
    eager  - DOM access is ready but other resources (e.g. images) may still be loading;
    none   - do not block WebDriver at all.

    :param value: 'normal', 'eager' or 'none'
    :return: the current object
    :raises ValueError: if ``value`` is not one of the three strategies
    """
    # Normalise before validating; the original lower-cased only after the
    # check, so mixed-case input such as 'Eager' was rejected.
    strategy = value.lower() if isinstance(value, str) else value
    if strategy not in ('normal', 'eager', 'none'):
        raise ValueError("只能选择'normal', 'eager', 'none'")
    self.page_load_strategy = strategy
    return self
def set_paths(self, driver_path=None, chrome_path=None, browser_path=None, local_port=None,
              debugger_address=None, download_path=None, user_data_path=None, cache_path=None):
    """Set several path and address options in one call; ``None`` leaves an option untouched.

    ``chrome_path`` and ``browser_path`` both set ``binary_location``, and
    ``local_port`` and ``debugger_address`` both set ``debugger_address``;
    when both of a pair are given, the later one in this list wins.

    :param driver_path: path of the chromedriver executable
    :param chrome_path: path of the chrome executable
    :param browser_path: path of the browser executable
    :param local_port: local port; ``''`` clears the debugger address
    :param debugger_address: debugger address, e.g. 127.0.0.1:9222
    :param download_path: download directory
    :param user_data_path: user data directory
    :param cache_path: cache directory
    :return: the current object
    """
    if driver_path is not None:
        self._driver_path = str(driver_path)

    if chrome_path is not None:
        self.binary_location = str(chrome_path)

    if browser_path is not None:
        self.binary_location = str(browser_path)

    if local_port is not None:
        self.debugger_address = '' if local_port == '' else f'127.0.0.1:{local_port}'

    if debugger_address is not None:
        self.debugger_address = debugger_address

    if download_path is not None:
        self._download_path = str(download_path)

    if user_data_path is not None:
        # Store the same string that goes into the argument list, so the
        # user_data_path property returns str like the other path options.
        user_data_path = str(user_data_path)
        self.set_argument('--user-data-dir', user_data_path)
        self._user_data_path = user_data_path

    if cache_path is not None:
        self.set_argument('--disk-cache-dir', str(cache_path))

    return self
def as_dict(self):
"""已dict方式返回所有配置信息"""
return chrome_options_to_dict(self)
def chrome_options_to_dict(options):
    """Convert a chrome options object into a plain dict.

    ``None``/``False`` yields the dict of a fresh ``DriverOptions(read_file=False)``;
    a dict is returned unchanged. For any other object a fixed set of attributes
    is copied, with ``None`` for attributes the object does not list.

    :param options: options object, dict, or ``None``/``False``
    :return: dict of settings
    """
    if options in (False, None):
        return DriverOptions(read_file=False).as_dict()

    if isinstance(options, dict):
        return options

    wanted = ('debugger_address', 'binary_location', 'arguments', 'extensions',
              'experimental_options', 'driver_path', 'page_load_strategy', 'download_path')
    available = dir(options)
    result = {}

    for name in wanted:
        if name not in available:
            result[name] = None
            continue
        try:
            result[name] = getattr(options, name)
        except Exception:
            # Best effort: an attribute that fails to read is left out of the result.
            continue

    # timeouts are copied only when the object lists them and its _caps mapping holds them
    if 'timeouts' in available and 'timeouts' in options._caps:
        result['timeouts'] = getattr(options, 'timeouts')

    return result

View File

@ -0,0 +1,89 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union, List
from selenium.webdriver.chrome.options import Options
class DriverOptions(Options):
def __init__(self, read_file: bool = True, ini_path: Union[str, Path] = None):
self.ini_path: str = ...
self._driver_path: str = ...
self._user_data_path: str = ...
self._download_path: str = ...
@property
def driver_path(self) -> str: ...
@property
def download_path(self) -> str: ...
@property
def chrome_path(self) -> str: ...
@property
def browser_path(self) -> str: ...
@property
def user_data_path(self) -> str: ...
# -------------重写父类方法,实现链式操作-------------
def add_argument(self, argument: str) -> DriverOptions: ...
def set_capability(self, name: str, value: str) -> DriverOptions: ...
def add_extension(self, extension: str) -> DriverOptions: ...
def add_encoded_extension(self, extension: str) -> DriverOptions: ...
def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> DriverOptions: ...
# -------------重写父类方法结束-------------
def save(self, path: str = None) -> str: ...
def save_to_default(self) -> str: ...
def remove_argument(self, value: str) -> DriverOptions: ...
def remove_experimental_option(self, key: str) -> DriverOptions: ...
def remove_all_extensions(self) -> DriverOptions: ...
def set_argument(self, arg: str, value: Union[bool, str]) -> DriverOptions: ...
def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> DriverOptions: ...
def set_headless(self, on_off: bool = True) -> DriverOptions: ...
def set_no_imgs(self, on_off: bool = True) -> DriverOptions: ...
def set_no_js(self, on_off: bool = True) -> DriverOptions: ...
def set_mute(self, on_off: bool = True) -> DriverOptions: ...
def set_user_agent(self, user_agent: str) -> DriverOptions: ...
def set_proxy(self, proxy: str) -> DriverOptions: ...
def set_page_load_strategy(self, value: str) -> DriverOptions: ...
def set_paths(self,
driver_path: Union[str, Path] = None,
chrome_path: Union[str, Path] = None,
browser_path: Union[str, Path] = None,
local_port: Union[int, str] = None,
debugger_address: str = None,
download_path: str = None,
user_data_path: str = None,
cache_path: str = None) -> DriverOptions: ...
def as_dict(self) -> dict: ...
def chrome_options_to_dict(options: Union[dict, DriverOptions, Options, None, bool]) -> Union[dict, None]: ...

View File

@ -0,0 +1,324 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from abc import abstractmethod
from re import sub
from urllib.parse import quote
from DrissionPage.commons.web import format_html
from DrissionPage.commons.locator import get_loc
class BaseParser:
    """Base class of all page and element classes."""

    def __call__(self, loc_or_str):
        """Calling the object is shorthand for ``ele()``."""
        return self.ele(loc_or_str)

    def ele(self, loc_or_ele, timeout=None):
        """Look up a single result by delegating to ``_ele`` with ``single=True``."""
        single = True
        return self._ele(loc_or_ele, timeout, single)

    def eles(self, loc_or_str, timeout=None):
        """Look up all results by delegating to ``_ele`` with ``single=False``."""
        single = False
        return self._ele(loc_or_str, timeout, single)

    # ---------------- attributes and methods below are left to subclasses ----------------
    @property
    def html(self):
        return ''

    def s_ele(self, loc_or_ele):
        return None

    def s_eles(self, loc_or_str):
        return None

    @abstractmethod
    def _ele(self, loc_or_ele, timeout=None, single=True):
        return None
class BaseElement(BaseParser):
"""各元素类的基类"""
def __init__(self, page=None):
self.page = page
# ----------------以下属性或方法由后代实现----------------
@property
def tag(self):
return
@abstractmethod
def _ele(self, loc_or_str, timeout=None, single=True, relative=False):
pass
def parent(self, level_or_loc=1):
pass
def prev(self, index=1):
return None # ShadowRootElement直接继承
def prevs(self) -> None:
return None # ShadowRootElement直接继承
def next(self, index=1):
pass
def nexts(self):
pass
class DrissionElement(BaseElement):
"""DriverElement、ChromiumElement 和 SessionElement的基类
但不是ShadowRootElement的基类"""
@property
def link(self):
"""返回href或src绝对url"""
return self.attr('href') or self.attr('src')
@property
def css_path(self):
"""返回css path路径"""
return self._get_ele_path('css')
@property
def xpath(self):
"""返回xpath路径"""
return self._get_ele_path('xpath')
@property
def comments(self):
"""返回元素注释文本组成的列表"""
return self.eles('xpath:.//comment()')
def texts(self, text_node_only=False):
    """Return the texts of the element's direct child nodes (elements and text nodes).

    Entries that are empty or contain only whitespace are dropped; the rest are
    stripped of surrounding spaces and trailing newlines and passed through ``format_html``.

    :param text_node_only: whether to return text nodes only
    :return: list of texts
    """
    if text_node_only:
        raw = self.eles('xpath:/text()')
    else:
        raw = []
        for node in self.eles('xpath:./text() | *'):
            raw.append(node if isinstance(node, str) else node.text)

    cleaned = []
    for item in raw:
        if not item or sub('[\r\n\t ]', '', item) == '':
            continue
        cleaned.append(format_html(item.strip(' ').rstrip('\n')))
    return cleaned
def parent(self, level_or_loc=1):
    """Return an ancestor element, chosen by level number or by a locator.

    :param level_or_loc: how many levels up, or a locator for the ancestor
    :return: the ancestor element object
    """
    if not isinstance(level_or_loc, (int, tuple, str)):
        raise TypeError('level_or_loc参数只能是tuple、int或str。')

    if isinstance(level_or_loc, int):
        target = f'xpath:./ancestor::*[{level_or_loc}]'
    else:
        converted = get_loc(level_or_loc, True)
        if converted[0] == 'css selector':
            raise ValueError('此css selector语法不受支持请换成xpath。')
        # drop any leading '.', '/' and spaces so the expression can follow the ancestor axis
        node_test = converted[1].lstrip(". / ")
        target = f'xpath:./ancestor::{node_test}'

    return self._ele(target, timeout=0, relative=True)
def prev(self, index=1, filter_loc='', timeout=0):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 前面第几个查询结果
:param filter_loc: 用于筛选的查询语法
:param timeout: 查找节点的超时时间
:return: 兄弟元素
"""
nodes = self._get_brothers(index, filter_loc, 'preceding', timeout=timeout)
return nodes[-1] if nodes else None
def next(self, index=1, filter_loc='', timeout=0):
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 后面第几个查询结果
:param filter_loc: 用于筛选的查询语法
:param timeout: 查找节点的超时时间
:return: 兄弟元素
"""
nodes = self._get_brothers(index, filter_loc, 'following', timeout=timeout)
return nodes[0] if nodes else None
def before(self, index=1, filter_loc='', timeout=None):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 前面第几个查询结果
:param filter_loc: 用于筛选的查询语法
:param timeout: 查找节点的超时时间
:return: 本元素前面的某个元素或节点
"""
nodes = self._get_brothers(index, filter_loc, 'preceding', False, timeout=timeout)
return nodes[-1] if nodes else None
def after(self, index=1, filter_loc='', timeout=None):
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 后面第几个查询结果
:param filter_loc: 用于筛选的查询语法
:param timeout: 查找节点的超时时间
:return: 本元素后面的某个元素或节点
"""
nodes = self._get_brothers(index, filter_loc, 'following', False, timeout)
return nodes[0] if nodes else None
def prevs(self, filter_loc='', timeout=0):
"""返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选
:param filter_loc: 用于筛选的查询语法
:param timeout: 查找节点的超时时间
:return: 兄弟元素或节点文本组成的列表
"""
return self._get_brothers(filter_loc=filter_loc, direction='preceding', timeout=timeout)
def nexts(self, filter_loc='', timeout=0):
"""返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选
:param filter_loc: 用于筛选的查询语法
:param timeout: 查找节点的超时时间
:return: 兄弟元素或节点文本组成的列表
"""
return self._get_brothers(filter_loc=filter_loc, direction='following', timeout=timeout)
def befores(self, filter_loc='', timeout=None):
    """Return all elements or nodes located before this element, optionally filtered.

    The search uses the 'preceding' direction and is not limited to siblings.

    :param filter_loc: locator used to filter the results
    :param timeout: how long to wait while searching
    :return: list of the elements or nodes in front of this element
    """
    search = dict(filter_loc=filter_loc, direction='preceding', brother=False, timeout=timeout)
    return self._get_brothers(**search)
def afters(self, filter_loc='', timeout=None):
    """Return all elements or nodes located after this element, optionally filtered.

    The search uses the 'following' direction and is not limited to siblings.

    :param filter_loc: locator used to filter the results
    :param timeout: how long to wait while searching
    :return: list of the elements or nodes behind this element
    """
    search = dict(filter_loc=filter_loc, direction='following', brother=False, timeout=timeout)
    return self._get_brothers(**search)
def _get_brothers(self, index=None, filter_loc='', direction='following', brother=True, timeout=.5):
    """Collect the elements or nodes before or after this element.

    :param index: 1-based position of the wanted result; when not ``None`` only that one is returned
    :param filter_loc: locator used to filter the results
    :param direction: 'following' or 'preceding', the direction to search in
    :param brother: ``True`` searches siblings only, ``False`` searches the whole DOM in that direction
    :param timeout: how long to wait while searching
    :return: list of element objects or text strings (at most one entry when ``index`` is given)
    """
    if index is not None and index < 1:
        raise ValueError('index必须大于等于1。')

    axis = f'{direction}-sibling' if brother else direction

    if filter_loc:
        converted = get_loc(filter_loc, True)  # convert the locator to xpath
        if converted[0] == 'css selector':
            raise ValueError('此css selector语法不受支持请换成xpath。')
        node_test = converted[1].lstrip('./')
    else:
        node_test = '*'

    found = self._ele(f'xpath:./{axis}::{node_test}', timeout=timeout, single=False, relative=True)
    # discard text results that consist of whitespace only
    kept = [node for node in found
            if not isinstance(node, str) or sub('[ \n\t\r]', '', node) != '']

    if not kept or index is None:
        return kept

    # 'following' results are counted from the front, 'preceding' ones from the back
    position = index - 1 if direction == 'following' else -index
    try:
        return [kept[position]]
    except IndexError:
        return []
# ----------------以下属性或方法由后代实现----------------
@property
def attrs(self):
return
@property
def text(self):
return
@property
def raw_text(self):
return
@abstractmethod
def attr(self, attr: str):
return ''
def _get_ele_path(self, mode):
return ''
class BasePage(BaseParser):
"""页面类的基类"""
def __init__(self, timeout=None):
"""初始化函数"""
self._url = None
self.timeout = timeout if timeout is not None else 10
self.retry_times = 3
self.retry_interval = 2
self._url_available = None
@property
def title(self):
"""返回网页title"""
ele = self.ele('xpath://title')
return ele.text if ele else None
@property
def timeout(self):
"""返回查找元素时等待的秒数"""
return self._timeout
@timeout.setter
def timeout(self, second):
"""设置查找元素时等待的秒数"""
self._timeout = second
@property
def cookies(self):
"""返回cookies"""
return self.get_cookies(True)
@property
def url_available(self):
"""返回当前访问的url有效性"""
return self._url_available
def _before_connect(self, url, retry, interval):
    """Prepare for a connection: store the percent-encoded url and resolve the retry settings.

    :param url: the url to visit
    :param retry: retry count; ``None`` falls back to ``self.retry_times``
    :param interval: retry interval; ``None`` falls back to ``self.retry_interval``
    :return: tuple of (retry count, retry interval)
    """
    self._url = quote(url, safe='/:&?=%;#@+!')

    if retry is None:
        retry = self.retry_times
    if interval is None:
        interval = self.retry_interval

    return retry, interval
# ----------------以下属性或方法由后代实现----------------
@property
def url(self):
return
@property
def json(self):
return
@abstractmethod
def get_cookies(self, as_dict=False):
return {}
@abstractmethod
def get(self, url, show_errmsg=False, retry=None, interval=None):
pass

View File

@ -0,0 +1,175 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from abc import abstractmethod
from typing import Union, Tuple, List
class BaseParser(object):
def __call__(self, loc_or_str: Union[Tuple[str, str], str]): ...
def ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement], timeout: float = None): ...
def eles(self, loc_or_str: Union[Tuple[str, str], str], timeout=None): ...
# ----------------以下属性或方法待后代实现----------------
@property
def html(self) -> str: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, BaseElement]): ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]): ...
@abstractmethod
def _ele(self, loc_or_ele, timeout: float = None, single: bool = True): ...
class BaseElement(BaseParser):
def __init__(self, page: BasePage = None):
self.page: BasePage = ...
# ----------------以下属性或方法由后代实现----------------
@property
def tag(self) -> str: ...
@abstractmethod
def _ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None, single: bool = True,
relative: bool = False): ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1): ...
def prev(self, index: int = 1) -> None: ...
def prevs(self) -> None: ...
def next(self, index: int = 1): ...
def nexts(self): ...
class DrissionElement(BaseElement):
def __init__(self,
page: BasePage = ...):
self.page: BasePage = ...
@property
def link(self) -> str: ...
@property
def css_path(self) -> str: ...
@property
def xpath(self) -> str: ...
@property
def comments(self) -> list: ...
def texts(self, text_node_only: bool = False) -> list: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[DrissionElement, None]: ...
def prev(self,
index: int = 1,
filter_loc: Union[tuple, str] = '',
timeout: float = 0) -> Union[DrissionElement, str, None]: ...
def next(self,
index: int = 1,
filter_loc: Union[tuple, str] = '',
timeout: float = 0) -> Union[DrissionElement, str, None]: ...
def before(self,
index: int = 1,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> Union[DrissionElement, str, None]: ...
def after(self,
index: int = 1,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> Union[DrissionElement, str, None]: ...
def prevs(self,
filter_loc: Union[tuple, str] = '',
timeout: float = 0) -> List[Union[DrissionElement, str]]: ...
def nexts(self,
filter_loc: Union[tuple, str] = '',
timeout: float = 0) -> List[Union[DrissionElement, str]]: ...
def befores(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> List[Union[DrissionElement, str]]: ...
def afters(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> List[Union[DrissionElement, str]]: ...
def _get_brothers(self,
index: int = None,
filter_loc: Union[tuple, str] = '',
direction: str = 'following',
brother: bool = True,
timeout: float = 0.5) -> List[Union[DrissionElement, str]]: ...
# ----------------以下属性或方法由后代实现----------------
@property
def attrs(self) -> dict: ...
@property
def text(self) -> str: ...
@property
def raw_text(self) -> str: ...
@abstractmethod
def attr(self, attr: str) -> str: ...
def _get_ele_path(self, mode) -> str: ...
class BasePage(BaseParser):
    """Type stub for the base class of page classes."""

    def __init__(self, timeout: float = None):
        self._url_available: bool = ...
        self.retry_times: int = ...
        self.retry_interval: float = ...
        # Was `self._timeout = float = ...`, a chained assignment that rebinds
        # the name `float` instead of annotating the attribute.
        self._timeout: float = ...

    @property
    def title(self) -> Union[str, None]: ...

    @property
    def timeout(self) -> float: ...

    @timeout.setter
    def timeout(self, second: float) -> None: ...

    @property
    def cookies(self) -> dict: ...

    @property
    def url_available(self) -> bool: ...

    def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...

    # ---------------- attributes and methods below are implemented by subclasses ----------------
    @property
    def url(self) -> str: ...

    @property
    def json(self) -> dict: ...

    @abstractmethod
    def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: ...

    @abstractmethod
    def get(self,
            url: str,
            show_errmsg: bool = False,
            retry: int = None,
            interval: float = None): ...

View File

@ -0,0 +1,458 @@
# -*- encoding: utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from platform import system
from sys import exit
from requests import Session
from requests.structures import CaseInsensitiveDict
from selenium import webdriver
from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from tldextract import extract
from DrissionPage.commons.tools import get_pid_from_port, get_exe_from_port
from DrissionPage.commons.browser import connect_browser
from DrissionPage.commons.web import cookies_to_tuple
from DrissionPage.configs.session_options import SessionOptions, session_options_to_dict
from DrissionPage.configs.driver_options import DriverOptions
class Drission(object):
"""Drission类用于管理WebDriver对象和Session对象是驱动器的角色"""
def __init__(self, driver_or_options=None, session_or_options=None, ini_path=None, proxy=None):
"""初始化可接收现成的WebDriver和Session对象或接收它们的配置信息生成对象
:param driver_or_options: driver对象或DriverOptionsOptions类传入False则创建空配置对象
:param session_or_options: Session对象或设置字典传入False则创建空配置对象
:param ini_path: ini文件路径
:param proxy: 代理设置
"""
self._session = None
self._driver = None
self._session_options = None
self._driver_options = None
self._debugger = None
self._proxy = proxy
# ------------------处理session options----------------------
if session_or_options is None:
self._session_options = SessionOptions(ini_path=ini_path).as_dict()
elif session_or_options is False:
self._session_options = SessionOptions(read_file=False).as_dict()
elif isinstance(session_or_options, Session):
self._session = session_or_options
elif isinstance(session_or_options, SessionOptions):
self._session_options = session_or_options.as_dict()
elif isinstance(session_or_options, dict):
self._session_options = session_or_options
else:
raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')
# ------------------处理driver options----------------------
if driver_or_options is None:
self._driver_options = DriverOptions(ini_path=ini_path)
elif driver_or_options is False:
self._driver_options = DriverOptions(read_file=False)
elif isinstance(driver_or_options, RemoteWebDriver):
self._driver = driver_or_options
elif isinstance(driver_or_options, (Options, DriverOptions)):
self._driver_options = driver_or_options
else:
raise TypeError('driver_or_options参数只能接收WebDriver, Options, DriverOptions或False。')
def __del__(self):
"""关闭对象时关闭浏览器和Session"""
try:
self.close()
except ImportError:
pass
@property
def session(self):
"""返回Session对象如未初始化则按配置信息创建"""
if self._session is None:
self._set_session(self._session_options)
if self._proxy:
self._session.proxies = self._proxy
return self._session
@property
def driver(self):
"""Return the WebDriver object, creating it from the stored driver options on first access.
When a debugger address is configured, connect_browser() is called before the driver is created
(per the original author: it attaches to the browser or starts the browser process).
"""
if self._driver is None:
# a proxy is passed as a launch argument only when no debugger address is configured
if not self.driver_options.debugger_address and self._proxy:
self.driver_options.add_argument(f'--proxy-server={self._proxy["http"]}')
driver_path = self.driver_options.driver_path or 'chromedriver'
chrome_path = self.driver_options.chrome_path
# ----------- if a debug port is given and not in use yet, start the browser process first -----------
if self.driver_options.debugger_address:
# start the browser process; the call also returns the chrome.exe path used by that process
cp, self._debugger = connect_browser(self.driver_options)
if cp in (None, 'chrome'):
system_type = system().lower()
ip, port = self.driver_options.debugger_address.split(':')
# non-local address: no browser path is passed on to create_driver
if ip not in ('127.0.0.1', 'localhost'):
chrome_path = None
# local address on Windows with the default 'chrome' path: look the executable up from the port
elif chrome_path == 'chrome' and system_type == 'windows':
chrome_path = get_exe_from_port(port)
# ----------- create the WebDriver object -----------
self._driver = create_driver(chrome_path, driver_path, self.driver_options)
# ----------- work around newer browsers not landing on the right tab after takeover -----------
active_tab = self._driver.window_handles[0]
if active_tab != self._driver.current_window_handle:
self._driver.switch_to.window(active_tab)
return self._driver
@property
def driver_options(self):
"""返回driver配置信息"""
return self._driver_options
@property
def session_options(self):
"""返回session配置信息"""
return self._session_options
@session_options.setter
def session_options(self, options):
"""设置session配置
:param options: session配置字典
:return: None
"""
self._session_options = session_options_to_dict(options)
self._set_session(self._session_options)
@property
def proxy(self):
"""返回代理信息"""
return self._proxy
@proxy.setter
def proxy(self, proxies=None):
"""设置代理信息
:param proxies: 代理信息字典
:return: None
"""
self._proxy = proxies
if self._session:
self._session.proxies = proxies
if self._driver:
cookies = self._driver.get_cookies()
url = self._driver.current_url
self._driver.quit()
self._driver = None
self._driver = self.driver
self._driver.get(url)
for cookie in cookies:
self.set_cookies(cookie, set_driver=True)
@property
def debugger_progress(self):
"""调试浏览器进程"""
return self._debugger
def kill_browser(self):
"""关闭浏览器进程(如果可以)"""
pid = self.get_browser_progress_id()
if not kill_progress(pid):
self._driver.quit()
def get_browser_progress_id(self):
    """Return the process id of the browser, or ``None`` when it cannot be determined.

    If a debugger process object is held, its ``pid`` is returned. Otherwise the
    configured debugger address must be a local ``ip:port`` pair; the port is then
    looked up in the output of ``netstat`` and the last field of the first
    LISTENING line is returned as a string.

    :return: the pid, or ``None``
    """
    if self.debugger_progress:
        return self.debugger_progress.pid

    # Anything that is not exactly "<local ip>:<digits>" (including a missing
    # address) yields None; the original left `port` unbound in that case.
    ip, _, port = str(self.driver_options.debugger_address).partition(':')
    if ip not in ('127.0.0.1', 'localhost') or not port.isdigit():
        return None

    from os import popen
    # NOTE(review): findstr is a Windows command, so this lookup presumably only works there.
    # `port` is digits only (checked above), so interpolating it into the command is safe.
    with popen(f'netstat -nao | findstr :{port}') as pipe:
        lines = pipe.read().split('\n')

    for line in lines:
        if 'LISTENING' in line:
            return line.split(' ')[-1]
    return None
def hide_browser(self):
"""隐藏浏览器界面"""
self._show_or_hide_browser()
def show_browser(self):
"""显示浏览器界面"""
self._show_or_hide_browser(False)
def _show_or_hide_browser(self, hide=True):
if system().lower() != 'windows':
raise OSError('该方法只能在Windows系统使用。')
try:
from win32gui import ShowWindow
from win32con import SW_HIDE, SW_SHOW
except ImportError:
raise ImportError('请先安装pip install pypiwin32')
pid = self.get_browser_progress_id()
if not pid:
print('只有设置了debugger_address参数才能使用 show_browser() 和 hide_browser()')
return
hds = get_chrome_hwnds_from_pid(pid)
sw = SW_HIDE if hide else SW_SHOW
for hd in hds:
ShowWindow(hd, sw)
def set_cookies(self, cookies, set_session=False, set_driver=False):
"""设置cookies
:param cookies: cookies信息可为CookieJar, list, tuple, str, dict
:param set_session: 是否设置session的cookies
:param set_driver: 是否设置driver的cookies
:return: None
"""
cookies = cookies_to_tuple(cookies)
for cookie in cookies:
if cookie['value'] is None:
cookie['value'] = ''
# 添加cookie到session
if set_session:
kwargs = {x: cookie[x] for x in cookie
if x.lower() not in ('name', 'value', 'httponly', 'expiry', 'samesite')}
if 'expiry' in cookie:
kwargs['expires'] = cookie['expiry']
self.session.cookies.set(cookie['name'], cookie['value'], **kwargs)
# 添加cookie到driver
if set_driver:
if 'expiry' in cookie:
cookie['expiry'] = int(cookie['expiry'])
try:
browser_domain = extract(self.driver.current_url).fqdn
except AttributeError:
browser_domain = ''
if not cookie.get('domain', None):
if browser_domain:
url = extract(browser_domain)
cookie_domain = f'{url.domain}.{url.suffix}'
else:
raise ValueError('cookie中没有域名或浏览器未访问过URL。')
cookie['domain'] = cookie_domain
else:
cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:]
if cookie_domain not in browser_domain:
self.driver.get(cookie_domain if cookie_domain.startswith('http://')
else f'http://{cookie_domain}')
# 避免selenium自动添加.后无法正确覆盖已有cookie
if cookie['domain'][0] != '.':
c = self.driver.get_cookie(cookie['name'])
if c and c['domain'] == cookie['domain']:
self.driver.delete_cookie(cookie['name'])
self.driver.add_cookie(cookie)
def _set_session(self, data):
"""根据传入字典对session进行设置
:param data: session配置字典
:return: None
"""
if self._session is None:
self._session = Session()
if 'headers' in data:
self._session.headers = CaseInsensitiveDict(data['headers'])
if 'cookies' in data:
self.set_cookies(data['cookies'], set_session=True)
attrs = ['auth', 'proxies', 'hooks', 'params', 'verify',
'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters'
for i in attrs:
if i in data:
self._session.__setattr__(i, data[i])
def cookies_to_session(self, copy_user_agent=False):
    """Copy the cookies held by the driver into the session object.

    :param copy_user_agent: whether to copy the user agent to the session too
    :return: None
    """
    if copy_user_agent:
        user_agent_to_session(self.driver, self.session)

    driver_cookies = self.driver.get_cookies()
    self.set_cookies(driver_cookies, set_session=True)
def cookies_to_driver(self, url):
    """Copy the session cookies that match a url's domain into the driver.

    The browser is navigated to ``url`` first when its current domain does not
    contain the url's fully qualified domain name. Session cookies with an
    empty domain are assigned the url's registered domain.

    :param url: url that defines the scope of the cookies to copy
    :return: None
    """
    current_fqdn = extract(self.driver.current_url).fqdn
    target = extract(url)
    if target.fqdn not in current_fqdn:
        self.driver.get(url)

    target_domain = f'{target.domain}.{target.suffix}'
    matching = []
    for item in self.session.cookies:
        # Cookies without a domain are treated as belonging to the target domain.
        if item.domain == '':
            item.domain = target_domain

        if target_domain in item.domain:
            matching.append(item)

    self.set_cookies(matching, set_driver=True)
def close_driver(self, kill=False):
    """Close the driver and the browser.

    Does nothing when there is no driver.

    :param kill: True calls kill_browser(), False calls the driver's quit()
    :return: None
    """
    if not self._driver:
        return

    # Close the chromedriver.exe process.
    kill_progress(port=self._driver.service.port)

    shut_down = self.kill_browser if kill else self._driver.quit
    shut_down()
    self._driver = None
def close_session(self):
    """Close the session object and forget it; no-op when there is none."""
    if not self._session:
        return

    self._session.close()
    self._session = None
def close(self):
    """Close the driver (with its browser) and the session, whichever exist."""
    closers = (('_driver', 'close_driver'), ('_session', 'close_session'))
    for attr_name, closer_name in closers:
        if getattr(self, attr_name):
            getattr(self, closer_name)()
def user_agent_to_session(driver, session):
    """Copy the driver's user agent into the session headers.

    :param driver: source driver object
    :param session: target session object
    :return: None
    """
    browser_ua = driver.execute_script("return navigator.userAgent;")
    ua_header = {"User-Agent": browser_ua}
    session.headers.update(ua_header)
def create_driver(chrome_path, driver_path, options):
    """Create a WebDriver object.

    NOTE(review): the block nesting below was reconstructed from text whose
    indentation had been lost - confirm against the original file.

    :param chrome_path: path of chrome.exe
    :param driver_path: path of chromedriver.exe
    :param options: Options object
    :return: WebDriver object
    """
    try:
        debugger_address = options.debugger_address
        # When a debugger address is set, only that address is carried over
        # into a fresh Options object.
        if options.debugger_address:
            options = Options()
            options.debugger_address = debugger_address

        return webdriver.Chrome(driver_path, options=options)

    # If the version does not match, fetch a matching chromedriver and retry.
    except (WebDriverException, SessionNotCreatedException):
        print('打开失败尝试获取driver。\n')
        from DrissionPage.easy_set import get_match_driver, get_chrome_path

        if chrome_path == 'chrome':
            chrome_path = get_chrome_path(show_msg=False, from_ini=False)

        if chrome_path:
            driver_path = get_match_driver(chrome_path=chrome_path, check_version=False, show_msg=True)
            if driver_path:
                try:
                    options.binary_location = chrome_path
                    return webdriver.Chrome(driver_path, options=options)
                except Exception:
                    # NOTE(review): the retry error is discarded here; the
                    # generic failure message below is all the user sees.
                    pass

    print('无法启动请检查浏览器路径或手动设置chromedriver。\n下载地址http://npm.taobao.org/mirrors/chromedriver/')
    # NOTE(review): terminates the interpreter with status 0 although start-up failed.
    exit(0)
def get_chrome_hwnds_from_pid(pid):
    """Look up window handles by process id.

    Collects the handles of windows whose title contains '- Google Chrome'
    and whose owning process id equals ``pid`` (compared as strings).

    :param pid: process id
    :return: list of matching window handles
    :raises ImportError: when win32gui / win32process cannot be imported
    """
    try:
        from win32gui import IsWindow, GetWindowText, EnumWindows
        from win32process import GetWindowThreadProcessId
    except ImportError:
        raise ImportError('请先安装win32guipip install pypiwin32')

    # Callback handed to EnumWindows; ``hds`` is the list passed as its second argument.
    # NOTE(review): indentation was not recoverable from the reviewed text -
    # confirm whether ``return True`` belongs inside the outer ``if``.
    def callback(hwnd, hds):
        if IsWindow(hwnd) and '- Google Chrome' in GetWindowText(hwnd):
            _, found_pid = GetWindowThreadProcessId(hwnd)
            if str(found_pid) == str(pid):
                hds.append(hwnd)
        return True

    hwnds = []
    EnumWindows(callback, hwnds)
    return hwnds
def kill_progress(pid=None, port=None):
    """Close the browser process.

    Only acts on Windows, and only when the ``tasklist`` output filtered by
    the process id starts with 'chrome.exe'.

    :param pid: process id
    :param port: port number, used to look up the process id when pid is not given
    :return: whether a kill command was issued
    """
    from os import popen

    if system().lower() != 'windows':
        return False

    pid = pid or get_pid_from_port(port)
    if not pid:
        return False

    task_listing = popen(f'tasklist | findstr {pid}').read()
    if not task_listing.lower().startswith('chrome.exe'):
        return False

    popen(f'taskkill /pid {pid} /F')
    return True

View File

@ -0,0 +1,96 @@
# -*- encoding: utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from subprocess import Popen
from typing import Union
from requests import Session
from requests.cookies import RequestsCookieJar
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from DrissionPage.configs.driver_options import DriverOptions
from DrissionPage.configs.session_options import SessionOptions
class Drission(object):
    # Type stub for Drission: only signatures and attribute types are declared here.

    def __init__(self,
                 driver_or_options: Union[RemoteWebDriver, Options, DriverOptions, bool] = None,
                 session_or_options: Union[Session, dict, SessionOptions, bool] = None,
                 ini_path: str = None,
                 proxy: dict = None):
        # Instance attributes and their declared types.
        self._session: Session = ...
        self._session_options: dict = ...
        self._proxy: dict = ...
        self._driver: WebDriver = ...
        self._debugger: Popen = ...
        self._driver_options: DriverOptions = ...

    def __del__(self): ...

    # ---- read-only and settable properties ----
    @property
    def session(self) -> Session: ...

    @property
    def driver(self) -> WebDriver: ...

    @property
    def driver_options(self) -> Union[DriverOptions, Options]: ...

    @property
    def session_options(self) -> dict: ...

    @session_options.setter
    def session_options(self, options: Union[dict, SessionOptions]) -> None: ...

    @property
    def proxy(self) -> Union[None, dict]: ...

    @proxy.setter
    def proxy(self, proxies: dict = None) -> None: ...

    # No return type is declared for this property.
    @property
    def debugger_progress(self): ...

    # ---- browser process / window control ----
    def kill_browser(self) -> None: ...

    def get_browser_progress_id(self) -> Union[str, None]: ...

    def hide_browser(self) -> None: ...

    def show_browser(self) -> None: ...

    def _show_or_hide_browser(self, hide: bool = True) -> None: ...

    # ---- cookies and session configuration ----
    def set_cookies(self,
                    cookies: Union[RequestsCookieJar, list, tuple, str, dict],
                    set_session: bool = False,
                    set_driver: bool = False) -> None: ...

    def _set_session(self, data: dict) -> None: ...

    def cookies_to_session(self, copy_user_agent: bool = False) -> None: ...

    def cookies_to_driver(self, url: str) -> None: ...

    # ---- shutdown ----
    def close_driver(self, kill: bool = False) -> None: ...

    def close_session(self) -> None: ...

    def close(self) -> None: ...
# Module-level helper signatures (type stubs only).
def user_agent_to_session(driver: RemoteWebDriver, session: Session) -> None: ...


def create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver: ...


def get_chrome_hwnds_from_pid(pid: str) -> list: ...


def kill_progress(pid: str = None, port: int = None) -> bool: ...

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,326 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, List, Any, Tuple
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.select import Select as SeleniumSelect
from .driver_page import DriverPage
from .mix_page import MixPage
from .shadow_root_element import ShadowRootElement
from .base import DrissionElement
from .session_element import SessionElement
class DriverElement(DrissionElement):
    # Type stub for DriverElement: only signatures and attribute types are declared here.

    def __init__(self, ele: WebElement, page: Union[DriverPage, MixPage] = None):
        # Instance attributes and their declared types.
        self._inner_ele: WebElement = ...
        self._select: Select = ...
        self._scroll: Scroll = ...
        self.page: Union[DriverPage, MixPage] = ...

    def __repr__(self) -> str: ...

    def __call__(self,
                 loc_or_str: Union[Tuple[str, str], str],
                 timeout: float = None) -> Union['DriverElement', str, None]: ...

    # ----------------- shared properties and methods -------------------
    @property
    def inner_ele(self) -> WebElement: ...

    @property
    def tag(self) -> str: ...

    @property
    def html(self) -> str: ...

    @property
    def inner_html(self) -> str: ...

    @property
    def attrs(self) -> dict: ...

    @property
    def text(self) -> str: ...

    @property
    def raw_text(self) -> str: ...

    def attr(self, attr: str) -> str: ...

    def ele(self,
            loc_or_str: Union[Tuple[str, str], str],
            timeout: float = None) -> Union['DriverElement', str, None]: ...

    def eles(self,
             loc_or_str: Union[Tuple[str, str], str],
             timeout: float = None) -> List[Union['DriverElement', str]]: ...

    def s_ele(self, loc_or_str: Union[Tuple[str, str], str] = None) -> Union[SessionElement, str, None]: ...

    def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...

    def _ele(self,
             loc_or_str: Union[Tuple[str, str], str],
             timeout: float = None,
             single: bool = True,
             relative: bool = False) -> Union['DriverElement', str, None, List[Union['DriverElement', str]]]: ...

    def _get_ele_path(self, mode) -> str: ...

    # ----------------- driver-only properties and methods -------------------
    @property
    def size(self) -> dict: ...

    @property
    def location(self) -> dict: ...

    @property
    def shadow_root(self) -> ShadowRootElement: ...

    @property
    def sr(self) -> ShadowRootElement: ...

    @property
    def pseudo_before(self) -> str: ...

    @property
    def pseudo_after(self) -> str: ...

    @property
    def select(self) -> Select: ...

    @property
    def scroll(self) -> Scroll: ...

    def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['DriverElement', None]: ...

    # prev / next default to timeout 0, before / after default to timeout None.
    def prev(self,
             index: int = 1,
             filter_loc: Union[tuple, str] = '',
             timeout: float = 0) -> Union['DriverElement', str, None]: ...

    def next(self,
             index: int = 1,
             filter_loc: Union[tuple, str] = '',
             timeout: float = 0) -> Union['DriverElement', str, None]: ...

    def before(self,
               index: int = 1,
               filter_loc: Union[tuple, str] = '',
               timeout: float = None) -> Union['DriverElement', str, None]: ...

    def after(self,
              index: int = 1,
              filter_loc: Union[tuple, str] = '',
              timeout: float = None) -> Union['DriverElement', str, None]: ...

    def prevs(self,
              filter_loc: Union[tuple, str] = '',
              timeout: float = 0) -> List[Union['DriverElement', str]]: ...

    def nexts(self,
              filter_loc: Union[tuple, str] = '',
              timeout: float = 0) -> List[Union['DriverElement', str]]: ...

    def befores(self,
                filter_loc: Union[tuple, str] = '',
                timeout: float = None) -> List[Union['DriverElement', str]]: ...

    def afters(self,
               filter_loc: Union[tuple, str] = '',
               timeout: float = None) -> List[Union['DriverElement', str]]: ...

    # Direction-based lookups: singular forms return one element, plural forms a list.
    def left(self, index: int = 1, filter_loc: Union[tuple, str] = '') -> 'DriverElement': ...

    def right(self, index: int = 1, filter_loc: Union[tuple, str] = '') -> 'DriverElement': ...

    def above(self, index: int = 1, filter_loc: Union[tuple, str] = '') -> 'DriverElement': ...

    def below(self, index: int = 1, filter_loc: Union[tuple, str] = '') -> 'DriverElement': ...

    def near(self, index: int = 1, filter_loc: Union[tuple, str] = '') -> 'DriverElement': ...

    def lefts(self, filter_loc: Union[tuple, str] = '') -> List['DriverElement']: ...

    def rights(self, filter_loc: Union[tuple, str] = '') -> List['DriverElement']: ...

    def aboves(self, filter_loc: Union[tuple, str] = '') -> List['DriverElement']: ...

    def belows(self, filter_loc: Union[tuple, str] = '') -> List['DriverElement']: ...

    def nears(self, filter_loc: Union[tuple, str] = '') -> List['DriverElement']: ...

    def wait_ele(self,
                 loc_or_ele: Union[str, tuple, DrissionElement, WebElement],
                 timeout: float = None) -> 'ElementWaiter': ...

    def style(self, style: str, pseudo_ele: str = '') -> str: ...

    # ---- interaction ----
    def click(self, by_js: bool = None, timeout: float = None) -> bool: ...

    def click_at(self,
                 x: Union[int, str] = None,
                 y: Union[int, str] = None,
                 by_js: bool = False) -> None: ...

    def r_click(self) -> None: ...

    def r_click_at(self, x: Union[int, str] = None, y: Union[int, str] = None) -> None: ...

    def input(self,
              vals: Union[str, tuple],
              clear: bool = True,
              insure: bool = True,
              timeout: float = None) -> bool: ...

    def run_script(self, script: str, *args) -> Any: ...

    def submit(self) -> Union[bool, None]: ...

    def clear(self, insure: bool = True) -> Union[None, bool]: ...

    # ---- state checks ----
    def is_selected(self) -> bool: ...

    def is_enabled(self) -> bool: ...

    def is_displayed(self) -> bool: ...

    def is_valid(self) -> bool: ...

    def screenshot(self, path: str = None, filename: str = None, as_bytes: bool = False) -> Union[str, bytes]: ...

    # ---- properties and attributes ----
    def prop(self, prop: str) -> str: ...

    def set_prop(self, prop: str, value: str) -> bool: ...

    def set_attr(self, attr: str, value: str) -> bool: ...

    def remove_attr(self, attr: str) -> bool: ...

    # ---- pointer actions ----
    def drag(self, x: int, y: int, speed: int = 40, shake: bool = True) -> None: ...

    def drag_to(self,
                ele_or_loc: Union[tuple, WebElement, DrissionElement],
                speed: int = 40,
                shake: bool = True) -> None: ...

    def hover(self, x: int = None, y: int = None) -> None: ...

    def _get_relative_eles(self,
                           mode: str,
                           loc: Union[tuple, str] = '') -> Union[List['DriverElement'], 'DriverElement']: ...
# Module-level factory signature (type stub only); with the declared return
# type it may yield one element, a string, None, or a list of elements/strings.
def make_driver_ele(page_or_ele: Union[DriverPage, MixPage, DriverElement, ShadowRootElement],
                    loc: Union[str, Tuple[str, str]],
                    single: bool = True,
                    timeout: float = None) -> Union[DriverElement, str, None, List[Union[DriverElement, str]]]: ...
class ElementsByXpath(object):
    # Type stub for ElementsByXpath: only signatures and attribute types are declared here.

    def __init__(self, page, xpath: str = None, single: bool = False, timeout: float = 10):
        # Instance attributes and their declared types.
        self.single: bool = ...
        self.xpath: str = ...
        self.page: Union[MixPage, DriverPage] = ...

    # The list element type is spelled with Union: the expression
    # ``str or DriverElement`` evaluates to plain ``str`` and would drop
    # DriverElement from the annotation.
    def __call__(self, ele_or_driver: Union[RemoteWebDriver, WebElement]) \
            -> Union[str, DriverElement, None, List[Union[str, DriverElement]]]: ...
class Select(object):
    # Type stub for Select: only signatures and attribute types are declared here.

    def __init__(self, ele: DriverElement):
        # Instance attributes and their declared types.
        self.select_ele: SeleniumSelect = ...
        self.inner_ele: DriverElement = ...

    def __call__(self, text_or_index: Union[str, int, list, tuple], timeout: float = None) -> bool: ...

    @property
    def is_multi(self) -> bool: ...

    @property
    def options(self) -> List[DriverElement]: ...

    @property
    def selected_option(self) -> Union[DriverElement, None]: ...

    @property
    def selected_options(self) -> List[DriverElement]: ...

    def clear(self) -> None: ...

    # ---- select / deselect by text, index or value ----
    def select(self, text_or_index: Union[str, int, list, tuple], timeout: float = None) -> bool: ...

    def select_by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ...

    def deselect(self, text_or_index: Union[str, int, list, tuple], timeout: float = None) -> bool: ...

    def deselect_by_value(self, value: Union[str, list, tuple], timeout: float = None) -> bool: ...

    def invert(self) -> None: ...

    # ---- private helpers ----
    def _select(self,
                text_value_index: Union[str, int, list, tuple] = ...,
                para_type: str = 'text',
                deselect: bool = False,
                timeout: float = None) -> bool: ...

    def _select_multi(self,
                      text_value_index: Union[list, tuple] = None,
                      para_type: str = 'text',
                      deselect: bool = False) -> bool: ...
class ElementWaiter(object):
    # Type stub for ElementWaiter: only signatures and attribute types are declared here.

    def __init__(self,
                 page_or_ele,
                 loc_or_ele: Union[str, tuple, DriverElement, WebElement],
                 timeout: float = None):
        # Instance attributes and their declared types.
        self.target: Union[DriverElement, WebElement, tuple] = ...
        self.timeout: float = ...
        self.driver: Union[WebElement, RemoteWebDriver] = ...

    # Each wait method reports its outcome as a bool.
    def delete(self) -> bool: ...

    def display(self) -> bool: ...

    def hidden(self) -> bool: ...

    def _wait_ele(self, mode: str) -> bool: ...
class Scroll(object):
    # Type stub for Scroll: only signatures and attribute types are declared here.

    def __init__(self, page_or_ele):
        # Instance attributes and their declared types.
        self.driver: Union[DriverElement, DriverPage] = ...
        self.t1: str = ...
        self.t2: str = ...

    # ---- jump to a fixed position ----
    def to_top(self) -> None: ...

    def to_bottom(self) -> None: ...

    def to_half(self) -> None: ...

    def to_rightmost(self) -> None: ...

    def to_leftmost(self) -> None: ...

    def to_location(self, x: int, y: int) -> None: ...

    # ---- relative scrolling; the default step is 300 pixels ----
    def up(self, pixel: int = 300) -> None: ...

    def down(self, pixel: int = 300) -> None: ...

    def left(self, pixel: int = 300) -> None: ...

    def right(self, pixel: int = 300) -> None: ...

View File

@ -0,0 +1,611 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from glob import glob
from os import sep
from pathlib import Path
from time import sleep, perf_counter
from selenium.common.exceptions import NoAlertPresentException
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.wait import WebDriverWait
from .base import BasePage
from DrissionPage.commons.tools import get_usable_path
from .driver_element import DriverElement, make_driver_ele, Scroll, ElementWaiter
from .session_element import make_session_ele
class DriverPage(BasePage):
    """Wraps common page operations, using selenium to fetch, parse and operate web pages."""

    def __init__(self, driver, timeout=10):
        """Store the WebDriver object that is used to operate the page.

        :param driver: WebDriver object
        :param timeout: number of seconds to wait when looking up elements
        """
        super().__init__(timeout)
        self._driver = driver
        self._wait_object = None
        self._scroll = None

    def __call__(self, loc_or_str, timeout=None):
        """Find an element inside the page; shorthand for ele().

        Example: ele = page('@id=ele_id')

        :param loc_or_str: element locator, either a loc tuple or a query string
        :param timeout: lookup timeout
        :return: DriverElement object or attribute text
        """
        return self.ele(loc_or_str, timeout)

    # ----------------- shared properties and methods -------------------
    @property
    def url(self):
        """Return the current page url, or None when there is no driver or the url does not start with 'http'."""
        if not self._driver or not self.driver.current_url.startswith('http'):
            return None
        else:
            return self.driver.current_url

    @property
    def html(self):
        """Return the html text of the page."""
        return self.driver.find_element('xpath', "//*").get_attribute("outerHTML")

    @property
    def json(self):
        """Parse the text of the page's <pre> element as json and return the result."""
        from json import loads
        return loads(self('t:pre').text)

    def get(self, url, show_errmsg=False, retry=None, interval=None):
        """Visit a url.

        :param url: target url
        :param show_errmsg: whether to print retry messages and raise on failure
        :param retry: number of retries
        :param interval: seconds between retries
        :return: whether the url is available; None means undetermined
        """
        retry, interval = self._before_connect(url, retry, interval)
        self._url_available = self._d_connect(self._url, times=retry, interval=interval, show_errmsg=show_errmsg)
        return self._url_available

    def ele(self, loc_or_ele, timeout=None):
        """Return the first element in the page that matches.

        :param loc_or_ele: element locator: an element object, a loc tuple or a query string
        :param timeout: lookup timeout, defaults to the page's own timeout
        :return: DriverElement object or attribute text
        """
        return self._ele(loc_or_ele, timeout)

    def eles(self, loc_or_str, timeout=None):
        """Return all elements in the page that match.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :param timeout: lookup timeout, defaults to the page's own timeout
        :return: list of DriverElement objects or attribute texts
        """
        return self._ele(loc_or_str, timeout, single=False)

    def s_ele(self, loc_or_ele=None):
        """Find the first matching element and return it as a SessionElement.

        :param loc_or_ele: element locator (loc tuple or query string) or a DriverElement
        :return: SessionElement object or attribute text
        """
        if isinstance(loc_or_ele, DriverElement):
            return make_session_ele(loc_or_ele)
        else:
            return make_session_ele(self, loc_or_ele)

    def s_eles(self, loc_or_str):
        """Find all matching elements and return them as a list of SessionElement objects.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :return: list of SessionElement objects
        """
        return make_session_ele(self, loc_or_str, single=False)

    def _ele(self, loc_or_ele, timeout=None, single=True):
        """Return matching element(s) of the page; the first one by default.

        :param loc_or_ele: element locator: an element object, a loc tuple or a query string
        :param timeout: lookup timeout
        :param single: True returns the first match, False returns all matches
        :return: DriverElement object(s)
        :raises ValueError: when loc_or_ele is of an unsupported type
        """
        # A str or tuple is a locator: look the element up.
        if isinstance(loc_or_ele, (str, tuple)):
            return make_driver_ele(self, loc_or_ele, single, timeout)

        # A DriverElement is returned unchanged.
        elif isinstance(loc_or_ele, DriverElement):
            return loc_or_ele

        # A WebElement is wrapped into a DriverElement.
        elif isinstance(loc_or_ele, WebElement):
            return DriverElement(loc_or_ele, self)

        # Anything else is rejected.
        else:
            raise ValueError('loc_or_str参数只能是tuple、str、DriverElement 或 WebElement类型。')

    def get_cookies(self, as_dict=False):
        """Return the cookies of the current site.

        :param as_dict: True returns a {name: value} dict, False the driver's cookie list
        :return: cookies as dict or list
        """
        if as_dict:
            return {cookie['name']: cookie['value'] for cookie in self.driver.get_cookies()}
        else:
            return self.driver.get_cookies()

    @property
    def timeout(self):
        """Return the number of seconds to wait when looking up elements."""
        return self._timeout

    @timeout.setter
    def timeout(self, second):
        """Set the number of seconds to wait when looking up elements."""
        self._timeout = second
        # Drop the cached wait object so it is rebuilt with the new timeout.
        self._wait_object = None

    def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False):
        """Try to open a url, retrying a number of times.

        :param to_url: url to visit
        :param times: number of retries
        :param interval: seconds between retries
        :param show_errmsg: whether to print retry messages and raise on failure
        :return: whether it succeeded; None means undetermined
        """
        err = None
        is_ok = False

        for _ in range(times + 1):
            try:
                self.driver.get(to_url)
                go_ok = True
            except Exception as e:
                err = e
                go_ok = False

            is_ok = self.check_page() if go_ok else False

            # True and None (undetermined) both end the retry loop.
            if is_ok is not False:
                break

            if _ < times:
                sleep(interval)
                if show_errmsg:
                    print(f'重试 {to_url}')

        if is_ok is False and show_errmsg:
            raise err if err is not None else ConnectionError('连接异常。')

        return is_ok

    # ---------------- driver-only properties and methods -----------------------
    @property
    def driver(self):
        """Return the WebDriver object held by this page."""
        return self._driver

    @property
    def wait_object(self):
        """Return a WebDriverWait object; it is cached so it is not rebuilt on every access."""
        if self._wait_object is None:
            self._wait_object = WebDriverWait(self.driver, timeout=self.timeout)

        return self._wait_object

    @property
    def timeouts(self):
        """Return the three timeouts as a dict (selenium 4 and above)."""
        return {'implicit': self.timeout,
                'pageLoad': self.driver.timeouts.page_load,
                'script': self.driver.timeouts.script}

    @property
    def tabs_count(self):
        """Return the number of tabs, or 0 when the handles cannot be read."""
        try:
            return len(self.driver.window_handles)
        except Exception:
            return 0

    @property
    def tab_handles(self):
        """Return the list of all tab handles."""
        return self.driver.window_handles

    @property
    def current_tab_index(self):
        """Return the index of the current tab."""
        return self.driver.window_handles.index(self.driver.current_window_handle)

    @property
    def current_tab_handle(self):
        """Return the handle of the current tab."""
        return self.driver.current_window_handle

    @property
    def active_ele(self):
        """Return the element that currently has focus."""
        return DriverElement(self.driver.switch_to.active_element, self)

    @property
    def scroll(self):
        """Return the object used to scroll the page; created on first access."""
        if self._scroll is None:
            self._scroll = Scroll(self)
        return self._scroll

    @property
    def to_frame(self):
        """Return the object used to switch to a frame; call it or its methods to switch.

        Examples:
            page.to_frame.by_loc('tag:iframe')             - locate by a frame query string
            page.to_frame.by_loc((By.TAG_NAME, 'iframe'))  - locate by a locator tuple
            page.to_frame.by_id('iframe_id')               - locate by the frame's id attribute
            page.to_frame('iframe_name')                   - locate by the frame's name attribute
            page.to_frame(iframe_element)                  - locate by an element object
            page.to_frame(0)                               - locate by the frame's index
            page.to_frame.main()                           - jump to the top level
            page.to_frame.parent()                         - jump one level up
        """
        return ToFrame(self)

    def set_timeouts(self, implicit=None, pageLoad=None, script=None):
        """Set timeouts in seconds (selenium 4 and above).

        :param implicit: element lookup timeout
        :param pageLoad: page load timeout
        :param script: script execution timeout
        :return: None
        """
        if implicit is not None:
            self.timeout = implicit

        if pageLoad is not None:
            self.driver.set_page_load_timeout(pageLoad)

        if script is not None:
            self.driver.set_script_timeout(script)

    def wait_ele(self, loc_or_ele, timeout=None):
        """Wait for an element to be removed from the dom, displayed or hidden.

        :param loc_or_ele: element query string or loc tuple
        :param timeout: wait timeout
        :return: ElementWaiter object used for waiting
        """
        return ElementWaiter(self, loc_or_ele, timeout)

    def check_page(self):
        """Check whether the page is as expected.

        Subclasses implement their own rules; this base version returns None.
        """
        return None

    def run_script(self, script, *args):
        """Execute js code.

        :param script: js text
        :param args: arguments passed to the script
        :return: result of the script
        """
        return self.driver.execute_script(script, *args)

    def run_async_script(self, script, *args):
        """Execute js code asynchronously.

        :param script: js text
        :param args: arguments passed to the script
        :return: result of the script
        """
        return self.driver.execute_async_script(script, *args)

    def run_cdp(self, cmd, **cmd_args):
        """Execute a Chrome DevTools Protocol command.

        :param cmd: protocol command
        :param cmd_args: command arguments
        :return: result of the command
        """
        return self.driver.execute_cdp_cmd(cmd, cmd_args)

    def create_tab(self, url=''):
        """Create a new tab and switch to it.

        :param url: url the new tab should open
        :return: None
        """
        self.driver.switch_to.new_window('tab')
        if url:
            self.get(url)

    def close_tabs(self, num_or_handles=None):
        """Close the given tabs (the current one by default), then switch to the first tab.

        Note: when the program has taken over an existing browser, the order of
        the handles does not match the visual order, so tabs cannot be closed by index.

        :param num_or_handles: tab index or handle, or a list/tuple of them; None closes the current tab
        :return: None
        """
        tabs = (self.current_tab_handle,) if num_or_handles is None else get_handles(self.tab_handles, num_or_handles)
        for i in tabs:
            self.driver.switch_to.window(i)
            self.driver.close()

        self.to_tab(0)

    def close_other_tabs(self, num_or_handles=None):
        """Close every tab except the given ones (the current one by default), then switch to the first tab.

        Note: when the program has taken over an existing browser, the order of
        the handles does not match the visual order, so tabs cannot be chosen by index.

        :param num_or_handles: tab index or handle to keep, or a list/tuple of them; None keeps the current tab
        :return: None
        """
        all_tabs = self.driver.window_handles
        reserve_tabs = {self.current_tab_handle} if num_or_handles is None else get_handles(all_tabs, num_or_handles)

        for i in set(all_tabs) - reserve_tabs:
            self.driver.switch_to.window(i)
            self.driver.close()

        self.to_tab(0)

    def to_tab(self, num_or_handle=0):
        """Switch to a tab.

        Note: when the program has taken over an existing browser, the order of
        the handles does not match the visual order.

        :param num_or_handle: tab index or handle string; the first index is 0, the last is -1
        :return: None
        """
        # Anything convertible to int is treated as an index, the rest as a handle.
        try:
            tab = int(num_or_handle)
        except (ValueError, TypeError):
            tab = num_or_handle

        tab = self.driver.window_handles[tab] if isinstance(tab, int) else tab
        self.driver.switch_to.window(tab)

    def set_ua_to_tab(self, ua):
        """Set the user agent for the current tab; only effective in this tab.

        :param ua: user agent string
        :return: None
        """
        self.driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": ua})

    def get_session_storage(self, item=None):
        """Read sessionStorage; all of it when item is not given.

        :param item: key to read; leave unset to get everything
        :return: one item or the whole sessionStorage content
        """
        js = f'return sessionStorage.getItem("{item}");' if item else 'return sessionStorage;'
        return self.run_script(js)

    def get_local_storage(self, item=None):
        """Read localStorage; all of it when item is not given.

        :param item: key to read; leave unset to get everything
        :return: one item or the whole localStorage content
        """
        js = f'return localStorage.getItem("{item}");' if item else 'return localStorage;'
        return self.run_script(js)

    def set_session_storage(self, item, value):
        """Set or delete one sessionStorage item.

        :param item: key to set
        :param value: value for the key; pass False to delete the key
        :return: None
        """
        # The delete switch is the value, not the key.
        s = f'sessionStorage.removeItem("{item}");' if value is False else f'sessionStorage.setItem("{item}","{value}");'
        self.run_script(s)

    def set_local_storage(self, item, value):
        """Set or delete one localStorage item.

        :param item: key to set
        :param value: value for the key; pass False to delete the key
        :return: None
        """
        # The delete switch is the value, not the key.
        s = f'localStorage.removeItem("{item}");' if value is False else f'localStorage.setItem("{item}","{value}");'
        self.run_script(s)

    def clean_cache(self, session_storage=True, local_storage=True, cache=True, cookies=True):
        """Clear cached data; each kind can be switched off.

        :param session_storage: whether to clear sessionStorage
        :param local_storage: whether to clear localStorage
        :param cache: whether to clear the browser cache
        :param cookies: whether to clear cookies
        :return: None
        """
        if session_storage:
            self.run_script('sessionStorage.clear();')
        if local_storage:
            self.run_script('localStorage.clear();')
        if cache:
            self.run_cdp('Network.clearBrowserCache')
        if cookies:
            self.run_cdp('Network.clearBrowserCookies')

    def screenshot(self, path=None, filename=None, as_bytes=False):
        """Take a screenshot through the driver.

        :param path: folder to save into; defaults to the current directory
        :param filename: image file name; the page title is used when not given
        :param as_bytes: return the image as bytes; the two parameters above are ignored when True
        :return: full path of the image, or the image bytes
        """
        if as_bytes:
            return self.driver.get_screenshot_as_png()

        name = filename or self.title
        if not name.lower().endswith('.png'):
            name = f'{name}.png'
        path = Path(path or '.').absolute()
        path.mkdir(parents=True, exist_ok=True)
        img_path = str(get_usable_path(f'{path}{sep}{name}'))
        self.driver.save_screenshot(img_path)
        return img_path

    def scroll_to_see(self, loc_or_ele):
        """Scroll the page until the element is in view.

        :param loc_or_ele: element locator (loc tuple or query string) or element object
        :return: None
        """
        ele = self.ele(loc_or_ele)
        ele.run_script("arguments[0].scrollIntoView();")

    def refresh(self):
        """Refresh the current page."""
        self.driver.refresh()

    def stop_loading(self):
        """Force the page to stop loading."""
        self.run_cdp('Page.stopLoading')

    def back(self):
        """Go one step back in the browsing history."""
        self.driver.back()

    def forward(self):
        """Go one step forward in the browsing history."""
        self.driver.forward()

    def set_window_size(self, width=None, height=None):
        """Set the browser window size.

        With no arguments the window is maximized; if either argument is 0 it
        is minimized. When only one dimension is given, the other keeps its
        current value.

        :param width: window width
        :param height: window height
        :return: None
        :raises ValueError: when a given dimension is negative
        """
        if width is None and height is None:
            self.driver.maximize_window()

        elif width == 0 or height == 0:
            self.driver.minimize_window()

        else:
            # Validate only the dimensions that were given: None means "keep
            # the current size" and must not be compared with 0.
            if (width is not None and width < 0) or (height is not None and height < 0):
                raise ValueError('x 和 y参数必须大于0。')

            new_x = width or self.driver.get_window_size()['width']
            new_y = height or self.driver.get_window_size()['height']
            self.driver.set_window_size(new_x, new_y)

    def chrome_downloading(self, download_path):
        """Return the list of files the browser is still downloading.

        :param download_path: path of the download folder
        :return: list of '*.crdownload' files in that folder
        """
        return glob(f'{download_path}{sep}*.crdownload')

    def process_alert(self, ok=True, send=None, timeout=None):
        """Handle an alert box.

        :param ok: True accepts, False dismisses; any other value presses no button but the text is still returned
        :param send: text to type into a prompt box
        :param timeout: how long to wait for the alert to appear; defaults to the page timeout
        :return: text of the alert, or None when no alert appeared in time
        """

        def do_it():
            # Return the alert object, or False while no alert is present.
            try:
                return self.driver.switch_to.alert
            except NoAlertPresentException:
                return False

        timeout = timeout if timeout is not None else self.timeout
        t1 = perf_counter()
        alert = do_it()
        # Poll until an alert shows up or the timeout has elapsed.
        while alert is False and perf_counter() - t1 <= timeout:
            alert = do_it()

        if alert is False:
            return None

        res_text = alert.text

        if send is not None:
            alert.send_keys(send)

        if ok is True:
            alert.accept()
        elif ok is False:
            alert.dismiss()

        return res_text
class ToFrame(object):
    """Helper that moves the driver focus between the frames of a page."""

    def __init__(self, page):
        self.page = page

    def __call__(self, condition='main'):
        """Switch to an (i)frame chosen by the kind of value passed in.

        :param condition: (i)frame id, name, index, element object or locator
        :return: the current page object
        """
        if isinstance(condition, (DriverElement, WebElement)):
            self.by_ele(condition)
            return self.page

        if isinstance(condition, int):
            self.by_index(condition)
            return self.page

        # Strings containing ':' or '=', or starting with '#', '.' or '@',
        # are treated as locators; any other string goes to by_id().
        is_locator = (':' in condition or '=' in condition
                      or condition.startswith(('#', '.', '@')))
        if is_locator:
            self.by_loc(condition)
        else:
            self.by_id(condition)

        return self.page

    def _switch_to(self, frame_ref):
        """Hand frame_ref to the driver's frame switch and return the page."""
        self.page.driver.switch_to.frame(frame_ref)
        return self.page

    def main(self):
        """Move the focus to the top-level document.

        :return: the page object that owns the frame
        """
        self.page.driver.switch_to.default_content()
        return self.page

    def parent(self, level=1):
        """Move the focus up to a parent frame.

        :param level: how many levels to go up
        :return: the page object that owns the frame
        :raises ValueError: when level is smaller than 1
        """
        if level < 1:
            raise ValueError('level参数须是大于0的整数。')

        steps_left = level
        while steps_left > 0:
            self.page.driver.switch_to.parent_frame()
            steps_left -= 1

        return self.page

    def by_id(self, id_):
        """Move the focus to the (i)frame with this id.

        :param id_: id attribute of the (i)frame
        :return: the page object that owns the frame
        """
        return self._switch_to(id_)

    def by_name(self, name):
        """Move the focus to the (i)frame with this name.

        :param name: name attribute of the (i)frame
        :return: the page object that owns the frame
        """
        return self._switch_to(name)

    def by_index(self, index):
        """Move the focus to the (i)frame at this index in the page.

        :param index: index of the (i)frame in the page
        :return: the page object that owns the frame
        """
        return self._switch_to(index)

    def by_loc(self, loc):
        """Move the focus to the (i)frame found by a locator.

        :param loc: locator; selenium native and DriverPage locators are supported
        :return: the page object that owns the frame
        """
        frame_ele = self.page(loc).inner_ele
        return self._switch_to(frame_ele)

    def by_ele(self, ele):
        """Move the focus to the given (i)frame element.

        :param ele: (i)frame element object
        :return: the page object that owns the frame
        """
        # A DriverElement is unwrapped to the element object it holds.
        raw_ele = ele.inner_ele if isinstance(ele, DriverElement) else ele
        return self._switch_to(raw_ele)
def get_handles(handles, num_or_handles):
    """Resolve tab indexes and handles into a set of handles.

    :param handles: list of all handles
    :param num_or_handles: the wanted tab(s): an index, a handle, or a list/tuple of them
    :return: set of the wanted handles
    :raises TypeError: when num_or_handles is not an int, str, list or tuple
    """
    if isinstance(num_or_handles, (int, str)):
        wanted = (num_or_handles,)
    elif isinstance(num_or_handles, (list, tuple)):
        wanted = num_or_handles
    else:
        raise TypeError('num_or_handle参数只能是int、str、list 或 tuple类型。')

    resolved = set()
    for entry in wanted:
        # Strings are taken as handles; everything else indexes into handles.
        resolved.add(entry if isinstance(entry, str) else handles[entry])
    return resolved

View File

@ -0,0 +1,189 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, List, Any, Tuple
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.wait import WebDriverWait
from .base import BasePage
from .driver_element import DriverElement, Scroll, ElementWaiter
from .mix_page import MixPage
from .session_element import SessionElement
# Type stub for DriverPage: declares signatures only, every body is `...`.
class DriverPage(BasePage):
def __init__(self, driver: RemoteWebDriver, timeout: float = 10) -> None:
# Instance attributes declared for type checkers.
self._driver: RemoteWebDriver = ...
self._url: str = ...
self._wait_object: WebDriverWait = ...
self._scroll: Scroll = ...
def __call__(self, loc_or_str: Union[Tuple[str, str], str, DriverElement, WebElement],
timeout: float = None) -> Union[DriverElement, str, None]: ...
# -----------------Shared properties and methods-------------------
@property
def url(self) -> Union[str, None]: ...
@property
def html(self) -> str: ...
@property
def json(self) -> dict: ...
def get(self,
url: str,
show_errmsg: bool = False,
retry: int = None,
interval: float = None) -> Union[None, bool]: ...
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, DriverElement, WebElement],
timeout: float = None) -> Union[DriverElement, str, None]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[Union[DriverElement, str]]: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, DriverElement] = None) \
-> Union[SessionElement, str, None]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...
# Shared lookup helper: `single` selects between one result and a list of results.
def _ele(self,
loc_or_ele: Union[Tuple[str, str], str, DriverElement, WebElement],
timeout: float = None,
single: bool = True) -> Union[DriverElement, str, None, List[Union[DriverElement, str]]]: ...
def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: ...
@property
def timeout(self) -> float: ...
@timeout.setter
def timeout(self, second: float) -> None: ...
def _d_connect(self,
to_url: str,
times: int = 0,
interval: float = 1,
show_errmsg: bool = False) -> Union[bool, None]: ...
# ----------------Driver-only properties and methods-----------------------
@property
def driver(self) -> WebDriver: ...
@property
def wait_object(self) -> WebDriverWait: ...
@property
def timeouts(self) -> dict: ...
@property
def tabs_count(self) -> int: ...
@property
def tab_handles(self) -> list: ...
@property
def current_tab_index(self) -> int: ...
@property
def current_tab_handle(self) -> str: ...
@property
def active_ele(self) -> DriverElement: ...
@property
def scroll(self) -> Scroll: ...
@property
def to_frame(self) -> ToFrame: ...
def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> None: ...
def wait_ele(self,
loc_or_ele: Union[str, tuple, DriverElement, WebElement],
timeout: float = None) -> ElementWaiter: ...
def check_page(self) -> Union[bool, None]: ...
def run_script(self, script: str, *args) -> Any: ...
def run_async_script(self, script: str, *args) -> Any: ...
def run_cdp(self, cmd: str, **cmd_args) -> Any: ...
def create_tab(self, url: str = '') -> None: ...
def close_tabs(self, num_or_handles: Union[int, str, list, tuple] = None) -> None: ...
def close_other_tabs(self, num_or_handles: Union[int, str, list, tuple] = None) -> None: ...
def to_tab(self, num_or_handle: Union[int, str] = 0) -> None: ...
def set_ua_to_tab(self, ua: str) -> None: ...
def get_session_storage(self, item: str = None) -> Union[str, dict, None]: ...
def get_local_storage(self, item: str = None) -> Union[str, dict, None]: ...
def set_session_storage(self, item: str, value: Union[str, bool]) -> None: ...
def set_local_storage(self, item: str, value: Union[str, bool]) -> None: ...
def clean_cache(self,
session_storage: bool = True,
local_storage: bool = True,
cache: bool = True,
cookies: bool = True) -> None: ...
def screenshot(self, path: str = None, filename: str = None, as_bytes: bool = False) -> Union[str, bytes]: ...
def scroll_to_see(self, loc_or_ele: Union[str, tuple, WebElement, DriverElement]) -> None: ...
def refresh(self) -> None: ...
def stop_loading(self) -> None: ...
def back(self) -> None: ...
def forward(self) -> None: ...
def set_window_size(self, width: int = None, height: int = None) -> None: ...
def chrome_downloading(self, download_path: str) -> list: ...
def process_alert(self, ok: bool = True, send: str = None, timeout: float = None) -> Union[str, None]: ...
# Type stub for ToFrame, the object returned by DriverPage.to_frame; it is bound to a page and is callable.
class ToFrame(object):
def __init__(self, page: DriverPage):
self.page: DriverPage = ...
# Calling the object accepts an index, string, tuple or element; the default condition is 'main'.
def __call__(self, condition: Union[int, str, tuple, WebElement, DriverElement] = 'main') -> Union[
DriverPage, MixPage]: ...
def main(self) -> DriverPage: ...
def parent(self, level: int = 1) -> DriverPage: ...
def by_id(self, id_: str) -> DriverPage: ...
def by_name(self, name: str) -> DriverPage: ...
def by_index(self, index: int) -> DriverPage: ...
def by_loc(self, loc: Union[str, tuple]) -> DriverPage: ...
def by_ele(self, ele: Union[DriverElement, WebElement]) -> DriverPage: ...
# Module-level helper stub: takes the handle list plus an index/handle selection and returns a set.
# NOTE(review): presumably resolves indexes to tab handles - confirm against the implementation.
def get_handles(handles: list, num_or_handles: Union[int, str, list, tuple]) -> set: ...

View File

@ -0,0 +1,344 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from .base import BasePage
from .drission import Drission
from .driver_page import DriverPage
from .session_page import SessionPage
class MixPage(SessionPage, DriverPage, BasePage):
"""MixPage整合了DriverPage和SessionPage封装了对页面的操作
可在seleniumd模式和requestss模式间无缝切换
切换的时候会自动同步cookies
获取信息功能为两种模式共有操作页面元素功能只有d模式有
调用某种模式独有的功能会自动切换到该模式
"""
def __init__(self, mode='d', drission=None, timeout=None, driver_options=None, session_options=None):
"""Initialise the page in driver ('d') or session ('s') mode.
:param mode: 'd' or 's' (case-insensitive), selecting driver mode or session mode
:param drission: Drission object; when omitted a new one is built from driver_options and session_options, when given those two arguments are not used
:param timeout: timeout in seconds; per the original note it is the element-search time in d mode and the connection time in s mode, default 10
:param driver_options: browser settings handed to the new Drission; the original note says passing False means the WebDriver is not created
:param session_options: requests settings handed to the new Drission; the original note says passing False means the Session is not created
:raises ValueError: if mode is neither 's' nor 'd'
"""
self._mode = mode.lower()
if self._mode not in ('s', 'd'):
raise ValueError('mode参数只能是s或d。')
# Attribute lookup starts after DriverPage in the MRO, so the initialisers of
# SessionPage and DriverPage themselves are not run here.
super(DriverPage, self).__init__(timeout)
# True marks the side belonging to the starting mode; the other side stays None.
self._driver, self._session = (None, True) if self._mode == 's' else (True, None)
self._drission = drission or Drission(driver_options, session_options)
self._wait_object = None
self._response = None
self._scroll = None
self._download_set = None
self._download_path = None
if self._mode == 'd':
try:
# Use the timeouts stored in the driver options; an explicit numeric
# timeout argument replaces the 'implicit' value.
timeouts = self.drission.driver_options.timeouts
t = timeout if isinstance(timeout, (int, float)) else timeouts['implicit']
self.set_timeouts(t, timeouts['pageLoad'], timeouts['script'])
except Exception:
# Best-effort fallback when the configured timeouts cannot be applied.
self.timeout = timeout if timeout is not None else 10
def __call__(self, loc_or_str, timeout=None):
    """Find an element on the page, e.g. ``ele = page('@id=ele_id')``.

    :param loc_or_str: locator, either a loc tuple or a query string
    :param timeout: search timeout; only forwarded in d mode
    :return: the matched element object or attribute text
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).__call__(loc_or_str, timeout)
    if current == 's':
        return super().__call__(loc_or_str)
    return None
# -----------------共有属性和方法-------------------
@property
def url(self):
"""返回当前url"""
if self._mode == 'd':
return self._drission.driver.current_url if self._driver else None
elif self._mode == 's':
return self._session_url
@property
def title(self):
    """Return the page title, taken from the parent class matching the current mode."""
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).title
    if current == 's':
        return super().title
    return None
@property
def html(self):
    """Return the page HTML text, taken from the parent class matching the current mode."""
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).html
    if current == 's':
        return super().html
    return None
@property
def json(self):
    """Return the page content parsed as JSON, taken from the parent class matching the current mode."""
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).json
    if current == 's':
        return super().json
    return None
def get(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
    """Navigate to a url using the current mode.

    :param url: target url
    :param show_errmsg: whether to show and raise errors
    :param retry: number of retries
    :param interval: delay between retries
    :param kwargs: connection arguments, forwarded in s mode only
    :return: whether the url is usable; per the original note, None in d mode means unknown
    """
    current = self._mode
    if current == 's':
        return super().get(url, show_errmsg, retry, interval, **kwargs)
    if current == 'd':
        return super(SessionPage, self).get(url, show_errmsg, retry, interval)
    return None
def ele(self, loc_or_ele, timeout=None):
    """Return the first element, attribute or text node that matches.

    :param loc_or_ele: element object, loc tuple or query string
    :param timeout: search timeout; only forwarded in d mode
    :return: element object, attribute text or node text
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).ele(loc_or_ele, timeout=timeout)
    if current == 's':
        return super().ele(loc_or_ele)
    return None
def eles(self, loc_or_str, timeout=None):
    """Return every element, attribute or text node that matches.

    :param loc_or_str: loc tuple or query string
    :param timeout: search timeout; only forwarded in d mode
    :return: list of element objects or attribute texts
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).eles(loc_or_str, timeout=timeout)
    if current == 's':
        return super().eles(loc_or_str)
    return None
def s_ele(self, loc_or_ele=None):
    """Find the first match and return it in SessionElement form.

    :param loc_or_ele: loc tuple or query string
    :return: SessionElement object or attribute text
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).s_ele(loc_or_ele)
    if current == 's':
        return super().s_ele(loc_or_ele)
    return None
def s_eles(self, loc_or_str):
    """Find all matches and return them in SessionElement form.

    :param loc_or_str: loc tuple or query string
    :return: list of SessionElement objects or attribute texts
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).s_eles(loc_or_str)
    if current == 's':
        return super().s_eles(loc_or_str)
    return None
def _ele(self, loc_or_ele, timeout=None, single=True):
    """Shared lookup used by the public finders; returns the first match by default.

    :param loc_or_ele: element object, loc tuple or query string
    :param timeout: search timeout; only forwarded in d mode
    :param single: True returns the first match, False returns all matches
    :return: element object, attribute text or node text (or a list of them)
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single)
    if current == 's':
        return super()._ele(loc_or_ele, single=single)
    return None
def get_cookies(self, as_dict=False, all_domains=False):
    """Return the cookies of the current mode.

    :param as_dict: whether to return them as a dict
    :param all_domains: whether to return cookies of every domain; only forwarded in s mode
    :return: cookie information
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).get_cookies(as_dict)
    if current == 's':
        return super().get_cookies(as_dict, all_domains)
    return None
# ----------------MixPage独有属性和方法-----------------------
@property
def drission(self):
"""返回当前使用的 Dirssion 对象"""
return self._drission
@property
def driver(self):
"""返回 driver 对象,如没有则创建
每次访问时切换到 d 模式用于独有函数及外部调用
:return: WebDriver对象
"""
self.change_mode('d')
return self._drission.driver
@property
def session(self):
"""返回 Session 对象,如没有则创建"""
return self._drission.session
@property
def response(self):
"""返回 s 模式获取到的 Response 对象,切换到 s 模式"""
self.change_mode('s')
return self._response
@property
def mode(self):
"""返回当前模式,'s''d' """
return self._mode
@property
def _session_url(self):
"""返回 session 保存的url"""
return self._response.url if self._response else None
def change_mode(self, mode=None, go=True, copy_cookies=True):
"""Toggle between 's' and 'd' mode.
Nothing happens when mode (case-insensitive) equals the current mode; for any
other value, including None, the mode is flipped to the opposite one.
Cookies of the old mode can be copied to the new one, and the new mode can be
navigated to the url of the old one.
:param mode: mode string; only compared against the current mode
:param go: whether to load the url of the previous mode after switching
:param copy_cookies: whether to copy cookies to the target mode
"""
if mode is not None and mode.lower() == self._mode:
return
self._mode = 's' if self._mode == 'd' else 'd'
# Switching from s mode to d mode
if self._mode == 'd':
self._driver = True
# NOTE(review): _driver was just set to True, so the None branch below cannot be taken.
self._url = None if not self._driver else self._drission.driver.current_url
if self._session_url:
if copy_cookies:
self.cookies_to_driver(self._session_url)
if go:
self.get(self._session_url)
# Switching from d mode to s mode
elif self._mode == 's':
self._session = True
self._url = self._session_url
if self._driver:
if copy_cookies:
self.cookies_to_session()
# Only urls starting with 'http' are re-requested on the session side.
if go and self._drission.driver.current_url.startswith('http'):
self.get(self._drission.driver.current_url)
def set_cookies(self, cookies, refresh=True):
"""Set cookies on the session or on the driver, depending on the current mode.
:param cookies: cookie data; per the original note it may be a CookieJar, list, tuple, str or dict
:param refresh: whether to call refresh() after the cookies are set
:return: None
"""
if self._mode == 's':
self.drission.set_cookies(cookies, set_session=True)
elif self._mode == 'd':
self.drission.set_cookies(cookies, set_driver=True)
# NOTE(review): indentation is lost in this listing, so it is unclear whether the
# refresh applies to d mode only or to both modes - confirm against the original file.
if refresh:
self.refresh()
def cookies_to_session(self, copy_user_agent=False):
    """Copy cookies from the driver to the session by delegating to the Drission object.

    :param copy_user_agent: whether the user agent should be copied as well
    """
    drission = self._drission
    drission.cookies_to_session(copy_user_agent)
def cookies_to_driver(self, url=None):
    """Copy cookies from the session to the driver by delegating to the Drission object.

    The original note says chrome needs a target domain before it accepts cookies.

    :param url: target domain; falls back to the session url when falsy
    :return: None
    """
    if not url:
        url = self._session_url
    self._drission.cookies_to_driver(url)
def check_page(self, by_requests=False):
    """Check whether the page is as expected; meant to be overridden for specific checks.

    When the session url equals the current url the stored response status is
    used. Otherwise, if by_requests is set, the url is requested once through
    the session side and that status is returned.

    :param by_requests: whether to verify the url with a fresh request
    :return: bool, or None when the result is unknown
    """
    known_url = self._session_url
    if known_url and known_url == self.url:
        return self._response.ok

    if not by_requests:
        return None

    # Request the url through the session side to judge availability.
    self.cookies_to_session()
    r = self._make_response(self.url, retry=0)[0]
    return r.ok if r else False
def close_driver(self):
    """Clear the driver marker and ask the Drission object to close the driver."""
    self._driver = None
    drission = self.drission
    drission.close_driver(True)
def close_session(self):
    """Clear the session marker and stored response, then close the session via Drission."""
    self._session = None
    self._response = None
    drission = self.drission
    drission.close_session()
# ----------------重写SessionPage的函数-----------------------
def post(self, url, data=None, show_errmsg=False, retry=None, interval=None, **kwargs):
    """Send a POST request to url; switches to s mode first without re-loading the page.

    :param url: target url
    :param data: data submitted with the request
    :param show_errmsg: whether to show and raise errors
    :param retry: number of retries
    :param interval: delay between retries
    :param kwargs: connection arguments
    :return: whether the url is usable
    """
    self.change_mode('s', go=False)
    result = super().post(url, data, show_errmsg, retry, interval, **kwargs)
    return result
@property
def download(self):
    """Return the downloader object; in d mode the driver cookies are copied to the session first."""
    in_driver_mode = self.mode == 'd'
    if in_driver_mode:
        self.cookies_to_session()
    return super().download
def chrome_downloading(self, path=None):
    """Return the list of files the browser is currently downloading.

    :param path: download folder; when falsy it is read from the driver options
                 ('prefs' -> 'download.default_directory')
    :return: list of files being downloaded
    :raises IOError: when no download path can be determined
    """
    try:
        if not path:
            path = self._drission.driver_options.experimental_options['prefs']['download.default_directory']
        if not path:
            raise ValueError('未指定下载路径。')
    except Exception:
        # Any failure while resolving the path is reported as a single IOError.
        raise IOError('无法找到下载路径。')

    return super().chrome_downloading(path)
# ----------------MixPage独有函数-----------------------
def hide_browser(self):
    """Hide the browser window by delegating to the Drission object."""
    drission = self.drission
    drission.hide_browser()
def show_browser(self):
    """Show the browser window by delegating to the Drission object."""
    drission = self.drission
    drission.show_browser()

View File

@ -0,0 +1,156 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, List, Tuple, Any
from DownloadKit import DownloadKit
from requests import Response, Session
from requests.cookies import RequestsCookieJar
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from .base import BasePage
from DrissionPage.configs.session_options import SessionOptions
from DrissionPage.configs.driver_options import DriverOptions
from .drission import Drission
from .driver_element import DriverElement
from .driver_page import DriverPage
from .session_element import SessionElement
from .session_page import SessionPage
class MixPage(SessionPage, DriverPage, BasePage):
    # Type stub for MixPage, the page object that can work in driver ('d') or
    # session ('s') mode. Signatures only; defaults mirror the implementation.
    def __init__(self,
                 mode: str = 'd',
                 drission: Union[Drission, str] = None,
                 timeout: float = None,
                 driver_options: Union[Options, DriverOptions, bool] = None,
                 session_options: Union[dict, SessionOptions, bool] = None) -> None:
        self._mode: str = ...
        self._drission: Drission = ...

    def __call__(self,
                 loc_or_str: Union[Tuple[str, str], str, DriverElement, SessionElement, WebElement],
                 timeout: float = None) -> Union[DriverElement, SessionElement, str, None]: ...

    # -----------------Shared properties and methods-------------------
    @property
    def url(self) -> Union[str, None]: ...

    @property
    def title(self) -> str: ...

    @property
    def html(self) -> str: ...

    @property
    def json(self) -> dict: ...

    # Everything after `interval` is accepted through **kwargs by the implementation.
    def get(self,
            url: str,
            show_errmsg: bool | None = False,
            retry: int | None = None,
            interval: float | None = None,
            timeout: float | None = ...,
            params: dict | None = ...,
            data: Union[dict, str, None] = ...,
            json: Union[dict, str, None] = ...,
            headers: dict | None = ...,
            cookies: Any | None = ...,
            files: Any | None = ...,
            auth: Any | None = ...,
            allow_redirects: bool = ...,
            proxies: dict | None = ...,
            hooks: Any | None = ...,
            stream: Any | None = ...,
            verify: Any | None = ...,
            cert: Any | None = ...) -> Union[bool, None]: ...

    def ele(self,
            loc_or_ele: Union[Tuple[str, str], str, DriverElement, SessionElement, WebElement],
            timeout: float = None) -> Union[DriverElement, SessionElement, str, None]: ...

    def eles(self,
             loc_or_str: Union[Tuple[str, str], str],
             timeout: float = None) -> List[Union[DriverElement, SessionElement, str]]: ...

    def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, DriverElement, SessionElement] = None) \
            -> Union[SessionElement, str, None]: ...

    def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...

    # `single` defaults to True, matching the implementation's signature.
    def _ele(self,
             loc_or_ele: Union[Tuple[str, str], str, DriverElement, SessionElement, WebElement],
             timeout: float = None, single: bool = True) \
            -> Union[DriverElement, SessionElement, str, None, List[Union[SessionElement, str]], List[
                Union[DriverElement, str]]]: ...

    def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]: ...

    # ----------------MixPage-only properties and methods-----------------------
    @property
    def drission(self) -> Drission: ...

    @property
    def driver(self) -> WebDriver: ...

    @property
    def session(self) -> Session: ...

    @property
    def response(self) -> Response: ...

    @property
    def mode(self) -> str: ...

    # Returns None while no response has been stored.
    @property
    def _session_url(self) -> Union[str, None]: ...

    def change_mode(self, mode: str = None, go: bool = True, copy_cookies: bool = True) -> None: ...

    def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict], refresh: bool = True) -> None: ...

    def cookies_to_session(self, copy_user_agent: bool = False) -> None: ...

    def cookies_to_driver(self, url: str = None) -> None: ...

    def check_page(self, by_requests: bool = False) -> Union[bool, None]: ...

    def close_driver(self) -> None: ...

    def close_session(self) -> None: ...

    # ----------------Overrides of SessionPage functions-----------------------
    # Everything after `interval` is accepted through **kwargs by the implementation.
    def post(self,
             url: str,
             data: Union[dict, str, None] = None,
             show_errmsg: bool = False,
             retry: int | None = None,
             interval: float | None = None,
             timeout: float | None = ...,
             params: dict | None = ...,
             json: Union[dict, str, None] = ...,
             headers: dict | None = ...,
             cookies: Any | None = ...,
             files: Any | None = ...,
             auth: Any | None = ...,
             allow_redirects: bool = ...,
             proxies: dict | None = ...,
             hooks: Any | None = ...,
             stream: Any | None = ...,
             verify: Any | None = ...,
             cert: Any | None = ...) -> bool: ...

    @property
    def download(self) -> DownloadKit: ...

    def chrome_downloading(self, path: str = None) -> list: ...

    # ----------------MixPage-only functions-----------------------
    def hide_browser(self) -> None: ...

    def show_browser(self) -> None: ...

View File

@ -0,0 +1,357 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from html import unescape
from re import match, DOTALL
from lxml.etree import tostring
from lxml.html import HtmlElement, fromstring
from .base import DrissionElement, BasePage, BaseElement
from DrissionPage.commons.locator import get_loc
from DrissionPage.commons.web import get_ele_txt, make_absolute_link
class SessionElement(DrissionElement):
"""session模式的元素对象包装了一个lxml的Element对象并封装了常用功能"""
def __init__(self, ele, page=None):
    """Wrap an lxml element.

    :param ele: the HtmlElement being wrapped
    :param page: page object the element belongs to; None when the element was built from html text
    """
    super().__init__(page)
    self._inner_ele = ele
@property
def inner_ele(self):
return self._inner_ele
def __repr__(self):
attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs]
return f'<SessionElement {self.tag} {" ".join(attrs)}>'
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 不起实际作用用于和DriverElement对应便于无差别调用
:return: SessionElement对象或属性文本
"""
return self.ele(loc_or_str)
@property
def tag(self):
"""返回元素类型"""
return self._inner_ele.tag
@property
def html(self):
    """Return the outerHTML text of the element."""
    serialized = tostring(self._inner_ele, method="html").decode()
    # tostring() also emits the text node that directly follows the element,
    # so everything after the last '>' is cut off.
    end = serialized.rfind('>') + 1
    return unescape(serialized[:end])
@property
def inner_html(self):
"""返回元素innerHTML文本"""
r = match(r'<.*?>(.*)</.*?>', self.html, flags=DOTALL)
return '' if not r else r.group(1)
@property
def attrs(self):
"""返回元素所有属性及值"""
return {attr: self.attr(attr) for attr, val in self.inner_ele.items()}
@property
def text(self):
    """Return the text inside the element as produced by ``get_ele_txt``."""
    content = get_ele_txt(self)
    return content
@property
def raw_text(self):
"""返回未格式化处理的元素内文本"""
return str(self._inner_ele.text_content())
def parent(self, level_or_loc=1):
    """Return an ancestor element, chosen by level or by locator.

    :param level_or_loc: how many levels up, or a locator
    :return: the ancestor element object
    """
    ancestor = super().parent(level_or_loc)
    return ancestor
def prev(self, filter_loc='', index=1, timeout=None):
    """Return one preceding sibling, optionally filtered by a query.

    Arguments are handed to the parent implementation in the order
    (index, filter_loc, timeout).

    :param filter_loc: query used to filter candidates
    :param index: which of the filtered results to return
    :param timeout: lookup timeout
    :return: the sibling element
    """
    sibling = super().prev(index, filter_loc, timeout)
    return sibling
def next(self, filter_loc='', index=1, timeout=None):
    """Return one following sibling, optionally filtered by a query.

    Arguments are handed to the parent implementation in the order
    (index, filter_loc, timeout).

    :param filter_loc: query used to filter candidates
    :param index: which of the filtered results to return
    :param timeout: lookup timeout
    :return: the sibling element
    """
    sibling = super().next(index, filter_loc, timeout)
    return sibling
def before(self, filter_loc='', index=1, timeout=None):
    """Return one element or node located before this element, optionally filtered by a query.

    Arguments are handed to the parent implementation in the order
    (index, filter_loc, timeout).

    :param filter_loc: query used to filter candidates
    :param index: which of the filtered results to return
    :param timeout: lookup timeout
    :return: an element or node before this element
    """
    found = super().before(index, filter_loc, timeout)
    return found
def after(self, filter_loc='', index=1, timeout=None):
    """Return one element or node located after this element, optionally filtered by a query.

    Arguments are handed to the parent implementation in the order
    (index, filter_loc, timeout).

    :param filter_loc: query used to filter candidates
    :param index: which of the filtered results to return
    :param timeout: lookup timeout
    :return: an element or node after this element
    """
    found = super().after(index, filter_loc, timeout)
    return found
def prevs(self, filter_loc='', timeout=None):
    """Return all preceding siblings or nodes as a list, optionally filtered by a query.

    :param filter_loc: query used to filter candidates
    :param timeout: lookup timeout
    :return: list of sibling elements or node texts
    """
    siblings = super().prevs(filter_loc, timeout)
    return siblings
def nexts(self, filter_loc='', timeout=None):
    """Return all following siblings or nodes as a list, optionally filtered by a query.

    :param filter_loc: query used to filter candidates
    :param timeout: lookup timeout
    :return: list of sibling elements or node texts
    """
    siblings = super().nexts(filter_loc, timeout)
    return siblings
def befores(self, filter_loc='', timeout=None):
    """Return all elements or nodes located before this element, optionally filtered by a query.

    :param filter_loc: query used to filter candidates
    :param timeout: lookup timeout
    :return: list of elements or nodes before this element
    """
    found = super().befores(filter_loc, timeout)
    return found
def afters(self, filter_loc='', timeout=None):
    """Return all elements or nodes located after this element, optionally filtered by a query.

    :param filter_loc: query used to filter candidates
    :param timeout: lookup timeout
    :return: list of elements or nodes after this element
    """
    found = super().afters(filter_loc, timeout)
    return found
def attr(self, attr):
    """Return the value of an attribute.

    'href' and 'src' are turned into absolute links, and a few pseudo
    attributes ('text', 'innerText', 'html', 'outerHTML', 'innerHTML') map
    onto the matching properties of this object.

    :param attr: attribute name
    :return: attribute value text; for ordinary attributes, whatever the wrapped element's get() returns
    """
    if attr == 'href':
        link = self.inner_ele.get('href')
        # Empty links, javascript: and mailto: links are returned untouched.
        if link and not link.lower().startswith(('javascript:', 'mailto:')):
            return make_absolute_link(link, self.page)
        return link

    if attr == 'src':
        return make_absolute_link(self.inner_ele.get('src'), self.page)

    # Pseudo attributes backed by properties; resolved lazily by name.
    property_names = {
        'text': 'text',
        'innerText': 'raw_text',
        'html': 'html',
        'outerHTML': 'html',
        'innerHTML': 'inner_html',
    }
    if attr in property_names:
        return getattr(self, property_names[attr])

    return self.inner_ele.get(attr)
def ele(self, loc_or_str, timeout=None):
    """Return the first descendant element, attribute or text node that matches.

    :param loc_or_str: loc tuple or query string
    :param timeout: unused; kept so the call signature matches DriverElement
    :return: SessionElement object or attribute text
    """
    first = self._ele(loc_or_str)
    return first
def eles(self, loc_or_str, timeout=None):
    """Return every descendant element, attribute or text node that matches.

    :param loc_or_str: loc tuple or query string
    :param timeout: unused; kept so the call signature matches DriverElement
    :return: list of SessionElement objects or attribute texts
    """
    matches = self._ele(loc_or_str, single=False)
    return matches
def s_ele(self, loc_or_str=None):
    """Return the first descendant element, attribute or text node that matches.

    :param loc_or_str: loc tuple or query string; forwarded unchanged to ``_ele``
    :return: SessionElement object or attribute text
    """
    first = self._ele(loc_or_str)
    return first
def s_eles(self, loc_or_str):
    """Return every descendant element, attribute or text node that matches.

    :param loc_or_str: loc tuple or query string
    :return: list of SessionElement objects or attribute texts
    """
    matches = self._ele(loc_or_str, single=False)
    return matches
def _ele(self, loc_or_str, timeout=None, single=True, relative=False):
    """Shared lookup below this element; delegates to ``make_session_ele``.

    :param loc_or_str: loc tuple or query string
    :param timeout: unused; present to match the parent signature
    :param single: True returns the first match, False returns all matches
    :param relative: unused here; the original note says it is a relative-locating flag for WebPage
    :return: SessionElement object (or whatever ``make_session_ele`` returns)
    """
    result = make_session_ele(self, loc_or_str, single)
    return result
def _get_ele_path(self, mode):
"""获取css路径或xpath路径
:param mode: 'css' 'xpath'
:return: css路径或xpath路径
"""
path_str = ''
ele = self
while ele:
if mode == 'css':
brothers = len(ele.eles(f'xpath:./preceding-sibling::*'))
path_str = f'>:nth-child({brothers + 1}){path_str}'
else:
brothers = len(ele.eles(f'xpath:./preceding-sibling::{ele.tag}'))
path_str = f'/{ele.tag}[{brothers + 1}]{path_str}' if brothers > 0 else f'/{ele.tag}{path_str}'
ele = ele.parent()
return f':root{path_str[1:]}' if mode == 'css' else path_str
def make_session_ele(html_or_ele, loc=None, single=True):
"""Find elements in the given object or html text and return them as SessionElement objects.
Pass loc=None to build a SessionElement straight from the html without searching below it.
:param html_or_ele: html text, an element object or a page object
:param loc: loc tuple or query string; when falsy no search below is done and the root element is returned
:param single: True returns the first match, False returns all matches
:return: SessionElement, a list of results, attribute text, or the raw xpath result when it is not a list
:raises ValueError: if loc is truthy but neither str nor tuple
:raises TypeError: if html_or_ele is not a supported type
:raises SyntaxError: if the xpath or css selector is reported as invalid
"""
# ---------------Normalise the locator---------------
if not loc:
# A SessionElement with no locator is returned unchanged.
if isinstance(html_or_ele, SessionElement):
return html_or_ele if single else [html_or_ele]
loc = ('xpath', '.')
elif isinstance(loc, (str, tuple)):
loc = get_loc(loc)
else:
raise ValueError("定位符必须为str或长度为2的tuple。")
# ---------------Resolve the page object and the lxml element from the input type---------------
# The type is matched by name rather than with isinstance for most element kinds.
the_type = str(type(html_or_ele))
# SessionElement
if the_type.endswith(".SessionElement'>"):
page = html_or_ele.page
loc_str = loc[1]
# An xpath starting with '/' gets a leading '.' so it is evaluated from this element.
if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'):
loc_str = f'.{loc[1]}'
html_or_ele = html_or_ele.inner_ele
# A css selector starting with '>' targets direct children; it is prefixed with the
# element's css path and run against the whole document instead.
elif loc[0] == 'css selector' and loc[1].lstrip().startswith('>'):
loc_str = f'{html_or_ele.css_path}{loc[1]}'
if html_or_ele.page:
html_or_ele = fromstring(html_or_ele.page.html)
else: # Element built from html text, so there is no page
html_or_ele = fromstring(html_or_ele('xpath:/ancestor::*').html)
else:
html_or_ele = html_or_ele.inner_ele
loc = loc[0], loc_str
# ChromiumElement, DriverElement
elif the_type.endswith((".ChromiumElement'>", ".DriverElement'>")):
loc_str = loc[1]
if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'):
loc_str = f'.{loc[1]}'
elif loc[0] == 'css selector' and loc[1].lstrip().startswith('>'):
loc_str = f'{html_or_ele.css_path}{loc[1]}'
loc = loc[0], loc_str
# Parse the whole page html, then locate the current element in it so that
# ancestors can be searched as well.
page = html_or_ele.page
xpath = html_or_ele.xpath
if hasattr(html_or_ele, 'doc_id'): # ChromiumElement; per the original note this covers elements inside an iframe
html = html_or_ele.page.run_cdp('DOM.getOuterHTML', objectId=html_or_ele.doc_id)['outerHTML']
else:
html = html_or_ele.page.html
html_or_ele = fromstring(html)
html_or_ele = html_or_ele.xpath(xpath)[0]
# Any page object
elif isinstance(html_or_ele, BasePage):
page = html_or_ele
html_or_ele = fromstring(html_or_ele.html)
# Plain html text
elif isinstance(html_or_ele, str):
page = None
html_or_ele = fromstring(html_or_ele)
# ShadowRootElement, ChromiumShadowRoot, ChromiumFrame
elif isinstance(html_or_ele, BaseElement) or the_type.endswith(".ChromiumFrame'>"):
page = html_or_ele.page
html_or_ele = fromstring(html_or_ele.html)
else:
raise TypeError('html_or_ele参数只能是元素、页面对象或html文本。')
# ---------------Run the search-----------------
try:
if loc[0] == 'xpath': # lxml's xpath() returns the list of lxml elements
ele = html_or_ele.xpath(loc[1])
else: # css selector lookup
ele = html_or_ele.cssselect(loc[1])
if not isinstance(ele, list): # Result is not a list, e.g. a number
return ele
# Wrap lxml elements in SessionElement and return the first one or all of them
if single:
ele = ele[0] if ele else None
if isinstance(ele, HtmlElement):
return SessionElement(ele, page)
elif isinstance(ele, str):
return ele
else:
return None
else: # Return everything; results equal to '\n' are dropped
return [SessionElement(e, page) if isinstance(e, HtmlElement) else e for e in ele if e != '\n']
except Exception as e:
# Known parser messages are converted to SyntaxError; anything else is re-raised.
if 'Invalid expression' in str(e):
raise SyntaxError(f'无效的xpath语句{loc}')
elif 'Expected selector' in str(e):
raise SyntaxError(f'无效的css select语句{loc}')
raise e

View File

@ -0,0 +1,114 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, List, Tuple
from lxml.html import HtmlElement
from .base import DrissionElement, BaseElement
from .driver_element import DriverElement
from .driver_page import DriverPage
from .session_page import SessionPage
# Type stub for SessionElement, the wrapper around an lxml HtmlElement. Signatures only.
class SessionElement(DrissionElement):
def __init__(self, ele: HtmlElement, page: Union[SessionPage, None] = None):
# Instance attributes declared for type checkers.
self._inner_ele: HtmlElement = ...
self.page: SessionPage = ...
@property
def inner_ele(self) -> HtmlElement: ...
def __repr__(self) -> str: ...
def __call__(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> Union['SessionElement', str, None]: ...
@property
def tag(self) -> str: ...
@property
def html(self) -> str: ...
@property
def inner_html(self) -> str: ...
@property
def attrs(self) -> dict: ...
@property
def text(self) -> str: ...
@property
def raw_text(self) -> str: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['SessionElement', None]: ...
# Relative lookups returning a single result.
def prev(self,
filter_loc: Union[tuple, str] = '',
index: int = 1,
timeout: float = None) -> Union['SessionElement', str, None]: ...
def next(self,
filter_loc: Union[tuple, str] = '',
index: int = 1,
timeout: float = None) -> Union['SessionElement', str, None]: ...
def before(self,
filter_loc: Union[tuple, str] = '',
index: int = 1,
timeout: float = None) -> Union['SessionElement', str, None]: ...
def after(self,
filter_loc: Union[tuple, str] = '',
index: int = 1,
timeout: float = None) -> Union['SessionElement', str, None]: ...
# Relative lookups returning lists.
def prevs(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> List[Union['SessionElement', str]]: ...
def nexts(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> List[Union['SessionElement', str]]: ...
def befores(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> List[Union['SessionElement', str]]: ...
def afters(self,
filter_loc: Union[tuple, str] = '',
timeout: float = None) -> List[Union['SessionElement', str]]: ...
def attr(self, attr: str) -> Union[str, None]: ...
def ele(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> Union['SessionElement', str, None]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[Union['SessionElement', str]]: ...
def s_ele(self,
loc_or_str: Union[Tuple[str, str], str] = None) -> Union['SessionElement', str, None]: ...
def s_eles(self,
loc_or_str: Union[Tuple[str, str], str]) -> List[Union['SessionElement', str]]: ...
# Shared lookup helper: `single` selects between one result and a list of results.
def _ele(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None,
single: bool = True,
relative: bool = False) -> Union['SessionElement', str, None, List[Union['SessionElement', str]]]: ...
def _get_ele_path(self, mode: str) -> str: ...
# Stub for the module-level factory: accepts html text, an element or a page, plus an optional locator.
def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, DriverElement, BaseElement, DriverPage],
loc: Union[str, Tuple[str, str]] = None,
single: bool = True) -> Union[SessionElement, str, None, List[Union[SessionElement, str]]]: ...

View File

@ -0,0 +1,533 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from re import search
from time import sleep
from urllib.parse import urlparse
from DownloadKit import DownloadKit
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
from tldextract import extract
from .base import BasePage
from DrissionPage.configs.session_options import SessionOptions
from DrissionPage.commons.web import cookie_to_dict, set_session_cookies
from .session_element import SessionElement, make_session_ele
class SessionPage(BasePage):
"""SessionPage封装了页面操作的常用功能使用requests来获取、解析网页"""
def __init__(self, session_or_options=None, timeout=None):
    """Set up the page from a Session or a SessionOptions object.

    :param session_or_options: Session object or SessionOptions object
    :param timeout: connection timeout; when None the value available on the page after the options are applied is used
    """
    self._response = self._download_set = self._session = self._set = None

    # Options first, then runtime attributes, then the session itself.
    self._set_start_options(session_or_options, None)
    self._set_runtime_settings()
    self._create_session()

    if timeout is None:
        timeout = self.timeout
    super().__init__(timeout)
def _set_start_options(self, session_or_options, none):
    """Store the start-up configuration.

    :param session_or_options: Session or SessionOptions object
    :param none: unused here; the original note says it exists for subclasses
    :return: None
    """
    if not session_or_options:
        # Nothing usable given: build options from the (falsy) value.
        self._session_options = SessionOptions(session_or_options)
    elif isinstance(session_or_options, SessionOptions):
        self._session_options = session_or_options
    elif isinstance(session_or_options, Session):
        # An existing Session is adopted together with default options.
        self._session_options = SessionOptions()
        self._session = session_or_options
def _set_runtime_settings(self):
"""设置运行时用到的属性"""
self._timeout = self._session_options.timeout
self._download_path = self._session_options.download_path
def _create_session(self):
"""创建内建Session对象"""
if not self._session:
self._set_session(self._session_options)
def _set_session(self, opt):
    """Create a new Session and configure it from the given options object.

    :param opt: session options object providing headers, cookies, adapters and the attributes listed below
    :return: None
    """
    self._session = Session()

    if opt.headers:
        self._session.headers = CaseInsensitiveDict(opt.headers)
    if opt.cookies:
        self.set.cookies(opt.cookies)
    if opt.adapters:
        for prefix, adapter in opt.adapters:
            self._session.mount(prefix, adapter)

    # Remaining settings are copied onto the Session only when they are truthy.
    names = ('auth', 'proxies', 'hooks', 'params', 'verify',
             'cert', 'stream', 'trust_env', 'max_redirects')
    for name in names:
        value = opt.__getattribute__(name)
        if value:
            self._session.__setattr__(name, value)
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 不起实际作用用于和ChromiumElement对应便于无差别调用
:return: SessionElement对象或属性文本
"""
return self.ele(loc_or_str)
# -----------------共有属性和方法-------------------
@property
def title(self):
"""返回网页title"""
ele = self.ele('xpath://title')
return ele.text if ele else None
@property
def url(self):
"""返回当前访问url"""
return self._url
@property
def html(self):
"""返回页面的html文本"""
return self.response.text if self.response else ''
@property
def json(self):
"""当返回内容是json格式时返回对应的字典非json格式时返回None"""
try:
return self.response.json()
except Exception:
return None
@property
def download_path(self):
"""返回下载路径"""
return self._download_path
@property
def download_set(self):
    """Return the DownloadSetter for this page, creating it on first access."""
    setter = self._download_set
    if setter is None:
        setter = self._download_set = DownloadSetter(self)
    return setter
@property
def download(self):
"""返回下载器对象"""
return self.download_set.DownloadKit
@property
def session(self):
"""返回session对象"""
return self._session
@property
def response(self):
"""返回访问url得到的response对象"""
return self._response
@property
def set(self):
    """Return the SessionPageSetter bound to this page, creating it on first access."""
    setter = self._set
    if setter is None:
        setter = self._set = SessionPageSetter(self)
    return setter
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs):
    """Navigate to url with a GET request.

    :param url: target url
    :param show_errmsg: whether to show and raise exceptions
    :param retry: number of retries
    :param interval: interval between retries
    :param timeout: connection timeout; when None no timeout is added to the connection arguments
    :param kwargs: connection arguments
    :return: whether the url is available
    """
    if timeout is not None:
        # The per-call timeout used to be accepted but dropped; forward it with the
        # other connection arguments so it reaches the request.
        kwargs['timeout'] = timeout
    return self._s_connect(url, 'get', None, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None):
    """Return the first element, attribute or node text on the page that matches.

    :param loc_or_ele: element locator; may be an element object, a loc tuple or a query string
    :param timeout: has no effect; kept so the call matches ChromiumElement
    :return: SessionElement object or attribute text
    """
    return self._ele(loc_or_ele)
def eles(self, loc_or_str, timeout=None):
    """Return all elements, attributes or node texts on the page that match.

    :param loc_or_str: element locator, either a loc tuple or a query string
    :param timeout: has no effect; kept so the call matches ChromiumElement
    :return: list of SessionElement objects or attribute texts
    """
    return self._ele(loc_or_str, single=False)
def s_ele(self, loc_or_ele=None):
    """Return the first matching element, attribute or node text on the page.

    :param loc_or_ele: element locator; may be an element object, a loc tuple or a query string;
                       when None the result of make_session_ele() on the page html is returned
    :return: SessionElement object or attribute text
    """
    if loc_or_ele is None:
        return make_session_ele(self.html)
    return self._ele(loc_or_ele)
def s_eles(self, loc_or_str):
    """Return all matching elements, attributes or node texts on the page.

    :param loc_or_str: element locator, either a loc tuple or a query string
    :return: list of SessionElement objects or attribute texts
    """
    return self._ele(loc_or_str, single=False)
def _ele(self, loc_or_ele, timeout=None, single=True):
    """Return the matching element(s), attribute(s) or node text(s); the first one by default.

    :param loc_or_ele: element locator; may be an element object, a loc tuple or a query string
    :param timeout: has no effect; kept to match the parent class signature
    :param single: True returns the first match, False returns all matches
    :return: SessionElement object (a SessionElement passed in is returned unchanged)
    """
    if isinstance(loc_or_ele, SessionElement):
        return loc_or_ele
    return make_session_ele(self, loc_or_ele, single)
def get_cookies(self, as_dict=False, all_domains=False):
    """Return the cookies held by the session.

    :param as_dict: whether to return a name -> value dict instead of a list of cookie dicts
    :param all_domains: whether to return cookies of every domain rather than only the current one
    :return: cookies information
    """
    jar = self.session.cookies
    if all_domains:
        cookies = jar
    elif self.url:
        # keep cookies whose domain contains the registered domain of the current url,
        # plus cookies that carry no domain at all
        parts = extract(self.url)
        domain = f'{parts.domain}.{parts.suffix}'
        cookies = tuple(c for c in jar if domain in c.domain or c.domain == '')
    else:
        cookies = tuple(jar)

    if as_dict:
        return {c.name: c.value for c in cookies}
    return [cookie_to_dict(c) for c in cookies]
def post(self, url, data=None, show_errmsg=False, retry=None, interval=None, **kwargs):
    """Navigate to url with a POST request.

    :param url: target url
    :param data: data to submit
    :param show_errmsg: whether to show and raise exceptions
    :param retry: number of retries
    :param interval: interval between retries
    :param kwargs: connection arguments
    :return: whether the url is available
    """
    return self._s_connect(url, 'post', data, show_errmsg, retry, interval, **kwargs)
def _s_connect(self, url, mode, data=None, show_errmsg=False, retry=None, interval=None, **kwargs):
    """Perform a get or post connection and record whether the url is available.

    :param url: target url
    :param mode: 'get' or 'post'
    :param data: data to submit
    :param show_errmsg: whether to show and raise exceptions
    :param retry: number of retries
    :param interval: interval between retries
    :param kwargs: connection arguments
    :return: whether the url is available
    """
    retry, interval = self._before_connect(url, retry, interval)
    self._response, _ = self._make_response(self._url, mode, data, retry, interval, show_errmsg, **kwargs)

    resp = self._response
    if resp is not None and not resp.ok and show_errmsg:
        raise ConnectionError(f'状态码:{resp.status_code}.')

    # available only when a response exists and reports ok
    self._url_available = resp is not None and bool(resp.ok)
    return self._url_available
def _make_response(self, url, mode='get', data=None, retry=None, interval=None, show_errmsg=False, **kwargs):
    """Send the request (with retries) and produce the Response object.

    :param url: target url
    :param mode: 'get' or 'post'
    :param data: data to submit in post mode
    :param retry: number of retries; None uses the page's retry_times
    :param interval: seconds between retries; None uses the page's retry_interval
    :param show_errmsg: whether to show and raise exceptions
    :param kwargs: other connection arguments
    :return: tuple; first item is the Response or None, second is the error info or 'Success'
    """
    kwargs = CaseInsensitiveDict(kwargs)
    if 'headers' not in kwargs:
        kwargs['headers'] = {}
    else:
        kwargs['headers'] = CaseInsensitiveDict(kwargs['headers'])

    # fill in Referer and Host when the caller did not provide them
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname
    scheme = parsed_url.scheme
    if not check_headers(kwargs, self.session.headers, 'Referer'):
        kwargs['headers']['Referer'] = self.url if self.url else f'{scheme}://{hostname}'
    if 'Host' not in kwargs['headers']:
        kwargs['headers']['Host'] = hostname

    # timeout is a request argument, not a header: look for it in kwargs so that a
    # caller-supplied timeout is no longer overwritten by the page default
    if 'timeout' not in kwargs:
        kwargs['timeout'] = self.timeout

    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False

    r = err = None
    retry = retry if retry is not None else self.retry_times
    interval = interval if interval is not None else self.retry_interval
    for i in range(retry + 1):
        try:
            if mode == 'get':
                r = self.session.get(url, **kwargs)
            elif mode == 'post':
                r = self.session.post(url, data=data, **kwargs)

            # a truthy response ends the retry loop immediately
            if r:
                return set_charset(r), 'Success'

        except Exception as e:
            err = e

        # if r and r.status_code in (403, 404):
        #     break

        if i < retry:
            sleep(interval)
            if show_errmsg:
                print(f'重试 {url}')

    if r is None:
        if show_errmsg:
            if err:
                raise err
            else:
                raise ConnectionError('连接失败')
        return None, '连接失败' if err is None else err

    if not r.ok:
        if show_errmsg:
            raise ConnectionError(f'状态码:{r.status_code}')
        return r, f'状态码:{r.status_code}'
class SessionPageSetter(object):
    """Helper that applies settings to a page object and to its session."""

    def __init__(self, page):
        """
        :param page: the page object whose settings are changed
        """
        self._page = page

    def timeout(self, second):
        """Set the connection timeout.

        :param second: number of seconds
        :return: None
        """
        self._page.timeout = second

    def cookies(self, cookies):
        """Set cookies on the Session object.

        :param cookies: cookies information
        :return: None
        """
        set_session_cookies(self._page.session, cookies)

    def headers(self, headers):
        """Replace the session-wide headers.

        :param headers: headers in dict form
        :return: None
        """
        session = self._page.session
        session.headers = CaseInsensitiveDict(headers)

    def header(self, attr, value):
        """Set a single item of the session headers.

        :param attr: header name (stored lower-cased)
        :param value: header value
        :return: None
        """
        key = attr.lower()
        self._page.session.headers[key] = value

    def user_agent(self, ua):
        """Set the user agent header.

        :param ua: user agent string
        :return: None
        """
        self._page.session.headers['user-agent'] = ua

    def proxies(self, http, https=None):
        """Set the proxies of the session.

        :param http: http proxy address
        :param https: https proxy address; falls back to the http address when falsy
        :return: None
        """
        if http == https and https is None:
            # both unset: clear the proxies
            proxies = None
        else:
            proxies = {'http': http, 'https': https or http}
        self._page.session.proxies = proxies

    def auth(self, auth):
        """Set the authentication tuple or object.

        :param auth: authentication tuple or object
        :return: None
        """
        session = self._page.session
        session.auth = auth

    def hooks(self, hooks):
        """Set the callback hooks.

        :param hooks: callback hooks
        :return: None
        """
        session = self._page.session
        session.hooks = hooks

    def params(self, params):
        """Set the query parameters dict.

        :param params: query parameters dict
        :return: None
        """
        session = self._page.session
        session.params = params

    def verify(self, on_off):
        """Set whether SSL certificates are verified.

        :param on_off: whether to verify SSL certificates
        :return: None
        """
        session = self._page.session
        session.verify = on_off

    def cert(self, cert):
        """Set the SSL client certificate: a file path (.pem) or a (cert, key) tuple.

        :param cert: certificate path or tuple
        :return: None
        """
        session = self._page.session
        session.cert = cert

    def stream(self, on_off):
        """Set whether response content is streamed.

        :param on_off: whether to stream response content
        :return: None
        """
        session = self._page.session
        session.stream = on_off

    def trust_env(self, on_off):
        """Set whether the environment is trusted.

        :param on_off: whether to trust the environment
        :return: None
        """
        session = self._page.session
        session.trust_env = on_off

    def max_redirects(self, times):
        """Set the maximum number of redirects.

        :param times: maximum number of redirects
        :return: None
        """
        session = self._page.session
        session.max_redirects = times

    def add_adapter(self, url, adapter):
        """Mount an adapter on the session.

        :param url: url the adapter applies to
        :param adapter: adapter object
        :return: None
        """
        self._page.session.mount(url, adapter)
class DownloadSetter(object):
    """Helper used to configure download parameters of a page."""

    def __init__(self, page):
        """
        :param page: the page object that owns this setter
        """
        self._page = page
        self._DownloadKit = None

    @property
    def DownloadKit(self):
        """Return the downloader object, creating it from the page's session and path on first use."""
        kit = self._DownloadKit
        if kit is None:
            kit = self._DownloadKit = DownloadKit(session=self._page.session, goal_path=self._page.download_path)
        return kit

    @property
    def if_file_exists(self):
        """Return the object used to choose what happens when a file of the same name exists."""
        return FileExists(self)

    def split(self, on_off):
        """Set whether large files may be split and downloaded with multiple threads.

        :param on_off: whether to enable multi-threaded download of large files
        :return: None
        """
        kit = self.DownloadKit
        kit.split = on_off

    def save_path(self, path):
        """Set the path downloads are saved to.

        :param path: save path; converted to str unless it is None
        :return: None
        """
        if path is not None:
            path = str(path)
        self._page._download_path = path
        self.DownloadKit.goal_path = path
class FileExists(object):
    """Helper used to choose what happens when a file of the same name already exists."""

    def __init__(self, setter):
        """
        :param setter: DownloadSetter object
        """
        self._setter = setter

    def __call__(self, mode):
        """Set the handling mode by name.

        :param mode: one of 'skip', 'rename', 'overwrite'
        :return: None
        :raises ValueError: when mode is not one of the accepted values
        """
        if mode not in ('skip', 'rename', 'overwrite'):
            raise ValueError("mode参数只能是'skip', 'rename', 'overwrite'")
        self._setter.DownloadKit.file_exists = mode

    def skip(self):
        """Skip the download when the file exists."""
        self._setter.DownloadKit.file_exists = 'skip'

    def rename(self):
        """Rename the new file by appending a sequence number to its name."""
        # use the public attribute, like __call__ and skip(), instead of the private _file_exists
        self._setter.DownloadKit.file_exists = 'rename'

    def overwrite(self):
        """Overwrite the existing file."""
        # use the public attribute, like __call__ and skip(), instead of the private _file_exists
        self._setter.DownloadKit.file_exists = 'overwrite'
def check_headers(kwargs, headers, arg) -> bool:
    """Tell whether arg is present in kwargs['headers'] or in headers."""
    if arg in kwargs['headers']:
        return True
    return arg in headers
def set_charset(response) -> Response:
    """Set the encoding of a Response object.

    The charset parameter of the content-type header is used when present. Otherwise,
    for html content, the charset of a <meta> tag in the body is used, falling back to
    the response's apparent encoding. In any other case the encoding is left untouched.

    :param response: Response object
    :return: the same Response object
    """
    # look for the encoding in the headers first
    content_type = response.headers.get('content-type', '').lower()
    # The value ends at the next ';', quote or whitespace. No trailing ';' is required,
    # so the common 'text/html; charset=utf-8' form matches and later parameters are
    # not swallowed into the charset.
    charset = search(r'charset[=: ]*["\']?([^;"\'\s]+)', content_type)

    if charset:
        response.encoding = charset.group(1)

    # no encoding in the headers and the content is a web page
    elif content_type.replace(' ', '').startswith('text/html'):
        re_result = search(b'<meta.*?charset=[ \\\'"]*([^"\\\' />]+).*?>', response.content)

        if re_result:
            charset = re_result.group(1).decode()
        else:
            charset = response.apparent_encoding

        response.encoding = charset

    return response

View File

@ -0,0 +1,237 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Any, Union, Tuple, List
from DownloadKit import DownloadKit
from requests import Session, Response
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from .base import BasePage
from DrissionPage.configs.session_options import SessionOptions
from .session_element import SessionElement
class SessionPage(BasePage):
    # Type stub for the page class built on a requests Session; every body is a placeholder.
    def __init__(self,
                 session_or_options: Union[Session, SessionOptions] = None,
                 timeout: float = None):
        # instance attributes, declared here for type checkers
        self._session: Session = ...
        self._session_options: SessionOptions = ...
        self._url: str = ...
        self._response: Response = ...
        self._download_path: str = ...
        self._download_set: DownloadSetter = ...
        self._url_available: bool = ...
        self.timeout: float = ...
        self.retry_times: int = ...
        self.retry_interval: float = ...
        self._set: SessionPageSetter = ...

    def _set_start_options(self, session_or_options, none) -> None: ...

    def _create_session(self) -> None: ...

    def _set_session(self, opt: SessionOptions) -> None: ...

    def _set_runtime_settings(self) -> None: ...

    def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...

    def set_headers(self, headers: dict) -> None: ...

    def set_user_agent(self, ua: str) -> None: ...

    def __call__(self,
                 loc_or_str: Union[Tuple[str, str], str, SessionElement],
                 timeout: float = None) -> Union[SessionElement, str, None]: ...

    # ----------------- shared attributes and methods -------------------
    @property
    def title(self) -> str: ...

    @property
    def url(self) -> str: ...

    @property
    def html(self) -> str: ...

    @property
    def json(self) -> Union[dict, None]: ...

    @property
    def download_path(self) -> str: ...

    @property
    def download_set(self) -> DownloadSetter: ...

    # the keyword parameters after ``timeout`` spell out the accepted connection arguments
    def get(self,
            url: str,
            show_errmsg: bool | None = False,
            retry: int | None = None,
            interval: float | None = None,
            timeout: float | None = None,
            params: dict | None = ...,
            data: Union[dict, str, None] = ...,
            json: Union[dict, str, None] = ...,
            headers: dict | None = ...,
            cookies: Any | None = ...,
            files: Any | None = ...,
            auth: Any | None = ...,
            allow_redirects: bool = ...,
            proxies: dict | None = ...,
            hooks: Any | None = ...,
            stream: Any | None = ...,
            verify: Any | None = ...,
            cert: Any | None = ...) -> bool: ...

    def ele(self,
            loc_or_ele: Union[Tuple[str, str], str, SessionElement],
            timeout: float = None) -> Union[SessionElement, str, None]: ...

    def eles(self,
             loc_or_str: Union[Tuple[str, str], str],
             timeout: float = None) -> List[Union[SessionElement, str]]: ...

    def s_ele(self,
              loc_or_ele: Union[Tuple[str, str], str, SessionElement] = None) \
            -> Union[SessionElement, str, None]: ...

    def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...

    def _ele(self,
             loc_or_ele: Union[Tuple[str, str], str, SessionElement],
             timeout: float = None,
             single: bool = True) -> Union[SessionElement, str, None, List[Union[SessionElement, str]]]: ...

    def get_cookies(self,
                    as_dict: bool = False,
                    all_domains: bool = False) -> Union[dict, list]: ...

    # ---------------- session-only attributes and methods -----------------------
    @property
    def session(self) -> Session: ...

    @property
    def response(self) -> Response: ...

    @property
    def set(self) -> SessionPageSetter: ...

    @property
    def download(self) -> DownloadKit: ...

    # the keyword parameters after ``interval`` spell out the accepted connection arguments
    def post(self,
             url: str,
             data: Union[dict, str, None] = ...,
             show_errmsg: bool = False,
             retry: int | None = None,
             interval: float | None = None,
             timeout: float | None = ...,
             params: dict | None = ...,
             json: Union[dict, str, None] = ...,
             headers: dict | None = ...,
             cookies: Any | None = ...,
             files: Any | None = ...,
             auth: Any | None = ...,
             allow_redirects: bool = ...,
             proxies: dict | None = ...,
             hooks: Any | None = ...,
             stream: Any | None = ...,
             verify: Any | None = ...,
             cert: Any | None = ...) -> bool: ...

    def _s_connect(self,
                   url: str,
                   mode: str,
                   data: Union[dict, str, None] = None,
                   show_errmsg: bool = False,
                   retry: int = None,
                   interval: float = None,
                   **kwargs) -> bool: ...

    def _make_response(self,
                       url: str,
                       mode: str = 'get',
                       data: Union[dict, str] = None,
                       retry: int = None,
                       interval: float = None,
                       show_errmsg: bool = False,
                       **kwargs) -> tuple: ...
class SessionPageSetter(object):
    # Type stub for the settings helper returned by SessionPage.set.
    def __init__(self, page: SessionPage):
        self._page: SessionPage = ...

    def timeout(self, second: float) -> None: ...

    def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...

    def headers(self, headers: dict) -> None: ...

    def header(self, attr: str, value: str) -> None: ...

    def user_agent(self, ua: str) -> None: ...

    def proxies(self, http, https=None) -> None: ...

    def auth(self, auth: Union[Tuple[str, str], HTTPBasicAuth, None]) -> None: ...

    def hooks(self, hooks: Union[dict, None]) -> None: ...

    def params(self, params: Union[dict, None]) -> None: ...

    def verify(self, on_off: Union[bool, None]) -> None: ...

    def cert(self, cert: Union[str, Tuple[str, str], None]) -> None: ...

    def stream(self, on_off: Union[bool, None]) -> None: ...

    def trust_env(self, on_off: Union[bool, None]) -> None: ...

    def max_redirects(self, times: Union[int, None]) -> None: ...

    def add_adapter(self, url: str, adapter: HTTPAdapter) -> None: ...
class DownloadSetter(object):
    # Type stub for the download settings helper returned by SessionPage.download_set.
    def __init__(self, page: SessionPage):
        self._page: SessionPage = ...
        self._DownloadKit: DownloadKit = ...

    @property
    def DownloadKit(self) -> DownloadKit: ...

    @property
    def if_file_exists(self) -> FileExists: ...

    def split(self, on_off: bool) -> None: ...

    def save_path(self, path: Union[str, Path]) -> None: ...
class FileExists(object):
    # Type stub for the helper that selects the handling of already-existing files.
    def __init__(self, setter: DownloadSetter):
        self._setter: DownloadSetter = ...

    def __call__(self, mode: str) -> None: ...

    def skip(self) -> None: ...

    def rename(self) -> None: ...

    def overwrite(self) -> None: ...
# Stub: reports whether ``arg`` is found in the request kwargs' headers or in ``headers``.
def check_headers(kwargs: Union[dict, CaseInsensitiveDict], headers: Union[dict, CaseInsensitiveDict],
                  arg: str) -> bool: ...
# Stub: sets the encoding of ``response`` and returns the same object.
def set_charset(response: Response) -> Response: ...

View File

@ -0,0 +1,219 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from time import perf_counter
from typing import Union
from selenium.webdriver.remote.webelement import WebElement
from .base import BaseElement
from DrissionPage.commons.locator import get_loc
from .driver_element import make_driver_ele
from .session_element import make_session_ele, SessionElement
class ShadowRootElement(BaseElement):
    """Class for working with a shadow root; used in much the same way as DriverElement."""

    def __init__(self, inner_ele, parent_ele):
        """
        :param inner_ele: the wrapped shadow-root object
        :param parent_ele: the element this shadow root belongs to; its page becomes this object's page
        """
        super().__init__(parent_ele.page)
        self.parent_ele = parent_ele
        self._inner_ele = inner_ele

    @property
    def inner_ele(self):
        """Return the wrapped shadow-root object."""
        return self._inner_ele

    def __repr__(self):
        return f'<ShadowRootElement in {self.parent_ele} >'

    def __call__(self, loc_or_str, timeout=None):
        """Find an element inside this one; shorthand for ``ele()``.

        Example: ele2 = ele1('@id=ele_id')

        :param loc_or_str: element locator, either a loc tuple or a query string
        :param timeout: search timeout
        :return: DriverElement object or attribute text
        """
        return self.ele(loc_or_str, timeout)

    @property
    def tag(self):
        """Tag name of the element (always 'shadow-root')."""
        return 'shadow-root'

    @property
    def html(self):
        """Return the inner html wrapped in a <shadow_root> tag."""
        return f'<shadow_root>{self.inner_html}</shadow_root>'

    @property
    def inner_html(self):
        """Return the inner html text."""
        shadow_root = WebElement(self.page.driver, self.inner_ele._id)
        return shadow_root.get_attribute('innerHTML')

    def parent(self, level_or_loc=1):
        """Return an ancestor element, chosen by level or located with the query syntax.

        :param level_or_loc: ancestor level (int) or a locator (tuple or str)
        :return: DriverElement object
        :raises ValueError: when the locator resolves to a css selector
        :raises TypeError: when level_or_loc is not a tuple, int or str
        """
        if isinstance(level_or_loc, int):
            loc = f'xpath:./ancestor-or-self::*[{level_or_loc}]'

        elif isinstance(level_or_loc, (tuple, str)):
            loc = get_loc(level_or_loc, True)

            if loc[0] == 'css selector':
                raise ValueError('此css selector语法不受支持请换成xpath。')

            loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}'

        else:
            raise TypeError('level_or_loc参数只能是tuple、int或str。')

        return self.parent_ele.ele(loc, timeout=0)

    def next(self, index=1, filter_loc=''):
        """Return one of the results of ``nexts()``, optionally filtered with the query syntax.

        :param index: which result to return (1-based)
        :param filter_loc: query syntax used for filtering
        :return: DriverElement object, or None when there is no such result
        """
        nodes = self.nexts(filter_loc=filter_loc)
        try:
            return nodes[index - 1]
        except IndexError:  # empty result, or index beyond the number of results
            return None

    def before(self, index=1, filter_loc=''):
        """Return one of the elements before this one, optionally filtered with the query syntax.

        :param index: which result to return (1-based)
        :param filter_loc: query syntax used for filtering
        :return: an element or node before this one, or None when there is no such result
        """
        nodes = self.befores(filter_loc=filter_loc)
        try:
            return nodes[index - 1]
        except IndexError:  # empty result, or index beyond the number of results
            return None

    def after(self, index=1, filter_loc=''):
        """Return one of the elements after this one, optionally filtered with the query syntax.

        :param index: which result to return (1-based)
        :param filter_loc: query syntax used for filtering
        :return: an element or node after this one, or None when there is no such result
        """
        nodes = self.afters(filter_loc=filter_loc)
        try:
            return nodes[index - 1]
        except IndexError:  # empty result, or index beyond the number of results
            return None

    def nexts(self, filter_loc=''):
        """Return the list of elements found with the filter relative to the parent element.

        :param filter_loc: query syntax used for filtering
        :return: list of DriverElement objects
        :raises ValueError: when the filter resolves to a css selector
        """
        loc = get_loc(filter_loc, True)
        if loc[0] == 'css selector':
            raise ValueError('此css selector语法不受支持请换成xpath。')

        loc = loc[1].lstrip('./')
        xpath = f'xpath:./{loc}'
        return self.parent_ele.eles(xpath, timeout=0.1)

    def befores(self, filter_loc=''):
        """Return the list of elements or nodes before this one (xpath ``preceding`` axis).

        :param filter_loc: query syntax used for filtering
        :return: list of the elements or nodes before this element
        :raises ValueError: when the filter resolves to a css selector
        """
        loc = get_loc(filter_loc, True)
        if loc[0] == 'css selector':
            raise ValueError('此css selector语法不受支持请换成xpath。')

        loc = loc[1].lstrip('./')
        xpath = f'xpath:./preceding::{loc}'
        return self.parent_ele.eles(xpath, timeout=0.1)

    def afters(self, filter_loc=''):
        """Return the list of elements or nodes after this one (``nexts()`` plus the xpath ``following`` axis).

        :param filter_loc: query syntax used for filtering
        :return: list of the elements or nodes after this element
        :raises ValueError: when the filter resolves to a css selector (raised by ``nexts()``)
        """
        eles1 = self.nexts(filter_loc)
        loc = get_loc(filter_loc, True)[1].lstrip('./')
        xpath = f'xpath:./following::{loc}'
        return eles1 + self.parent_ele.eles(xpath, timeout=0.1)

    def ele(self, loc_or_str, timeout=None):
        """Return the first matching element below this one.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :param timeout: search timeout; defaults to the timeout of the owning page
        :return: DriverElement object or attribute text
        """
        return self._ele(loc_or_str, timeout)

    def eles(self, loc_or_str, timeout=None):
        """Return all matching elements below this one.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :param timeout: search timeout; defaults to the timeout of the owning page
        :return: list of DriverElement objects or attribute texts
        """
        return self._ele(loc_or_str, timeout=timeout, single=False)

    def s_ele(self, loc_or_str=None) -> Union[SessionElement, str, None]:
        """Find the first matching element and return it as a SessionElement.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :return: SessionElement object or attribute text
        """
        return make_session_ele(self, loc_or_str)

    def s_eles(self, loc_or_str):
        """Find all matching elements and return them as SessionElement objects.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :return: list of SessionElement objects or attribute texts
        """
        return make_session_ele(self, loc_or_str, single=False)

    def _ele(self, loc_or_str, timeout=None, single=True, relative=False):
        """Return the matching element(s) below this one; the first one by default.

        :param loc_or_str: element locator, either a loc tuple or a query string
        :param timeout: search timeout; defaults to the timeout of the owning page
        :param single: True returns the first match, False returns all matches
        :param relative: relative-positioning flag used by WebPage; not used here
        :return: DriverElement object, a list of them, or None when single and nothing is found
        """
        # Convert to a SessionElement first and find the elements there, take their
        # css selector paths, then run the lookup on the page with those paths.
        loc = get_loc(loc_or_str)
        if loc[0] == 'css selector' and str(loc[1]).startswith(':root'):
            loc = loc[0], loc[1][5:]

        timeout = timeout if timeout is not None else self.page.timeout
        t1 = perf_counter()
        eles = make_session_ele(self.html).eles(loc)
        # keep re-reading the html until something matches or the timeout elapses
        while not eles and perf_counter() - t1 <= timeout:
            eles = make_session_ele(self.html).eles(loc)

        if not eles:
            return None if single else eles

        # drops the first 47 characters of each path - presumably the prefix that
        # addresses the synthetic <shadow_root> wrapper built by html; TODO confirm
        css_paths = [i.css_path[47:] for i in eles]

        if single:
            return make_driver_ele(self, f'css:{css_paths[0]}', single, timeout)
        else:
            return [make_driver_ele(self, f'css:{css}', True, timeout) for css in css_paths]

    def run_script(self, script, *args):
        """Run js code with this shadow root passed as the first argument.

        :param script: js text
        :param args: arguments passed to the script
        :return: result of the js execution
        """
        shadow_root = WebElement(self.page.driver, self.inner_ele._id)
        return shadow_root.parent.execute_script(script, shadow_root, *args)

    def is_enabled(self):
        """Whether the element is enabled."""
        return self.inner_ele.is_enabled()

    def is_valid(self):
        """Tell whether the element can still be used, e.g. after the page has navigated away."""
        try:
            self.is_enabled()
            return True

        except Exception:
            return False

View File

@ -0,0 +1,84 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, Any, Tuple, List
from selenium.webdriver.remote.webelement import WebElement
from .driver_page import DriverPage
from .mix_page import MixPage
from .base import BaseElement
from .driver_element import DriverElement
from .session_element import SessionElement
class ShadowRootElement(BaseElement):
    # Type stub for the shadow-root wrapper element; every body is a placeholder.
    def __init__(self, inner_ele: WebElement, parent_ele: DriverElement):
        # instance attributes, declared here for type checkers
        self._inner_ele: WebElement = ...
        self.parent_ele: DriverElement = ...
        self.page: Union[MixPage, DriverPage] = ...

    @property
    def inner_ele(self) -> WebElement: ...

    def __repr__(self) -> str: ...

    def __call__(self,
                 loc_or_str: Union[Tuple[str, str], str],
                 timeout: float = None) -> Union[DriverElement, str, None]: ...

    @property
    def tag(self) -> str: ...

    @property
    def html(self) -> str: ...

    @property
    def inner_html(self) -> str: ...

    def parent(self, level_or_loc: Union[str, int] = 1) -> DriverElement: ...

    def next(self,
             index: int = 1,
             filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]: ...

    def before(self,
               index: int = 1,
               filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]: ...

    def after(self,
              index: int = 1,
              filter_loc: Union[tuple, str] = '') -> Union[DriverElement, str, None]: ...

    def nexts(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]: ...

    def befores(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]: ...

    def afters(self, filter_loc: Union[tuple, str] = '') -> List[Union[DriverElement, str]]: ...

    def ele(self,
            loc_or_str: Union[Tuple[str, str], str],
            timeout: float = None) -> Union[DriverElement, str, None]: ...

    def eles(self,
             loc_or_str: Union[Tuple[str, str], str],
             timeout: float = None) -> List[Union[DriverElement, str]]: ...

    def s_ele(self, loc_or_str: Union[Tuple[str, str], str] = None) -> Union[SessionElement, str, None]: ...

    def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...

    def _ele(self,
             loc_or_str: Union[Tuple[str, str], str],
             timeout: float = ...,
             single: bool = ...,
             relative: bool = ...) -> Union[DriverElement, str, None, List[Union[DriverElement, str]]]: ...

    def run_script(self, script: str, *args) -> Any: ...

    def is_enabled(self) -> bool: ...

    def is_valid(self) -> bool: ...

View File

@ -518,7 +518,7 @@ def set_charset(response):
content_type = response.headers.get('content-type', '').lower()
if not content_type.endswith(';'):
content_type += ';'
charset = search(r'charset[=: ]*(.*)?;', content_type)
charset = search(r'charset[=: ]*(.*)?;?', content_type)
if charset:
response.encoding = charset.group(1)

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="3.2.30",
version="3.2.31",
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",