增加拦截文件选择框并自动输入路径功能

This commit is contained in:
g1879 2023-02-09 11:53:38 +08:00
parent 10a350c8ea
commit 7636ab98f3
3 changed files with 41 additions and 6 deletions

View File

@ -4,6 +4,7 @@
@Contact : g1879@qq.com
"""
from json import loads
from pathlib import Path
from time import perf_counter, sleep
from requests import Session
@ -54,13 +55,15 @@ class ChromiumBase(BasePage):
self._first_run = False
def _chromium_init(self):
"""浏览器初始设置"""
self._control_session = Session()
self._control_session.keep_alive = False
self._first_run = True
self._is_reading = False
self._upload_list = None
def _set_options(self):
"""用于设置浏览器运行参数"""
"""设置与s模式共用的运行参数便于被子类覆盖"""
self._timeouts = Timeout(self)
self._page_load_strategy = 'normal'
@ -190,6 +193,26 @@ class ChromiumBase(BasePage):
if self._debug_recorder:
self._debug_recorder.add_data((perf_counter(), '加载流程', 'navigated'))
def _onFileChooserOpened(self, **kwargs):
"""文件选择框打开时触发"""
if self._upload_list:
files = self._upload_list if kwargs['mode'] == 'selectMultiple' else self._upload_list[:1]
self._tab_obj.DOM.setFileInputFiles(files=files, backendNodeId=kwargs['backendNodeId'])
self._upload_list = []
def set_upload_files(self, files):
"""等待上传的文件路径
:param files: 文件路径列表或字符串字符串时多个文件用回车分隔
:return: None
"""
if self._upload_list is None:
self._tab_obj.Page.fileChooserOpened = self._onFileChooserOpened
self._tab_obj.Page.setInterceptFileChooserDialog(enabled=True)
if isinstance(files, str):
files = files.split('\n')
self._upload_list = [str(Path(i).absolute()) for i in files]
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素
ele = page('@id=ele_id')
@ -296,6 +319,11 @@ class ChromiumBase(BasePage):
"""返回用于设置页面加载策略的对象"""
return PageLoadStrategy(self)
@property
def upload_list(self):
"""返回等待上传文件列表"""
return self._upload_list
def set_timeouts(self, implicit=None, page_load=None, script=None):
"""设置超时时间,单位为秒
:param implicit: 查找元素超时时间

View File

@ -36,6 +36,7 @@ class ChromiumBase(BasePage):
self._root_id: str = ...
self._debug: bool = ...
self._debug_recorder: Recorder = ...
self._upload_list: list = ...
def _connect_browser(self,
addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None,
@ -59,6 +60,10 @@ class ChromiumBase(BasePage):
def _onFrameNavigated(self, **kwargs): ...
def _onFileChooserOpened(self, **kwargs): ...
def set_upload_files(self, files: Union[str, list, tuple]) -> None: ...
def _set_options(self) -> None: ...
def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromiumElement],
@ -112,6 +117,9 @@ class ChromiumBase(BasePage):
@property
def set_page_load_strategy(self) -> PageLoadStrategy: ...
@property
def upload_list(self) -> list: ...
def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: ...
def run_js(self, script: str, as_expr: bool = False, *args: Any) -> Any: ...
@ -169,7 +177,7 @@ class ChromiumBase(BasePage):
def run_cdp(self, cmd: str, **cmd_args) -> dict: ...
def set_user_agent(self, ua: str, platform:str=None) -> None: ...
def set_user_agent(self, ua: str, platform: str = None) -> None: ...
def get_session_storage(self, item: str = None) -> Union[str, dict, None]: ...

View File

@ -551,8 +551,8 @@ class ChromiumElement(DrissionElement):
except Exception:
self.click(by_js=True)
if clear:
self.clear(by_js=True)
if clear and vals != '\n':
self.clear(by_js=False)
# ------------处理字符-------------
if not isinstance(vals, (tuple, list)):
@ -1301,8 +1301,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None, not_chan
exceptionDetails = res.get('exceptionDetails')
if exceptionDetails:
print(script)
raise RuntimeError(f'javascript错误: {exceptionDetails}')
raise RuntimeError(f'javascript{script}\n错误信息: {exceptionDetails}')
try:
return _parse_js_result(page, page_or_ele, res.get('result'))