Optimize cookie handling logic, pending testing

g1879 2023-03-11 22:29:25 +08:00
parent 62b1fadaf0
commit 8fc8719547
8 changed files with 63 additions and 64 deletions
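The core of this commit: cookie domains are now derived with tldextract instead of splitting urlparse's netloc by dots. Below is a minimal sketch (not part of the commit; the example URL is made up) of why that matters for multi-part suffixes such as .com.cn, which the old last-two-labels rule gets wrong.

from urllib.parse import urlparse
from tldextract import extract

url = 'https://www.example.com.cn/path'  # hypothetical URL

# Old rule: keep the last two labels of the netloc.
netloc = urlparse(url).netloc
u = netloc.split('.')
old_domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc  # '.com.cn' -- a public suffix, not a registrable domain

# New rule: let tldextract consult the public suffix list.
ex = extract(url)
new_domain = f'{ex.domain}.{ex.suffix}' if ex.suffix else ex.domain  # 'example.com.cn'
# For bare IPs tldextract returns an empty suffix, so the fallback keeps the IP itself.

print(old_domain, new_domain)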

View File

@@ -12,10 +12,11 @@ from urllib.parse import urlparse
from warnings import warn
from requests import Session
from tldextract import extract
from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, ChromiumElementWaiter
from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder
@@ -925,13 +926,20 @@ class ChromiumBaseSetter(object):
cookies = cookies_to_tuple(cookies)
result_cookies = []
for cookie in cookies:
if cookie.get('domain', None) is None:
netloc = urlparse(self._page.url).netloc
if netloc.replace('.', '').isdigit(): # ip
cookie['domain'] = netloc
else:  # domain name
u = netloc.split('.')
cookie['domain'] = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc
# todo: is this needed?
# if 'expiry' in cookie:
# cookie['expiry'] = int(cookie['expiry'])
if not cookie.get('domain', None):
print(cookie)
ex_url = extract(self._page.url)
cookie['domain'] = f'{ex_url.domain}.{ex_url.suffix}' if ex_url.suffix else ex_url.domain
# netloc = urlparse(self._page.url).netloc
# if netloc.replace('.', '').isdigit(): # ip
# cookie['domain'] = netloc
# else:  # domain name
# u = netloc.split('.')
# cookie['domain'] = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc
result_cookies.append({'value': '' if cookie['value'] is None else cookie['value'],
'name': cookie['name'],

View File

@@ -4,7 +4,8 @@
@Contact : g1879@qq.com
"""
from copy import copy
from urllib.parse import urlparse
from tldextract import extract
from .chromium_base import ChromiumBase, ChromiumBaseSetter
from .commons.web import set_session_cookies
@@ -292,16 +293,15 @@ class WebPageTab(SessionPage, ChromiumTab):
def cookies_to_browser(self):
"""把session对象的cookies复制到浏览器"""
netloc = urlparse(self.url).netloc
if netloc.replace('.', '').isdigit(): # ip
domain = netloc
else:  # domain name
u = netloc.split('.')
domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc
if not self._has_driver:
return
ex_url = extract(self._session_url)
domain = f'{ex_url.domain}.{ex_url.suffix}' if ex_url.suffix else ex_url.domain
cookies = []
for cookie in super().get_cookies():
if cookie.get('domain', None) is None:
if not cookie.get('domain', None):
cookie['domain'] = domain
if domain in cookie['domain']:

View File

@@ -5,7 +5,6 @@
"""
from platform import system
from sys import exit
from urllib.parse import urlparse
from requests import Session
from requests.structures import CaseInsensitiveDict
@@ -13,6 +12,7 @@ from selenium import webdriver
from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from tldextract import extract
from DrissionPage.commons.tools import get_pid_from_port, get_exe_from_port
from DrissionPage.commons.browser import connect_browser
@@ -262,17 +262,14 @@ class Drission(object):
cookie['expiry'] = int(cookie['expiry'])
try:
browser_domain = urlparse(self.driver.current_url).netloc
browser_domain = extract(self.driver.current_url).fqdn
except AttributeError:
browser_domain = ''
if cookie.get('domain', None) is None:
if not cookie.get('domain', None):
if browser_domain:
if browser_domain.replace('.', '').isdigit(): # ip
cookie_domain = browser_domain
else:  # domain name
u = browser_domain.split('.')
cookie_domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else browser_domain
url = extract(browser_domain)
cookie_domain = f'{url.domain}.{url.suffix}'
else:
raise ValueError('The cookie has no domain and the browser has not visited any URL.')
@@ -282,7 +279,8 @@ class Drission(object):
cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:]
if cookie_domain not in browser_domain:
self.driver.get(cookie_domain if cookie_domain.startswith('http://') else f'http://{cookie_domain}')
self.driver.get(cookie_domain if cookie_domain.startswith('http://')
else f'http://{cookie_domain}')
# Avoid selenium automatically prepending '.', which would prevent existing cookies from being overwritten correctly
if cookie['domain'][0] != '.':
@@ -326,17 +324,13 @@ class Drission(object):
:param url: scope (the site the cookies apply to)
:return: None
"""
browser_domain = urlparse(self.driver.current_url).netloc
ex_url = urlparse(url).netloc
browser_domain = extract(self.driver.current_url).fqdn
ex_url = extract(url)
if ex_url not in browser_domain:
if ex_url.fqdn not in browser_domain:
self.driver.get(url)
if ex_url.replace('.', '').isdigit(): # ip
domain = ex_url
else:  # domain name
u = ex_url.split('.')
domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else ex_url
domain = f'{ex_url.domain}.{ex_url.suffix}'
cookies = []
for cookie in self.session.cookies:
@@ -461,4 +455,4 @@ def kill_progress(pid=None, port=None):
popen(f'taskkill /pid {pid} /F')
return True
else:
return False
return False
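A hedged sketch (assumed names and URLs, not the commit's code) of the selenium-side flow in the hunks above: a WebDriver only accepts cookies for the site it is currently on, so when the cookie's registered domain is not part of the browser's fqdn the driver navigates there before add_cookie.

from selenium import webdriver
from tldextract import extract

driver = webdriver.Chrome()  # assumes a local Chrome/chromedriver is available
cookie = {'name': 'token', 'value': 'abc'}  # hypothetical cookie with no domain set

browser_domain = extract(driver.current_url).fqdn
ex = extract('http://example.com/login')  # hypothetical session URL the cookie came from
cookie_domain = f'{ex.domain}.{ex.suffix}' if ex.suffix else ex.domain

if cookie_domain not in browser_domain:
    # selenium rejects cookies for foreign domains, so visit the site first
    driver.get(f'http://{cookie_domain}')
cookie['domain'] = cookie_domain
driver.add_cookie(cookie)
driver.quit()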

View File

@@ -11,6 +11,7 @@ from warnings import warn
from DownloadKit import DownloadKit
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
from tldextract import extract
from .base import BasePage
from DrissionPage.configs.session_options import SessionOptions
@@ -210,12 +211,8 @@ class SessionPage(BasePage):
cookies = self.session.cookies
else:
if self.url:
netloc = urlparse(self.url).netloc
if netloc.replace('.', '').isdigit(): # ip
domain = netloc
else:  # domain name
u = netloc.split('.')
domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc
url = extract(self.url)
domain = f'{url.domain}.{url.suffix}'
cookies = tuple(x for x in self.session.cookies if domain in x.domain or x.domain == '')
else:
cookies = tuple(x for x in self.session.cookies)
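The session side only filters the jar; nothing is rewritten. A minimal sketch (hypothetical cookies and URL) of the filter used above: keep cookies whose recorded domain contains the registered domain of the current URL, plus cookies stored without a domain.

from requests import Session
from tldextract import extract

session = Session()
session.cookies.set('a', '1', domain='.example.com')
session.cookies.set('b', '2', domain='other.org')
session.cookies.set('c', '3')  # requests stores an empty domain here

ex = extract('https://www.example.com/page')  # hypothetical current URL
domain = f'{ex.domain}.{ex.suffix}' if ex.suffix else ex.domain  # 'example.com'

kept = tuple(c for c in session.cookies if domain in c.domain or c.domain == '')
print(sorted(c.name for c in kept))  # ['a', 'c']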

View File

@@ -11,6 +11,7 @@ from warnings import warn
from DownloadKit import DownloadKit
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
from tldextract import extract
from .base import BasePage
from .commons.web import cookie_to_dict, set_session_cookies
@@ -190,12 +191,8 @@ class SessionPage(BasePage):
cookies = self.session.cookies
else:
if self.url:
netloc = urlparse(self.url).netloc
if netloc.replace('.', '').isdigit(): # ip
domain = netloc
else:  # domain name
u = netloc.split('.')
domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc
ex_url = extract(self.url)
domain = f'{ex_url.domain}.{ex_url.suffix}' if ex_url.suffix else ex_url.domain
cookies = tuple(x for x in self.session.cookies if domain in x.domain or x.domain == '')
else:

View File

@@ -4,10 +4,10 @@
@Contact : g1879@qq.com
"""
from pathlib import Path
from urllib.parse import urlparse
from warnings import warn
from requests import Session
from tldextract import extract
from .commons.web import set_session_cookies
from .base import BasePage
@@ -363,29 +363,30 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": selenium_user_agent})
set_session_cookies(self.session, self._get_driver_cookies(as_dict=True))
set_session_cookies(self.session, self._get_driver_cookies(as_dict=False))  # list that keeps domain info
def cookies_to_browser(self):
"""把session对象的cookies复制到浏览器"""
if not self._has_driver:
return
netloc = urlparse(self._browser_url).netloc
if netloc.replace('.', '').isdigit(): # ip
domain = netloc
else:  # domain name
u = netloc.split('.')
domain = f'.{u[-2]}.{u[-1]}' if len(u) > 1 else netloc
ex_url = extract(self._session_url)
domain = f'{ex_url.domain}.{ex_url.suffix}' if ex_url.suffix else ex_url.domain
cookies = []
for cookie in super().get_cookies():
for cookie in super().get_cookies():  # list that keeps domain info
if not cookie.get('domain', None):
cookie['domain'] = domain
if domain in cookie['domain']:
cookies.append(cookie)
self.run_cdp_loaded('Network.setCookies', cookies=cookies)
# self.run_cdp_loaded('Network.setCookies', cookies=cookies)
for c in cookies:
try:
self.run_cdp_loaded('Network.setCookie', name=c['name'], value=c['value'], domain=c['domain'])
except Exception as e:
print(e)
def get_cookies(self, as_dict=False, all_domains=False, all_info=False):
"""返回cookies

View File

@@ -4,4 +4,5 @@ cssselect
DownloadKit>=0.5.3
FlowViewer>=0.2.1
websocket-client
click
click
tldextract

View File

@@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="3.2.15",
version="3.2.16",
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",
@@ -19,12 +19,13 @@ setup(
packages=find_packages(),
zip_safe=False,
install_requires=[
"lxml",
"requests",
"DownloadKit>=0.5.3",
"FlowViewer",
"websocket-client",
'click~=8.1.3'
'lxml',
'requests',
'DownloadKit>=0.5.3',
'FlowViewer',
'websocket-client',
'click~=8.1.3',
'tldextract'
],
classifiers=[
"Programming Language :: Python :: 3.6",