优化处理立即执行的动作逻辑

This commit is contained in:
g1879 2024-01-02 15:26:13 +08:00
parent f2e147a7e2
commit a20fafebd7
5 changed files with 34 additions and 81 deletions

View File

@ -36,6 +36,7 @@ class Driver(object):
self._handle_event_th = Thread(target=self._handle_event_loop)
self._recv_th.daemon = True
self._handle_event_th.daemon = True
self._handle_immediate_event_th = None
self._stopped = Event()
@ -43,6 +44,7 @@ class Driver(object):
self.immediate_event_handlers = {}
self.method_results = {}
self.event_queue = Queue()
self.immediate_event_queue = Queue()
self.start()
@ -126,8 +128,7 @@ class Driver(object):
self.alert_flag = msg['method'].endswith('Opening')
function = self.immediate_event_handlers.get(msg['method'])
if function:
Thread(target=run_function, args=(function, msg['params'])).start()
# function(**msg['params'])
self._handle_immediate_event(function, msg['params'])
else:
self.event_queue.put(msg)
@ -151,6 +152,26 @@ class Driver(object):
self.event_queue.task_done()
def _handle_immediate_event_loop(self):
while not self._stopped.is_set() and not self.immediate_event_queue.empty():
function, kwargs = self.immediate_event_queue.get(timeout=1)
try:
function(**kwargs)
except PageDisconnectedError:
pass
def _handle_immediate_event(self, function, kwargs):
"""处理立即执行的动作
:param function: 要运行下方法
:param kwargs: 方法参数
:return: None
"""
self.immediate_event_queue.put((function, kwargs))
if self._handle_immediate_event_th is None or not self._handle_immediate_event_th.is_alive():
self._handle_immediate_event_th = Thread(target=self._handle_immediate_event_loop)
self._handle_immediate_event_th.daemon = True
self._handle_immediate_event_th.start()
def run(self, _method, **kwargs):
"""执行cdp方法
:param _method: cdp方法名
@ -220,11 +241,6 @@ class Driver(object):
else:
handler.pop(event, None)
def __str__(self):
return f'<Driver {self.id}>'
__repr__ = __str__
class BrowserDriver(Driver):
BROWSERS = {}
@ -253,10 +269,3 @@ class BrowserDriver(Driver):
def stop(self):
super().stop()
self.browser._on_quit()
def run_function(function, kwargs):
try:
function(**kwargs)
except PageDisconnectedError:
pass

View File

@ -25,18 +25,20 @@ class Driver(object):
id: str
address: str
type: str
_debug: bool
# _debug: bool
alert_flag: bool
_websocket_url: str
_cur_id: int
_ws: Optional[WebSocket]
_recv_th: Thread
_handle_event_th: Thread
_handle_immediate_event_th: Optional[Thread]
_stopped: Event
event_handlers: dict
immediate_event_handlers: dict
method_results: dict
event_queue: Queue
immediate_event_queue: Queue
def __init__(self, tab_id: str, tab_type: str, address: str): ...
@ -46,7 +48,9 @@ class Driver(object):
def _handle_event_loop(self) -> None: ...
def __getattr__(self, item: str) -> Callable: ...
def _handle_immediate_event_loop(self): ...
def _handle_immediate_event(self, function: Callable, kwargs: dict): ...
def run(self, _method: str, **kwargs) -> dict: ...
@ -58,8 +62,6 @@ class Driver(object):
def set_callback(self, event: str, callback: Union[Callable, None], immediate: bool = False) -> None: ...
def __str__(self) -> str: ...
class BrowserDriver(Driver):
BROWSERS: Dict[str, Driver] = ...

View File

@ -44,7 +44,6 @@ class ChromiumBase(BasePage):
super().__init__()
self._is_loading = None
self._root_id = None # object id
self._debug = False
self._set = None
self._screencast = None
self._actions = None
@ -114,8 +113,6 @@ class ChromiumBase(BasePage):
if self._js_ready_state == 'complete' and self._ready_state is None:
self._get_document()
self._ready_state = 'complete'
if self._debug:
print(f'{self._frame_id}在connect_browser变成complete')
def _driver_init(self, tab_id):
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
@ -145,15 +142,13 @@ class ChromiumBase(BasePage):
self._driver.set_callback('Page.loadEventFired', self._onLoadEventFired)
self._driver.set_callback('Page.frameStoppedLoading', self._onFrameStoppedLoading)
self._driver.set_callback('Page.frameAttached', self._onFrameAttached)
self._driver.set_callback('Page.frameDetached', self._onFrameDetached, immediate=True)
self._driver.set_callback('Page.frameDetached', self._onFrameDetached)
def _get_document(self, timeout=10):
"""获取页面文档
:param timeout: 超时时间
:return: 是否获取成功
"""
if self._debug:
print('获取文档开始')
if self._is_reading:
return
timeout = timeout if timeout >= .5 else .5
@ -169,8 +164,6 @@ class ChromiumBase(BasePage):
r = self.run_cdp('Page.getFrameTree')
for i in findall(r"'id': '(.*?)'", str(r)):
self.browser._frames[i] = self.tab_id
if self._debug:
print('获取文档结束')
result = True
break
@ -195,10 +188,6 @@ class ChromiumBase(BasePage):
"""页面开始加载时执行"""
self.browser._frames[kwargs['frameId']] = self.tab_id
if kwargs['frameId'] == self._frame_id:
if self._debug:
print(f'{self._frame_id}触发FrameStartedLoading')
print('在FrameStartedLoading变成loading')
self._doc_got = False
self._ready_state = 'connecting'
self._is_loading = True
@ -208,66 +197,35 @@ class ChromiumBase(BasePage):
t.daemon = True
t.start()
if self._debug:
print(f'{self._frame_id}执行FrameStartedLoading完毕')
def _onFrameNavigated(self, **kwargs):
"""页面跳转时执行"""
if kwargs['frame']['id'] == self._frame_id:
if self._debug:
print(f'{self._frame_id}触发FrameNavigated')
print('在FrameNavigated变成loading')
self._doc_got = False
self._ready_state = 'loading'
self._is_loading = True
if self._debug:
print(f'>>> FrameNavigated {kwargs}')
def _onDomContentEventFired(self, **kwargs):
"""在页面刷新、变化后重新读取页面内容"""
if self._debug:
print(f'{self._frame_id}触发DomContentEventFired')
print('在DomContentEventFired变成interactive')
if self._load_mode == 'eager':
self.run_cdp('Page.stopLoading')
if self._get_document(self._load_end_time - perf_counter() - .1):
self._doc_got = True
self._ready_state = 'interactive'
if self._debug:
print(f'{self._frame_id}执行DomContentEventFired完毕')
def _onLoadEventFired(self, **kwargs):
"""在页面刷新、变化后重新读取页面内容"""
if self._debug:
print(f'{self._frame_id}触发LoadEventFired')
print('在LoadEventFired变成complete')
if self._doc_got is False and self._get_document(self._load_end_time - perf_counter() - .1):
self._doc_got = True
self._ready_state = 'complete'
if self._debug:
print(f'{self._frame_id}执行LoadEventFired完毕')
def _onFrameStoppedLoading(self, **kwargs):
"""页面加载完成后执行"""
self.browser._frames[kwargs['frameId']] = self.tab_id
if kwargs['frameId'] == self._frame_id:
if self._debug:
print(f'{self._frame_id}触发FrameStoppedLoading')
print('在FrameStoppedLoading变成complete')
if self._doc_got is False:
self._get_document(self._load_end_time - perf_counter() - .1)
self._ready_state = 'complete'
if self._debug:
print(f'{self._frame_id}执行FrameStoppedLoading完毕')
def _onFileChooserOpened(self, **kwargs):
"""文件选择框打开时执行"""
if self._upload_list:
@ -669,8 +627,6 @@ class ChromiumBase(BasePage):
def stop_loading(self):
"""页面停止加载"""
if self._debug:
print('停止页面加载')
try:
self.run_cdp('Page.stopLoading')
except (PageDisconnectedError, CDPError):
@ -974,7 +930,7 @@ class ChromiumBase(BasePage):
if err:
if t < times:
sleep(interval)
if self._debug or show_errmsg:
if show_errmsg:
print(f'重试{t + 1} {to_url}')
end_time1 = end_time - perf_counter()
while self._ready_state not in ('loading', 'complete') and perf_counter() < end_time1: # 等待出错信息显示
@ -991,7 +947,7 @@ class ChromiumBase(BasePage):
err = TimeoutError(f'页面连接超时(等待{timeout}秒)。')
if t < times:
sleep(interval)
if self._debug or show_errmsg:
if show_errmsg:
print(f'重试{t + 1} {to_url}')
continue

View File

@ -46,7 +46,6 @@ class ChromiumBase(BasePage):
self._scroll: Scroller = ...
self._url: str = ...
self._root_id: str = ...
self._debug: bool = ...
self._upload_list: list = ...
self._wait: BaseWaiter = ...
self._set: ChromiumBaseSetter = ...

View File

@ -99,16 +99,15 @@ class ChromiumFrame(ChromiumBase):
self.browser.driver.get(f'http://{self.address}/json')
super()._driver_init(tab_id)
self._driver.set_callback('Inspector.detached', self._onInspectorDetached, immediate=True)
self._driver.set_callback('Page.frameDetached', None)
self._driver.set_callback('Page.frameDetached', self._onFrameDetached, immediate=True)
def _reload(self):
"""重新获取document"""
self._is_loading = True
debug = self._debug
d_debug = self.driver._debug
self._reloading = True
self._doc_got = False
if debug:
print(f'{self._frame_id} reload 开始')
self._driver.stop()
try:
@ -132,7 +131,6 @@ class ChromiumFrame(ChromiumBase):
if self._listener:
self._listener._to_target(self._target_page.tab_id, self.address, self)
super().__init__(self.address, self._target_page.tab_id, self._target_page.timeout)
self._debug = debug
self.driver._debug = d_debug
else:
@ -156,15 +154,11 @@ class ChromiumFrame(ChromiumBase):
# print(f'获取doc失败重试 {e}')
# else:
# raise GetDocumentError
self._debug = debug
self.driver._debug = d_debug
self._is_loading = False
self._reloading = False
if self._debug:
print(f'{self._frame_id} reload 完毕')
def _get_document(self, timeout=10):
"""刷新cdp使用的document数据
:param timeout: 超时时间
@ -173,9 +167,6 @@ class ChromiumFrame(ChromiumBase):
if self._is_reading:
return
if self._debug:
print('获取文档开始')
self._is_reading = True
try:
if self._is_diff_domain is False:
@ -192,13 +183,9 @@ class ChromiumFrame(ChromiumBase):
r = self.run_cdp('Page.getFrameTree')
for i in findall(r"'id': '(.*?)'", str(r)):
self.browser._frames[i] = self.tab_id
if self._debug:
print('获取文档结束')
return True
except:
if self._debug:
print('获取文档失败')
return False
finally: