# -*- coding: utf-8 -*- from __future__ import unicode_literals from functools import partial from json import dumps, loads from logging import getLogger from os import getenv from threading import Thread, Event from warnings import warn from websocket import WebSocketTimeoutException, WebSocketException, WebSocketConnectionClosedException, \ create_connection try: import Queue as queue except ImportError: import queue logger = getLogger(__name__) class GenericAttr(object): def __init__(self, name, tab): self.__dict__['name'] = name self.__dict__['tab'] = tab def __getattr__(self, item): method_name = "%s.%s" % (self.name, item) event_listener = self.tab.get_listener(method_name) if event_listener: return event_listener return partial(self.tab.call_method, method_name) def __setattr__(self, key, value): self.tab.set_listener("%s.%s" % (self.name, key), value) class Tab(object): status_initial = 'initial' status_started = 'started' status_stopped = 'stopped' def __init__(self, **kwargs): self.id = kwargs.get("id") self.type = kwargs.get("type") self.debug = getenv("DEBUG", False) self._websocket_url = kwargs.get("webSocketDebuggerUrl") self._kwargs = kwargs self._cur_id = 1000 self._ws = None self._recv_th = Thread(target=self._recv_loop) self._recv_th.daemon = True self._handle_event_th = Thread(target=self._handle_event_loop) self._handle_event_th.daemon = True self._stopped = Event() self._started = False self.status = self.status_initial self.event_handlers = {} self.method_results = {} self.event_queue = queue.Queue() def _send(self, message, timeout=None): if 'id' not in message: self._cur_id += 1 message['id'] = self._cur_id message_json = dumps(message) if self.debug: # pragma: no cover print("SEND > %s" % message_json) if not isinstance(timeout, (int, float)) or timeout > 1: q_timeout = 1 else: q_timeout = timeout / 2.0 try: self.method_results[message['id']] = queue.Queue() # just raise the exception to user self._ws.send(message_json) while not self._stopped.is_set(): try: if isinstance(timeout, (int, float)): if timeout < q_timeout: q_timeout = timeout timeout -= q_timeout return self.method_results[message['id']].get(timeout=q_timeout) except queue.Empty: if isinstance(timeout, (int, float)) and timeout <= 0: raise TimeoutException("Calling %s timeout" % message['method']) continue # raise UserAbortException("User abort, call stop() when calling %s" % message['method']) finally: self.method_results.pop(message['id'], None) def _recv_loop(self): while not self._stopped.is_set(): try: self._ws.settimeout(1) message_json = self._ws.recv() message = loads(message_json) except WebSocketTimeoutException: continue except (WebSocketException, OSError, WebSocketConnectionClosedException): if not self._stopped.is_set(): # logger.error("websocket exception", exc_info=True) self._stopped.set() return if self.debug: # pragma: no cover print('< RECV %s' % message_json) if "method" in message: self.event_queue.put(message) elif "id" in message: if message["id"] in self.method_results: self.method_results[message['id']].put(message) else: # pragma: no cover warn("unknown message: %s" % message) def _handle_event_loop(self): while not self._stopped.is_set(): try: event = self.event_queue.get(timeout=1) except queue.Empty: continue if event['method'] in self.event_handlers: try: self.event_handlers[event['method']](**event['params']) except Exception as e: logger.error("callback %s exception" % event['method'], exc_info=True) self.event_queue.task_done() def __getattr__(self, item): attr = GenericAttr(item, self) setattr(self, item, attr) return attr def call_method(self, _method, *args, **kwargs): if not self._started: raise RuntimeException("Cannot call method before it is started") if args: raise CallMethodException("the params should be key=value format") if self._stopped.is_set(): raise RuntimeException("Tab has been stopped") timeout = kwargs.pop("_timeout", None) result = self._send({"method": _method, "params": kwargs}, timeout=timeout) if 'result' not in result and 'error' in result: warn("%s error: %s" % (_method, result['error']['message'])) raise CallMethodException("calling method: %s error: %s" % (_method, result['error']['message'])) return result['result'] def set_listener(self, event, callback): if not callback: return self.event_handlers.pop(event, None) if not callable(callback): raise RuntimeException("callback should be callable") self.event_handlers[event] = callback return True def get_listener(self, event): return self.event_handlers.get(event, None) def del_all_listeners(self): self.event_handlers = {} return True def start(self): if self._started: return False if not self._websocket_url: raise RuntimeException("Already has another client connect to this tab") self._started = True self.status = self.status_started self._stopped.clear() self._ws = create_connection(self._websocket_url, enable_multithread=True) self._recv_th.start() self._handle_event_th.start() return True def stop(self): if self._stopped.is_set(): return False if not self._started: raise RuntimeException("Tab is not running") self.status = self.status_stopped self._stopped.set() if self._ws: self._ws.close() return True def wait(self, timeout=None): if not self._started: raise RuntimeException("Tab is not running") if timeout: return self._stopped.wait(timeout) self._recv_th.join() self._handle_event_th.join() return True def __str__(self): return "" % self.id __repr__ = __str__ class PyChromeException(Exception): pass class UserAbortException(PyChromeException): pass class TabConnectionException(PyChromeException): pass class CallMethodException(PyChromeException): pass class TimeoutException(PyChromeException): pass class RuntimeException(PyChromeException): pass