mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
添加监听ws
This commit is contained in:
parent
9e4b39be55
commit
6b1bdfa316
@ -25,6 +25,7 @@ from .._functions.tools import raise_error
|
||||
from .._functions.web import location_in_viewport
|
||||
from .._units.actions import Actions
|
||||
from .._units.listener import Listener
|
||||
from .._units.listener_ws import Listener_Ws
|
||||
from .._units.rect import TabRect
|
||||
from .._units.screencast import Screencast
|
||||
from .._units.scroller import PageScroller
|
||||
@ -65,6 +66,7 @@ class ChromiumBase(BasePage):
|
||||
self._type = 'ChromiumBase'
|
||||
if not hasattr(self, '_listener'):
|
||||
self._listener = None
|
||||
self._listener_ws = None
|
||||
|
||||
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
|
||||
address = f'127.0.0.1:{address}'
|
||||
@ -300,6 +302,13 @@ class ChromiumBase(BasePage):
|
||||
if self._listener is None:
|
||||
self._listener = Listener(self)
|
||||
return self._listener
|
||||
|
||||
@property
|
||||
def listen_ws(self):
|
||||
"""返回用于聆听ws数据包的对象"""
|
||||
if self._listener_ws is None:
|
||||
self._listener_ws = Listener_Ws(self)
|
||||
return self._listener_ws
|
||||
|
||||
@property
|
||||
def states(self):
|
||||
|
@ -19,6 +19,7 @@ from .._pages.chromium_frame import ChromiumFrame
|
||||
from .._pages.chromium_page import ChromiumPage
|
||||
from .._units.actions import Actions
|
||||
from .._units.listener import Listener
|
||||
from .._units.listener_ws import Listener_Ws
|
||||
from .._units.rect import TabRect
|
||||
from .._units.screencast import Screencast
|
||||
from .._units.scroller import Scroller, PageScroller
|
||||
@ -54,6 +55,7 @@ class ChromiumBase(BasePage):
|
||||
self._screencast: Screencast = ...
|
||||
self._actions: Actions = ...
|
||||
self._listener: Listener = ...
|
||||
self._listener_ws: Listener_Ws = ...
|
||||
self._states: PageStates = ...
|
||||
self._alert: Alert = ...
|
||||
self._has_alert: bool = ...
|
||||
@ -164,6 +166,9 @@ class ChromiumBase(BasePage):
|
||||
|
||||
@property
|
||||
def listen(self) -> Listener: ...
|
||||
|
||||
@property
|
||||
def listen_ws(self) -> Listener_Ws: ...
|
||||
|
||||
@property
|
||||
def states(self) -> PageStates: ...
|
||||
|
81
DrissionPage/_units/listener_ws.py
Normal file
81
DrissionPage/_units/listener_ws.py
Normal file
@ -0,0 +1,81 @@
|
||||
# -*- coding:utf-8 -*-
|
||||
"""
|
||||
@Author : g1879
|
||||
@Contact : g1879@qq.com
|
||||
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
|
||||
@License : BSD 3-Clause.
|
||||
"""
|
||||
from base64 import b64decode
|
||||
from json import JSONDecodeError, loads
|
||||
from queue import Queue
|
||||
from re import search
|
||||
from time import perf_counter, sleep
|
||||
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
|
||||
from .._base.driver import Driver
|
||||
from .._functions.settings import Settings
|
||||
from ..errors import WaitTimeoutError
|
||||
|
||||
|
||||
class Listener_Ws(object):
|
||||
"""监听器基类"""
|
||||
|
||||
def __init__(self, page):
|
||||
"""
|
||||
:param page: ChromiumBase对象
|
||||
"""
|
||||
self._page = page
|
||||
self._address = page.address
|
||||
self._target_id = page._target_id
|
||||
self._driver = None
|
||||
|
||||
self._caught = None
|
||||
|
||||
self.listening = False
|
||||
|
||||
def start(self):
|
||||
self.clear()
|
||||
if self.listening:
|
||||
return
|
||||
self._driver = Driver(self._target_id, "page", self._address)
|
||||
self._driver.run("Network.enable")
|
||||
self._set_callback()
|
||||
self.listening = True
|
||||
|
||||
def steps(self, count=None, timeout=None, gap=1):
|
||||
caught = 0
|
||||
end = perf_counter() + timeout if timeout else None
|
||||
while True:
|
||||
if timeout and perf_counter() > end:
|
||||
return
|
||||
if self._caught.qsize() >= gap:
|
||||
yield self._caught.get_nowait() if gap == 1 else [
|
||||
self._caught.get_nowait() for _ in range(gap)
|
||||
]
|
||||
if timeout:
|
||||
end = perf_counter() + timeout
|
||||
if count:
|
||||
caught += gap
|
||||
if caught >= count:
|
||||
return
|
||||
sleep(0.05)
|
||||
|
||||
def clear(self):
|
||||
self._caught = Queue(maxsize=0)
|
||||
|
||||
def _set_callback(self):
|
||||
self._driver.set_callback("Network.webSocketClosed", self._websocket_closed)
|
||||
self._driver.set_callback("Network.webSocketCreated", self._websocket_created)
|
||||
self._driver.set_callback(
|
||||
"Network.webSocketFrameReceived", self._websocket_frame_received
|
||||
)
|
||||
|
||||
def _websocket_closed(self, **kwargs):
|
||||
print("_websocket_closed", kwargs)
|
||||
|
||||
def _websocket_created(self, **kwargs):
|
||||
rid = kwargs.get("requestId")
|
||||
|
||||
def _websocket_frame_received(self, **kwargs):
|
||||
self._caught.put(kwargs["response"])
|
Loading…
x
Reference in New Issue
Block a user