基本完成下载功能完善,待测试

This commit is contained in:
g1879 2023-09-07 18:00:11 +08:00
parent 95717981c8
commit e5055decd8
13 changed files with 270 additions and 79 deletions

View File

@ -23,7 +23,7 @@ from .errors import ContextLossError, ElementLossError, AlertExistsError, CDPErr
from .network_listener import NetworkListener
from .session_element import make_session_ele
from .setter import ChromiumBaseSetter
from .waiter import ChromiumBaseWaiter
from .waiter import ChromiumBaseWaiter, DownloadMission
class ChromiumBase(BasePage):
@ -44,9 +44,12 @@ class ChromiumBase(BasePage):
self._set = None
self._screencast = None
self._listener = None
self._wait_download_flag = None
self._download_rename = None
self._download_path = ''
self._when_download_file_exists = 'rename'
self._download_missions = set()
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
address = f'127.0.0.1:{address}'
@ -249,22 +252,7 @@ class ChromiumBase(BasePage):
def _onDownloadWillBegin(self, **kwargs):
"""下载即将开始时执行"""
if self._wait_download_flag is False:
self._page.run_cdp('Browser.cancelDownload', guid=kwargs['guid'])
if self._download_rename:
tmp = kwargs['suggestedFilename'].rsplit('.', 1)
ext_name = tmp[-1] if len(tmp) > 1 else ''
tmp = self._download_rename.rsplit('.', 1)
ext_rename = tmp[-1] if len(tmp) > 1 else ''
n = self._download_rename if ext_rename == ext_name else f'{self._download_rename}.{ext_name}'
self._download_rename = None
else:
n = kwargs['suggestedFilename']
self._page._dl_mgr.add_mission(kwargs['guid'], self.download_path, n)
self._wait_download_flag = {'url': kwargs['url'], 'name': n}
handle_download(self, kwargs)
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素
@ -1141,3 +1129,34 @@ class ScreencastMode(object):
def imgs_mode(self):
self._screencast._mode = 'imgs'
def handle_download(tab, kwargs):
"""在下载开始前处理任务
:param tab: 触发任务的tab对象
:param kwargs: 浏览器返回的数据
:return: None
"""
tab._page._dl_mgr._missions[kwargs['guid']] = None
if tab._download_rename:
tmp = kwargs['suggestedFilename'].rsplit('.', 1)
ext_name = tmp[-1] if len(tmp) > 1 else ''
tmp = tab._download_rename.rsplit('.', 1)
ext_rename = tmp[-1] if len(tmp) > 1 else ''
n = tab._download_rename if ext_rename == ext_name else f'{tab._download_rename}.{ext_name}'
tab._download_rename = None
else:
n = kwargs['suggestedFilename']
m = DownloadMission(tab, kwargs['guid'], tab.download_path, n, kwargs['url'])
tab._page._dl_mgr.add_mission(m)
tab._wait_download_flag = m
tab._download_missions.add(m)
if tab._wait_download_flag is False: # 取消该任务
m._set_done('canceled', True)
if tab._when_download_file_exists == 'skip' and (Path(m.path) / m.name).exists():
m._set_done('skipped', True)

View File

@ -47,6 +47,8 @@ class ChromiumBase(BasePage):
self._listener: NetworkListener = ...
self._wait_download_flag: bool = ...
self._download_rename: str = ...
self._when_download_file_exists: str = ...
self._download_missions: set = ...
def _connect_browser(self, tab_id: str = None) -> None: ...
@ -275,3 +277,6 @@ class ScreencastMode(object):
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...
def handle_download(tab: ChromiumBase, kwargs: dict) -> None: ...

View File

@ -12,6 +12,7 @@ from .base import DrissionElement, BaseElement
from .commons.constants import FRAME_ELEMENT, NoneElement, Settings
from .commons.keys import keys_to_typing, keyDescriptionForString, keyDefinitions
from .commons.locator import get_loc
from .commons.tools import make_valid_name
from .commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll
from .errors import ContextLossError, ElementLossError, JavaScriptError, NoRectError, ElementNotFoundError, \
CDPError, NoResourceError, CanNotClickError
@ -474,15 +475,11 @@ class ChromiumElement(DrissionElement):
if not result:
return None
if result['base64Encoded']:
if base64_to_bytes:
from base64 import b64decode
data = b64decode(result['content'])
else:
data = result['content']
if result['base64Encoded'] and base64_to_bytes:
from base64 import b64decode
return b64decode(result['content'])
else:
data = result['content']
return data
return result['content']
def save(self, path=None, rename=None, timeout=None):
"""保存图片或其它有src属性的元素的资源
@ -497,6 +494,7 @@ class ChromiumElement(DrissionElement):
path = path or '.'
rename = rename or basename(self.prop('currentSrc'))
rename = make_valid_name(rename)
write_type = 'wb' if isinstance(data, bytes) else 'w'
Path(path).mkdir(parents=True, exist_ok=True)

View File

@ -78,6 +78,7 @@ class ChromiumFrame(ChromiumBase):
self.retry_interval = self._target_page.retry_interval
self._page_load_strategy = self._target_page.page_load_strategy
self._download_path = self._target_page.download_path
self._when_download_file_exists = self._target_page._when_download_file_exists
def _driver_init(self, tab_id):
"""避免出现服务器500错误

View File

@ -7,6 +7,7 @@ from shutil import move
from time import perf_counter, sleep
from .chromium_base import ChromiumBase, Timeout
from .chromium_base import handle_download
from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab
from .commons.browser import connect_browser
@ -450,50 +451,56 @@ class BrowserDownloadManager(object):
page.set.download_path(page.download_path)
self._page.browser_driver.set_listener('Browser.downloadProgress', self._onDownloadProgress)
self._page.browser_driver.set_listener('Browser.downloadWillBegin', self._onDownloadWillBegin)
self._missions = {}
self._missions = set()
def add_mission(self, guid, path, name):
@property
def missions(self):
return self._missions
def add_mission(self, mission):
"""添加下载任务信息
:param guid: guid
:param path: 保存路径
:param name: 保存文件名
:param mission: DownloadMission对象
:return: None
"""
self._missions[guid] = {'path': path, 'name': name}
self._missions.add(mission)
def cancel(self, mission):
"""取消一个下载任务
:param mission: 任务对象
:return: None
"""
self._page.browser_driver.call_method('Browser.cancelDownload', guid=mission.id)
self._missions.remove(mission)
def _onDownloadWillBegin(self, **kwargs):
"""用于获取弹出新标签页触发的下载任务"""
sleep(.2)
sleep(.3)
if kwargs['guid'] not in self._missions:
if self._page._wait_download_flag is False:
self._page.run_cdp('Browser.cancelDownload', guid=kwargs['guid'])
if self._page._download_rename:
tmp = kwargs['suggestedFilename'].rsplit('.', 1)
ext_name = tmp[-1] if len(tmp) > 1 else ''
tmp = self._page._download_rename.rsplit('.', 1)
ext_rename = tmp[-1] if len(tmp) > 1 else ''
n = self._page._download_rename if ext_rename == ext_name else f'{self._page._download_rename}.{ext_name}'
self._download_rename = None
else:
n = kwargs['suggestedFilename']
self._page._dl_mgr.add_mission(kwargs['guid'], self._page.download_path, n)
self._wait_download_flag = {'url': kwargs['url'], 'name': n}
handle_download(self._page, kwargs)
def _onDownloadProgress(self, **kwargs):
"""下载状态变化时执行"""
if kwargs['state'] in ('completed', 'canceled') and kwargs['guid'] in self._missions:
guid = kwargs['guid']
if kwargs['state'] == 'completed':
path = self._missions[guid]['path']
name = self._missions[guid]['name']
form_path = f'{self._page.download_path}\\{guid}'
to_path = get_usable_path(f'{path}\\{name}')
move(form_path, to_path)
if kwargs['guid'] in self._missions:
mission = self._missions[kwargs['guid']]
# print(mission)
if kwargs['state'] == 'inProgress':
mission.state = 'running'
mission.received_bytes = kwargs['receivedBytes']
mission.total_bytes = kwargs['totalBytes']
self._missions.pop(guid)
elif kwargs['state'] == 'completed':
mission.received_bytes = kwargs['receivedBytes']
mission.total_bytes = kwargs['totalBytes']
form_path = f'{self._page.download_path}\\{mission.id}'
to_path = get_usable_path(f'{mission.path}\\{mission.name}')
move(form_path, to_path)
mission.final_path = to_path
mission.state = 'completed'
self._missions.pop(mission.id)
else:
mission.state = 'canceled'
self._missions.pop(mission.id)
class Alert(object):

View File

@ -3,14 +3,14 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union, Tuple, List
from typing import Union, Tuple, List, Dict, Optional, Set
from .chromium_base import ChromiumBase
from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab
from .configs.chromium_options import ChromiumOptions
from .setter import ChromiumPageSetter
from .waiter import ChromiumPageWaiter
from .waiter import ChromiumPageWaiter, DownloadMission
class ChromiumPage(ChromiumBase):
@ -127,11 +127,16 @@ class ChromiumTabRect(object):
class BrowserDownloadManager(object):
_page: ChromiumPage = ...
_missions: dict = ...
_missions: Set[DownloadMission] = ...
def __init__(self, page: ChromiumPage): ...
def add_mission(self, guid: str, path: str, name: str) -> None: ...
@property
def missions(self) -> Set[DownloadMission]: ...
def add_mission(self, mission: DownloadMission) -> None: ...
def cancel(self, mission: DownloadMission) -> None: ...
def _onDownloadWillBegin(self, **kwargs) -> None: ...

View File

@ -30,6 +30,7 @@ class ChromiumTab(ChromiumBase):
self.retry_interval = self.page.retry_interval
self._page_load_strategy = self.page.page_load_strategy
self._download_path = self.page.download_path
self._when_download_file_exists = self.page._when_download_file_exists
def close(self):
"""关闭当前标签页"""
@ -59,7 +60,7 @@ class WebPageTab(SessionPage, ChromiumTab):
:param page: WebPage对象
:param tab_id: 要控制的标签页id
"""
self.page = page
self._page = page
self.address = page.address
self._debug = page._debug
self._debug_recorder = page._debug_recorder

View File

@ -178,14 +178,14 @@ def get_chrome_hwnds_from_pid(pid, title):
EnumWindows(callback, hwnds)
return hwnds
def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True):
"""等待返回值不为False或空直到超时
:param page (DrissionPage): DrissionPage对象
:param condition (function | str | tuple): 等待条件返回值不为False则停止等待
:param timeout (float, optional): 超时时间
:param poll (float, optional): 轮询间隔
:param message (str, optional): 超时时的报错信息
:param ignored_exceptions (bool, optional): 是否忽略异常
:param page: DrissionPage对象
:param condition: 等待条件返回值不为False则停止等待
:param timeout: 超时时间
:param poll: 轮询间隔
:param raise_err: 是否抛出异常
:return: DP Element or bool
"""
end_time = perf_counter() + timeout
@ -204,11 +204,11 @@ def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True):
return value
except Exception as exc:
pass
sleep(poll)
if perf_counter() > end_time:
break
if raise_err:
raise TimeoutError('等待超时')
else:

View File

@ -40,4 +40,5 @@ def get_browser_progress_id(progress: Union[popen, None], address: str) -> Union
def get_chrome_hwnds_from_pid(pid: Union[str, int], title: str) -> list: ...
def wait_until(page, condition: Union[FunctionType, str, tuple], timeout: float, poll: float, raise_err: bool): ...

View File

@ -133,6 +133,11 @@ class ChromiumBaseSetter(object):
"""
self._page._download_rename = name
def when_download_file_exists(self, mode):
if mode not in ('rename', 'overwrite', 'skip'):
raise ValueError(f"mode参数只能是'rename', 'overwrite', 'skip' 之一,现在是:{mode}")
self._page._when_download_file_exists = mode
class TabSetter(ChromiumBaseSetter):
def __init__(self, page):

View File

@ -54,6 +54,8 @@ class ChromiumBaseSetter(object):
def download_file_name(self, name: str) -> None: ...
def when_download_file_exists(self, mode: str) -> None: ...
class TabSetter(ChromiumBaseSetter):
def __init__(self, page): ...

View File

@ -1,4 +1,5 @@
# -*- coding:utf-8 -*-
from pathlib import Path
from time import sleep, perf_counter
from .commons.constants import Settings
@ -85,13 +86,13 @@ class ChromiumBaseWaiter(object):
while self._driver._upload_list:
sleep(.01)
def browser_download_begin(self, timeout=None, cancel=False):
def download_begin(self, timeout=None, cancel_it=False):
"""等待浏览器下载开始,可将其拦截
:param timeout: 超时时间None使用页面对象超时时间
:param cancel: 是否取消该任务
:param cancel_it: 是否取消该任务
:return: 成功返回任务信息dict失败返回False
"""
self._driver._wait_download_flag = False if cancel else True
self._driver._wait_download_flag = False if cancel_it else True
if timeout is None:
timeout = self._driver.timeout
@ -105,6 +106,32 @@ class ChromiumBaseWaiter(object):
self._driver._wait_download_flag = None
return r
def downloads_done(self, timeout=None, cancel_if_timeout=True):
"""等待所有浏览器下载任务结束
:param timeout: 超时时间为None时无限等待
:param cancel_if_timeout: 超时时是否取消剩余任务
:return: 是否等待成功
"""
if not timeout:
while self._driver._download_missions:
sleep(.5)
return True
else:
end_time = perf_counter() + timeout
while end_time > perf_counter():
if not self._driver._download_missions:
return True
sleep(.5)
if self._driver._download_missions:
if cancel_if_timeout:
for m in self._driver._download_missions:
m.cancel()
return False
else:
return True
def url_change(self, text, exclude=False, timeout=None, raise_err=None):
"""等待url变成包含或不包含指定文本
:param text: 用于识别的文本
@ -200,9 +227,10 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
else:
return False
def browser_downloads_complete(self, timeout=None):
"""等待所有下载任务结束
def all_downloads_done(self, timeout=None, cancel_if_timeout=True):
"""等待所有浏览器下载任务结束
:param timeout: 超时时间为None时无限等待
:param cancel_if_timeout: 超时时是否取消剩余任务
:return: 是否等待成功
"""
if not timeout:
@ -216,7 +244,14 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
if not self._driver._dl_mgr._missions:
return True
sleep(.5)
return False if self._driver._dl_mgr._missions else True
if self._driver._dl_mgr._missions:
if cancel_if_timeout:
for m in self._driver._dl_mgr._missions:
m.cancel()
return False
else:
return True
class ChromiumElementWaiter(object):
@ -341,3 +376,90 @@ class FrameWaiter(ChromiumBaseWaiter, ChromiumElementWaiter):
"""
super().__init__(frame)
super(ChromiumBaseWaiter, self).__init__(frame, frame.frame_ele)
class DownloadMission(object):
def __init__(self, tab, _id, path, name, url):
self.url = url
self.tab = tab
self.id = _id
self.path = path
self.name = name
self.state = 'waiting'
self.total_bytes = None
self.received_bytes = 0
self.final_path = None
def __repr__(self):
# return f'<DownloadMission {self.id} {self.state} {self.rate}>'
return f'<DownloadMission {id(self)} {self.rate}>'
@property
def rate(self):
"""以百分比形式返回下载进度"""
return round((self.received_bytes / self.total_bytes) * 100, 2) if self.total_bytes else None
def cancel(self):
"""取消该任务,如任务已完成,删除已下载的文件"""
self._set_done('canceled', True)
if self.final_path:
Path(self.final_path).unlink(True)
def wait(self, show=True, timeout=None, cancel_if_timeout=True):
"""等待任务结束
:param show: 是否显示下载信息
:param timeout: 超时时间为None则无限等待
:param cancel_if_timeout: 超时时是否取消任务
:return: 等待成功返回完整路径否则返回False
"""
if show:
print(f'url{self.url}')
t2 = perf_counter()
while self.name is None and perf_counter() - t2 < 4:
sleep(0.01)
print(f'文件名:{self.name}')
print(f'目标路径:{self.path}')
if timeout is None:
while self.id in self.tab._page._dl_mgr.missions:
if show:
print(f'\r{self.rate}% ', end='')
sleep(.2)
else:
running = True
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if show:
print(f'\r{self.rate}% ', end='')
if self.id not in self.tab._page._dl_mgr.missions:
running = False
break
sleep(.2)
if running and cancel_if_timeout:
self.cancel()
if show:
if self.state == 'completed':
print(f'下载完成 {self.final_path}')
elif self.state == 'canceled':
print(f'下载取消')
elif self.state == 'skipped':
print(f'已跳过')
print()
return self.final_path if self.final_path else False
def _set_done(self, state, cancel=False, final_path=None):
"""设置任务结束
:param state: 任务状态
:param cancel: 是否取消
:param final_path: 最终路径
:return: None
"""
self.state = state
self.final_path = final_path
if cancel:
self.tab._page._dl_mgr.cancel(self)
self.tab._download_missions.remove(self)

View File

@ -3,7 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union
from typing import Union, Optional
from .chromium_base import ChromiumBase
from .chromium_element import ChromiumElement
@ -37,7 +37,9 @@ class ChromiumBaseWaiter(object):
def upload_paths_inputted(self) -> None: ...
def browser_download_begin(self, timeout: float = None, cancel: bool = False) -> Union[dict, bool]: ...
def download_begin(self, timeout: float = None, cancel_it: bool = False) -> Union[DownloadMission, bool]: ...
def downloads_done(self, timeout: float = None, cancel_if_timeout: bool = True) -> bool: ...
def url_change(self, text: str, exclude: bool = False, timeout: float = None, raise_err: bool = None) -> bool: ...
@ -52,7 +54,7 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
def new_tab(self, timeout: float = None, raise_err: bool = None) -> bool: ...
def browser_downloads_complete(self, timeout: float = None) -> bool: ...
def all_downloads_done(self, timeout: float = None, cancel_if_timeout: bool = True) -> bool: ...
class ChromiumElementWaiter(object):
@ -85,3 +87,26 @@ class ChromiumElementWaiter(object):
class FrameWaiter(ChromiumBaseWaiter, ChromiumElementWaiter):
def __init__(self, frame: ChromiumFrame): ...
class DownloadMission(object):
tab: ChromiumBase = ...
url: str = ...
id: str = ...
path: str = ...
name: str = ...
state: str = ...
total_bytes: Optional[int] = ...
received_bytes: int = ...
final_path: Optional[str] = ...
def __init__(self, tab: ChromiumBase, _id: str, path: str, name: str, url: str): ...
@property
def rate(self) -> float: ...
def cancel(self) -> None: ...
def wait(self, show: bool = True, timeout=None, cancel_if_timeout=True) -> Union[bool, str]: ...
def _set_done(self, state: str, cancel: bool = False, final_path: str = None) -> None: ...