增加page.wait.ele_load();new_tab()默认不切换;所有BasePage支持download();未完成下载设置改造

This commit is contained in:
g1879 2023-05-17 00:23:07 +08:00
parent d5d5540490
commit 9774e3d389
6 changed files with 463 additions and 386 deletions

View File

@ -10,6 +10,7 @@ from pathlib import Path
from threading import Thread from threading import Thread
from time import perf_counter, sleep, time from time import perf_counter, sleep, time
from DownloadKit import DownloadKit
from requests import Session from requests import Session
from .base import BasePage from .base import BasePage
@ -18,11 +19,12 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder from .commons.tools import get_usable_path, clean_folder
from .commons.web import set_browser_cookies from .commons.web import set_browser_cookies, set_session_cookies
from .errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \ from .errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \
NoRectError, BrowserConnectError, GetDocumentError NoRectError, BrowserConnectError, GetDocumentError
from .network_listener import NetworkListener from .network_listener import NetworkListener
from .session_element import make_session_ele from .session_element import make_session_ele
from .session_page import DownloadSetter
class ChromiumBase(BasePage): class ChromiumBase(BasePage):
@ -42,6 +44,8 @@ class ChromiumBase(BasePage):
self._set = None self._set = None
self._screencast = None self._screencast = None
self._listener = None self._listener = None
self._download_set = None
self._download_path = None
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()): if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
address = f'127.0.0.1:{address}' address = f'127.0.0.1:{address}'
@ -375,6 +379,24 @@ class ChromiumBase(BasePage):
self._listener = NetworkListener(self) self._listener = NetworkListener(self)
return self._listener return self._listener
@property
def download_path(self):
"""返回默认下载路径"""
p = self._download_path or ''
return str(Path(p).absolute())
@property
def download_set(self):
"""返回用于设置下载参数的对象"""
if self._download_set is None:
self._download_set = BaseDownloadSetter(self)
return self._download_set
@property
def download(self):
"""返回下载器对象"""
return self.download_set.DownloadKit
def run_cdp(self, cmd, **cmd_args): def run_cdp(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句 """执行Chrome DevTools Protocol语句
:param cmd: 协议项目 :param cmd: 协议项目
@ -872,6 +894,45 @@ class ChromiumBase(BasePage):
return str(path.absolute()) return str(path.absolute())
class BaseDownloadSetter(DownloadSetter):
"""用于设置下载参数的类"""
def __init__(self, page):
"""
:param page: ChromiumPage对象
"""
super().__init__(page)
self._session = None
self._show_msg = True
@property
def session(self):
"""返回用于DownloadKit的Session对象"""
if self._session is None:
self._session = Session()
ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self._session.headers.update({"User-Agent": ua})
self._cookies_to_session()
return self._session
@property
def DownloadKit(self):
if self._DownloadKit is None:
self._DownloadKit = DownloadKit(session=self._page, goal_path=self._page.download_path)
return self._DownloadKit
def show_msg(self, on_off=True):
"""是否显示下载信息
:param on_off: bool表示开或关
:return: None
"""
self._show_msg = on_off
def _cookies_to_session(self):
"""把driver对象的cookies复制到session对象"""
set_session_cookies(self._session, self._page.get_cookies(as_dict=False, all_info=False))
class ChromiumBaseSetter(object): class ChromiumBaseSetter(object):
def __init__(self, page): def __init__(self, page):
self._page = page self._page = page
@ -981,10 +1042,8 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间 :param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功 :return: 是否等待成功
""" """
if isinstance(loc_or_ele, (str, tuple)): ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0)
ele = self._driver._ele(loc_or_ele, timeout=.3, raise_err=False) return ele.wait.delete(timeout) if ele else True
return ele.wait.delete(timeout) if ele else True
return loc_or_ele.wait.delete(timeout)
def ele_display(self, loc_or_ele, timeout=None): def ele_display(self, loc_or_ele, timeout=None):
"""等待元素变成显示状态 """等待元素变成显示状态
@ -992,8 +1051,8 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间 :param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功 :return: 是否等待成功
""" """
ele = self._driver._ele(loc_or_ele, raise_err=False) ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0)
return ele.wait.display(timeout) if ele else False return ele.wait.display(timeout)
def ele_hidden(self, loc_or_ele, timeout=None): def ele_hidden(self, loc_or_ele, timeout=None):
"""等待元素变成隐藏状态 """等待元素变成隐藏状态
@ -1001,9 +1060,18 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间 :param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功 :return: 是否等待成功
""" """
ele = self._driver._ele(loc_or_ele, raise_err=False) ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0)
return ele.wait.hidden(timeout) return ele.wait.hidden(timeout)
def ele_load(self, loc, timeout=None):
"""等待元素加载到DOM
:param loc: 要等待的元素输入定位符
:param timeout: 超时时间默认读取页面超时时间
:return: 成功返回元素对象失败返回False
"""
ele = self._driver._ele(loc, raise_err=False, timeout=timeout)
return ele if ele else False
def load_start(self, timeout=None): def load_start(self, timeout=None):
"""等待页面开始加载 """等待页面开始加载
:param timeout: 超时时间为None时使用页面timeout属性 :param timeout: 超时时间为None时使用页面timeout属性

View File

@ -7,6 +7,8 @@ from pathlib import Path
from typing import Union, Tuple, List, Any from typing import Union, Tuple, List, Any
from DataRecorder import Recorder from DataRecorder import Recorder
from DownloadKit import DownloadKit
from DownloadKit.mission import Mission
from requests import Session from requests import Session
from requests.cookies import RequestsCookieJar from requests.cookies import RequestsCookieJar
@ -17,6 +19,7 @@ from .chromium_frame import ChromiumFrame
from .commons.constants import NoneElement from .commons.constants import NoneElement
from .network_listener import NetworkListener from .network_listener import NetworkListener
from .session_element import SessionElement from .session_element import SessionElement
from .session_page import DownloadSetter
class ChromiumBase(BasePage): class ChromiumBase(BasePage):
@ -43,6 +46,8 @@ class ChromiumBase(BasePage):
self._set: ChromiumBaseSetter = ... self._set: ChromiumBaseSetter = ...
self._screencast: Screencast = ... self._screencast: Screencast = ...
self._listener: NetworkListener = ... self._listener: NetworkListener = ...
self._download_path: str = ...
self._download_set: BaseDownloadSetter = ...
def _connect_browser(self, tab_id: str = None) -> None: ... def _connect_browser(self, tab_id: str = None) -> None: ...
@ -133,6 +138,15 @@ class ChromiumBase(BasePage):
@property @property
def listener(self) -> NetworkListener: ... def listener(self) -> NetworkListener: ...
@property
def download_set(self) -> BaseDownloadSetter: ...
@property
def download(self) -> DownloadKit: ...
@property
def download_path(self) -> str: ...
def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ... def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
def run_js_loaded(self, script: str, *args: Any, as_expr: bool = False) -> Any: ... def run_js_loaded(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
@ -211,6 +225,22 @@ class ChromiumBase(BasePage):
timeout: float = None) -> Union[bool, None]: ... timeout: float = None) -> Union[bool, None]: ...
class BaseDownloadSetter(DownloadSetter):
def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ...
self._session: Session = ...
self._show_msg: bool = ...
@property
def session(self) -> Session: ...
def save_path(self, path: Union[str, Path]) -> None: ...
def show_msg(self, on_off: bool = True) -> None: ...
def _cookies_to_session(self) -> None: ...
class ChromiumBaseWaiter(object): class ChromiumBaseWaiter(object):
def __init__(self, page: ChromiumBase): def __init__(self, page: ChromiumBase):
self._driver: ChromiumBase = ... self._driver: ChromiumBase = ...
@ -221,6 +251,8 @@ class ChromiumBaseWaiter(object):
def ele_hidden(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... def ele_hidden(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
def ele_load(self, loc: Union[str, tuple], timeout: float = None) -> Union[bool, ChromiumElement]: ...
def _loading(self, timeout: float = None, start: bool = True, gap: float = .01) -> bool: ... def _loading(self, timeout: float = None, start: bool = True, gap: float = .01) -> bool: ...
def load_start(self, timeout: float = None) -> bool: ... def load_start(self, timeout: float = None) -> bool: ...

View File

@ -3,23 +3,16 @@
@Author : g1879 @Author : g1879
@Contact : g1879@qq.com @Contact : g1879@qq.com
""" """
from pathlib import Path
from platform import system from platform import system
from threading import Thread
from time import perf_counter, sleep from time import perf_counter, sleep
from warnings import warn
from requests import Session
from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter
from .chromium_driver import ChromiumDriver from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab from .chromium_tab import ChromiumTab
from .commons.browser import connect_browser from .commons.browser import connect_browser
from .commons.tools import port_is_using, get_usable_path from .commons.tools import port_is_using
from .commons.web import set_session_cookies
from .configs.chromium_options import ChromiumOptions from .configs.chromium_options import ChromiumOptions
from .errors import CDPError, BrowserConnectError from .errors import BrowserConnectError
from .session_page import DownloadSetter
class ChromiumPage(ChromiumBase): class ChromiumPage(ChromiumBase):
@ -31,8 +24,8 @@ class ChromiumPage(ChromiumBase):
:param tab_id: 要控制的标签页id不指定默认为激活的 :param tab_id: 要控制的标签页id不指定默认为激活的
:param timeout: 超时时间 :param timeout: 超时时间
""" """
self._download_set = None # self._download_set = None
self._download_path = None # self._download_path = None
super().__init__(addr_driver_opts, tab_id, timeout) super().__init__(addr_driver_opts, tab_id, timeout)
def _set_start_options(self, addr_driver_opts, none): def _set_start_options(self, addr_driver_opts, none):
@ -104,10 +97,10 @@ class ChromiumPage(ChromiumBase):
self._rect = None self._rect = None
self._main_tab = self.tab_id self._main_tab = self.tab_id
try: # try:
self.download_set.by_browser() # self.download_set.by_browser()
except CDPError: # except CDPError:
pass # pass
self._process_id = None self._process_id = None
r = self.browser_driver.SystemInfo.getProcessInfo() r = self.browser_driver.SystemInfo.getProcessInfo()
@ -155,23 +148,23 @@ class ChromiumPage(ChromiumBase):
self._set = ChromiumPageSetter(self) self._set = ChromiumPageSetter(self)
return self._set return self._set
@property # @property
def download_path(self): # def download_path(self):
"""返回默认下载路径""" # """返回默认下载路径"""
p = self._download_path or '' # p = self._download_path or ''
return str(Path(p).absolute()) # return str(Path(p).absolute())
#
@property # @property
def download_set(self): # def download_set(self):
"""返回用于设置下载参数的对象""" # """返回用于设置下载参数的对象"""
if self._download_set is None: # if self._download_set is None:
self._download_set = ChromiumDownloadSetter(self) # self._download_set = BaseDownloadSetter(self)
return self._download_set # return self._download_set
#
@property # @property
def download(self): # def download(self):
"""返回下载器对象""" # """返回下载器对象"""
return self.download_set._switched_DownloadKit # return self.download_set._switched_DownloadKit
@property @property
def rect(self): def rect(self):
@ -214,7 +207,7 @@ class ChromiumPage(ChromiumBase):
and (tab_type is None or i['type'] in tab_type))] and (tab_type is None or i['type'] in tab_type))]
return r[0]['id'] if r and single else r return r[0]['id'] if r and single else r
def new_tab(self, url=None, switch_to=True): def new_tab(self, url=None, switch_to=False):
"""新建一个标签页,该标签页在最后面 """新建一个标签页,该标签页在最后面
:param url: 新标签页跳转到的网址 :param url: 新标签页跳转到的网址
:param switch_to: 新建标签页后是否把焦点移过去 :param switch_to: 新建标签页后是否把焦点移过去
@ -386,20 +379,6 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
super().__init__(page) super().__init__(page)
self._listener = None self._listener = None
def download_begin(self, timeout=1.5):
"""等待浏览器下载开始
:param timeout: 等待超时时间为None则使用页面对象timeout属性
:return: 是否等到下载开始
"""
return self._driver.download_set.wait_download_begin(timeout)
def download_finish(self, timeout=None):
"""等待下载结束
:param timeout: 等待超时时间为None则使用页面对象timeout属性
:return: 是否等到下载结束
"""
return self._driver.download_set.wait_download_finish(timeout)
def new_tab(self, timeout=None): def new_tab(self, timeout=None):
"""等待新标签页出现 """等待新标签页出现
:param timeout: 等待超时时间为None则使用页面对象timeout属性 :param timeout: 等待超时时间为None则使用页面对象timeout属性
@ -410,6 +389,20 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
while self._driver.tab_id == self._driver.latest_tab and perf_counter() < end_time: while self._driver.tab_id == self._driver.latest_tab and perf_counter() < end_time:
sleep(.01) sleep(.01)
# def download_begin(self, timeout=1.5):
# """等待浏览器下载开始
# :param timeout: 等待超时时间为None则使用页面对象timeout属性
# :return: 是否等到下载开始
# """
# return self._driver.download_set.wait_download_begin(timeout)
#
# def download_finish(self, timeout=None):
# """等待下载结束
# :param timeout: 等待超时时间为None则使用页面对象timeout属性
# :return: 是否等到下载结束
# """
# return self._driver.download_set.wait_download_finish(timeout)
class ChromiumTabRect(object): class ChromiumTabRect(object):
def __init__(self, page): def __init__(self, page):
@ -482,233 +475,233 @@ class ChromiumTabRect(object):
return self._page.browser_driver.Browser.getWindowForTarget(targetId=self._page.tab_id)['bounds'] return self._page.browser_driver.Browser.getWindowForTarget(targetId=self._page.tab_id)['bounds']
class ChromiumDownloadSetter(DownloadSetter): # class BaseDownloadSetter(DownloadSetter):
"""用于设置下载参数的类""" # """用于设置下载参数的类"""
#
def __init__(self, page): # def __init__(self, page):
""" # """
:param page: ChromiumPage对象 # :param page: ChromiumPage对象
""" # """
super().__init__(page) # super().__init__(page)
self._behavior = 'allowAndName' # self._behavior = 'allowAndName'
self._session = None # self._session = None
self._save_path = '' # self._save_path = ''
self._rename = None # self._rename = None
self._waiting_download = False # self._waiting_download = False
self._download_begin = False # self._download_begin = False
self._browser_missions = {} # self._browser_missions = {}
self._browser_downloading_count = 0 # self._browser_downloading_count = 0
self._show_msg = True # self._show_msg = True
#
@property # @property
def session(self): # def session(self):
"""返回用于DownloadKit的Session对象""" # """返回用于DownloadKit的Session对象"""
if self._session is None: # if self._session is None:
self._session = Session() # self._session = Session()
return self._session # return self._session
#
@property # @property
def browser_missions(self): # def browser_missions(self):
"""返回浏览器下载任务""" # """返回浏览器下载任务"""
return list(self._browser_missions.values()) # return list(self._browser_missions.values())
#
@property # @property
def DownloadKit_missions(self): # def DownloadKit_missions(self):
"""返回DownloadKit下载任务""" # """返回DownloadKit下载任务"""
return list(self.DownloadKit.missions.values()) # return list(self.DownloadKit.missions.values())
#
@property # @property
def _switched_DownloadKit(self): # def _switched_DownloadKit(self):
"""返回从浏览器同步cookies后的Session对象""" # """返回从浏览器同步cookies后的Session对象"""
self._cookies_to_session() # self._cookies_to_session()
return self.DownloadKit # return self.DownloadKit
#
def save_path(self, path): # def save_path(self, path):
"""设置下载路径 # """设置下载路径
:param path: 下载路径 # :param path: 下载路径
:return: None # :return: None
""" # """
path = path or '' # path = path or ''
path = Path(path).absolute() # path = Path(path).absolute()
path.mkdir(parents=True, exist_ok=True) # path.mkdir(parents=True, exist_ok=True)
path = str(path) # path = str(path)
self._save_path = path # self._save_path = path
self._page._download_path = path # self._page._download_path = path
try: # try:
self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', downloadPath=path, # self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', downloadPath=path,
eventsEnabled=True) # eventsEnabled=True)
except CDPError: # except CDPError:
warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') # warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。')
self._page.run_cdp('Page.setDownloadBehavior', behavior='allowAndName', downloadPath=path) # self._page.run_cdp('Page.setDownloadBehavior', behavior='allowAndName', downloadPath=path)
#
self.DownloadKit.goal_path = path # self.DownloadKit.goal_path = path
#
def rename(self, name): # def rename(self, name):
"""设置浏览器下一个下载任务的文件名 # """设置浏览器下一个下载任务的文件名
:param name: 文件名不带后缀时自动使用原后缀 # :param name: 文件名,不带后缀时自动使用原后缀
:return: None # :return: None
""" # """
self._rename = name # self._rename = name
#
def by_browser(self): # def by_browser(self):
"""设置使用浏览器下载文件""" # """设置使用浏览器下载文件"""
try: # try:
self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True, # self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True,
downloadPath=self._page.download_path) # downloadPath=self._page.download_path)
self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin # self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin
self._page.browser_driver.Browser.downloadProgress = self._download_progress # self._page.browser_driver.Browser.downloadProgress = self._download_progress
except CDPError: # except CDPError:
self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path) # self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path)
self._page.driver.Page.downloadWillBegin = self._download_will_begin # self._page.driver.Page.downloadWillBegin = self._download_will_begin
self._page.driver.Page.downloadProgress = self._download_progress # self._page.driver.Page.downloadProgress = self._download_progress
#
self._behavior = 'allowAndName' # self._behavior = 'allowAndName'
#
def by_DownloadKit(self): # def by_DownloadKit(self):
"""设置使用DownloadKit下载文件""" # """设置使用DownloadKit下载文件"""
try: # try:
self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True) # self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit # self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit
except CDPError: # except CDPError:
raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。') # raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。')
#
self._behavior = 'deny' # self._behavior = 'deny'
#
def wait_download_begin(self, timeout=None): # def wait_download_begin(self, timeout=None):
"""等待浏览器下载开始 # """等待浏览器下载开始
:param timeout: 等待超时时间为None则使用页面对象timeout属性 # :param timeout: 等待超时时间为None则使用页面对象timeout属性
:return: 是否等到下载开始 # :return: 是否等到下载开始
""" # """
self._waiting_download = True # self._waiting_download = True
result = False # result = False
timeout = timeout if timeout is not None else self._page.timeout # timeout = timeout if timeout is not None else self._page.timeout
end_time = perf_counter() + timeout # end_time = perf_counter() + timeout
while perf_counter() < end_time: # while perf_counter() < end_time:
if self._download_begin: # if self._download_begin:
result = True # result = True
break # break
sleep(.05) # sleep(.05)
self._download_begin = False # self._download_begin = False
self._waiting_download = False # self._waiting_download = False
return result # return result
#
def wait_download_finish(self, timeout=None): # def wait_download_finish(self, timeout=None):
"""等待所有下载结束 # """等待所有下载结束
:param timeout: 超时时间 # :param timeout: 超时时间
:return: 是否等待到下载完成 # :return: 是否等待到下载完成
""" # """
timeout = timeout if timeout is not None else self._page.timeout # timeout = timeout if timeout is not None else self._page.timeout
end_time = perf_counter() + timeout # end_time = perf_counter() + timeout
while perf_counter() < end_time: # while perf_counter() < end_time:
if (self._DownloadKit is None or not self.DownloadKit.is_running) and self._browser_downloading_count == 0: # if (self._DownloadKit is None or not self.DownloadKit.is_running) and self._browser_downloading_count == 0:
return True # return True
sleep(.5) # sleep(.5)
return False # return False
#
def show_msg(self, on_off=True): # def show_msg(self, on_off=True):
"""是否显示下载信息 # """是否显示下载信息
:param on_off: bool表示开或关 # :param on_off: bool表示开或关
:return: None # :return: None
""" # """
self._show_msg = on_off # self._show_msg = on_off
#
def _cookies_to_session(self): # def _cookies_to_session(self):
"""把driver对象的cookies复制到session对象""" # """把driver对象的cookies复制到session对象"""
ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] # ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": ua}) # self.session.headers.update({"User-Agent": ua})
set_session_cookies(self.session, self._page.get_cookies(as_dict=False, all_info=False)) # set_session_cookies(self.session, self._page.get_cookies(as_dict=False, all_info=False))
#
def _download_by_DownloadKit(self, **kwargs): # def _download_by_DownloadKit(self, **kwargs):
"""拦截浏览器下载并用downloadKit下载""" # """拦截浏览器下载并用downloadKit下载"""
url = kwargs['url'] # url = kwargs['url']
if url.startswith('blob:'): # if url.startswith('blob:'):
raise TypeError('bolb:开头的链接无法使用DownloadKit下载请用浏览器下载功能。') # raise TypeError('bolb:开头的链接无法使用DownloadKit下载请用浏览器下载功能。')
#
self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid']) # self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid'])
#
if self._rename: # if self._rename:
rename = get_rename(kwargs['suggestedFilename'], self._rename) # rename = get_rename(kwargs['suggestedFilename'], self._rename)
self._rename = None # self._rename = None
else: # else:
rename = kwargs['suggestedFilename'] # rename = kwargs['suggestedFilename']
#
mission = self._page.download.add(file_url=url, goal_path=self._page.download_path, rename=rename) # mission = self._page.download.add(file_url=url, goal_path=self._page.download_path, rename=rename)
Thread(target=self._wait_download_complete, args=(mission,), daemon=False).start() # Thread(target=self._wait_download_complete, args=(mission,), daemon=False).start()
#
if self._waiting_download: # if self._waiting_download:
self._download_begin = True # self._download_begin = True
#
self._browser_downloading_count += 1 # self._browser_downloading_count += 1
#
if self._show_msg: # if self._show_msg:
print(f'(DownloadKit)开始下载:{Path(self._save_path) / rename}') # print(f'(DownloadKit)开始下载:{Path(self._save_path) / rename}')
#
def _download_will_begin(self, **kwargs): # def _download_will_begin(self, **kwargs):
"""浏览器下载即将开始时调用""" # """浏览器下载即将开始时调用"""
if self._rename: # if self._rename:
rename = get_rename(kwargs['suggestedFilename'], self._rename) # rename = get_rename(kwargs['suggestedFilename'], self._rename)
self._rename = None # self._rename = None
else: # else:
rename = kwargs['suggestedFilename'] # rename = kwargs['suggestedFilename']
#
m = BrowserDownloadMission(kwargs['guid'], kwargs['url'], rename) # m = BrowserDownloadMission(kwargs['guid'], kwargs['url'], rename)
self._browser_missions[kwargs['guid']] = m # self._browser_missions[kwargs['guid']] = m
aid_path = Path(self._save_path) / rename # aid_path = Path(self._save_path) / rename
#
if self._show_msg: # if self._show_msg:
print(f'(Browser)开始下载:{rename}') # print(f'(Browser)开始下载:{rename}')
self._browser_downloading_count += 1 # self._browser_downloading_count += 1
#
if self._file_exists == 'skip' and aid_path.exists(): # if self._file_exists == 'skip' and aid_path.exists():
m.state = 'skipped' # m.state = 'skipped'
m.save_path = aid_path.absolute() # m.save_path = aid_path.absolute()
self._page.browser_driver.call_method('Browser.cancelDownload', guid=kwargs['guid']) # self._page.browser_driver.call_method('Browser.cancelDownload', guid=kwargs['guid'])
(Path(self._save_path) / kwargs["guid"]).unlink(missing_ok=True) # (Path(self._save_path) / kwargs["guid"]).unlink(missing_ok=True)
return # return
#
if self._waiting_download: # if self._waiting_download:
self._download_begin = True # self._download_begin = True
#
def _download_progress(self, **kwargs): # def _download_progress(self, **kwargs):
"""下载状态产生变化时调用""" # """下载状态产生变化时调用"""
guid = kwargs['guid'] # guid = kwargs['guid']
m = self._browser_missions.get(guid, None) # m = self._browser_missions.get(guid, None)
if m: # if m:
m.size = kwargs['totalBytes'] # m.size = kwargs['totalBytes']
m.received = kwargs['receivedBytes'] # m.received = kwargs['receivedBytes']
m.state = kwargs['state'] # m.state = kwargs['state']
#
if m.state == 'completed': # if m.state == 'completed':
path = Path(self._save_path) / m.name # path = Path(self._save_path) / m.name
from_path = Path(self._save_path) / guid # from_path = Path(self._save_path) / guid
if path.exists(): # if path.exists():
if self._file_exists == 'rename': # if self._file_exists == 'rename':
path = get_usable_path(path) # path = get_usable_path(path)
else: # 'overwrite' # else: # 'overwrite'
path.unlink() # path.unlink()
from_path.rename(path) # from_path.rename(path)
m.save_path = path.absolute() # m.save_path = path.absolute()
#
if kwargs['state'] != 'inProgress': # if kwargs['state'] != 'inProgress':
if self._show_msg and m: # if self._show_msg and m:
if kwargs['state'] == 'completed': # if kwargs['state'] == 'completed':
print(f'(Browser)下载完成:{m.save_path}') # print(f'(Browser)下载完成:{m.save_path}')
elif m.state != 'skipped': # elif m.state != 'skipped':
print(f'(Browser)下载失败:{m.save_path}') # print(f'(Browser)下载失败:{m.save_path}')
else: # else:
print(f'(Browser)已跳过:{m.save_path}') # print(f'(Browser)已跳过:{m.save_path}')
self._browser_downloading_count -= 1 # self._browser_downloading_count -= 1
#
def _wait_download_complete(self, mission): # def _wait_download_complete(self, mission):
"""等待DownloadKit下载完成""" # """等待DownloadKit下载完成"""
mission.wait(show=False) # mission.wait(show=False)
if self._show_msg: # if self._show_msg:
if mission.result == 'skip': # if mission.result == 'skip':
print(f'(DownloadKit)已跳过:{mission.path}') # print(f'(DownloadKit)已跳过:{mission.path}')
elif not mission.result: # elif not mission.result:
print(f'(DownloadKit)下载失败:{mission.path}') # print(f'(DownloadKit)下载失败:{mission.path}')
else: # else:
print(f'(DownloadKit)下载完成:{mission.path}') # print(f'(DownloadKit)下载完成:{mission.path}')
class BrowserDownloadMission(object): class BrowserDownloadMission(object):

View File

@ -30,8 +30,6 @@ class ChromiumPage(ChromiumBase):
self._window_setter: WindowSetter = ... self._window_setter: WindowSetter = ...
self._main_tab: str = ... self._main_tab: str = ...
self._alert: Alert = ... self._alert: Alert = ...
self._download_path: str = ...
self._download_set: ChromiumDownloadSetter = ...
self._browser_driver: ChromiumDriver = ... self._browser_driver: ChromiumDriver = ...
self._rect: ChromiumTabRect = ... self._rect: ChromiumTabRect = ...
@ -70,21 +68,12 @@ class ChromiumPage(ChromiumBase):
@property @property
def set(self) -> ChromiumPageSetter: ... def set(self) -> ChromiumPageSetter: ...
@property
def download_set(self) -> ChromiumDownloadSetter: ...
@property
def download(self) -> DownloadKit: ...
@property
def download_path(self) -> str: ...
def get_tab(self, tab_id: str = None) -> ChromiumTab: ... def get_tab(self, tab_id: str = None) -> ChromiumTab: ...
def find_tabs(self, title: str = None, url: str = None, def find_tabs(self, title: str = None, url: str = None,
tab_type: Union[str, list, tuple, set] = None, single: bool = True) -> Union[str, List[str]]: ... tab_type: Union[str, list, tuple, set] = None, single: bool = True) -> Union[str, List[str]]: ...
def new_tab(self, url: str = None, switch_to: bool = True) -> str: ... def new_tab(self, url: str = None, switch_to: bool = False) -> str: ...
def to_main_tab(self) -> None: ... def to_main_tab(self) -> None: ...
@ -153,7 +142,7 @@ class ChromiumTabRect(object):
def _get_browser_rect(self) -> dict: ... def _get_browser_rect(self) -> dict: ...
class ChromiumDownloadSetter(DownloadSetter): class BaseDownloadSetter(DownloadSetter):
def __init__(self, page: ChromiumPage): def __init__(self, page: ChromiumPage):
self._page: ChromiumPage = ... self._page: ChromiumPage = ...
self._behavior: str = ... self._behavior: str = ...

View File

@ -13,12 +13,11 @@ from requests.auth import HTTPBasicAuth
from requests.cookies import RequestsCookieJar from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict from requests.structures import CaseInsensitiveDict
from .commons.constants import NoneElement
from .base import BasePage from .base import BasePage
from .chromium_page import ChromiumPage from .chromium_base import ChromiumBase
from .commons.constants import NoneElement
from .configs.session_options import SessionOptions from .configs.session_options import SessionOptions
from .session_element import SessionElement from .session_element import SessionElement
from .web_page import WebPage
class SessionPage(BasePage): class SessionPage(BasePage):
@ -202,7 +201,7 @@ class SessionPageSetter(object):
class DownloadSetter(object): class DownloadSetter(object):
def __init__(self, page: Union[SessionPage, WebPage, ChromiumPage]): def __init__(self, page: Union[SessionPage, ChromiumBase]):
self._page: SessionPage = ... self._page: SessionPage = ...
self._DownloadKit: DownloadKit = ... self._DownloadKit: DownloadKit = ...
self._file_exists: str = ... self._file_exists: str = ...

View File

@ -3,20 +3,17 @@
@Author : g1879 @Author : g1879
@Contact : g1879@qq.com @Contact : g1879@qq.com
""" """
from pathlib import Path
from warnings import warn
from requests import Session from requests import Session
from .base import BasePage from .base import BasePage
from .chromium_base import ChromiumBase, Timeout from .chromium_base import ChromiumBase, Timeout
from .chromium_driver import ChromiumDriver from .chromium_driver import ChromiumDriver
from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter from .chromium_page import ChromiumPage, ChromiumPageSetter
from .chromium_tab import WebPageTab from .chromium_tab import WebPageTab
from .commons.web import set_session_cookies, set_browser_cookies from .commons.web import set_session_cookies, set_browser_cookies
from .configs.chromium_options import ChromiumOptions from .configs.chromium_options import ChromiumOptions
from .configs.session_options import SessionOptions from .configs.session_options import SessionOptions
from .errors import CDPError
from .session_page import SessionPage, SessionPageSetter from .session_page import SessionPage, SessionPageSetter
@ -45,7 +42,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._driver_options = None self._driver_options = None
self._session_options = None self._session_options = None
self._response = None self._response = None
self._download_set = None # self._download_set = None
self._set = None self._set = None
self._screencast = None self._screencast = None
@ -211,22 +208,22 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
""" """
self.set.timeouts(implicit=second) self.set.timeouts(implicit=second)
@property # @property
def download_path(self): # def download_path(self):
"""返回默认下载路径""" # """返回默认下载路径"""
return super(SessionPage, self).download_path # return super(SessionPage, self).download_path
#
@property # @property
def download_set(self): # def download_set(self):
"""返回下载设置对象""" # """返回下载设置对象"""
if self._download_set is None: # if self._download_set is None:
self._download_set = WebPageDownloadSetter(self) # self._download_set = WebPageDownloadSetter(self)
return self._download_set # return self._download_set
#
@property # @property
def download(self): # def download(self):
"""返回下载器对象""" # """返回下载器对象"""
return self.download_set._switched_DownloadKit # return self.download_set._switched_DownloadKit
@property @property
def set(self): def set(self):
@ -494,69 +491,68 @@ class WebPageSetter(ChromiumPageSetter):
else: else:
self._chromium_setter.user_agent(ua, platform) self._chromium_setter.user_agent(ua, platform)
# class WebPageDownloadSetter(BaseDownloadSetter):
class WebPageDownloadSetter(ChromiumDownloadSetter): # """用于设置下载参数的类"""
"""用于设置下载参数的类""" #
# def __init__(self, page):
def __init__(self, page): # super().__init__(page)
super().__init__(page) # self._session = page.session
self._session = page.session #
# @property
@property # def _switched_DownloadKit(self):
def _switched_DownloadKit(self): # """返回从浏览器同步cookies后的Session对象"""
"""返回从浏览器同步cookies后的Session对象""" # if self._page.mode == 'd':
if self._page.mode == 'd': # self._cookies_to_session()
self._cookies_to_session() # return self.DownloadKit
return self.DownloadKit #
# def save_path(self, path):
def save_path(self, path): # """设置下载路径
"""设置下载路径 # :param path: 下载路径
:param path: 下载路径 # :return: None
:return: None # """
""" # path = path or ''
path = path or '' # path = Path(path).absolute()
path = Path(path).absolute() # path.mkdir(parents=True, exist_ok=True)
path.mkdir(parents=True, exist_ok=True) # path = str(path)
path = str(path) # self._save_path = path
self._save_path = path # self._page._download_path = path
self._page._download_path = path # self.DownloadKit.goal_path = path
self.DownloadKit.goal_path = path #
# if self._page._has_driver:
if self._page._has_driver: # try:
try: # self._page.browser_driver.Browser.setDownloadBehavior(behavior=self._behavior, downloadPath=path,
self._page.browser_driver.Browser.setDownloadBehavior(behavior=self._behavior, downloadPath=path, # eventsEnabled=True)
eventsEnabled=True) # except CDPError:
except CDPError: # warn('\n您的浏览器版本太低用新标签页下载文件可能崩溃建议升级。')
warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') # self._page.run_cdp('Page.setDownloadBehavior', behavior=self._behavior, downloadPath=path)
self._page.run_cdp('Page.setDownloadBehavior', behavior=self._behavior, downloadPath=path) #
# def by_browser(self):
def by_browser(self): # """设置使用浏览器下载文件"""
"""设置使用浏览器下载文件""" # if not self._page._has_driver:
if not self._page._has_driver: # raise RuntimeError('浏览器未连接。')
raise RuntimeError('浏览器未连接。') #
# try:
try: # self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True,
self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True, # downloadPath=self._page.download_path)
downloadPath=self._page.download_path) # self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin
self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin # self._page.browser_driver.Browser.downloadProgress = self._download_progress
self._page.browser_driver.Browser.downloadProgress = self._download_progress #
# except CDPError:
except CDPError: # warn('\n您的浏览器版本太低用新标签页下载文件可能崩溃建议升级。')
warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') # self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path)
self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path) # self._page.driver.Page.downloadWillBegin = self._download_will_begin
self._page.driver.Page.downloadWillBegin = self._download_will_begin # self._page.driver.Page.downloadProgress = self._download_progress
self._page.driver.Page.downloadProgress = self._download_progress #
# self._behavior = 'allowAndName'
self._behavior = 'allowAndName' #
# def by_DownloadKit(self):
def by_DownloadKit(self): # """设置使用DownloadKit下载文件"""
"""设置使用DownloadKit下载文件""" # if self._page._has_driver:
if self._page._has_driver: # try:
try: # self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True) # self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit
self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit # # self._page.browser_driver.Browser.downloadProgress = None
# self._page.browser_driver.Browser.downloadProgress = None # except CDPError:
except CDPError: # raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。')
raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。') #
# self._behavior = 'deny'
self._behavior = 'deny'