From fdfa9a778c9a116bf722494137f5e41cbb3007f7 Mon Sep 17 00:00:00 2001 From: g1879 Date: Sun, 6 Nov 2022 23:32:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90WebPage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/__init__.py | 3 +- DrissionPage/chrome_driver.py | 6 + DrissionPage/chrome_page.py | 92 +++++----- DrissionPage/common.py | 108 ++++++++++++ DrissionPage/config.py | 2 +- DrissionPage/configs.ini | 6 +- DrissionPage/drission.py | 116 +------------ DrissionPage/session_page.py | 5 +- DrissionPage/web_page.py | 309 ++++++++++++++++++++++++++++++++++ 9 files changed, 491 insertions(+), 156 deletions(-) create mode 100644 DrissionPage/chrome_driver.py create mode 100644 DrissionPage/web_page.py diff --git a/DrissionPage/__init__.py b/DrissionPage/__init__.py index e719897..8adc44b 100644 --- a/DrissionPage/__init__.py +++ b/DrissionPage/__init__.py @@ -1,4 +1,5 @@ # -*- coding:utf-8 -*- -from .drission import Drission from .mix_page import MixPage +from .web_page import WebPage +from .config import DriverOptions, SessionOptions diff --git a/DrissionPage/chrome_driver.py b/DrissionPage/chrome_driver.py new file mode 100644 index 0000000..2b00289 --- /dev/null +++ b/DrissionPage/chrome_driver.py @@ -0,0 +1,6 @@ +# -*- coding:utf-8 -*- +class ChromeDriver(object): + def __init__(self, + address: str = 'localhost:9222', + path: str = 'chrome'): + self.address = address[7:] if address.startswith('http://') else address diff --git a/DrissionPage/chrome_page.py b/DrissionPage/chrome_page.py index fa2d354..e50fa2c 100644 --- a/DrissionPage/chrome_page.py +++ b/DrissionPage/chrome_page.py @@ -1,5 +1,6 @@ # -*- coding:utf-8 -*- from pathlib import Path +from re import search from time import perf_counter, sleep from typing import Union, Tuple, List, Any @@ -7,6 +8,9 @@ from pychrome import Tab from requests import get as requests_get from json import loads +from requests.cookies import RequestsCookieJar + +from .config import DriverOptions, _cookies_to_tuple from .base import BasePage from .common import get_loc from .drission import connect_chrome @@ -15,23 +19,30 @@ from .chrome_element import ChromeElement, ChromeScroll, run_script class ChromePage(BasePage): - def __init__(self, address: str = '127.0.0.1:9222', - path: str = 'chrome', + def __init__(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None, - args: list = None, timeout: float = 10): super().__init__(timeout) - self.debugger_address = address[7:] if address.startswith('http://') else address - connect_chrome(path, self.debugger_address, args) - tab_handle = self.tab_handles[0] if not tab_handle else tab_handle - self._connect_debugger(tab_handle) - self.version = self._get_version() - self._main_version = int(self.version.split('.')[0]) - self._scroll = None + self._connect_debugger(Tab_or_Options, tab_handle) - def _get_version(self): - browser = requests_get(f'http://{self.debugger_address}/json/version').json()['Browser'] - return browser.split('/')[1] + def _connect_debugger(self, Tab_or_Options: Union[Tab, DriverOptions] = None, tab_handle: str = None): + if isinstance(Tab_or_Options, Tab): + self._driver = Tab_or_Options + self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1) + + else: + if Tab_or_Options is None: + Tab_or_Options = DriverOptions() # 从ini文件读取 + connect_chrome(Tab_or_Options) + self.address = Tab_or_Options.debugger_address + tab_handle = self.tab_handles[0] if not tab_handle else tab_handle + self._driver = Tab(id=tab_handle, type='page', + webSocketDebuggerUrl=f'ws://{Tab_or_Options.debugger_address}/devtools/page/{tab_handle}') + + self._driver.start() + self._driver.DOM.enable() + root = self._driver.DOM.getDocument() + self.root = ChromeElement(self, node_id=root['root']['nodeId']) def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromeElement'], timeout: float = None) -> Union['ChromeElement', str, None]: @@ -43,11 +54,16 @@ class ChromePage(BasePage): """ return self.ele(loc_or_str, timeout) + @property + def driver(self): + return self._driver + @property def url(self) -> str: """返回当前页面url""" - json = loads(requests_get(f'http://{self.debugger_address}/json').text) - return [i['url'] for i in json if i['id'] == self.driver.id][0] + tab_id = self.driver.id # 用于WebPage时激活浏览器 + json = loads(requests_get(f'http://{self.address}/json').text) + return [i['url'] for i in json if i['id'] == tab_id][0] @property def html(self) -> str: @@ -71,7 +87,7 @@ class ChromePage(BasePage): @property def tab_handles(self) -> list: """返回所有标签页id""" - json = loads(requests_get(f'http://{self.debugger_address}/json').text) + json = loads(requests_get(f'http://{self.address}/json').text) return [i['id'] for i in json if i['type'] == 'page'] @property @@ -92,7 +108,7 @@ class ChromePage(BasePage): @property def scroll(self) -> ChromeScroll: """用于滚动滚动条的对象""" - if self._scroll is None: + if not hasattr(self, '_scroll'): self._scroll = ChromeScroll(self) return self._scroll @@ -142,6 +158,18 @@ class ChromePage(BasePage): else: return cookies + def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]): + cookies = _cookies_to_tuple(cookies) + result_cookies = [] + for cookie in cookies: + if not cookie.get('domain', None): + continue + c = {'value': '' if cookie['value'] is None else cookie['value'], + 'name': cookie['name'], + 'domain': cookie['domain']} + result_cookies.append(c) + self.driver.Network.setCookies(cookies=result_cookies) + def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], timeout: float = None) -> Union[ChromeElement, str, None]: @@ -170,20 +198,20 @@ class ChromePage(BasePage): raise ValueError('loc_or_str参数只能是tuple、str、ChromeElement类型。') timeout = timeout if timeout is not None else self.timeout - search = self.driver.DOM.performSearch(query=loc) - count = search['resultCount'] + search_result = self.driver.DOM.performSearch(query=loc) + count = search_result['resultCount'] t1 = perf_counter() while count == 0 and perf_counter() - t1 < timeout: - search = self.driver.DOM.performSearch(query=loc) - count = search['resultCount'] + search_result = self.driver.DOM.performSearch(query=loc) + count = search_result['resultCount'] if count == 0: return None else: count = 1 if single else count - nodeIds = self.driver.DOM.getSearchResults(searchId=search['searchId'], fromIndex=0, toIndex=count) + nodeIds = self.driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, toIndex=count) if count == 1: return ChromeElement(self, node_id=nodeIds['nodeIds'][0]) else: @@ -219,10 +247,6 @@ class ChromePage(BasePage): raise TypeError(f'不支持的文件格式:{pic_type}。') pic_type = 'jpeg' if pic_type == '.jpg' else pic_type[1:] - if full_page and self._main_version < 90: - print('注意:版本号大于90的chrome才支持整页截图。') - full_page = False - hw = self.size if full_page: vp = {'x': 0, 'y': 0, 'width': hw['width'], 'height': hw['height'], 'scale': 1} @@ -335,7 +359,7 @@ class ChromePage(BasePage): :return: None """ url = f'?{url}' if url else '' - requests_get(f'http://{self.debugger_address}/json/new{url}') + requests_get(f'http://{self.address}/json/new{url}') def to_tab(self, num_or_handle: Union[int, str] = 0, activate: bool = True) -> None: """跳转到标签页 \n @@ -357,11 +381,11 @@ class ChromePage(BasePage): self._connect_debugger(tab) if activate: - requests_get(f'http://{self.debugger_address}/json/activate/{tab}') + requests_get(f'http://{self.address}/json/activate/{tab}') def to_front(self) -> None: """激活当前标签页使其处于最前面""" - requests_get(f'http://{self.debugger_address}/json/activate/{self.current_tab_handle}') + requests_get(f'http://{self.address}/json/activate/{self.current_tab_handle}') def close_tabs(self, num_or_handles: Union[int, str, list, tuple, set] = None, others: bool = False) -> None: """关闭传入的标签页,默认关闭当前页。可传入多个 \n @@ -388,7 +412,7 @@ class ChromePage(BasePage): is_alive = False for tab in tabs: - requests_get(f'http://{self.debugger_address}/json/close/{tab}') + requests_get(f'http://{self.address}/json/close/{tab}') if is_alive: self.to_tab(0) @@ -429,14 +453,6 @@ class ChromePage(BasePage): # def active_ele(self): # pass - def _connect_debugger(self, tab_handle: str): - self.driver = Tab(id=tab_handle, type='page', - webSocketDebuggerUrl=f'ws://{self.debugger_address}/devtools/page/{tab_handle}') - self.driver.start() - self.driver.DOM.enable() - root = self.driver.DOM.getDocument() - self.root = ChromeElement(self, node_id=root['root']['nodeId']) - def _d_connect(self, to_url: str, times: int = 0, diff --git a/DrissionPage/common.py b/DrissionPage/common.py index 0ba39be..9df0db3 100644 --- a/DrissionPage/common.py +++ b/DrissionPage/common.py @@ -6,11 +6,18 @@ """ from html import unescape from pathlib import Path +from platform import system from re import split, search, sub from shutil import rmtree +from subprocess import Popen +from time import perf_counter from typing import Union from zipfile import ZipFile from urllib.parse import urlparse, urljoin, urlunparse +from requests import get as requests_get +from requests.exceptions import ConnectionError as requests_connection_err + +from .config import DriverOptions def get_ele_txt(e) -> str: @@ -487,3 +494,104 @@ def is_js_func(func: str) -> bool: elif '=>' in func: return True return False + + +def _port_is_using(ip: str, port: str) -> Union[bool, None]: + """检查端口是否被占用 \n + :param ip: 浏览器地址 + :param port: 浏览器端口 + :return: bool + """ + import socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + s.connect((ip, int(port))) + s.shutdown(2) + return True + except socket.error: + return False + finally: + if s: + s.close() + + +def connect_chrome(option: DriverOptions) -> tuple: + """连接或启动chrome \n + :param option: DriverOptions对象 + :return: chrome 路径和进程对象组成的元组 + """ + system_type = system().lower() + debugger_address = option.debugger_address + chrome_path = option.chrome_path + args = option.arguments + + debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address + ip, port = debugger_address.split(':') + if ip not in ('127.0.0.1', 'localhost'): + return None, None + + if _port_is_using(ip, port): + chrome_path = get_exe_path_from_port(port) if chrome_path == 'chrome' and system_type == 'windows' \ + else chrome_path + return chrome_path, None + + args = [] if args is None else args + args1 = [] + for arg in args: + if arg.startswith(('--user-data-dir', '--disk-cache-dir', '--user-agent')) and system().lower() == 'windows': + index = arg.find('=') + 1 + args1.append(f'{arg[:index]}"{arg[index:].strip()}"') + else: + args1.append(arg) + + args = set(args1) + + # if proxy: + # args.add(f'--proxy-server={proxy["http"]}') + + # ----------创建浏览器进程---------- + try: + debugger = _run_browser(port, chrome_path, args) + if chrome_path == 'chrome' and system_type == 'windows': + chrome_path = get_exe_path_from_port(port) + + # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 + except FileNotFoundError: + from DrissionPage.easy_set import _get_chrome_path + chrome_path = _get_chrome_path(show_msg=False) + + if not chrome_path: + raise FileNotFoundError('无法找到chrome.exe路径,请手动配置。') + + debugger = _run_browser(port, chrome_path, args) + + return chrome_path, debugger + + +def _run_browser(port, path: str, args: set) -> Popen: + """创建chrome进程 \n + :param port: 端口号 + :param path: 浏览器地址 + :param args: 启动参数 + :return: 进程对象 + """ + sys = system().lower() + if sys == 'windows': + args = ' '.join(args) + debugger = Popen(f'"{path}" --remote-debugging-port={port} {args}', shell=False) + else: + arguments = [path, f'--remote-debugging-port={port}'] + list(args) + debugger = Popen(arguments, shell=False) + + t1 = perf_counter() + while perf_counter() - t1 < 10: + try: + tabs = requests_get(f'http://127.0.0.1:{port}/json').json() + for tab in tabs: + if tab['type'] == 'page': + return debugger + except requests_connection_err: + pass + + raise ConnectionError('无法连接浏览器。') diff --git a/DrissionPage/config.py b/DrissionPage/config.py index e6cfa11..139cf88 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -494,7 +494,7 @@ class DriverOptions(Options): @property def chrome_path(self) -> str: """浏览器启动文件路径""" - return self.binary_location + return self.binary_location or 'chrome' # -------------重写父类方法,实现链式操作------------- def add_argument(self, argument) -> 'DriverOptions': diff --git a/DrissionPage/configs.ini b/DrissionPage/configs.ini index a90da88..e742857 100644 --- a/DrissionPage/configs.ini +++ b/DrissionPage/configs.ini @@ -1,11 +1,11 @@ [paths] -chromedriver_path = D:\coding\Chrome92\chromedriver.exe +chromedriver_path = tmp_path = [chrome_options] debugger_address = 127.0.0.1:9222 -binary_location = D:\coding\Chrome92\chrome.exe -arguments = ['--no-sandbox', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking', '--user-data-dir=D:\\coding\\Chrome92\\user_data'] +binary_location = chrome +arguments = ['--no-sandbox', '--disable-gpu', '--ignore-certificate-errors', '--disable-infobars', '--disable-popup-blocking'] extensions = [] experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}, 'plugins.plugins_list': [{'enabled': False, 'name': 'Chrome PDF Viewer'}]}, 'useAutomationExtension': False, 'excludeSwitches': ['enable-automation']} timeouts = {'implicit': 10.0, 'pageLoad': 30.0, 'script': 30.0} diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index dc31d6d..4fcf370 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -4,24 +4,21 @@ @Contact : g1879@qq.com @File : drission.py """ -from subprocess import Popen from sys import exit from typing import Union from platform import system -from requests import Session, get as requests_get +from requests import Session from requests.cookies import RequestsCookieJar from requests.structures import CaseInsensitiveDict -from requests.exceptions import ConnectionError as requests_connection_err from selenium import webdriver from selenium.common.exceptions import SessionNotCreatedException, WebDriverException from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver -from time import perf_counter from tldextract import extract -from .common import get_pid_from_port, get_exe_path_from_port +from .common import get_pid_from_port, connect_chrome from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple @@ -109,13 +106,12 @@ class Drission(object): self.driver_options.add_argument(f'--proxy-server={self._proxy["http"]}') driver_path = self.driver_options.driver_path or 'chromedriver' - chrome_path = self.driver_options.binary_location or 'chrome.exe' + chrome_path = self.driver_options.chrome_path # -----------若指定debug端口且该端口未在使用中,则先启动浏览器进程----------- if self.driver_options.debugger_address: # 启动浏览器进程,同时返回该进程使用的 chrome.exe 路径 - chrome_path, self._debugger = connect_chrome(chrome_path, self.driver_options.debugger_address, - self.driver_options.arguments, self._proxy) + chrome_path, self._debugger = connect_chrome(self.driver_options) # -----------创建WebDriver对象----------- self._driver = _create_driver(chrome_path, driver_path, self.driver_options) @@ -391,108 +387,6 @@ def user_agent_to_session(driver: RemoteWebDriver, session: Session) -> None: session.headers.update({"User-Agent": selenium_user_agent}) -def _port_is_using(ip: str, port: str) -> Union[bool, None]: - """检查端口是否被占用 \n - :param ip: 浏览器地址 - :param port: 浏览器端口 - :return: bool - """ - import socket - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - try: - s.connect((ip, int(port))) - s.shutdown(2) - return True - except socket.error: - return False - finally: - if s: - s.close() - - -def connect_chrome(chrome_path: str, debugger_address: str, args: list = None, proxy: dict = None) -> tuple: - """连接或启动chrome \n - :param chrome_path: chrome.exe 路径 - :param debugger_address: 进程运行的ip和端口号 - :param args: chrome 配置参数 - :param proxy: 代理配置 - :return: chrome 路径和进程对象组成的元组 - """ - debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address - ip, port = debugger_address.split(':') - if ip not in ('127.0.0.1', 'localhost'): - return None, None - - if _port_is_using(ip, port): - chrome_path = get_exe_path_from_port(port) if chrome_path == 'chrome.exe' else chrome_path - return chrome_path, None - - args = [] if args is None else args - args1 = [] - for arg in args: - if arg.startswith(('--user-data-dir', '--disk-cache-dir')): - index = arg.find('=') + 1 - args1.append(f'{arg[:index]}"{arg[index:].strip()}"') - elif arg.startswith('--user-agent='): - args1.append(f'--user-agent="{arg[13:]}"') - else: - args1.append(arg) - - args = set(args1) - - if proxy: - args.add(f'--proxy-server={proxy["http"]}') - - # ----------创建浏览器进程---------- - try: - debugger = _run_browser(port, chrome_path, args) - if chrome_path == 'chrome.exe': - chrome_path = get_exe_path_from_port(port) - - # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 - except FileNotFoundError: - from DrissionPage.easy_set import _get_chrome_path - chrome_path = _get_chrome_path(show_msg=False) - - if not chrome_path: - raise FileNotFoundError('无法找到chrome.exe路径,请手动配置。') - - debugger = _run_browser(port, chrome_path, args) - - return chrome_path, debugger - - -def _run_browser(port, path: str, args: set) -> Popen: - """创建chrome进程 \n - :param port: 端口号 - :param path: 浏览器地址 - :param args: 启动参数 - :return: 进程对象 - """ - sys = system().lower() - if sys == 'windows': - args = ' '.join(args) - debugger = Popen(f'"{path}" --remote-debugging-port={port} {args}', shell=False) - elif sys == 'linux': - arguments = [path, f'--remote-debugging-port={port}'] + list(args) - debugger = Popen(arguments, shell=False) - else: - raise OSError('只支持Windows和Linux系统。') - - t1 = perf_counter() - while perf_counter() - t1 < 10: - try: - tabs = requests_get(f'http://127.0.0.1:{port}/json').json() - for tab in tabs: - if tab['type'] == 'page': - return debugger - except requests_connection_err: - pass - - raise ConnectionError('无法连接浏览器。') - - def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver: """创建 WebDriver 对象 \n :param chrome_path: chrome.exe 路径 @@ -514,7 +408,7 @@ def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebD from .easy_set import get_match_driver from DrissionPage.easy_set import _get_chrome_path - if chrome_path == 'chrome.exe': + if chrome_path == 'chrome': chrome_path = _get_chrome_path(show_msg=False, from_ini=False) if chrome_path: diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py index 473b9f5..8649868 100644 --- a/DrissionPage/session_page.py +++ b/DrissionPage/session_page.py @@ -27,7 +27,6 @@ class SessionPage(BasePage): super().__init__(timeout) self._session = session self._response = None - self._download_kit = None def __call__(self, loc_or_str: Union[Tuple[str, str], str, SessionElement], @@ -61,12 +60,14 @@ class SessionPage(BasePage): show_errmsg: bool = False, retry: int = None, interval: float = None, + timeout: float = None, **kwargs) -> bool: """用get方式跳转到url \n :param url: 目标url :param show_errmsg: 是否显示和抛出异常 :param retry: 重试次数 :param interval: 重试间隔(秒) + :param timeout: 连接超时时间(秒) :param kwargs: 连接参数 :return: url是否可用 """ @@ -152,7 +153,7 @@ class SessionPage(BasePage): @property def download(self) -> DownloadKit: - if self._download_kit is None: + if not hasattr(self, '_download_kit'): self._download_kit = DownloadKit(session=self) return self._download_kit diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py new file mode 100644 index 0000000..7862d03 --- /dev/null +++ b/DrissionPage/web_page.py @@ -0,0 +1,309 @@ +# -*- coding:utf-8 -*- +from typing import Union, Tuple + +from pychrome import Tab +from requests import Session, Response +from requests.structures import CaseInsensitiveDict +from tldextract import extract + +from .chrome_element import ChromeElement +from .session_element import SessionElement +from .base import BasePage +from .config import DriverOptions, SessionOptions, _cookies_to_tuple +from .chrome_page import ChromePage +from .session_page import SessionPage + + +class WebPage(SessionPage, ChromePage, BasePage): + def __init__(self, + mode: str = 'd', + timeout: float = 10, + tab_handle: str = None, + driver_or_options: Union[Tab, DriverOptions, bool] = None, + session_or_options: Union[SessionOptions, SessionOptions, bool] = None) -> None: + """初始化函数 \n + :param mode: 'd' 或 's',即driver模式和session模式 + :param timeout: 超时时间,d模式时为寻找元素时间,s模式时为连接时间,默认10秒 + :param driver_or_options: Tab对象或浏览器设置,只使用s模式时应传入False + :param session_or_options: Session对象或requests设置,只使用d模式时应传入False + """ + self._mode = mode.lower() + if self._mode not in ('s', 'd'): + raise ValueError('mode参数只能是s或d。') + + super(ChromePage, self).__init__(timeout) # 调用Base的__init__() + self._session = None + self._driver = None + self._set_session_options(session_or_options) + self._set_driver_options(driver_or_options) + self._setting_handle = tab_handle + self._has_driver, self._has_session = (None, True) if self._mode == 's' else (True, None) + self._response = None + + if self._mode == 'd': + self.driver + + # if self._mode == 'd': + # try: + # timeouts = self.drission.driver_options.timeouts + # t = timeout if timeout is not None else timeouts['implicit'] / 1000 + # self.set_timeouts(t, timeouts['pageLoad'] / 1000, timeouts['script'] / 1000) + # + # except Exception: + # self.timeout = timeout if timeout is not None else 10 + + def __call__(self, + loc_or_str: Union[Tuple[str, str], str, ChromeElement, SessionElement], + timeout: float = None) -> Union[ChromeElement, SessionElement, str, None]: + """在内部查找元素 \n + 例:ele = page('@id=ele_id') \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 超时时间 + :return: 子元素对象或属性文本 + """ + if self._mode == 's': + return super().__call__(loc_or_str) + elif self._mode == 'd': + return super(SessionPage, self).__call__(loc_or_str, timeout) + + # -----------------共有属性和方法------------------- + @property + def url(self) -> Union[str, None]: + """返回当前url""" + if self._mode == 'd': + return super(SessionPage, self).url if self._has_driver else None + elif self._mode == 's': + return self._session_url + + @property + def html(self) -> str: + """返回页面html文本""" + if self._mode == 's': + return super().html + elif self._mode == 'd': + return super(SessionPage, self).html + + @property + def json(self) -> dict: + """当返回内容是json格式时,返回对应的字典""" + if self._mode == 's': + return super().json + elif self._mode == 'd': + return super(SessionPage, self).json + + @property + def response(self) -> Response: + """返回 s 模式获取到的 Response 对象,切换到 s 模式""" + self.change_mode('s') + return self._response + + @property + def mode(self) -> str: + """返回当前模式,'s'或'd' """ + return self._mode + + @property + def cookies(self): + if self._mode == 's': + return super().get_cookies() + elif self._mode == 'd': + return super(SessionPage, self).get_cookies() + + @property + def session(self) -> Session: + """返回Session对象,如未初始化则按配置信息创建""" + if self._session is None: + self._set_session(self._session_options) + + # if self._proxy: + # self._session.proxies = self._proxy + + return self._session + + @property + def driver(self) -> Tab: + """返回Tab对象,如未初始化则按配置信息创建。 \n + 如设置了本地调试浏览器,可自动接入或打开浏览器进程。 + """ + if self._driver is None: + self._connect_debugger(self._driver_options, self._setting_handle) + + return self._driver + + @property + def _session_url(self) -> str: + """返回 session 保存的url""" + return self._response.url if self._response else None + + def get(self, + url: str, + show_errmsg: bool = False, + retry: int = None, + interval: float = None, + timeout: float = None, + **kwargs) -> Union[bool, None]: + """跳转到一个url \n + :param url: 目标url + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param timeout: 连接超时时间(秒) + :param kwargs: 连接参数,s模式专用 + :return: url是否可用,d模式返回None时表示不确定 + """ + if self._mode == 'd': + return super(SessionPage, self).get(url, show_errmsg, retry, interval, timeout) + elif self._mode == 's': + return super().get(url, show_errmsg, retry, interval, timeout, **kwargs) + + def change_mode(self, mode: str = None, go: bool = True) -> None: + """切换模式,接收's'或'd',除此以外的字符串会切换为 d 模式 \n + 切换时会把当前模式的cookies复制到目标模式 \n + 切换后,如果go是True,调用相应的get函数使访问的页面同步 \n + 注意:s转d时,若浏览器当前网址域名和s模式不一样,必须会跳转 \n + :param mode: 模式字符串 + :param go: 是否跳转到原模式的url + """ + if mode is not None and mode.lower() == self._mode: + return + + self._mode = 's' if self._mode == 'd' else 'd' + + # s模式转d模式 + if self._mode == 'd': + self._has_driver = True + self._url = None if not self._has_driver else super(SessionPage, self).url + + if self._session_url: + self.cookies_to_driver() + + if go: + self.get(self._session_url) + + # d模式转s模式 + elif self._mode == 's': + self._has_session = True + self._url = self._session_url + + if self._has_driver: + self.cookies_to_session() + + if go: + url = super(SessionPage, self).url + if url.startswith('http'): + self.get(url) + + def cookies_to_session(self, copy_user_agent: bool = False) -> None: + """把driver对象的cookies复制到session对象 \n + :param copy_user_agent: 是否复制ua信息 + :return: None + """ + if copy_user_agent: + selenium_user_agent = self.run_script("navigator.userAgent;") + self.session.headers.update({"User-Agent": selenium_user_agent}) + + self.set_cookies(super(SessionPage, self).get_cookies(as_dict=True), set_session=True) + + def cookies_to_driver(self) -> None: + """把session对象的cookies复制到driver对象""" + ex_url = extract(self._session_url) + domain = f'{ex_url.domain}.{ex_url.suffix}' + cookies = [] + for cookie in super().get_cookies(): + if cookie.get('domain', '') == '': + cookie['domain'] = domain + + if domain in cookie['domain']: + cookies.append(cookie) + self.set_cookies(cookies, set_driver=True) + + def get_cookies(self, as_dict: bool = False, all_domains: bool = False) -> Union[dict, list]: + """返回cookies \n + :param as_dict: 是否以字典方式返回 + :param all_domains: 是否返回所有域的cookies + :return: cookies信息 + """ + if self._mode == 's': + return super().get_cookies(as_dict, all_domains) + elif self._mode == 'd': + return super(SessionPage, self).get_cookies(as_dict) + + def set_cookies(self, cookies, set_session: bool = False, set_driver: bool = False): + # 添加cookie到driver + if set_driver: + super(SessionPage, self).set_cookies(cookies) + + # 添加cookie到session + if set_session: + cookies = _cookies_to_tuple(cookies) + for cookie in cookies: + if cookie['value'] is None: + cookie['value'] = '' + + kwargs = {x: cookie[x] for x in cookie + if x.lower() in ('version', 'port', 'domain', 'path', 'secure', + 'expires', 'discard', 'comment', 'comment_url', 'rest')} + + if 'expiry' in cookie: + kwargs['expires'] = cookie['expiry'] + + self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) + + def _set_session(self, data: dict) -> None: + """根据传入字典对session进行设置 \n + :param data: session配置字典 + :return: None + """ + if self._session is None: + self._session = Session() + + if 'headers' in data: + self._session.headers = CaseInsensitiveDict(data['headers']) + if 'cookies' in data: + self.set_cookies(data['cookies'], set_session=True) + + attrs = ['auth', 'proxies', 'hooks', 'params', 'verify', + 'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters' + for i in attrs: + if i in data: + self._session.__setattr__(i, data[i]) + + def _set_driver_options(self, Tab_or_Options): + """处理driver设置""" + if Tab_or_Options is None: + self._driver_options = DriverOptions() + + elif Tab_or_Options is False: + self._driver_options = DriverOptions(read_file=False) + + elif isinstance(Tab_or_Options, Tab): + self._driver = Tab_or_Options + self._connect_debugger(Tab_or_Options.id) + self._has_driver = True + + elif isinstance(Tab_or_Options, DriverOptions): + self._driver_options = Tab_or_Options + + else: + raise TypeError('driver_or_options参数只能接收WebDriver, Options, DriverOptions或False。') + + def _set_session_options(self, Session_or_Options): + """处理session设置""" + if Session_or_Options is None: + self._session_options = SessionOptions().as_dict() + + elif Session_or_Options is False: + self._session_options = SessionOptions(read_file=False).as_dict() + + elif isinstance(Session_or_Options, Session): + self._session = Session_or_Options + self._has_session = True + + elif isinstance(Session_or_Options, SessionOptions): + self._session_options = Session_or_Options.as_dict() + + elif isinstance(Session_or_Options, dict): + self._session_options = Session_or_Options + + else: + raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。')