元素等待曾加enabled()、disabled()、disabled_or_delete();优化等待逻辑

This commit is contained in:
g1879 2023-03-09 18:29:49 +08:00
parent 44b225e550
commit c593981b13
4 changed files with 137 additions and 145 deletions

View File

@ -855,87 +855,6 @@ class ChromiumBase(BasePage):
return self.set.load_strategy
class Screencast(object):
def __init__(self, page):
self._page = page
self._path = None
self._quality = 100
def start(self, save_path=None, quality=None):
"""开始录屏
:param save_path: 录屏保存位置
:param quality: 录屏质量
:return: None
"""
self.set(save_path, quality)
if self._path is None:
raise ValueError('save_path必须设置。')
clean_folder(self._path)
self._page.driver.Page.screencastFrame = self._onScreencastFrame
self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=self._quality)
def stop(self, to_mp4=False, video_name=None):
"""停止录屏
:param to_mp4: 是否合并成MP4格式
:param video_name: 视频文件名为None时以当前时间名命
:return: 文件路径
"""
self._page.driver.Page.screencastFrame = None
self._page.run_cdp('Page.stopScreencast')
if not to_mp4:
return str(Path(self._path).absolute())
if not str(video_name).isascii() or not str(self._path).isascii():
raise TypeError('转换成视频仅支持英文路径和文件名。')
try:
from cv2 import VideoWriter, imread
from numpy import fromfile, uint8
except ModuleNotFoundError:
raise ModuleNotFoundError('请先安装cv2pip install opencv-python')
pic_list = Path(self._path).glob('*.jpg')
img = imread(str(next(pic_list)))
imgInfo = img.shape
size = (imgInfo[1], imgInfo[0])
if video_name and not video_name.endswith('mp4'):
video_name = f'{video_name}.mp4'
name = f'{time()}.mp4' if not video_name else video_name
fourcc = 14
videoWrite = VideoWriter(f'{self._path}{sep}{name}', fourcc, 8, size)
for i in pic_list:
img = imread(str(i))
videoWrite.write(img)
clean_folder(self._path, ignore=(name,))
return f'{self._path}{sep}{name}'
def set(self, save_path=None, quality=None):
"""设置录屏参数
:param save_path: 保存路径
:param quality: 视频质量可取值0-100
:return:
"""
if save_path:
save_path = Path(save_path)
if save_path.exists() and save_path.is_file():
raise TypeError('save_path必须指定文件夹。')
save_path.mkdir(parents=True, exist_ok=True)
self._path = str(save_path)
if quality is not None:
if quality < 0 or quality > 100:
raise ValueError('quality必须在0-100之间。')
self._quality = quality
def _onScreencastFrame(self, **kwargs):
with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
f.write(b64decode(kwargs['data']))
self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId'])
class ChromiumBaseSetter(object):
def __init__(self, page):
self._page = page
@ -1049,7 +968,10 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功
"""
return ChromiumElementWaiter(self._driver, loc_or_ele).delete(timeout)
if isinstance(loc_or_ele, (str, tuple)):
ele = self._driver._ele(loc_or_ele, timeout=.3, raise_err=False)
return ele.wait.delete(timeout) if ele else True
return loc_or_ele.wait.delete(timeout)
def ele_display(self, loc_or_ele, timeout=None):
"""等待元素变成显示状态
@ -1057,7 +979,8 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功
"""
return ChromiumElementWaiter(self._driver, loc_or_ele).display(timeout)
ele = self._driver._ele(loc_or_ele, raise_err=False)
return ele.wait.display(timeout)
def ele_hidden(self, loc_or_ele, timeout=None):
"""等待元素变成隐藏状态
@ -1065,7 +988,8 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功
"""
return ChromiumElementWaiter(self._driver, loc_or_ele).hidden(timeout)
ele = self._driver._ele(loc_or_ele, raise_err=False)
return ele.wait.hidden(timeout)
def load_start(self, timeout=None):
"""等待页面开始加载
@ -1211,3 +1135,84 @@ class PageScrollSetter(object):
b = 'smooth' if on_off else 'auto'
self._scroll._driver.run_js(f'document.documentElement.style.setProperty("scroll-behavior","{b}");')
self._scroll._wait_complete = on_off
class Screencast(object):
def __init__(self, page):
self._page = page
self._path = None
self._quality = 100
def start(self, save_path=None, quality=None):
"""开始录屏
:param save_path: 录屏保存位置
:param quality: 录屏质量
:return: None
"""
self.set(save_path, quality)
if self._path is None:
raise ValueError('save_path必须设置。')
clean_folder(self._path)
self._page.driver.Page.screencastFrame = self._onScreencastFrame
self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=self._quality)
def stop(self, to_mp4=False, video_name=None):
"""停止录屏
:param to_mp4: 是否合并成MP4格式
:param video_name: 视频文件名为None时以当前时间名命
:return: 文件路径
"""
self._page.driver.Page.screencastFrame = None
self._page.run_cdp('Page.stopScreencast')
if not to_mp4:
return str(Path(self._path).absolute())
if not str(video_name).isascii() or not str(self._path).isascii():
raise TypeError('转换成视频仅支持英文路径和文件名。')
try:
from cv2 import VideoWriter, imread
from numpy import fromfile, uint8
except ModuleNotFoundError:
raise ModuleNotFoundError('请先安装cv2pip install opencv-python')
pic_list = Path(self._path).glob('*.jpg')
img = imread(str(next(pic_list)))
imgInfo = img.shape
size = (imgInfo[1], imgInfo[0])
if video_name and not video_name.endswith('mp4'):
video_name = f'{video_name}.mp4'
name = f'{time()}.mp4' if not video_name else video_name
fourcc = 14
videoWrite = VideoWriter(f'{self._path}{sep}{name}', fourcc, 8, size)
for i in pic_list:
img = imread(str(i))
videoWrite.write(img)
clean_folder(self._path, ignore=(name,))
return f'{self._path}{sep}{name}'
def set(self, save_path=None, quality=None):
"""设置录屏参数
:param save_path: 保存路径
:param quality: 视频质量可取值0-100
:return:
"""
if save_path:
save_path = Path(save_path)
if save_path.exists() and save_path.is_file():
raise TypeError('save_path必须指定文件夹。')
save_path.mkdir(parents=True, exist_ok=True)
self._path = str(save_path)
if quality is not None:
if quality < 0 or quality > 100:
raise ValueError('quality必须在0-100之间。')
self._quality = quality
def _onScreencastFrame(self, **kwargs):
with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
f.write(b64decode(kwargs['data']))
self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId'])

View File

@ -1908,7 +1908,6 @@ class ChromiumSelect(object):
eles[0].run_js(f'this.selected=true;')
return True
def cancel_by_text(self, text, timeout=None):
"""此方法用于根据text值取消选择项。当元素是多选列表时可以接收list或tuple
:param text: 文本传入list或tuple可取消多项
@ -2017,107 +2016,91 @@ class ChromiumSelect(object):
class ChromiumElementWaiter(object):
"""等待元素在dom中某种状态如删除、显示、隐藏"""
def __init__(self, page_or_ele, loc_or_ele):
def __init__(self, page, ele):
"""等待元素在dom中某种状态如删除、显示、隐藏
:param page_or_ele: 页面或父元素
:param loc_or_ele: 要等待的元素可以是已有元素定位符
:param page: 元素所在页面
:param ele: 要等待的元素
"""
if not isinstance(loc_or_ele, (str, tuple, ChromiumElement)):
raise TypeError('loc_or_ele只能接收定位符或元素对象。')
self._driver = page_or_ele
self._loc_or_ele = loc_or_ele
self._page = page
self._ele = ele
def delete(self, timeout=None):
"""等待元素从dom删除
:param timeout: 超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
if timeout is None:
timeout = self._driver.page.timeout if isinstance(self._driver, ChromiumElement) else self._driver.timeout
if isinstance(self._loc_or_ele, ChromiumElement):
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if not self._loc_or_ele.states.is_alive:
return True
ele = self._driver._ele(self._loc_or_ele, timeout=.5, raise_err=False)
if not ele:
return True
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if not ele.states.is_alive:
return True
return False
return self._wait_state('is_alive', False, timeout)
def display(self, timeout=None):
"""等待元素从dom显示
:param timeout: 超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
return self._wait_ele('display', timeout)
return self._wait_state('is_displayed', True, timeout)
def hidden(self, timeout=None):
"""等待元素从dom隐藏
:param timeout: 超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
return self._wait_ele('hidden', timeout)
return self._wait_state('is_displayed', False, timeout)
def covered(self, timeout=None):
"""等待当前元素被遮盖
:param timeout:超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
return self._covered(True, timeout)
return self._wait_state('is_covered', True, timeout)
def not_covered(self, timeout=None):
"""等待当前元素被遮盖
:param timeout:超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
return self._covered(False, timeout)
return self._wait_state('is_covered', False, timeout)
def _covered(self, mode=False, timeout=None):
"""等待当前元素被遮盖
:param mode: True表示被遮盖False表示不被遮盖
:param timeout: 超时时间为None使用元素所在页面timeout属性
def enabled(self, timeout=None):
"""等待当前元素变成可用
:param timeout:超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
return self._wait_state('is_enabled', True, timeout)
def disabled(self, timeout=None):
"""等待当前元素变成可用
:param timeout:超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
return self._wait_state('is_enabled', False, timeout)
def disabled_or_delete(self, timeout=None):
"""等待当前元素变成不可用或从DOM移除
:param timeout:超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
if timeout is None:
timeout = self._driver.page.timeout if isinstance(self._driver, ChromiumElement) else self._driver.timeout
timeout = self._page.timeout
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if self._loc_or_ele.states.is_covered == mode:
if not self._ele.states.is_enabled or not self._ele.states.is_alive:
return True
sleep(.05)
return False
def _wait_ele(self, mode, timeout=None):
"""执行等待
:param mode: 等待模式
:param timeout: 超时时间
def _wait_state(self, attr, mode=False, timeout=None):
"""等待元素某个bool状态到达指定状态
:param attr: 状态名称
:param mode: True或False
:param timeout: 超时时间为None使用元素所在页面timeout属性
:return: 是否等待成功
"""
if timeout is None:
timeout = self._driver.page.timeout if isinstance(self._driver, ChromiumElement) else self._driver.timeout
target = self._driver._ele(self._loc_or_ele, raise_err=False)
if not target:
return None
timeout = self._page.timeout
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if mode == 'display' and target.states.is_displayed:
if self._ele.states.__getattribute__(attr) == mode:
return True
elif mode == 'hidden' and not target.states.is_displayed:
return True
sleep(.05)
return False

View File

@ -535,10 +535,10 @@ class ChromiumSelect(object):
class ChromiumElementWaiter(object):
def __init__(self,
page_or_ele: Union[ChromiumBase, ChromiumElement],
loc_or_ele: Union[str, tuple, ChromiumElement]):
self._loc_or_ele: Union[str, tuple, ChromiumElement] = ...
self._driver: Union[ChromiumPage, ChromiumPage] = ...
page: ChromiumBase,
ele: ChromiumElement):
self._ele: ChromiumElement = ...
self._page: ChromiumBase = ...
def delete(self, timeout: float = None) -> bool: ...
@ -546,13 +546,17 @@ class ChromiumElementWaiter(object):
def hidden(self, timeout: float = None) -> bool: ...
def _covered(self, mode: bool = False, timeout: float = None) -> bool: ...
def covered(self, timeout: float = None) -> bool: ...
def not_covered(self, timeout: float = None) -> bool: ...
def _wait_ele(self, mode: str, timeout: float = None) -> Union[None, bool]: ...
def enabled(self, timeout: float = None) -> bool: ...
def disabled(self, timeout: float = None) -> bool: ...
def disabled_or_delete(self, timeout: float = None) -> bool: ...
def _wait_state(self, attr: str, mode: bool = False, timeout: float = None) -> bool: ...
class Pseudo(object):

View File

@ -491,7 +491,7 @@ class WebPageSetter(ChromiumPageSetter):
self._session_setter = SessionPageSetter(self._page)
self._chromium_setter = ChromiumPageSetter(self._page)
def cookies(self, cookies, set_session=False, set_driver=False):
def cookies(self, cookies, set_session=True, set_driver=True):
"""添加cookies信息到浏览器或session对象
:param cookies: 可以接收`CookieJar``list``tuple``str``dict`格式的`cookies`
:param set_session: 是否设置到Session对象