diff --git a/DrissionPage/commons/tools.py b/DrissionPage/commons/tools.py index 1a70f15..5aa0077 100644 --- a/DrissionPage/commons/tools.py +++ b/DrissionPage/commons/tools.py @@ -7,7 +7,7 @@ from pathlib import Path from re import search, sub from shutil import rmtree from zipfile import ZipFile - +from time import perf_counter, sleep, time def get_exe_from_port(port): """获取端口号第一条进程的可执行文件路径 @@ -150,3 +150,39 @@ def unzip(zip_path, to_path): with ZipFile(zip_path, 'r') as f: return [f.extract(f.namelist()[0], path=to_path)] + +def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True): + """等待返回值不为False或空,直到超时 + :param page (DrissionPage): DrissionPage对象 + :param condition (function | str | tuple): 等待条件,返回值不为False则停止等待 + :param timeout (float, optional): 超时时间 + :param poll (float, optional): 轮询间隔 + :param message (str, optional): 超时时的报错信息 + :param ignored_exceptions (bool, optional): 是否忽略异常 + :return: DP Element or bool + """ + end_time = perf_counter() + timeout + if isinstance(condition, str) or isinstance(condition, tuple): + if not callable(getattr(page, 's_ele', None)): + raise AttributeError('page对象缺少s_ele方法') + condition_method = lambda page: page.s_ele(condition) + elif callable(condition): + condition_method = condition + else: + raise ValueError('condition必须是函数或者字符串或者元组') + while perf_counter() < end_time: + try: + value = condition_method(page) + if value: + return value + except Exception as exc: + pass + + sleep(poll) + if perf_counter() > end_time: + break + + if raise_err: + raise TimeoutError('等待超时') + else: + return False \ No newline at end of file diff --git a/DrissionPage/commons/tools.pyi b/DrissionPage/commons/tools.pyi index a95722d..8830fc7 100644 --- a/DrissionPage/commons/tools.pyi +++ b/DrissionPage/commons/tools.pyi @@ -5,6 +5,7 @@ """ from pathlib import Path from typing import Union +from types import FunctionType def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ... @@ -29,3 +30,5 @@ def clean_folder(folder_path: Union[str, Path], ignore: Union[tuple, list] = Non def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... + +def wait_until(page, condition: Union[FunctionType, str, tuple], timeout: float, poll: float, raise_err: bool): ... \ No newline at end of file