2.6.2 d模式增加stop_loading()方法;优化监听器功能

This commit is contained in:
g1879 2022-05-18 00:12:00 +08:00
parent 912f69269a
commit b197e13dce
5 changed files with 53 additions and 41 deletions

View File

@ -414,6 +414,10 @@ class DriverPage(BasePage):
"""刷新当前页面"""
self.driver.refresh()
def stop_loading(self) -> None:
"""强制停止页面加载"""
self.run_cdp('Page.stopLoading', {})
def back(self) -> None:
"""在浏览历史中后退一步"""
self.driver.back()

View File

@ -4,6 +4,7 @@
"""
from json import loads, JSONDecodeError
from threading import Thread
from queue import Queue
from time import perf_counter, sleep
from typing import Union, Tuple, List, Iterable
@ -61,10 +62,9 @@ class Listener(object):
self.targets = True
self.results = {}
self._response_count = 0
self._requestIds = {}
self._a_response_loaded = False
self._all_response = [] # 捕捉到的所有数据格式[(target, ResponseData), ...]
self._response_count = None
self._requestIds = None
self._tmp_response = None # 捕捉到的所有数据格式[(target, ResponseData), ...]
def set_targets(self, targets: Union[str, List[str], Tuple[str], bool, None]) -> None:
"""设置要拦截的目标,可以设置多个 \n
@ -102,7 +102,7 @@ class Listener(object):
def listen(self, targets: Union[str, List[str], Tuple[str], bool, None] = None,
count: int = None,
timeout: float = None,
asyn: bool = False) -> None:
asyn: bool = True) -> None:
"""拦截目标请求,直到超时或达到拦截个数,每次拦截前清空结果 \n
可监听多个目标请求url包含这些字符串就会被记录 \n
:param targets: 要监听的目标字符串或其组成的列表True监听所有None则保留之前的目标不变
@ -114,13 +114,16 @@ class Listener(object):
if targets:
self.set_targets(targets)
self.tab.Network.responseReceived = self._response_received
self.tab.Network.loadingFinished = self._loading_finished
self.tab.start()
self.tab.Network.enable()
self.listening = True
self.results = {}
self._response_count = 0
self._requestIds = {}
self._tmp_response = Queue(maxsize=0)
self.tab.Network.responseReceived = self._response_received
self.tab.Network.loadingFinished = self._loading_finished
if asyn:
Thread(target=self._do_listen, args=(count, timeout)).start()
@ -143,26 +146,6 @@ class Listener(object):
"""
return self.results.get(next(iter(self.results))) if target is None else self.results.get(target, None)
def _do_listen(self,
count: int = None,
timeout: float = None) -> None:
"""执行监听 \n
:param count: 要记录的个数到达个数停止监听
:param timeout: 监听最长时间到时间即使未达到记录个数也停止None为无限长
:return: None
"""
t1 = perf_counter()
while True: # 当收到停止信号、到达须获取结果数、到时间就停止
if not self.listening \
or (count is not None and self._response_count >= count) \
or (timeout is not None and perf_counter() - t1 >= timeout):
break
sleep(.5)
self.tab.Network.responseReceived = self._null_function
self.tab.Network.loadingFinished = self._null_function
self.listening = False
def steps(self, gap: int = 1) -> Iterable:
"""用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页) \n
于是可以根据数据包是否加载完成来决定是否翻页无须从页面dom去判断是否加载完成 \n
@ -172,16 +155,30 @@ class Listener(object):
:param gap: 每接收到多少个数据包触发
:return: 用于在接收到监听目标时触发动作的可迭代对象
"""
count = 0
while True:
if not self.listening:
return
while self.listening:
while self._tmp_response.qsize() >= gap:
yield [self._tmp_response.get(False) for _ in range(gap)]
if self._a_response_loaded:
self._a_response_loaded = False
count += 1
if count % gap == 0:
yield self._all_response[-gap:]
sleep(.1)
def _do_listen(self,
count: int = None,
timeout: float = None) -> None:
"""执行监听 \n
:param count: 要记录的个数到达个数停止监听
:param timeout: 监听最长时间到时间即使未达到记录个数也停止None为无限长
:return: None
"""
t1 = perf_counter()
# 当收到停止信号、到达须获取结果数、到时间就停止
while self.listening \
and (count is None or self._response_count < count) \
and (timeout is None or perf_counter() - t1 < timeout):
sleep(.5)
self.tab.Network.responseReceived = self._null_function
self.tab.Network.loadingFinished = self._null_function
self.listening = False
def _loading_finished(self, **kwargs):
"""请求完成时处理方法"""
@ -192,15 +189,13 @@ class Listener(object):
response = ResponseData(target['response'], self._get_response_body(requestId))
target = target['target']
self._response_count += 1
self._tmp_response.put((target, response))
if target in self.results:
self.results[target].append(response)
else:
self.results[target] = [response]
self._all_response.append((target, response))
self._a_response_loaded = True
def _response_received(self, **kwargs) -> None:
"""接收到返回信息时处理方法"""
if self.targets is True:

View File

@ -427,6 +427,14 @@ page.scroll_to_see((By.XPATH, '//div'))
返回:`None`
## stop_loading()
此方法用于强制当前页面加载。
参数:无
返回:`None`
## back()
此方法用于在浏览历史中后退一步。

View File

@ -1,3 +1,8 @@
# v2.6.2
- d 模式增加`stop_loading()`方法
- 优化完善监听器功能
# v2.6.0
- 新增`Listener`

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="2.6.1",
version="2.6.2",
author="g1879",
author_email="g1879@qq.com",
description="A module that integrates selenium and requests session, encapsulates common page operations.",