完善ChromiumDriver

This commit is contained in:
g1879 2023-01-17 10:51:11 +08:00
parent 2bafcef885
commit ab2914f9ba
4 changed files with 122 additions and 57 deletions

View File

@ -62,8 +62,8 @@ class ChromiumBase(BasePage):
""" """
self._is_loading = True self._is_loading = True
if tab_id: if tab_id:
self._tab_obj = ChromiumDriver(id=tab_id, type='page', self._tab_obj = ChromiumDriver(tab_id=tab_id, tab_type='page',
webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') ws_url=f'ws://{self.address}/devtools/page/{tab_id}')
self._tab_obj.start() self._tab_obj.start()
self._tab_obj.DOM.enable() self._tab_obj.DOM.enable()

View File

@ -1,17 +1,12 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from functools import partial from functools import partial
from json import dumps, loads from json import dumps, loads
from os import getenv from queue import Queue, Empty
from threading import Thread, Event from threading import Thread, Event
from websocket import WebSocketTimeoutException, WebSocketException, WebSocketConnectionClosedException, \ from websocket import WebSocketTimeoutException, WebSocketException, WebSocketConnectionClosedException, \
create_connection create_connection
try:
import Queue as queue
except ImportError:
import queue
class GenericAttr(object): class GenericAttr(object):
def __init__(self, name, tab): def __init__(self, name, tab):
@ -32,21 +27,23 @@ class GenericAttr(object):
class ChromiumDriver(object): class ChromiumDriver(object):
status_initial = 'initial' _INITIAL_ = 'initial'
status_started = 'started' _STARTED_ = 'started'
status_stopped = 'stopped' _STOPPED_ = 'stopped'
def __init__(self, **kwargs): def __init__(self, tab_id, tab_type, ws_url):
self.id = kwargs.get("id") """
self.type = kwargs.get("type") :param tab_id: 标签页id
self.debug = getenv("DEBUG", False) :param tab_type: 标签页类型
:param ws_url: web socket url
"""
self.id = tab_id
self.type = tab_type
self.debug = False
self.has_alert = False self.has_alert = False
self._websocket_url = kwargs.get("webSocketDebuggerUrl") self._websocket_url = ws_url
self._kwargs = kwargs
self._cur_id = 0 self._cur_id = 0
self._ws = None self._ws = None
self._recv_th = Thread(target=self._recv_loop) self._recv_th = Thread(target=self._recv_loop)
@ -56,21 +53,31 @@ class ChromiumDriver(object):
self._stopped = Event() self._stopped = Event()
self._started = False self._started = False
self.status = self.status_initial self.status = self._INITIAL_
self.event_handlers = {} self.event_handlers = {}
self.method_results = {} self.method_results = {}
self.event_queue = queue.Queue() self.event_queue = Queue()
@property
def websocket_url(self):
"""返回websocket连接地址"""
return self._websocket_url
def _send(self, message, timeout=None): def _send(self, message, timeout=None):
"""发送信息到浏览器,并返回浏览器返回的信息
:param message: 发送给浏览器的数据
:param timeout: 超时时间
:return: 浏览器返回的数据
"""
if 'id' not in message: if 'id' not in message:
self._cur_id += 1 self._cur_id += 1
message['id'] = self._cur_id message['id'] = self._cur_id
message_json = dumps(message) message_json = dumps(message)
if self.debug: # pragma: no cover if self.debug:
print(f"SEND > {message_json}") print(f"> {message_json}")
if not isinstance(timeout, (int, float)) or timeout > 1: if not isinstance(timeout, (int, float)) or timeout > 1:
q_timeout = 1 q_timeout = 1
@ -78,9 +85,7 @@ class ChromiumDriver(object):
q_timeout = timeout / 2.0 q_timeout = timeout / 2.0
try: try:
self.method_results[message['id']] = queue.Queue() self.method_results[message['id']] = Queue()
# just raise the exception to user
self._ws.send(message_json) self._ws.send(message_json)
while not self._stopped.is_set(): while not self._stopped.is_set():
@ -88,11 +93,11 @@ class ChromiumDriver(object):
if isinstance(timeout, (int, float)): if isinstance(timeout, (int, float)):
if timeout < q_timeout: if timeout < q_timeout:
q_timeout = timeout q_timeout = timeout
timeout -= q_timeout timeout -= q_timeout
return self.method_results[message['id']].get(timeout=q_timeout) return self.method_results[message['id']].get(timeout=q_timeout)
except queue.Empty:
except Empty:
if self.has_alert: if self.has_alert:
return {'result': {'alert': True}} return {'result': {'alert': True}}
if isinstance(timeout, (int, float)) and timeout <= 0: if isinstance(timeout, (int, float)) and timeout <= 0:
@ -100,11 +105,11 @@ class ChromiumDriver(object):
continue continue
# raise UserAbortException("User abort, call stop() when calling %s" % message['method'])
finally: finally:
self.method_results.pop(message['id'], None) self.method_results.pop(message['id'], None)
def _recv_loop(self): def _recv_loop(self):
"""接收浏览器信息的守护线程方法"""
while not self._stopped.is_set(): while not self._stopped.is_set():
try: try:
self._ws.settimeout(1) self._ws.settimeout(1)
@ -114,12 +119,11 @@ class ChromiumDriver(object):
continue continue
except (WebSocketException, OSError, WebSocketConnectionClosedException): except (WebSocketException, OSError, WebSocketConnectionClosedException):
if not self._stopped.is_set(): if not self._stopped.is_set():
# logger.error("websocket exception", exc_info=True)
self._stopped.set() self._stopped.set()
return return
if self.debug: # pragma: no cover if self.debug:
print(f'< RECV {message_json}') print(f'< {message_json}')
if "method" in message: if "method" in message:
self.event_queue.put(message) self.event_queue.put(message)
@ -127,14 +131,16 @@ class ChromiumDriver(object):
elif "id" in message: elif "id" in message:
if message["id"] in self.method_results: if message["id"] in self.method_results:
self.method_results[message['id']].put(message) self.method_results[message['id']].put(message)
# else: # pragma: no cover
# warn("unknown message: %s" % message) elif self.debug:
print(f'未知信息:{message}')
def _handle_event_loop(self): def _handle_event_loop(self):
"""当接收到浏览器信息,执行已绑定的方法"""
while not self._stopped.is_set(): while not self._stopped.is_set():
try: try:
event = self.event_queue.get(timeout=1) event = self.event_queue.get(timeout=1)
except queue.Empty: except Empty:
continue continue
if event['method'] in self.event_handlers: if event['method'] in self.event_handlers:
@ -151,9 +157,14 @@ class ChromiumDriver(object):
return attr return attr
def call_method(self, _method, *args, **kwargs): def call_method(self, _method, *args, **kwargs):
"""执行cdp方法 \n
:param _method: cdp方法名
:param args: cdp参数
:param kwargs: cdp参数
:return: 执行结果
"""
if not self._started: if not self._started:
raise RuntimeError("不能在启动前调用方法。") raise RuntimeError("不能在启动前调用方法。")
if args: if args:
raise CallMethodException("参数必须是key=value形式。") raise CallMethodException("参数必须是key=value形式。")
@ -168,14 +179,14 @@ class ChromiumDriver(object):
return result['result'] return result['result']
def start(self): def start(self):
"""启动连接"""
if self._started: if self._started:
return False return False
if not self._websocket_url: if not self._websocket_url:
raise RuntimeError("已存在另一个连接。") raise RuntimeError("已存在另一个连接。")
self._started = True self._started = True
self.status = self.status_started self.status = self._STARTED_
self._stopped.clear() self._stopped.clear()
self._ws = create_connection(self._websocket_url, enable_multithread=True) self._ws = create_connection(self._websocket_url, enable_multithread=True)
self._recv_th.start() self._recv_th.start()
@ -183,22 +194,26 @@ class ChromiumDriver(object):
return True return True
def stop(self): def stop(self):
"""中断连接"""
if self._stopped.is_set(): if self._stopped.is_set():
return False return False
if not self._started: if not self._started:
raise RuntimeError("Driver正在运行。") raise RuntimeError("Driver正在运行。")
self.status = self.status_stopped self.status = self._STOPPED_
self._stopped.set() self._stopped.set()
if self._ws: if self._ws:
self._ws.close() self._ws.close()
return True return True
def set_listener(self, event, callback): def set_listener(self, event, callback):
"""绑定cdp event和回调方法 \n
:param event: cdp event
:param callback: 绑定到cdp event的回调方法
:return: 回调方法
"""
if not callback: if not callback:
return self.event_handlers.pop(event, None) return self.event_handlers.pop(event, None)
if not callable(callback): if not callable(callback):
raise RuntimeError("方法不能调用。") raise RuntimeError("方法不能调用。")
@ -206,23 +221,12 @@ class ChromiumDriver(object):
return True return True
def get_listener(self, event): def get_listener(self, event):
"""获取cdp event对应的回调方法 \n
:param event: cdp event
:return: 回调方法
"""
return self.event_handlers.get(event, None) return self.event_handlers.get(event, None)
def del_all_listeners(self):
self.event_handlers = {}
return True
def wait(self, timeout=None):
if not self._started:
raise RuntimeError("Driver仍未运行。")
if timeout:
return self._stopped.wait(timeout)
self._recv_th.join()
self._handle_event_th.join()
return True
def __str__(self): def __str__(self):
return f"<ChromiumDriver {self.id}>" return f"<ChromiumDriver {self.id}>"

View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
from queue import Queue
from threading import Thread, Event
from typing import Union, Callable
class GenericAttr(object):
def __init__(self, name: str, tab: ChromiumDriver): ...
def __getattr__(self, item: str) -> Callable: ...
def __setattr__(self, key: str, value: Callable) -> None: ...
class ChromiumDriver(object):
_INITIAL_: str
_STARTED_: str
_STOPPED_: str
id: str
type: str
debug: bool
has_alert: bool
_websocket_url: str
_cur_id: int
_ws = None
_recv_th: Thread
_handle_event_th: Thread
_stopped: Event
_started: bool
status: str
event_handlers: dict
method_results: dict
event_queue: Queue
def __init__(self, tab_id: str, tab_type: str, ws_url: str): ...
@property
def websocket_url(self) -> str: ...
def _send(self, message: dict, timeout: Union[int, float] = None) -> dict: ...
def _recv_loop(self) -> None: ...
def _handle_event_loop(self) -> None: ...
def __getattr__(self, item: str) -> Callable: ...
def call_method(self, _method: str, *args, **kwargs) -> dict: ...
def start(self) -> bool: ...
def stop(self) -> bool: ...
def set_listener(self, event: str, callback: Callable) -> Union[Callable, None, bool]: ...
def get_listener(self, event: str) -> Union[Callable, None]: ...
def __str__(self) -> str: ...
class CallMethodException(Exception): ...

View File

@ -65,7 +65,7 @@ class ChromiumPage(ChromiumBase):
# 接收传递过来的ChromiumDriver浏览器 # 接收传递过来的ChromiumDriver浏览器
elif isinstance(addr_driver_opts, ChromiumDriver): elif isinstance(addr_driver_opts, ChromiumDriver):
self._tab_obj = addr_driver_opts self._tab_obj = addr_driver_opts
self.address = search(r'ws://(.*?)/dev', addr_driver_opts._websocket_url).group(1) self.address = search(r'ws://(.*?)/dev', addr_driver_opts.websocket_url).group(1)
self.process = None self.process = None
self.options = DriverOptions(read_file=False) self.options = DriverOptions(read_file=False)