2.6.3Listener改用FlowViewer;修复设置ua的问题

This commit is contained in:
g1879 2022-05-20 18:03:59 +08:00
parent 247002cb22
commit 35637bc043
6 changed files with 7 additions and 272 deletions

View File

@ -685,7 +685,7 @@ class DriverOptions(Options):
:param user_agent: user agent文本
:return: 当前对象
"""
return self.set_argument('user-agent', user_agent)
return self.set_argument('--user-agent', user_agent)
def set_proxy(self, proxy: str) -> 'DriverOptions':
"""设置代理 \n

View File

@ -443,6 +443,8 @@ def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tupl
if arg.startswith(('--user-data-dir', '--disk-cache-dir')):
index = arg.find('=') + 1
args1.append(f'{arg[:index]}"{arg[index:].strip()}"')
elif arg.startswith('--user-agent='):
args1.append(f'--user-agent="{arg[13:]}"')
else:
args1.append(arg)

View File

@ -1,267 +0,0 @@
# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
@File : listener.py
"""
from json import loads, JSONDecodeError
from threading import Thread
from queue import Queue
from time import perf_counter, sleep
from typing import Union, Tuple, List, Iterable
from pychrome import Tab, CallMethodException
from requests import get
from DrissionPage.mix_page import MixPage
class ResponseData(object):
"""返回的数据包管理类"""
def __init__(self, response: dict, body: str):
"""初始化 \n
:param response: response格式化的数据
:param body: response包含的内容
"""
self.response = response
self.raw_body = body
self._json_body = None
def __getattr__(self, item):
return self.response.get(item, None)
@property
def body(self):
"""返回body内容如果是json格式自动进行转换其它格式直接返回文本"""
if self._json_body is not False and self.response.get('mimeType', None) == 'application/json':
if self._json_body is None:
try:
self._json_body = loads(self.raw_body)
except JSONDecodeError:
self._json_body = False
return self.raw_body
return self._json_body
else:
return self.raw_body
class Listener(object):
"""浏览器的数据包监听器"""
def __init__(self,
browser: Union[str, int, MixPage, None] = None,
tab_handle: str = None):
"""初始化 \n
:param browser: 要监听的url端口或MixPage对象MixPage对象须设置了local_port参数
为None时自动从系统中寻找可监听的浏览器
:param tab_handle: 要监听的标签页的handle不输入读取当前活动标签页
"""
self.tab = None
self.set_tab(browser, tab_handle)
self.listening = False
self.targets = True
self.results = {}
self._response_count = None
self._requestIds = None
self._tmp_response = None # 捕捉到的所有数据格式[(target, ResponseData), ...]
def set_targets(self, targets: Union[str, List[str], Tuple[str], bool, None]) -> None:
"""设置要拦截的目标,可以设置多个 \n
:param targets: 字符串或字符串组成的列表
:return: None
"""
if isinstance(targets, str):
self.targets = [targets]
elif isinstance(targets, tuple):
self.targets = list(targets)
elif isinstance(targets, list) or targets is True:
self.targets = targets
else:
raise TypeError('targets参数只接收字符串、字符串组成的列表、True、None')
def set_tab(self,
browser: Union[str, int, MixPage, None] = None,
tab_handle: str = None) -> None:
"""设置要监听的标签页 \n
:param browser: 要监听的url端口或MixPage对象MixPage对象须设置了local_port参数
为None时自动从系统中寻找可监听的浏览器
:param tab_handle: 要监听的标签页的handle不输入读取当前活动标签页
:return: None
"""
if isinstance(browser, MixPage):
url = browser.drission.driver_options.debugger_address
if url is not None and tab_handle is None:
tab_handle = browser.current_tab_handle
browser = url
elif isinstance(browser, int):
browser = f'127.0.0.1:{browser}'
browser = browser or _find_chrome()
if browser is None:
raise RuntimeError('未找到可监听的浏览器。')
tab_handle = tab_handle or _get_tab_id(browser)
if tab_handle:
tab_id = tab_handle.split('-')[-1]
else:
raise RuntimeError('未能定位标签页。')
tab_data = {"id": tab_id, "type": "page",
"webSocketDebuggerUrl": f"ws://{browser}/devtools/page/{tab_id}"}
self.tab = Tab(**tab_data)
def listen(self, targets: Union[str, List[str], Tuple[str], bool, None] = None,
count: int = None,
timeout: float = None,
asyn: bool = True) -> None:
"""拦截目标请求,直到超时或达到拦截个数,每次拦截前清空结果 \n
可监听多个目标请求url包含这些字符串就会被记录 \n
:param targets: 要监听的目标字符串或其组成的列表True监听所有None则保留之前的目标不变
:param count: 要记录的个数到达个数停止监听
:param timeout: 监听最长时间到时间即使未达到记录个数也停止None为无限长
:param asyn: 是否异步执行
:return: None
"""
if targets:
self.set_targets(targets)
self.tab.start()
self.tab.Network.enable()
self.listening = True
self.results = {}
self._response_count = 0
self._requestIds = {}
self._tmp_response = Queue(maxsize=0)
self.tab.Network.responseReceived = self._response_received
self.tab.Network.loadingFinished = self._loading_finished
if asyn:
Thread(target=self._do_listen, args=(count, timeout)).start()
else:
self._do_listen(count, timeout)
def stop(self) -> None:
"""停止监听"""
self.listening = False
def wait(self) -> None:
"""等等监听结束"""
while self.listening:
sleep(.5)
def get_results(self, target: str = None) -> List[ResponseData]:
"""获取结果列表 \n
:param target: 要获取的目标为None时获取第一个
:return: 结果数据组成的列表
"""
return self.results.get(next(iter(self.results))) if target is None else self.results.get(target, None)
def steps(self, gap: int = 1) -> Iterable:
"""用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页) \n
于是可以根据数据包是否加载完成来决定是否翻页无须从页面dom去判断是否加载完成 \n
大大简化代码提高可靠性 \n
eg: for i in listener.steps(2): \n
btn.click() \n
:param gap: 每接收到多少个数据包触发
:return: 用于在接收到监听目标时触发动作的可迭代对象
"""
while self.listening:
while self._tmp_response.qsize() >= gap:
yield [self._tmp_response.get(False) for _ in range(gap)]
sleep(.1)
def _do_listen(self,
count: int = None,
timeout: float = None) -> None:
"""执行监听 \n
:param count: 要记录的个数到达个数停止监听
:param timeout: 监听最长时间到时间即使未达到记录个数也停止None为无限长
:return: None
"""
t1 = perf_counter()
# 当收到停止信号、到达须获取结果数、到时间就停止
while self.listening \
and (count is None or self._response_count < count) \
and (timeout is None or perf_counter() - t1 < timeout):
sleep(.5)
self.tab.Network.responseReceived = self._null_function
self.tab.Network.loadingFinished = self._null_function
self.listening = False
def _loading_finished(self, **kwargs):
"""请求完成时处理方法"""
requestId = kwargs['requestId']
target = self._requestIds.pop(requestId, None)
if target is not None:
response = ResponseData(target['response'], self._get_response_body(requestId))
target = target['target']
self._response_count += 1
self._tmp_response.put((target, response))
if target in self.results:
self.results[target].append(response)
else:
self.results[target] = [response]
def _response_received(self, **kwargs) -> None:
"""接收到返回信息时处理方法"""
if self.targets is True:
self._requestIds[kwargs['requestId']] = {'target': True, 'response': kwargs['response']}
else:
for target in self.targets:
if target in kwargs['response']['url']:
self._requestIds[kwargs['requestId']] = {'target': target, 'response': kwargs['response']}
def _null_function(self, **kwargs) -> None:
"""空方法,用于清除绑定的方法"""
pass
def _get_response_body(self, requestId: str) -> Union[str, None]:
"""获取返回的内容 \n
:param requestId: 请求的id
:return: 返回内容的文本
"""
try:
return self.tab.call_method('Network.getResponseBody', requestId=requestId)['body']
except CallMethodException:
return ''
def _find_chrome() -> Union[str, None]:
"""在系统进程中查找开启调试的Chrome浏览器只能在Windows系统使用 \n
:return: ip:port
"""
from os import popen
from re import findall, DOTALL, search
txt = popen('tasklist /fi "imagename eq chrome.exe" /nh').read()
pids = findall(r' (\d+) [c,C]', txt, flags=DOTALL)
for pid in pids:
txt = popen(f'netstat -ano | findstr "{pid}"').read()
r = search(r'TCP {4}(\d+.\d+.\d+.\d+:\d+).*?LISTENING.*?\n', txt, flags=DOTALL)
if r:
return r.group(1)
def _get_tab_id(url: str) -> Union[str, None]:
"""获取浏览器活动标签页id \n
:param url: 浏览器ip:port
:return: 文本形式返回tab id
"""
try:
r = get(f'http://{url}/json', json=True, timeout=2).json()
for i in r:
if i['type'] == 'page':
return i['id']
except Exception:
return None

View File

@ -4,4 +4,4 @@
"""
from .session_element import make_session_ele
from .easy_set import get_match_driver
from .listener import Listener
from FlowViewer import Listener

View File

@ -4,4 +4,4 @@ tldextract
lxml
cssselect
DownloadKit
pychrome
FlowViewer

View File

@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh:
setup(
name="DrissionPage",
version="2.6.2",
version="2.6.3",
author="g1879",
author_email="g1879@qq.com",
description="A module that integrates selenium and requests session, encapsulates common page operations.",
@ -23,7 +23,7 @@ setup(
"tldextract",
"requests",
"DownloadKit",
"pychrome"
"FlowViewer"
],
classifiers=[
"Programming Language :: Python :: 3.6",