diff --git a/DrissionPage/_pages/chromium_base.py b/DrissionPage/_pages/chromium_base.py index 0b3a5e2..1159b17 100644 --- a/DrissionPage/_pages/chromium_base.py +++ b/DrissionPage/_pages/chromium_base.py @@ -3,13 +3,11 @@ @Author : g1879 @Contact : g1879@qq.com """ -from base64 import b64decode from json import loads, JSONDecodeError from os.path import sep from pathlib import Path from re import findall -from threading import Thread -from time import perf_counter, sleep, time +from time import perf_counter, sleep from requests import get @@ -17,12 +15,13 @@ from .._base.base import BasePage from .._base.chromium_driver import ChromiumDriver from .._commons.constants import ERROR, NoneElement from .._commons.locator import get_loc -from .._commons.tools import get_usable_path, clean_folder +from .._commons.tools import get_usable_path from .._commons.web import location_in_viewport from .._elements.chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele from .._elements.session_element import make_session_ele from .._units.action_chains import ActionChains from .._units.network_listener import NetworkListener +from .._units.screencast import Screencast from .._units.setter import ChromiumBaseSetter from .._units.waiter import ChromiumBaseWaiter from ..errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \ @@ -1058,166 +1057,6 @@ class Timeout(object): return str({'implicit': self.implicit, 'page_load': self.page_load, 'script': self.script}) -class Screencast(object): - def __init__(self, page): - self._page = page - self._path = None - self._running = False - self._enable = False - self._mode = 'video' - - @property - def set_mode(self): - """返回用于设置录屏幕式的对象""" - return ScreencastMode(self) - - def start(self, save_path=None): - """开始录屏 - :param save_path: 录屏保存位置 - :return: None - """ - self.set_save_path(save_path) - if self._path is None: - raise ValueError('save_path必须设置。') - clean_folder(self._path) - if self._mode.startswith('frugal'): - self._page.driver.set_listener('Page.screencastFrame', self._onScreencastFrame) - self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=100) - - elif not self._mode.startswith('js'): - self._running = True - self._enable = True - Thread(target=self._run).start() - - else: - js = ''' - async function () { - stream = await navigator.mediaDevices.getDisplayMedia({video: true, audio: true}) - mime = MediaRecorder.isTypeSupported("video/webm; codecs=vp9") - ? "video/webm; codecs=vp9" - : "video/webm" - mediaRecorder = new MediaRecorder(stream, {mimeType: mime}) - DrissionPage_Screencast_chunks = [] - mediaRecorder.addEventListener('dataavailable', function(e) { - DrissionPage_Screencast_blob_ok = false; - DrissionPage_Screencast_chunks.push(e.data); - DrissionPage_Screencast_blob_ok = true; - }) - mediaRecorder.start() - - mediaRecorder.addEventListener('stop', function(){ - while(DrissionPage_Screencast_blob_ok==false){} - DrissionPage_Screencast_blob = new Blob(DrissionPage_Screencast_chunks, - {type: DrissionPage_Screencast_chunks[0].type}); - }) - } - ''' - print('请手动选择要录制的目标。') - self._page.run_js('var DrissionPage_Screencast_blob;var DrissionPage_Screencast_blob_ok=false;') - self._page.run_js(js) - - def stop(self, video_name=None): - """停止录屏 - :param video_name: 视频文件名,为None时以当前时间名命 - :return: 文件路径 - """ - if video_name and not video_name.endswith('mp4'): - video_name = f'{video_name}.mp4' - name = f'{time()}.mp4' if not video_name else video_name - path = f'{self._path}{sep}{name}' - - if self._mode.startswith('js'): - self._page.run_js('mediaRecorder.stop();', as_expr=True) - while not self._page.run_js('return DrissionPage_Screencast_blob_ok;'): - sleep(.1) - blob = self._page.run_js('return DrissionPage_Screencast_blob;') - uuid = self._page.run_cdp('IO.resolveBlob', objectId=blob['result']['objectId'])['uuid'] - data = self._page.run_cdp('IO.read', handle=f'blob:{uuid}')['data'] - with open(path, 'wb') as f: - f.write(b64decode(data)) - return path - - if self._mode.startswith('frugal'): - self._page.driver.set_listener('Page.screencastFrame', None) - self._page.run_cdp('Page.stopScreencast') - else: - self._enable = False - while self._running: - sleep(.1) - - if self._mode.endswith('imgs'): - return str(Path(self._path).absolute()) - - if not str(video_name).isascii() or not str(self._path).isascii(): - raise TypeError('转换成视频仅支持英文路径和文件名。') - - try: - from cv2 import VideoWriter, imread, VideoWriter_fourcc - from numpy import fromfile, uint8 - except ModuleNotFoundError: - raise ModuleNotFoundError('请先安装cv2,pip install opencv-python') - - pic_list = Path(self._path).glob('*.jpg') - img = imread(str(next(pic_list))) - imgInfo = img.shape - size = (imgInfo[1], imgInfo[0]) - - videoWrite = VideoWriter(path, VideoWriter_fourcc(*"mp4v"), 5, size) - - for i in pic_list: - img = imread(str(i)) - videoWrite.write(img) - - clean_folder(self._path, ignore=(name,)) - return f'{self._path}{sep}{name}' - - def set_save_path(self, save_path=None): - """设置保存路径 - :param save_path: 保存路径 - :return: None - """ - if save_path: - save_path = Path(save_path) - if save_path.exists() and save_path.is_file(): - raise TypeError('save_path必须指定文件夹。') - save_path.mkdir(parents=True, exist_ok=True) - self._path = save_path - - def _run(self): - """非节俭模式运行方法""" - self._running = True - while self._enable: - self._page.get_screenshot(path=self._path, name=f'{time()}.jpg') - sleep(.04) - self._running = False - - def _onScreencastFrame(self, **kwargs): - """节俭模式运行方法""" - with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f: - f.write(b64decode(kwargs['data'])) - self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId']) - - -class ScreencastMode(object): - def __init__(self, screencast): - self._screencast = screencast - - def video_mode(self): - self._screencast._mode = 'video' - - def frugal_video_mode(self): - self._screencast._mode = 'frugal_video' - - def js_video_mode(self): - self._screencast._mode = 'js_video' - - def frugal_imgs_mode(self): - self._screencast._mode = 'frugal_imgs' - - def imgs_mode(self): - self._screencast._mode = 'imgs' - - class Alert(object): """用于保存alert信息的类""" diff --git a/DrissionPage/_pages/chromium_base.pyi b/DrissionPage/_pages/chromium_base.pyi index edabfa1..4ff2964 100644 --- a/DrissionPage/_pages/chromium_base.pyi +++ b/DrissionPage/_pages/chromium_base.pyi @@ -18,6 +18,7 @@ from .._pages.chromium_frame import ChromiumFrame from .._pages.chromium_page import ChromiumPage from .._units.action_chains import ActionChains from .._units.network_listener import NetworkListener +from .._units.screencast import Screencast from .._units.setter import ChromiumBaseSetter from .._units.waiter import ChromiumBaseWaiter @@ -262,43 +263,6 @@ class Timeout(object): self.script: float = ... -class Screencast(object): - def __init__(self, page: ChromiumBase): - self._page: ChromiumBase = ... - self._path: Path = ... - self._running: bool = ... - self._enable: bool = ... - self._mode: str = ... - - @property - def set_mode(self) -> ScreencastMode: ... - - def start(self, save_path: Union[str, Path] = None) -> None: ... - - def stop(self, video_name: str = None) -> str: ... - - def set_save_path(self, save_path: Union[str, Path] = None) -> None: ... - - def _run(self) -> None: ... - - def _onScreencastFrame(self, **kwargs) -> None: ... - - -class ScreencastMode(object): - def __init__(self, screencast: Screencast): - self._screencast: Screencast = ... - - def video_mode(self) -> None: ... - - def frugal_video_mode(self) -> None: ... - - def js_video_mode(self) -> None: ... - - def frugal_imgs_mode(self) -> None: ... - - def imgs_mode(self) -> None: ... - - class Alert(object): def __init__(self): diff --git a/DrissionPage/_units/network_listener.py b/DrissionPage/_units/network_listener.py index e36954c..a82c5f9 100644 --- a/DrissionPage/_units/network_listener.py +++ b/DrissionPage/_units/network_listener.py @@ -12,7 +12,6 @@ from time import perf_counter, sleep from requests.structures import CaseInsensitiveDict from .._base.chromium_driver import ChromiumDriver -from ..errors import CDPError class NetworkListener(object): @@ -218,16 +217,13 @@ class NetworkListener(object): request_id = kwargs['requestId'] dp = self._request_ids.get(request_id) if dp: - try: - r = self._driver.call_method('Network.getResponseBody', requestId=request_id) - body = r['body'] - is_base64 = r['base64Encoded'] - except CDPError: - body = '' - is_base64 = False - - dp._raw_body = body - dp._base64_body = is_base64 + r = self._driver.call_method('Network.getResponseBody', requestId=request_id) + if 'body' in r: + dp._raw_body = r['body'] + dp._base64_body = r['base64Encoded'] + else: + dp._raw_body = '' + dp._base64_body = False self._caught.put(dp) try: diff --git a/DrissionPage/_units/screencast.py b/DrissionPage/_units/screencast.py new file mode 100644 index 0000000..1d25683 --- /dev/null +++ b/DrissionPage/_units/screencast.py @@ -0,0 +1,172 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from base64 import b64decode +from os.path import sep +from pathlib import Path +from threading import Thread +from time import sleep, time + +from .._commons.tools import clean_folder + + +class Screencast(object): + def __init__(self, page): + self._page = page + self._path = None + self._running = False + self._enable = False + self._mode = 'video' + + @property + def set_mode(self): + """返回用于设置录屏幕式的对象""" + return ScreencastMode(self) + + def start(self, save_path=None): + """开始录屏 + :param save_path: 录屏保存位置 + :return: None + """ + self.set_save_path(save_path) + if self._path is None: + raise ValueError('save_path必须设置。') + clean_folder(self._path) + if self._mode.startswith('frugal'): + self._page.driver.set_listener('Page.screencastFrame', self._onScreencastFrame) + self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=100) + + elif not self._mode.startswith('js'): + self._running = True + self._enable = True + Thread(target=self._run).start() + + else: + js = ''' + async function () { + stream = await navigator.mediaDevices.getDisplayMedia({video: true, audio: true}) + mime = MediaRecorder.isTypeSupported("video/webm; codecs=vp9") + ? "video/webm; codecs=vp9" + : "video/webm" + mediaRecorder = new MediaRecorder(stream, {mimeType: mime}) + DrissionPage_Screencast_chunks = [] + mediaRecorder.addEventListener('dataavailable', function(e) { + DrissionPage_Screencast_blob_ok = false; + DrissionPage_Screencast_chunks.push(e.data); + DrissionPage_Screencast_blob_ok = true; + }) + mediaRecorder.start() + + mediaRecorder.addEventListener('stop', function(){ + while(DrissionPage_Screencast_blob_ok==false){} + DrissionPage_Screencast_blob = new Blob(DrissionPage_Screencast_chunks, + {type: DrissionPage_Screencast_chunks[0].type}); + }) + } + ''' + print('请手动选择要录制的目标。') + self._page.run_js('var DrissionPage_Screencast_blob;var DrissionPage_Screencast_blob_ok=false;') + self._page.run_js(js) + + def stop(self, video_name=None): + """停止录屏 + :param video_name: 视频文件名,为None时以当前时间名命 + :return: 文件路径 + """ + if video_name and not video_name.endswith('mp4'): + video_name = f'{video_name}.mp4' + name = f'{time()}.mp4' if not video_name else video_name + path = f'{self._path}{sep}{name}' + + if self._mode.startswith('js'): + self._page.run_js('mediaRecorder.stop();', as_expr=True) + while not self._page.run_js('return DrissionPage_Screencast_blob_ok;'): + sleep(.1) + blob = self._page.run_js('return DrissionPage_Screencast_blob;') + uuid = self._page.run_cdp('IO.resolveBlob', objectId=blob['result']['objectId'])['uuid'] + data = self._page.run_cdp('IO.read', handle=f'blob:{uuid}')['data'] + with open(path, 'wb') as f: + f.write(b64decode(data)) + return path + + if self._mode.startswith('frugal'): + self._page.driver.set_listener('Page.screencastFrame', None) + self._page.run_cdp('Page.stopScreencast') + else: + self._enable = False + while self._running: + sleep(.1) + + if self._mode.endswith('imgs'): + return str(Path(self._path).absolute()) + + if not str(video_name).isascii() or not str(self._path).isascii(): + raise TypeError('转换成视频仅支持英文路径和文件名。') + + try: + from cv2 import VideoWriter, imread, VideoWriter_fourcc + from numpy import fromfile, uint8 + except ModuleNotFoundError: + raise ModuleNotFoundError('请先安装cv2,pip install opencv-python') + + pic_list = Path(self._path).glob('*.jpg') + img = imread(str(next(pic_list))) + imgInfo = img.shape + size = (imgInfo[1], imgInfo[0]) + + videoWrite = VideoWriter(path, VideoWriter_fourcc(*"mp4v"), 5, size) + + for i in pic_list: + img = imread(str(i)) + videoWrite.write(img) + + clean_folder(self._path, ignore=(name,)) + return f'{self._path}{sep}{name}' + + def set_save_path(self, save_path=None): + """设置保存路径 + :param save_path: 保存路径 + :return: None + """ + if save_path: + save_path = Path(save_path) + if save_path.exists() and save_path.is_file(): + raise TypeError('save_path必须指定文件夹。') + save_path.mkdir(parents=True, exist_ok=True) + self._path = save_path + + def _run(self): + """非节俭模式运行方法""" + self._running = True + while self._enable: + self._page.get_screenshot(path=self._path, name=f'{time()}.jpg') + sleep(.04) + self._running = False + + def _onScreencastFrame(self, **kwargs): + """节俭模式运行方法""" + with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f: + f.write(b64decode(kwargs['data'])) + self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId']) + + +class ScreencastMode(object): + def __init__(self, screencast): + self._screencast = screencast + + def video_mode(self): + self._screencast._mode = 'video' + + def frugal_video_mode(self): + self._screencast._mode = 'frugal_video' + + def js_video_mode(self): + self._screencast._mode = 'js_video' + + def frugal_imgs_mode(self): + self._screencast._mode = 'frugal_imgs' + + def imgs_mode(self): + self._screencast._mode = 'imgs' diff --git a/DrissionPage/_units/screencast.pyi b/DrissionPage/_units/screencast.pyi new file mode 100644 index 0000000..6c23592 --- /dev/null +++ b/DrissionPage/_units/screencast.pyi @@ -0,0 +1,46 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +""" +from pathlib import Path +from typing import Union + +from .._pages.chromium_base import ChromiumBase + + +class Screencast(object): + def __init__(self, page: ChromiumBase): + self._page: ChromiumBase = ... + self._path: Path = ... + self._running: bool = ... + self._enable: bool = ... + self._mode: str = ... + + @property + def set_mode(self) -> ScreencastMode: ... + + def start(self, save_path: Union[str, Path] = None) -> None: ... + + def stop(self, video_name: str = None) -> str: ... + + def set_save_path(self, save_path: Union[str, Path] = None) -> None: ... + + def _run(self) -> None: ... + + def _onScreencastFrame(self, **kwargs) -> None: ... + + +class ScreencastMode(object): + def __init__(self, screencast: Screencast): + self._screencast: Screencast = ... + + def video_mode(self) -> None: ... + + def frugal_video_mode(self) -> None: ... + + def js_video_mode(self) -> None: ... + + def frugal_imgs_mode(self) -> None: ... + + def imgs_mode(self) -> None: ...