From ab2914f9ba94ce8923bc06f524c5d5a1ac547220 Mon Sep 17 00:00:00 2001 From: g1879 Date: Tue, 17 Jan 2023 10:51:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84ChromiumDriver?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DrissionPage/chromium_base.py | 4 +- DrissionPage/chromium_driver.py | 112 ++++++++++++++++--------------- DrissionPage/chromium_driver.pyi | 61 +++++++++++++++++ DrissionPage/chromium_page.py | 2 +- 4 files changed, 122 insertions(+), 57 deletions(-) create mode 100644 DrissionPage/chromium_driver.pyi diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index fe56694..4f6bd60 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -62,8 +62,8 @@ class ChromiumBase(BasePage): """ self._is_loading = True if tab_id: - self._tab_obj = ChromiumDriver(id=tab_id, type='page', - webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') + self._tab_obj = ChromiumDriver(tab_id=tab_id, tab_type='page', + ws_url=f'ws://{self.address}/devtools/page/{tab_id}') self._tab_obj.start() self._tab_obj.DOM.enable() diff --git a/DrissionPage/chromium_driver.py b/DrissionPage/chromium_driver.py index b7d2ce0..6c02251 100644 --- a/DrissionPage/chromium_driver.py +++ b/DrissionPage/chromium_driver.py @@ -1,17 +1,12 @@ # -*- coding: utf-8 -*- from functools import partial from json import dumps, loads -from os import getenv +from queue import Queue, Empty from threading import Thread, Event from websocket import WebSocketTimeoutException, WebSocketException, WebSocketConnectionClosedException, \ create_connection -try: - import Queue as queue -except ImportError: - import queue - class GenericAttr(object): def __init__(self, name, tab): @@ -32,21 +27,23 @@ class GenericAttr(object): class ChromiumDriver(object): - status_initial = 'initial' - status_started = 'started' - status_stopped = 'stopped' + _INITIAL_ = 'initial' + _STARTED_ = 'started' + _STOPPED_ = 'stopped' - def __init__(self, **kwargs): - self.id = kwargs.get("id") - self.type = kwargs.get("type") - self.debug = getenv("DEBUG", False) + def __init__(self, tab_id, tab_type, ws_url): + """ + :param tab_id: 标签页id + :param tab_type: 标签页类型 + :param ws_url: web socket url + """ + self.id = tab_id + self.type = tab_type + self.debug = False self.has_alert = False - self._websocket_url = kwargs.get("webSocketDebuggerUrl") - self._kwargs = kwargs - + self._websocket_url = ws_url self._cur_id = 0 - self._ws = None self._recv_th = Thread(target=self._recv_loop) @@ -56,21 +53,31 @@ class ChromiumDriver(object): self._stopped = Event() self._started = False - self.status = self.status_initial + self.status = self._INITIAL_ self.event_handlers = {} self.method_results = {} - self.event_queue = queue.Queue() + self.event_queue = Queue() + + @property + def websocket_url(self): + """返回websocket连接地址""" + return self._websocket_url def _send(self, message, timeout=None): + """发送信息到浏览器,并返回浏览器返回的信息 + :param message: 发送给浏览器的数据 + :param timeout: 超时时间 + :return: 浏览器返回的数据 + """ if 'id' not in message: self._cur_id += 1 message['id'] = self._cur_id message_json = dumps(message) - if self.debug: # pragma: no cover - print(f"SEND > {message_json}") + if self.debug: + print(f"发> {message_json}") if not isinstance(timeout, (int, float)) or timeout > 1: q_timeout = 1 @@ -78,9 +85,7 @@ class ChromiumDriver(object): q_timeout = timeout / 2.0 try: - self.method_results[message['id']] = queue.Queue() - - # just raise the exception to user + self.method_results[message['id']] = Queue() self._ws.send(message_json) while not self._stopped.is_set(): @@ -88,11 +93,11 @@ class ChromiumDriver(object): if isinstance(timeout, (int, float)): if timeout < q_timeout: q_timeout = timeout - timeout -= q_timeout return self.method_results[message['id']].get(timeout=q_timeout) - except queue.Empty: + + except Empty: if self.has_alert: return {'result': {'alert': True}} if isinstance(timeout, (int, float)) and timeout <= 0: @@ -100,11 +105,11 @@ class ChromiumDriver(object): continue - # raise UserAbortException("User abort, call stop() when calling %s" % message['method']) finally: self.method_results.pop(message['id'], None) def _recv_loop(self): + """接收浏览器信息的守护线程方法""" while not self._stopped.is_set(): try: self._ws.settimeout(1) @@ -114,12 +119,11 @@ class ChromiumDriver(object): continue except (WebSocketException, OSError, WebSocketConnectionClosedException): if not self._stopped.is_set(): - # logger.error("websocket exception", exc_info=True) self._stopped.set() return - if self.debug: # pragma: no cover - print(f'< RECV {message_json}') + if self.debug: + print(f'<收 {message_json}') if "method" in message: self.event_queue.put(message) @@ -127,14 +131,16 @@ class ChromiumDriver(object): elif "id" in message: if message["id"] in self.method_results: self.method_results[message['id']].put(message) - # else: # pragma: no cover - # warn("unknown message: %s" % message) + + elif self.debug: + print(f'未知信息:{message}') def _handle_event_loop(self): + """当接收到浏览器信息,执行已绑定的方法""" while not self._stopped.is_set(): try: event = self.event_queue.get(timeout=1) - except queue.Empty: + except Empty: continue if event['method'] in self.event_handlers: @@ -151,9 +157,14 @@ class ChromiumDriver(object): return attr def call_method(self, _method, *args, **kwargs): + """执行cdp方法 \n + :param _method: cdp方法名 + :param args: cdp参数 + :param kwargs: cdp参数 + :return: 执行结果 + """ if not self._started: raise RuntimeError("不能在启动前调用方法。") - if args: raise CallMethodException("参数必须是key=value形式。") @@ -168,14 +179,14 @@ class ChromiumDriver(object): return result['result'] def start(self): + """启动连接""" if self._started: return False - if not self._websocket_url: raise RuntimeError("已存在另一个连接。") self._started = True - self.status = self.status_started + self.status = self._STARTED_ self._stopped.clear() self._ws = create_connection(self._websocket_url, enable_multithread=True) self._recv_th.start() @@ -183,22 +194,26 @@ class ChromiumDriver(object): return True def stop(self): + """中断连接""" if self._stopped.is_set(): return False - if not self._started: raise RuntimeError("Driver正在运行。") - self.status = self.status_stopped + self.status = self._STOPPED_ self._stopped.set() if self._ws: self._ws.close() return True def set_listener(self, event, callback): + """绑定cdp event和回调方法 \n + :param event: cdp event + :param callback: 绑定到cdp event的回调方法 + :return: 回调方法 + """ if not callback: return self.event_handlers.pop(event, None) - if not callable(callback): raise RuntimeError("方法不能调用。") @@ -206,23 +221,12 @@ class ChromiumDriver(object): return True def get_listener(self, event): + """获取cdp event对应的回调方法 \n + :param event: cdp event + :return: 回调方法 + """ return self.event_handlers.get(event, None) - def del_all_listeners(self): - self.event_handlers = {} - return True - - def wait(self, timeout=None): - if not self._started: - raise RuntimeError("Driver仍未运行。") - - if timeout: - return self._stopped.wait(timeout) - - self._recv_th.join() - self._handle_event_th.join() - return True - def __str__(self): return f"" diff --git a/DrissionPage/chromium_driver.pyi b/DrissionPage/chromium_driver.pyi new file mode 100644 index 0000000..f7dbe97 --- /dev/null +++ b/DrissionPage/chromium_driver.pyi @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +from queue import Queue +from threading import Thread, Event +from typing import Union, Callable + + +class GenericAttr(object): + def __init__(self, name: str, tab: ChromiumDriver): ... + + def __getattr__(self, item: str) -> Callable: ... + + def __setattr__(self, key: str, value: Callable) -> None: ... + + +class ChromiumDriver(object): + _INITIAL_: str + _STARTED_: str + _STOPPED_: str + id: str + type: str + debug: bool + has_alert: bool + _websocket_url: str + _cur_id: int + _ws = None + _recv_th: Thread + _handle_event_th: Thread + _stopped: Event + _started: bool + status: str + event_handlers: dict + method_results: dict + event_queue: Queue + + def __init__(self, tab_id: str, tab_type: str, ws_url: str): ... + + @property + def websocket_url(self) -> str: ... + + def _send(self, message: dict, timeout: Union[int, float] = None) -> dict: ... + + def _recv_loop(self) -> None: ... + + def _handle_event_loop(self) -> None: ... + + def __getattr__(self, item: str) -> Callable: ... + + def call_method(self, _method: str, *args, **kwargs) -> dict: ... + + def start(self) -> bool: ... + + def stop(self) -> bool: ... + + def set_listener(self, event: str, callback: Callable) -> Union[Callable, None, bool]: ... + + def get_listener(self, event: str) -> Union[Callable, None]: ... + + def __str__(self) -> str: ... + + +class CallMethodException(Exception): ... diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index c87f62f..501fe93 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -65,7 +65,7 @@ class ChromiumPage(ChromiumBase): # 接收传递过来的ChromiumDriver,浏览器 elif isinstance(addr_driver_opts, ChromiumDriver): self._tab_obj = addr_driver_opts - self.address = search(r'ws://(.*?)/dev', addr_driver_opts._websocket_url).group(1) + self.address = search(r'ws://(.*?)/dev', addr_driver_opts.websocket_url).group(1) self.process = None self.options = DriverOptions(read_file=False)