g1879 4407a0daa1 一些修改(+)
增加Settings.browser_connect_timeout属性;
优化关闭标签页逻辑;
修复下载路径设置问题;
点击产生的新标签页下载任务可用原标签页等待;
remove_attr()返回元素自身;
select各种方法返回元素本身,找不到项时报错;
2024-10-21 07:10:10 +08:00

478 lines
16 KiB
Python

# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
import json
from pathlib import Path
from time import sleep

from requests.structures import CaseInsensitiveDict

from .cookies_setter import (SessionCookiesSetter, CookiesSetter, WebPageCookiesSetter, BrowserCookiesSetter,
                             MixTabCookiesSetter)
from .._functions.tools import show_or_hide_browser
from .._functions.web import format_headers
from ..errors import ElementLostError, JavaScriptError


class BaseSetter(object):
    """Base of the setter helpers: every method writes one setting onto the owner object."""

    def __init__(self, owner):
        """Bind the setter to the object it configures.

        :param owner: object whose attributes are written by the methods of this setter
        """
        self._owner = owner

    def NoneElement_value(self, value=None, on_off=True):
        """Store the "none element" substitute settings on the owner.

        Presumably these decide what a failed element lookup returns; confirm against the owner class.

        :param value: written to the owner's ``_none_ele_value``
        :param on_off: written to the owner's ``_none_ele_return_value``
        :return: None
        """
        target = self._owner
        target._none_ele_return_value = on_off
        target._none_ele_value = value

    def retry_times(self, times):
        """Write ``times`` to the owner's ``retry_times`` attribute.

        :param times: retry count to store
        :return: None
        """
        setattr(self._owner, 'retry_times', times)

    def retry_interval(self, interval):
        """Write ``interval`` to the owner's ``retry_interval`` attribute.

        :param interval: retry interval to store
        :return: None
        """
        setattr(self._owner, 'retry_interval', interval)

    def download_path(self, path):
        """Store the download directory on the owner as an absolute path string.

        :param path: str or Path of the directory; ``None`` means the current directory
        :return: None
        """
        location = Path('.' if path is None else path)
        self._owner._download_path = str(location.absolute())
class SessionPageSetter(BaseSetter):
    """Setter for an owner that works through ``owner.session`` (attributes follow the requests Session API)."""

    def __init__(self, owner):
        """Bind to ``owner``; the cookies setter is created lazily.

        :param owner: object configured by this setter
        """
        super().__init__(owner)
        self._cookies_setter = None

    @property
    def cookies(self):
        """Return the SessionCookiesSetter for the owner, building it on first access."""
        if self._cookies_setter is None:
            self._cookies_setter = SessionCookiesSetter(self._owner)
        return self._cookies_setter

    def download_path(self, path):
        """Set the download directory and, when the owner already has a DownloadKit, update its save path too.

        :param path: str or Path of the directory; ``None`` means the current directory
        :return: None
        """
        super().download_path(path)
        owner = self._owner
        if owner._DownloadKit:
            owner._DownloadKit.set.save_path(owner._download_path)

    def timeout(self, second):
        """Write ``second`` to the owner's ``_timeout``.

        :param second: timeout value to store
        :return: None
        """
        self._owner._timeout = second

    def encoding(self, encoding, set_all=True):
        """Set the text encoding.

        :param encoding: encoding name; a falsy value is stored as ``None`` on the owner
        :param set_all: when true the owner's ``_encoding`` is updated as well; the current
                        response (if there is one) is updated in either case
        :return: None
        """
        owner = self._owner
        if set_all:
            owner._encoding = encoding or None
        if owner.response:
            owner.response.encoding = encoding

    def headers(self, headers):
        """Replace the owner's headers with a case-insensitive dict built from ``headers``.

        :param headers: header data; passed through ``format_headers`` before being stored
        :return: None
        """
        formatted = format_headers(headers)
        self._owner._headers = CaseInsensitiveDict(formatted)

    def header(self, name, value):
        """Set a single entry in the owner's headers.

        :param name: header name
        :param value: header value
        :return: None
        """
        self._owner._headers[name] = value

    def user_agent(self, ua):
        """Store ``ua`` under the ``user-agent`` key of the owner's headers.

        :param ua: user agent string
        :return: None
        """
        self.header('user-agent', ua)

    def proxies(self, http=None, https=None):
        """Replace the session's proxies mapping.

        :param http: stored under the ``http`` key
        :param https: stored under the ``https`` key
        :return: None
        """
        self._owner.session.proxies = dict(http=http, https=https)

    def auth(self, auth):
        """Assign ``auth`` to the session's ``auth`` attribute."""
        self._owner.session.auth = auth

    def hooks(self, hooks):
        """Assign ``hooks`` to the session's ``hooks`` attribute."""
        self._owner.session.hooks = hooks

    def params(self, params):
        """Assign ``params`` to the session's ``params`` attribute."""
        self._owner.session.params = params

    def verify(self, on_off):
        """Assign ``on_off`` to the session's ``verify`` attribute."""
        self._owner.session.verify = on_off

    def cert(self, cert):
        """Assign ``cert`` to the session's ``cert`` attribute."""
        self._owner.session.cert = cert

    def stream(self, on_off):
        """Assign ``on_off`` to the session's ``stream`` attribute."""
        self._owner.session.stream = on_off

    def trust_env(self, on_off):
        """Assign ``on_off`` to the session's ``trust_env`` attribute."""
        self._owner.session.trust_env = on_off

    def max_redirects(self, times):
        """Assign ``times`` to the session's ``max_redirects`` attribute."""
        self._owner.session.max_redirects = times

    def add_adapter(self, url, adapter):
        """Mount ``adapter`` on the session for ``url``.

        :param url: URL prefix passed to ``session.mount``
        :param adapter: adapter object passed to ``session.mount``
        :return: None
        """
        self._owner.session.mount(url, adapter)
class BrowserBaseSetter(BaseSetter):
    """Settings common to the browser-side setters: load mode and timeouts."""

    def __init__(self, owner):
        """Bind to ``owner``; subclasses fill ``_cookies_setter`` lazily.

        :param owner: object configured by this setter
        """
        super().__init__(owner)
        self._cookies_setter = None

    @property
    def load_mode(self):
        """Return a new LoadMode helper bound to the owner (a fresh object on every access)."""
        return LoadMode(self._owner)

    def timeouts(self, base=None, page_load=None, script=None):
        """Update fields of ``owner.timeouts``; arguments left as ``None`` are not touched.

        :param base: new value for ``timeouts.base``
        :param page_load: new value for ``timeouts.page_load``
        :param script: new value for ``timeouts.script``
        :return: None
        """
        requested = (('base', base), ('page_load', page_load), ('script', script))
        for field, seconds in requested:
            if seconds is not None:
                setattr(self._owner.timeouts, field, seconds)
class BrowserSetter(BrowserBaseSetter):
    """Setter whose download settings are registered under the ``'browser'`` key of ``owner._dl_mgr``."""

    @property
    def cookies(self):
        """Return the BrowserCookiesSetter for the owner, building it on first access."""
        if self._cookies_setter is None:
            self._cookies_setter = BrowserCookiesSetter(self._owner)
        return self._cookies_setter

    def auto_handle_alert(self, on_off=True, accept=True):
        """Store the automatic alert handling state in ``owner._auto_handle_alert``.

        The stored value is ``None`` when ``on_off`` is ``None``, ``accept`` when ``on_off``
        is truthy, and the string ``'close'`` otherwise.

        :param on_off: ``None``, or a truthy/falsy switch
        :param accept: value stored when ``on_off`` is truthy
        :return: None
        """
        if on_off is None:
            state = None
        elif on_off:
            state = accept
        else:
            state = 'close'
        self._owner._auto_handle_alert = state

    def download_path(self, path):
        """Set the download directory and register it with the download manager.

        :param path: str or Path of the directory; ``None`` means the current directory
        :return: None
        """
        super().download_path(path)
        self._owner._dl_mgr.set_path('browser', self._owner._download_path)

    def download_file_name(self, name=None, suffix=None):
        """Pass a rename request to the download manager.

        :param name: forwarded to ``_dl_mgr.set_rename``
        :param suffix: forwarded to ``_dl_mgr.set_rename``
        :return: None
        """
        self._owner._dl_mgr.set_rename('browser', name, suffix)

    def when_download_file_exists(self, mode):
        """Choose what the download manager does when the target file already exists.

        :param mode: ``'rename'``, ``'overwrite'``, ``'skip'`` or the one-letter forms ``'r'``, ``'o'``, ``'s'``
        :return: None
        :raises ValueError: if ``mode`` is none of the accepted values
        """
        full_names = ('rename', 'overwrite', 'skip')
        # Full names first, then their initials; the order is visible in the error message.
        aliases = {word: word for word in full_names}
        aliases.update({word[0]: word for word in full_names})
        mode = aliases.get(mode, mode)
        if mode not in aliases:
            raise ValueError(f'''mode参数只能是 '{"', '".join(aliases.keys())}' 之一,现在是:{mode}''')
        self._owner._dl_mgr.set_file_exists('browser', mode)
class ChromiumBaseSetter(BrowserBaseSetter):
    """Setter for owners that are driven through CDP commands (``owner._run_cdp``)."""

    @property
    def scroll(self):
        """Return a PageScrollSetter wrapping the owner's ``scroll`` object."""
        return PageScrollSetter(self._owner.scroll)

    @property
    def cookies(self):
        """Return the CookiesSetter for the owner, building it on first access."""
        if self._cookies_setter is None:
            self._cookies_setter = CookiesSetter(self._owner)
        return self._cookies_setter

    def headers(self, headers):
        """Send extra HTTP headers to the browser via ``Network.setExtraHTTPHeaders``.

        :param headers: header data; passed through ``format_headers`` before being sent
        :return: None
        """
        self._owner._run_cdp('Network.enable')
        self._owner._run_cdp('Network.setExtraHTTPHeaders', headers=format_headers(headers))

    def user_agent(self, ua, platform=None):
        """Override the user agent via ``Emulation.setUserAgentOverride``.

        :param ua: user agent string
        :param platform: platform string; only sent when truthy
        :return: None
        """
        keys = {'userAgent': ua}
        if platform:
            keys['platform'] = platform
        self._owner._run_cdp('Emulation.setUserAgentOverride', **keys)

    def session_storage(self, item, value):
        """Set or remove an item of the frame's sessionStorage.

        :param item: key of the item
        :param value: value to store; passing ``False`` removes the item instead
        :return: None
        """
        self._set_dom_storage(item, value, is_local=False)

    def local_storage(self, item, value):
        """Set or remove an item of the frame's localStorage.

        :param item: key of the item
        :param value: value to store; passing ``False`` removes the item instead
        :return: None
        """
        self._set_dom_storage(item, value, is_local=True)

    def _set_dom_storage(self, item, value, is_local):
        """Shared implementation of ``session_storage`` and ``local_storage``.

        :param item: key of the item
        :param value: value to store; ``False`` removes the item
        :param is_local: True for localStorage, False for sessionStorage
        :return: None
        """
        self._owner._run_cdp_loaded('DOMStorage.enable')
        try:
            key = self._owner._run_cdp('Storage.getStorageKeyForFrame',
                                       frameId=self._owner._frame_id)['storageKey']
            storage_id = {'storageKey': key, 'isLocalStorage': is_local}
            if value is False:
                self._owner._run_cdp('DOMStorage.removeDOMStorageItem', storageId=storage_id, key=item)
            else:
                self._owner._run_cdp('DOMStorage.setDOMStorageItem', storageId=storage_id,
                                     key=item, value=value)
        finally:
            # Disable the domain even when the command above fails, so it is not left enabled.
            self._owner._run_cdp_loaded('DOMStorage.disable')

    def upload_files(self, files):
        """Record the files to hand over when a file chooser opens.

        When the owner's upload list is currently empty, the ``Page.fileChooserOpened`` callback is
        registered and file chooser interception is switched on before the list is stored.

        :param files: a str (several paths may be separated by newlines), a Path, or an iterable of paths
        :return: None
        """
        if not self._owner._upload_list:
            self._owner.driver.set_callback('Page.fileChooserOpened', self._owner._onFileChooserOpened)
            self._owner._run_cdp('Page.setInterceptFileChooserDialog', enabled=True)

        if isinstance(files, str):
            files = files.split('\n')
        elif isinstance(files, Path):
            files = (files,)
        self._owner._upload_list = [str(Path(i).absolute()) for i in files]

    def auto_handle_alert(self, on_off=True, accept=True):
        """Store the automatic alert handling state in ``owner._alert.auto``.

        The stored value is ``None`` when ``on_off`` is ``None``, ``accept`` when ``on_off``
        is truthy, and the string ``'close'`` otherwise.

        :param on_off: ``None``, or a truthy/falsy switch
        :param accept: value stored when ``on_off`` is truthy
        :return: None
        """
        if on_off is None:
            state = None
        elif on_off:
            state = accept
        else:
            state = 'close'
        self._owner._alert.auto = state

    def blocked_urls(self, urls):
        """Set the URL patterns blocked through ``Network.setBlockedURLs``.

        :param urls: a str, list or tuple of patterns; any falsy value sends an empty list
        :return: None
        :raises TypeError: if ``urls`` is truthy and is not a str, list or tuple
        """
        if not urls:
            urls = []
        elif isinstance(urls, str):
            urls = (urls,)
        if not isinstance(urls, (list, tuple)):
            raise TypeError('urls需传入str、list或tuple类型。')
        self._owner._run_cdp('Network.enable')
        self._owner._run_cdp('Network.setBlockedURLs', urls=urls)
class TabSetter(ChromiumBaseSetter):
    """Setter for a tab; download settings are registered with ``owner.browser._dl_mgr``."""

    def __init__(self, owner):
        """Bind to ``owner``.

        :param owner: tab object configured by this setter
        """
        super().__init__(owner)

    @property
    def window(self):
        """Return a new WindowSetter for the owner (a fresh object on every access)."""
        return WindowSetter(self._owner)

    def download_path(self, path):
        """Set the download directory for this tab.

        The resolved path is registered with the browser's download manager and, when the
        owner already has a DownloadKit, stored as its save path as well.

        :param path: str or Path of the directory; ``None`` means the current directory
        :return: None
        """
        super().download_path(path)
        tab = self._owner
        tab.browser._dl_mgr.set_path(tab, tab._download_path)
        if tab._DownloadKit:
            tab._DownloadKit.set.save_path(tab._download_path)

    def download_file_name(self, name=None, suffix=None):
        """Pass a rename request for this tab to the browser's download manager.

        :param name: forwarded to ``_dl_mgr.set_rename``
        :param suffix: forwarded to ``_dl_mgr.set_rename``
        :return: None
        """
        manager = self._owner.browser._dl_mgr
        manager.set_rename(self._owner.tab_id, name, suffix)

    def when_download_file_exists(self, mode):
        """Choose what the download manager does for this tab when the target file already exists.

        :param mode: ``'rename'``, ``'overwrite'``, ``'skip'`` or the one-letter forms ``'r'``, ``'o'``, ``'s'``
        :return: None
        :raises ValueError: if ``mode`` is none of the accepted values
        """
        full_names = ('rename', 'overwrite', 'skip')
        # Full names first, then their initials; the order is visible in the error message.
        aliases = {word: word for word in full_names}
        aliases.update({word[0]: word for word in full_names})
        mode = aliases.get(mode, mode)
        if mode not in aliases:
            raise ValueError(f'''mode参数只能是 '{"', '".join(aliases.keys())}' 之一,现在是:{mode}''')
        self._owner.browser._dl_mgr.set_file_exists(self._owner.tab_id, mode)

    def activate(self):
        """Ask the browser to activate this tab."""
        tab = self._owner
        tab.browser.activate_tab(tab.tab_id)
class ChromiumPageSetter(TabSetter):
    """Setter for a page object; most settings are mirrored onto ``owner.browser`` as well."""

    def NoneElement_value(self, value=None, on_off=True):
        """Store the "none element" substitute settings on both the owner and its browser.

        :param value: written to ``_none_ele_value``
        :param on_off: written to ``_none_ele_return_value``
        :return: None
        """
        super().NoneElement_value(value, on_off)
        self._owner.browser._none_ele_return_value = on_off
        self._owner.browser._none_ele_value = value

    def retry_times(self, times):
        """Write ``times`` to ``retry_times`` on both the owner and its browser.

        :param times: retry count to store
        :return: None
        """
        super().retry_times(times)
        self._owner.browser.retry_times = times

    def retry_interval(self, interval):
        """Write ``interval`` to ``retry_interval`` on both the owner and its browser.

        :param interval: retry interval to store
        :return: None
        """
        super().retry_interval(interval)
        self._owner.browser.retry_interval = interval

    def download_path(self, path):
        """Set the download directory on the owner, on its browser and on an existing DownloadKit.

        :param path: str or Path of the directory; ``None`` means the current directory
        :return: None
        """
        if path is None:
            path = '.'
        self._owner._download_path = str(Path(path).absolute())
        self._owner.browser.set.download_path(path)
        if self._owner._DownloadKit:
            # Hand over the normalised absolute path, as the other setters in this module do,
            # so the DownloadKit and the owner always agree on the directory.
            self._owner._DownloadKit.set.save_path(self._owner._download_path)

    def download_file_name(self, name=None, suffix=None):
        """Pass a rename request to the browser-level setter.

        :param name: forwarded to ``browser.set.download_file_name``
        :param suffix: forwarded to ``browser.set.download_file_name``
        :return: None
        """
        # Delegates to the browser's setter; the tab-level implementation inherited from
        # TabSetter is intentionally not called here.
        self._owner.browser.set.download_file_name(name, suffix)
class WebPageSetter(ChromiumPageSetter):
    """Setter for an owner with a ``mode`` attribute; routes header settings by that mode."""

    def __init__(self, owner):
        """Bind to ``owner`` and create the two setters that calls are routed to.

        :param owner: page object configured by this setter
        """
        super().__init__(owner)
        self._session_setter = SessionPageSetter(self._owner)
        self._chromium_setter = ChromiumPageSetter(self._owner)

    @property
    def cookies(self):
        """Return the WebPageCookiesSetter for the owner, building it on first access."""
        if self._cookies_setter is None:
            self._cookies_setter = WebPageCookiesSetter(self._owner)
        return self._cookies_setter

    def headers(self, headers):
        """Set headers through the session setter when ``owner.mode == 's'``, else through the chromium setter.

        :param headers: header data forwarded unchanged
        :return: None
        """
        in_session_mode = self._owner.mode == 's'  # presumably 's' is session mode; confirm on the owner
        delegate = self._session_setter if in_session_mode else self._chromium_setter
        delegate.headers(headers)

    def user_agent(self, ua, platform=None):
        """Set the user agent through the setter that matches ``owner.mode``.

        :param ua: user agent string
        :param platform: platform string; only forwarded when the chromium setter is used
        :return: None
        """
        if self._owner.mode == 's':
            self._session_setter.user_agent(ua)
            return
        self._chromium_setter.user_agent(ua, platform)
class MixTabSetter(TabSetter):
    """Setter for a tab that may hold both a session and a running driver; applies settings to each one present."""

    def __init__(self, owner):
        """Bind to ``owner`` and create the two setters that calls are forwarded to.

        :param owner: tab object configured by this setter
        """
        super().__init__(owner)
        self._session_setter = SessionPageSetter(self._owner)
        self._chromium_setter = ChromiumBaseSetter(self._owner)

    @property
    def cookies(self):
        """Return the MixTabCookiesSetter for the owner, building it on first access."""
        if self._cookies_setter is None:
            self._cookies_setter = MixTabCookiesSetter(self._owner)
        return self._cookies_setter

    def headers(self, headers):
        """Set headers on the session (if the owner has one) and on the browser (if its driver is running).

        :param headers: header data forwarded unchanged
        :return: None
        """
        owner = self._owner
        if owner._session:
            self._session_setter.headers(headers)
        if owner._driver and owner._driver.is_running:
            self._chromium_setter.headers(headers)

    def user_agent(self, ua, platform=None):
        """Set the user agent on the session (if present) and on the browser (if its driver is running).

        :param ua: user agent string
        :param platform: platform string; only forwarded to the browser side
        :return: None
        """
        owner = self._owner
        if owner._session:
            self._session_setter.user_agent(ua)
        if owner._driver and owner._driver.is_running:
            self._chromium_setter.user_agent(ua, platform)

    def timeouts(self, base=None, page_load=None, script=None):
        """Update ``owner.timeouts``; a given ``base`` is additionally copied to ``owner._timeout``.

        :param base: new value for ``timeouts.base`` and ``_timeout``
        :param page_load: new value for ``timeouts.page_load``
        :param script: new value for ``timeouts.script``
        :return: None
        """
        super().timeouts(base=base, page_load=page_load, script=script)
        if base is None:
            return
        self._owner._timeout = base
class ChromiumElementSetter(object):
    """Setter for a single element: HTML attributes, JS properties and inline style."""

    def __init__(self, ele):
        """Bind the setter to an element.

        :param ele: element object modified by this setter
        """
        self._ele = ele

    def attr(self, name, value=''):
        """Set an HTML attribute through ``DOM.setAttributeValue``.

        If the element's node id has gone stale (ElementLostError), the id is refreshed
        and the command is sent once more.

        :param name: attribute name
        :param value: attribute value; converted with ``str()``
        :return: None
        """
        text = str(value)
        try:
            self._ele.owner._run_cdp('DOM.setAttributeValue',
                                     nodeId=self._ele._node_id, name=name, value=text)
        except ElementLostError:
            self._ele._refresh_id()
            self._ele.owner._run_cdp('DOM.setAttributeValue',
                                     nodeId=self._ele._node_id, name=name, value=text)

    @staticmethod
    def _js_literal(value):
        """Return ``value`` encoded as a JavaScript literal.

        str, bool, int, float and None map to the matching JS literal via JSON encoding,
        which also escapes quotes, backslashes, newlines and other control characters.
        Any other object is converted with ``str()`` first.
        """
        if value is None or isinstance(value, (str, bool, int, float)):
            return json.dumps(value)
        return json.dumps(str(value))

    def property(self, name, value):
        """Set a JavaScript property of the element.

        :param name: property name, inserted verbatim after ``this.``
        :param value: value to assign; strings are fully escaped, so text containing quotes,
                      backslashes or line breaks is assigned unchanged
        :return: None
        """
        self._ele._run_js(f'this.{name}={self._js_literal(value)};')

    def style(self, name, value):
        """Set one field of the element's inline ``style`` object.

        :param name: style property name, inserted verbatim after ``this.style.``
        :param value: style value; converted with ``str()`` and escaped as a JS string
        :return: None
        :raises ValueError: if running the script raises JavaScriptError
        """
        try:
            self._ele._run_js(f'this.style.{name}={json.dumps(str(value))};')
        except JavaScriptError as e:
            raise ValueError(f'设置失败,请检查属性名{name}') from e

    def innerHTML(self, html):
        """Set the element's ``innerHTML`` property.

        :param html: HTML text to assign
        :return: None
        """
        self.property('innerHTML', html)

    def value(self, value):
        """Set the element's ``value`` property.

        :param value: value to assign
        :return: None
        """
        self.property('value', value)
class ChromiumFrameSetter(ChromiumBaseSetter):
    """Setter for a frame; element-level settings are forwarded to ``owner.frame_ele.set``."""

    def attr(self, name, value):
        """Forward an attribute change to the setter of the owner's frame element.

        :param name: attribute name
        :param value: attribute value
        :return: None
        """
        element_setter = self._owner.frame_ele.set
        element_setter.attr(name, value)

    def property(self, name, value):
        """Forward a property change to the setter of the owner's frame element.

        :param name: property name
        :param value: property value
        :return: None
        """
        element_setter = self._owner.frame_ele.set
        element_setter.property(name=name, value=value)

    def style(self, name, value):
        """Forward a style change to the setter of the owner's frame element.

        :param name: style property name
        :param value: style value
        :return: None
        """
        element_setter = self._owner.frame_ele.set
        element_setter.style(name=name, value=value)
class LoadMode(object):
    """Callable helper that sets the owner's load mode; also offers one method per mode."""

    # Accepted modes, in the order listed by the error message.
    _MODES = ('normal', 'eager', 'none')

    def __init__(self, owner):
        """Bind the helper to the object whose ``_load_mode`` it sets.

        :param owner: object configured by this helper
        """
        self._owner = owner

    def __call__(self, value):
        """Set the load mode.

        The name is matched case-insensitively and stored in lower case, so the stored
        value is always exactly one of ``'normal'``, ``'eager'`` or ``'none'``.
        When the owner's ``_type`` is ``'ChromiumPage'`` or ``'WebPage'`` the mode is
        copied to ``owner.browser`` as well.

        :param value: mode name
        :return: None
        :raises ValueError: if ``value`` is not one of the accepted modes
        """
        mode = value.lower()
        if mode not in self._MODES:
            raise ValueError("只能选择 'normal', 'eager', 'none'")
        self._owner._load_mode = mode
        if self._owner._type in ('ChromiumPage', 'WebPage'):
            self._owner.browser._load_mode = mode

    def normal(self):
        """Set the load mode to ``'normal'``."""
        self('normal')

    def eager(self):
        """Set the load mode to ``'eager'``."""
        self('eager')

    def none(self):
        """Set the load mode to ``'none'``."""
        self('none')
class PageScrollSetter(object):
    """Setter for the options of a scroll object."""

    def __init__(self, scroll):
        """Bind the setter to a scroll object.

        :param scroll: scroll object whose options are changed
        """
        self._scroll = scroll

    @staticmethod
    def _require_bool(on_off):
        """Raise TypeError unless ``on_off`` is a real bool."""
        if isinstance(on_off, bool):
            return
        raise TypeError('on_off必须为bool。')

    def wait_complete(self, on_off=True):
        """Store the flag in the scroll object's ``_wait_complete``.

        :param on_off: bool switch
        :return: None
        :raises TypeError: if ``on_off`` is not a bool
        """
        self._require_bool(on_off)
        self._scroll._wait_complete = on_off

    def smooth(self, on_off=True):
        """Switch the document's CSS ``scroll-behavior`` between ``smooth`` and ``auto``.

        The scroll object's ``_wait_complete`` flag is set to the same value.

        :param on_off: bool switch; True selects ``smooth``, False selects ``auto``
        :return: None
        :raises TypeError: if ``on_off`` is not a bool
        """
        self._require_bool(on_off)
        if on_off:
            behavior = 'smooth'
        else:
            behavior = 'auto'
        script = f'document.documentElement.style.setProperty("scroll-behavior","{behavior}");'
        self._scroll._owner._run_js(script)
        self._scroll._wait_complete = on_off
class WindowSetter(object):
    """Setter for the browser window that hosts the owner: state, size, position and visibility."""

    def __init__(self, owner):
        """Bind to ``owner`` and look up the id of its window.

        :param owner: object providing ``_run_cdp``
        :raises RuntimeError: if the window information cannot be obtained
        """
        self._owner = owner
        self._window_id = self._get_info()['windowId']

    def max(self):
        """Maximize the window; a fullscreen or minimized window is restored to normal first."""
        s = self._get_info()['bounds']['windowState']
        if s in ('fullscreen', 'minimized'):
            self._perform({'windowState': 'normal'})
        self._perform({'windowState': 'maximized'})

    def mini(self):
        """Minimize the window; a fullscreen window is restored to normal first."""
        s = self._get_info()['bounds']['windowState']
        if s == 'fullscreen':
            self._perform({'windowState': 'normal'})
        self._perform({'windowState': 'minimized'})

    def full(self):
        """Make the window fullscreen; a minimized window is restored to normal first."""
        s = self._get_info()['bounds']['windowState']
        if s == 'minimized':
            self._perform({'windowState': 'normal'})
        self._perform({'windowState': 'fullscreen'})

    def normal(self):
        """Put the window into the normal state.

        From fullscreen the request is sent twice, exactly as the original code does;
        presumably one request is not enough in that state (confirm before simplifying).
        """
        s = self._get_info()['bounds']['windowState']
        if s == 'fullscreen':
            self._perform({'windowState': 'normal'})
        self._perform({'windowState': 'normal'})

    def size(self, width=None, height=None):
        """Resize the window; does nothing when both arguments are falsy.

        A dimension that is not given keeps its current value. Given values are adjusted by
        fixed offsets (width - 16, height + 7) before being sent; presumably these compensate
        for the window frame (confirm).

        :param width: requested width, or ``None`` to keep the current one
        :param height: requested height, or ``None`` to keep the current one
        :return: None
        """
        if not (width or height):
            return
        s = self._get_info()['bounds']['windowState']
        if s != 'normal':
            self._perform({'windowState': 'normal'})
        info = self._get_info()['bounds']
        width = width - 16 if width else info['width']
        height = height + 7 if height else info['height']
        self._perform({'width': width, 'height': height})

    def location(self, x=None, y=None):
        """Move the window; does nothing when both arguments are ``None``.

        The window is put into the normal state first. A coordinate that is not given keeps
        its current value. The left coordinate is sent as ``x - 8``; presumably this
        compensates for the window frame (confirm).

        :param x: requested left position, or ``None`` to keep the current one
        :param y: requested top position, or ``None`` to keep the current one
        :return: None
        """
        if x is None and y is None:
            return
        self.normal()
        info = self._get_info()['bounds']
        x = x if x is not None else info['left']
        y = y if y is not None else info['top']
        self._perform({'left': x - 8, 'top': y})

    def hide(self):
        """Hide the browser window (delegates to ``show_or_hide_browser``)."""
        show_or_hide_browser(self._owner, hide=True)

    def show(self):
        """Show the browser window (delegates to ``show_or_hide_browser``)."""
        show_or_hide_browser(self._owner, hide=False)

    def _get_info(self):
        """Return the result of ``Browser.getWindowForTarget``.

        The command is attempted up to 50 times with a 0.1 second pause after each failure.

        :return: the command's result
        :raises RuntimeError: if every attempt fails
        """
        for _ in range(50):
            try:
                return self._owner._run_cdp('Browser.getWindowForTarget')
            except Exception:
                # Only ordinary errors are retried; KeyboardInterrupt and SystemExit propagate.
                sleep(.1)
        raise RuntimeError('获取窗口信息失败。')

    def _perform(self, bounds):
        """Send ``Browser.setWindowBounds`` for this window.

        :param bounds: dict of bounds fields to change
        :return: None
        :raises RuntimeError: if the command fails; the original error is kept as ``__cause__``
        """
        try:
            self._owner._run_cdp('Browser.setWindowBounds', windowId=self._window_id, bounds=bounds)
        except Exception as e:
            raise RuntimeError('浏览器全屏或最小化状态时请先调用set.window.normal()恢复正常状态。') from e