基本完成Listener修改

This commit is contained in:
g1879 2023-10-23 01:03:46 +08:00
parent e5a2a25473
commit 06a215d93a
8 changed files with 308 additions and 244 deletions

View File

@ -7,17 +7,18 @@ from os.path import basename, sep
from pathlib import Path from pathlib import Path
from time import perf_counter, sleep from time import perf_counter, sleep
from .session_element import make_session_ele
from .._base.base import DrissionElement, BaseElement from .._base.base import DrissionElement, BaseElement
from .._commons.constants import FRAME_ELEMENT, NoneElement, Settings from .._commons.constants import FRAME_ELEMENT, NoneElement, Settings
from .._commons.keys import keys_to_typing, keyDescriptionForString, keyDefinitions from .._commons.keys import keys_to_typing, keyDescriptionForString, keyDefinitions
from .._commons.locator import get_loc from .._commons.locator import get_loc
from .._commons.tools import get_usable_path from .._commons.tools import get_usable_path
from .._commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll from .._commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll
from ..errors import ContextLossError, ElementLossError, JavaScriptError, NoRectError, ElementNotFoundError, \ from .._units.clicker import Clicker
CDPError, NoResourceError, CanNotClickError
from .session_element import make_session_ele
from .._units.setter import ChromiumElementSetter from .._units.setter import ChromiumElementSetter
from .._units.waiter import ChromiumElementWaiter from .._units.waiter import ChromiumElementWaiter
from ..errors import ContextLossError, ElementLossError, JavaScriptError, ElementNotFoundError, \
CDPError, NoResourceError
class ChromiumElement(DrissionElement): class ChromiumElement(DrissionElement):
@ -37,7 +38,7 @@ class ChromiumElement(DrissionElement):
self._set = None self._set = None
self._states = None self._states = None
self._pseudo = None self._pseudo = None
self._click = None self._clicker = None
self._tag = None self._tag = None
self._wait = None self._wait = None
@ -183,9 +184,9 @@ class ChromiumElement(DrissionElement):
@property @property
def click(self): def click(self):
"""返回用于点击的对象""" """返回用于点击的对象"""
if self._click is None: if self._clicker is None:
self._click = Click(self) self._clicker = Clicker(self)
return self._click return self._clicker
@property @property
def wait(self): def wait(self):
@ -502,7 +503,7 @@ class ChromiumElement(DrissionElement):
if is_blob: if is_blob:
if base64_to_bytes: if base64_to_bytes:
from base64 import b64decode from base64 import b64decode
return b64decode(result.split(',', 1)[1]) return b64decode(result.split(',', 1)[-1])
else: else:
return result return result
@ -1598,111 +1599,6 @@ class Locations(object):
return x + sx, y + sy return x + sx, y + sy
class Click(object):
def __init__(self, ele):
"""
:param ele: ChromiumElement
"""
self._ele = ele
def __call__(self, by_js=False, timeout=1):
"""点击元素
如果遇到遮挡可选择是否用js点击
:param by_js: 是否用js点击为None时先用模拟点击遇到遮挡改用js为True时直接用js点击为False时只用模拟点击
:param timeout: 模拟点击的超时时间等待元素可见不被遮挡进入视口
:return: 是否点击成功
"""
return self.left(by_js, timeout)
def left(self, by_js=False, timeout=1):
"""点击元素可选择是否用js点击
:param by_js: 是否用js点击为None时先用模拟点击遇到遮挡改用js为True时直接用js点击为False时只用模拟点击
:param timeout: 模拟点击的超时时间等待元素可见不被遮挡进入视口
:return: 是否点击成功
"""
if not by_js:
try:
self._ele.scroll.to_see()
can_click = False
timeout = self._ele.page.timeout if timeout is None else timeout
if timeout == 0:
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
can_click = True
else:
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
can_click = True
break
if not self._ele.states.is_in_viewport:
by_js = True
elif can_click and (by_js is False or not self._ele.states.is_covered):
client_x, client_y = self._ele.locations.viewport_midpoint if self._ele.tag == 'input' \
else self._ele.locations.viewport_click_point
self._click(client_x, client_y)
return True
except NoRectError:
by_js = True
if by_js is not False:
self._ele.run_js('this.click();')
return True
if Settings.raise_when_click_failed:
raise CanNotClickError
return False
def right(self):
"""右键单击"""
self._ele.page.scroll.to_see(self._ele)
x, y = self._ele.locations.viewport_click_point
self._click(x, y, 'right')
def middle(self):
"""中键单击"""
self._ele.page.scroll.to_see(self._ele)
x, y = self._ele.locations.viewport_click_point
self._click(x, y, 'middle')
def at(self, offset_x=None, offset_y=None, button='left', count=1):
"""带偏移量点击本元素相对于左上角坐标。不传入x或y值时点击元素中间点
:param offset_x: 相对元素左上角坐标的x轴偏移量
:param offset_y: 相对元素左上角坐标的y轴偏移量
:param button: 点击哪个键可选 left, middle, right, back, forward
:param count: 点击次数
:return: None
"""
self._ele.page.scroll.to_see(self._ele)
if offset_x is None and offset_y is None:
w, h = self._ele.size
offset_x = w // 2
offset_y = h // 2
x, y = offset_scroll(self._ele, offset_x, offset_y)
self._click(x, y, button, count)
def twice(self):
"""双击元素"""
self.at(count=2)
def _click(self, client_x, client_y, button='left', count=1):
"""实施点击
:param client_x: 视口中的x坐标
:param client_y: 视口中的y坐标
:param button: 'left' 'right' 'middle' 'back' 'forward'
:param count: 点击次数
:return: None
"""
self._ele.page.run_cdp('Input.dispatchMouseEvent', type='mousePressed',
x=client_x, y=client_y, button=button, clickCount=count)
# sleep(.05)
self._ele.page.run_cdp('Input.dispatchMouseEvent', type='mouseReleased',
x=client_x, y=client_y, button=button)
class ChromiumScroll(object): class ChromiumScroll(object):
"""用于滚动的对象""" """用于滚动的对象"""

View File

@ -6,6 +6,7 @@
from pathlib import Path from pathlib import Path
from typing import Union, Tuple, List, Any from typing import Union, Tuple, List, Any
from .._units.clicker import Clicker
from .._base.base import DrissionElement, BaseElement from .._base.base import DrissionElement, BaseElement
from .._commons.constants import NoneElement from .._commons.constants import NoneElement
from .._elements.session_element import SessionElement from .._elements.session_element import SessionElement
@ -30,7 +31,7 @@ class ChromiumElement(DrissionElement):
self._doc_id: str = ... self._doc_id: str = ...
self._ids: ChromiumElementIds = ... self._ids: ChromiumElementIds = ...
self._scroll: ChromiumElementScroll = ... self._scroll: ChromiumElementScroll = ...
self._click: Click = ... self._clicker: Clicker = ...
self._select: ChromiumSelect = ... self._select: ChromiumSelect = ...
self._wait: ChromiumElementWaiter = ... self._wait: ChromiumElementWaiter = ...
self._locations: Locations = ... self._locations: Locations = ...
@ -94,7 +95,7 @@ class ChromiumElement(DrissionElement):
def scroll(self) -> ChromiumElementScroll: ... def scroll(self) -> ChromiumElementScroll: ...
@property @property
def click(self) -> Click: ... def click(self) -> Clicker: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[ChromiumElement, None]: ... def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[ChromiumElement, None]: ...
@ -437,25 +438,6 @@ class Locations(object):
def _get_page_coord(self, x: int, y: int) -> Tuple[int, int]: ... def _get_page_coord(self, x: int, y: int) -> Tuple[int, int]: ...
class Click(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
def __call__(self, by_js: Union[None, bool] = False, timeout: float = 1) -> bool: ...
def left(self, by_js: Union[None, bool] = False, timeout: float = 1) -> bool: ...
def right(self) -> None: ...
def middle(self) -> None: ...
def at(self, offset_x: int = None, offset_y: int = None, button: str = 'left', count: int = 1) -> None: ...
def twice(self, by_js: bool = False) -> None: ...
def _click(self, client_x: int, client_y: int, button: str = 'left', count: int = 1) -> None: ...
class ChromiumScroll(object): class ChromiumScroll(object):
def __init__(self, page_or_ele: Union[ChromiumBase, ChromiumElement, ChromiumFrame]): def __init__(self, page_or_ele: Union[ChromiumBase, ChromiumElement, ChromiumFrame]):
self.t1: str = ... self.t1: str = ...

View File

@ -0,0 +1,115 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from time import perf_counter
from .._commons.constants import Settings
from .._commons.web import offset_scroll
from ..errors import NoRectError, CanNotClickError
class Clicker(object):
def __init__(self, ele):
"""
:param ele: ChromiumElement
"""
self._ele = ele
def __call__(self, by_js=False, timeout=1):
"""点击元素
如果遇到遮挡可选择是否用js点击
:param by_js: 是否用js点击为None时先用模拟点击遇到遮挡改用js为True时直接用js点击为False时只用模拟点击
:param timeout: 模拟点击的超时时间等待元素可见不被遮挡进入视口
:return: 是否点击成功
"""
return self.left(by_js, timeout)
def left(self, by_js=False, timeout=1):
"""点击元素可选择是否用js点击
:param by_js: 是否用js点击为None时先用模拟点击遇到遮挡改用js为True时直接用js点击为False时只用模拟点击
:param timeout: 模拟点击的超时时间等待元素可见不被遮挡进入视口
:return: 是否点击成功
"""
if not by_js:
try:
self._ele.scroll.to_see()
can_click = False
timeout = self._ele.page.timeout if timeout is None else timeout
if timeout == 0:
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
can_click = True
else:
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if self._ele.states.is_in_viewport and self._ele.states.is_enabled and self._ele.states.is_displayed:
can_click = True
break
if not self._ele.states.is_in_viewport:
by_js = True
elif can_click and (by_js is False or not self._ele.states.is_covered):
client_x, client_y = self._ele.locations.viewport_midpoint if self._ele.tag == 'input' \
else self._ele.locations.viewport_click_point
self._click(client_x, client_y)
return True
except NoRectError:
by_js = True
if by_js is not False:
self._ele.run_js('this.click();')
return True
if Settings.raise_when_click_failed:
raise CanNotClickError
return False
def right(self):
"""右键单击"""
self._ele.page.scroll.to_see(self._ele)
x, y = self._ele.locations.viewport_click_point
self._click(x, y, 'right')
def middle(self):
"""中键单击"""
self._ele.page.scroll.to_see(self._ele)
x, y = self._ele.locations.viewport_click_point
self._click(x, y, 'middle')
def at(self, offset_x=None, offset_y=None, button='left', count=1):
"""带偏移量点击本元素相对于左上角坐标。不传入x或y值时点击元素中间点
:param offset_x: 相对元素左上角坐标的x轴偏移量
:param offset_y: 相对元素左上角坐标的y轴偏移量
:param button: 点击哪个键可选 left, middle, right, back, forward
:param count: 点击次数
:return: None
"""
self._ele.page.scroll.to_see(self._ele)
if offset_x is None and offset_y is None:
w, h = self._ele.size
offset_x = w // 2
offset_y = h // 2
x, y = offset_scroll(self._ele, offset_x, offset_y)
self._click(x, y, button, count)
def twice(self):
"""双击元素"""
self.at(count=2)
def _click(self, client_x, client_y, button='left', count=1):
"""实施点击
:param client_x: 视口中的x坐标
:param client_y: 视口中的y坐标
:param button: 'left' 'right' 'middle' 'back' 'forward'
:param count: 点击次数
:return: None
"""
self._ele.page.run_cdp('Input.dispatchMouseEvent', type='mousePressed',
x=client_x, y=client_y, button=button, clickCount=count)
# sleep(.05)
self._ele.page.run_cdp('Input.dispatchMouseEvent', type='mouseReleased',
x=client_x, y=client_y, button=button)

View File

@ -0,0 +1,27 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union
from .._elements.chromium_element import ChromiumElement
class Clicker(object):
def __init__(self, ele: ChromiumElement):
self._ele: ChromiumElement = ...
def __call__(self, by_js: Union[None, bool] = False, timeout: float = 1) -> bool: ...
def left(self, by_js: Union[None, bool] = False, timeout: float = 1) -> bool: ...
def right(self) -> None: ...
def middle(self) -> None: ...
def at(self, offset_x: int = None, offset_y: int = None, button: str = 'left', count: int = 1) -> None: ...
def twice(self, by_js: bool = False) -> None: ...
def _click(self, client_x: int, client_y: int, button: str = 'left', count: int = 1) -> None: ...

View File

@ -7,7 +7,6 @@ from base64 import b64decode
from json import JSONDecodeError, loads from json import JSONDecodeError, loads
from queue import Queue from queue import Queue
from re import search from re import search
from threading import Thread
from time import perf_counter, sleep from time import perf_counter, sleep
from requests.structures import CaseInsensitiveDict from requests.structures import CaseInsensitiveDict
@ -23,24 +22,24 @@ class NetworkListener(object):
:param page: ChromiumBase对象 :param page: ChromiumBase对象
""" """
self._page = page self._page = page
self._driver = self._page.driver self._driver = page.driver
self._driver.call_method('Network.enable')
self._tmp = None # 临存捕捉到的数据 self._caught = None # 临存捕捉到的数据
self._request_ids = None # 暂存须要拦截的请求id self._request_ids = None # 暂存须要拦截的请求id
self._total_count = None # 当次监听的数量上限
self._caught_count = None # 当次已监听到的数量
self._begin_time = None # 当次监听开始时间
self._timeout = None # 当次监听超时时间
self.listening = False self.listening = False
self._targets = None # 默认监听所有 self._targets = None # 默认监听所有
self.tab_id = None # 当前tab的id self.tab_id = None # 当前tab的id
self._results = []
self._is_regex = False self._is_regex = False
self._method = None self._method = None
@property
def targets(self):
"""返回监听目标"""
return self._targets
def set_targets(self, targets=True, is_regex=False, method=None): def set_targets(self, targets=True, is_regex=False, method=None):
"""指定要等待的数据包 """指定要等待的数据包
:param targets: 要匹配的数据包url特征可用list等传入多个为True时获取所有 :param targets: 要匹配的数据包url特征可用list等传入多个为True时获取所有
@ -54,10 +53,7 @@ class NetworkListener(object):
if targets is True: if targets is True:
targets = '' targets = ''
if isinstance(targets, str): self._targets = {targets} if isinstance(targets, str) else set(targets)
self._targets = {targets}
else:
self._targets = set(targets)
self._is_regex = is_regex self._is_regex = is_regex
@ -69,93 +65,128 @@ class NetworkListener(object):
else: else:
raise TypeError('method参数只能是str、list、tuple、set类型。') raise TypeError('method参数只能是str、list、tuple、set类型。')
def listen(self, targets=None, count=None, timeout=None): def listen(self, targets=None, is_regex=False, method=None):
"""拦截目标请求,直到超时或达到拦截个数,每次拦截前清空结果 """拦截目标请求,每次拦截前清空结果
可监听多个目标请求url包含这些字符串就会被记录 :param targets: 要匹配的数据包url特征可用list等传入多个为True时获取所有
:param targets: 要监听的目标字符串或其组成的列表True监听所有None则保留之前的目标不变 :param is_regex: 设置的target是否正则表达式
:param count: 要记录的个数到达个数停止监听 :param method: 设置监听的请求类型可用list等指定多个为None时监听全部
:param timeout: 监听最长时间到时间即使未达到记录个数也停止None为无限长
:return: None :return: None
""" """
if targets: if targets:
self.set_targets(targets) self.set_targets(targets, is_regex, method)
self.listening = True self.listening = True
self._results = []
self._request_ids = {} self._request_ids = {}
self._tmp = Queue(maxsize=0) self._caught = Queue(maxsize=0)
self._caught_count = 0 self._set_callback()
self._begin_time = perf_counter()
self._timeout = timeout
self._set_callback_func() def wait(self, count=1, timeout=None, fix_count=True):
"""等待符合要求的数据包到达指定数量
self._total_count = len(self._targets) if not count else count :param count: 需要捕捉的数据包数量
:param timeout: 超时时间为None无限等待
Thread(target=self._wait_to_stop).start() :param fix_count: 是否必须满足总数要求发生超时为True返回False为False返回已捕捉到的数据包
:return: count为1时返回数据包对象大于1时返回列表超时且fix_count为True时返回False
def stop(self):
"""停止监听"""
self._stop()
self.listening = False
def wait(self):
"""等待监听结束"""
while self.listening:
sleep(.2)
return self._results
def get_results(self, target=None):
"""获取结果列表
:param target: 要获取的目标为None时获取全部
:return: 结果数据组成的列表
""" """
return self._results if target is None else [i for i in self._results if i.target == target] if not self.listening:
raise RuntimeError('监听未启动或已暂停。')
if not timeout:
while self._caught.qsize() < count:
sleep(.05)
fail = False
def _wait_to_stop(self): else:
"""当收到停止信号、到达须获取结果数、到时间就停止""" end = perf_counter() + count
while self._is_continue(): while True:
sleep(.2) if perf_counter() > end:
self.stop() fail = True
break
if self._caught.qsize() >= count:
fail = False
break
def _is_continue(self): if fail:
"""是否继续当前监听""" if fix_count or not self._caught.qsize():
return self.listening \ return False
and (self._total_count is None or self._caught_count < self._total_count) \ else:
and (self._timeout is None or perf_counter() - self._begin_time < self._timeout) return [self._caught.get_nowait() for _ in range(self._caught.qsize())]
def steps(self, gap=1): if count == 1:
return self._caught.get_nowait()
return [self._caught.get_nowait() for _ in range(count)]
def steps(self, count=None, timeout=None, gap=1):
"""用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页) """用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页)
:param count: 需捕获的数据包总数为None表示无限
:param timeout: 每个数据包等待时间为None表示无限
:param gap: 每接收到多少个数据包触发 :param gap: 每接收到多少个数据包触发
:return: 用于在接收到监听目标时触发动作的可迭代对象 :return: 用于在接收到监听目标时触发动作的可迭代对象
""" """
if not isinstance(gap, int) or gap < 1: caught = 0
raise ValueError('gap参数必须为大于0的整数。') end = perf_counter() + timeout if timeout else None
while self.listening or not self._tmp.empty(): while True:
while self._tmp.qsize() >= gap: if timeout and perf_counter() > end:
yield self._tmp.get(False) if gap == 1 else [self._tmp.get(False) for _ in range(gap)] return
if self._caught.qsize() >= gap:
yield self._caught.get_nowait() if gap == 1 else [self._caught.get_nowait() for _ in range(gap)]
if timeout:
end = perf_counter() + timeout
if count:
caught += gap
if caught >= count:
return
sleep(.05)
sleep(.1) def stop(self):
"""停止监听,清空已监听到的列表"""
if self.listening:
self.pause()
self.clear()
def _set_callback_func(self): def pause(self, clear=True):
"""暂停监听
:param clear: 是否清空已获取队列
:return: None
"""
if self.listening:
self._driver.set_listener('Network.requestWillBeSent', None)
self._driver.set_listener('Network.responseReceived', None)
self._driver.set_listener('Network.loadingFinished', None)
self._driver.set_listener('Network.loadingFailed', None)
self.listening = False
if clear:
self.clear()
def go_on(self):
"""继续暂停的监听"""
if self.listening:
return
self._set_callback()
self.listening = True
def clear(self):
"""清空结果"""
self._request_ids = {}
self._caught.queue.clear()
def _set_callback(self):
"""设置监听请求的回调函数""" """设置监听请求的回调函数"""
self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent) self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent)
self._driver.set_listener('Network.responseReceived', self._response_received) self._driver.set_listener('Network.responseReceived', self._response_received)
self._driver.set_listener('Network.loadingFinished', self._loading_finished) self._driver.set_listener('Network.loadingFinished', self._loading_finished)
self._driver.set_listener('Network.loadingFailed', self._loading_failed) self._driver.set_listener('Network.loadingFailed', self._loading_failed)
self._driver.call_method('Network.enable')
def _stop(self) -> None:
"""停止监听前要做的工作"""
self._driver.set_listener('Network.requestWillBeSent', None)
self._driver.set_listener('Network.responseReceived', None)
self._driver.set_listener('Network.loadingFinished', None)
self._driver.set_listener('Network.loadingFailed', None)
# self._driver.call_method('Network.disable')
def _requestWillBeSent(self, **kwargs): def _requestWillBeSent(self, **kwargs):
"""接收到请求时的回调函数""" """接收到请求时的回调函数"""
if not self._targets:
self._request_ids[kwargs['requestId']] = DataPacket(self._page.tab_id, None, kwargs)
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
self._request_ids[kwargs['requestId']]._raw_post_data = \
self._page.run_cdp('Network.getRequestPostData', requestId=kwargs['requestId'])['postData']
return
for target in self._targets: for target in self._targets:
if ((self._is_regex and search(target, kwargs['request']['url'])) or if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and ( (not self._is_regex and target in kwargs['request']['url'])) and (
@ -191,9 +222,11 @@ class NetworkListener(object):
dp._raw_body = body dp._raw_body = body
dp._base64_body = is_base64 dp._base64_body = is_base64
self._tmp.put(dp) self._caught.put(dp)
self._results.append(dp) try:
self._caught_count += 1 self._request_ids.pop(request_id)
except:
pass
def _loading_failed(self, **kwargs): def _loading_failed(self, **kwargs):
"""请求失败时的回调方法""" """请求失败时的回调方法"""
@ -203,21 +236,23 @@ class NetworkListener(object):
dp.errorText = kwargs['errorText'] dp.errorText = kwargs['errorText']
dp._resource_type = kwargs['type'] dp._resource_type = kwargs['type']
self._tmp.put(dp) self._caught.put(dp)
self._results.append(dp) try:
self._caught_count += 1 self._request_ids.pop(request_id)
except:
pass
class DataPacket(object): class DataPacket(object):
"""返回的数据包管理类""" """返回的数据包管理类"""
def __init__(self, tab, target, raw_request): def __init__(self, tab_id, target, raw_request):
""" """
:param tab: 产生这个数据包的tab的id :param tab_id: 产生这个数据包的tab的id
:param target: 监听目标 :param target: 监听目标
:param raw_request: 原始request数据从cdp获得 :param raw_request: 原始request数据从cdp获得
""" """
self.tab = tab self.tab = tab_id
self.target = target self.target = target
self._raw_request = raw_request self._raw_request = raw_request
@ -232,6 +267,10 @@ class DataPacket(object):
self.errorText = None self.errorText = None
self._resource_type = None self._resource_type = None
def __repr__(self):
t = f'"{self.target}"' if self.target is not None else None
return f'<DataPacket target={t} url="{self.url}">'
@property @property
def url(self): def url(self):
return self.request.url return self.request.url

View File

@ -4,7 +4,7 @@
@Contact : g1879@qq.com @Contact : g1879@qq.com
""" """
from queue import Queue from queue import Queue
from typing import Union, Dict, List, Iterable, Tuple from typing import Union, Dict, List, Iterable, Tuple, Optional
from requests.structures import CaseInsensitiveDict from requests.structures import CaseInsensitiveDict
@ -15,36 +15,36 @@ from .._pages.chromium_base import ChromiumBase
class NetworkListener(object): class NetworkListener(object):
def __init__(self, page: ChromiumBase): def __init__(self, page: ChromiumBase):
self._page: ChromiumBase = ... self._page: ChromiumBase = ...
self._total_count: int = ...
self._caught_count: int = ...
self._targets: Union[str, dict] = ... self._targets: Union[str, dict] = ...
self._results: list = ...
self._method: set = ... self._method: set = ...
self._tmp: Queue = ... self._caught: Queue = ...
self._is_regex: bool = ... self._is_regex: bool = ...
self._driver: ChromiumDriver = ... self._driver: ChromiumDriver = ...
self._request_ids: dict = ... self._request_ids: dict = ...
self.listening: bool = ... self.listening: bool = ...
self._timeout: float = ...
self._begin_time: float = ... @property
def targets(self) -> Optional[set]: ...
def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False, def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False,
count: int = None, method: Union[str, list, tuple, set] = None) -> None: ... method: Union[str, list, tuple, set] = None) -> None: ...
def stop(self) -> None: ... def stop(self) -> None: ...
def wait(self):... def pause(self, clear: bool = True) -> None: ...
def go_on(self) -> None: ...
def wait(self, count: int = 1, timeout: float = None, fix_count: bool = True): ...
@property @property
def results(self) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... def results(self) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
def clear(self) -> None: ... def clear(self) -> None: ...
def listen(self, targets: Union[str, List[str], Tuple, bool, None] = ..., count: int = ..., def listen(self, targets: Union[str, List[str], Tuple, bool, None] = None, is_regex: bool = False,
timeout: float = ...) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... method: Union[str, list, tuple, set] = None) \
-> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
def _listen(self, timeout: float = None,
any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ...
def _requestWillBeSent(self, **kwargs) -> None: ... def _requestWillBeSent(self, **kwargs) -> None: ...
@ -54,24 +54,17 @@ class NetworkListener(object):
def _loading_failed(self, **kwargs) -> None: ... def _loading_failed(self, **kwargs) -> None: ...
def _request_paused(self, **kwargs) -> None: ... def steps(self, count: int = None, timeout: float = None,
gap=1) -> Iterable[Union[DataPacket, List[DataPacket]]]: ...
def _wait_to_stop(self) -> None: ... def _set_callback(self) -> None: ...
def _is_continue(self) -> bool: ...
def steps(self, gap=1) -> Iterable[Union[DataPacket, List[DataPacket]]]: ...
def _set_callback_func(self) -> None: ...
def _stop(self) -> None: ...
class DataPacket(object): class DataPacket(object):
"""返回的数据包管理类""" """返回的数据包管理类"""
def __init__(self, tab: str, target: str, raw_info: dict): def __init__(self, tab_id: str, target: Optional[str], raw_info: dict):
self.tab: str = ... self.tab_id: str = ...
self.target: str = ... self.target: str = ...
self._raw_request: dict = ... self._raw_request: dict = ...
self._raw_response: dict = ... self._raw_response: dict = ...

View File

@ -126,6 +126,14 @@ class ChromiumBaseWaiter(object):
""" """
return self._change('title', text, exclude, timeout, raise_err) return self._change('title', text, exclude, timeout, raise_err)
def data_packets(self, count=1, timeout=None, fix_count: bool = True):
"""等待符合要求的数据包到达指定数量
:param count: 需要捕捉的数据包数量
:param timeout: 超时时间为None无限等待
:param fix_count: 是否必须满足总数要求发生超时为True返回False为False返回已捕捉到的数据包
:return: count为1时返回数据包对象大于1时返回列表超时且fix_count为True时返回False"""
return self._driver.listener.wait(count, timeout, fix_count)
def _change(self, arg, text, exclude=False, timeout=None, raise_err=None): def _change(self, arg, text, exclude=False, timeout=None, raise_err=None):
"""等待指定属性变成包含或不包含指定文本 """等待指定属性变成包含或不包含指定文本
:param arg: 要被匹配的属性 :param arg: 要被匹配的属性

View File

@ -3,9 +3,10 @@
@Author : g1879 @Author : g1879
@Contact : g1879@qq.com @Contact : g1879@qq.com
""" """
from typing import Union from typing import Union, List
from .download_manager import DownloadMission from .download_manager import DownloadMission
from .network_listener import DataPacket
from .._elements.chromium_element import ChromiumElement from .._elements.chromium_element import ChromiumElement
from .._pages.chromium_base import ChromiumBase from .._pages.chromium_base import ChromiumBase
from .._pages.chromium_frame import ChromiumFrame from .._pages.chromium_frame import ChromiumFrame
@ -46,6 +47,9 @@ class ChromiumBaseWaiter(object):
def title_change(self, text: str, exclude: bool = False, timeout: float = None, raise_err: bool = None) -> bool: ... def title_change(self, text: str, exclude: bool = False, timeout: float = None, raise_err: bool = None) -> bool: ...
def data_packets(self, count: int = 1, timeout: float = None,
fix_count: bool = True) -> Union[List[DataPacket], DataPacket, None]: ...
def _change(self, arg: str, text: str, exclude: bool = False, timeout: float = None, def _change(self, arg: str, text: str, exclude: bool = False, timeout: float = None,
raise_err: bool = None) -> bool: ... raise_err: bool = None) -> bool: ...