添加wait_until方法,支持自定义组合等待条件

This commit is contained in:
donggoing 2023-08-30 10:22:09 +08:00
parent e6f14d3710
commit c52799c609
2 changed files with 40 additions and 1 deletions

View File

@ -7,7 +7,7 @@ from pathlib import Path
from re import search, sub from re import search, sub
from shutil import rmtree from shutil import rmtree
from zipfile import ZipFile from zipfile import ZipFile
from time import perf_counter, sleep, time
def get_exe_from_port(port): def get_exe_from_port(port):
"""获取端口号第一条进程的可执行文件路径 """获取端口号第一条进程的可执行文件路径
@ -150,3 +150,39 @@ def unzip(zip_path, to_path):
with ZipFile(zip_path, 'r') as f: with ZipFile(zip_path, 'r') as f:
return [f.extract(f.namelist()[0], path=to_path)] return [f.extract(f.namelist()[0], path=to_path)]
def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True):
"""等待返回值不为False或空直到超时
:param page (DrissionPage): DrissionPage对象
:param condition (function | str | tuple): 等待条件返回值不为False则停止等待
:param timeout (float, optional): 超时时间
:param poll (float, optional): 轮询间隔
:param message (str, optional): 超时时的报错信息
:param ignored_exceptions (bool, optional): 是否忽略异常
:return: DP Element or bool
"""
end_time = perf_counter() + timeout
if isinstance(condition, str) or isinstance(condition, tuple):
if not callable(getattr(page, 's_ele', None)):
raise AttributeError('page对象缺少s_ele方法')
condition_method = lambda page: page.s_ele(condition)
elif callable(condition):
condition_method = condition
else:
raise ValueError('condition必须是函数或者字符串或者元组')
while perf_counter() < end_time:
try:
value = condition_method(page)
if value:
return value
except Exception as exc:
pass
sleep(poll)
if perf_counter() > end_time:
break
if raise_err:
raise TimeoutError('等待超时')
else:
return False

View File

@ -5,6 +5,7 @@
""" """
from pathlib import Path from pathlib import Path
from typing import Union from typing import Union
from types import FunctionType
def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ... def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ...
@ -29,3 +30,5 @@ def clean_folder(folder_path: Union[str, Path], ignore: Union[tuple, list] = Non
def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... def unzip(zip_path: str, to_path: str) -> Union[list, None]: ...
def wait_until(page, condition: Union[FunctionType, str, tuple], timeout: float, poll: float, raise_err: bool): ...