初步完成WebPage

This commit is contained in:
g1879 2022-11-06 23:32:05 +08:00
parent d39da469cf
commit fdfa9a778c
9 changed files with 491 additions and 156 deletions

View File

@ -1,4 +1,5 @@
# -*- coding:utf-8 -*-
from .drission import Drission
from .mix_page import MixPage
from .web_page import WebPage
from .config import DriverOptions, SessionOptions

View File

@ -0,0 +1,6 @@
# -*- coding:utf-8 -*-
class ChromeDriver(object):
    """Holder for the connection details of a remotely debuggable Chrome.

    :param address: debugger address as ``host:port``; a leading ``http://`` is removed
    :param path: browser executable path or command name
    """

    def __init__(self,
                 address: str = 'localhost:9222',
                 path: str = 'chrome'):
        scheme = 'http://'
        # Store the bare host:port form so callers can build URLs from it.
        self.address = address[len(scheme):] if address.startswith(scheme) else address
        # Keep the executable path as well; it used to be accepted and then dropped.
        self.path = path

View File

@ -1,5 +1,6 @@
# -*- coding:utf-8 -*-
from pathlib import Path
from re import search
from time import perf_counter, sleep
from typing import Union, Tuple, List, Any
@ -7,6 +8,9 @@ from pychrome import Tab
from requests import get as requests_get
from json import loads
from requests.cookies import RequestsCookieJar
from .config import DriverOptions, _cookies_to_tuple
from .base import BasePage
from .common import get_loc
from .drission import connect_chrome
@ -15,23 +19,30 @@ from .chrome_element import ChromeElement, ChromeScroll, run_script
class ChromePage(BasePage):
def __init__(self, address: str = '127.0.0.1:9222',
path: str = 'chrome',
def __init__(self, Tab_or_Options: Union[Tab, DriverOptions] = None,
tab_handle: str = None,
args: list = None,
timeout: float = 10):
super().__init__(timeout)
self.debugger_address = address[7:] if address.startswith('http://') else address
connect_chrome(path, self.debugger_address, args)
tab_handle = self.tab_handles[0] if not tab_handle else tab_handle
self._connect_debugger(tab_handle)
self.version = self._get_version()
self._main_version = int(self.version.split('.')[0])
self._scroll = None
self._connect_debugger(Tab_or_Options, tab_handle)
def _get_version(self):
    """Query the debugger's ``/json/version`` endpoint and return the part of
    its ``Browser`` field that follows the first ``/``.
    """
    version_info = requests_get(f'http://{self.debugger_address}/json/version').json()
    product = version_info['Browser']
    return product.split('/')[1]
def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions, str] = None, tab_handle: str = None):
    """Attach this page to a browser tab and cache the document root element.

    :param Tab_or_Options: an existing ``Tab`` to adopt, a ``DriverOptions``
        describing the browser to connect to (``None`` builds a default
        ``DriverOptions()``), or a bare tab-handle string to re-attach on the
        address already stored in ``self.address``
    :param tab_handle: id of the tab to attach to; the first entry of
        ``self.tab_handles`` is used when empty
    :return: None
    """
    if isinstance(Tab_or_Options, Tab):
        self._driver = Tab_or_Options
        # Recover host:port from the tab's websocket url.
        self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1)

    else:
        if isinstance(Tab_or_Options, str):
            # Callers such as to_tab() pass only a tab handle; reuse the
            # current address instead of treating the string as options.
            tab_handle = Tab_or_Options
        else:
            if Tab_or_Options is None:
                Tab_or_Options = DriverOptions()  # read settings from the ini file
            connect_chrome(Tab_or_Options)
            address = Tab_or_Options.debugger_address
            # Every URL built from self.address adds its own scheme, so strip
            # a leading 'http://' here (connect_chrome only strips a local copy).
            self.address = address[7:] if address.startswith('http://') else address

        tab_handle = self.tab_handles[0] if not tab_handle else tab_handle
        self._driver = Tab(id=tab_handle, type='page',
                           webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_handle}')

    self._driver.start()
    self._driver.DOM.enable()
    root = self._driver.DOM.getDocument()
    self.root = ChromeElement(self, node_id=root['root']['nodeId'])
def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromeElement'],
timeout: float = None) -> Union['ChromeElement', str, None]:
@ -43,11 +54,16 @@ class ChromePage(BasePage):
"""
return self.ele(loc_or_str, timeout)
@property
def driver(self):
    """Return the object held in ``self._driver`` (assigned by ``_connect_debugger``)."""
    return self._driver
@property
def url(self) -> str:
"""返回当前页面url"""
json = loads(requests_get(f'http://{self.debugger_address}/json').text)
return [i['url'] for i in json if i['id'] == self.driver.id][0]
tab_id = self.driver.id # 用于WebPage时激活浏览器
json = loads(requests_get(f'http://{self.address}/json').text)
return [i['url'] for i in json if i['id'] == tab_id][0]
@property
def html(self) -> str:
@ -71,7 +87,7 @@ class ChromePage(BasePage):
@property
def tab_handles(self) -> list:
"""返回所有标签页id"""
json = loads(requests_get(f'http://{self.debugger_address}/json').text)
json = loads(requests_get(f'http://{self.address}/json').text)
return [i['id'] for i in json if i['type'] == 'page']
@property
@ -92,7 +108,7 @@ class ChromePage(BasePage):
@property
def scroll(self) -> ChromeScroll:
"""用于滚动滚动条的对象"""
if self._scroll is None:
if not hasattr(self, '_scroll'):
self._scroll = ChromeScroll(self)
return self._scroll
@ -142,6 +158,18 @@ class ChromePage(BasePage):
else:
return cookies
def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]):
    """Push cookies into the browser through ``Network.setCookies``.

    The input is normalised with ``_cookies_to_tuple``. Entries whose
    ``domain`` is missing or empty are skipped, a ``None`` value is sent as an
    empty string, and only ``value``, ``name`` and ``domain`` are forwarded.

    :param cookies: cookies in any form ``_cookies_to_tuple`` accepts
    :return: None
    """
    payload = [
        {'value': '' if item['value'] is None else item['value'],
         'name': item['name'],
         'domain': item['domain']}
        for item in _cookies_to_tuple(cookies)
        if item.get('domain', None)
    ]
    self.driver.Network.setCookies(cookies=payload)
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromeElement],
timeout: float = None) -> Union[ChromeElement, str, None]:
@ -170,20 +198,20 @@ class ChromePage(BasePage):
raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。')
timeout = timeout if timeout is not None else self.timeout
search = self.driver.DOM.performSearch(query=loc)
count = search['resultCount']
search_result = self.driver.DOM.performSearch(query=loc)
count = search_result['resultCount']
t1 = perf_counter()
while count == 0 and perf_counter() - t1 < timeout:
search = self.driver.DOM.performSearch(query=loc)
count = search['resultCount']
search_result = self.driver.DOM.performSearch(query=loc)
count = search_result['resultCount']
if count == 0:
return None
else:
count = 1 if single else count
nodeIds = self.driver.DOM.getSearchResults(searchId=search['searchId'], fromIndex=0, toIndex=count)
nodeIds = self.driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, toIndex=count)
if count == 1:
return ChromeElement(self, node_id=nodeIds['nodeIds'][0])
else:
@ -219,10 +247,6 @@ class ChromePage(BasePage):
raise TypeError(f'不支持的文件格式:{pic_type}')
pic_type = 'jpeg' if pic_type == '.jpg' else pic_type[1:]
if full_page and self._main_version < 90:
print('注意版本号大于90的chrome才支持整页截图。')
full_page = False
hw = self.size
if full_page:
vp = {'x': 0, 'y': 0, 'width': hw['width'], 'height': hw['height'], 'scale': 1}
@ -335,7 +359,7 @@ class ChromePage(BasePage):
:return: None
"""
url = f'?{url}' if url else ''
requests_get(f'http://{self.debugger_address}/json/new{url}')
requests_get(f'http://{self.address}/json/new{url}')
def to_tab(self, num_or_handle: Union[int, str] = 0, activate: bool = True) -> None:
"""跳转到标签页 \n
@ -357,11 +381,11 @@ class ChromePage(BasePage):
self._connect_debugger(tab)
if activate:
requests_get(f'http://{self.debugger_address}/json/activate/{tab}')
requests_get(f'http://{self.address}/json/activate/{tab}')
def to_front(self) -> None:
"""激活当前标签页使其处于最前面"""
requests_get(f'http://{self.debugger_address}/json/activate/{self.current_tab_handle}')
requests_get(f'http://{self.address}/json/activate/{self.current_tab_handle}')
def close_tabs(self, num_or_handles: Union[int, str, list, tuple, set] = None, others: bool = False) -> None:
"""关闭传入的标签页,默认关闭当前页。可传入多个 \n
@ -388,7 +412,7 @@ class ChromePage(BasePage):
is_alive = False
for tab in tabs:
requests_get(f'http://{self.debugger_address}/json/close/{tab}')
requests_get(f'http://{self.address}/json/close/{tab}')
if is_alive:
self.to_tab(0)
@ -429,14 +453,6 @@ class ChromePage(BasePage):
# def active_ele(self):
# pass
def _connect_debugger(self, tab_handle: str):
self.driver = Tab(id=tab_handle, type='page',
webSocketDebuggerUrl=f'ws://{self.debugger_address}/devtools/page/{tab_handle}')
self.driver.start()
self.driver.DOM.enable()
root = self.driver.DOM.getDocument()
self.root = ChromeElement(self, node_id=root['root']['nodeId'])
def _d_connect(self,
to_url: str,
times: int = 0,

View File

@ -6,11 +6,18 @@
"""
from html import unescape
from pathlib import Path
from platform import system
from re import split, search, sub
from shutil import rmtree
from subprocess import Popen
from time import perf_counter
from typing import Union
from zipfile import ZipFile
from urllib.parse import urlparse, urljoin, urlunparse
from requests import get as requests_get
from requests.exceptions import ConnectionError as requests_connection_err
from .config import DriverOptions
def get_ele_txt(e) -> str:
@ -487,3 +494,104 @@ def is_js_func(func: str) -> bool:
elif '=>' in func:
return True
return False
def _port_is_using(ip: str, port: str) -> Union[bool, None]:
    """Check whether something accepts TCP connections on ``ip:port``.

    :param ip: browser host address
    :param port: browser port; converted with ``int()``
    :return: True when a connection can be established, otherwise False
    """
    import socket
    # The context manager closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.connect((ip, int(port)))
        except socket.error:
            return False
    # A successful connect is the whole answer. The former explicit shutdown()
    # could raise after connecting and wrongly report the port as free.
    return True
def connect_chrome(option: DriverOptions) -> tuple:
    """Attach to a local Chrome debugging port, starting the browser if nothing listens there.

    :param option: DriverOptions supplying ``debugger_address``, ``chrome_path`` and ``arguments``
    :return: ``(chrome path, process object)``. ``(None, None)`` when the
        address is not local; the process is ``None`` when the port was already in use
    """
    on_windows = system().lower() == 'windows'
    address = option.debugger_address
    exe = option.chrome_path
    raw_args = option.arguments
    if address.startswith('http://'):
        address = address[7:]
    ip, port = address.split(':')

    # Only a browser on this machine can be launched or inspected.
    if ip not in ('127.0.0.1', 'localhost'):
        return None, None

    # Something already listens on the port: report its executable and stop.
    if _port_is_using(ip, port):
        if exe == 'chrome' and on_windows:
            exe = get_exe_path_from_port(port)
        return exe, None

    # On Windows the values of these switches are wrapped in double quotes.
    quoted = ('--user-data-dir', '--disk-cache-dir', '--user-agent')
    prepared = []
    for item in ([] if raw_args is None else raw_args):
        if item.startswith(quoted) and on_windows:
            cut = item.find('=') + 1
            prepared.append(f'{item[:cut]}"{item[cut:].strip()}"')
        else:
            prepared.append(item)
    launch_args = set(prepared)

    # Proxy handling is not wired in here yet (it was left commented out).

    # ---------- create the browser process ----------
    try:
        process = _run_browser(port, exe, launch_args)
        if exe == 'chrome' and on_windows:
            exe = get_exe_path_from_port(port)

    # The given path was not found: look in the ini file, registry and system variables.
    except FileNotFoundError:
        from DrissionPage.easy_set import _get_chrome_path
        exe = _get_chrome_path(show_msg=False)
        if not exe:
            raise FileNotFoundError('无法找到chrome.exe路径请手动配置。')
        process = _run_browser(port, exe, launch_args)

    return exe, process
def _run_browser(port, path: str, args: set) -> Popen:
    """Start a Chrome process with remote debugging enabled and wait until it exposes a page.

    :param port: remote debugging port
    :param path: browser executable path
    :param args: launch arguments
    :return: the process object
    :raises ConnectionError: if no tab of type ``page`` is reported within 10 seconds
    """
    # Imported locally: sleep is not among the module-level imports visible for this file.
    from time import sleep

    if system().lower() == 'windows':
        joined = ' '.join(args)
        debugger = Popen(f'"{path}" --remote-debugging-port={port} {joined}', shell=False)
    else:
        debugger = Popen([path, f'--remote-debugging-port={port}'] + list(args), shell=False)

    t1 = perf_counter()
    while perf_counter() - t1 < 10:
        try:
            tabs = requests_get(f'http://127.0.0.1:{port}/json').json()
            if any(tab['type'] == 'page' for tab in tabs):
                return debugger
        except requests_connection_err:
            pass
        # Pause briefly so the poll does not spin a CPU core while the
        # browser is still starting.
        sleep(.05)

    raise ConnectionError('无法连接浏览器。')

View File

@ -494,7 +494,7 @@ class DriverOptions(Options):
@property
def chrome_path(self) -> str:
"""浏览器启动文件路径"""
return self.binary_location
return self.binary_location or 'chrome'
# -------------重写父类方法,实现链式操作-------------
def add_argument(self, argument) -> 'DriverOptions':

View File

@ -1,11 +1,11 @@
[paths]
chromedriver_path = D:\coding\Chrome92\chromedriver.exe
chromedriver_path =
tmp_path =
[chrome_options]
debugger_address = 127.0.0.1:9222
binary_location = D:\coding\Chrome92\chrome.exe
arguments = ['--no-sandbox', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking', '--user-data-dir=D:\\coding\\Chrome92\\user_data']
binary_location = chrome
arguments = ['--no-sandbox', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking']
extensions = []
experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}, 'plugins.plugins_list': [{'enabled': False, 'name': 'Chrome PDF Viewer'}]}, 'useAutomationExtension': False, 'excludeSwitches': ['enable-automation']}
timeouts = {'implicit': 10.0, 'pageLoad': 30.0, 'script': 30.0}

View File

@ -4,24 +4,21 @@
@Contact : g1879@qq.com
@File : drission.py
"""
from subprocess import Popen
from sys import exit
from typing import Union
from platform import system
from requests import Session, get as requests_get
from requests import Session
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from requests.exceptions import ConnectionError as requests_connection_err
from selenium import webdriver
from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from time import perf_counter
from tldextract import extract
from .common import get_pid_from_port, get_exe_path_from_port
from .common import get_pid_from_port, connect_chrome
from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple
@ -109,13 +106,12 @@ class Drission(object):
self.driver_options.add_argument(f'--proxy-server={self._proxy["http"]}')
driver_path = self.driver_options.driver_path or 'chromedriver'
chrome_path = self.driver_options.binary_location or 'chrome.exe'
chrome_path = self.driver_options.chrome_path
# -----------若指定debug端口且该端口未在使用中则先启动浏览器进程-----------
if self.driver_options.debugger_address:
# 启动浏览器进程,同时返回该进程使用的 chrome.exe 路径
chrome_path, self._debugger = connect_chrome(chrome_path, self.driver_options.debugger_address,
self.driver_options.arguments, self._proxy)
chrome_path, self._debugger = connect_chrome(self.driver_options)
# -----------创建WebDriver对象-----------
self._driver = _create_driver(chrome_path, driver_path, self.driver_options)
@ -391,108 +387,6 @@ def user_agent_to_session(driver: RemoteWebDriver, session: Session) -> None:
session.headers.update({"User-Agent": selenium_user_agent})
def _port_is_using(ip: str, port: str) -> Union[bool, None]:
    """Check whether something accepts TCP connections on ``ip:port``.

    :param ip: browser host address
    :param port: browser port; converted with ``int()``
    :return: True when a connection can be established, otherwise False
    """
    import socket
    # The context manager closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.connect((ip, int(port)))
        except socket.error:
            return False
    # A successful connect is the whole answer. The former explicit shutdown()
    # could raise after connecting and wrongly report the port as free.
    return True
def connect_chrome(chrome_path: str, debugger_address: str, args: list = None, proxy: dict = None) -> tuple:
    """Attach to a local Chrome debugging port, starting the browser if nothing listens there.

    :param chrome_path: path of chrome.exe
    :param debugger_address: ip and port the browser process runs on
    :param args: chrome launch arguments
    :param proxy: proxy settings; its ``'http'`` entry becomes ``--proxy-server``
    :return: ``(chrome path, process object)``. ``(None, None)`` when the
        address is not local; the process is ``None`` when the port was already in use
    """
    if debugger_address.startswith('http://'):
        debugger_address = debugger_address[7:]
    ip, port = debugger_address.split(':')

    # Only a browser on this machine can be launched or inspected.
    if ip not in ('127.0.0.1', 'localhost'):
        return None, None

    # Something already listens on the port: report its executable and stop.
    if _port_is_using(ip, port):
        if chrome_path == 'chrome.exe':
            chrome_path = get_exe_path_from_port(port)
        return chrome_path, None

    prepared = []
    for item in ([] if args is None else args):
        if item.startswith(('--user-data-dir', '--disk-cache-dir')):
            # Wrap the switch value in double quotes.
            cut = item.find('=') + 1
            prepared.append(f'{item[:cut]}"{item[cut:].strip()}"')
        elif item.startswith('--user-agent='):
            prepared.append(f'--user-agent="{item[13:]}"')
        else:
            prepared.append(item)
    launch_args = set(prepared)

    if proxy:
        launch_args.add(f'--proxy-server={proxy["http"]}')

    # ---------- create the browser process ----------
    try:
        process = _run_browser(port, chrome_path, launch_args)
        if chrome_path == 'chrome.exe':
            chrome_path = get_exe_path_from_port(port)

    # The given path was not found: look in the ini file, registry and system variables.
    except FileNotFoundError:
        from DrissionPage.easy_set import _get_chrome_path
        chrome_path = _get_chrome_path(show_msg=False)
        if not chrome_path:
            raise FileNotFoundError('无法找到chrome.exe路径请手动配置。')
        process = _run_browser(port, chrome_path, launch_args)

    return chrome_path, process
def _run_browser(port, path: str, args: set) -> Popen:
    """Start a Chrome process with remote debugging enabled and wait until it exposes a page.

    :param port: remote debugging port
    :param path: browser executable path
    :param args: launch arguments
    :return: the process object
    :raises OSError: on systems other than Windows and Linux
    :raises ConnectionError: if no tab of type ``page`` is reported within 10 seconds
    """
    # Imported locally so this function does not depend on a module-level sleep import.
    from time import sleep

    sys = system().lower()
    if sys == 'windows':
        joined = ' '.join(args)
        debugger = Popen(f'"{path}" --remote-debugging-port={port} {joined}', shell=False)
    elif sys == 'linux':
        debugger = Popen([path, f'--remote-debugging-port={port}'] + list(args), shell=False)
    else:
        raise OSError('只支持Windows和Linux系统。')

    t1 = perf_counter()
    while perf_counter() - t1 < 10:
        try:
            tabs = requests_get(f'http://127.0.0.1:{port}/json').json()
            if any(tab['type'] == 'page' for tab in tabs):
                return debugger
        except requests_connection_err:
            pass
        # Pause briefly so the poll does not spin a CPU core while the
        # browser is still starting.
        sleep(.05)

    raise ConnectionError('无法连接浏览器。')
def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver:
"""创建 WebDriver 对象 \n
:param chrome_path: chrome.exe 路径
@ -514,7 +408,7 @@ def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebD
from .easy_set import get_match_driver
from DrissionPage.easy_set import _get_chrome_path
if chrome_path == 'chrome.exe':
if chrome_path == 'chrome':
chrome_path = _get_chrome_path(show_msg=False, from_ini=False)
if chrome_path:

View File

@ -27,7 +27,6 @@ class SessionPage(BasePage):
super().__init__(timeout)
self._session = session
self._response = None
self._download_kit = None
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, SessionElement],
@ -61,12 +60,14 @@ class SessionPage(BasePage):
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
timeout: float = None,
**kwargs) -> bool:
"""用get方式跳转到url \n
:param url: 目标url
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param timeout: 连接超时时间
:param kwargs: 连接参数
:return: url是否可用
"""
@ -152,7 +153,7 @@ class SessionPage(BasePage):
@property
def download(self) -> DownloadKit:
if self._download_kit is None:
if not hasattr(self, '_download_kit'):
self._download_kit = DownloadKit(session=self)
return self._download_kit

309
DrissionPage/web_page.py Normal file
View File

@ -0,0 +1,309 @@
# -*- coding:utf-8 -*-
from typing import Union, Tuple
from pychrome import Tab
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
from tldextract import extract
from .chrome_element import ChromeElement
from .session_element import SessionElement
from .base import BasePage
from .config import DriverOptions, SessionOptions, _cookies_to_tuple
from .chrome_page import ChromePage
from .session_page import SessionPage
class WebPage(SessionPage, ChromePage, BasePage):
    """Page object that can work through a browser tab ('d' mode) or a requests session ('s' mode).

    With these bases, ``super()`` resolves to the SessionPage implementation
    and ``super(SessionPage, self)`` to the ChromePage one; the methods below
    use that to dispatch on the current mode.
    """

    def __init__(self,
                 mode: str = 'd',
                 timeout: float = 10,
                 tab_handle: str = None,
                 driver_or_options: Union[Tab, DriverOptions, bool] = None,
                 session_or_options: Union[Session, SessionOptions, dict, bool] = None) -> None:
        """Initialise the page.

        :param mode: 'd' or 's', i.e. driver mode or session mode
        :param timeout: element-lookup timeout in d mode, connection timeout in s mode; 10 seconds by default
        :param tab_handle: id of the tab to attach to when the driver is created
        :param driver_or_options: Tab object or browser settings; pass False when only s mode is used
        :param session_or_options: Session object or requests settings; pass False when only d mode is used
        :raises ValueError: if mode is neither 's' nor 'd'
        """
        self._mode = mode.lower()
        if self._mode not in ('s', 'd'):
            raise ValueError('mode参数只能是s或d。')

        super(ChromePage, self).__init__(timeout)  # call BasePage.__init__() only
        self._session = None
        self._driver = None
        self._response = None
        self._setting_handle = tab_handle

        # Set the mode-derived flags BEFORE processing the options: the setters
        # raise a flag when handed a live Tab/Session, and that must not be overwritten.
        self._has_driver, self._has_session = (None, True) if self._mode == 's' else (True, None)
        self._set_session_options(session_or_options)
        self._set_driver_options(driver_or_options)

        if self._mode == 'd':
            self.driver  # property access connects to the browser as a side effect

        # TODO: apply the timeouts from the driver options (implicit / pageLoad / script).

    def __call__(self,
                 loc_or_str: Union[Tuple[str, str], str, ChromeElement, SessionElement],
                 timeout: float = None) -> Union[ChromeElement, SessionElement, str, None]:
        """Find an element inside the page, e.g. ``ele = page('@id=ele_id')``.

        :param loc_or_str: locator tuple or query string for the element
        :param timeout: lookup timeout; only forwarded in d mode
        :return: the element object or attribute text
        """
        if self._mode == 's':
            return super().__call__(loc_or_str)
        elif self._mode == 'd':
            return super(SessionPage, self).__call__(loc_or_str, timeout)

    # ----------------- shared properties and methods -------------------
    @property
    def url(self) -> Union[str, None]:
        """Return the current url of the active mode."""
        if self._mode == 'd':
            return super(SessionPage, self).url if self._has_driver else None
        elif self._mode == 's':
            return self._session_url

    @property
    def html(self) -> str:
        """Return the html text of the page."""
        if self._mode == 's':
            return super().html
        elif self._mode == 'd':
            return super(SessionPage, self).html

    @property
    def json(self) -> dict:
        """Return the parent implementation's ``json`` value for the active mode."""
        if self._mode == 's':
            return super().json
        elif self._mode == 'd':
            return super(SessionPage, self).json

    @property
    def response(self) -> Response:
        """Return the Response obtained in s mode; switches the page to s mode first."""
        self.change_mode('s')
        return self._response

    @property
    def mode(self) -> str:
        """Return the current mode, 's' or 'd'."""
        return self._mode

    @property
    def cookies(self):
        """Return the cookies of the active mode, using the parent's default ``get_cookies()`` arguments."""
        if self._mode == 's':
            return super().get_cookies()
        elif self._mode == 'd':
            return super(SessionPage, self).get_cookies()

    @property
    def session(self) -> Session:
        """Return the Session object, creating it from the stored options on first use."""
        if self._session is None:
            self._set_session(self._session_options)

        return self._session

    @property
    def driver(self) -> Tab:
        """Return the Tab object, connecting with the stored options on first use.

        The connection goes through ``_connect_debugger`` with the stored
        driver options and tab handle.
        """
        if self._driver is None:
            self._connect_debugger(self._driver_options, self._setting_handle)

        return self._driver

    @property
    def _session_url(self) -> str:
        """Return the url of the last session response, or None when there is none."""
        return self._response.url if self._response else None

    def get(self,
            url: str,
            show_errmsg: bool = False,
            retry: int = None,
            interval: float = None,
            timeout: float = None,
            **kwargs) -> Union[bool, None]:
        """Navigate to a url with the active mode.

        :param url: target url
        :param show_errmsg: whether to show and raise errors
        :param retry: number of retries
        :param interval: pause between retries
        :param timeout: connection timeout
        :param kwargs: connection arguments, forwarded in s mode only
        :return: whatever the parent ``get`` of the active mode returns
        """
        if self._mode == 'd':
            return super(SessionPage, self).get(url, show_errmsg, retry, interval, timeout)
        elif self._mode == 's':
            return super().get(url, show_errmsg, retry, interval, timeout, **kwargs)

    def change_mode(self, mode: str = None, go: bool = True) -> None:
        """Switch between s and d mode.

        Nothing happens when ``mode`` names the current mode (case-insensitive).
        Any other value, including None, toggles to the opposite mode.
        Switching to d copies the session cookies to the browser when a
        session url exists; switching to s copies the browser cookies to the
        session. With ``go`` true the new mode then loads the url the previous
        mode was on (when switching to s, only if that url starts with 'http').

        :param mode: mode string
        :param go: whether to navigate to the previous mode's url
        :return: None
        """
        if mode is not None and mode.lower() == self._mode:
            return

        self._mode = 's' if self._mode == 'd' else 'd'

        # s mode -> d mode
        if self._mode == 'd':
            self._has_driver = True
            # The flag was just set, so the former "if not has_driver" guard
            # was dead; read the browser url directly.
            self._url = super(SessionPage, self).url

            if self._session_url:
                self.cookies_to_driver()

                if go:
                    self.get(self._session_url)

        # d mode -> s mode
        elif self._mode == 's':
            self._has_session = True
            self._url = self._session_url

            if self._has_driver:
                self.cookies_to_session()

                if go:
                    url = super(SessionPage, self).url
                    if url.startswith('http'):
                        self.get(url)

    def cookies_to_session(self, copy_user_agent: bool = False) -> None:
        """Copy the browser's cookies into the session object.

        :param copy_user_agent: whether to copy the user agent as well
        :return: None
        """
        if copy_user_agent:
            selenium_user_agent = self.run_script("navigator.userAgent;")
            self.session.headers.update({"User-Agent": selenium_user_agent})

        self.set_cookies(super(SessionPage, self).get_cookies(as_dict=True), set_session=True)

    def cookies_to_driver(self) -> None:
        """Copy the session's cookies into the browser.

        Cookies without a domain get the registered domain of the session url.
        Only cookies whose domain contains that registered domain are copied.
        """
        ex_url = extract(self._session_url)
        domain = f'{ex_url.domain}.{ex_url.suffix}'
        cookies = []
        for cookie in super().get_cookies():
            if cookie.get('domain', '') == '':
                cookie['domain'] = domain

            if domain in cookie['domain']:
                cookies.append(cookie)
        self.set_cookies(cookies, set_driver=True)

    def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]:
        """Return the cookies of the active mode.

        :param as_dict: whether to return them as a dict
        :param all_domains: whether to return cookies of all domains; forwarded in s mode only
        :return: cookie information
        """
        if self._mode == 's':
            return super().get_cookies(as_dict, all_domains)
        elif self._mode == 'd':
            return super(SessionPage, self).get_cookies(as_dict)

    def set_cookies(self, cookies, set_session: bool = False, set_driver: bool = False):
        """Add cookies to the browser and/or the session.

        :param cookies: cookies in any form ``_cookies_to_tuple`` accepts
        :param set_session: whether to add them to the session
        :param set_driver: whether to add them to the browser
        :return: None
        """
        # add cookies to the driver
        if set_driver:
            super(SessionPage, self).set_cookies(cookies)

        # add cookies to the session
        if set_session:
            accepted = ('version', 'port', 'domain', 'path', 'secure',
                        'expires', 'discard', 'comment', 'comment_url', 'rest')
            for cookie in _cookies_to_tuple(cookies):
                if cookie['value'] is None:
                    cookie['value'] = ''

                kwargs = {x: cookie[x] for x in cookie if x.lower() in accepted}

                # 'expiry' is stored under the cookie jar's 'expires' keyword.
                if 'expiry' in cookie:
                    kwargs['expires'] = cookie['expiry']

                self.session.cookies.set(cookie['name'], cookie['value'], **kwargs)

    def _set_session(self, data: dict) -> None:
        """Configure the session from a settings dict, creating the Session if needed.

        :param data: session settings dict
        :return: None
        """
        if self._session is None:
            self._session = Session()

        if 'headers' in data:
            self._session.headers = CaseInsensitiveDict(data['headers'])
        if 'cookies' in data:
            self.set_cookies(data['cookies'], set_session=True)

        attrs = ['auth', 'proxies', 'hooks', 'params', 'verify',
                 'cert', 'stream', 'trust_env', 'max_redirects']  # , 'adapters'
        for i in attrs:
            if i in data:
                setattr(self._session, i, data[i])

    def _set_driver_options(self, Tab_or_Options):
        """Store the driver settings, or adopt a Tab that was passed in.

        :param Tab_or_Options: None (default options), False (options without
            reading the ini file), a Tab, or a DriverOptions
        :raises TypeError: for any other argument type
        """
        if Tab_or_Options is None:
            self._driver_options = DriverOptions()

        elif Tab_or_Options is False:
            self._driver_options = DriverOptions(read_file=False)

        elif isinstance(Tab_or_Options, Tab):
            self._driver = Tab_or_Options
            # Hand over the Tab itself: _connect_debugger expects a Tab or
            # DriverOptions first, and a bare id string would be treated as options.
            self._connect_debugger(Tab_or_Options)
            self._has_driver = True

        elif isinstance(Tab_or_Options, DriverOptions):
            self._driver_options = Tab_or_Options

        else:
            raise TypeError('driver_or_options参数只能接收Tab, DriverOptions或False。')

    def _set_session_options(self, Session_or_Options):
        """Store the session settings, or adopt a Session that was passed in.

        :param Session_or_Options: None (default options), False (options
            without reading the ini file), a Session, a SessionOptions, or a settings dict
        :raises TypeError: for any other argument type
        """
        if Session_or_Options is None:
            self._session_options = SessionOptions().as_dict()

        elif Session_or_Options is False:
            self._session_options = SessionOptions(read_file=False).as_dict()

        elif isinstance(Session_or_Options, Session):
            self._session = Session_or_Options
            self._has_session = True

        elif isinstance(Session_or_Options, SessionOptions):
            self._session_options = Session_or_Options.as_dict()

        elif isinstance(Session_or_Options, dict):
            self._session_options = Session_or_Options

        else:
            raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')