diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index d69eb48..55e3fba 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -855,87 +855,6 @@ class ChromiumBase(BasePage): return self.set.load_strategy -class Screencast(object): - def __init__(self, page): - self._page = page - self._path = None - self._quality = 100 - - def start(self, save_path=None, quality=None): - """开始录屏 - :param save_path: 录屏保存位置 - :param quality: 录屏质量 - :return: None - """ - self.set(save_path, quality) - if self._path is None: - raise ValueError('save_path必须设置。') - clean_folder(self._path) - self._page.driver.Page.screencastFrame = self._onScreencastFrame - self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=self._quality) - - def stop(self, to_mp4=False, video_name=None): - """停止录屏 - :param to_mp4: 是否合并成MP4格式 - :param video_name: 视频文件名,为None时以当前时间名命 - :return: 文件路径 - """ - self._page.driver.Page.screencastFrame = None - self._page.run_cdp('Page.stopScreencast') - if not to_mp4: - return str(Path(self._path).absolute()) - - if not str(video_name).isascii() or not str(self._path).isascii(): - raise TypeError('转换成视频仅支持英文路径和文件名。') - - try: - from cv2 import VideoWriter, imread - from numpy import fromfile, uint8 - except ModuleNotFoundError: - raise ModuleNotFoundError('请先安装cv2,pip install opencv-python') - - pic_list = Path(self._path).glob('*.jpg') - img = imread(str(next(pic_list))) - imgInfo = img.shape - size = (imgInfo[1], imgInfo[0]) - - if video_name and not video_name.endswith('mp4'): - video_name = f'{video_name}.mp4' - name = f'{time()}.mp4' if not video_name else video_name - fourcc = 14 - videoWrite = VideoWriter(f'{self._path}{sep}{name}', fourcc, 8, size) - - for i in pic_list: - img = imread(str(i)) - videoWrite.write(img) - - clean_folder(self._path, ignore=(name,)) - return f'{self._path}{sep}{name}' - - def set(self, save_path=None, quality=None): - """设置录屏参数 - :param save_path: 保存路径 - :param quality: 视频质量,可取值0-100 - :return: - """ - if save_path: - save_path = Path(save_path) - if save_path.exists() and save_path.is_file(): - raise TypeError('save_path必须指定文件夹。') - save_path.mkdir(parents=True, exist_ok=True) - self._path = str(save_path) - - if quality is not None: - if quality < 0 or quality > 100: - raise ValueError('quality必须在0-100之间。') - self._quality = quality - - def _onScreencastFrame(self, **kwargs): - with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f: - f.write(b64decode(kwargs['data'])) - self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId']) - - class ChromiumBaseSetter(object): def __init__(self, page): self._page = page @@ -1049,7 +968,10 @@ class ChromiumBaseWaiter(object): :param timeout: 超时时间,默认读取页面超时时间 :return: 是否等待成功 """ - return ChromiumElementWaiter(self._driver, loc_or_ele).delete(timeout) + if isinstance(loc_or_ele, (str, tuple)): + ele = self._driver._ele(loc_or_ele, timeout=.3, raise_err=False) + return ele.wait.delete(timeout) if ele else True + return loc_or_ele.wait.delete(timeout) def ele_display(self, loc_or_ele, timeout=None): """等待元素变成显示状态 @@ -1057,7 +979,8 @@ class ChromiumBaseWaiter(object): :param timeout: 超时时间,默认读取页面超时时间 :return: 是否等待成功 """ - return ChromiumElementWaiter(self._driver, loc_or_ele).display(timeout) + ele = self._driver._ele(loc_or_ele, raise_err=False) + return ele.wait.display(timeout) def ele_hidden(self, loc_or_ele, timeout=None): """等待元素变成隐藏状态 @@ -1065,7 +988,8 @@ class ChromiumBaseWaiter(object): :param timeout: 超时时间,默认读取页面超时时间 :return: 是否等待成功 """ - return ChromiumElementWaiter(self._driver, loc_or_ele).hidden(timeout) + ele = self._driver._ele(loc_or_ele, raise_err=False) + return ele.wait.hidden(timeout) def load_start(self, timeout=None): """等待页面开始加载 @@ -1211,3 +1135,84 @@ class PageScrollSetter(object): b = 'smooth' if on_off else 'auto' self._scroll._driver.run_js(f'document.documentElement.style.setProperty("scroll-behavior","{b}");') self._scroll._wait_complete = on_off + + +class Screencast(object): + def __init__(self, page): + self._page = page + self._path = None + self._quality = 100 + + def start(self, save_path=None, quality=None): + """开始录屏 + :param save_path: 录屏保存位置 + :param quality: 录屏质量 + :return: None + """ + self.set(save_path, quality) + if self._path is None: + raise ValueError('save_path必须设置。') + clean_folder(self._path) + self._page.driver.Page.screencastFrame = self._onScreencastFrame + self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=self._quality) + + def stop(self, to_mp4=False, video_name=None): + """停止录屏 + :param to_mp4: 是否合并成MP4格式 + :param video_name: 视频文件名,为None时以当前时间名命 + :return: 文件路径 + """ + self._page.driver.Page.screencastFrame = None + self._page.run_cdp('Page.stopScreencast') + if not to_mp4: + return str(Path(self._path).absolute()) + + if not str(video_name).isascii() or not str(self._path).isascii(): + raise TypeError('转换成视频仅支持英文路径和文件名。') + + try: + from cv2 import VideoWriter, imread + from numpy import fromfile, uint8 + except ModuleNotFoundError: + raise ModuleNotFoundError('请先安装cv2,pip install opencv-python') + + pic_list = Path(self._path).glob('*.jpg') + img = imread(str(next(pic_list))) + imgInfo = img.shape + size = (imgInfo[1], imgInfo[0]) + + if video_name and not video_name.endswith('mp4'): + video_name = f'{video_name}.mp4' + name = f'{time()}.mp4' if not video_name else video_name + fourcc = 14 + videoWrite = VideoWriter(f'{self._path}{sep}{name}', fourcc, 8, size) + + for i in pic_list: + img = imread(str(i)) + videoWrite.write(img) + + clean_folder(self._path, ignore=(name,)) + return f'{self._path}{sep}{name}' + + def set(self, save_path=None, quality=None): + """设置录屏参数 + :param save_path: 保存路径 + :param quality: 视频质量,可取值0-100 + :return: + """ + if save_path: + save_path = Path(save_path) + if save_path.exists() and save_path.is_file(): + raise TypeError('save_path必须指定文件夹。') + save_path.mkdir(parents=True, exist_ok=True) + self._path = str(save_path) + + if quality is not None: + if quality < 0 or quality > 100: + raise ValueError('quality必须在0-100之间。') + self._quality = quality + + def _onScreencastFrame(self, **kwargs): + with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f: + f.write(b64decode(kwargs['data'])) + self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId']) diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index 7d59f95..619c724 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -1908,7 +1908,6 @@ class ChromiumSelect(object): eles[0].run_js(f'this.selected=true;') return True - def cancel_by_text(self, text, timeout=None): """此方法用于根据text值取消选择项。当元素是多选列表时,可以接收list或tuple :param text: 文本,传入list或tuple可取消多项 @@ -2017,107 +2016,91 @@ class ChromiumSelect(object): class ChromiumElementWaiter(object): """等待元素在dom中某种状态,如删除、显示、隐藏""" - def __init__(self, page_or_ele, loc_or_ele): + def __init__(self, page, ele): """等待元素在dom中某种状态,如删除、显示、隐藏 - :param page_or_ele: 页面或父元素 - :param loc_or_ele: 要等待的元素,可以是已有元素、定位符 + :param page: 元素所在页面 + :param ele: 要等待的元素 """ - if not isinstance(loc_or_ele, (str, tuple, ChromiumElement)): - raise TypeError('loc_or_ele只能接收定位符或元素对象。') - - self._driver = page_or_ele - self._loc_or_ele = loc_or_ele + self._page = page + self._ele = ele def delete(self, timeout=None): """等待元素从dom删除 :param timeout: 超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ - if timeout is None: - timeout = self._driver.page.timeout if isinstance(self._driver, ChromiumElement) else self._driver.timeout - - if isinstance(self._loc_or_ele, ChromiumElement): - end_time = perf_counter() + timeout - while perf_counter() < end_time: - if not self._loc_or_ele.states.is_alive: - return True - - ele = self._driver._ele(self._loc_or_ele, timeout=.5, raise_err=False) - if not ele: - return True - - end_time = perf_counter() + timeout - while perf_counter() < end_time: - if not ele.states.is_alive: - return True - - return False + return self._wait_state('is_alive', False, timeout) def display(self, timeout=None): """等待元素从dom显示 :param timeout: 超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ - return self._wait_ele('display', timeout) + return self._wait_state('is_displayed', True, timeout) def hidden(self, timeout=None): """等待元素从dom隐藏 :param timeout: 超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ - return self._wait_ele('hidden', timeout) + return self._wait_state('is_displayed', False, timeout) def covered(self, timeout=None): """等待当前元素被遮盖 :param timeout:超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ - return self._covered(True, timeout) + return self._wait_state('is_covered', True, timeout) def not_covered(self, timeout=None): """等待当前元素被遮盖 :param timeout:超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ - return self._covered(False, timeout) + return self._wait_state('is_covered', False, timeout) - def _covered(self, mode=False, timeout=None): - """等待当前元素被遮盖 - :param mode: True表示被遮盖,False表示不被遮盖 - :param timeout: 超时时间,为None使用元素所在页面timeout属性 + def enabled(self, timeout=None): + """等待当前元素变成可用 + :param timeout:超时时间,为None使用元素所在页面timeout属性 + :return: 是否等待成功 + """ + return self._wait_state('is_enabled', True, timeout) + + def disabled(self, timeout=None): + """等待当前元素变成可用 + :param timeout:超时时间,为None使用元素所在页面timeout属性 + :return: 是否等待成功 + """ + return self._wait_state('is_enabled', False, timeout) + + def disabled_or_delete(self, timeout=None): + """等待当前元素变成不可用或从DOM移除 + :param timeout:超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ if timeout is None: - timeout = self._driver.page.timeout if isinstance(self._driver, ChromiumElement) else self._driver.timeout + timeout = self._page.timeout end_time = perf_counter() + timeout while perf_counter() < end_time: - if self._loc_or_ele.states.is_covered == mode: + if not self._ele.states.is_enabled or not self._ele.states.is_alive: return True sleep(.05) return False - def _wait_ele(self, mode, timeout=None): - """执行等待 - :param mode: 等待模式 - :param timeout: 超时时间 + def _wait_state(self, attr, mode=False, timeout=None): + """等待元素某个bool状态到达指定状态 + :param attr: 状态名称 + :param mode: True或False + :param timeout: 超时时间,为None使用元素所在页面timeout属性 :return: 是否等待成功 """ if timeout is None: - timeout = self._driver.page.timeout if isinstance(self._driver, ChromiumElement) else self._driver.timeout - - target = self._driver._ele(self._loc_or_ele, raise_err=False) - if not target: - return None - + timeout = self._page.timeout end_time = perf_counter() + timeout while perf_counter() < end_time: - if mode == 'display' and target.states.is_displayed: + if self._ele.states.__getattribute__(attr) == mode: return True - - elif mode == 'hidden' and not target.states.is_displayed: - return True - sleep(.05) return False diff --git a/DrissionPage/chromium_element.pyi b/DrissionPage/chromium_element.pyi index 4ea0b6e..98f6dcd 100644 --- a/DrissionPage/chromium_element.pyi +++ b/DrissionPage/chromium_element.pyi @@ -535,10 +535,10 @@ class ChromiumSelect(object): class ChromiumElementWaiter(object): def __init__(self, - page_or_ele: Union[ChromiumBase, ChromiumElement], - loc_or_ele: Union[str, tuple, ChromiumElement]): - self._loc_or_ele: Union[str, tuple, ChromiumElement] = ... - self._driver: Union[ChromiumPage, ChromiumPage] = ... + page: ChromiumBase, + ele: ChromiumElement): + self._ele: ChromiumElement = ... + self._page: ChromiumBase = ... def delete(self, timeout: float = None) -> bool: ... @@ -546,13 +546,17 @@ class ChromiumElementWaiter(object): def hidden(self, timeout: float = None) -> bool: ... - def _covered(self, mode: bool = False, timeout: float = None) -> bool: ... - def covered(self, timeout: float = None) -> bool: ... def not_covered(self, timeout: float = None) -> bool: ... - def _wait_ele(self, mode: str, timeout: float = None) -> Union[None, bool]: ... + def enabled(self, timeout: float = None) -> bool: ... + + def disabled(self, timeout: float = None) -> bool: ... + + def disabled_or_delete(self, timeout: float = None) -> bool: ... + + def _wait_state(self, attr: str, mode: bool = False, timeout: float = None) -> bool: ... class Pseudo(object): diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 2a853c0..70d285d 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -491,7 +491,7 @@ class WebPageSetter(ChromiumPageSetter): self._session_setter = SessionPageSetter(self._page) self._chromium_setter = ChromiumPageSetter(self._page) - def cookies(self, cookies, set_session=False, set_driver=False): + def cookies(self, cookies, set_session=True, set_driver=True): """添加cookies信息到浏览器或session对象 :param cookies: 可以接收`CookieJar`、`list`、`tuple`、`str`、`dict`格式的`cookies` :param set_session: 是否设置到Session对象