4.0.0b31完善Driver的stop()逻辑;优化录像保存逻辑;页面对象增加save()

This commit is contained in:
g1879 2023-12-27 17:21:52 +08:00
parent bd47aee4ca
commit eaad58da9e
15 changed files with 114 additions and 53 deletions

View File

@ -13,4 +13,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.0b30'
__version__ = '4.0.0b31'

View File

@ -12,6 +12,8 @@ from requests import get
from websocket import (WebSocketTimeoutException, WebSocketConnectionClosedException, create_connection,
WebSocketException)
from ..errors import PageClosedError
class Driver(object):
def __init__(self, tab_id, tab_type, address):
@ -122,7 +124,7 @@ class Driver(object):
self.alert_flag = msg['method'].endswith('Opening')
function = self.immediate_event_handlers.get(msg['method'])
if function:
Thread(target=function, kwargs=msg['params']).start()
Thread(target=run_function, args=(function, msg['params'])).start()
# function(**msg['params'])
else:
self.event_queue.put(msg)
@ -159,11 +161,13 @@ class Driver(object):
timeout = kwargs.pop('_timeout', 15)
result = self._send({'method': _method, 'params': kwargs}, timeout=timeout)
if 'result' not in result and 'error' in result:
if result is None:
return {'error': {'message': 'page closed'}}
elif 'result' not in result and 'error' in result:
return {'error': result['error']['message'], 'type': result.get('type', 'call_method_error'),
'method': _method, 'args': kwargs}
return result['result']
else:
return result['result']
def start(self):
"""启动连接"""
@ -190,14 +194,14 @@ class Driver(object):
self._ws.close()
self._ws = None
while not self.event_queue.empty():
event = self.event_queue.get_nowait()
function = self.event_handlers.get(event['method'])
if function:
try:
try:
while not self.event_queue.empty():
event = self.event_queue.get_nowait()
function = self.event_handlers.get(event['method'])
if function:
function(**event['params'])
except:
pass
except:
pass
self.event_handlers.clear()
self.method_results.clear()
@ -249,3 +253,10 @@ class BrowserDriver(Driver):
def stop(self):
super().stop()
self.browser._on_quit()
def run_function(function, kwargs):
    """Call an event handler with keyword arguments, ignoring a closed page.

    :param function: callable to invoke
    :param kwargs: dict that is unpacked into keyword arguments for the callable
    :return: None
    """
    try:
        function(**kwargs)
    except PageClosedError:
        # the page went away while the handler was running; nothing left to do
        return None

View File

@ -479,7 +479,7 @@ class ChromiumBase(BasePage):
:param script: js文本
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script设置
:param timeout: js超时时间为None则使用页面timeouts.script属性值
:return: 运行的结果
"""
self.wait.load_complete()
@ -490,7 +490,7 @@ class ChromiumBase(BasePage):
:param script: js文本
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script设置
:param timeout: js超时时间为None则使用页面timeouts.script属性值
:return: None
"""
from threading import Thread
@ -501,9 +501,9 @@ class ChromiumBase(BasePage):
"""访问url
:param url: 目标url
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param timeout: 连接超时时间
:param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔为None时使用页面对象retry_interval属性值
:param timeout: 连接超时时间为None时使用页面对象timeouts.page_load属性值
:return: 目标url是否可用
"""
retry, interval = self._before_connect(url, retry, interval)
@ -1154,3 +1154,18 @@ def close_privacy_dialog(page, tid):
except:
pass
def get_mhtml(page, path=None, name=None):
    """Save the current page as an mhtml file and return its content.

    :param page: page object to snapshot; it must provide run_cdp() and a title attribute
    :param path: folder to save into; None means the current directory. Missing folders are created
    :param name: file name without extension; None means the page's title attribute is used
    :return: mhtml text
    """
    from pathlib import Path  # local import so the helper does not depend on module-level names

    snapshot = page.run_cdp('Page.captureSnapshot')['data']
    folder = Path(path or '.')
    folder.mkdir(parents=True, exist_ok=True)
    target = folder / f'{name or page.title}.mhtml'
    # newline='' writes the text verbatim, so line endings already present in the
    # snapshot are not translated a second time on Windows
    with open(target, 'w', encoding='utf-8', newline='') as f:
        f.write(snapshot)
    return snapshot

View File

@ -6,6 +6,7 @@
from pathlib import Path
from typing import Union, Tuple, List, Any, Optional, Literal
from .chromium_tab import ChromiumTab
from .._base.base import BasePage
from .._base.browser import Browser
from .._base.driver import Driver
@ -263,3 +264,6 @@ class Alert(object):
self.handle_next: Optional[bool] = ...
self.next_text: str = ...
self.auto: Optional[bool] = ...
def get_mhtml(page: Union[ChromiumPage, ChromiumTab], path: Union[str, Path] = None, name: str = None) -> str: ...

View File

@ -205,19 +205,13 @@ class ChromiumFrame(ChromiumBase):
def _onInspectorDetached(self, **kwargs):
"""异域转同域或退出"""
try:
self._reload()
except PageClosedError:
pass
self._reload()
def _onFrameDetached(self, **kwargs):
"""同域变异域"""
self.browser._frames.pop(kwargs['frameId'], None)
if kwargs['frameId'] == self._frame_id:
try:
self._reload()
except PageClosedError:
pass
self._reload()
# ----------挂件----------

View File

@ -11,7 +11,7 @@ from requests import get
from .._base.browser import Browser
from .._functions.browser import connect_browser
from .._configs.chromium_options import ChromiumOptions
from .._pages.chromium_base import ChromiumBase, Timeout
from .._pages.chromium_base import ChromiumBase, get_mhtml, Timeout
from .._pages.chromium_tab import ChromiumTab
from .._units.setter import ChromiumPageSetter
from .._units.waiter import PageWaiter
@ -66,12 +66,13 @@ class ChromiumPage(ChromiumBase):
ws = get(f'http://{self._chromium_options.address}/json/version', headers={'Connection': 'close'})
if not ws:
raise BrowserConnectError('\n浏览器连接失败请检查是否启用全局代理。如是须设置不代理127.0.0.1地址。')
except :
ws = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
except KeyError:
raise BrowserConnectError('浏览器版本太旧,请升级。')
except:
raise BrowserConnectError('\n浏览器连接失败请检查是否启用全局代理。如是须设置不代理127.0.0.1地址。')
ws = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
self._browser = Browser(self._chromium_options.address, ws, self)
if (is_exist and self._chromium_options._headless is False and
'headless' in self._browser.run_cdp('Browser.getVersion')['userAgent'].lower()):
self._browser.quit(3)
@ -140,6 +141,14 @@ class ChromiumPage(ChromiumBase):
"""返回浏览器进程id"""
return self.browser.process_id
def save(self, path=None, name=None):
    """Save the current page as an mhtml file.

    :param path: folder to save into; None means the current directory
    :param name: file name; None means the title attribute is used
    :return: mhtml text
    """
    return get_mhtml(self, path=path, name=name)
def get_tab(self, id_or_num=None):
"""获取一个标签页对象
:param id_or_num: 要获取的标签页id或序号为None时获取当前tab序号不是视觉排列顺序而是激活顺序

View File

@ -3,6 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union, Tuple, List, Optional
from .._base.browser import Browser
@ -54,6 +55,8 @@ class ChromiumPage(ChromiumBase):
@property
def set(self) -> ChromiumPageSetter: ...
def save(self, path: Union[str, Path] = None, name: str = None) -> str: ...
def get_tab(self, tab_id: Union[str, ChromiumTab, int] = None) -> ChromiumTab: ...
def find_tabs(self, title: str = None, url: str = None,

View File

@ -6,9 +6,9 @@
from copy import copy
from .._base.base import BasePage
from .._functions.web import set_session_cookies, set_browser_cookies
from .._configs.session_options import SessionOptions
from .._pages.chromium_base import ChromiumBase
from .._functions.web import set_session_cookies, set_browser_cookies
from .._pages.chromium_base import ChromiumBase, get_mhtml
from .._pages.session_page import SessionPage
from .._units.setter import TabSetter, WebPageTabSetter
from .._units.waiter import TabWaiter
@ -58,6 +58,14 @@ class ChromiumTab(ChromiumBase):
self._wait = TabWaiter(self)
return self._wait
def save(self, path=None, name=None):
    """Save the current page as an mhtml file.

    :param path: folder to save into; None means the current directory
    :param name: file name; None means the title attribute is used
    :return: mhtml text
    """
    return get_mhtml(self, path=path, name=name)
def __repr__(self):
return f'<ChromiumTab browser_id={self.browser.id} tab_id={self.tab_id}>'
@ -191,9 +199,9 @@ class WebPageTab(SessionPage, ChromiumTab, BasePage):
"""跳转到一个url
:param url: 目标url
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param timeout: 连接超时时间
:param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔为None时使用页面对象retry_interval属性值
:param timeout: 连接超时时间为None时使用页面对象timeouts.page_load属性值
:param kwargs: 连接参数s模式专用
:return: url是否可用d模式返回None时表示不确定
"""

View File

@ -3,6 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union, Tuple, Any, List, Optional
from requests import Session, Response
@ -41,6 +42,8 @@ class ChromiumTab(ChromiumBase):
@property
def wait(self) -> TabWaiter: ...
def save(self, path: Union[str, Path] = None, name: str = None) -> str: ...
class WebPageTab(SessionPage, ChromiumTab):
def __init__(self, page: WebPage, tab_id: str):

View File

@ -141,9 +141,9 @@ class SessionPage(BasePage):
"""用get方式跳转到url可输入文件路径
:param url: 目标url可指定本地文件路径
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param timeout: 连接超时时间
:param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔为None时使用页面对象retry_interval属性值
:param timeout: 连接超时时间为None时使用页面对象timeout属性值
:param kwargs: 连接参数
:return: url是否可用
"""

View File

@ -152,9 +152,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""跳转到一个url
:param url: 目标url
:param show_errmsg: 是否显示和抛出异常
:param retry: 重试次数
:param interval: 重试间隔
:param timeout: 连接超时时间
:param retry: 重试次数为None时使用页面对象retry_times属性值
:param interval: 重试间隔为None时使用页面对象retry_interval属性值
:param timeout: 连接超时时间为None时使用页面对象timeouts.page_load属性值
:param kwargs: 连接参数s模式专用
:return: url是否可用d模式返回None时表示不确定
"""

View File

@ -295,7 +295,9 @@ class Listener(object):
packet = self._request_ids.get(rid)
if packet:
r = self._driver.run('Network.getResponseBody', requestId=rid)
if 'body' in r:
if 'error' in r:
return
elif 'body' in r:
packet._raw_body = r['body']
packet._base64_body = r['base64Encoded']
else:

View File

@ -6,16 +6,17 @@
from base64 import b64decode
from os.path import sep
from pathlib import Path
from random import randint
from shutil import rmtree
from threading import Thread
from time import sleep, time
from .._functions.tools import clean_folder
class Screencast(object):
def __init__(self, page):
self._page = page
self._path = None
self._tmp_path = None
self._running = False
self._enable = False
self._mode = 'video'
@ -33,9 +34,11 @@ class Screencast(object):
self.set_save_path(save_path)
if self._path is None:
raise ValueError('save_path必须设置。')
tmp = self._path / 'tmp'
tmp.mkdir(parents=True, exist_ok=True)
clean_folder(tmp)
if self._mode in ('frugal_video', 'video'):
self._tmp_path = self._path / f'screencast_tmp_{time()}_{randint(0, 100)}'
self._tmp_path.mkdir(parents=True, exist_ok=True)
if self._mode.startswith('frugal'):
self._page.driver.set_callback('Page.screencastFrame', self._onScreencastFrame)
self._page.run_cdp('Page.startScreencast', everyNthFrame=1, quality=100)
@ -45,7 +48,7 @@ class Screencast(object):
self._enable = True
Thread(target=self._run).start()
else:
else: # js模式
js = '''
async function () {
stream = await navigator.mediaDevices.getDisplayMedia({video: true, audio: true})
@ -104,7 +107,7 @@ class Screencast(object):
if self._mode.endswith('imgs'):
return str(Path(self._path).absolute())
if not str(video_name).isascii() or not str(self._path).isascii():
if not str(self._path).isascii():
raise TypeError('转换成视频仅支持英文路径和文件名。')
try:
@ -113,7 +116,7 @@ class Screencast(object):
except ModuleNotFoundError:
raise ModuleNotFoundError('请先安装cv2pip install opencv-python')
pic_list = Path(self._path).glob('*.jpg')
pic_list = Path(self._tmp_path or self._path).glob('*.jpg')
img = imread(str(next(pic_list)))
imgInfo = img.shape
size = (imgInfo[1], imgInfo[0])
@ -124,7 +127,8 @@ class Screencast(object):
img = imread(str(i))
videoWrite.write(img)
clean_folder(self._path, ignore=(name,))
rmtree(self._tmp_path)
self._tmp_path = None
return f'{self._path}{sep}{name}'
def set_save_path(self, save_path=None):
@ -142,14 +146,16 @@ class Screencast(object):
def _run(self):
"""非节俭模式运行方法"""
self._running = True
path = self._tmp_path or self._path
while self._enable:
self._page.get_screenshot(path=self._path, name=f'{time()}.jpg')
self._page.get_screenshot(path=path, name=f'{time()}.jpg')
sleep(.04)
self._running = False
def _onScreencastFrame(self, **kwargs):
"""节俭模式运行方法"""
with open(f'{self._path}\\{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
path = self._tmp_path or self._path
with open(f'{path}{sep}{kwargs["metadata"]["timestamp"]}.jpg', 'wb') as f:
f.write(b64decode(kwargs['data']))
self._page.run_cdp('Page.screencastFrameAck', sessionId=kwargs['sessionId'])
@ -159,16 +165,21 @@ class ScreencastMode(object):
self._screencast = screencast
def video_mode(self):
    """Select continuous video mode; the generated video has no sound."""
    recorder = self._screencast
    recorder._mode = 'video'
def frugal_video_mode(self):
    """Select frugal video mode: record only when the page changes; the generated video has no sound."""
    recorder = self._screencast
    recorder._mode = 'frugal_video'
def js_video_mode(self):
    """Select js recording mode: can produce a video with sound, but must be started manually."""
    recorder = self._screencast
    recorder._mode = 'js_video'
def frugal_imgs_mode(self):
    """Select frugal image mode: take a screenshot only when the page changes."""
    recorder = self._screencast
    recorder._mode = 'frugal_imgs'
def imgs_mode(self):
    """Select image mode: keep taking screenshots of the page continuously."""
    recorder = self._screencast
    recorder._mode = 'imgs'

View File

@ -13,6 +13,7 @@ class Screencast(object):
def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ...
self._path: Path = ...
self._tmp_path: Path = ...
self._running: bool = ...
self._enable: bool = ...
self._mode: str = ...

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="4.0.0b30",
version="4.0.0b31",
author="g1879",
author_email="g1879@qq.com",
description="Python based web automation tool. It can control the browser and send and receive data packets.",