diff --git a/DrissionPage/config.py b/DrissionPage/config.py index e9de3e5..a0fd752 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -727,12 +727,16 @@ def _session_options_to_dict(options: Union[dict, SessionOptions, None]) -> Unio def _cookie_to_dict(cookie: Cookie) -> dict: """把Cookie对象转为dict格式""" + # print(cookie) if isinstance(cookie, Cookie): - # return cookie.__dict__ - cookie_dict = {'name': cookie.name, 'value': str(cookie.value), 'path': cookie.path, 'domain': cookie.domain} - - if cookie.expires: - cookie_dict['expiry'] = cookie.expires + cookie_dict = cookie.__dict__.copy() + cookie_dict.pop('rfc2109') + cookie_dict.pop('_rest') + return cookie_dict + # cookie_dict = {'name': cookie.name, 'value': str(cookie.value), 'path': cookie.path, 'domain': cookie.domain} + # + # if cookie.expires: + # cookie_dict['expiry'] = cookie.expires elif isinstance(cookie, dict): return cookie diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index 61cbdd3..19acd0f 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -6,12 +6,11 @@ """ from sys import exit from typing import Union -from urllib.parse import urlparse from requests import Session from requests.cookies import RequestsCookieJar from selenium import webdriver -from selenium.common.exceptions import WebDriverException, SessionNotCreatedException +from selenium.common.exceptions import SessionNotCreatedException, UnableToSetCookieException # , WebDriverException from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.webdriver import WebDriver from tldextract import extract @@ -20,6 +19,9 @@ from .config import (_dict_to_chrome_options, _session_options_to_dict, SessionOptions, DriverOptions, _chrome_options_to_dict, OptionsManager, _cookie_to_dict) +# from urllib.parse import urlparse + + class Drission(object): """Drission类用于管理WebDriver对象和Session对象,是驱动器的角色""" @@ -153,12 +155,19 @@ class Drission(object): self._driver.get(url) for cookie in cookies: - self._ensure_add_cookie(cookie) + # self._ensure_add_cookie(cookie) + self.set_cookies(cookie, set_driver=True) def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str], set_session: bool = False, set_driver: bool = False) -> None: + """ + :param cookies: + :param set_session: + :param set_driver: + :return: + """ if isinstance(cookies, (list, tuple, RequestsCookieJar)): cookies = tuple(_cookie_to_dict(cookie) for cookie in cookies) elif isinstance(cookies, str): @@ -169,12 +178,34 @@ class Drission(object): raise TypeError for cookie in cookies: + if cookie['value'] is None: + cookie['value'] = '' + if set_session: - kwargs = {x: cookie[x] for x in cookie if x not in ('name', 'value')} + kwargs = {x: cookie[x] for x in cookie if x not in ('name', 'value', 'httpOnly', 'expiry')} + + if 'expiry' in cookie: + kwargs['expires'] = cookie['expiry'] + self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) if set_driver: - self.driver.add_cookie(cookie) + + try: + self.driver.add_cookie(cookie) + + except UnableToSetCookieException: + cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:] + + try: + browser_domain = extract(self.driver.current_url).fqdn + except AttributeError: + browser_domain = '' + + if cookie_domain not in browser_domain: + self.driver.get(f'http://{cookie_domain.lstrip("http://")}') + + self.driver.add_cookie(cookie) def add_a_cookie(self, cookie: str, @@ -199,103 +230,108 @@ class Drission(object): 'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters' if 'cookies' in data: - self.set_cookies(data['cookies'], set_driver=False) + self.set_cookies(data['cookies'], set_session=True) for i in attrs: if i in data: self._session.__setattr__(i, data[i]) - def cookies_to_session(self, copy_user_agent: bool = False, - driver: WebDriver = None, - session: Session = None) -> None: + def cookies_to_session(self, copy_user_agent: bool = False) -> None: """把driver对象的cookies复制到session对象 \n :param copy_user_agent: 是否复制ua信息 - :param driver: 来源driver对象 - :param session: 目标session对象 :return: None """ - driver = driver or self.driver - session = session or self.session - if copy_user_agent: - self.user_agent_to_session(driver, session) + self.user_agent_to_session(self.driver, self.session) - for cookie in driver.get_cookies(): - session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain']) + self.set_cookies(self.driver.get_cookies(), set_session=True) + # for cookie in driver.get_cookies(): + # session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain']) - def cookies_to_driver(self, url: str, - driver: WebDriver = None, - session: Session = None) -> None: + def cookies_to_driver(self, url: str) -> None: """把session对象的cookies复制到driver对象 \n :param url: 作用域 - :param driver: 目标driver对象 - :param session: 来源session对象 :return: None """ - driver = driver or self.driver - session = session or self.session - domain = urlparse(url).netloc + url = extract(url) + domain = f'{url.domain}.{url.suffix}' + cookies = tuple(x for x in self.session.cookies if domain in x.domain) - if not domain: - raise Exception('Without specifying a domain') + self.set_cookies(cookies, set_driver=True) - # 翻译cookies - for i in [x for x in session.cookies if domain in x.domain]: - cookie_data = _cookie_to_dict(i) - self._ensure_add_cookie(cookie_data, driver=driver) - - def _ensure_add_cookie(self, cookie, override_domain=None, driver=None) -> None: - """添加cookie到driver \n - :param cookie: 要添加的cookie - :param override_domain: 覆盖作用域 - :param driver: 操作的driver对象 - :return: None - """ - driver = driver or self.driver - - if override_domain: - cookie['domain'] = override_domain - - cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:] - - try: - browser_domain = extract(driver.current_url).fqdn - except AttributeError: - browser_domain = '' - - if cookie_domain not in browser_domain: - driver.get(f'http://{cookie_domain.lstrip("http://")}') - - if 'expiry' in cookie: - cookie['expiry'] = int(cookie['expiry']) - - driver.add_cookie(cookie) - - # 如果添加失败,尝试更宽的域名 - if not self._is_cookie_in_driver(cookie, driver): - cookie['domain'] = extract(cookie['domain']).registered_domain - driver.add_cookie(cookie) - - if not self._is_cookie_in_driver(cookie): - raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n") - - def _is_cookie_in_driver(self, cookie, driver=None) -> bool: - """检查cookie是否已经在driver里 \n - 只检查name、value、domain,检查domain时比较宽 \n - :param cookie: 要检查的cookie - :param driver: 被检查的driver - :return: 返回布尔值 - """ - driver = driver or self.driver - for driver_cookie in driver.get_cookies(): - - if (cookie['name'] == driver_cookie['name'] and - cookie['value'] == driver_cookie['value'] and - (cookie['domain'] == driver_cookie['domain'] or - f'.{cookie["domain"]}' == driver_cookie['domain'])): - return True - - return False + # def cookies_to_driver(self, url: str, + # driver: WebDriver = None, + # session: Session = None) -> None: + # """把session对象的cookies复制到driver对象 \n + # :param url: 作用域 + # :param driver: 目标driver对象 + # :param session: 来源session对象 + # :return: None + # """ + # driver = driver or self.driver + # session = session or self.session + # domain = urlparse(url).netloc + # + # if not domain: + # raise Exception('Without specifying a domain') + # + # # 翻译cookies + # for i in [x for x in session.cookies if domain in x.domain]: + # cookie_data = _cookie_to_dict(i) + # self._ensure_add_cookie(cookie_data, driver=driver) + # + # def _ensure_add_cookie(self, cookie, override_domain=None, driver=None) -> None: + # """添加cookie到driver \n + # :param cookie: 要添加的cookie + # :param override_domain: 覆盖作用域 + # :param driver: 操作的driver对象 + # :return: None + # """ + # driver = driver or self.driver + # + # if override_domain: + # cookie['domain'] = override_domain + # + # cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:] + # + # try: + # browser_domain = extract(driver.current_url).fqdn + # except AttributeError: + # browser_domain = '' + # + # if cookie_domain not in browser_domain: + # driver.get(f'http://{cookie_domain.lstrip("http://")}') + # + # if 'expiry' in cookie: + # cookie['expiry'] = int(cookie['expiry']) + # + # driver.add_cookie(cookie) + # + # # 如果添加失败,尝试更宽的域名 + # if not self._is_cookie_in_driver(cookie, driver): + # cookie['domain'] = extract(cookie['domain']).registered_domain + # driver.add_cookie(cookie) + # + # if not self._is_cookie_in_driver(cookie): + # raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n") + # + # def _is_cookie_in_driver(self, cookie, driver=None) -> bool: + # """检查cookie是否已经在driver里 \n + # 只检查name、value、domain,检查domain时比较宽 \n + # :param cookie: 要检查的cookie + # :param driver: 被检查的driver + # :return: 返回布尔值 + # """ + # driver = driver or self.driver + # for driver_cookie in driver.get_cookies(): + # + # if (cookie['name'] == driver_cookie['name'] and + # cookie['value'] == driver_cookie['value'] and + # (cookie['domain'] == driver_cookie['domain'] or + # f'.{cookie["domain"]}' == driver_cookie['domain'])): + # return True + # + # return False def user_agent_to_session(self, driver: WebDriver = None, session: Session = None) -> None: """把driver的user-agent复制到session \n