mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
修改
This commit is contained in:
parent
5ab30b7916
commit
95b019b116
@ -2,6 +2,7 @@
|
||||
"""
|
||||
实用工具
|
||||
"""
|
||||
from json import loads, JSONDecodeError
|
||||
from threading import Thread
|
||||
from time import perf_counter, sleep
|
||||
from typing import Union, Tuple, List
|
||||
@ -13,6 +14,30 @@ from .easy_set import get_match_driver
|
||||
from .mix_page import MixPage
|
||||
|
||||
|
||||
class ResponseData(object):
|
||||
def __init__(self, response: dict, body: str):
|
||||
self.response = response
|
||||
self.raw_body = body
|
||||
self._json_body = None
|
||||
|
||||
def __getattr__(self, item):
|
||||
return self.response.get(item, None)
|
||||
|
||||
@property
|
||||
def body(self):
|
||||
if self._json_body is not False and self.response.get('mimeType', None) == 'application/json':
|
||||
if self._json_body is None:
|
||||
try:
|
||||
self._json_body = loads(self.raw_body)
|
||||
except JSONDecodeError:
|
||||
self._json_body = False
|
||||
return self.raw_body
|
||||
return self._json_body
|
||||
|
||||
else:
|
||||
return self.raw_body
|
||||
|
||||
|
||||
class Listener(object):
|
||||
"""浏览器的数据包监听器"""
|
||||
|
||||
@ -27,11 +52,18 @@ class Listener(object):
|
||||
|
||||
self.listening = False
|
||||
self.targets = None
|
||||
self.results: dict = {}
|
||||
self.results = {}
|
||||
|
||||
self._response_count = 0
|
||||
self._requestIds = {}
|
||||
|
||||
# @property
|
||||
# def first_result(self) -> Union[ResponseData, None]:
|
||||
# try:
|
||||
# return list(self.results.values())[0][0]
|
||||
# except IndexError:
|
||||
# return None
|
||||
|
||||
def set_targets(self, targets: Union[str, List[str], Tuple[str]]) -> None:
|
||||
"""设置要拦截的目标,可以设置多个 \n
|
||||
:param targets: 字符串或字符串组成的列表
|
||||
@ -94,6 +126,13 @@ class Listener(object):
|
||||
while self.listening:
|
||||
sleep(.5)
|
||||
|
||||
def get_results(self, target: str = None) -> List[ResponseData]:
|
||||
"""获取结果列表 \n
|
||||
:param target: 要获取的目标,为None时获取第一个
|
||||
:return: 结果数据组成的列表
|
||||
"""
|
||||
return self.results.get(next(iter(self.results))) if target is None else self.results.get(target, None)
|
||||
|
||||
def _do_listen(self,
|
||||
count: int = None,
|
||||
timeout: float = None) -> None:
|
||||
@ -119,8 +158,7 @@ class Listener(object):
|
||||
requestId = kwargs['requestId']
|
||||
target = self._requestIds.pop(requestId, None)
|
||||
if target is not None:
|
||||
response = target['response']
|
||||
response['body'] = self._get_response_body(requestId)
|
||||
response = ResponseData(target['response'], self._get_response_body(requestId))
|
||||
target = target['target']
|
||||
if target in self.results:
|
||||
self.results[target].append(response)
|
||||
|
Loading…
x
Reference in New Issue
Block a user