diff --git a/DrissionPage/_pages/chromium_base.py b/DrissionPage/_pages/chromium_base.py index 45225f1..8941a47 100644 --- a/DrissionPage/_pages/chromium_base.py +++ b/DrissionPage/_pages/chromium_base.py @@ -24,6 +24,7 @@ from .._functions.tools import raise_error from .._functions.web import location_in_viewport from .._units.actions import Actions from .._units.listener import Listener +from .._units.listener_ws import Listener_Ws from .._units.rect import TabRect from .._units.screencast import Screencast from .._units.scroller import PageScroller @@ -64,6 +65,7 @@ class ChromiumBase(BasePage): self._type = 'ChromiumBase' if not hasattr(self, '_listener'): self._listener = None + self._listener_ws = None if isinstance(address, int) or (isinstance(address, str) and address.isdigit()): address = f'127.0.0.1:{address}' @@ -298,6 +300,13 @@ class ChromiumBase(BasePage): if self._listener is None: self._listener = Listener(self) return self._listener + + @property + def listen_ws(self): + """返回用于聆听ws数据包的对象""" + if self._listener_ws is None: + self._listener_ws = Listener_Ws(self) + return self._listener_ws @property def states(self): diff --git a/DrissionPage/_pages/chromium_base.pyi b/DrissionPage/_pages/chromium_base.pyi index d5e6e5e..06314d9 100644 --- a/DrissionPage/_pages/chromium_base.pyi +++ b/DrissionPage/_pages/chromium_base.pyi @@ -18,6 +18,7 @@ from .._pages.chromium_frame import ChromiumFrame from .._pages.chromium_page import ChromiumPage from .._units.actions import Actions from .._units.listener import Listener +from .._units.listener_ws import Listener_Ws from .._units.rect import TabRect from .._units.screencast import Screencast from .._units.scroller import Scroller, PageScroller @@ -53,6 +54,7 @@ class ChromiumBase(BasePage): self._screencast: Screencast = ... self._actions: Actions = ... self._listener: Listener = ... + self._listener_ws: Listener_Ws = ... self._states: PageStates = ... self._alert: Alert = ... self._has_alert: bool = ... @@ -163,6 +165,9 @@ class ChromiumBase(BasePage): @property def listen(self) -> Listener: ... + + @property + def listen_ws(self) -> Listener_Ws: ... @property def states(self) -> PageStates: ... diff --git a/DrissionPage/_units/listener_ws.py b/DrissionPage/_units/listener_ws.py new file mode 100644 index 0000000..9bc4a5d --- /dev/null +++ b/DrissionPage/_units/listener_ws.py @@ -0,0 +1,81 @@ +# -*- coding:utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved. +@License : BSD 3-Clause. +""" +from base64 import b64decode +from json import JSONDecodeError, loads +from queue import Queue +from re import search +from time import perf_counter, sleep + +from requests.structures import CaseInsensitiveDict + +from .._base.driver import Driver +from .._functions.settings import Settings +from ..errors import WaitTimeoutError + + +class Listener_Ws(object): + """监听器基类""" + + def __init__(self, page): + """ + :param page: ChromiumBase对象 + """ + self._page = page + self._address = page.address + self._target_id = page._target_id + self._driver = None + + self._caught = None + + self.listening = False + + def start(self): + self.clear() + if self.listening: + return + self._driver = Driver(self._target_id, "page", self._address) + self._driver.run("Network.enable") + self._set_callback() + self.listening = True + + def steps(self, count=None, timeout=None, gap=1): + caught = 0 + end = perf_counter() + timeout if timeout else None + while True: + if timeout and perf_counter() > end: + return + if self._caught.qsize() >= gap: + yield self._caught.get_nowait() if gap == 1 else [ + self._caught.get_nowait() for _ in range(gap) + ] + if timeout: + end = perf_counter() + timeout + if count: + caught += gap + if caught >= count: + return + sleep(0.05) + + def clear(self): + self._caught = Queue(maxsize=0) + + def _set_callback(self): + self._driver.set_callback("Network.webSocketClosed", self._websocket_closed) + self._driver.set_callback("Network.webSocketCreated", self._websocket_created) + self._driver.set_callback( + "Network.webSocketFrameReceived", self._websocket_frame_received + ) + + def _websocket_closed(self, **kwargs): + print("_websocket_closed", kwargs) + + def _websocket_created(self, **kwargs): + rid = kwargs.get("requestId") + + def _websocket_frame_received(self, **kwargs): + self._caught.put(kwargs["response"])