Merge remote-tracking branch 'origin/master'

This commit is contained in:
kedaji 2023-01-13 14:58:35 +08:00
commit 783761eee9
6 changed files with 77 additions and 35 deletions

View File

@ -5,6 +5,7 @@
"""
from pathlib import Path
from platform import system
from queue import Queue
from re import search
from time import perf_counter, sleep
@ -42,6 +43,7 @@ class ChromiumPage(ChromiumBase):
self._control_session.keep_alive = False
self._alert = Alert()
self._first_run = True
self._download_list = None
# 接管或启动浏览器
if addr_driver_opts is None or isinstance(addr_driver_opts, DriverOptions):
@ -124,6 +126,30 @@ class ChromiumPage(ChromiumBase):
self._window_setter = WindowSetter(self)
return self._window_setter
@property
def download_list(self):
"""以list方式返回被拦截的下载列表"""
if self._download_list is None:
return []
d_list = []
while not self._download_list.empty():
d_list.append(self._download_list.get())
return d_list
def block_download(self, on_off):
"""开始或停止拦截下载 \n
:param on_off: 开始或停止拦截
:return: None
"""
if on_off:
self._tab_obj.Page.downloadWillBegin = self._on_download_begin
self._tab_obj.Browser.setDownloadBehavior(behavior='deny')
# self._tab_obj.Browser.downloadWillBegin = self._on_download_begin
else:
self._tab_obj.Browser.setDownloadBehavior(behavior='default')
self._tab_obj.Page.downloadWillBegin = None
# self._tab_obj.Browser.downloadWillBegin = None
def get_tab(self, tab_id=None):
"""获取一个标签页对象 \n
:param tab_id: 要获取的标签页id为None时获取当前tab
@ -351,6 +377,15 @@ class ChromiumPage(ChromiumBase):
self._alert.response_text = None
self._tab_obj.has_alert = True
def _on_download_begin(self, **kwargs):
if self._download_list is None:
self._download_list = Queue()
gid = kwargs['guid']
self._tab_obj.Browser.cancelDownload(guid=gid)
url = kwargs['url']
name = kwargs['suggestedFilename']
self._download_list.put(item={'url': url, 'name': name})
class Alert(object):
"""用于保存alert信息的类"""

View File

@ -5,6 +5,7 @@
"""
from os import popen
from pathlib import Path
from queue import Queue
from typing import Union, Tuple, List
from .chromium_base import ChromiumBase
@ -24,6 +25,7 @@ class ChromiumPage(ChromiumBase):
self._window_setter: WindowSetter = ...
self._main_tab: str = ...
self._alert: Alert = ...
self._download_list: Queue = ...
def _connect_browser(self,
addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None,
@ -46,7 +48,12 @@ class ChromiumPage(ChromiumBase):
def process_id(self) -> Union[None, int]: ...
@property
def set_window(self) -> 'WindowSetter': ...
def set_window(self) -> WindowSetter: ...
@property
def download_list(self) -> list: ...
def block_download(self, on_off: bool) -> None: ...
def get_tab(self, tab_id: str = None) -> ChromiumTab: ...
@ -82,6 +89,8 @@ class ChromiumPage(ChromiumBase):
def _on_alert_open(self, **kwargs): ...
def _on_download_begin(self, **kwargs): ...
class Alert(object):

View File

@ -10,7 +10,6 @@ from re import split, search, sub
from shutil import rmtree
from subprocess import Popen
from time import perf_counter, sleep
from typing import Union
from zipfile import ZipFile
from urllib.parse import urlparse, urljoin, urlunparse
from requests import get as requests_get
@ -494,24 +493,17 @@ def is_js_func(func):
return False
def _port_is_using(ip: str, port: str) -> Union[bool, None]:
def port_is_using(ip, port):
"""检查端口是否被占用 \n
:param ip: 浏览器地址
:param port: 浏览器端口
:return: bool
"""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((ip, int(port)))
s.shutdown(2)
return True
except socket.error:
return False
finally:
if s:
from socket import socket, AF_INET, SOCK_STREAM
s = socket(AF_INET, SOCK_STREAM)
result = s.connect_ex((ip, int(port)))
s.close()
return True if result == 0 else False
def connect_browser(option):
@ -528,7 +520,7 @@ def connect_browser(option):
if ip not in ('127.0.0.1', 'localhost'):
return None, None
if _port_is_using(ip, port):
if port_is_using(ip, port):
chrome_path = get_exe_from_port(port) if chrome_path == 'chrome' and system_type == 'windows' else chrome_path
return chrome_path, None

View File

@ -54,6 +54,9 @@ def make_absolute_link(link, page: BasePage = None) -> str: ...
def is_js_func(func: str) -> bool: ...
def port_is_using(ip: str, port: str) -> bool: ...
def connect_browser(option: DriverOptions) -> tuple: ...

View File

@ -805,6 +805,9 @@ class DriverOptions(Options):
self.debugger_address = debugger_address
if download_path is not None:
if 'prefs' not in self.experimental_options:
self.experimental_options['prefs'] = {'download.default_directory': str(download_path)}
else:
self.experimental_options['prefs']['download.default_directory'] = str(download_path)
if user_data_path is not None:

View File

@ -177,15 +177,15 @@ class DriverOptions(Options):
def user_data_path(self) -> str: ...
# -------------重写父类方法,实现链式操作-------------
def add_argument(self, argument: str) -> 'DriverOptions': ...
def add_argument(self, argument: str) -> DriverOptions: ...
def set_capability(self, name: str, value: str) -> 'DriverOptions': ...
def set_capability(self, name: str, value: str) -> DriverOptions: ...
def add_extension(self, extension: str) -> 'DriverOptions': ...
def add_extension(self, extension: str) -> DriverOptions: ...
def add_encoded_extension(self, extension: str) -> 'DriverOptions': ...
def add_encoded_extension(self, extension: str) -> DriverOptions: ...
def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> 'DriverOptions': ...
def add_experimental_option(self, name: str, value: Union[str, int, dict, List[str]]) -> DriverOptions: ...
# -------------重写父类方法结束-------------
@ -193,29 +193,29 @@ class DriverOptions(Options):
def save_to_default(self) -> str: ...
def remove_argument(self, value: str) -> 'DriverOptions': ...
def remove_argument(self, value: str) -> DriverOptions: ...
def remove_experimental_option(self, key: str) -> 'DriverOptions': ...
def remove_experimental_option(self, key: str) -> DriverOptions: ...
def remove_all_extensions(self) -> 'DriverOptions': ...
def remove_all_extensions(self) -> DriverOptions: ...
def set_argument(self, arg: str, value: Union[bool, str]) -> 'DriverOptions': ...
def set_argument(self, arg: str, value: Union[bool, str]) -> DriverOptions: ...
def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> 'DriverOptions': ...
def set_timeouts(self, implicit: float = None, pageLoad: float = None, script: float = None) -> DriverOptions: ...
def set_headless(self, on_off: bool = True) -> 'DriverOptions': ...
def set_headless(self, on_off: bool = True) -> DriverOptions: ...
def set_no_imgs(self, on_off: bool = True) -> 'DriverOptions': ...
def set_no_imgs(self, on_off: bool = True) -> DriverOptions: ...
def set_no_js(self, on_off: bool = True) -> 'DriverOptions': ...
def set_no_js(self, on_off: bool = True) -> DriverOptions: ...
def set_mute(self, on_off: bool = True) -> 'DriverOptions': ...
def set_mute(self, on_off: bool = True) -> DriverOptions: ...
def set_user_agent(self, user_agent: str) -> 'DriverOptions': ...
def set_user_agent(self, user_agent: str) -> DriverOptions: ...
def set_proxy(self, proxy: str) -> 'DriverOptions': ...
def set_proxy(self, proxy: str) -> DriverOptions: ...
def set_page_load_strategy(self, value: str) -> 'DriverOptions': ...
def set_page_load_strategy(self, value: str) -> DriverOptions: ...
def set_multiple_open_browser(self, default_user_data_parent: str=None) -> 'DriverOptions': ...
@ -227,7 +227,7 @@ class DriverOptions(Options):
debugger_address: str = None,
download_path: str = None,
user_data_path: str = None,
cache_path: str = None) -> 'DriverOptions': ...
cache_path: str = None) -> DriverOptions: ...
def as_dict(self) -> dict: ...