From 9ce271561faaa925564c4225a578d6d9c091935b Mon Sep 17 00:00:00 2001 From: g1879 Date: Fri, 3 Mar 2023 19:49:39 +0800 Subject: [PATCH] =?UTF-8?q?3.2.8=E5=A2=9E=E5=8A=A0=E5=BD=95=E5=B1=8F?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/chromium_base.py | 97 ++++++++++++++++++++++++++++++++- DrissionPage/chromium_base.pyi | 19 +++++++ DrissionPage/chromium_driver.py | 5 +- DrissionPage/commons/tools.pyi | 2 +- DrissionPage/web_page.py | 1 + setup.py | 2 +- 6 files changed, 119 insertions(+), 7 deletions(-) diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index 0867264..6720398 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -3,9 +3,11 @@ @Author : g1879 @Contact : g1879@qq.com """ +from base64 import b64decode from json import loads, JSONDecodeError +from os import sep from pathlib import Path -from time import perf_counter, sleep +from time import perf_counter, sleep, time from warnings import warn from requests import Session @@ -15,7 +17,7 @@ from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, ChromiumElementWaiter from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement from .commons.locator import get_loc -from .commons.tools import get_usable_path +from .commons.tools import get_usable_path, clean_folder from .commons.web import cookies_to_tuple from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \ NoRectError, BrowserConnectError @@ -37,6 +39,7 @@ class ChromiumBase(BasePage): self._debug_recorder = None self._tab_obj = None self._set = None + self._screencast = None self._set_start_options(address, None) self._set_runtime_settings() @@ -324,6 +327,13 @@ class ChromiumBase(BasePage): self._set = ChromiumBaseSetter(self) return self._set + @property + def screencast(self): + """返回用于录屏的对象""" + if self._screencast is None: + self._screencast = Screencast(self) + return self._screencast + def run_cdp(self, cmd, **cmd_args): """执行Chrome DevTools Protocol语句 :param cmd: 协议项目 @@ -803,6 +813,89 @@ class ChromiumBase(BasePage): return self.set.load_strategy +class Screencast(object): + def __init__(self, page): + self._page = page + self._path = None + self._quality = 100 + + def start(self, save_path=None, quality=None): + """开始录屏 + :param save_path: 录屏保存位置 + :param quality: 录屏质量 + :return: None + """ + self.set(save_path, quality) + if self._path is None: + raise ValueError('save_path必须设置。') + if not self._path.isascii(): + raise TypeError('仅支持英文路径。') + clean_folder(self._path) + self._page.driver.Page.screencastFrame = self._onScreencastFrame + self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=self._quality) + + def stop(self, to_mp4=True, video_name=None): + """停止录屏 + :param to_mp4: 是否合并成MP4格式 + :param video_name: 视频文件名,为None时以当前时间名命 + :return: 文件路径 + """ + self._page.driver.Page.screencastFrame = None + self._page.run_cdp('Page.stopScreencast') + if not to_mp4: + return str(Path(self._path).absolute()) + + if not str(video_name).isascii(): + raise TypeError('仅支持英文文件名。') + + try: + from cv2 import VideoWriter, imread + from numpy import fromfile, uint8 + except ModuleNotFoundError: + raise ModuleNotFoundError('请先安装cv2,pip install opencv-python') + + pic_list = Path(self._path).glob('*.jpg') + img = imread(str(next(pic_list))) + imgInfo = img.shape + size = (imgInfo[1], imgInfo[0]) + + if video_name and not video_name.endswith('mp4'): + video_name = f'{video_name}.mp4' + name = f'{time()}.mp4' if not video_name else video_name + fourcc = 14 + videoWrite = VideoWriter(f'{self._path}{sep}{name}', fourcc, 8, size) + + for i in pic_list: + img = imread(str(i)) + videoWrite.write(img) + + clean_folder(self._path, ignore=(name,)) + return f'{self._path}{sep}{name}' + + def set(self, save_path=None, quality=None): + """设置录屏参数 + :param save_path: 保存路径 + :param quality: 视频质量,可取值0-100 + :return: + """ + if save_path: + save_path = Path(save_path) + if save_path.exists() and save_path.is_file(): + raise TypeError('save_path必须指定文件夹。') + save_path.mkdir(parents=True, exist_ok=True) + self._path = str(save_path) + + if quality is not None: + if quality < 0 or quality > 100: + raise ValueError('quality必须在0-100之间。') + self._quality = quality + + def _onScreencastFrame(self, **kwargs): + with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f: + f.write(b64decode(kwargs['data'])) + self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId']) + + class ChromiumBaseSetter(object): def __init__(self, page): self._page = page diff --git a/DrissionPage/chromium_base.pyi b/DrissionPage/chromium_base.pyi index 7d0d60a..c29412a 100644 --- a/DrissionPage/chromium_base.pyi +++ b/DrissionPage/chromium_base.pyi @@ -40,6 +40,7 @@ class ChromiumBase(BasePage): self._upload_list: list = ... self._wait: ChromiumBaseWaiter = ... self._set: ChromiumBaseSetter = ... + self._screencast: Screencast = ... def _connect_browser(self, tab_id: str = None) -> None: ... @@ -118,6 +119,9 @@ class ChromiumBase(BasePage): @property def set(self) -> ChromiumBaseSetter: ... + @property + def screencast(self) -> Screencast: ... + def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ... def run_js_loaded(self, script: str, *args: Any, as_expr: bool = False) -> Any: ... @@ -193,6 +197,21 @@ class ChromiumBase(BasePage): timeout: float = None) -> Union[bool, None]: ... +class Screencast(object): + def __init__(self, page: ChromiumBase): + self._page: ChromiumBase = ... + self._path: str = ... + self._quality: int = ... + + def start(self, save_path: Union[str, Path] = None, quality: int = None) -> None: ... + + def stop(self, to_mp4: bool = True, video_name: str = None) -> str: ... + + def set(self, save_path: Union[str, Path] = None, quality: int = None) -> None: ... + + def _onScreencastFrame(self, **kwargs) -> None: ... + + class ChromiumBaseWaiter(object): def __init__(self, page: ChromiumBase): self._driver: ChromiumBase = ... diff --git a/DrissionPage/chromium_driver.py b/DrissionPage/chromium_driver.py index 99268bb..0635774 100644 --- a/DrissionPage/chromium_driver.py +++ b/DrissionPage/chromium_driver.py @@ -149,9 +149,8 @@ class ChromiumDriver(object): if event['method'] in self.event_handlers: try: self.event_handlers[event['method']](**event['params']) - except Exception: - pass - # raise RuntimeError(f"回调函数 {event['method']} 错误:{e}") + except Exception as e: + raise RuntimeError(f"\n回调函数 {self.event_handlers[event['method']].__name__} 错误:\n{e}") self.event_queue.task_done() diff --git a/DrissionPage/commons/tools.pyi b/DrissionPage/commons/tools.pyi index a3673cb..a95722d 100644 --- a/DrissionPage/commons/tools.pyi +++ b/DrissionPage/commons/tools.pyi @@ -25,7 +25,7 @@ def get_long(txt) -> int: ... def port_is_using(ip: str, port: Union[str, int]) -> bool: ... -def clean_folder(folder_path: Union[str, Path], ignore: list = None) -> None: ... +def clean_folder(folder_path: Union[str, Path], ignore: Union[tuple, list] = None) -> None: ... def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 42f704a..321e698 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -47,6 +47,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): self._response = None self._download_set = None self._set = None + self._screencast = None self._set_start_options(driver_or_options, session_or_options) self._set_runtime_settings() diff --git a/setup.py b/setup.py index 49f78f0..9bda126 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh: setup( name="DrissionPage", - version="3.2.7", + version="3.2.8", author="g1879", author_email="g1879@qq.com", description="Python based web automation tool. It can control the browser and send and receive data packets.",