添加拦截浏览器下载功能

This commit is contained in:
g1879 2023-01-12 23:00:31 +08:00
parent edab95d554
commit 8f1048da1d
4 changed files with 67 additions and 20 deletions

View File

@ -5,6 +5,7 @@
""" """
from pathlib import Path from pathlib import Path
from platform import system from platform import system
from queue import Queue
from re import search from re import search
from time import perf_counter, sleep from time import perf_counter, sleep
@ -42,6 +43,7 @@ class ChromiumPage(ChromiumBase):
self._control_session.keep_alive = False self._control_session.keep_alive = False
self._alert = Alert() self._alert = Alert()
self._first_run = True self._first_run = True
self._download_list = None
# 接管或启动浏览器 # 接管或启动浏览器
if addr_driver_opts is None or isinstance(addr_driver_opts, DriverOptions): if addr_driver_opts is None or isinstance(addr_driver_opts, DriverOptions):
@ -124,6 +126,30 @@ class ChromiumPage(ChromiumBase):
self._window_setter = WindowSetter(self) self._window_setter = WindowSetter(self)
return self._window_setter return self._window_setter
@property
def download_list(self):
"""以list方式返回被拦截的下载列表"""
if self._download_list is None:
return []
d_list = []
while not self._download_list.empty():
d_list.append(self._download_list.get())
return d_list
def block_download(self, on_off):
"""开始或停止拦截下载 \n
:param on_off: 开始或停止拦截
:return: None
"""
if on_off:
self._tab_obj.Page.downloadWillBegin = self._on_download_begin
self._tab_obj.Browser.setDownloadBehavior(behavior='deny')
# self._tab_obj.Browser.downloadWillBegin = self._on_download_begin
else:
self._tab_obj.Browser.setDownloadBehavior(behavior='default')
self._tab_obj.Page.downloadWillBegin = None
# self._tab_obj.Browser.downloadWillBegin = None
def get_tab(self, tab_id=None): def get_tab(self, tab_id=None):
"""获取一个标签页对象 \n """获取一个标签页对象 \n
:param tab_id: 要获取的标签页id为None时获取当前tab :param tab_id: 要获取的标签页id为None时获取当前tab
@ -351,6 +377,15 @@ class ChromiumPage(ChromiumBase):
self._alert.response_text = None self._alert.response_text = None
self._tab_obj.has_alert = True self._tab_obj.has_alert = True
def _on_download_begin(self, **kwargs):
if self._download_list is None:
self._download_list = Queue()
gid = kwargs['guid']
self._tab_obj.Browser.cancelDownload(guid=gid)
url = kwargs['url']
name = kwargs['suggestedFilename']
self._download_list.put(item={'url': url, 'name': name})
class Alert(object): class Alert(object):
"""用于保存alert信息的类""" """用于保存alert信息的类"""

View File

@ -5,6 +5,7 @@
""" """
from os import popen from os import popen
from pathlib import Path from pathlib import Path
from queue import Queue
from typing import Union, Tuple, List from typing import Union, Tuple, List
from .chromium_base import ChromiumBase from .chromium_base import ChromiumBase
@ -24,6 +25,7 @@ class ChromiumPage(ChromiumBase):
self._window_setter: WindowSetter = ... self._window_setter: WindowSetter = ...
self._main_tab: str = ... self._main_tab: str = ...
self._alert: Alert = ... self._alert: Alert = ...
self._download_list: Queue = ...
def _connect_browser(self, def _connect_browser(self,
addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None, addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None,
@ -46,7 +48,12 @@ class ChromiumPage(ChromiumBase):
def process_id(self) -> Union[None, int]: ... def process_id(self) -> Union[None, int]: ...
@property @property
def set_window(self) -> 'WindowSetter': ... def set_window(self) -> WindowSetter: ...
@property
def download_list(self) -> list: ...
def block_download(self, on_off: bool) -> None: ...
def get_tab(self, tab_id: str = None) -> ChromiumTab: ... def get_tab(self, tab_id: str = None) -> ChromiumTab: ...
@ -82,6 +89,8 @@ class ChromiumPage(ChromiumBase):
def _on_alert_open(self, **kwargs): ... def _on_alert_open(self, **kwargs): ...
def _on_download_begin(self, **kwargs): ...
class Alert(object): class Alert(object):

View File

@ -761,7 +761,10 @@ class DriverOptions(Options):
self.debugger_address = debugger_address self.debugger_address = debugger_address
if download_path is not None: if download_path is not None:
self.experimental_options['prefs']['download.default_directory'] = str(download_path) if 'prefs' not in self.experimental_options:
self.experimental_options['prefs'] = {'download.default_directory': str(download_path)}
else:
self.experimental_options['prefs']['download.default_directory'] = str(download_path)
if user_data_path is not None: if user_data_path is not None:
self.set_argument('--user-data-dir', str(user_data_path)) self.set_argument('--user-data-dir', str(user_data_path))

View File

@ -175,15 +175,15 @@ class DriverOptions(Options):
def user_data_path(self) -> str: ... def user_data_path(self) -> str: ...
# -------------重写父类方法,实现链式操作------------- # -------------重写父类方法,实现链式操作-------------
def add_argument(self, argument: str) -> 'DriverOptions': ... def add_argument(self, argument: str) -> DriverOptions: ...
def set_capability(self, name: str, value: str) -> 'DriverOptions': ... def set_capability(self, name: str, value: str) -> DriverOptions: ...
def add_extension(self, extension: str) -> 'DriverOptions': ... def add_extension(self, extension: str) -> DriverOptions: ...
def add_encoded_extension(self, extension: str) -> 'DriverOptions': ... def add_encoded_extension(self, extension: str) -> DriverOptions: ...
def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> 'DriverOptions': ... def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> DriverOptions: ...
# -------------重写父类方法结束------------- # -------------重写父类方法结束-------------
@ -191,29 +191,29 @@ class DriverOptions(Options):
def save_to_default(self) -> str: ... def save_to_default(self) -> str: ...
def remove_argument(self, value: str) -> 'DriverOptions': ... def remove_argument(self, value: str) -> DriverOptions: ...
def remove_experimental_option(self, key: str) -> 'DriverOptions': ... def remove_experimental_option(self, key: str) -> DriverOptions: ...
def remove_all_extensions(self) -> 'DriverOptions': ... def remove_all_extensions(self) -> DriverOptions: ...
def set_argument(self, arg: str, value: Union[bool, str]) -> 'DriverOptions': ... def set_argument(self, arg: str, value: Union[bool, str]) -> DriverOptions: ...
def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> 'DriverOptions': ... def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> DriverOptions: ...
def set_headless(self, on_off: bool = True) -> 'DriverOptions': ... def set_headless(self, on_off: bool = True) -> DriverOptions: ...
def set_no_imgs(self, on_off: bool = True) -> 'DriverOptions': ... def set_no_imgs(self, on_off: bool = True) -> DriverOptions: ...
def set_no_js(self, on_off: bool = True) -> 'DriverOptions': ... def set_no_js(self, on_off: bool = True) -> DriverOptions: ...
def set_mute(self, on_off: bool = True) -> 'DriverOptions': ... def set_mute(self, on_off: bool = True) -> DriverOptions: ...
def set_user_agent(self, user_agent: str) -> 'DriverOptions': ... def set_user_agent(self, user_agent: str) -> DriverOptions: ...
def set_proxy(self, proxy: str) -> 'DriverOptions': ... def set_proxy(self, proxy: str) -> DriverOptions: ...
def set_page_load_strategy(self, value: str) -> 'DriverOptions': ... def set_page_load_strategy(self, value: str) -> DriverOptions: ...
def set_paths(self, def set_paths(self,
driver_path: Union[str, Path] = None, driver_path: Union[str, Path] = None,
@ -223,7 +223,7 @@ class DriverOptions(Options):
debugger_address: str = None, debugger_address: str = None,
download_path: str = None, download_path: str = None,
user_data_path: str = None, user_data_path: str = None,
cache_path: str = None) -> 'DriverOptions': ... cache_path: str = None) -> DriverOptions: ...
def as_dict(self) -> dict: ... def as_dict(self) -> dict: ...