Pre Merge pull request !37 from 魂穆河/dev

This commit is contained in:
魂穆河 2024-01-30 09:05:10 +00:00 committed by Gitee
commit 92cbea47b7
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 95 additions and 0 deletions

View File

@ -24,6 +24,7 @@ from .._functions.tools import raise_error
from .._functions.web import location_in_viewport
from .._units.actions import Actions
from .._units.listener import Listener
from .._units.listener_ws import Listener_Ws
from .._units.rect import TabRect
from .._units.screencast import Screencast
from .._units.scroller import PageScroller
@ -64,6 +65,7 @@ class ChromiumBase(BasePage):
self._type = 'ChromiumBase'
if not hasattr(self, '_listener'):
self._listener = None
self._listener_ws = None
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
address = f'127.0.0.1:{address}'
@ -298,6 +300,13 @@ class ChromiumBase(BasePage):
if self._listener is None:
self._listener = Listener(self)
return self._listener
@property
def listen_ws(self):
"""返回用于聆听ws数据包的对象"""
if self._listener_ws is None:
self._listener_ws = Listener_Ws(self)
return self._listener_ws
@property
def states(self):

View File

@ -18,6 +18,7 @@ from .._pages.chromium_frame import ChromiumFrame
from .._pages.chromium_page import ChromiumPage
from .._units.actions import Actions
from .._units.listener import Listener
from .._units.listener_ws import Listener_Ws
from .._units.rect import TabRect
from .._units.screencast import Screencast
from .._units.scroller import Scroller, PageScroller
@ -53,6 +54,7 @@ class ChromiumBase(BasePage):
self._screencast: Screencast = ...
self._actions: Actions = ...
self._listener: Listener = ...
self._listener_ws: Listener_Ws = ...
self._states: PageStates = ...
self._alert: Alert = ...
self._has_alert: bool = ...
@ -163,6 +165,9 @@ class ChromiumBase(BasePage):
@property
def listen(self) -> Listener: ...
@property
def listen_ws(self) -> Listener_Ws: ...
@property
def states(self) -> PageStates: ...

View File

@ -0,0 +1,81 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
@License : BSD 3-Clause.
"""
from base64 import b64decode
from json import JSONDecodeError, loads
from queue import Queue
from re import search
from time import perf_counter, sleep
from requests.structures import CaseInsensitiveDict
from .._base.driver import Driver
from .._functions.settings import Settings
from ..errors import WaitTimeoutError
class Listener_Ws(object):
"""监听器基类"""
def __init__(self, page):
"""
:param page: ChromiumBase对象
"""
self._page = page
self._address = page.address
self._target_id = page._target_id
self._driver = None
self._caught = None
self.listening = False
def start(self):
self.clear()
if self.listening:
return
self._driver = Driver(self._target_id, "page", self._address)
self._driver.run("Network.enable")
self._set_callback()
self.listening = True
def steps(self, count=None, timeout=None, gap=1):
caught = 0
end = perf_counter() + timeout if timeout else None
while True:
if timeout and perf_counter() > end:
return
if self._caught.qsize() >= gap:
yield self._caught.get_nowait() if gap == 1 else [
self._caught.get_nowait() for _ in range(gap)
]
if timeout:
end = perf_counter() + timeout
if count:
caught += gap
if caught >= count:
return
sleep(0.05)
def clear(self):
self._caught = Queue(maxsize=0)
def _set_callback(self):
self._driver.set_callback("Network.webSocketClosed", self._websocket_closed)
self._driver.set_callback("Network.webSocketCreated", self._websocket_created)
self._driver.set_callback(
"Network.webSocketFrameReceived", self._websocket_frame_received
)
def _websocket_closed(self, **kwargs):
print("_websocket_closed", kwargs)
def _websocket_created(self, **kwargs):
rid = kwargs.get("requestId")
def _websocket_frame_received(self, **kwargs):
self._caught.put(kwargs["response"])