From 8f1048da1d3ef530f1e35becdb5aa84893502c81 Mon Sep 17 00:00:00 2001 From: g1879 Date: Thu, 12 Jan 2023 23:00:31 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8B=A6=E6=88=AA?= =?UTF-8?q?=E6=B5=8F=E8=A7=88=E5=99=A8=E4=B8=8B=E8=BD=BD=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/chromium_page.py | 35 +++++++++++++++++++++++++++++++++ DrissionPage/chromium_page.pyi | 11 ++++++++++- DrissionPage/config.py | 5 ++++- DrissionPage/config.pyi | 36 +++++++++++++++++----------------- 4 files changed, 67 insertions(+), 20 deletions(-) diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index b8409b7..76a818b 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -5,6 +5,7 @@ """ from pathlib import Path from platform import system +from queue import Queue from re import search from time import perf_counter, sleep @@ -42,6 +43,7 @@ class ChromiumPage(ChromiumBase): self._control_session.keep_alive = False self._alert = Alert() self._first_run = True + self._download_list = None # 接管或启动浏览器 if addr_driver_opts is None or isinstance(addr_driver_opts, DriverOptions): @@ -124,6 +126,30 @@ class ChromiumPage(ChromiumBase): self._window_setter = WindowSetter(self) return self._window_setter + @property + def download_list(self): + """以list方式返回被拦截的下载列表""" + if self._download_list is None: + return [] + d_list = [] + while not self._download_list.empty(): + d_list.append(self._download_list.get()) + return d_list + + def block_download(self, on_off): + """开始或停止拦截下载 \n + :param on_off: 开始或停止拦截 + :return: None + """ + if on_off: + self._tab_obj.Page.downloadWillBegin = self._on_download_begin + self._tab_obj.Browser.setDownloadBehavior(behavior='deny') + # self._tab_obj.Browser.downloadWillBegin = self._on_download_begin + else: + self._tab_obj.Browser.setDownloadBehavior(behavior='default') + self._tab_obj.Page.downloadWillBegin = None + # self._tab_obj.Browser.downloadWillBegin = None + def get_tab(self, tab_id=None): """获取一个标签页对象 \n :param tab_id: 要获取的标签页id,为None时获取当前tab @@ -351,6 +377,15 @@ class ChromiumPage(ChromiumBase): self._alert.response_text = None self._tab_obj.has_alert = True + def _on_download_begin(self, **kwargs): + if self._download_list is None: + self._download_list = Queue() + gid = kwargs['guid'] + self._tab_obj.Browser.cancelDownload(guid=gid) + url = kwargs['url'] + name = kwargs['suggestedFilename'] + self._download_list.put(item={'url': url, 'name': name}) + class Alert(object): """用于保存alert信息的类""" diff --git a/DrissionPage/chromium_page.pyi b/DrissionPage/chromium_page.pyi index 5880082..dd31a3b 100644 --- a/DrissionPage/chromium_page.pyi +++ b/DrissionPage/chromium_page.pyi @@ -5,6 +5,7 @@ """ from os import popen from pathlib import Path +from queue import Queue from typing import Union, Tuple, List from .chromium_base import ChromiumBase @@ -24,6 +25,7 @@ class ChromiumPage(ChromiumBase): self._window_setter: WindowSetter = ... self._main_tab: str = ... self._alert: Alert = ... + self._download_list: Queue = ... def _connect_browser(self, addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None, @@ -46,7 +48,12 @@ class ChromiumPage(ChromiumBase): def process_id(self) -> Union[None, int]: ... @property - def set_window(self) -> 'WindowSetter': ... + def set_window(self) -> WindowSetter: ... + + @property + def download_list(self) -> list: ... + + def block_download(self, on_off: bool) -> None: ... def get_tab(self, tab_id: str = None) -> ChromiumTab: ... @@ -82,6 +89,8 @@ class ChromiumPage(ChromiumBase): def _on_alert_open(self, **kwargs): ... + def _on_download_begin(self, **kwargs): ... + class Alert(object): diff --git a/DrissionPage/config.py b/DrissionPage/config.py index b332296..7aaab16 100644 --- a/DrissionPage/config.py +++ b/DrissionPage/config.py @@ -761,7 +761,10 @@ class DriverOptions(Options): self.debugger_address = debugger_address if download_path is not None: - self.experimental_options['prefs']['download.default_directory'] = str(download_path) + if 'prefs' not in self.experimental_options: + self.experimental_options['prefs'] = {'download.default_directory': str(download_path)} + else: + self.experimental_options['prefs']['download.default_directory'] = str(download_path) if user_data_path is not None: self.set_argument('--user-data-dir', str(user_data_path)) diff --git a/DrissionPage/config.pyi b/DrissionPage/config.pyi index 4319af3..5c6bc54 100644 --- a/DrissionPage/config.pyi +++ b/DrissionPage/config.pyi @@ -175,15 +175,15 @@ class DriverOptions(Options): def user_data_path(self) -> str: ... # -------------重写父类方法,实现链式操作------------- - def add_argument(self, argument: str) -> 'DriverOptions': ... + def add_argument(self, argument: str) -> DriverOptions: ... - def set_capability(self, name: str, value: str) -> 'DriverOptions': ... + def set_capability(self, name: str, value: str) -> DriverOptions: ... - def add_extension(self, extension: str) -> 'DriverOptions': ... + def add_extension(self, extension: str) -> DriverOptions: ... - def add_encoded_extension(self, extension: str) -> 'DriverOptions': ... + def add_encoded_extension(self, extension: str) -> DriverOptions: ... - def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> 'DriverOptions': ... + def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> DriverOptions: ... # -------------重写父类方法结束------------- @@ -191,29 +191,29 @@ class DriverOptions(Options): def save_to_default(self) -> str: ... - def remove_argument(self, value: str) -> 'DriverOptions': ... + def remove_argument(self, value: str) -> DriverOptions: ... - def remove_experimental_option(self, key: str) -> 'DriverOptions': ... + def remove_experimental_option(self, key: str) -> DriverOptions: ... - def remove_all_extensions(self) -> 'DriverOptions': ... + def remove_all_extensions(self) -> DriverOptions: ... - def set_argument(self, arg: str, value: Union[bool, str]) -> 'DriverOptions': ... + def set_argument(self, arg: str, value: Union[bool, str]) -> DriverOptions: ... - def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> 'DriverOptions': ... + def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> DriverOptions: ... - def set_headless(self, on_off: bool = True) -> 'DriverOptions': ... + def set_headless(self, on_off: bool = True) -> DriverOptions: ... - def set_no_imgs(self, on_off: bool = True) -> 'DriverOptions': ... + def set_no_imgs(self, on_off: bool = True) -> DriverOptions: ... - def set_no_js(self, on_off: bool = True) -> 'DriverOptions': ... + def set_no_js(self, on_off: bool = True) -> DriverOptions: ... - def set_mute(self, on_off: bool = True) -> 'DriverOptions': ... + def set_mute(self, on_off: bool = True) -> DriverOptions: ... - def set_user_agent(self, user_agent: str) -> 'DriverOptions': ... + def set_user_agent(self, user_agent: str) -> DriverOptions: ... - def set_proxy(self, proxy: str) -> 'DriverOptions': ... + def set_proxy(self, proxy: str) -> DriverOptions: ... - def set_page_load_strategy(self, value: str) -> 'DriverOptions': ... + def set_page_load_strategy(self, value: str) -> DriverOptions: ... def set_paths(self, driver_path: Union[str, Path] = None, @@ -223,7 +223,7 @@ class DriverOptions(Options): debugger_address: str = None, download_path: str = None, user_data_path: str = None, - cache_path: str = None) -> 'DriverOptions': ... + cache_path: str = None) -> DriverOptions: ... def as_dict(self) -> dict: ... From 0b60c6c56154228dfbc748c6f825123c388ff4de Mon Sep 17 00:00:00 2001 From: g1879 Date: Fri, 13 Jan 2023 11:36:24 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9port=5Fis=5Fusing()?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/common.py | 22 +++++++--------------- DrissionPage/common.pyi | 3 +++ 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/DrissionPage/common.py b/DrissionPage/common.py index 09d1bcb..f62d962 100644 --- a/DrissionPage/common.py +++ b/DrissionPage/common.py @@ -10,7 +10,6 @@ from re import split, search, sub from shutil import rmtree from subprocess import Popen from time import perf_counter, sleep -from typing import Union from zipfile import ZipFile from urllib.parse import urlparse, urljoin, urlunparse from requests import get as requests_get @@ -494,24 +493,17 @@ def is_js_func(func): return False -def _port_is_using(ip: str, port: str) -> Union[bool, None]: +def port_is_using(ip, port): """检查端口是否被占用 \n :param ip: 浏览器地址 :param port: 浏览器端口 :return: bool """ - import socket - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - try: - s.connect((ip, int(port))) - s.shutdown(2) - return True - except socket.error: - return False - finally: - if s: - s.close() + from socket import socket, AF_INET, SOCK_STREAM + s = socket(AF_INET, SOCK_STREAM) + result = s.connect_ex((ip, int(port))) + s.close() + return True if result == 0 else False def connect_browser(option): @@ -528,7 +520,7 @@ def connect_browser(option): if ip not in ('127.0.0.1', 'localhost'): return None, None - if _port_is_using(ip, port): + if port_is_using(ip, port): chrome_path = get_exe_from_port(port) if chrome_path == 'chrome' and system_type == 'windows' else chrome_path return chrome_path, None diff --git a/DrissionPage/common.pyi b/DrissionPage/common.pyi index 6d62a4d..4d9551e 100644 --- a/DrissionPage/common.pyi +++ b/DrissionPage/common.pyi @@ -54,6 +54,9 @@ def make_absolute_link(link, page: BasePage = None) -> str: ... def is_js_func(func: str) -> bool: ... +def port_is_using(ip: str, port: str) -> bool: ... + + def connect_browser(option: DriverOptions) -> tuple: ...