From dd762fccb9511f5623e905e2b4f3bd2b79232613 Mon Sep 17 00:00:00 2001 From: g1879 Date: Tue, 15 Nov 2022 01:33:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96SessionPage=E5=92=8CWebPage?= =?UTF-8?q?=E7=9A=84cookies=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/session_page.py | 48 +++++++++++++++++++++++++++-- DrissionPage/web_page.py | 58 +++++++++++++----------------------- 2 files changed, 66 insertions(+), 40 deletions(-) diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py index 8649868..fa22947 100644 --- a/DrissionPage/session_page.py +++ b/DrissionPage/session_page.py @@ -15,18 +15,60 @@ from tldextract import extract from DownloadKit import DownloadKit from .base import BasePage -from .config import _cookie_to_dict +from .config import _cookie_to_dict, SessionOptions, _cookies_to_tuple from .session_element import SessionElement, make_session_ele class SessionPage(BasePage): """SessionPage封装了页面操作的常用功能,使用requests来获取、解析网页""" - def __init__(self, session: Session, timeout: float = 10): + def __init__(self, session_or_options: Union[Session, SessionOptions] = None, + timeout: float = 10): """初始化函数""" super().__init__(timeout) - self._session = session self._response = None + self._create_session(session_or_options) + + def _create_session(self, Session_or_Options) -> None: + if Session_or_Options is None or isinstance(Session_or_Options, SessionOptions): + options = Session_or_Options or SessionOptions() + self._set_session(options.as_dict()) + elif isinstance(Session_or_Options, Session): + self._session = Session_or_Options + + def _set_session(self, data: dict) -> None: + """根据传入字典对session进行设置 \n + :param data: session配置字典 + :return: None + """ + if self._session is None: + self._session = Session() + + if 'headers' in data: + self._session.headers = CaseInsensitiveDict(data['headers']) + if 'cookies' in data: + self.set_cookies(data['cookies']) + + attrs = ['auth', 'proxies', 'hooks', 'params', 'verify', + 'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters' + for i in attrs: + if i in data: + self._session.__setattr__(i, data[i]) + + def set_cookies(self, cookies): + cookies = _cookies_to_tuple(cookies) + for cookie in cookies: + if cookie['value'] is None: + cookie['value'] = '' + + kwargs = {x: cookie[x] for x in cookie + if x.lower() in ('version', 'port', 'domain', 'path', 'secure', + 'expires', 'discard', 'comment', 'comment_url', 'rest')} + + if 'expiry' in cookie: + kwargs['expires'] = cookie['expiry'] + + self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) def __call__(self, loc_or_str: Union[Tuple[str, str], str, SessionElement], diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 47eaa9b..0478e48 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -4,7 +4,6 @@ from typing import Union, Tuple, List from DownloadKit import DownloadKit from pychrome import Tab from requests import Session, Response -from requests.structures import CaseInsensitiveDict from tldextract import extract from .chromium_element import ChromiumElement @@ -210,7 +209,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): return self._mode = 's' if self._mode == 'd' else 'd' - + print(self._mode) # s模式转d模式 if self._mode == 'd': if not self._has_driver: @@ -246,7 +245,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): selenium_user_agent = self.run_script("navigator.userAgent;") self.session.headers.update({"User-Agent": selenium_user_agent}) - self.set_cookies(super(SessionPage, self).get_cookies(as_dict=True), set_session=True) + self.set_cookies(self._get_driver_cookies(as_dict=True), set_session=True) def cookies_to_driver(self) -> None: """把session对象的cookies复制到driver对象""" @@ -270,28 +269,32 @@ class WebPage(SessionPage, ChromiumPage, BasePage): if self._mode == 's': return super().get_cookies(as_dict, all_domains) elif self._mode == 'd': - return super(SessionPage, self).get_cookies(as_dict) + return self._get_driver_cookies(as_dict) + + def _get_driver_cookies(self, as_dict: bool = False): + cookies = super(SessionPage, self).driver.Network.getCookies()['cookies'] + if as_dict: + return {cookie['name']: cookie['value'] for cookie in cookies} + else: + return cookies def set_cookies(self, cookies, set_session: bool = False, set_driver: bool = False): # 添加cookie到driver if set_driver: - super(SessionPage, self).set_cookies(cookies) + cookies = _cookies_to_tuple(cookies) + result_cookies = [] + for cookie in cookies: + if not cookie.get('domain', None): + continue + c = {'value': '' if cookie['value'] is None else cookie['value'], + 'name': cookie['name'], + 'domain': cookie['domain']} + result_cookies.append(c) + super(SessionPage, self).driver.Network.setCookies(cookies=result_cookies) # 添加cookie到session if set_session: - cookies = _cookies_to_tuple(cookies) - for cookie in cookies: - if cookie['value'] is None: - cookie['value'] = '' - - kwargs = {x: cookie[x] for x in cookie - if x.lower() in ('version', 'port', 'domain', 'path', 'secure', - 'expires', 'discard', 'comment', 'comment_url', 'rest')} - - if 'expiry' in cookie: - kwargs['expires'] = cookie['expiry'] - - self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) + super().set_cookies(cookies) def check_page(self, by_requests: bool = False) -> Union[bool, None]: """d模式时检查网页是否符合预期 \n @@ -368,25 +371,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage): elif self._mode == 'd': return super(SessionPage, self)._ele(loc_or_ele, timeout=timeout, single=single) - def _set_session(self, data: dict) -> None: - """根据传入字典对session进行设置 \n - :param data: session配置字典 - :return: None - """ - if self._session is None: - self._session = Session() - - if 'headers' in data: - self._session.headers = CaseInsensitiveDict(data['headers']) - if 'cookies' in data: - self.set_cookies(data['cookies'], set_session=True) - - attrs = ['auth', 'proxies', 'hooks', 'params', 'verify', - 'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters' - for i in attrs: - if i in data: - self._session.__setattr__(i, data[i]) - def _set_driver_options(self, Tab_or_Options): """处理driver设置""" if Tab_or_Options is None: