# -*- coding: utf-8 -*- """ @Author : g1879 @Contact : g1879@qq.com @Copyright: (c) 2024 by g1879, Inc. All Rights Reserved. @License : BSD 3-Clause. """ from json import dumps, loads, JSONDecodeError from queue import Queue, Empty from threading import Thread from time import perf_counter, sleep from requests import adapters from requests import Session from websocket import (WebSocketTimeoutException, WebSocketConnectionClosedException, create_connection, WebSocketException, WebSocketBadStatusException) from .._functions.settings import Settings from ..errors import PageDisconnectedError, BrowserConnectError adapters.DEFAULT_RETRIES = 5 class Driver(object): def __init__(self, tab_id, tab_type, address, owner=None): self.id = tab_id self.address = address self.type = tab_type self.owner = owner # self._debug = True # self._debug = False self.alert_flag = False # 标记alert出现,跳过一条请求后复原 self._websocket_url = f'ws://{address}/devtools/{tab_type}/{tab_id}' self._cur_id = 0 self._ws = None self._recv_th = Thread(target=self._recv_loop) self._handle_event_th = Thread(target=self._handle_event_loop) self._recv_th.daemon = True self._handle_event_th.daemon = True self._handle_immediate_event_th = None self.is_running = False self.event_handlers = {} self.immediate_event_handlers = {} self.method_results = {} self.event_queue = Queue() self.immediate_event_queue = Queue() self.start() def _send(self, message, timeout=None): self._cur_id += 1 ws_id = self._cur_id message['id'] = ws_id message_json = dumps(message) # if self._debug: # if self._debug is True or (isinstance(self._debug, str) and # message.get('method', '').startswith(self._debug)): # print(f'发> {message_json}') # elif isinstance(self._debug, (list, tuple, set)): # for m in self._debug: # if message.get('method', '').startswith(m): # print(f'发> {message_json}') # break end_time = perf_counter() + timeout if timeout is not None else None self.method_results[ws_id] = Queue() try: self._ws.send(message_json) if timeout == 0: self.method_results.pop(ws_id, None) return {'id': ws_id, 'result': {}} except (OSError, WebSocketConnectionClosedException): self.method_results.pop(ws_id, None) return {'error': {'message': 'connection disconnected'}, 'type': 'connection_error'} while self.is_running: try: result = self.method_results[ws_id].get(timeout=.2) self.method_results.pop(ws_id, None) return result except Empty: if self.alert_flag and message['method'].startswith(('Input.', 'Runtime.')): return {'error': {'message': 'alert exists.'}, 'type': 'alert_exists'} if timeout is not None and perf_counter() > end_time: self.method_results.pop(ws_id, None) return {'error': {'message': 'alert exists.'}, 'type': 'alert_exists'} \ if self.alert_flag else {'error': {'message': 'timeout'}, 'type': 'timeout'} continue return {'error': {'message': 'connection disconnected'}, 'type': 'connection_error'} def _recv_loop(self): while self.is_running: try: # self._ws.settimeout(1) msg_json = self._ws.recv() msg = loads(msg_json) except WebSocketTimeoutException: continue except (WebSocketException, OSError, WebSocketConnectionClosedException, JSONDecodeError): self._stop() return # if self._debug: # if self._debug is True or 'id' in msg or (isinstance(self._debug, str) # and msg.get('method', '').startswith(self._debug)): # print(f'<收 {msg_json}') # elif isinstance(self._debug, (list, tuple, set)): # for m in self._debug: # if msg.get('method', '').startswith(m): # print(f'<收 {msg_json}') # break if 'method' in msg: if msg['method'].startswith('Page.javascriptDialog'): self.alert_flag = msg['method'].endswith('Opening') function = self.immediate_event_handlers.get(msg['method']) if function: self._handle_immediate_event(function, msg['params']) else: self.event_queue.put(msg) elif msg.get('id') in self.method_results: self.method_results[msg['id']].put(msg) # elif self._debug: # print(f'未知信息:{msg}') def _handle_event_loop(self): while self.is_running: try: event = self.event_queue.get(timeout=1) except Empty: continue function = self.event_handlers.get(event['method']) if function: function(**event['params']) self.event_queue.task_done() def _handle_immediate_event_loop(self): while not self.immediate_event_queue.empty(): function, kwargs = self.immediate_event_queue.get(timeout=1) try: function(**kwargs) except PageDisconnectedError: pass def _handle_immediate_event(self, function, kwargs): self.immediate_event_queue.put((function, kwargs)) if self._handle_immediate_event_th is None or not self._handle_immediate_event_th.is_alive(): self._handle_immediate_event_th = Thread(target=self._handle_immediate_event_loop) self._handle_immediate_event_th.daemon = True self._handle_immediate_event_th.start() def run(self, _method, **kwargs): """执行cdp方法 :param _method: cdp方法名 :param kwargs: cdp参数 :return: 执行结果 """ if not self.is_running: return {'error': 'connection disconnected', 'type': 'connection_error'} timeout = kwargs.pop('_timeout', Settings.cdp_timeout) result = self._send({'method': _method, 'params': kwargs}, timeout=timeout) if 'result' not in result and 'error' in result: kwargs['_timeout'] = timeout return {'error': result['error']['message'], 'type': result.get('type', 'call_method_error'), 'method': _method, 'args': kwargs, 'data': result['error'].get('data')} else: return result['result'] def start(self): self.is_running = True try: self._ws = create_connection(self._websocket_url, enable_multithread=True, suppress_origin=True) except WebSocketBadStatusException as e: if 'Handshake status 403 Forbidden' in str(e): raise RuntimeError('请升级websocket-client库。') else: return except ConnectionRefusedError: raise BrowserConnectError('浏览器未开启或已关闭。') self._recv_th.start() self._handle_event_th.start() return True def stop(self): self._stop() while self._handle_event_th.is_alive() or self._recv_th.is_alive(): sleep(.1) return True def _stop(self): if not self.is_running: return False self.is_running = False if self._ws: self._ws.close() self._ws = None # try: # while not self.immediate_event_queue.empty(): # function, kwargs = self.immediate_event_queue.get_nowait() # try: # function(**kwargs) # except PageDisconnectedError: # raise # pass # sleep(.1) # # while not self.event_queue.empty(): # event = self.event_queue.get_nowait() # function = self.event_handlers.get(event['method']) # if function: # function(**event['params']) # sleep(.1) # except: # pass self.event_handlers.clear() self.method_results.clear() self.event_queue.queue.clear() if hasattr(self.owner, '_on_disconnect'): self.owner._on_disconnect() def set_callback(self, event, callback, immediate=False): handler = self.immediate_event_handlers if immediate else self.event_handlers if callback: handler[event] = callback else: handler.pop(event, None) class BrowserDriver(Driver): BROWSERS = {} def __new__(cls, tab_id, tab_type, address, owner): if tab_id in cls.BROWSERS: return cls.BROWSERS[tab_id] return object.__new__(cls) def __init__(self, tab_id, tab_type, address, owner): if hasattr(self, '_created'): return self._created = True BrowserDriver.BROWSERS[tab_id] = self super().__init__(tab_id, tab_type, address, owner) def __repr__(self): return f'' def get(self, url): s = Session() s.trust_env = False s.keep_alive = False r = s.get(url, headers={'Connection': 'close'}) r.close() s.close() return r