继续修改cookies功能,未完成

This commit is contained in:
g1879 2020-12-02 17:55:56 +08:00
parent 237ed9607e
commit bfd967edf0
2 changed files with 129 additions and 89 deletions

View File

@ -727,12 +727,16 @@ def _session_options_to_dict(options: Union[dict, SessionOptions, None]) -> Unio
def _cookie_to_dict(cookie: Cookie) -> dict:
"""把Cookie对象转为dict格式"""
# print(cookie)
if isinstance(cookie, Cookie):
# return cookie.__dict__
cookie_dict = {'name': cookie.name, 'value': str(cookie.value), 'path': cookie.path, 'domain': cookie.domain}
if cookie.expires:
cookie_dict['expiry'] = cookie.expires
cookie_dict = cookie.__dict__.copy()
cookie_dict.pop('rfc2109')
cookie_dict.pop('_rest')
return cookie_dict
# cookie_dict = {'name': cookie.name, 'value': str(cookie.value), 'path': cookie.path, 'domain': cookie.domain}
#
# if cookie.expires:
# cookie_dict['expiry'] = cookie.expires
elif isinstance(cookie, dict):
return cookie

View File

@ -6,12 +6,11 @@
"""
from sys import exit
from typing import Union
from urllib.parse import urlparse
from requests import Session
from requests.cookies import RequestsCookieJar
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, SessionNotCreatedException
from selenium.common.exceptions import SessionNotCreatedException, UnableToSetCookieException # , WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver
from tldextract import extract
@ -20,6 +19,9 @@ from .config import (_dict_to_chrome_options, _session_options_to_dict,
SessionOptions, DriverOptions, _chrome_options_to_dict, OptionsManager, _cookie_to_dict)
# from urllib.parse import urlparse
class Drission(object):
"""Drission类用于管理WebDriver对象和Session对象是驱动器的角色"""
@ -153,12 +155,19 @@ class Drission(object):
self._driver.get(url)
for cookie in cookies:
self._ensure_add_cookie(cookie)
# self._ensure_add_cookie(cookie)
self.set_cookies(cookie, set_driver=True)
def set_cookies(self,
cookies: Union[RequestsCookieJar, list, tuple, str],
set_session: bool = False,
set_driver: bool = False) -> None:
"""
:param cookies:
:param set_session:
:param set_driver:
:return:
"""
if isinstance(cookies, (list, tuple, RequestsCookieJar)):
cookies = tuple(_cookie_to_dict(cookie) for cookie in cookies)
elif isinstance(cookies, str):
@ -169,12 +178,34 @@ class Drission(object):
raise TypeError
for cookie in cookies:
if cookie['value'] is None:
cookie['value'] = ''
if set_session:
kwargs = {x: cookie[x] for x in cookie if x not in ('name', 'value')}
kwargs = {x: cookie[x] for x in cookie if x not in ('name', 'value', 'httpOnly', 'expiry')}
if 'expiry' in cookie:
kwargs['expires'] = cookie['expiry']
self.session.cookies.set(cookie['name'], cookie['value'], **kwargs)
if set_driver:
self.driver.add_cookie(cookie)
try:
self.driver.add_cookie(cookie)
except UnableToSetCookieException:
cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:]
try:
browser_domain = extract(self.driver.current_url).fqdn
except AttributeError:
browser_domain = ''
if cookie_domain not in browser_domain:
self.driver.get(f'http://{cookie_domain.lstrip("http://")}')
self.driver.add_cookie(cookie)
def add_a_cookie(self,
cookie: str,
@ -199,103 +230,108 @@ class Drission(object):
'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters'
if 'cookies' in data:
self.set_cookies(data['cookies'], set_driver=False)
self.set_cookies(data['cookies'], set_session=True)
for i in attrs:
if i in data:
self._session.__setattr__(i, data[i])
def cookies_to_session(self, copy_user_agent: bool = False,
driver: WebDriver = None,
session: Session = None) -> None:
def cookies_to_session(self, copy_user_agent: bool = False) -> None:
"""把driver对象的cookies复制到session对象 \n
:param copy_user_agent: 是否复制ua信息
:param driver: 来源driver对象
:param session: 目标session对象
:return: None
"""
driver = driver or self.driver
session = session or self.session
if copy_user_agent:
self.user_agent_to_session(driver, session)
self.user_agent_to_session(self.driver, self.session)
for cookie in driver.get_cookies():
session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
self.set_cookies(self.driver.get_cookies(), set_session=True)
# for cookie in driver.get_cookies():
# session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
def cookies_to_driver(self, url: str,
driver: WebDriver = None,
session: Session = None) -> None:
def cookies_to_driver(self, url: str) -> None:
"""把session对象的cookies复制到driver对象 \n
:param url: 作用域
:param driver: 目标driver对象
:param session: 来源session对象
:return: None
"""
driver = driver or self.driver
session = session or self.session
domain = urlparse(url).netloc
url = extract(url)
domain = f'{url.domain}.{url.suffix}'
cookies = tuple(x for x in self.session.cookies if domain in x.domain)
if not domain:
raise Exception('Without specifying a domain')
self.set_cookies(cookies, set_driver=True)
# 翻译cookies
for i in [x for x in session.cookies if domain in x.domain]:
cookie_data = _cookie_to_dict(i)
self._ensure_add_cookie(cookie_data, driver=driver)
def _ensure_add_cookie(self, cookie, override_domain=None, driver=None) -> None:
"""添加cookie到driver \n
:param cookie: 要添加的cookie
:param override_domain: 覆盖作用域
:param driver: 操作的driver对象
:return: None
"""
driver = driver or self.driver
if override_domain:
cookie['domain'] = override_domain
cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:]
try:
browser_domain = extract(driver.current_url).fqdn
except AttributeError:
browser_domain = ''
if cookie_domain not in browser_domain:
driver.get(f'http://{cookie_domain.lstrip("http://")}')
if 'expiry' in cookie:
cookie['expiry'] = int(cookie['expiry'])
driver.add_cookie(cookie)
# 如果添加失败,尝试更宽的域名
if not self._is_cookie_in_driver(cookie, driver):
cookie['domain'] = extract(cookie['domain']).registered_domain
driver.add_cookie(cookie)
if not self._is_cookie_in_driver(cookie):
raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n")
def _is_cookie_in_driver(self, cookie, driver=None) -> bool:
"""检查cookie是否已经在driver里 \n
只检查namevaluedomain检查domain时比较宽 \n
:param cookie: 要检查的cookie
:param driver: 被检查的driver
:return: 返回布尔值
"""
driver = driver or self.driver
for driver_cookie in driver.get_cookies():
if (cookie['name'] == driver_cookie['name'] and
cookie['value'] == driver_cookie['value'] and
(cookie['domain'] == driver_cookie['domain'] or
f'.{cookie["domain"]}' == driver_cookie['domain'])):
return True
return False
# def cookies_to_driver(self, url: str,
# driver: WebDriver = None,
# session: Session = None) -> None:
# """把session对象的cookies复制到driver对象 \n
# :param url: 作用域
# :param driver: 目标driver对象
# :param session: 来源session对象
# :return: None
# """
# driver = driver or self.driver
# session = session or self.session
# domain = urlparse(url).netloc
#
# if not domain:
# raise Exception('Without specifying a domain')
#
# # 翻译cookies
# for i in [x for x in session.cookies if domain in x.domain]:
# cookie_data = _cookie_to_dict(i)
# self._ensure_add_cookie(cookie_data, driver=driver)
#
# def _ensure_add_cookie(self, cookie, override_domain=None, driver=None) -> None:
# """添加cookie到driver \n
# :param cookie: 要添加的cookie
# :param override_domain: 覆盖作用域
# :param driver: 操作的driver对象
# :return: None
# """
# driver = driver or self.driver
#
# if override_domain:
# cookie['domain'] = override_domain
#
# cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:]
#
# try:
# browser_domain = extract(driver.current_url).fqdn
# except AttributeError:
# browser_domain = ''
#
# if cookie_domain not in browser_domain:
# driver.get(f'http://{cookie_domain.lstrip("http://")}')
#
# if 'expiry' in cookie:
# cookie['expiry'] = int(cookie['expiry'])
#
# driver.add_cookie(cookie)
#
# # 如果添加失败,尝试更宽的域名
# if not self._is_cookie_in_driver(cookie, driver):
# cookie['domain'] = extract(cookie['domain']).registered_domain
# driver.add_cookie(cookie)
#
# if not self._is_cookie_in_driver(cookie):
# raise WebDriverException(f"Couldn't add the following cookie to the webdriver\n{cookie}\n")
#
# def _is_cookie_in_driver(self, cookie, driver=None) -> bool:
# """检查cookie是否已经在driver里 \n
# 只检查name、value、domain检查domain时比较宽 \n
# :param cookie: 要检查的cookie
# :param driver: 被检查的driver
# :return: 返回布尔值
# """
# driver = driver or self.driver
# for driver_cookie in driver.get_cookies():
#
# if (cookie['name'] == driver_cookie['name'] and
# cookie['value'] == driver_cookie['value'] and
# (cookie['domain'] == driver_cookie['domain'] or
# f'.{cookie["domain"]}' == driver_cookie['domain'])):
# return True
#
# return False
def user_agent_to_session(self, driver: WebDriver = None, session: Session = None) -> None:
"""把driver的user-agent复制到session \n