mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
3.2.8增加录屏功能
This commit is contained in:
parent
c000bec826
commit
9ce271561f
@ -3,9 +3,11 @@
|
||||
@Author : g1879
|
||||
@Contact : g1879@qq.com
|
||||
"""
|
||||
from base64 import b64decode
|
||||
from json import loads, JSONDecodeError
|
||||
from os import sep
|
||||
from pathlib import Path
|
||||
from time import perf_counter, sleep
|
||||
from time import perf_counter, sleep, time
|
||||
from warnings import warn
|
||||
|
||||
from requests import Session
|
||||
@ -15,7 +17,7 @@ from .chromium_driver import ChromiumDriver
|
||||
from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, ChromiumElementWaiter
|
||||
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
|
||||
from .commons.locator import get_loc
|
||||
from .commons.tools import get_usable_path
|
||||
from .commons.tools import get_usable_path, clean_folder
|
||||
from .commons.web import cookies_to_tuple
|
||||
from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \
|
||||
NoRectError, BrowserConnectError
|
||||
@ -37,6 +39,7 @@ class ChromiumBase(BasePage):
|
||||
self._debug_recorder = None
|
||||
self._tab_obj = None
|
||||
self._set = None
|
||||
self._screencast = None
|
||||
|
||||
self._set_start_options(address, None)
|
||||
self._set_runtime_settings()
|
||||
@ -324,6 +327,13 @@ class ChromiumBase(BasePage):
|
||||
self._set = ChromiumBaseSetter(self)
|
||||
return self._set
|
||||
|
||||
@property
|
||||
def screencast(self):
|
||||
"""返回用于录屏的对象"""
|
||||
if self._screencast is None:
|
||||
self._screencast = Screencast(self)
|
||||
return self._screencast
|
||||
|
||||
def run_cdp(self, cmd, **cmd_args):
|
||||
"""执行Chrome DevTools Protocol语句
|
||||
:param cmd: 协议项目
|
||||
@ -803,6 +813,89 @@ class ChromiumBase(BasePage):
|
||||
return self.set.load_strategy
|
||||
|
||||
|
||||
class Screencast(object):
|
||||
def __init__(self, page):
|
||||
self._page = page
|
||||
self._path = None
|
||||
self._quality = 100
|
||||
|
||||
def start(self, save_path=None, quality=None):
|
||||
"""开始录屏
|
||||
:param save_path: 录屏保存位置
|
||||
:param quality: 录屏质量
|
||||
:return: None
|
||||
"""
|
||||
self.set(save_path, quality)
|
||||
if self._path is None:
|
||||
raise ValueError('save_path必须设置。')
|
||||
if not self._path.isascii():
|
||||
raise TypeError('仅支持英文路径。')
|
||||
clean_folder(self._path)
|
||||
self._page.driver.Page.screencastFrame = self._onScreencastFrame
|
||||
self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=self._quality)
|
||||
|
||||
def stop(self, to_mp4=True, video_name=None):
|
||||
"""停止录屏
|
||||
:param to_mp4: 是否合并成MP4格式
|
||||
:param video_name: 视频文件名,为None时以当前时间名命
|
||||
:return: 文件路径
|
||||
"""
|
||||
self._page.driver.Page.screencastFrame = None
|
||||
self._page.run_cdp('Page.stopScreencast')
|
||||
if not to_mp4:
|
||||
return str(Path(self._path).absolute())
|
||||
|
||||
if not str(video_name).isascii():
|
||||
raise TypeError('仅支持英文文件名。')
|
||||
|
||||
try:
|
||||
from cv2 import VideoWriter, imread
|
||||
from numpy import fromfile, uint8
|
||||
except ModuleNotFoundError:
|
||||
raise ModuleNotFoundError('请先安装cv2,pip install opencv-python')
|
||||
|
||||
pic_list = Path(self._path).glob('*.jpg')
|
||||
img = imread(str(next(pic_list)))
|
||||
imgInfo = img.shape
|
||||
size = (imgInfo[1], imgInfo[0])
|
||||
|
||||
if video_name and not video_name.endswith('mp4'):
|
||||
video_name = f'{video_name}.mp4'
|
||||
name = f'{time()}.mp4' if not video_name else video_name
|
||||
fourcc = 14
|
||||
videoWrite = VideoWriter(f'{self._path}{sep}{name}', fourcc, 8, size)
|
||||
|
||||
for i in pic_list:
|
||||
img = imread(str(i))
|
||||
videoWrite.write(img)
|
||||
|
||||
clean_folder(self._path, ignore=(name,))
|
||||
return f'{self._path}{sep}{name}'
|
||||
|
||||
def set(self, save_path=None, quality=None):
|
||||
"""设置录屏参数
|
||||
:param save_path: 保存路径
|
||||
:param quality: 视频质量,可取值0-100
|
||||
:return:
|
||||
"""
|
||||
if save_path:
|
||||
save_path = Path(save_path)
|
||||
if save_path.exists() and save_path.is_file():
|
||||
raise TypeError('save_path必须指定文件夹。')
|
||||
save_path.mkdir(parents=True, exist_ok=True)
|
||||
self._path = str(save_path)
|
||||
|
||||
if quality is not None:
|
||||
if quality < 0 or quality > 100:
|
||||
raise ValueError('quality必须在0-100之间。')
|
||||
self._quality = quality
|
||||
|
||||
def _onScreencastFrame(self, **kwargs):
|
||||
with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
|
||||
f.write(b64decode(kwargs['data']))
|
||||
self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId'])
|
||||
|
||||
|
||||
class ChromiumBaseSetter(object):
|
||||
def __init__(self, page):
|
||||
self._page = page
|
||||
|
@ -40,6 +40,7 @@ class ChromiumBase(BasePage):
|
||||
self._upload_list: list = ...
|
||||
self._wait: ChromiumBaseWaiter = ...
|
||||
self._set: ChromiumBaseSetter = ...
|
||||
self._screencast: Screencast = ...
|
||||
|
||||
def _connect_browser(self, tab_id: str = None) -> None: ...
|
||||
|
||||
@ -118,6 +119,9 @@ class ChromiumBase(BasePage):
|
||||
@property
|
||||
def set(self) -> ChromiumBaseSetter: ...
|
||||
|
||||
@property
|
||||
def screencast(self) -> Screencast: ...
|
||||
|
||||
def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
|
||||
|
||||
def run_js_loaded(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
|
||||
@ -193,6 +197,21 @@ class ChromiumBase(BasePage):
|
||||
timeout: float = None) -> Union[bool, None]: ...
|
||||
|
||||
|
||||
class Screencast(object):
|
||||
def __init__(self, page: ChromiumBase):
|
||||
self._page: ChromiumBase = ...
|
||||
self._path: str = ...
|
||||
self._quality: int = ...
|
||||
|
||||
def start(self, save_path: Union[str, Path] = None, quality: int = None) -> None: ...
|
||||
|
||||
def stop(self, to_mp4: bool = True, video_name: str = None) -> str: ...
|
||||
|
||||
def set(self, save_path: Union[str, Path] = None, quality: int = None) -> None: ...
|
||||
|
||||
def _onScreencastFrame(self, **kwargs) -> None: ...
|
||||
|
||||
|
||||
class ChromiumBaseWaiter(object):
|
||||
def __init__(self, page: ChromiumBase):
|
||||
self._driver: ChromiumBase = ...
|
||||
|
@ -149,9 +149,8 @@ class ChromiumDriver(object):
|
||||
if event['method'] in self.event_handlers:
|
||||
try:
|
||||
self.event_handlers[event['method']](**event['params'])
|
||||
except Exception:
|
||||
pass
|
||||
# raise RuntimeError(f"回调函数 {event['method']} 错误:{e}")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"\n回调函数 {self.event_handlers[event['method']].__name__} 错误:\n{e}")
|
||||
|
||||
self.event_queue.task_done()
|
||||
|
||||
|
@ -25,7 +25,7 @@ def get_long(txt) -> int: ...
|
||||
def port_is_using(ip: str, port: Union[str, int]) -> bool: ...
|
||||
|
||||
|
||||
def clean_folder(folder_path: Union[str, Path], ignore: list = None) -> None: ...
|
||||
def clean_folder(folder_path: Union[str, Path], ignore: Union[tuple, list] = None) -> None: ...
|
||||
|
||||
|
||||
def unzip(zip_path: str, to_path: str) -> Union[list, None]: ...
|
||||
|
@ -47,6 +47,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
|
||||
self._response = None
|
||||
self._download_set = None
|
||||
self._set = None
|
||||
self._screencast = None
|
||||
|
||||
self._set_start_options(driver_or_options, session_or_options)
|
||||
self._set_runtime_settings()
|
||||
|
2
setup.py
2
setup.py
@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
|
||||
|
||||
setup(
|
||||
name="DrissionPage",
|
||||
version="3.2.7",
|
||||
version="3.2.8",
|
||||
author="g1879",
|
||||
author_email="g1879@qq.com",
|
||||
description="Python based web automation tool. It can control the browser and send and receive data packets.",
|
||||
|
Loading…
x
Reference in New Issue
Block a user