修复监听器小bug

This commit is contained in:
g1879 2023-10-26 21:40:06 +08:00
parent ab1f85d192
commit cd1369e33a
5 changed files with 229 additions and 212 deletions

View File

@ -3,13 +3,11 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from base64 import b64decode
from json import loads, JSONDecodeError
from os.path import sep
from pathlib import Path
from re import findall
from threading import Thread
from time import perf_counter, sleep, time
from time import perf_counter, sleep
from requests import get
@ -17,12 +15,13 @@ from .._base.base import BasePage
from .._base.chromium_driver import ChromiumDriver
from .._commons.constants import ERROR, NoneElement
from .._commons.locator import get_loc
from .._commons.tools import get_usable_path, clean_folder
from .._commons.tools import get_usable_path
from .._commons.web import location_in_viewport
from .._elements.chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele
from .._elements.session_element import make_session_ele
from .._units.action_chains import ActionChains
from .._units.network_listener import NetworkListener
from .._units.screencast import Screencast
from .._units.setter import ChromiumBaseSetter
from .._units.waiter import ChromiumBaseWaiter
from ..errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \
@ -1058,166 +1057,6 @@ class Timeout(object):
return str({'implicit': self.implicit, 'page_load': self.page_load, 'script': self.script})
class Screencast(object):
def __init__(self, page):
self._page = page
self._path = None
self._running = False
self._enable = False
self._mode = 'video'
@property
def set_mode(self):
"""返回用于设置录屏幕式的对象"""
return ScreencastMode(self)
def start(self, save_path=None):
"""开始录屏
:param save_path: 录屏保存位置
:return: None
"""
self.set_save_path(save_path)
if self._path is None:
raise ValueError('save_path必须设置。')
clean_folder(self._path)
if self._mode.startswith('frugal'):
self._page.driver.set_listener('Page.screencastFrame', self._onScreencastFrame)
self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=100)
elif not self._mode.startswith('js'):
self._running = True
self._enable = True
Thread(target=self._run).start()
else:
js = '''
async function () {
stream = await navigator.mediaDevices.getDisplayMedia({video: true, audio: true})
mime = MediaRecorder.isTypeSupported("video/webm; codecs=vp9")
? "video/webm; codecs=vp9"
: "video/webm"
mediaRecorder = new MediaRecorder(stream, {mimeType: mime})
DrissionPage_Screencast_chunks = []
mediaRecorder.addEventListener('dataavailable', function(e) {
DrissionPage_Screencast_blob_ok = false;
DrissionPage_Screencast_chunks.push(e.data);
DrissionPage_Screencast_blob_ok = true;
})
mediaRecorder.start()
mediaRecorder.addEventListener('stop', function(){
while(DrissionPage_Screencast_blob_ok==false){}
DrissionPage_Screencast_blob = new Blob(DrissionPage_Screencast_chunks,
{type: DrissionPage_Screencast_chunks[0].type});
})
}
'''
print('请手动选择要录制的目标。')
self._page.run_js('var DrissionPage_Screencast_blob;var DrissionPage_Screencast_blob_ok=false;')
self._page.run_js(js)
def stop(self, video_name=None):
"""停止录屏
:param video_name: 视频文件名为None时以当前时间名命
:return: 文件路径
"""
if video_name and not video_name.endswith('mp4'):
video_name = f'{video_name}.mp4'
name = f'{time()}.mp4' if not video_name else video_name
path = f'{self._path}{sep}{name}'
if self._mode.startswith('js'):
self._page.run_js('mediaRecorder.stop();', as_expr=True)
while not self._page.run_js('return DrissionPage_Screencast_blob_ok;'):
sleep(.1)
blob = self._page.run_js('return DrissionPage_Screencast_blob;')
uuid = self._page.run_cdp('IO.resolveBlob', objectId=blob['result']['objectId'])['uuid']
data = self._page.run_cdp('IO.read', handle=f'blob:{uuid}')['data']
with open(path, 'wb') as f:
f.write(b64decode(data))
return path
if self._mode.startswith('frugal'):
self._page.driver.set_listener('Page.screencastFrame', None)
self._page.run_cdp('Page.stopScreencast')
else:
self._enable = False
while self._running:
sleep(.1)
if self._mode.endswith('imgs'):
return str(Path(self._path).absolute())
if not str(video_name).isascii() or not str(self._path).isascii():
raise TypeError('转换成视频仅支持英文路径和文件名。')
try:
from cv2 import VideoWriter, imread, VideoWriter_fourcc
from numpy import fromfile, uint8
except ModuleNotFoundError:
raise ModuleNotFoundError('请先安装cv2pip install opencv-python')
pic_list = Path(self._path).glob('*.jpg')
img = imread(str(next(pic_list)))
imgInfo = img.shape
size = (imgInfo[1], imgInfo[0])
videoWrite = VideoWriter(path, VideoWriter_fourcc(*"mp4v"), 5, size)
for i in pic_list:
img = imread(str(i))
videoWrite.write(img)
clean_folder(self._path, ignore=(name,))
return f'{self._path}{sep}{name}'
def set_save_path(self, save_path=None):
"""设置保存路径
:param save_path: 保存路径
:return: None
"""
if save_path:
save_path = Path(save_path)
if save_path.exists() and save_path.is_file():
raise TypeError('save_path必须指定文件夹。')
save_path.mkdir(parents=True, exist_ok=True)
self._path = save_path
def _run(self):
"""非节俭模式运行方法"""
self._running = True
while self._enable:
self._page.get_screenshot(path=self._path, name=f'{time()}.jpg')
sleep(.04)
self._running = False
def _onScreencastFrame(self, **kwargs):
"""节俭模式运行方法"""
with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
f.write(b64decode(kwargs['data']))
self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId'])
class ScreencastMode(object):
def __init__(self, screencast):
self._screencast = screencast
def video_mode(self):
self._screencast._mode = 'video'
def frugal_video_mode(self):
self._screencast._mode = 'frugal_video'
def js_video_mode(self):
self._screencast._mode = 'js_video'
def frugal_imgs_mode(self):
self._screencast._mode = 'frugal_imgs'
def imgs_mode(self):
self._screencast._mode = 'imgs'
class Alert(object):
"""用于保存alert信息的类"""

View File

@ -18,6 +18,7 @@ from .._pages.chromium_frame import ChromiumFrame
from .._pages.chromium_page import ChromiumPage
from .._units.action_chains import ActionChains
from .._units.network_listener import NetworkListener
from .._units.screencast import Screencast
from .._units.setter import ChromiumBaseSetter
from .._units.waiter import ChromiumBaseWaiter
@ -262,43 +263,6 @@ class Timeout(object):
self.script: float = ...
class Screencast(object):
def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ...
self._path: Path = ...
self._running: bool = ...
self._enable: bool = ...
self._mode: str = ...
@property
def set_mode(self) -> ScreencastMode: ...
def start(self, save_path: Union[str, Path] = None) -> None: ...
def stop(self, video_name: str = None) -> str: ...
def set_save_path(self, save_path: Union[str, Path] = None) -> None: ...
def _run(self) -> None: ...
def _onScreencastFrame(self, **kwargs) -> None: ...
class ScreencastMode(object):
def __init__(self, screencast: Screencast):
self._screencast: Screencast = ...
def video_mode(self) -> None: ...
def frugal_video_mode(self) -> None: ...
def js_video_mode(self) -> None: ...
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...
class Alert(object):
def __init__(self):

View File

@ -12,7 +12,6 @@ from time import perf_counter, sleep
from requests.structures import CaseInsensitiveDict
from .._base.chromium_driver import ChromiumDriver
from ..errors import CDPError
class NetworkListener(object):
@ -218,16 +217,13 @@ class NetworkListener(object):
request_id = kwargs['requestId']
dp = self._request_ids.get(request_id)
if dp:
try:
r = self._driver.call_method('Network.getResponseBody', requestId=request_id)
body = r['body']
is_base64 = r['base64Encoded']
except CDPError:
body = ''
is_base64 = False
dp._raw_body = body
dp._base64_body = is_base64
r = self._driver.call_method('Network.getResponseBody', requestId=request_id)
if 'body' in r:
dp._raw_body = r['body']
dp._base64_body = r['base64Encoded']
else:
dp._raw_body = ''
dp._base64_body = False
self._caught.put(dp)
try:

View File

@ -0,0 +1,172 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from base64 import b64decode
from os.path import sep
from pathlib import Path
from threading import Thread
from time import sleep, time
from .._commons.tools import clean_folder
class Screencast(object):
def __init__(self, page):
self._page = page
self._path = None
self._running = False
self._enable = False
self._mode = 'video'
@property
def set_mode(self):
"""返回用于设置录屏幕式的对象"""
return ScreencastMode(self)
def start(self, save_path=None):
"""开始录屏
:param save_path: 录屏保存位置
:return: None
"""
self.set_save_path(save_path)
if self._path is None:
raise ValueError('save_path必须设置。')
clean_folder(self._path)
if self._mode.startswith('frugal'):
self._page.driver.set_listener('Page.screencastFrame', self._onScreencastFrame)
self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=100)
elif not self._mode.startswith('js'):
self._running = True
self._enable = True
Thread(target=self._run).start()
else:
js = '''
async function () {
stream = await navigator.mediaDevices.getDisplayMedia({video: true, audio: true})
mime = MediaRecorder.isTypeSupported("video/webm; codecs=vp9")
? "video/webm; codecs=vp9"
: "video/webm"
mediaRecorder = new MediaRecorder(stream, {mimeType: mime})
DrissionPage_Screencast_chunks = []
mediaRecorder.addEventListener('dataavailable', function(e) {
DrissionPage_Screencast_blob_ok = false;
DrissionPage_Screencast_chunks.push(e.data);
DrissionPage_Screencast_blob_ok = true;
})
mediaRecorder.start()
mediaRecorder.addEventListener('stop', function(){
while(DrissionPage_Screencast_blob_ok==false){}
DrissionPage_Screencast_blob = new Blob(DrissionPage_Screencast_chunks,
{type: DrissionPage_Screencast_chunks[0].type});
})
}
'''
print('请手动选择要录制的目标。')
self._page.run_js('var DrissionPage_Screencast_blob;var DrissionPage_Screencast_blob_ok=false;')
self._page.run_js(js)
def stop(self, video_name=None):
"""停止录屏
:param video_name: 视频文件名为None时以当前时间名命
:return: 文件路径
"""
if video_name and not video_name.endswith('mp4'):
video_name = f'{video_name}.mp4'
name = f'{time()}.mp4' if not video_name else video_name
path = f'{self._path}{sep}{name}'
if self._mode.startswith('js'):
self._page.run_js('mediaRecorder.stop();', as_expr=True)
while not self._page.run_js('return DrissionPage_Screencast_blob_ok;'):
sleep(.1)
blob = self._page.run_js('return DrissionPage_Screencast_blob;')
uuid = self._page.run_cdp('IO.resolveBlob', objectId=blob['result']['objectId'])['uuid']
data = self._page.run_cdp('IO.read', handle=f'blob:{uuid}')['data']
with open(path, 'wb') as f:
f.write(b64decode(data))
return path
if self._mode.startswith('frugal'):
self._page.driver.set_listener('Page.screencastFrame', None)
self._page.run_cdp('Page.stopScreencast')
else:
self._enable = False
while self._running:
sleep(.1)
if self._mode.endswith('imgs'):
return str(Path(self._path).absolute())
if not str(video_name).isascii() or not str(self._path).isascii():
raise TypeError('转换成视频仅支持英文路径和文件名。')
try:
from cv2 import VideoWriter, imread, VideoWriter_fourcc
from numpy import fromfile, uint8
except ModuleNotFoundError:
raise ModuleNotFoundError('请先安装cv2pip install opencv-python')
pic_list = Path(self._path).glob('*.jpg')
img = imread(str(next(pic_list)))
imgInfo = img.shape
size = (imgInfo[1], imgInfo[0])
videoWrite = VideoWriter(path, VideoWriter_fourcc(*"mp4v"), 5, size)
for i in pic_list:
img = imread(str(i))
videoWrite.write(img)
clean_folder(self._path, ignore=(name,))
return f'{self._path}{sep}{name}'
def set_save_path(self, save_path=None):
"""设置保存路径
:param save_path: 保存路径
:return: None
"""
if save_path:
save_path = Path(save_path)
if save_path.exists() and save_path.is_file():
raise TypeError('save_path必须指定文件夹。')
save_path.mkdir(parents=True, exist_ok=True)
self._path = save_path
def _run(self):
"""非节俭模式运行方法"""
self._running = True
while self._enable:
self._page.get_screenshot(path=self._path, name=f'{time()}.jpg')
sleep(.04)
self._running = False
def _onScreencastFrame(self, **kwargs):
"""节俭模式运行方法"""
with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
f.write(b64decode(kwargs['data']))
self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId'])
class ScreencastMode(object):
def __init__(self, screencast):
self._screencast = screencast
def video_mode(self):
self._screencast._mode = 'video'
def frugal_video_mode(self):
self._screencast._mode = 'frugal_video'
def js_video_mode(self):
self._screencast._mode = 'js_video'
def frugal_imgs_mode(self):
self._screencast._mode = 'frugal_imgs'
def imgs_mode(self):
self._screencast._mode = 'imgs'

View File

@ -0,0 +1,46 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union
from .._pages.chromium_base import ChromiumBase
class Screencast(object):
def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ...
self._path: Path = ...
self._running: bool = ...
self._enable: bool = ...
self._mode: str = ...
@property
def set_mode(self) -> ScreencastMode: ...
def start(self, save_path: Union[str, Path] = None) -> None: ...
def stop(self, video_name: str = None) -> str: ...
def set_save_path(self, save_path: Union[str, Path] = None) -> None: ...
def _run(self) -> None: ...
def _onScreencastFrame(self, **kwargs) -> None: ...
class ScreencastMode(object):
def __init__(self, screencast: Screencast):
self._screencast: Screencast = ...
def video_mode(self) -> None: ...
def frugal_video_mode(self) -> None: ...
def js_video_mode(self) -> None: ...
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...