基本完成下载功能修改,待测试和完善

This commit is contained in:
g1879 2023-08-28 22:59:16 +08:00
parent bd54c1f481
commit b8e2be8799
9 changed files with 86 additions and 342 deletions

View File

@ -43,6 +43,8 @@ class ChromiumBase(BasePage):
self._set = None
self._screencast = None
self._listener = None
self._wait_download_flag = None
self._download_rename = None
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
address = f'127.0.0.1:{address}'
@ -64,7 +66,6 @@ class ChromiumBase(BasePage):
def _set_runtime_settings(self):
self._timeouts = Timeout(self)
self._page_load_strategy = 'normal'
self._wait_download_flag = None
def _connect_browser(self, tab_id=None):
"""连接浏览器,在第一次时运行
@ -190,7 +191,7 @@ class ChromiumBase(BasePage):
return False
def _onFrameStartedLoading(self, **kwargs):
"""页面开始加载时触发"""
"""页面开始加载时执行"""
if kwargs['frameId'] == self._target_id:
self._is_loading = True
@ -200,7 +201,7 @@ class ChromiumBase(BasePage):
self._debug_recorder.add_data((perf_counter(), '加载流程', 'FrameStartedLoading'))
def _onFrameStoppedLoading(self, **kwargs):
"""页面加载完成后触发"""
"""页面加载完成后执行"""
if kwargs['frameId'] == self._target_id and self._first_run is False and self._is_loading:
if self._debug:
print('页面停止加载 FrameStoppedLoading')
@ -219,14 +220,14 @@ class ChromiumBase(BasePage):
self._get_document()
def _onDocumentUpdated(self, **kwargs):
"""页面跳转时触发"""
"""页面跳转时执行"""
if self._debug:
print('documentUpdated')
if self._debug_recorder:
self._debug_recorder.add_data((perf_counter(), '加载流程', 'documentUpdated'))
def _onFrameNavigated(self, **kwargs):
"""页面跳转时触发"""
"""页面跳转时执行"""
if kwargs['frame'].get('parentId', None) == self._target_id and self._first_run is False and self._is_loading:
self._is_loading = True
if self._debug:
@ -235,7 +236,7 @@ class ChromiumBase(BasePage):
self._debug_recorder.add_data((perf_counter(), '加载流程', 'navigated'))
def _onFileChooserOpened(self, **kwargs):
"""文件选择框打开时触发"""
"""文件选择框打开时执行"""
if self._upload_list:
files = self._upload_list if kwargs['mode'] == 'selectMultiple' else self._upload_list[:1]
self.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=kwargs['backendNodeId'])
@ -245,10 +246,23 @@ class ChromiumBase(BasePage):
self._upload_list = None
def _onDownloadWillBegin(self, **kwargs):
"""下载即将开始时执行"""
if self._wait_download_flag is False:
self._page.run_cdp('Browser.cancelDownload', guid=kwargs['guid'])
self._page._dl_mgr.add_mission(kwargs['guid'], self.download_path, kwargs['suggestedFilename'])
self._wait_download_flag = {'url': kwargs['url'], 'name': kwargs['suggestedFilename']}
if self._download_rename:
tmp = kwargs['suggestedFilename'].rsplit('.', 1)
ext_name = tmp[-1] if len(tmp) > 1 else ''
tmp = self._download_rename.rsplit('.', 1)
ext_rename = tmp[-1] if len(tmp) > 1 else ''
n = self._download_rename if ext_rename == ext_name else f'{self._download_rename}.{ext_name}'
self._download_rename = None
else:
n = kwargs['suggestedFilename']
self._page._dl_mgr.add_mission(kwargs['guid'], self.download_path, n)
self._wait_download_flag = {'url': kwargs['url'], 'name': n}
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素

View File

@ -46,6 +46,7 @@ class ChromiumBase(BasePage):
self._screencast: Screencast = ...
self._listener: NetworkListener = ...
self._wait_download_flag: bool = ...
self._download_rename: str = ...
def _connect_browser(self, tab_id: str = None) -> None: ...

View File

@ -446,266 +446,38 @@ class BrowserDownloadManager(object):
self._page = page
page.set.download_path(page.download_path)
self._page.browser_driver.set_listener('Browser.downloadProgress', self._onDownloadProgress)
self._page.browser_driver.set_listener('Browser.downloadWillBegin', self._onDownloadWillBegin)
self._missions = {}
def add_mission(self, guid, path, name):
"""添加下载任务信息
:param guid: guid
:param path: 保存路径
:param name: 保存文件名
:return: None
"""
self._missions[guid] = {'path': path, 'name': name}
def _onDownloadWillBegin(self, **kwargs):
"""用于获取弹出新标签页触发的下载任务"""
sleep(.1)
if kwargs['guid'] not in self._missions:
self.add_mission(kwargs['guid'], self._page.download_path, kwargs['suggestedFilename'])
def _onDownloadProgress(self, **kwargs):
if kwargs['state'] == 'completed' and kwargs['guid'] in self._missions:
"""下载状态变化时执行"""
if kwargs['state'] in ('completed', 'canceled') and kwargs['guid'] in self._missions:
guid = kwargs['guid']
path = self._missions[guid]['path']
name = self._missions[guid]['name']
form_path = f'{self._page.download_path}\\{guid}'
to_path = get_usable_path(f'{path}\\{name}')
if kwargs['state'] == 'completed':
path = self._missions[guid]['path']
name = self._missions[guid]['name']
form_path = f'{self._page.download_path}\\{guid}'
to_path = get_usable_path(f'{path}\\{name}')
move(form_path, to_path)
move(form_path, to_path)
self._missions.pop(guid)
# class BaseDownloadSetter(DownloadSetter):
# """用于设置下载参数的类"""
#
# def __init__(self, page):
# """
# :param page: ChromiumPage对象
# """
# super().__init__(page)
# self._behavior = 'allowAndName'
# self._session = None
# self._save_path = ''
# self._rename = None
# self._waiting_download = False
# self._download_begin = False
# self._browser_missions = {}
# self._browser_downloading_count = 0
# self._show_msg = True
#
# @property
# def session(self):
# """返回用于DownloadKit的Session对象"""
# if self._session is None:
# self._session = Session()
# return self._session
#
# @property
# def browser_missions(self):
# """返回浏览器下载任务"""
# return list(self._browser_missions.values())
#
# @property
# def DownloadKit_missions(self):
# """返回DownloadKit下载任务"""
# return list(self.DownloadKit.missions.values())
#
# @property
# def _switched_DownloadKit(self):
# """返回从浏览器同步cookies后的Session对象"""
# self._cookies_to_session()
# return self.DownloadKit
#
# def save_path(self, path):
# """设置下载路径
# :param path: 下载路径
# :return: None
# """
# path = path or ''
# path = Path(path).absolute()
# path.mkdir(parents=True, exist_ok=True)
# path = str(path)
# self._save_path = path
# self._page._download_path = path
# try:
# self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', downloadPath=path,
# eventsEnabled=True)
# except CDPError:
# warn('\n您的浏览器版本太低用新标签页下载文件可能崩溃建议升级。')
# self._page.run_cdp('Page.setDownloadBehavior', behavior='allowAndName', downloadPath=path)
#
# self.DownloadKit.goal_path = path
#
# def rename(self, name):
# """设置浏览器下一个下载任务的文件名
# :param name: 文件名,不带后缀时自动使用原后缀
# :return: None
# """
# self._rename = name
#
# def by_browser(self):
# """设置使用浏览器下载文件"""
# try:
# self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True,
# downloadPath=self._page.download_path)
# self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin
# self._page.browser_driver.Browser.downloadProgress = self._download_progress
# except CDPError:
# self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path)
# self._page.driver.Page.downloadWillBegin = self._download_will_begin
# self._page.driver.Page.downloadProgress = self._download_progress
#
# self._behavior = 'allowAndName'
#
# def by_DownloadKit(self):
# """设置使用DownloadKit下载文件"""
# try:
# self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
# self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit
# except CDPError:
# raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。')
#
# self._behavior = 'deny'
#
# def wait_download_begin(self, timeout=None):
# """等待浏览器下载开始
# :param timeout: 等待超时时间为None则使用页面对象timeout属性
# :return: 是否等到下载开始
# """
# self._waiting_download = True
# result = False
# timeout = timeout if timeout is not None else self._page.timeout
# end_time = perf_counter() + timeout
# while perf_counter() < end_time:
# if self._download_begin:
# result = True
# break
# sleep(.05)
# self._download_begin = False
# self._waiting_download = False
# return result
#
# def wait_download_finish(self, timeout=None):
# """等待所有下载结束
# :param timeout: 超时时间
# :return: 是否等待到下载完成
# """
# timeout = timeout if timeout is not None else self._page.timeout
# end_time = perf_counter() + timeout
# while perf_counter() < end_time:
# if (self._DownloadKit is None or not self.DownloadKit.is_running) and self._browser_downloading_count == 0:
# return True
# sleep(.5)
# return False
#
# def show_msg(self, on_off=True):
# """是否显示下载信息
# :param on_off: bool表示开或关
# :return: None
# """
# self._show_msg = on_off
#
# def _cookies_to_session(self):
# """把driver对象的cookies复制到session对象"""
# ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
# self.session.headers.update({"User-Agent": ua})
# set_session_cookies(self.session, self._page.get_cookies(as_dict=False, all_info=False))
#
# def _download_by_DownloadKit(self, **kwargs):
# """拦截浏览器下载并用downloadKit下载"""
# url = kwargs['url']
# if url.startswith('blob:'):
# raise TypeError('bolb:开头的链接无法使用DownloadKit下载请用浏览器下载功能。')
#
# self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid'])
#
# if self._rename:
# rename = get_rename(kwargs['suggestedFilename'], self._rename)
# self._rename = None
# else:
# rename = kwargs['suggestedFilename']
#
# mission = self._page.download.add(file_url=url, goal_path=self._page.download_path, rename=rename)
# Thread(target=self._wait_download_complete, args=(mission,), daemon=False).start()
#
# if self._waiting_download:
# self._download_begin = True
#
# self._browser_downloading_count += 1
#
# if self._show_msg:
# print(f'(DownloadKit)开始下载:{Path(self._save_path) / rename}')
#
# def _download_will_begin(self, **kwargs):
# """浏览器下载即将开始时调用"""
# if self._rename:
# rename = get_rename(kwargs['suggestedFilename'], self._rename)
# self._rename = None
# else:
# rename = kwargs['suggestedFilename']
#
# m = BrowserDownloadMission(kwargs['guid'], kwargs['url'], rename)
# self._browser_missions[kwargs['guid']] = m
# aid_path = Path(self._save_path) / rename
#
# if self._show_msg:
# print(f'(Browser)开始下载:{rename}')
# self._browser_downloading_count += 1
#
# if self._file_exists == 'skip' and aid_path.exists():
# m.state = 'skipped'
# m.save_path = aid_path.absolute()
# self._page.browser_driver.call_method('Browser.cancelDownload', guid=kwargs['guid'])
# (Path(self._save_path) / kwargs["guid"]).unlink(missing_ok=True)
# return
#
# if self._waiting_download:
# self._download_begin = True
#
# def _download_progress(self, **kwargs):
# """下载状态产生变化时调用"""
# guid = kwargs['guid']
# m = self._browser_missions.get(guid, None)
# if m:
# m.size = kwargs['totalBytes']
# m.received = kwargs['receivedBytes']
# m.state = kwargs['state']
#
# if m.state == 'completed':
# path = Path(self._save_path) / m.name
# from_path = Path(self._save_path) / guid
# if path.exists():
# if self._file_exists == 'rename':
# path = get_usable_path(path)
# else: # 'overwrite'
# path.unlink()
# from_path.rename(path)
# m.save_path = path.absolute()
#
# if kwargs['state'] != 'inProgress':
# if self._show_msg and m:
# if kwargs['state'] == 'completed':
# print(f'(Browser)下载完成:{m.save_path}')
# elif m.state != 'skipped':
# print(f'(Browser)下载失败:{m.save_path}')
# else:
# print(f'(Browser)已跳过:{m.save_path}')
# self._browser_downloading_count -= 1
#
# def _wait_download_complete(self, mission):
# """等待DownloadKit下载完成"""
# mission.wait(show=False)
# if self._show_msg:
# if mission.result == 'skip':
# print(f'(DownloadKit)已跳过:{mission.path}')
# elif not mission.result:
# print(f'(DownloadKit)下载失败:{mission.path}')
# else:
# print(f'(DownloadKit)下载完成:{mission.path}')
class BrowserDownloadMission(object):
def __init__(self, guid, url, name):
self.id = guid
self.url = url
self.name = name
self.save_path = None
self.state = None
self.size = None
self.received = None
def __repr__(self):
return f'<BrowserDownloadMission {self.save_path}>'
class Alert(object):
"""用于保存alert信息的类"""

View File

@ -133,70 +133,11 @@ class BrowserDownloadManager(object):
def add_mission(self, guid: str, path: str, name: str) -> None: ...
def _onDownloadWillBegin(self, **kwargs) -> None: ...
def _onDownloadProgress(self, **kwargs) -> None: ...
# class BaseDownloadSetter(DownloadSetter):
# def __init__(self, page: ChromiumPage):
# self._page: ChromiumPage = ...
# self._behavior: str = ...
# self._session: Session = ...
# self._save_path: str = ...
# self._rename: str = ...
# self._waiting_download: bool = ...
# self._download_begin: bool = ...
# self._browser_missions: Dict[str, BrowserDownloadMission] = ...
# self._browser_downloading_count: int = ...
# self._show_msg: bool = ...
#
# @property
# def session(self) -> Session: ...
#
# @property
# def browser_missions(self) -> List[BrowserDownloadMission]: ...
#
# @property
# def DownloadKit_missions(self) -> List[Mission]: ...
#
# @property
# def _switched_DownloadKit(self) -> DownloadKit: ...
#
# def save_path(self, path: Union[str, Path]) -> None: ...
#
# def rename(self, name: str) -> None: ...
#
# def by_browser(self) -> None: ...
#
# def by_DownloadKit(self) -> None: ...
#
# def wait_download_begin(self, timeout: float = None) -> bool: ...
#
# def wait_download_finish(self, timeout: float = None) -> bool: ...
#
# def show_msg(self, on_off: bool = True) -> None: ...
#
# def _cookies_to_session(self) -> None: ...
#
# def _download_by_DownloadKit(self, **kwargs) -> None: ...
#
# def _download_will_begin(self, **kwargs) -> None: ...
#
# def _download_progress(self, **kwargs) -> None: ...
#
# def _wait_download_complete(self, mission: Mission) -> None: ...
class BrowserDownloadMission(object):
def __init__(self, guid: str, url: str, name: str):
self.id: str = ...
self.url: str = ...
self.name: str = ...
self.save_path: str = ...
self.state: str = ...
self.size: str = ...
self.received: str = ...
class Alert(object):
def __init__(self):

View File

@ -16,6 +16,7 @@ def get_usable_path(path):
"""
path = Path(path)
parent = path.parent
parent.mkdir(parents=True, exist_ok=True)
path = parent / make_valid_name(path.name)
name = path.stem if path.is_file() else path.name
ext = path.suffix if path.is_file() else ''

View File

@ -117,8 +117,6 @@ class ChromiumBaseSetter(object):
self._page.run_cdp('Network.enable')
self._page.run_cdp('Network.setExtraHTTPHeaders', headers=headers)
class DownloadSetter(object):
def download_path(self, path):
"""设置下载路径
:param path: 下载路径
@ -128,13 +126,20 @@ class DownloadSetter(object):
if self._page._DownloadKit:
self._page._DownloadKit.set.goal_path(path)
def download_file_name(self, name):
"""设置下一个被下载文件的名称
:param name: 文件名可不含后缀
:return: None
"""
self._page._download_rename = name
class TabSetter(ChromiumBaseSetter, DownloadSetter):
class TabSetter(ChromiumBaseSetter):
def __init__(self, page):
super().__init__(page)
class ChromiumPageSetter(ChromiumBaseSetter, DownloadSetter):
class ChromiumPageSetter(ChromiumBaseSetter):
def main_tab(self, tab_id=None):
"""设置主tab
:param tab_id: 标签页id不传入则设置当前tab
@ -168,7 +173,7 @@ class ChromiumPageSetter(ChromiumBaseSetter, DownloadSetter):
behavior='allowAndName', eventsEnabled=True)
class SessionPageSetter(DownloadSetter):
class SessionPageSetter(object):
def __init__(self, page):
"""
:param page: SessionPage对象
@ -311,7 +316,7 @@ class SessionPageSetter(DownloadSetter):
self._page.session.mount(url, adapter)
class WebPageSetter(ChromiumPageSetter, DownloadSetter):
class WebPageSetter(ChromiumPageSetter):
def __init__(self, page):
super().__init__(page)
self._session_setter = SessionPageSetter(self._page)
@ -345,7 +350,7 @@ class WebPageSetter(ChromiumPageSetter, DownloadSetter):
self._chromium_setter.user_agent(ua, platform)
class WebPageTabSetter(ChromiumBaseSetter, DownloadSetter):
class WebPageTabSetter(ChromiumBaseSetter):
def __init__(self, page):
super().__init__(page)
self._session_setter = SessionPageSetter(self._page)

View File

@ -50,16 +50,16 @@ class ChromiumBaseSetter(object):
def upload_files(self, files: Union[str, list, tuple]) -> None: ...
class DownloadSetter(object):
def download_path(self, path: Union[str, Path]) -> None: ...
def download_file_name(self, name: str) -> None: ...
class TabSetter(ChromiumBaseSetter, DownloadSetter):
class TabSetter(ChromiumBaseSetter):
def __init__(self, page): ...
class ChromiumPageSetter(ChromiumBaseSetter, DownloadSetter):
class ChromiumPageSetter(ChromiumBaseSetter):
_page: ChromiumPage = ...
def main_tab(self, tab_id: str = None) -> None: ...
@ -72,7 +72,7 @@ class ChromiumPageSetter(ChromiumBaseSetter, DownloadSetter):
def download_path(self, path: Union[str, Path]) -> None: ...
class SessionPageSetter(DownloadSetter):
class SessionPageSetter(object):
def __init__(self, page: SessionPage):
self._page: SessionPage = ...
@ -113,7 +113,7 @@ class SessionPageSetter(DownloadSetter):
def add_adapter(self, url: str, adapter: HTTPAdapter) -> None: ...
class WebPageSetter(ChromiumPageSetter, DownloadSetter):
class WebPageSetter(ChromiumPageSetter):
_page: WebPage = ...
_session_setter: SessionPageSetter = ...
_chromium_setter: ChromiumPageSetter = ...
@ -125,7 +125,7 @@ class WebPageSetter(ChromiumPageSetter, DownloadSetter):
def cookies(self, cookies) -> None: ...
class WebPageTabSetter(ChromiumBaseSetter, DownloadSetter):
class WebPageTabSetter(ChromiumBaseSetter):
_page: WebPage = ...
_session_setter: SessionPageSetter = ...
_chromium_setter: ChromiumBaseSetter = ...

View File

@ -193,9 +193,23 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
else:
return False
def browser_downloads_complete(self):
"""等待所有下载任务结束"""
pass
def browser_downloads_complete(self, timeout=None):
"""等待所有下载任务结束
:param timeout: 超时时间为None时无限等待
:return: 是否等待成功
"""
if not timeout:
while self._driver._dl_mgr._missions:
sleep(.5)
return True
else:
end_time = perf_counter() + timeout
while end_time > perf_counter():
if not self._driver._dl_mgr._missions:
return True
sleep(.5)
return False if self._driver._dl_mgr._missions else True
class ChromiumElementWaiter(object):

View File

@ -3,7 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, Optional
from typing import Union
from .chromium_base import ChromiumBase
from .chromium_element import ChromiumElement
@ -48,14 +48,10 @@ class ChromiumBaseWaiter(object):
class ChromiumPageWaiter(ChromiumBaseWaiter):
_driver: ChromiumPage = ...
# _listener: Union[NetworkListener, None] = ...
# def download_begin(self, timeout: float = 1.5) -> bool: ...
# def download_finish(self, timeout: float = None) -> bool: ...
def new_tab(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def browser_downloads_complete(self, timeout: float = None) -> bool: ...
class ChromiumElementWaiter(object):
def __init__(self,