基本完成

This commit is contained in:
g1879 2020-05-21 16:46:09 +08:00
parent 1daadf25d4
commit 1ad8589b14
10 changed files with 2136 additions and 803 deletions

137
DrissionPage/common.py Normal file
View File

@ -0,0 +1,137 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@File : common.py
"""
from abc import abstractmethod
from pathlib import Path
from typing import Union
from requests_html import Element
from selenium.webdriver.remote.webelement import WebElement
class DrissionElement(object):
    """Base class shared by the element wrappers.

    It stores the wrapped low-level element and declares the members that
    concrete element classes provide. The placeholder properties below all
    return ``None``.

    NOTE(review): the class derives from ``object`` rather than ``abc.ABC``,
    so the ``@abstractmethod`` markers are not enforced at instantiation.
    """

    def __init__(self, ele):
        # The wrapped element is kept private and exposed via ``inner_ele``.
        self._inner_ele = ele

    @property
    def inner_ele(self) -> Union[WebElement, Element]:
        """The wrapped element object given to the constructor."""
        return self._inner_ele

    @property
    def is_valid(self):
        """Always ``True`` in the base class."""
        return True

    @property
    def text(self):
        """Placeholder for the element text; ``None`` in the base class."""
        return None

    @property
    def html(self):
        """Placeholder for the element HTML; ``None`` in the base class."""
        return None

    @property
    def tag(self):
        """Placeholder for the tag name; ``None`` in the base class."""
        return None

    @property
    def parent(self):
        """Placeholder for the parent element; ``None`` in the base class."""
        return None

    @property
    def next(self):
        """Placeholder for the next sibling; ``None`` in the base class."""
        return None

    @property
    def prev(self):
        """Placeholder for the previous sibling; ``None`` in the base class."""
        return None

    @abstractmethod
    def ele(self, loc: tuple, mode: str = None, show_errmsg: bool = True):
        """Find element(s) below this one; implemented by subclasses."""

    @abstractmethod
    def eles(self, loc: tuple, show_errmsg: bool = True):
        """Find all matching elements below this one; implemented by subclasses."""

    @abstractmethod
    def attr(self, attr: str):
        """Return the value of attribute ``attr``; implemented by subclasses."""
def get_loc_from_str(loc: str) -> tuple:
    """Translate a shorthand locator string into a ``(by, value)`` tuple.

    Recognised forms (the part before the first ``:`` selects the strategy):

    * ``tag:div``       -> ``('xpath', '//div')``
    * ``@attr:value``   -> ``('xpath', '//*[@attr="value"]')``
    * ``@attr``         -> ``('xpath', '//*[@attr]')``
    * ``text:some``     -> xpath matching elements whose text contains ``some``
    * ``xpath://a``     -> ``('xpath', '//a')``
    * ``css:div.a``     -> ``('css selector', 'div.a')``

    Anything else is treated as a text search for the *whole* string.

    :param loc: shorthand locator string
    :return: ``(by, locator)`` where ``by`` is ``'xpath'`` or ``'css selector'``
    """
    by, sep, value = loc.partition(':')

    if sep:
        if by == 'tag':
            return 'xpath', f'//{value}'
        if by.startswith('@'):
            return 'xpath', f'//*[{by}="{value}"]'
        if by == 'text':
            return 'xpath', _make_xpath_search_str(value)
        if by == 'xpath':
            return 'xpath', value
        if by == 'css':
            return 'css selector', value
    elif by.startswith('@'):
        # Attribute name only: match any element carrying that attribute.
        return 'xpath', f'//*[{by}]'

    # Fallback: plain text search. Use the full input, not just the part
    # before the first ':', so "Price: 100" is not truncated to "Price".
    return 'xpath', _make_xpath_search_str(loc)


def _make_xpath_search_str(search_str: str):
    """Build an xpath matching elements whose text contains ``search_str``.

    Double quotes cannot be escaped inside an XPath string literal, so the
    text is split on ``"`` and reassembled with ``concat()``, inserting each
    quote as the single-quoted literal ``'"'``.
    """
    pieces = ',\'"\','.join(f'"{part}"' for part in search_str.split('"'))
    # The trailing "" keeps concat() at two or more arguments even when the
    # text contains no double quote.
    return f'//*[contains(text(),concat({pieces},""))]'
def translate_loc_to_xpath(loc):
    """Convert a selenium ``By``-style locator into xpath or css selector form.

    :param loc: ``(by, value)`` pair, e.g. ``('id', 'main')``
    :return: ``(by, locator)``; ``by`` is ``'css selector'`` only when the
        input already was one, otherwise ``'xpath'``. For an unrecognised
        strategy the locator part is ``None``.
    """
    by = loc[0]

    # These two strategies are passed through unchanged.
    if by == 'xpath':
        return 'xpath', loc[1]
    if by == 'css selector':
        return 'css selector', loc[1]

    # Remaining strategies are rewritten as xpath expressions.
    xpath_templates = (
        ('id', '//*[@id="{}"]'),
        ('class name', '//*[@class="{}"]'),
        ('link text', '//a[text()="{}"]'),
        ('name', '//*[@name="{}"]'),
        ('tag name', '//{}'),
        ('partial link text', '//a[contains(text(),"{}")]'),
    )
    for strategy, template in xpath_templates:
        if by == strategy:
            return 'xpath', template.format(loc[1])

    return 'xpath', None
def avoid_duplicate_name(folder_path: str, file_name: str) -> str:
    """Return a file name that does not yet exist in ``folder_path``.

    If ``file_name`` is free it is returned unchanged. Otherwise a counter in
    the form ``" (n)"`` is appended to the stem, or an existing trailing
    counter is incremented, until an unused name is found.

    :param folder_path: directory the file is going to be placed in
    :param file_name: desired file name
    :return: an available file name (name only, not a full path)
    """
    folder = Path(folder_path)
    while (candidate := folder / file_name).exists():
        ext_name = candidate.suffix
        base_name = candidate.stem

        # Only the last space-separated token may be a counter. rpartition()
        # keeps the rest of the stem intact, so an earlier "(n)" in the name
        # is never removed, and an empty last token (stem ending in a space)
        # simply does not match instead of raising IndexError.
        prefix, sep, last = base_name.rpartition(' ')
        if last.startswith('(') and last.endswith(')') and last[1:-1].isdecimal():
            file_name = f'{prefix}{sep}({int(last[1:-1]) + 1}){ext_name}'
        else:
            file_name = f'{base_name} (1){ext_name}'
    return file_name

View File

@ -1,63 +1,141 @@
# -*- coding:utf-8 -*-
"""
配置文件
@Author : g1879
@Contact : g1879@qq.com
@File : config.py
"""
from configparser import ConfigParser, NoSectionError, NoOptionError
from pathlib import Path
from typing import Any
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
class OptionsManager(object):
"""管理配置文件内容的类"""
def __init__(self, path: str = None):
    """Load the configuration file and make sure a temp folder is configured.

    :param path: path of the ini file; defaults to ``configs.ini`` next to
        this module.

    If the ``paths`` section has no (or an empty) ``global_tmp_path``, a
    ``tmp`` folder next to this module is created, stored under that key and
    the file is saved.
    """
    self.path = path or Path(__file__).parent / 'configs.ini'
    self._conf = ConfigParser()
    self._conf.read(self.path, encoding='utf-8')
    if 'global_tmp_path' not in self.get_option('paths') or not self.get_value('paths', 'global_tmp_path'):
        # Build the path with pathlib instead of a hard-coded backslash so
        # the folder is created correctly on non-Windows systems as well.
        global_tmp_path = str(Path(__file__).parent / 'tmp')
        Path(global_tmp_path).mkdir(parents=True, exist_ok=True)
        self.set_item('paths', 'global_tmp_path', global_tmp_path)
        self.save()
global_driver_options = {
# ---------------已打开的浏览器---------------
'debuggerAddress': '127.0.0.1:9222',
# ---------------chromedriver路径---------------
'chromedriver_path': r'D:\python\Google Chrome\Chrome\chromedriver.exe',
# ---------------手动指定使用的浏览器位置---------------
# 'binary_location': r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe',
# ---------------启动参数---------------
'arguments': [
# '--headless', # 隐藏浏览器窗口
'--mute-audio', # 静音
'--no-sandbox',
# '--blink-settings=imagesEnabled=false', # 不加载图片
# r'--user-data-dir="E:\tmp\chrome_tmp"', # 指定用户文件夹路径
# '-disk-cache-dir=""', # 指定缓存路径
'zh_CN.UTF-8', # 编码格式
# "--proxy-server=http://127.0.0.1:8888", # 设置代理
# '--hide-scrollbars', # 隐藏滚动条
# '--start-maximized', # 浏览器窗口最大化
# "--disable-javascript", # 禁用JavaScript
# 模拟移动设备
# 'user-agent="MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"',
'--disable-gpu' # 谷歌文档提到需要加上这个属性来规避bug
],
# ---------------扩展文件---------------
'extension_files': [],
# 'extensions': [],
# ---------------实验性质的设置参数---------------
'experimental_options': {
'prefs': {
# 设置下载路径
'download.default_directory': global_tmp_path,
# 下载不弹出窗口
'profile.default_content_settings.popups': 0,
# 无弹窗
'profile.default_content_setting_values': {'notifications': 2},
# 禁用PDF插件
'plugins.plugins_list': [{"enabled": False, "name": "Chrome PDF Viewer"}],
# 设置为开发者模式,防反爬虫
'excludeSwitches': ["ignore-certificate-errors", "enable-automation"]
}
def get_value(self, section: str, item: str) -> Any:
    """Return one configuration value.

    :param section: section name
    :param item: option name inside the section
    :return: the evaluated value when the stored text is a Python
        expression (list, dict, number, quoted string ...), the raw text when
        it is not, or ``None`` when the section or option does not exist.
    """
    try:
        raw = self._conf.get(section, item)
    except (NoSectionError, NoOptionError):
        # A tuple is required here: ``A and B`` evaluates to ``B`` only, which
        # left NoSectionError uncaught.
        return None

    try:
        # NOTE(review): eval() executes whatever the ini file contains; this
        # is only acceptable while the config file is trusted.
        return eval(raw)
    except (SyntaxError, NameError):
        # Plain text such as a Windows path (SyntaxError) or a bare word like
        # ``chromedriver`` (NameError) is returned as-is.
        return raw
}
}
def get_option(self, section: str) -> dict:
    """Return every option of ``section`` as a dictionary.

    Each value is evaluated as a Python expression when possible; otherwise
    the raw text is kept.

    :param section: section name
    :return: mapping of option name to value
    """
    option = dict()
    for name, raw in self._conf.items(section):
        try:
            # Backslashes are doubled before evaluation so Windows paths
            # inside quoted strings are not read as escape sequences.
            # NOTE(review): eval() executes whatever the ini file contains;
            # this is only acceptable while the config file is trusted.
            option[name] = eval(raw.replace('\\', '\\\\'))
        except (SyntaxError, NameError):
            # Unquoted paths raise SyntaxError and bare words raise
            # NameError; both are plain text and are kept unchanged.
            option[name] = raw
    return option
global_session_options = {
'headers': {
"User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko)'
' Version/10.1.2 Safari/603.3.8',
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-cn", "Connection": "keep-alive",
"Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7"}
}
def set_item(self, section: str, item: str, value: str):
    """Store ``value`` under ``item`` in ``section`` (in memory only).

    The value is converted with ``str()`` before it is stored.
    """
    text = str(value)
    self._conf.set(section, item, text)
def save(self):
    """Write the current configuration back to ``self.path``.

    The file is written as UTF-8, matching the encoding used when it is
    read, and the handle is closed when writing is done.
    """
    with open(self.path, 'w', encoding='utf-8') as config_file:
        self._conf.write(config_file)
class DriverOptions(Options):
    """Chrome ``Options`` that can be loaded from and saved to the ini file."""

    def __init__(self, read_file=True):
        """Create the options object.

        :param read_file: when true (default) the values of the
            ``chrome_options`` section are loaded into this object.
        """
        super().__init__()
        if not read_file:
            return

        stored = OptionsManager().get_option('chrome_options')
        self._binary_location = stored.get('binary_location', '')
        self._arguments = stored.get('arguments', [])
        self._extensions = stored.get('extensions', [])
        self._experimental_options = stored.get('experimental_options', {})
        self._debugger_address = stored.get('debugger_address', None)

    def save(self):
        """Write the current settings to the ``chrome_options`` section."""
        manager = OptionsManager()
        for key, value in _chrome_options_to_dict(self).items():
            manager.set_item('chrome_options', key, value)
        manager.save()

    def remove_argument(self, value: str):
        """Remove one start-up argument; do nothing if it is not set."""
        try:
            self._arguments.remove(value)
        except ValueError:
            pass

    def remove_experimental_option(self, key: str):
        """Remove one experimental option by key; do nothing if it is not set."""
        self._experimental_options.pop(key, None)

    def remove_all_extensions(self):
        """Remove all extensions.

        Extensions are stored as whole files, which makes removing a single
        one impractical; clear them all and add the wanted ones again.
        """
        self._extensions = []
def _dict_to_chrome_options(options: dict) -> Options:
    """Build a ``ChromeOptions`` object from a settings dictionary.

    When ``debugger_address`` is set, only that value is applied (the browser
    is already running) and every other key is ignored.

    :param options: settings keyed by ``debugger_address``,
        ``binary_location``, ``arguments``, ``extension_files``,
        ``extensions`` and ``experimental_options``
    :return: the populated ``ChromeOptions`` object
    :raises Exception: when a list/dict setting has the wrong type
    """
    chrome_options = webdriver.ChromeOptions()

    # Attach to an already opened browser: nothing else is applied.
    if options.get('debugger_address'):
        chrome_options.debugger_address = options['debugger_address']
        return chrome_options

    # Explicit browser executable.
    if options.get('binary_location'):
        chrome_options.binary_location = options['binary_location']

    # Start-up arguments (checked whenever the key exists, even if empty).
    if 'arguments' in options:
        arguments = options['arguments']
        if not isinstance(arguments, list):
            raise Exception(f'Arguments need listnot {type(arguments)}.')
        for argument in arguments:
            chrome_options.add_argument(argument)

    # Extensions given as file paths.
    extension_files = options.get('extension_files')
    if extension_files:
        if not isinstance(extension_files, list):
            raise Exception(f'Extension files need listnot {type(extension_files)}.')
        for extension_file in extension_files:
            chrome_options.add_extension(extension_file)

    # Extensions given as encoded strings.
    extensions = options.get('extensions')
    if extensions:
        if not isinstance(extensions, list):
            raise Exception(f'Extensions need listnot {type(extensions)}.')
        for extension in extensions:
            chrome_options.add_encoded_extension(extension)

    # Experimental options.
    experimental = options.get('experimental_options')
    if experimental:
        if not isinstance(experimental, dict):
            raise Exception(f'Experimental options need dictnot {type(experimental)}.')
        for key in experimental:
            chrome_options.add_experimental_option(key, experimental[key])

    # 'capabilities' is intentionally not handled.
    return chrome_options
def _chrome_options_to_dict(options: Options) -> dict:
    """Collect the settings of an ``Options`` object into a dictionary.

    :param options: the options object to read
    :return: dict with the keys ``binary_location``, ``debugger_address``,
        ``arguments``, ``extensions`` and ``experimental_options``
        (capabilities are not exported)
    """
    return {
        'binary_location': options.binary_location,
        'debugger_address': options.debugger_address,
        'arguments': options.arguments,
        'extensions': options.extensions,
        'experimental_options': options.experimental_options,
    }

65
DrissionPage/configs.ini Normal file
View File

@ -0,0 +1,65 @@
[paths]
;chromedriver_path = D:\python\Google Chrome\Chrome\chromedriver81.exe
chromedriver_path = D:\python\Google Chrome\Chrome\chromedriver.exe
global_tmp_path = D:\python\projects\fsjy\upload_news\DrissionPage\tmp
[chrome_options]
debugger_address =
;127.0.0.1:9222
;binary_location = C:\Program Files (x86)\Google\Chrome\Application\chrome.exe
binary_location = D:\python\Google Chrome\Chrome\chrome.exe
arguments = [
; 隐藏浏览器窗口
'--headless',
; 静音
'--mute-audio',
; 不使用沙盒
'--no-sandbox',
; 不加载图片
; '--blink-settings=imagesEnabled=false',
; 指定用户文件夹路径
; r'--user-data-dir="E:\tmp\chrome_tmp"',
; 指定缓存路径
; '-disk-cache-dir=""',
; 编码格式
'zh_CN.UTF-8',
; 设置代理
; "--proxy-server=http://127.0.0.1:1081",
; 隐藏滚动条
; '--hide-scrollbars',
; 浏览器窗口最大化
; '--start-maximized',
; 禁用JavaScript
; "--disable-javascript",
; 模拟移动设备
; 'user-agent="MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"',
; 谷歌文档提到需要加上这个属性来规避bug
'--disable-gpu'
]
extensions = []
experimental_options = {
'prefs': {
; 设置下载路径
'download.default_directory': r'D:\python\projects\fsjy\upload_news\DrissionPage\tmp',
; 下载不弹出窗口
'profile.default_content_settings.popups': 0,
; 无弹窗
'profile.default_content_setting_values': {'notifications': 2},
; 禁用PDF插件
'plugins.plugins_list': [{"enabled": False, "name": "Chrome PDF Viewer"}],
; 设置为开发者模式,防反爬虫(无用)
'excludeSwitches': ["ignore-certificate-errors", "enable-automation"],
'useAutomationExtension': False
}
}
[session_options]
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-cn",
"Connection": "keep-alive",
"Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7"
}
;proxies = { "http": "127.0.0.1:8888", "https": "http://127.0.0.1:8888" }

View File

@ -4,131 +4,146 @@
@Contact : g1879@qq.com
@File : drission.py
"""
from typing import Union
from urllib.parse import urlparse
import tldextract
from requests import Session
from requests_html import HTMLSession
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver
from .config import global_driver_options, global_session_options
def _get_chrome_options(options: dict) -> Options:
""" 从传入的字典获取浏览器设置返回ChromeOptions对象"""
chrome_options = webdriver.ChromeOptions()
if 'debuggerAddress' in options:
# 控制已打开的浏览器
chrome_options.add_experimental_option('debuggerAddress', options['debuggerAddress'])
else:
if 'binary_location' in options and options['binary_location']:
# 手动指定使用的浏览器位置
chrome_options.binary_location = options['binary_location']
if 'arguments' in options:
# 启动参数
if isinstance(options['arguments'], list):
for arg in options['arguments']:
chrome_options.add_argument(arg)
else:
raise Exception(f'需要list而非{type(options["arguments"])}')
if 'extension_files' in options and options['extension_files']:
# 加载插件
if isinstance(options['extension_files'], list):
for arg in options['extension_files']:
chrome_options.add_extension(arg)
else:
raise Exception(f'需要list而非{type(options["extension_files"])}')
if 'experimental_options' in options:
# 实验性质的设置参数
if isinstance(options['experimental_options'], dict):
for i in options['experimental_options']:
chrome_options.add_experimental_option(i, options['experimental_options'][i])
else:
raise Exception(f'需要dict而非{type(options["experimental_options"])}')
return chrome_options
from .config import _dict_to_chrome_options, OptionsManager
class Drission(object):
""" Drission类整合了WebDriver对象和HTLSession对象
可按要求创建关闭及同步cookies
"""Drission类整合了WebDriver对象和HTLSession对象可按要求创建、关闭及同步cookies
"""
def __init__(self, driver_options: dict = None, session_options: dict = None):
self._driver = None
def __init__(self, driver_options: Union[dict, Options] = None, session_options: dict = None,
driver_path: str = None):
"""初始化配置信息但不生成session和driver实例
:param driver_options: chrome设置Options类或设置字典
:param session_options: session设置
:param driver_path: chromedriver路径如为空则为'chromedriver'
"""
self._session = None
self._driver_options = driver_options if driver_options else global_driver_options
self._session_options = session_options if session_options else global_session_options
self._driver = None
om = OptionsManager()
self._session_options = session_options or om.get_option('session_options')
self._driver_options = driver_options or om.get_option('chrome_options')
if driver_path:
self._driver_path = driver_path
elif 'chromedriver_path' in om.get_option('paths') and om.get_option('paths')['chromedriver_path']:
self._driver_path = om.get_option('paths')['chromedriver_path']
else:
self._driver_path = 'chromedriver'
@property
def session(self):
    """Return the ``HTMLSession`` object, creating it on first access.

    On creation, every supported attribute found in ``self._session_options``
    is copied onto the new session.
    """
    if self._session is None:
        self._session = HTMLSession()
        attrs = ['headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify',
                 'cert', 'adapters', 'stream', 'trust_env', 'max_redirects']
        for name in attrs:
            if name in self._session_options:
                # setattr() performs the same assignment as the former
                # exec() of a formatted string, without executing code text.
                setattr(self._session, name, self._session_options[name])
    return self._session
@property
def driver(self):
"""获取WebDriver对象按传入配置信息初始化"""
if self._driver is None:
if 'chromedriver_path' in self._driver_options:
driver_path = self._driver_options['chromedriver_path']
if isinstance(self._driver_options, Options):
options = self._driver_options
if options.debugger_address:
# 因同时设置调试浏览器和其他配置会导致异常,故新建一个对象
debugger_address = options.debugger_address
options = webdriver.ChromeOptions()
options.debugger_address = debugger_address
elif isinstance(self._driver_options, dict):
options = _dict_to_chrome_options(self._driver_options)
else:
driver_path = 'chromedriver'
self._driver = webdriver.Chrome(driver_path, options=_get_chrome_options(self._driver_options))
raise KeyError('Driver options invalid')
self._driver = webdriver.Chrome(self._driver_path, options=options)
# 反爬设置,似乎没用
self._driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
"""
})
return self._driver
@property
def session_options(self):
def session_options(self) -> dict:
return self._session_options
def cookies_to_session(self, copy_user_agent: bool = False) -> None:
""" 把driver的cookies复制到session"""
if copy_user_agent:
self.copy_user_agent_from_driver()
for cookie in self.driver.get_cookies():
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
@session_options.setter
def session_options(self, value: dict):
self._session_options = value
def cookies_to_driver(self, url: str):
def cookies_to_session(self, copy_user_agent: bool = False, driver: WebDriver = None, session: Session = None) \
-> None:
"""把driver的cookies复制到session"""
driver = driver or self.driver
session = session or self.session
if copy_user_agent:
self.user_agent_to_session(driver, session)
for cookie in driver.get_cookies():
session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
def cookies_to_driver(self, url: str, driver: WebDriver = None, session: Session = None) -> None:
"""把session的cookies复制到driver"""
driver = driver or self.driver
session = session or self.session
domain = urlparse(url).netloc
if not domain:
raise Exception('Without specifying a domain')
# 翻译cookies
for i in [x for x in self.session.cookies if domain in x.domain]:
for i in [x for x in session.cookies if domain in x.domain]:
cookie_data = {'name': i.name, 'value': str(i.value), 'path': i.path, 'domain': i.domain}
if i.expires:
cookie_data['expiry'] = i.expires
self.ensure_add_cookie(cookie_data)
self._ensure_add_cookie(cookie_data, driver=driver)
def _ensure_add_cookie(self, cookie, override_domain=None, driver=None) -> None:
    """Add ``cookie`` to the driver, navigating to its domain first if needed.

    :param cookie: cookie dict with at least ``name``, ``value`` and ``domain``
    :param override_domain: when given, replaces ``cookie['domain']``
    :param driver: driver to add the cookie to; defaults to ``self.driver``
    :raises WebDriverException: when the cookie cannot be added, even after
        retrying with the registered domain
    """
    driver = driver or self.driver
    if override_domain:
        cookie['domain'] = override_domain

    # Drop a leading dot so the domain can be compared and visited.
    cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:]
    try:
        browser_domain = tldextract.extract(driver.current_url).fqdn
    except AttributeError:
        browser_domain = ''

    if cookie_domain not in browser_domain:
        # Remove an "http://" prefix if present. str.lstrip() would strip any
        # leading run of the characters h, t, p, ':' and '/' instead, which
        # mangles domains such as "http.example.com".
        host = cookie_domain[len('http://'):] if cookie_domain.startswith('http://') else cookie_domain
        driver.get(f'http://{host}')

    driver.add_cookie(cookie)

    # If the cookie was not accepted, retry with the wider registered domain.
    if not self._is_cookie_in_driver(cookie, driver):
        cookie['domain'] = tldextract.extract(cookie['domain']).registered_domain
        driver.add_cookie(cookie)
        # Check the same driver the cookie was added to, not self.driver.
        if not self._is_cookie_in_driver(cookie, driver):
            raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n")
def is_cookie_in_driver(self, cookie) -> bool:
def _is_cookie_in_driver(self, cookie, driver=None) -> bool:
"""检查cookie是否已经在driver里
只检查namevaluedomain检查domain时比较宽"""
for driver_cookie in self.driver.get_cookies():
driver = driver or self.driver
for driver_cookie in driver.get_cookies():
if (cookie['name'] == driver_cookie['name'] and
cookie['value'] == driver_cookie['value'] and
(cookie['domain'] == driver_cookie['domain'] or
@ -136,10 +151,12 @@ class Drission(object):
return True
return False
def copy_user_agent_from_driver(self) -> None:
def user_agent_to_session(self, driver: WebDriver = None, session: Session = None) -> None:
"""把driver的user-agent复制到session"""
selenium_user_agent = self.driver.execute_script("return navigator.userAgent;")
self.session.headers.update({"user-agent": selenium_user_agent})
driver = driver or self.driver
session = session or self.session
selenium_user_agent = driver.execute_script("return navigator.userAgent;")
session.headers.update({"User-Agent": selenium_user_agent})
def close_driver(self) -> None:
"""关闭driver和浏览器"""

View File

@ -0,0 +1,251 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@File : driver_element.py
"""
from html import unescape
from pathlib import Path
from time import sleep
from typing import Union, List, Any
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.select import Select
from selenium.webdriver.support.wait import WebDriverWait
from .common import DrissionElement, get_loc_from_str, translate_loc_to_xpath
from .config import OptionsManager
class DriverElement(DrissionElement):
    """Element object for driver mode.

    Wraps a selenium ``WebElement`` and exposes the commonly used operations
    on it.
    """

    def __init__(self, ele: WebElement, timeout: float = 10):
        """
        :param ele: the ``WebElement`` to wrap
        :param timeout: default timeout (seconds) for look-ups made from
            this element
        """
        super().__init__(ele)
        self.timeout = timeout

    def __repr__(self):
        # Read the attributes once; every access to ``attrs`` runs a script
        # in the browser.
        attrs = [f"{name}='{value}'" for name, value in self.attrs.items()]
        return f'<DriverElement {self.tag} {" ".join(attrs)}>'

    @property
    def attrs(self) -> dict:
        """All attributes of the element as a ``{name: value}`` dict."""
        # Return a JS object and let the driver convert it, instead of
        # building Python source text in JS and eval()-ing it; that approach
        # broke on values containing backslashes or line breaks.
        js = '''
        var dom = arguments[0];
        var result = {};
        for (var i = 0; i < dom.attributes.length; i++) {
            var it = dom.attributes[i];
            result[it.localName] = it.value;
        }
        return result;
        '''
        return self.run_script(js)

    @property
    def text(self) -> str:
        """Inner text of the element, with non-breaking spaces normalised."""
        return unescape(self.attr('innerText')).replace('\xa0', ' ')

    @property
    def html(self) -> str:
        """innerHTML of the element, with non-breaking spaces normalised."""
        return unescape(self.attr('innerHTML')).replace('\xa0', ' ')

    @property
    def tag(self) -> str:
        """Tag name of the element."""
        return self._inner_ele.tag_name

    @property
    def parent(self):
        """Parent element."""
        loc = 'xpath', './..'
        return self.ele(loc, timeout=1, show_errmsg=False)

    @property
    def next(self):
        """Next sibling element."""
        loc = 'xpath', './following-sibling::*[1]'
        return self.ele(loc, timeout=1, show_errmsg=False)

    @property
    def prev(self):
        """Previous sibling element."""
        loc = 'xpath', './preceding-sibling::*[1]'
        return self.ele(loc, timeout=1, show_errmsg=False)

    def attr(self, attr: str) -> str:
        """Return the value of attribute ``attr`` (``'text'`` returns ``self.text``)."""
        if attr == 'text':
            return self.text
        return self.inner_ele.get_attribute(attr)

    def ele(self, loc_or_str: Union[tuple, str], mode: str = None, show_errmsg: bool = False, timeout: float = None):
        """Find a descendant element (or a list of them).

        :param loc_or_str: a ``(by, value)`` tuple, or a shorthand string such
            as ``'tag:div'``, ``'@id:main'``, ``'text:foo'``, ``'xpath:...'``
            or ``'css:...'``
        :param mode: ``'single'`` (default) or ``'all'``
        :param show_errmsg: print the error and re-raise when nothing is found
        :param timeout: look-up timeout; defaults to ``self.timeout``
        :raises ValueError: if ``loc_or_str`` is neither a str nor a 2-tuple
        """
        if isinstance(loc_or_str, str):
            loc_or_str = get_loc_from_str(loc_or_str)
        elif isinstance(loc_or_str, tuple) and len(loc_or_str) == 2:
            loc_or_str = translate_loc_to_xpath(loc_or_str)
        else:
            raise ValueError('loc_or_str must be tuple or str.')

        if loc_or_str[0] == 'xpath':
            # Make the expression relative to this element.
            loc_str = f'.{loc_or_str[1]}' if not loc_or_str[1].startswith('.') else loc_or_str[1]
            loc_or_str = loc_or_str[0], loc_str

        timeout = timeout or self.timeout
        return execute_driver_find(self.inner_ele, loc_or_str, mode, show_errmsg, timeout)

    def eles(self, loc_or_str: Union[tuple, str], show_errmsg: bool = False, timeout: float = None):
        """Find all matching descendant elements."""
        return self.ele(loc_or_str, mode='all', show_errmsg=show_errmsg, timeout=timeout)

    # ----------------- driver-only members below -------------------
    def click(self, by_js=False) -> bool:
        """Click the element.

        :param by_js: when true, skip the native click and click via JS.
            Otherwise the native click is tried up to 10 times, then JS is
            used as a fallback.
        """
        if not by_js:
            for _ in range(10):
                try:
                    self.inner_ele.click()
                    return True
                except Exception:
                    sleep(0.2)

        # Native click failed (or was skipped): click through JS.
        self.run_script('arguments[0].click()')
        return True

    def input(self, value, clear: bool = True) -> bool:
        """Type ``value`` into the element, clearing it first by default."""
        if clear:
            self.clear()
        self.inner_ele.send_keys(value)
        return True

    def run_script(self, script: str, *args) -> Any:
        """Run JS with this element as ``arguments[0]``.

        Extra positional ``args`` are passed on as ``arguments[1]`` onwards.
        """
        return self.inner_ele.parent.execute_script(script, self.inner_ele, *args)

    def submit(self) -> None:
        """Submit the form."""
        self.inner_ele.submit()

    def clear(self) -> None:
        """Clear the element's value (done through JS)."""
        self.run_script("arguments[0].value=''")

    def is_selected(self) -> bool:
        """Whether the element is selected."""
        return self.inner_ele.is_selected()

    def is_enabled(self) -> bool:
        """Whether the element is enabled."""
        return self.inner_ele.is_enabled()

    def is_displayed(self) -> bool:
        """Whether the element is displayed."""
        return self.inner_ele.is_displayed()

    def is_valid(self) -> bool:
        """Whether the element can still be used (e.g. after a page change)."""
        try:
            self.is_enabled()
            return True
        except Exception:
            return False

    @property
    def size(self) -> dict:
        """Size of the element."""
        return self.inner_ele.size

    @property
    def location(self) -> dict:
        """Location of the element."""
        return self.inner_ele.location

    def screenshot(self, path: str = None, filename: str = None) -> str:
        """Save a screenshot of the element as a PNG file.

        :param path: target folder; defaults to the configured
            ``global_tmp_path``
        :param filename: file name without extension; defaults to the tag name
        :return: path of the written image
        :raises IOError: when no folder is given or configured
        """
        name = filename or self.tag
        path = path or OptionsManager().get_value('paths', 'global_tmp_path')
        if not path:
            raise IOError('No path specified.')
        Path(path).mkdir(parents=True, exist_ok=True)

        # Wait for an image to finish loading, polling with a short sleep and
        # giving up after roughly ``self.timeout`` seconds so that a broken
        # image cannot block forever.
        if self.tag == 'img':
            js = 'return arguments[0].complete && typeof arguments[0].naturalWidth != "undefined" ' \
                 '&& arguments[0].naturalWidth > 0'
            poll_interval = 0.1
            for _ in range(max(1, int(self.timeout / poll_interval))):
                if self.run_script(js):
                    break
                sleep(poll_interval)

        # pathlib picks the right separator for the current platform.
        img_path = str(Path(path) / f'{name}.png')
        self.inner_ele.screenshot(img_path)
        return img_path

    def select(self, text: str) -> bool:
        """Select the option with visible text ``text`` in a drop-down list.

        :return: ``True`` on success, ``False`` if selecting failed
        """
        ele = Select(self.inner_ele)
        try:
            ele.select_by_visible_text(text)
            return True
        except Exception:
            return False

    def set_attr(self, attr: str, value: str) -> bool:
        """Set the JS property ``attr`` of the element to ``value``."""
        # The value is passed as a script argument rather than spliced into
        # the script text, so quotes in it cannot break the statement.
        self.run_script(f"arguments[0].{attr} = arguments[1];", str(value))
        return True
def execute_driver_find(page_or_ele: Union[WebElement, WebDriver], loc: tuple, mode: str = 'single',
                        show_errmsg: bool = False, timeout: float = 10) -> Union[DriverElement, List[DriverElement]]:
    """Look up element(s) in driver mode.

    Both page-level look-ups and look-ups below an element go through this
    function.

    :param page_or_ele: the driver, or the ``WebElement`` to search below
    :param loc: ``(by, value)`` locator
    :param mode: ``'single'`` or ``'all'``; ``None`` means ``'single'``
    :param show_errmsg: when true, a failed look-up prints a message and
        re-raises; when false it returns ``None`` (``[]`` in ``'all'`` mode)
    :param timeout: seconds to wait for the element(s) to be present
    :return: a ``DriverElement`` or a list of them
    :raises ValueError: if ``mode`` is not ``'single'`` or ``'all'``
    """
    mode = mode or 'single'
    if mode not in ['single', 'all']:
        raise ValueError("mode must be 'single' or 'all'.")

    msg = 'Element not found.' if mode == 'single' else 'Elements not found.'
    try:
        wait = WebDriverWait(page_or_ele, timeout=timeout)
        if mode == 'single':
            return DriverElement(wait.until(ec.presence_of_element_located(loc)))
        return [DriverElement(ele) for ele in wait.until(ec.presence_of_all_elements_located(loc))]
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # interpreter exit are not swallowed while waiting.
        if show_errmsg:
            print(msg, loc)
            raise
        return [] if mode == 'all' else None

View File

@ -4,25 +4,26 @@
@Contact : g1879@qq.com
@File : driver_page.py
"""
from html import unescape
from time import sleep
from typing import Union
from glob import glob
from typing import Union, List, Any
from urllib import parse
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from .common import get_loc_from_str
from .config import OptionsManager
from .driver_element import DriverElement, execute_driver_find
class DriverPage(object):
"""DriverPage封装了页面操作的常用功能使用selenium来获取、解析、操作网页"""
def __init__(self, driver: WebDriver, locs=None):
def __init__(self, driver: WebDriver, timeout: float = 10): # , locs=None
"""初始化函数接收一个WebDriver对象用来操作网页"""
self._driver = driver
self._locs = locs
self.timeout = timeout
# self._locs = locs
self._url = None
self._url_available = None
@ -38,11 +39,26 @@ class DriverPage(object):
else:
return self._driver.current_url
@property
def html(self) -> str:
"""获取元素innerHTML如未指定元素则获取页面源代码"""
return self.driver.find_element_by_xpath("//*").get_attribute("outerHTML")
@property
def url_available(self) -> bool:
"""url有效性"""
return self._url_available
@property
def cookies(self) -> list:
"""返回当前网站cookies"""
return self.driver.get_cookies()
@property
def title(self) -> str:
"""获取网页title"""
return self._driver.title
def get(self, url: str, params: dict = None, go_anyway: bool = False) -> Union[None, bool]:
"""跳转到url"""
to_url = f'{url}?{parse.urlencode(params)}' if params else url
@ -50,162 +66,43 @@ class DriverPage(object):
return
self._url = to_url
self.driver.get(to_url)
self._url_available = True if self.check_driver_url() else False
self._url_available = self.check_page()
return self._url_available
@property
def cookies(self) -> list:
"""返回当前网站cookies"""
return self.driver.get_cookies()
def get_title(self) -> str:
"""获取网页title"""
return self._driver.title
def _get_ele(self, loc_or_ele: Union[WebElement, tuple]) -> WebElement:
"""接收loc或元素实例返回元素实例"""
# ========================================
# ** 必须与SessionPage类中同名函数保持一致 **
# ========================================
if isinstance(loc_or_ele, tuple):
return self.find(loc_or_ele)
return loc_or_ele
def find(self, loc: tuple, mode: str = None, timeout: float = 10, show_errmsg: bool = True) \
-> Union[WebElement, list]:
"""查找一个元素
:param loc: 页面元素地址
def ele(self, loc_or_ele: Union[tuple, str, DriverElement], mode: str = None,
timeout: float = None, show_errmsg: bool = False) -> Union[DriverElement, List[DriverElement], None]:
"""根据loc获取元素或列表可用用字符串控制获取方式可选'id','class','name','tagName'
ele.find('id:ele_id')
:param loc_or_ele: 页面元素地址
:param mode: 以某种方式查找元素可选'single' , 'all', 'visible'
:param timeout: 是否显示错误信息
:param show_errmsg: 是否显示错误信息
:return: 页面元素对象或列表
"""
mode = mode if mode else 'single'
if mode not in ['single', 'all', 'visible']:
raise ValueError("mode须在'single', 'all', 'visible'中选择")
msg = ele = None
try:
wait = WebDriverWait(self.driver, timeout=timeout)
if mode == 'single':
msg = '未找到元素'
ele = wait.until(EC.presence_of_element_located(loc))
elif mode == 'all':
msg = '未找到元素s'
ele = wait.until(EC.presence_of_all_elements_located(loc))
elif mode == 'visible':
msg = '元素不可见或不存在'
ele = wait.until(EC.visibility_of_element_located(loc))
return ele
except:
if show_errmsg:
print(msg, loc)
if isinstance(loc_or_ele, DriverElement):
return loc_or_ele
elif isinstance(loc_or_ele, str):
loc_or_ele = get_loc_from_str(loc_or_ele)
def find_all(self, loc: tuple, timeout: float = 10, show_errmsg=True) -> list:
timeout = timeout or self.timeout
return execute_driver_find(self.driver, loc_or_ele, mode, show_errmsg, timeout)
def eles(self, loc: Union[tuple, str], timeout: float = None, show_errmsg=False) -> List[DriverElement]:
"""查找符合条件的所有元素"""
return self.find(loc, mode='all', timeout=timeout, show_errmsg=show_errmsg)
def search(self, value: str, mode: str = None, timeout: float = 10) -> Union[WebElement, list, None]:
"""根据内容搜索元素
:param value: 搜索内容
:param mode: 可选'single','all'
:param timeout: 超时时间
:return: 页面元素对象
"""
mode = mode if mode else 'single'
if mode not in ['single', 'all']:
raise ValueError("mode须在'single', 'all'中选择")
ele = []
try:
loc = 'xpath', f'//*[contains(text(),"{value}")]'
wait = WebDriverWait(self.driver, timeout=timeout)
if mode == 'single':
ele = wait.until(EC.presence_of_element_located(loc))
elif mode == 'all':
ele = wait.until(EC.presence_of_all_elements_located(loc))
return ele
except:
if mode == 'single':
return None
elif mode == 'all':
return []
def search_all(self, value: str, timeout: float = 10) -> list:
"""根据内容搜索元素"""
return self.search(value, mode='all', timeout=timeout)
def get_attr(self, loc_or_ele: Union[WebElement, tuple], attr: str) -> str:
"""获取元素属性"""
ele = self._get_ele(loc_or_ele)
try:
return ele.get_attribute(attr)
except:
return ''
def get_html(self, loc_or_ele: Union[WebElement, tuple] = None) -> str:
"""获取元素innerHTML如未指定元素则获取页面源代码"""
if not loc_or_ele:
return self.driver.find_element_by_xpath("//*").get_attribute("outerHTML")
return unescape(self.get_attr(loc_or_ele, 'innerHTML')).replace('\xa0', ' ')
def get_text(self, loc_or_ele: Union[WebElement, tuple]) -> str:
"""获取innerText"""
return unescape(self.get_attr(loc_or_ele, 'innerText')).replace('\xa0', ' ')
return self.ele(loc, mode='all', timeout=timeout, show_errmsg=show_errmsg)
# ----------------以下为独有函数-----------------------
def find_visible(self, loc: tuple, timeout: float = 10, show_errmsg: bool = True) -> WebElement:
"""查找一个可见元素"""
return self.find(loc, mode='visible', timeout=timeout, show_errmsg=show_errmsg)
def check_page(self) -> Union[bool, None]:
"""检查页面是否符合预期
由子类自行实现各页面的判定规则"""
return None
def check_driver_url(self) -> bool:
"""由子类自行实现各页面的判定规则"""
return True
def input(self, loc_or_ele: Union[WebElement, tuple], value: str, clear: bool = True) -> bool:
    """Type ``value`` into a text box.

    :param loc_or_ele: element or locator tuple
    :param value: text sent with ``send_keys``
    :param clear: when true, empty the field via JavaScript first
    :return: ``True`` once the keys have been sent; errors propagate
    """
    target = self._get_ele(loc_or_ele)
    if clear:
        self.run_script(target, "arguments[0].value=''")
    target.send_keys(value)
    return True
def click(self, loc_or_ele: Union[WebElement, tuple]) -> bool:
    """Click an element, falling back to a JavaScript click.

    Up to ten native clicks are attempted, 0.2 s apart, printing each
    failure. If all fail (typically because the element is covered), the
    click is issued through JavaScript instead.

    :param loc_or_ele: element or locator tuple
    :return: ``True`` once a click has been issued
    :raises RuntimeError: if the element cannot be resolved
    """
    ele = self._get_ele(loc_or_ele)
    if not ele:
        # A bare ``raise`` with no active exception also produces a
        # RuntimeError, just without any hint of what went wrong.
        raise RuntimeError(f'Element not found: {loc_or_ele}')

    for _ in range(10):
        try:
            ele.click()
            return True
        except Exception as e:
            print(e)
            sleep(0.2)

    # Native click kept failing: assume the element is obscured and click via JS.
    print(f'用js点击{loc_or_ele}')
    self.run_script(ele, 'arguments[0].click()')
    return True
def set_attr(self, loc_or_ele: Union[WebElement, tuple], attribute: str, value: str) -> bool:
    """Set a DOM property of an element through JavaScript.

    :param loc_or_ele: element or locator tuple
    :param attribute: property name (spliced into the script as ``arguments[0].<attribute>``)
    :param value: value to assign; always assigned as a string
    :return: ``True`` on success; driver errors propagate
    """
    ele = self._get_ele(loc_or_ele)
    # Pass the value as a script argument instead of splicing it into the
    # JS source, so quotes or backslashes in it cannot break the script.
    self.driver.execute_script(f"arguments[0].{attribute} = arguments[1];", ele, str(value))
    return True
def run_script(self, loc_or_ele: Union[WebElement, tuple], script: str) -> bool:
def run_script(self, script: str) -> Any:
"""执行js脚本"""
ele = self._get_ele(loc_or_ele)
ele = self.ele(('css selector', 'html'))
try:
return self.driver.execute_script(script, ele)
return ele.run_script(script)
except:
raise
@ -228,10 +125,10 @@ class DriverPage(object):
"""关闭当前标签页"""
self.driver.close()
def close_other_tabs(self, tab_index: int = None) -> None:
"""关闭其它标签页,没有传入序号代表保留当前页"""
def close_other_tabs(self, index: int = None) -> None:
"""传入序号,关闭序号以外标签页,没有传入序号代表保留当前页"""
tabs = self.driver.window_handles # 获得所有标签页权柄
page_handle = tabs[tab_index] if tab_index >= 0 else self.driver.current_window_handle
page_handle = tabs[index] if index >= 0 else self.driver.current_window_handle
for i in tabs: # 遍历所有标签页,关闭非保留的
if i != page_handle:
self.driver.switch_to.window(i)
@ -244,39 +141,55 @@ class DriverPage(object):
self.driver.switch_to.default_content()
return True
else:
ele = self._get_ele(loc_or_ele)
ele = self.ele(loc_or_ele)
try:
self.driver.switch_to.frame(ele)
self.driver.switch_to.frame(ele.inner_ele)
return True
except:
raise
def get_screen(self, loc_or_ele: Union[WebElement, tuple], path: str, file_name: str = None) -> str:
"""获取元素截图"""
ele = self._get_ele(loc_or_ele)
name = file_name if file_name else ele.tag_name
# 等待元素加载完成
js = 'return arguments[0].complete && typeof arguments[0].naturalWidth ' \
'!= "undefined" && arguments[0].naturalWidth > 0'
while not self.run_script(ele, js):
pass
def screenshot(self, path: str = None, filename: str = None) -> str:
"""获取网页截图"""
ele = self.ele(('css selector', 'html'))
path = path or OptionsManager().get_value('paths', 'global_tmp_path')
if not path:
raise IOError('No path specified.')
name = filename or self.title
img_path = f'{path}\\{name}.png'
ele.screenshot(img_path)
ele.screenshot(path, name)
return img_path
def scroll_to_see(self, loc_or_ele: Union[WebElement, tuple]) -> None:
"""滚动直到元素可见"""
ele = self._get_ele(loc_or_ele)
self.run_script(ele, "arguments[0].scrollIntoView();")
ele = self.ele(loc_or_ele)
ele.run_script("arguments[0].scrollIntoView();")
def choose_select_list(self, loc_or_ele: Union[WebElement, tuple], text: str) -> bool:
    """Choose an option of a drop-down list by its visible text.

    :param loc_or_ele: the ``<select>`` element or a locator tuple for it
    :param text: visible text of the option to select
    :return: ``True`` if the option was selected, ``False`` if selecting failed
    """
    select_box = Select(self._get_ele(loc_or_ele))
    try:
        select_box.select_by_visible_text(text)
    except Exception:
        # ``Exception`` rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not reported as a failed selection.
        return False
    return True
def scroll_to(self, mode: str = 'bottom', pixel: int = 300) -> None:
    """Scroll the page.

    :param mode: 'top', 'bottom', 'rightmost', 'leftmost' scroll to an edge;
                 'up', 'down', 'left', 'right' scroll by ``pixel``
    :param pixel: distance for the relative modes
    :return: None
    :raises KeyError: if ``mode`` is not one of the values above
    """
    # The sign is applied in Python: formatting "-{pixel}" with a negative
    # pixel would emit "--5", which is not valid JavaScript.
    scripts = {
        'top': "window.scrollTo(document.documentElement.scrollLeft,0);",
        'bottom': "window.scrollTo(document.documentElement.scrollLeft,document.body.scrollHeight);",
        'rightmost': "window.scrollTo(document.body.scrollWidth,document.documentElement.scrollTop);",
        'leftmost': "window.scrollTo(0,document.documentElement.scrollTop);",
        'up': f"window.scrollBy(0,{-pixel});",
        'down': f"window.scrollBy(0,{pixel});",
        'left': f"window.scrollBy({-pixel},0);",
        'right': f"window.scrollBy({pixel},0);",
    }
    if mode not in scripts:
        raise KeyError(
            "mode must be selected among 'top','bottom','rightmost','leftmost','up','down','left','right'.")
    self.driver.execute_script(scripts[mode])
def refresh(self) -> None:
"""刷新页面"""
@ -291,11 +204,19 @@ class DriverPage(object):
if not x and not y:
self.driver.maximize_window()
else:
new_x = x if x else self.driver.get_window_size()['width']
new_y = y if y else self.driver.get_window_size()['height']
if x <= 0 or y <= 0:
raise KeyError('x and y must greater than 0.')
new_x = x or self.driver.get_window_size()['width']
new_y = y or self.driver.get_window_size()['height']
self.driver.set_window_size(new_x, new_y)
def close_driver(self) -> None:
    """Quit the driver (and its browser) and forget the reference to it."""
    browser = self._driver
    browser.quit()
    self._driver = None
def is_downloading(self, download_path: str = None) -> bool:
    """Tell whether the browser still has a download in progress.

    A download counts as in progress while a ``*.crdownload`` file exists
    in the download directory.

    :param download_path: directory to inspect; when omitted, the
        ``download.default_directory`` preference from the configured
        chrome options is used
    :return: ``True`` if at least one ``*.crdownload`` file is present
    :raises IOError: if no path is given and none can be read from the options
    """
    from pathlib import Path  # local import: not known to be imported at file level

    if download_path:
        p = download_path
    else:
        try:
            p = OptionsManager().get_value('chrome_options', 'experimental_options')['prefs'][
                'download.default_directory']
        except Exception as err:
            # ``except IOError('...')`` named an instance, which itself raises
            # TypeError when an exception arrives; raise the intended error.
            raise IOError('No download path found.') from err

    # Previously returned ``not glob(...)``, i.e. True when nothing was downloading.
    return any(Path(p).glob('*.crdownload'))

View File

@ -1,255 +0,0 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@File : mix_page.py
"""
import re
from html import unescape
from time import sleep
from typing import Union
from requests_html import Element
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.select import Select
from selenium.webdriver.support.wait import WebDriverWait
from .config import global_tmp_path
from .session_page import _translate_loc
class MixElement(object):
    """Uniform wrapper around a selenium ``WebElement`` or a requests_html ``Element``.

    Read-only helpers (``text``, ``html``, ``tag_name``, ``attr``, ``find``,
    ``search``) branch on the wrapped type. The methods after the
    "driver-mode only" marker call ``WebElement`` APIs directly.
    """

    def __init__(self, ele: Union[WebElement, Element]):
        self._ele = ele

    @staticmethod
    def _xpath_literal(value: str) -> str:
        """Return ``value`` as an XPath 1.0 string literal (which has no escape syntax)."""
        if '"' not in value:
            return f'"{value}"'
        if "'" not in value:
            return f"'{value}'"
        pieces = ', \'"\', '.join(f'"{part}"' for part in value.split('"'))
        return f'concat({pieces})'

    @property
    def ele(self) -> Union[WebElement, Element]:
        """The wrapped element object."""
        return self._ele

    @property
    def text(self) -> str:
        """Text of the element, unescaped, with non-breaking spaces normalised."""
        if isinstance(self._ele, Element):
            raw = self._ele.text
        else:
            raw = self.attr('innerText')
        return unescape(raw or '').replace('\xa0', ' ')

    @property
    def html(self) -> str:
        """innerHTML of the element (``''`` when the element has no closing tag)."""
        if isinstance(self._ele, Element):
            outer = unescape(self._ele.html).replace('\xa0', ' ')
            # Strip the outermost opening and closing tags.
            matched = re.match(r'<.*?>(.*)</.*?>', outer, flags=re.DOTALL)
            return matched.group(1) if matched else ''
        return unescape(self.attr('innerHTML') or '').replace('\xa0', ' ')

    @property
    def tag_name(self) -> str:
        """Tag name of the element."""
        if isinstance(self._ele, Element):
            outer = unescape(self._ele.html).replace('\xa0', ' ')
            # Stop at whitespace, '/' or '>' so attribute-less tags such as
            # ``<div>`` or ``<br/>`` are handled as well.
            matched = re.match(r'^<([^\s/>]+)', outer)
            return matched.group(1) if matched else ''
        return self._ele.tag_name

    def attr(self, attr) -> str:
        """Return the value of attribute ``attr``.

        For a requests_html element ``href`` is taken from its absolute
        links, ``class`` is joined with spaces, and ``''`` is returned when
        the attribute cannot be read.
        """
        if not isinstance(self._ele, Element):
            return self._ele.get_attribute(attr)
        try:
            if attr == 'href':
                # attrs['href'] would only give the address as written in the page.
                return next(iter(self._ele.absolute_links), '')
            if attr == 'class':
                return ' '.join(self._ele.attrs['class'])
            return self._ele.attrs[attr]
        except Exception:
            return ''

    def find(self, loc: tuple, mode: str = None, show_errmsg: bool = True) -> Union[WebElement, Element, list, None]:
        """Look up element(s) by locator tuple.

        :param loc: ``(by, value)`` locator
        :param mode: 'single' (default) or 'all'; 'visible' is also accepted
                     when the wrapped element is a ``WebElement``
        :param show_errmsg: print a message before re-raising a lookup error
        :return: 'all' mode returns a plain list of raw elements in both
                 branches; the other modes return a ``MixElement`` for a
                 requests_html element and the raw ``WebElement`` otherwise
        :raises ValueError: if ``mode`` is not allowed for the wrapped type
        """
        mode = mode or 'single'

        if isinstance(self._ele, Element):
            if mode not in ('single', 'all'):
                raise ValueError("mode须在'single', 'all'中选择")
            loc_by, loc_str = _translate_loc(loc)
            msg = '未找到元素' if mode == 'single' else '未找到元素s'
            try:
                query = self.ele.xpath if loc_by == 'xpath' else self.ele.find
                if mode == 'single':
                    return MixElement(query(loc_str, first=True, _encoding='utf-8'))
                return query(loc_str, first=False, _encoding='utf-8')
            except Exception:
                if show_errmsg:
                    print(msg, loc)
                raise

        # driver mode
        if mode not in ('single', 'all', 'visible'):
            raise ValueError("mode须在'single', 'all', 'visible'中选择")
        conditions = {
            'single': (EC.presence_of_element_located, '未找到元素'),
            'all': (EC.presence_of_all_elements_located, '未找到元素s'),
            'visible': (EC.visibility_of_element_located, '元素不可见或不存在'),
        }
        condition, msg = conditions[mode]
        try:
            # NOTE(review): the wait is bound to ``self.ele.parent``, not to
            # the element itself -- confirm whether lookups are meant to be
            # page-wide here.
            # 'all' returns the list itself; it used to be wrapped as a whole
            # in a single MixElement, which no MixElement method can handle.
            return WebDriverWait(self.ele.parent, timeout=10).until(condition(loc))
        except Exception:
            if show_errmsg:
                print(msg, loc)
            raise

    def find_all(self, loc: tuple, show_errmsg: bool = True) -> list:
        """Return the list of elements matching ``loc`` (``find`` in 'all' mode)."""
        return self.find(loc, mode='all', show_errmsg=show_errmsg)

    def search(self, value: str, mode: str = None):
        """Find element(s) whose own text node contains ``value``.

        :param value: text to look for
        :param mode: 'single' (default) or 'all'
        :return: a ``MixElement`` ('single') or a list of them ('all');
                 ``None`` / ``[]`` respectively when the lookup fails
        :raises ValueError: if ``mode`` is not 'single' or 'all'
        """
        mode = mode or 'single'
        if mode not in ('single', 'all'):
            raise ValueError("mode须在'single', 'all'中选择")

        xpath = f'.//*[contains(text(),{self._xpath_literal(value)})]'
        try:
            if isinstance(self._ele, Element):
                if mode == 'single':
                    return MixElement(self.ele.xpath(xpath, first=True))
                return [MixElement(found) for found in self.ele.xpath(xpath)]

            # driver mode
            loc = 'xpath', xpath
            wait = WebDriverWait(self.ele.parent, timeout=10)
            if mode == 'single':
                return MixElement(wait.until(EC.presence_of_element_located(loc)))
            return [MixElement(found) for found in wait.until(EC.presence_of_all_elements_located(loc))]
        except Exception:
            return None if mode == 'single' else []

    def search_all(self, value: str) -> list:
        """Return the list of elements whose text contains ``value``."""
        return self.search(value, mode='all')

    # ----------------- driver-mode only below -------------------
    def click(self) -> bool:
        """Click the element, falling back to a JavaScript click.

        Up to ten native clicks are attempted, 0.2 s apart, printing each
        failure; after that the click is issued via JavaScript.

        :return: ``True`` once a click has been issued
        """
        for _ in range(10):
            try:
                self.ele.click()
                return True
            except Exception as e:
                print(e)
                sleep(0.2)
        # Native click kept failing: click through JavaScript instead.
        print('用js点击')
        self.run_script('arguments[0].click()')
        return True

    def input(self, value, clear: bool = True) -> bool:
        """Send ``value`` to the element, emptying it via JavaScript first when ``clear`` is true."""
        if clear:
            self.run_script("arguments[0].value=''")
        self.ele.send_keys(value)
        return True

    def run_script(self, script: str):
        """Run ``script`` with this element as ``arguments[0]`` and return its result."""
        # The result must be returned: ``screenshot`` polls it to detect
        # when an image has finished loading.
        return self.ele.parent.execute_script(script, self.ele)

    def submit(self):
        """Submit the form this element belongs to."""
        self.ele.submit()

    def clear(self):
        """Clear the element's content."""
        self.ele.clear()

    def is_selected(self) -> bool:
        """Whether the element is selected."""
        return self.ele.is_selected()

    def is_enabled(self) -> bool:
        """Whether the element is enabled."""
        return self.ele.is_enabled()

    def is_displayed(self) -> bool:
        """Whether the element is displayed."""
        return self.ele.is_displayed()

    @property
    def size(self):
        """Size of the element as reported by the driver."""
        return self.ele.size

    @property
    def location(self):
        """Location of the element as reported by the driver."""
        return self.ele.location

    def screenshot(self, path: str = None, filename: str = None) -> str:
        """Save a PNG screenshot of the element and return the file path.

        :param path: target directory; defaults to ``global_tmp_path``
        :param filename: file name without extension; defaults to the tag name
        :return: the path string the screenshot was written to
        """
        path = path if path else global_tmp_path
        name = filename if filename else self.tag_name
        if self.tag_name == 'img':
            # Wait until the image has actually been loaded by the browser.
            js = 'return arguments[0].complete && typeof arguments[0].naturalWidth ' \
                 '!= "undefined" && arguments[0].naturalWidth > 0'
            while not self.run_script(js):
                sleep(0.05)  # poll instead of spinning on the driver
        img_path = f'{path}\\{name}.png'
        self.ele.screenshot(img_path)
        return img_path

    def select(self, text: str):
        """Select the option with visible text ``text``; return ``False`` if that fails."""
        select_box = Select(self.ele)
        try:
            select_box.select_by_visible_text(text)
        except Exception:
            return False
        return True

    def set_attr(self, attr, value) -> bool:
        """Set DOM property ``attr`` of the element to ``value`` (as a string)."""
        # The value travels as a script argument so quotes in it cannot
        # break or inject into the JavaScript source.
        self.ele.parent.execute_script(f"arguments[0].{attr} = arguments[1];", self.ele, str(value))
        return True

View File

@ -4,16 +4,17 @@
@Contact : g1879@qq.com
@File : mix_page.py
"""
from typing import Union
from typing import Union, List
from urllib import parse
from requests import Response
from requests_html import Element, HTMLSession
from requests_html import HTMLSession
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from .drission import Drission
from .driver_element import DriverElement
from .driver_page import DriverPage
from .session_element import SessionElement
from .session_page import SessionPage
@ -31,10 +32,9 @@ class MixPage(Null, SessionPage, DriverPage):
这些功能由DriverPage和SessionPage类实现
"""
def __init__(self, drission: Drission, locs=None, mode='d'):
def __init__(self, drission: Drission, mode='d', timeout: float = 10):
"""初始化函数
:param drission: 整合了driver和session的类
:param locs: 提供页面元素地址的类
:param mode: 默认使用selenium的d模式
"""
super().__init__()
@ -43,13 +43,15 @@ class MixPage(Null, SessionPage, DriverPage):
self._driver = None
self._url = None
self._response = None
self._locs = locs
self.timeout = timeout
self._url_available = None
self._mode = mode
if mode == 's':
self._session = self._drission.session
elif mode == 'd':
self._driver = self._drission.driver
else:
raise KeyError("mode must be 'd' or 's'.")
@property
def url(self) -> str:
@ -70,19 +72,29 @@ class MixPage(Null, SessionPage, DriverPage):
"""
return self._mode
def change_mode(self, mode: str = None) -> None:
def change_mode(self, mode: str = None, go: bool = True) -> None:
"""切换模式接收字符串s或d除此以外的字符串会切换为d模式
切换后调用相应的get函数使访问的页面同步
切换时会把当前模式的cookies复制到目标模式
切换后如果go是True调用相应的get函数使访问的页面同步
:param mode: 模式字符串
:param go: 是否跳转到原模式的url
"""
if mode == self._mode:
return
self._mode = 's' if self._mode == 'd' else 'd'
if self._mode == 'd': # s转d
self._url = super(SessionPage, self).url
if self.session_url:
self.cookies_to_driver(self.session_url)
if go:
self.get(self.session_url)
elif self._mode == 's': # d转s
self._url = self.session_url
if self._session is None:
self._session = self._drission.session
if self._driver:
self.cookies_to_session()
if go:
self.get(super(SessionPage, self).url)
@property
@ -109,7 +121,7 @@ class MixPage(Null, SessionPage, DriverPage):
"""
if self._session is None:
self._session = self._drission.session
self.change_mode('s')
# self.change_mode('s')
return self._session
@property
@ -126,109 +138,82 @@ class MixPage(Null, SessionPage, DriverPage):
elif self._mode == 'd':
return super(SessionPage, self).cookies
def check_driver_url(self) -> bool:
    """Hook telling whether the page can be accessed; this default always returns ``True``.

    Subclasses are meant to implement the rule for their own pages.
    """
    reachable = True
    return reachable
def cookies_to_session(self) -> None:
    """Copy cookies from the driver to the session by delegating to the drission object."""
    drission = self._drission
    drission.cookies_to_session()
def cookies_to_session(self, copy_user_agent: bool = False) -> None:
    """Copy cookies from the driver to the session.

    Delegates to the drission object's ``cookies_to_session``.

    :param copy_user_agent: forwarded unchanged; whether to copy the user agent as well
    """
    drission = self._drission
    drission.cookies_to_session(copy_user_agent)
def cookies_to_driver(self, url=None) -> None:
"""从session复制cookies到driverchrome需要指定域才能接收cookies"""
u = url if url else self.session_url
u = url or self.session_url
self._drission.cookies_to_driver(u)
# ----------------重写SessionPage的函数-----------------------
def post(self, url: str, params: dict = None, data: dict = None, go_anyway: bool = False, **kwargs) \
        -> Union[bool, None]:
    """Switch to 's' mode without navigating, then POST through the parent implementation.

    :param url: target url
    :param params: query parameters
    :param data: form data
    :param go_anyway: forwarded unchanged to the parent ``post``
    :return: the parent ``post`` result
    """
    self.change_mode('s', go=False)
    outcome = super().post(url, params, data, go_anyway, **kwargs)
    return outcome
# ----------------以下为共用函数-----------------------
def get(self, url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, Response, None]:
def get(self, url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, None]:
"""跳转到一个url跳转前先同步cookies跳转后判断目标url是否可用"""
to_url = f'{url}?{parse.urlencode(params)}' if params else url
if not url or (not go_anyway and self.url == to_url):
return
if self._mode == 'd':
if self.session_url:
self.cookies_to_driver(self.session_url)
super(SessionPage, self).get(url=to_url, go_anyway=go_anyway)
if self._session:
ua = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) "}
return True if self._session.get(to_url, headers=ua).status_code == 200 else False
if self.session_url == self.url:
self._url_available = True if self._response and self._response.status_code == 200 else False
else:
return self.check_driver_url()
self._url_available = self.check_page()
return self._url_available
elif self._mode == 's':
if self._session is None:
self._session = self._drission.session
if self._driver:
self.cookies_to_session()
super().get(url=to_url, go_anyway=go_anyway, **self.drission.session_options)
super().get(url=to_url, go_anyway=go_anyway, **kwargs)
return self._url_available
def find(self, loc: tuple, mode=None, timeout: float = 10, show_errmsg: bool = True) -> Union[WebElement, Element]:
def ele(self, loc_or_ele: Union[tuple, str, DriverElement, SessionElement], mode: str = None, timeout: float = None,
show_errmsg: bool = False) -> Union[DriverElement, SessionElement]:
"""查找一个元素,根据模式调用对应的查找函数
:param loc: 页面元素地址
:param loc_or_ele: 页面元素地址
:param mode: 以某种方式查找元素可选'single','all','visible'(d模式独有)
:param timeout: 超时时间
:param show_errmsg: 是否显示错误信息
:return: 页面元素对象s模式下返回Elementd模式下返回WebElement
"""
if self._mode == 's':
return super().find(loc, mode=mode, show_errmsg=show_errmsg)
return super().ele(loc_or_ele, mode=mode, show_errmsg=show_errmsg)
elif self._mode == 'd':
return super(SessionPage, self).find(loc, mode=mode, timeout=timeout, show_errmsg=show_errmsg)
timeout = timeout or self.timeout
# return super(SessionPage, self).ele(loc_or_ele, mode=mode, timeout=timeout, show_errmsg=show_errmsg)
return DriverPage.ele(self, loc_or_ele, mode=mode, timeout=timeout, show_errmsg=show_errmsg)
def find_all(self, loc: tuple, timeout: float = 10, show_errmsg: bool = True) -> list:
def eles(self, loc_or_str: Union[tuple, str], timeout: float = None, show_errmsg: bool = False) -> List[
DriverElement]:
"""查找符合条件的所有元素"""
if self._mode == 's':
return super().find_all(loc, show_errmsg)
return super().eles(loc_or_str, show_errmsg)
elif self._mode == 'd':
return super(SessionPage, self).find_all(loc, timeout=timeout, show_errmsg=show_errmsg)
return super(SessionPage, self).eles(loc_or_str, timeout=timeout, show_errmsg=show_errmsg)
def search(self, value: str, mode: str = None, timeout: float = 10) -> Union[WebElement, Element, None]:
"""根据内容搜索元素
:param value: 搜索内容
:param mode: 可选'single','all'
:param timeout: 超时时间
:return: 页面元素对象s模式下返回Elementd模式下返回WebElement
"""
@property
def html(self) -> str:
"""获取页面HTML"""
if self._mode == 's':
return super().search(value, mode=mode)
return super().html
elif self._mode == 'd':
return super(SessionPage, self).search(value, mode=mode, timeout=timeout)
return super(SessionPage, self).html
def search_all(self, value: str, timeout: float = 10) -> list:
    """Search elements by text, dispatching on the current mode.

    In 's' mode the next ``search_all`` in the MRO is called without a
    timeout; in 'd' mode the one after ``SessionPage`` is called with it.
    Any other mode yields ``None``.
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).search_all(value, timeout=timeout)
    if current == 's':
        return super().search_all(value)
    return None
def get_attr(self, loc_or_ele: Union[WebElement, Element, tuple], attr: str) -> str:
    """Return an element's attribute value, dispatching on the current mode.

    's' mode uses the next ``get_attr`` in the MRO, 'd' mode the one after
    ``SessionPage``; any other mode yields ``None``.
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).get_attr(loc_or_ele, attr)
    if current == 's':
        return super().get_attr(loc_or_ele, attr)
    return None
def get_html(self, loc_or_ele: Union[WebElement, Element, tuple] = None) -> str:
    """Return an element's innerHTML, or the page source when no element is given.

    Dispatches on the current mode: 's' uses the next ``get_html`` in the
    MRO, 'd' the one after ``SessionPage``; any other mode yields ``None``.
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).get_html(loc_or_ele)
    if current == 's':
        return super().get_html(loc_or_ele)
    return None
def get_text(self, loc_or_ele) -> str:
    """Return an element's innerText, dispatching on the current mode.

    's' mode uses the next ``get_text`` in the MRO, 'd' mode the one after
    ``SessionPage``; any other mode yields ``None``.
    """
    current = self._mode
    if current == 'd':
        return super(SessionPage, self).get_text(loc_or_ele)
    if current == 's':
        return super().get_text(loc_or_ele)
    return None
def get_title(self) -> str:
@property
def title(self) -> str:
"""获取页面title"""
if self._mode == 's':
return super().get_title()
return super().title
elif self._mode == 'd':
return super(SessionPage, self).get_title()
return super(SessionPage, self).title
def close_driver(self) -> None:
"""关闭driver及浏览器切换到s模式"""

View File

@ -4,54 +4,27 @@
@Contact : g1879@qq.com
@File : session_page.py
"""
import re
from html import unescape
from typing import Union
import os
from pathlib import Path
from random import random
from time import time
from typing import Union, List
from urllib import parse
from requests_html import Element, HTMLSession, HTMLResponse
from requests_html import HTMLSession, HTMLResponse
from .config import global_session_options
def _translate_loc(loc):
"""把By类型转为xpath或css selector"""
loc_by = loc_str = None
if loc[0] == 'xpath':
loc_by = 'xpath'
loc_str = loc[1]
elif loc[0] == 'css selector':
loc_by = 'css selector'
loc_str = loc[1]
elif loc[0] == 'id':
loc_by = 'css selector'
loc_str = f'#{loc[1]}'
elif loc[0] == 'class name':
loc_by = 'xpath'
loc_str = f'//*[@class="{loc[1]}"]'
elif loc[0] == 'link text':
loc_by = 'xpath'
loc_str = f'//a[text()="{loc[1]}"]'
elif loc[0] == 'name':
loc_by = 'css selector'
loc_str = f'[name={loc[1]}]'
elif loc[0] == 'tag name':
loc_by = 'css selector'
loc_str = loc[1]
elif loc[0] == 'partial link text':
loc_by = 'xpath'
loc_str = f'//a[contains(text(),"{loc[1]}")]'
return loc_by, loc_str
from .common import get_loc_from_str, translate_loc_to_xpath, avoid_duplicate_name
from .config import OptionsManager
from .session_element import SessionElement, execute_session_find
class SessionPage(object):
"""SessionPage封装了页面操作的常用功能使用requests_html来获取、解析网页。
"""
"""SessionPage封装了页面操作的常用功能使用requests_html来获取、解析网页。"""
def __init__(self, session: HTMLSession, locs=None):
def __init__(self, session: HTMLSession):
"""初始化函数"""
self._session = session
self._locs = locs
# self._locs = locs
self._url = None
self._url_available = None
self._response = None
@ -79,130 +52,124 @@ class SessionPage(object):
"""当前session的cookies"""
return self.session.cookies.get_dict()
def get_title(self) -> str:
@property
def title(self) -> str:
"""获取网页title"""
return self.get_text(('css selector', 'title'))
return self.ele(('css selector', 'title')).text
def find(self, loc: tuple, mode: str = None, show_errmsg: bool = True) -> Union[Element, list]:
@property
def html(self) -> str:
    """HTML source of the current response (``self.response.html.html``)."""
    parsed = self.response.html
    return parsed.html
def ele(self, loc_or_ele: Union[tuple, str, SessionElement], mode: str = None, show_errmsg: bool = False) \
-> Union[SessionElement, List[SessionElement], None]:
"""查找一个元素
:param loc: 页面元素地址
:param loc_or_ele: 页面元素地址
:param mode: 以某种方式查找元素可选'single','all'
:param show_errmsg: 是否显示错误信息
:return: 页面元素对象或列表
"""
mode = mode if mode else 'single'
if mode not in ['single', 'all']:
raise ValueError("mode须在'single', 'all'中选择")
loc_by, loc_str = _translate_loc(loc)
msg = first = None
try:
if mode == 'single':
msg = '未找到元素'
first = True
elif mode == 'all':
msg = '未找到元素s'
first = False
if loc_by == 'xpath':
return self.response.html.xpath(loc_str, first=first, _encoding='utf-8')
else:
return self.response.html.find(loc_str, first=first, _encoding='utf-8')
except:
if show_errmsg:
print(msg, loc)
raise
def find_all(self, loc: tuple, show_errmsg: bool = True) -> list:
    """Return all elements matching ``loc``.

    :param loc: locator tuple
    :param show_errmsg: whether a failed lookup prints a message; it is now
                        forwarded instead of being forced to ``True``
    :return: the result of ``self.find`` in 'all' mode
    """
    return self.find(loc, mode='all', show_errmsg=show_errmsg)
def search(self, value: str, mode: str = None) -> Union[Element, list, None]:
    """Find element(s) in the response whose own text node contains ``value``.

    :param value: text to look for
    :param mode: 'single' (default) or 'all'
    :return: the element ('single') or a list ('all'); ``None`` / ``[]``
             respectively when the lookup fails
    :raises ValueError: if ``mode`` is not 'single' or 'all'
    """
    mode = mode or 'single'
    if mode not in ('single', 'all'):
        raise ValueError("mode须在'single', 'all'中选择")

    # XPath 1.0 string literals cannot escape quotes: choose a delimiter
    # the value does not contain, or assemble it with concat().
    if '"' not in value:
        literal = f'"{value}"'
    elif "'" not in value:
        literal = f"'{value}'"
    else:
        pieces = ', \'"\', '.join(f'"{part}"' for part in value.split('"'))
        literal = f'concat({pieces})'
    xpath = f'.//*[contains(text(),{literal})]'

    try:
        if mode == 'single':
            return self.response.html.xpath(xpath, first=True)
        return self.response.html.xpath(xpath)
    except Exception:
        # Keep the failure value type-consistent with the success value.
        return None if mode == 'single' else []
def search_all(self, value: str) -> list:
    """Return all elements found by ``self.search`` for ``value``."""
    hits = self.search(value, mode='all')
    return hits
def _get_ele(self, loc_or_ele: Union[Element, tuple]) -> Element:
"""获取loc或元素实例返回元素实例"""
# ======================================
# ** 必须与DriverPage类中同名函数保持一致 **
# ======================================
if isinstance(loc_or_ele, tuple):
return self.find(loc_or_ele)
if isinstance(loc_or_ele, SessionElement):
return loc_or_ele
def get_attr(self, loc_or_ele: Union[Element, tuple], attr: str) -> str:
"""获取元素属性"""
ele = self._get_ele(loc_or_ele)
try:
if attr == 'href':
# 如直接获取attr只能获取相对地址
for link in ele.absolute_links:
return link
elif attr == 'class':
class_str = ''
for key, i in enumerate(ele.attrs['class']):
class_str += ' ' if key > 0 else ''
class_str += i
return class_str
elif isinstance(loc_or_ele, str):
loc = get_loc_from_str(loc_or_ele)
else:
return ele.attrs[attr]
except:
return ''
loc = translate_loc_to_xpath(loc_or_ele)
def get_html(self, loc_or_ele: Union[Element, tuple] = None) -> str:
    """Return an element's innerHTML, or the whole page source when no element is given.

    :param loc_or_ele: element or locator tuple; falsy means "the whole page"
    :return: HTML text; ``''`` for an element whose markup has no
             opening/closing tag pair (e.g. ``<br>``)
    """
    if not loc_or_ele:
        return self.response.html.html
    ele = self._get_ele(loc_or_ele)
    outer = unescape(ele.html).replace('\xa0', ' ')
    # Strip the outermost tag pair; the pattern does not match void or
    # self-closing elements, so guard against a ``None`` result.
    matched = re.match(r'<.*?>(.*)</.*?>', outer, flags=re.DOTALL)
    return matched.group(1) if matched else ''
return execute_session_find(self.response.html, loc, mode, show_errmsg)
def get_text(self, loc_or_ele: Union[Element, tuple]) -> str:
    """Return an element's text, unescaped and with non-breaking spaces normalised.

    :param loc_or_ele: element or locator tuple, resolved via ``self._get_ele``
    :return: the cleaned text
    """
    raw_text = self._get_ele(loc_or_ele).text
    cleaned = unescape(raw_text)
    return cleaned.replace('\xa0', ' ')
def eles(self, loc: Union[tuple, str], show_errmsg: bool = False) -> List[SessionElement]:
    """Return all elements matching ``loc``.

    :param loc: locator tuple or locator string
    :param show_errmsg: whether a failed lookup prints a message; it is now
                        forwarded instead of being forced to ``True``
    :return: the result of ``self.ele`` in 'all' mode
    """
    return self.ele(loc, mode='all', show_errmsg=show_errmsg)
def get(self, url: str, params: dict = None, go_anyway: bool = False, **kwargs) -> Union[bool, None]:
"""用get方式跳转到url调用_make_response()函数生成response对象"""
to_url = f'{url}?{parse.urlencode(params)}' if params else url
if not url or (not go_anyway and self.url == to_url):
return
self._response = self._make_response(to_url, **kwargs)[0]
self._url_available = self._response
self._url = url
self._response = self._make_response(to_url, **kwargs)
if self._response:
self._response.html.encoding = self._response.encoding # 修复requests_html丢失编码方式的bug
self._url_available = True if self._response and self._response.status_code == 200 else False
return self._url_available
# ------------以下为独占函数--------------
def post(self, url: str, params: dict = None, data: dict = None, go_anyway: bool = False, **kwargs) \
-> Union[bool, None]:
"""用post方式跳转到url调用_make_response()函数生成response对象"""
to_url = f'{url}?{parse.urlencode(params)}' if params else url
if not url or (not go_anyway and self._url == to_url):
return
self._response = self._make_response(to_url, mode='post', data=data, **kwargs)[0]
self._url_available = self._response
self._url = url
self._response = self._make_response(to_url, mode='post', data=data, **kwargs)
if self._response:
self._response.html.encoding = self._response.encoding # 修复requests_html丢失编码方式的bug
self._url_available = True if self._response and self._response.status_code == 200 else False
return self._url_available
def _make_response(self, url: str, mode: str = 'get', data: dict = None, **kwargs) -> tuple:
def download(self, file_url: str, goal_path: str = None, rename: str = None, **kwargs) -> tuple:
    """Download a file; the response used is temporary and not stored in ``self._response``.

    :param file_url: url of the file
    :param goal_path: target directory; defaults to the configured ``global_tmp_path``
    :param rename: file name to save under instead of the detected one
    :param kwargs: forwarded to ``self._make_response``; ``stream`` is forced
                   to ``True`` and ``timeout`` defaults to 20
    :return: ``(True, saved_file_name)`` on success, ``(False, reason)`` otherwise
    :raises IOError: if no target directory is given or configured
    """
    # The module-level name ``random`` is the function ``random.random``,
    # which has no ``randint`` attribute; import what is needed here.
    from random import randint

    goal_path = goal_path or OptionsManager().get_value('paths', 'global_tmp_path')
    if not goal_path:
        raise IOError('No path specified.')

    kwargs['stream'] = True
    kwargs.setdefault('timeout', 20)

    r = self._make_response(file_url, mode='get', **kwargs)
    if not r:
        print('Invalid link')
        return False, 'Invalid link'

    # ------------------- work out the file name -------------------
    # Prefer the name in the Content-Disposition header, then the last url
    # segment (which is not guaranteed to be a file name), then a generated one.
    file_name = ''
    quoted = r.headers.get('Content-disposition', '').split('"')
    if len(quoted) >= 3:  # header carries a quoted file name
        file_name = quoted[1].encode('ISO-8859-1').decode('utf-8')
    if not file_name:
        file_name = os.path.basename(file_url).split("?")[0]
    if not file_name:
        file_name = f'untitled_{time()}_{randint(0, 100)}'

    file_full_name = rename or file_name
    # Avoid clashing with an existing file.
    file_full_name = avoid_duplicate_name(goal_path, file_full_name)

    print_txt = file_full_name if file_name == file_full_name else f'{file_name} -> {file_full_name}'
    print(print_txt)

    # ------------------- download -------------------
    file_size = int(r.headers['Content-Length']) if 'Content-Length' in r.headers else None
    downloaded_size, download_status = 0, False
    # Join with pathlib so the separator is right on every platform.
    full_path = Path(goal_path) / file_full_name

    try:
        with open(str(full_path), 'wb') as tmp_file:
            print(f'Downloading to: {goal_path}')
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    tmp_file.write(chunk)
                    # Show progress when the server reported the size.
                    if file_size:
                        downloaded_size += len(chunk)  # the last chunk may be short
                        rate = downloaded_size / file_size if downloaded_size < file_size else 1
                        print('\r {:.0%} '.format(rate), end="")
    except Exception:
        # download_status stays False, so ``finally`` removes the partial file.
        raise
    else:
        download_status, info = (False, 'File size is 0.') if full_path.stat().st_size == 0 else (True, 'Success.')
    finally:
        # Remove the file left behind by a failed download.
        if not download_status and full_path.exists():
            full_path.unlink()
        r.close()

    # ------------------- report -------------------
    print(info, '\n')
    info = file_full_name if download_status else info
    return download_status, info
def _make_response(self, url: str, mode: str = 'get', data: dict = None, **kwargs) -> Union[HTMLResponse, bool]:
"""生成response对象。接收mode参数以决定用什么方式。
:param url: 要访问的网址
:param mode: 'get','post'中选择
@ -211,14 +178,17 @@ class SessionPage(object):
:return: Response对象
"""
if mode not in ['get', 'post']:
raise ValueError("mode须在'get', 'post'中选择")
self._url = url
if not kwargs:
kwargs = global_session_options
raise ValueError("mode must be 'get' or 'post'.")
# 设置referer值
if self._url:
if 'headers' in set(x.lower() for x in kwargs):
if 'referer' not in set(x.lower() for x in kwargs['headers']):
kwargs['headers']['Referer'] = self._url
else:
for i in global_session_options:
if i not in kwargs:
kwargs[i] = global_session_options[i]
kwargs['headers'] = self.session.headers
kwargs['headers']['Referer'] = self._url
try:
r = None
if mode == 'get':
@ -227,12 +197,7 @@ class SessionPage(object):
r = self.session.post(url, data=data, **kwargs)
except:
return_value = False
info = 'URL Invalid'
else:
if r.status_code == 200:
# r.encoding = 'utf-8'
return_value = r
info = 'Success'
else:
return_value = False
info = f'{r.status_code}'
return return_value, info
return return_value

1177
README.md

File diff suppressed because it is too large Load Diff