4.1.0.11(+)

适配130浏览器;
修复下载问题;
优化下载信息显示;
WebPage的post()会返回Response对象;
click.to_download()恢复new_tab参数,默认None;
This commit is contained in:
g1879 2024-11-08 10:31:17 +08:00
parent 9d3b7f3ec1
commit 7a5cc465db
8 changed files with 68 additions and 40 deletions

View File

@ -12,4 +12,4 @@ from ._pages.chromium_page import ChromiumPage
from ._pages.session_page import SessionPage
from ._pages.web_page import WebPage
__version__ = '4.1.0.10'
__version__ = '4.1.0.11'

View File

@ -361,7 +361,8 @@ class Chromium(object):
raise TypeError('tab_type只能是set、list、tuple、str、None。')
tabs = [i for i in tabs if ((title is None or title in i['title']) and (url is None or url in i['url'])
and (tab_type is None or i['type'] in tab_type))]
and (tab_type is None or i['type'] in tab_type)
and i['title'] != 'chrome-extension://neajdppkdcdipfabeoofebfddakdcjhd/audio.html')]
if as_id:
return [tab['id'] for tab in tabs]
with self._lock:

View File

@ -136,14 +136,14 @@ def str_to_xpath_loc(loc):
# 根据文本查找
elif loc.startswith('text='):
loc_str = f'//*[text()={_make_search_str(loc[5:])}]'
loc_str = f'//*[text()={_quotes_escape(loc[5:])}]'
elif loc.startswith('text:') and loc != 'text:':
loc_str = f'//*/text()[contains(., {_make_search_str(loc[5:])})]/..'
loc_str = f'//*/text()[contains(., {_quotes_escape(loc[5:])})]/..'
elif loc.startswith('text^') and loc != 'text^':
loc_str = f'//*/text()[starts-with(., {_make_search_str(loc[5:])})]/..'
loc_str = f'//*/text()[starts-with(., {_quotes_escape(loc[5:])})]/..'
elif loc.startswith('text$') and loc != 'text$':
loc_str = f'//*/text()[substring(., string-length(.) - string-length({_make_search_str(loc[5:])}) +1) = ' \
f'{_make_search_str(loc[5:])}]/..'
loc_str = (f'//*/text()[substring(., string-length(.) - string-length({_quotes_escape(loc[5:])}) +1) = '
f'{_quotes_escape(loc[5:])}]/..')
# 用xpath查找
elif loc.startswith(('xpath:', 'xpath=')) and loc not in ('xpath:', 'xpath='):
@ -156,7 +156,7 @@ def str_to_xpath_loc(loc):
# 根据文本模糊查找
elif loc:
loc_str = f'//*/text()[contains(., {_make_search_str(loc)})]/..'
loc_str = f'//*/text()[contains(., {_quotes_escape(loc)})]/..'
else:
loc_str = '//*'
@ -226,30 +226,30 @@ def _make_single_xpath_str(tag: str, text: str) -> tuple:
symbol = r[1]
if symbol == '=': # 精确查找
arg = 'text()' if r[0] in ('@text()', '@tx()') else r[0]
arg_str = f'{arg}={_make_search_str(r[2])}'
arg_str = f'{arg}={_quotes_escape(r[2])}'
elif symbol == '^': # 匹配开头
if r[0] in ('@text()', '@tx()'):
txt_str = f'/text()[starts-with(., {_make_search_str(r[2])})]/..'
txt_str = f'/text()[starts-with(., {_quotes_escape(r[2])})]/..'
arg_str = ''
else:
arg_str = f"starts-with({r[0]},{_make_search_str(r[2])})"
arg_str = f"starts-with({r[0]},{_quotes_escape(r[2])})"
elif symbol == '$': # 匹配结尾
if r[0] in ('@text()', '@tx()'):
txt_str = (f'/text()[substring(., string-length(.) - string-length({_make_search_str(r[2])}) '
f'+1) = {_make_search_str(r[2])}]/..')
txt_str = (f'/text()[substring(., string-length(.) - string-length('
f'{_quotes_escape(r[2])}) +1) = {_quotes_escape(r[2])}]/..')
arg_str = ''
else:
arg_str = (f'substring({r[0]}, string-length({r[0]}) - string-length({_make_search_str(r[2])}) '
f'+1) = {_make_search_str(r[2])}')
arg_str = (f'substring({r[0]}, string-length({r[0]}) - string-length('
f'{_quotes_escape(r[2])}) +1) = {_quotes_escape(r[2])}')
elif symbol == ':': # 模糊查找
if r[0] in ('@text()', '@tx()'):
txt_str = f'/text()[contains(., {_make_search_str(r[2])})]/..'
txt_str = f'/text()[contains(., {_quotes_escape(r[2])})]/..'
arg_str = ''
else:
arg_str = f"contains({r[0]},{_make_search_str(r[2])})"
arg_str = f"contains({r[0]},{_quotes_escape(r[2])})"
else:
raise ValueError(f'符号不正确:{symbol}')
@ -313,17 +313,17 @@ def _make_multi_xpath_str(tag: str, text: str) -> tuple:
txt = r[2]
if symbol == '=':
arg_str = f'{arg}={_make_search_str(txt)}'
arg_str = f'{arg}={_quotes_escape(txt)}'
elif symbol == ':':
arg_str = f'contains({arg},{_make_search_str(txt)})'
arg_str = f'contains({arg},{_quotes_escape(txt)})'
elif symbol == '^':
arg_str = f'starts-with({arg},{_make_search_str(txt)})'
arg_str = f'starts-with({arg},{_quotes_escape(txt)})'
elif symbol == '$':
arg_str = f'substring({arg}, string-length({arg}) - string-length({_make_search_str(txt)}) +1) ' \
f'= {_make_search_str(txt)}'
arg_str = (f'substring({arg}, string-length({arg}) - string-length('
f'{_quotes_escape(txt)}) +1) = {_quotes_escape(txt)}')
else:
raise ValueError(f'符号不正确:{symbol}')
@ -342,11 +342,14 @@ def _make_multi_xpath_str(tag: str, text: str) -> tuple:
return 'xpath', f'//*[{arg_str}]' if arg_str else f'//*'
def _make_search_str(search_str: str) -> str:
def _quotes_escape(search_str: str) -> str:
""""转义,不知何故不能直接用 \ 来转义
:param search_str: 查询字符串
:return: "转义后的字符串
"""
if '"' not in search_str:
return f'"{search_str}"'
parts = search_str.split('"')
parts_num = len(parts)
search_str = 'concat('

View File

@ -127,9 +127,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
if self.mode == 'd':
self.cookies_to_session()
super().post(url, show_errmsg, retry, interval, **kwargs)
return self.response
return super().post(url, show_errmsg, retry, interval, **kwargs)
super().post(url, show_errmsg, retry, interval, **kwargs)
return self.response
def ele(self, locator, index=1, timeout=None):
return (super(SessionPage, self).ele(locator, index=index, timeout=timeout)

View File

@ -5,11 +5,13 @@
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
from pathlib import Path
from time import perf_counter, sleep
from .waiter import wait_mission
from .._functions.settings import Settings
from .._functions.web import offset_scroll
from .._units.downloader import TabDownloadSettings
from ..errors import CanNotClickError, CDPError, NoRectError, AlertExistsError
@ -118,16 +120,32 @@ class Clicker(object):
def multi(self, times=2):
return self.at(count=times)
def to_download(self, save_path=None, rename=None, suffix=None, by_js=False, timeout=None, new_tab=None):
# 即将废弃new_tab参数
def to_download(self, save_path=None, rename=None, suffix=None, new_tab=None, by_js=False, timeout=None):
if not self._ele.tab._browser._dl_mgr._running:
self._ele.tab._browser.set.download_path('.')
when_file_exists = None
tmp_path = None
if self._ele.tab._type.endswith('Page'):
obj = browser = self._ele.owner._browser
tid = 'browser'
elif new_tab:
obj = browser = self._ele.owner._browser
tid = 'browser'
t_settings = TabDownloadSettings(self._ele.owner.tab_id)
b_settings = TabDownloadSettings('browser')
when_file_exists = b_settings.when_file_exists
b_settings.when_file_exists = t_settings.when_file_exists
b_settings.rename = t_settings.rename
b_settings.suffix = t_settings.suffix
t_settings.rename = None
t_settings.suffix = None
if not save_path and b_settings.path != t_settings.path:
tmp_path = b_settings.path
b_settings.path = t_settings.path
else:
obj = self._ele.owner._tab
browser = obj.browser
@ -136,7 +154,7 @@ class Clicker(object):
if save_path:
tmp_path = obj.download_path
obj.set.download_path(save_path)
TabDownloadSettings(tid).path = str(Path(save_path).absolute())
if rename or suffix:
obj.set.download_file_name(rename, suffix)
if timeout is None:
@ -147,9 +165,13 @@ class Clicker(object):
m = wait_mission(browser, tid, timeout)
if tmp_path:
obj.set.download_path(tmp_path)
TabDownloadSettings(tid).path = tmp_path
if when_file_exists:
browser.set.when_download_file_exists(when_file_exists)
if m and new_tab:
self._ele.owner.browser._dl_mgr._tab_missions.setdefault(self._ele.owner.tab_id, set()).add(m)
m.from_tab = self._ele.owner.tab_id
browser._dl_mgr._waiting_tab.discard(self._ele.owner.tab_id)
return m
def to_upload(self, file_paths, by_js=False):

View File

@ -80,12 +80,14 @@ class Clicker(object):
save_path: Union[str, Path] = None,
rename: str = None,
suffix: str = None,
new_tab: bool = None,
by_js: bool = False,
timeout: float = None) -> DownloadMission:
"""点击触发下载
:param save_path: 保存路径为None保存在原来设置的如未设置保存到当前路径
:param rename: 重命名文件名
:param suffix: 指定文件后缀
:param new_tab: 下载任务是否从新标签页触发为None会自动获取如获取不到设为True
:param by_js: 是否用js方式点击逻辑与click()一致
:param timeout: 等待下载触发的超时时间为None则使用页面对象设置
:return: DownloadMission对象

View File

@ -65,17 +65,17 @@ class DownloadManager(object):
return self._flags.get(tab_id, None)
def get_tab_missions(self, tab_id):
return self._tab_missions.get(tab_id, [])
return self._tab_missions.get(tab_id, set())
def set_done(self, mission, state, final_path=None):
if mission.state not in ('canceled', 'skipped'):
mission.state = state
mission.final_path = final_path
if mission.tab_id in self._tab_missions and mission in self._tab_missions[mission.tab_id]:
self._tab_missions[mission.tab_id].remove(mission)
self._tab_missions[mission.tab_id].discard(mission)
if (mission.from_tab and mission.from_tab in self._tab_missions
and mission in self._tab_missions[mission.from_tab]):
self._tab_missions[mission.from_tab].remove(mission)
self._tab_missions[mission.from_tab].discard(mission)
self._missions.pop(mission.id, None)
mission._is_done = True
@ -104,7 +104,7 @@ class DownloadManager(object):
def _onDownloadWillBegin(self, **kwargs):
guid = kwargs['guid']
tab_id = self._browser._frames.get(kwargs['frameId'], 'browser')
tab = 'browser' if tab_id in ('browser', self._page_id) else tab_id
tab = 'browser' if tab_id in ('browser', self._page_id) or self.get_flag('browser') is not None else tab_id
opener = self._browser._relation.get(tab_id, None)
from_tab = None
if opener and opener in self._waiting_tab:
@ -148,7 +148,7 @@ class DownloadManager(object):
m = DownloadMission(self, tab_id, guid, settings.path, name, kwargs['url'], self._tmp_path, overwrite)
if from_tab:
m.from_tab = from_tab
self._tab_missions.setdefault(from_tab, []).append(m)
self._tab_missions.setdefault(from_tab, set()).add(m)
self._missions[guid] = m
if self.get_flag('browser') is False or self.get_flag(tab) is False: # 取消该任务
@ -156,7 +156,7 @@ class DownloadManager(object):
elif skip:
self.skip(m)
else:
self._tab_missions.setdefault(tab_id, []).append(m)
self._tab_missions.setdefault(tab_id, set()).add(m)
if self.get_flag('browser') is not None:
self._flags['browser'] = m
@ -261,7 +261,7 @@ class DownloadMission(object):
end_time = perf_counter()
while self.name is None and perf_counter() < end_time:
sleep(0.01)
print(f'文件名:{self.name}')
print(f'文件名:{self.name or "未知"}')
print(f'目录路径:{self.folder}')
if timeout is None:
@ -282,6 +282,7 @@ class DownloadMission(object):
if show:
if self.state == 'completed':
print('\r100% ', end='')
if self._overwrite is None:
print(f'完成并重命名 {self.final_path}')
elif self._overwrite is False:

View File

@ -5,7 +5,7 @@
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
from typing import Dict, Optional, Union, Literal
from typing import Dict, Optional, Union, Literal, Set
from .._base.chromium import Chromium
from .._pages.chromium_base import ChromiumBase
@ -16,7 +16,7 @@ FILE_EXISTS = Literal['skip', 'rename', 'overwrite', 's', 'r', 'o']
class DownloadManager(object):
_browser: Chromium = ...
_missions: Dict[str, DownloadMission] = ...
_tab_missions: dict = ...
_tab_missions: Dict[str, Set[DownloadMission]] = ...
_flags: dict = ...
_waiting_tab: set = ...
_running: bool = ...