新建dev分支继续3.3;相对定位第一个参数支持数字

This commit is contained in:
g1879 2023-06-28 14:51:12 +08:00
parent 339510342a
commit 07d023daad
35 changed files with 1278 additions and 1288 deletions

View File

@ -11,11 +11,3 @@ from .web_page import WebPage
# 启动配置类
from .configs.chromium_options import ChromiumOptions
from .configs.session_options import SessionOptions
# 旧版页面类和启动配置类
try:
from .mixpage.mix_page import MixPage
from .mixpage.drission import Drission
from .configs.driver_options import DriverOptions
except ModuleNotFoundError:
pass

View File

@ -4,9 +4,12 @@
@Contact : g1879@qq.com
"""
from abc import abstractmethod
from pathlib import Path
from re import sub
from urllib.parse import quote
from DownloadKit import DownloadKit
from .commons.constants import Settings, NoneElement
from .commons.locator import get_loc
from .commons.web import format_html
@ -58,12 +61,6 @@ class BaseElement(BaseParser):
def parent(self, level_or_loc=1):
pass
def prev(self, index=1):
return None # ShadowRootElement直接继承
def prevs(self) -> None:
return None # ShadowRootElement直接继承
def next(self, index=1):
pass
@ -84,7 +81,7 @@ class BaseElement(BaseParser):
class DrissionElement(BaseElement):
"""DriverElement、ChromiumElement 和 SessionElement的基类
"""ChromiumElement 和 SessionElement的基类
但不是ShadowRootElement的基类"""
@property
@ -119,9 +116,10 @@ class DrissionElement(BaseElement):
return [format_html(x.strip(' ').rstrip('\n')) for x in texts if x and sub('[\r\n\t ]', '', x) != '']
def parent(self, level_or_loc=1):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:return: 上级元素对象
"""
if isinstance(level_or_loc, int):
@ -133,21 +131,24 @@ class DrissionElement(BaseElement):
if loc[0] == 'css selector':
raise ValueError('此css selector语法不受支持请换成xpath。')
loc = f'xpath:./ancestor::{loc[1].lstrip(". / ")}'
loc = f'xpath:./ancestor::{loc[1].lstrip(". / ")}[{index}]'
else:
raise TypeError('level_or_loc参数只能是tuple、int或str。')
return self._ele(loc, timeout=0, relative=True, raise_err=False)
def child(self, index=1, filter_loc='', timeout=None, ele_only=True):
def child(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回直接子元素元素或节点组成的列表,可用查询语法筛选
:param index: 第几个查询结果1开始
:param filter_loc: 用于筛选的查询语法
:param index: 第几个查询结果1开始
:param timeout: 查找节点的超时时间
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 直接子元素或节点文本组成的列表
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self.children(filter_loc=filter_loc, timeout=timeout, ele_only=ele_only)
if not nodes:
if Settings.raise_ele_not_found:
@ -163,14 +164,17 @@ class DrissionElement(BaseElement):
else:
return NoneElement()
def prev(self, index=1, filter_loc='', timeout=0, ele_only=True):
def prev(self, filter_loc='', index=1, timeout=0, ele_only=True):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 前面第几个查询结果1开始
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
:param timeout: 查找节点的超时时间
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'preceding', timeout=timeout, ele_only=ele_only)
if nodes:
return nodes[-1]
@ -179,14 +183,17 @@ class DrissionElement(BaseElement):
else:
return NoneElement()
def next(self, index=1, filter_loc='', timeout=0, ele_only=True):
def next(self, filter_loc='', index=1, timeout=0, ele_only=True):
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 后面第几个查询结果1开始
:param filter_loc: 用于筛选的查询语法
:param index: 后面第几个查询结果1开始
:param timeout: 查找节点的超时时间
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'following', timeout=timeout, ele_only=ele_only)
if nodes:
return nodes[0]
@ -195,14 +202,17 @@ class DrissionElement(BaseElement):
else:
return NoneElement()
def before(self, index=1, filter_loc='', timeout=None, ele_only=True):
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 前面第几个查询结果1开始
:param filter_loc: 用于筛选的查询语法
:param index: 前面第几个查询结果1开始
:param timeout: 查找节点的超时时间
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素前面的某个元素或节点
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'preceding', False, timeout=timeout, ele_only=ele_only)
if nodes:
return nodes[-1]
@ -211,14 +221,17 @@ class DrissionElement(BaseElement):
else:
return NoneElement()
def after(self, index=1, filter_loc='', timeout=None, ele_only=True):
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个
:param index: 后面第几个查询结果1开始
:param filter_loc: 用于筛选的查询语法
:param index: 后面第几个查询结果1开始
:param timeout: 查找节点的超时时间
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素后面的某个元素或节点
"""
if isinstance(filter_loc, int):
index = filter_loc
filter_loc = ''
nodes = self._get_brothers(index, filter_loc, 'following', False, timeout, ele_only=ele_only)
if nodes:
return nodes[0]
@ -292,7 +305,7 @@ class DrissionElement(BaseElement):
:param direction: 'following' 'preceding'查找的方向
:param brother: 查找范围在同级查找还是整个dom前后查找
:param timeout: 查找等待时间
:return: DriverElement对象或字符串
:return: 元素对象或字符串
"""
if index is not None and index < 1:
raise ValueError('index必须大于等于1。')
@ -353,6 +366,8 @@ class BasePage(BaseParser):
self.retry_times = 3
self.retry_interval = 2
self._url_available = None
self._download_path = ''
self._DownloadKit = None
@property
def title(self):
@ -380,6 +395,18 @@ class BasePage(BaseParser):
"""返回当前访问的url有效性"""
return self._url_available
@property
def download_path(self):
    """Return the default download path as an absolute path string.

    :return: ``self._download_path`` converted to an absolute path (str)
    """
    configured = Path(self._download_path)
    return str(configured.absolute())
@property
def download(self):
    """Return the downloader object, building it on first access.

    The DownloadKit instance is created with this page as its session and
    ``self.download_path`` as its target folder, then cached on the page.

    :return: the cached DownloadKit object
    """
    kit = self._DownloadKit
    if kit is None:
        kit = DownloadKit(session=self, goal_path=self.download_path)
        self._DownloadKit = kit
    return kit
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
@ -387,7 +414,7 @@ class BasePage(BaseParser):
:param interval: 重试间隔
:return: 重试次数和间隔组成的tuple
"""
self._url = quote(url, safe='/:&?=%;#@+!')
self._url = quote(url, safe='/:&?=%;#@+![]')
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval

View File

@ -6,6 +6,8 @@
from abc import abstractmethod
from typing import Union, Tuple, List
from DownloadKit import DownloadKit
from .commons.constants import NoneElement
@ -78,30 +80,35 @@ class DrissionElement(BaseElement):
def texts(self, text_node_only: bool = False) -> list: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[DrissionElement, None]: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[DrissionElement, None]: ...
def child(self, index: int = 1,
filter_loc: Union[tuple, str] = '',
def child(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def prev(self, index: int = 1,
filter_loc: Union[tuple, str] = '',
def prev(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def next(self, index: int = 1,
filter_loc: Union[tuple, str] = '',
def next(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def before(self, index: int = 1,
filter_loc: Union[tuple, str] = '',
def before(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
def after(self, index: int = 1,
filter_loc: Union[tuple, str] = '',
def after(self,
filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[DrissionElement, str, NoneElement]: ...
@ -154,7 +161,9 @@ class BasePage(BaseParser):
self._url_available: bool = ...
self.retry_times: int = ...
self.retry_interval: float = ...
self._timeout = float = ...
self._timeout: float = ...
self._download_path: str = ...
self._DownloadKit: DownloadKit = ...
@property
def title(self) -> Union[str, None]: ...
@ -171,6 +180,12 @@ class BasePage(BaseParser):
@property
def url_available(self) -> bool: ...
@property
def download_path(self) -> str: ...
@property
def download(self) -> DownloadKit: ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
# ----------------以下属性或方法由后代实现----------------

View File

@ -7,7 +7,6 @@ from base64 import b64decode
from json import loads, JSONDecodeError
from os import sep
from pathlib import Path
from re import search
from threading import Thread
from time import perf_counter, sleep, time
@ -19,9 +18,10 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder
from .commons.web import set_browser_cookies, ResponseData
from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \
NoRectError, BrowserConnectError
from .commons.web import set_browser_cookies
from .errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \
NoRectError, BrowserConnectError, GetDocumentError
from .network_listener import NetworkListener
from .session_element import make_session_ele
@ -41,6 +41,7 @@ class ChromiumBase(BasePage):
self._tab_obj = None
self._set = None
self._screencast = None
self._listener = None
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
address = f'127.0.0.1:{address}'
@ -70,7 +71,9 @@ class ChromiumBase(BasePage):
"""
self._chromium_init()
if not tab_id:
json = self._control_session.get(f'http://{self.address}/json').json()
u = f'http://{self.address}/json'
json = self._control_session.get(u).json()
self._control_session.get(u, headers={'Connection': 'close'})
tab_id = [i['id'] for i in json if i['type'] == 'page']
if not tab_id:
raise BrowserConnectError('浏览器连接失败,可能是浏览器版本原因。')
@ -83,6 +86,7 @@ class ChromiumBase(BasePage):
"""浏览器初始设置"""
self._control_session = Session()
self._control_session.keep_alive = False
self._control_session.proxies = {'http': None, 'https': None}
self._first_run = True
self._is_reading = False
self._upload_list = None
@ -131,7 +135,8 @@ class ChromiumBase(BasePage):
self._debug_recorder.add_data((perf_counter(), '信息', f'root_id{self._root_id}'))
break
except Exception:
except CDPError as e:
err = e
if self._debug:
print('重试获取document')
if self._debug_recorder:
@ -140,7 +145,9 @@ class ChromiumBase(BasePage):
sleep(.1)
else:
raise RuntimeError('获取document失败。')
txt = f'请检查是否创建了过多页面对象同时操作浏览器。\n如无法解决,请把以下信息报告作者。\n{err._info}\n' \
f'报告网址https://gitee.com/g1879/DrissionPage/issues'
raise GetDocumentError(txt)
if self._debug:
print('获取document结束')
@ -325,6 +332,11 @@ class ChromiumBase(BasePage):
"""返回页面加载策略有3种'none''normal''eager'"""
return self._page_load_strategy
@property
def user_agent(self):
    """Return the browser's user agent string.

    The value is read by evaluating ``navigator.userAgent`` through the
    ``Runtime.evaluate`` CDP command.

    :return: the ``value`` field of the evaluation result
    """
    reply = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')
    return reply['result']['value']
@property
def scroll(self):
"""返回用于滚动滚动条的对象"""
@ -364,6 +376,13 @@ class ChromiumBase(BasePage):
self._screencast = Screencast(self)
return self._screencast
@property
def listener(self):
    """Return the object used to listen for network packets.

    A NetworkListener bound to this page is created on first access and
    reused afterwards.

    :return: the cached NetworkListener object
    """
    current = self._listener
    if current is None:
        current = NetworkListener(self)
        self._listener = current
    return current
def run_cdp(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句
:param cmd: 协议项目
@ -391,7 +410,7 @@ class ChromiumBase(BasePage):
elif error in ('Node does not have a layout object', 'Could not compute box model.'):
raise NoRectError
elif r['type'] == 'call_method_error':
raise CallMethodError(f'\n错误:{r["error"]}\nmethod{r["method"]}\nargs{r["args"]}')
raise CDPError(f'\n错误:{r["error"]}\nmethod{r["method"]}\nargs{r["args"]}')
else:
raise RuntimeError(r)
@ -542,9 +561,12 @@ class ChromiumBase(BasePage):
if ok:
try:
if single:
return make_chromium_ele(self, node_id=nodeIds['nodeIds'][0])
r = make_chromium_ele(self, node_id=nodeIds['nodeIds'][0])
break
else:
return [make_chromium_ele(self, node_id=i) for i in nodeIds['nodeIds']]
r = [make_chromium_ele(self, node_id=i) for i in nodeIds['nodeIds']]
break
except ElementLossError:
ok = False
@ -560,6 +582,12 @@ class ChromiumBase(BasePage):
sleep(.1)
try:
self.run_cdp('DOM.discardSearchResults', searchId=search_result['searchId'])
except:
pass
return r
def refresh(self, ignore_cache=False):
"""刷新当前页面
:param ignore_cache: 是否忽略缓存
@ -784,7 +812,7 @@ class ChromiumBase(BasePage):
while self.ready_state not in ('complete', None):
sleep(.1)
if self._debug or show_errmsg:
print(f'重试 {to_url}')
print(f'重试{t + 1} {to_url}')
if err:
if show_errmsg:
@ -928,8 +956,18 @@ class ChromiumBaseSetter(object):
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self._page.run_js_loaded(js, as_expr=True)
def cookie(self, cookie):
    """Set a single cookie by delegating to ``cookies()``.

    A str is forwarded unchanged; any other value is wrapped in a
    one-element list first.

    :param cookie: cookie information
    :return: None
    """
    payload = cookie if isinstance(cookie, str) else [cookie]
    self.cookies(payload)
def cookies(self, cookies):
"""设置cookies值
"""设置多个cookie注意不要传入单个
:param cookies: cookies信息
:return: None
"""
@ -963,7 +1001,6 @@ class ChromiumBaseWaiter(object):
:param page_or_ele: 页面对象或元素对象
"""
self._driver = page_or_ele
self._listener = None
def ele_delete(self, loc_or_ele, timeout=None):
"""等待元素从DOM中删除
@ -971,10 +1008,8 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功
"""
if isinstance(loc_or_ele, (str, tuple)):
ele = self._driver._ele(loc_or_ele, timeout=.3, raise_err=False)
return ele.wait.delete(timeout) if ele else True
return loc_or_ele.wait.delete(timeout)
ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0)
return ele.wait.delete(timeout) if ele else True
def ele_display(self, loc_or_ele, timeout=None):
"""等待元素变成显示状态
@ -982,8 +1017,8 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功
"""
ele = self._driver._ele(loc_or_ele, raise_err=False)
return ele.wait.display(timeout) if ele else False
ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0)
return ele.wait.display(timeout)
def ele_hidden(self, loc_or_ele, timeout=None):
"""等待元素变成隐藏状态
@ -991,9 +1026,18 @@ class ChromiumBaseWaiter(object):
:param timeout: 超时时间默认读取页面超时时间
:return: 是否等待成功
"""
ele = self._driver._ele(loc_or_ele, raise_err=False)
ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0)
return ele.wait.hidden(timeout)
def ele_load(self, loc, timeout=None):
    """Wait for an element to be loaded into the DOM.

    The lookup is delegated to the driver's ``_ele()`` with ``raise_err=False``
    and the given timeout.

    :param loc: locator of the element to wait for
    :param timeout: timeout passed through to the lookup
    :return: the element object on success, False otherwise
    """
    found = self._driver._ele(loc, raise_err=False, timeout=timeout)
    return found or False
def load_start(self, timeout=None):
"""等待页面开始加载
:param timeout: 超时时间为None时使用页面timeout属性
@ -1021,7 +1065,8 @@ class ChromiumBaseWaiter(object):
:return: 是否等待成功
"""
if timeout != 0:
timeout = self._driver.timeout if timeout in (None, True) else timeout
if timeout is None or timeout is True:
timeout = self._driver.timeout
end_time = perf_counter() + timeout
while perf_counter() < end_time:
if self._driver.is_loading == start:
@ -1029,132 +1074,6 @@ class ChromiumBaseWaiter(object):
sleep(gap)
return False
def set_targets(self, targets, is_regex=False):
    """Specify which data packets to wait for.

    Creates the NetworkListener for the driver on first use, then forwards
    the arguments to its ``set_targets()``.

    :param targets: URL feature(s) of the packets to match; several may be passed in a list etc.
    :param is_regex: whether the targets are regular expressions
    :return: None
    """
    listener = self._listener
    if not listener:
        listener = self._listener = NetworkListener(self._driver)
    listener.set_targets(targets, is_regex)
def data_packets(self, timeout=None, any_one=False):
    """Wait for the specified data packets to finish loading.

    Creates the NetworkListener for the driver on first use, then returns
    whatever its ``listen()`` returns.

    :param timeout: timeout forwarded to the listener
    :param any_one: with several targets, True ends the wait as soon as one is captured
    :return: the result of the listener's ``listen()``
    """
    listener = self._listener
    if not listener:
        listener = self._listener = NetworkListener(self._driver)
    return listener.listen(timeout, any_one)
def stop_listening(self):
    """Stop listening for data packets.

    Creates the NetworkListener for the driver if none exists yet, then
    calls its ``stop()``.
    """
    listener = self._listener
    if not listener:
        listener = self._listener = NetworkListener(self._driver)
    listener.stop()
class NetworkListener(object):
    """Collect responses of a page whose request URL matches configured targets.

    The listener binds three callbacks on the page driver's ``Network`` object
    (requestWillBeSent, responseReceived, loadingFinished) and stores one
    ResponseData per matched target until ``listen()`` hands them out.
    """

    def __init__(self, page):
        """
        :param page: page object; its ``run_cdp()``, ``driver``, ``timeout`` and ``tab_id`` are used
        """
        self._page = page
        self._targets = None  # set of URL features, None until set_targets() is called
        self._is_regex = False
        self._results = {}  # target -> ResponseData
        self._single = False  # True when a single str target was given
        self._requests = {}  # requestId -> info about a matched, still pending request

    def set_targets(self, targets, is_regex=False):
        """Specify which data packets to wait for and start listening.

        :param targets: URL feature(s) to match: a str, or a list/tuple/set of them;
                        None stops listening and leaves the configured targets untouched
        :param is_regex: whether the targets are regular expressions
        :return: None
        :raises TypeError: if targets is neither None nor str/list/tuple/set
        """
        if targets is None:
            # Originally this case was checked after the isinstance() guard below,
            # so the stop branch could never be reached.
            self.stop()
            return

        if not isinstance(targets, (str, list, tuple, set)):
            raise TypeError('targets只能是str、list、tuple、set。')

        self._is_regex = is_regex
        self._single = isinstance(targets, str)
        self._targets = {targets} if self._single else set(targets)

        self._page.run_cdp('Network.enable')
        self._page.driver.Network.requestWillBeSent = self._requestWillBeSent
        self._page.driver.Network.responseReceived = self._response_received
        self._page.driver.Network.loadingFinished = self._loading_finished

    def stop(self):
        """Stop listening: send ``Network.disable`` and unbind the three callbacks."""
        self._page.run_cdp('Network.disable')
        self._page.driver.Network.requestWillBeSent = None
        self._page.driver.Network.responseReceived = None
        self._page.driver.Network.loadingFinished = None

    def listen(self, timeout=None, any_one=False):
        """Wait until the targeted data packets have been captured.

        :param timeout: seconds to wait; None uses the page object's ``timeout``
        :param any_one: with several targets, True ends the wait as soon as one is captured,
                        False waits until every target has a result
        :return: False if nothing was captured; the single ResponseData when one str target
                 was set; otherwise the dict of target -> ResponseData
        :raises RuntimeError: if set_targets() has not been called yet
        """
        if self._targets is None:
            raise RuntimeError('必须先用set_targets()设置等待目标。')

        if timeout is None:
            timeout = self._page.timeout

        end_time = perf_counter() + timeout
        while perf_counter() < end_time:
            if self._results and (any_one or set(self._results) == self._targets):
                break
            sleep(.1)

        # Pending request bookkeeping is dropped whether or not anything was captured.
        self._requests = {}
        if not self._results:
            return False

        captured = self._results
        self._results = {}
        return list(captured.values())[0] if self._single else captured

    def _response_received(self, **kwargs):
        """Callback for responseReceived: attach the response to a tracked request."""
        request_id = kwargs['requestId']
        if request_id in self._requests:
            self._requests[request_id]['response'] = kwargs['response']

    def _loading_finished(self, **kwargs):
        """Callback for loadingFinished: build the ResponseData of a tracked request."""
        request_id = kwargs['requestId']
        if request_id not in self._requests:
            return

        try:
            r = self._page.run_cdp('Network.getResponseBody', requestId=request_id)
            body = r['body']
            is_base64 = r['base64Encoded']
        except CallMethodError:
            # The body could not be fetched; record the packet with an empty body.
            body = ''
            is_base64 = False

        request = self._requests[request_id]
        target = request['target']
        # NOTE(review): 'response' is only stored by _response_received(); this raises
        # KeyError if that callback did not run first for this request - confirm ordering.
        rd = ResponseData(request_id, request['response'], body, self._page.tab_id, target)
        rd.method = request['method']
        rd.postData = request['post_data']
        rd._base64_body = is_base64
        rd.requestHeaders = request['request_headers']
        self._results[target] = rd

    def _requestWillBeSent(self, **kwargs):
        """Callback for requestWillBeSent: start tracking a request matching a target."""
        request = kwargs['request']
        url = request['url']
        for target in self._targets:
            matched = search(target, url) if self._is_regex else target in url
            if matched:
                # Only the first matching target is recorded for a request.
                self._requests[kwargs['requestId']] = {'target': target,
                                                       'method': request['method'],
                                                       'post_data': request.get('postData', None),
                                                       'request_headers': request['headers']}
                break
class ChromiumPageScroll(ChromiumScroll):
def __init__(self, page):
@ -1165,10 +1084,10 @@ class ChromiumPageScroll(ChromiumScroll):
self.t1 = 'window'
self.t2 = 'document.documentElement'
def to_see(self, loc_or_ele, center=False):
def to_see(self, loc_or_ele, center=None):
"""滚动页面直到元素可见
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:param center: 是否尽量滚动到页面正中
:param center: 是否尽量滚动到页面正中为None时如果被遮挡则滚动到页面正中
:return: None
"""
ele = self._driver._ele(loc_or_ele)
@ -1177,17 +1096,22 @@ class ChromiumPageScroll(ChromiumScroll):
def _to_see(self, ele, center):
"""执行滚动页面直到元素可见
:param ele: 元素对象
:param center: 是否尽量滚动到页面正中
:param center: 是否尽量滚动到页面正中为None时如果被遮挡则滚动到页面正中
:return: None
"""
if center:
ele.run_js('this.scrollIntoViewIfNeeded();')
self._wait_scrolled()
return
ele.run_js('this.scrollIntoViewIfNeeded(false);')
if ele.states.is_covered:
ele.run_js('this.scrollIntoViewIfNeeded();')
txt = 'true' if center else 'false'
ele.run_js(f'this.scrollIntoViewIfNeeded({txt});')
if center or (center is not False and ele.states.is_covered):
ele.run_js('''function getWindowScrollTop() {var scroll_top = 0;
if (document.documentElement && document.documentElement.scrollTop) {
scroll_top = document.documentElement.scrollTop;
} else if (document.body) {scroll_top = document.body.scrollTop;}
return scroll_top;}
const { top, height } = this.getBoundingClientRect();
const elCenter = top + height / 2;
const center = window.innerHeight / 2;
window.scrollTo({top: getWindowScrollTop() - (center - elCenter),
behavior: 'instant'});''')
self._wait_scrolled()

View File

@ -4,7 +4,7 @@
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union, Tuple, List, Any, Dict
from typing import Union, Tuple, List, Any
from DataRecorder import Recorder
from requests import Session
@ -15,12 +15,11 @@ from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement, ChromiumScroll
from .chromium_frame import ChromiumFrame
from .commons.constants import NoneElement
from .commons.web import ResponseData
from .network_listener import NetworkListener
from .session_element import SessionElement
class ChromiumBase(BasePage):
def __init__(self,
address: Union[str, int],
tab_id: str = None,
@ -42,6 +41,7 @@ class ChromiumBase(BasePage):
self._wait: ChromiumBaseWaiter = ...
self._set: ChromiumBaseSetter = ...
self._screencast: Screencast = ...
self._listener: NetworkListener = ...
def _connect_browser(self, tab_id: str = None) -> None: ...
@ -111,6 +111,9 @@ class ChromiumBase(BasePage):
@property
def page_load_strategy(self) -> str: ...
@property
def user_agent(self) -> str: ...
@property
def scroll(self) -> ChromiumPageScroll: ...
@ -129,37 +132,33 @@ class ChromiumBase(BasePage):
@property
def screencast(self) -> Screencast: ...
@property
def listener(self) -> NetworkListener: ...
def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
def run_js_loaded(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
def run_async_js(self, script: str, *args: Any, as_expr: bool = False) -> None: ...
def get(self,
url: str,
show_errmsg: bool = False,
retry: int = None,
interval: float = None,
timeout: float = None) -> Union[None, bool]: ...
def get(self, url: str, show_errmsg: bool = False, retry: int = None,
interval: float = None, timeout: float = None) -> Union[None, bool]: ...
def get_cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[
list, dict]: ...
def get_cookies(self, as_dict: bool = False, all_domains: bool = False,
all_info: bool = False) -> Union[list, dict]: ...
def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None) -> ChromiumElement: ...
def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None) -> Union[ChromiumElement, str]: ...
def eles(self,
loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[ChromiumElement]: ...
def eles(self, loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> List[Union[ChromiumElement, str]]: ...
def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) \
-> Union[SessionElement, str, NoneElement]: ...
def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ...
def _find_elements(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \
-> Union[ChromiumElement, ChromiumFrame, NoneElement, List[Union[ChromiumElement, ChromiumFrame]]]: ...
@ -217,7 +216,6 @@ class ChromiumBase(BasePage):
class ChromiumBaseWaiter(object):
def __init__(self, page: ChromiumBase):
self._driver: ChromiumBase = ...
self._listener: NetworkListener = ...
def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
@ -225,51 +223,23 @@ class ChromiumBaseWaiter(object):
def ele_hidden(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ...
def ele_load(self, loc: Union[str, tuple], timeout: float = None) -> Union[bool, ChromiumElement]: ...
def _loading(self, timeout: float = None, start: bool = True, gap: float = .01) -> bool: ...
def load_start(self, timeout: float = None) -> bool: ...
def load_complete(self, timeout: float = None) -> bool: ...
def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...
def stop_listening(self) -> None: ...
# Returns False when nothing was captured; ``bool`` is used because the literal
# ``False`` is not a valid argument to Union.
def data_packets(self, timeout: float = None,
                 any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], bool]: ...
def upload_paths_inputted(self) -> None: ...
class NetworkListener(object):
    # Type stub for the packet listener bound to a page.
    def __init__(self, page: ChromiumBase):
        self._page: ChromiumBase = ...
        # None until set_targets() is called, afterwards a set of URL features
        self._targets: Union[set, None] = ...
        self._single: bool = ...
        # maps each matched target to its captured ResponseData
        self._results: Dict[str, ResponseData] = ...
        self._is_regex: bool = ...
        self._requests: dict = ...

    def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ...

    def stop(self) -> None: ...

    # Returns False when nothing was captured; ``bool`` is used because the
    # literal ``False`` is not a valid argument to Union.
    def listen(self, timeout: float = None,
               any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], bool]: ...

    def _response_received(self, **kwargs) -> None: ...

    def _loading_finished(self, **kwargs) -> None: ...

    def _requestWillBeSent(self, **kwargs) -> None: ...
class ChromiumPageScroll(ChromiumScroll):
def __init__(self, page: ChromiumBase): ...
def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: bool = False) -> None: ...
def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: Union[bool, None] = None) -> None: ...
def _to_see(self, ele: ChromiumElement, center: bool) -> None: ...
def _to_see(self, ele: ChromiumElement, center: Union[bool, None]) -> None: ...
class ChromiumBaseSetter(object):
@ -294,6 +264,8 @@ class ChromiumBaseSetter(object):
def local_storage(self, item: str, value: Union[str, bool]) -> None: ...
# The parameter is named ``cookie`` to match the implementation's signature.
def cookie(self, cookie: Union[RequestsCookieJar, str, dict]) -> None: ...
def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
def headers(self, headers: dict) -> None: ...
@ -366,4 +338,4 @@ class ScreencastMode(object):
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...

View File

@ -11,7 +11,7 @@ from threading import Thread, Event
from websocket import WebSocketTimeoutException, WebSocketException, WebSocketConnectionClosedException, \
create_connection
from .errors import CallMethodError
from .errors import CDPError
class GenericAttr(object):
@ -79,7 +79,13 @@ class ChromiumDriver(object):
message_json = dumps(message)
if self.debug:
print(f"发> {message_json}")
if self.debug is True or (isinstance(self.debug, str) and message.get('method', '').startswith(self.debug)):
print(f'发> {message_json}')
elif isinstance(self.debug, (list, tuple, set)):
for m in self.debug:
if message.get('method', '').startswith(m):
print(f'发> {message_json}')
break
if not isinstance(timeout, (int, float)) or timeout > 1:
q_timeout = 1
@ -117,7 +123,7 @@ class ChromiumDriver(object):
try:
self._ws.settimeout(1)
message_json = self._ws.recv()
message = loads(message_json)
mes = loads(message_json)
except WebSocketTimeoutException:
continue
except (WebSocketException, OSError, WebSocketConnectionClosedException):
@ -125,17 +131,24 @@ class ChromiumDriver(object):
return
if self.debug:
print(f'<收 {message_json}')
if self.debug is True or 'id' in mes or (isinstance(self.debug, str)
and mes.get('method', '').startswith(self.debug)):
print(f'<收 {message_json}')
elif isinstance(self.debug, (list, tuple, set)):
for m in self.debug:
if mes.get('method', '').startswith(m):
print(f'<收 {message_json}')
break
if "method" in message:
self.event_queue.put(message)
if "method" in mes:
self.event_queue.put(mes)
elif "id" in message:
if message["id"] in self.method_results:
self.method_results[message['id']].put(message)
elif "id" in mes:
if mes["id"] in self.method_results:
self.method_results[mes['id']].put(mes)
elif self.debug:
print(f'未知信息:{message}')
print(f'未知信息:{mes}')
def _handle_event_loop(self):
"""当接收到浏览器信息,执行已绑定的方法"""
@ -170,7 +183,7 @@ class ChromiumDriver(object):
self.start()
# raise RuntimeError("不能在启动前调用方法。")
if args:
raise CallMethodError("参数必须是key=value形式。")
raise CDPError("参数必须是key=value形式。")
if self._stopped.is_set():
return {'error': 'tab closed', 'type': 'tab_closed'}

View File

@ -14,7 +14,7 @@ from .commons.keys import keys_to_typing, keyDescriptionForString, keyDefinition
from .commons.locator import get_loc
from .commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll
from .errors import ContextLossError, ElementLossError, JavaScriptError, NoRectError, ElementNotFoundError, \
CallMethodError, NoResourceError, CanNotClickError
CDPError, NoResourceError, CanNotClickError
from .session_element import make_session_ele
@ -99,7 +99,7 @@ class ChromiumElement(DrissionElement):
try:
attrs = self.page.run_cdp('DOM.getAttributes', nodeId=self._node_id)['attributes']
return {attrs[i]: attrs[i + 1] for i in range(0, len(attrs), 2)}
except CallMethodError: # 文档根元素不能调用此方法
except CDPError: # 文档根元素不能调用此方法
return {}
@property
@ -203,12 +203,13 @@ class ChromiumElement(DrissionElement):
return self._select
def parent(self, level_or_loc=1):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:return: 上级元素对象
"""
return super().parent(level_or_loc)
return super().parent(level_or_loc, index)
def child(self, filter_loc='', index=1, timeout=0, ele_only=True):
"""返回当前元素的一个符合条件的直接子元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -218,7 +219,7 @@ class ChromiumElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 直接子元素或节点文本
"""
return super().child(index, filter_loc, timeout, ele_only=ele_only)
return super().child(filter_loc, index, timeout, ele_only=ele_only)
def prev(self, filter_loc='', index=1, timeout=0, ele_only=True):
"""返回当前元素前面一个符合条件的同级元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -228,7 +229,7 @@ class ChromiumElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素或节点文本
"""
return super().prev(index, filter_loc, timeout, ele_only=ele_only)
return super().prev(filter_loc, index, timeout, ele_only=ele_only)
def next(self, filter_loc='', index=1, timeout=0, ele_only=True):
"""返回当前元素后面一个符合条件的同级元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -238,7 +239,7 @@ class ChromiumElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 兄弟元素或节点文本
"""
return super().next(index, filter_loc, timeout, ele_only=ele_only)
return super().next(filter_loc, index, timeout, ele_only=ele_only)
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中当前元素前面符合条件的第一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -249,7 +250,7 @@ class ChromiumElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素前面的某个元素或节点
"""
return super().before(index, filter_loc, timeout, ele_only=ele_only)
return super().before(filter_loc, index, timeout, ele_only=ele_only)
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中此当前元素后面符合条件的第一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -260,7 +261,7 @@ class ChromiumElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素后面的某个元素或节点
"""
return super().after(index, filter_loc, timeout, ele_only=ele_only)
return super().after(filter_loc, index, timeout, ele_only=ele_only)
def children(self, filter_loc='', timeout=0, ele_only=True):
"""返回当前元素符合条件的直接子元素或节点组成的列表,可用查询语法筛选
@ -464,7 +465,7 @@ class ChromiumElement(DrissionElement):
try:
result = self.page.run_cdp('Page.getResourceContent', frameId=frame, url=src)
break
except CallMethodError:
except CDPError:
sleep(.1)
if not result:
@ -522,15 +523,24 @@ class ChromiumElement(DrissionElement):
return self.page._get_screenshot(path, as_bytes=as_bytes, as_base64=as_base64, full_page=False,
left_top=left_top, right_bottom=right_bottom, ele=self)
def input(self, vals, clear=True):
def input(self, vals, clear=True, by_js=False):
"""输入文本或组合键也可用于输入文件路径到input元素路径间用\n间隔)
:param vals: 文本值或按键组合
:param clear: 输入前是否清空文本框
:param by_js: 是否用js方式输入不能输入组合键
:return: None
"""
if self.tag == 'input' and self.attr('type') == 'file':
return self._set_file_input(vals)
if by_js:
if clear:
self.clear(True)
if isinstance(vals, (list, tuple)):
vals = ''.join([str(i) for i in vals])
self.set.prop('value', str(vals))
return
if clear and vals not in ('\n', '\ue007'):
self.clear(by_js=False)
else:
@ -749,7 +759,7 @@ class ChromiumShadowRoot(BaseElement):
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 超时时间
:return: DriverElement对象或属性文本
:return: 元素对象或属性文本
"""
return self.ele(loc_or_str, timeout)
@ -799,9 +809,10 @@ class ChromiumShadowRoot(BaseElement):
from threading import Thread
Thread(target=run_js, args=(self, script, as_expr, self.page.timeouts.script, args)).start()
def parent(self, level_or_loc=1):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:return: ChromiumElement对象
"""
if isinstance(level_or_loc, int):
@ -813,7 +824,7 @@ class ChromiumShadowRoot(BaseElement):
if loc[0] == 'css selector':
raise ValueError('此css selector语法不受支持请换成xpath。')
loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}'
loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}[{index}]'
else:
raise TypeError('level_or_loc参数只能是tuple、int或str。')
@ -1424,7 +1435,7 @@ class ChromiumElementStates(object):
lx, ly = self._ele.locations.click_point
try:
r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly)
except CallMethodError:
except CDPError:
return False
if r.get('backendNodeId') != self._ele.ids.backend_id:
@ -1771,9 +1782,9 @@ class ChromiumScroll(object):
class ChromiumElementScroll(ChromiumScroll):
def to_see(self, center=False):
def to_see(self, center=None):
"""滚动页面直到元素可见
:param center: 是否尽量滚动到页面正中
:param center: 是否尽量滚动到页面正中为None时如果被遮挡则滚动到页面正中
:return: None
"""
self._driver.page.scroll.to_see(self._driver, center=center)

View File

@ -94,29 +94,29 @@ class ChromiumElement(DrissionElement):
@property
def click(self) -> Click: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[ChromiumElement, None]: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[ChromiumElement, None]: ...
def child(self, filter_loc: Union[tuple, str] = '',
def child(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[ChromiumElement, str, None]: ...
def prev(self, filter_loc: Union[tuple, str] = '',
def prev(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[ChromiumElement, str, None]: ...
def next(self, filter_loc: Union[tuple, str] = '',
def next(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[ChromiumElement, str, None]: ...
def before(self, filter_loc: Union[tuple, str] = '',
def before(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[ChromiumElement, str, None]: ...
def after(self, filter_loc: Union[tuple, str] = '',
def after(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[ChromiumElement, str, None]: ...
@ -183,7 +183,7 @@ class ChromiumElement(DrissionElement):
def get_screenshot(self, path: [str, Path] = None, as_bytes: [bool, str] = None,
as_base64: [bool, str] = None) -> Union[str, bytes]: ...
def input(self, vals: Any, clear: bool = True) -> None: ...
def input(self, vals: Any, clear: bool = True, by_js: bool = False) -> None: ...
def _set_file_input(self, files: Union[str, list, tuple]) -> None: ...
@ -273,7 +273,7 @@ class ChromiumShadowRoot(BaseElement):
def run_async_js(self, script: str, *args: Any, as_expr: bool = False) -> None: ...
def parent(self, level_or_loc: Union[str, int] = 1) -> ChromiumElement: ...
def parent(self, level_or_loc: Union[str, int] = 1, index: int = 1) -> ChromiumElement: ...
def child(self, filter_loc: Union[tuple, str] = '',
index: int = 1) -> Union[ChromiumElement, str, None]: ...
@ -496,7 +496,7 @@ class ChromiumScroll(object):
class ChromiumElementScroll(ChromiumScroll):
def to_see(self, center: bool = False) -> None: ...
def to_see(self, center: Union[bool, None] = None) -> None: ...
class ChromiumSelect(object):

View File

@ -69,7 +69,9 @@ class ChromiumFrame(ChromiumBase):
try:
super()._driver_init(tab_id)
except:
self._control_session.get(f'http://{self.address}/json')
u = f'http://{self.address}/json'
self._control_session.get(u)
self._control_session.get(u, headers={'Connection': 'close'})
super()._driver_init(tab_id)
def _reload(self):
@ -359,13 +361,14 @@ class ChromiumFrame(ChromiumBase):
else:
return self.doc_ele.run_js(script, *args, as_expr=as_expr)
def parent(self, level_or_loc=1):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:return: 上级元素对象
"""
self._check_ok()
return self.frame_ele.parent(level_or_loc)
return self.frame_ele.parent(level_or_loc, index)
def prev(self, filter_loc='', index=1, timeout=0, ele_only=True):
"""返回当前元素前面一个符合条件的同级元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -638,10 +641,10 @@ class ChromiumFrameScroll(ChromiumPageScroll):
self.t1 = self.t2 = 'this.documentElement'
self._wait_complete = False
def to_see(self, loc_or_ele, center=False):
def to_see(self, loc_or_ele, center=None):
"""滚动页面直到元素可见
:param loc_or_ele: 元素的定位信息可以是loc元组或查询字符串
:param center: 是否尽量滚动到页面正中
:param center: 是否尽量滚动到页面正中为None时如果被遮挡则滚动到页面正中
:return: None
"""
ele = loc_or_ele if isinstance(loc_or_ele, ChromiumElement) else self._driver._ele(loc_or_ele)

View File

@ -120,24 +120,24 @@ class ChromiumFrame(ChromiumBase):
def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[ChromiumElement, None]: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[ChromiumElement, None]: ...
def prev(self, filter_loc: Union[tuple, str] = '',
def prev(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[ChromiumElement, str]: ...
def next(self, filter_loc: Union[tuple, str] = '',
def next(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = 0,
ele_only: bool = True) -> Union[ChromiumElement, str]: ...
def before(self, filter_loc: Union[tuple, str] = '',
def before(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[ChromiumElement, str]: ...
def after(self, filter_loc: Union[tuple, str] = '',
def after(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union[ChromiumElement, str]: ...
@ -203,7 +203,7 @@ class ChromiumFrameIds(object):
class ChromiumFrameScroll(ChromiumPageScroll):
def __init__(self, frame: ChromiumFrame) -> None: ...
def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: bool = False) -> None: ...
def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: Union[None, bool] = None) -> None: ...
class ChromiumFrameSetter(ChromiumBaseSetter):

View File

@ -3,23 +3,16 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from platform import system
from threading import Thread
from time import perf_counter, sleep
from warnings import warn
from requests import Session
from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter
from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab
from .commons.browser import connect_browser
from .commons.tools import port_is_using
from .commons.web import set_session_cookies
from .configs.chromium_options import ChromiumOptions
from .errors import CallMethodError, BrowserConnectError
from .session_page import DownloadSetter
from .errors import BrowserConnectError
class ChromiumPage(ChromiumBase):
@ -31,17 +24,15 @@ class ChromiumPage(ChromiumBase):
:param tab_id: 要控制的标签页id不指定默认为激活的
:param timeout: 超时时间
"""
self._download_set = None
self._download_path = None
super().__init__(addr_driver_opts, tab_id, timeout)
def _set_start_options(self, addr_driver_opts, none):
"""设置浏览器启动属性
:param addr_driver_opts: 'ip:port'ChromiumDriverChromiumOptions
:param addr_driver_opts: 'ip:port'ChromiumOptions
:param none: 用于后代继承
:return: None
"""
if not addr_driver_opts or str(type(addr_driver_opts)).endswith(("ChromiumOptions'>", "DriverOptions'>")):
if not addr_driver_opts or isinstance(addr_driver_opts, ChromiumOptions):
self._driver_options = addr_driver_opts or ChromiumOptions(addr_driver_opts)
# 接收浏览器地址和端口
@ -80,7 +71,9 @@ class ChromiumPage(ChromiumBase):
if not self._tab_obj: # 不是传入driver的情况
connect_browser(self._driver_options)
if not tab_id:
json = self._control_session.get(f'http://{self.address}/json').json()
u = f'http://{self.address}/json'
json = self._control_session.get(u).json()
self._control_session.get(u, headers={'Connection': 'close'})
tab_id = [i['id'] for i in json if i['type'] == 'page']
if not tab_id:
raise BrowserConnectError('浏览器连接失败,可能是浏览器版本原因。')
@ -94,7 +87,9 @@ class ChromiumPage(ChromiumBase):
def _page_init(self):
"""页面相关设置"""
ws = self._control_session.get(f'http://{self.address}/json/version').json()['webSocketDebuggerUrl']
u = f'http://{self.address}/json/version'
ws = self._control_session.get(u).json()['webSocketDebuggerUrl']
self._control_session.get(u, headers={'Connection': 'close'})
self._browser_driver = ChromiumDriver(ws.split('/')[-1], 'browser', self.address)
self._browser_driver.start()
@ -104,10 +99,10 @@ class ChromiumPage(ChromiumBase):
self._rect = None
self._main_tab = self.tab_id
try:
self.download_set.by_browser()
except CallMethodError:
pass
# try:
# self.download_set.by_browser()
# except CDPError:
# pass
self._process_id = None
r = self.browser_driver.SystemInfo.getProcessInfo()
@ -131,7 +126,9 @@ class ChromiumPage(ChromiumBase):
@property
def tabs(self):
"""返回所有标签页id组成的列表"""
j = self._control_session.get(f'http://{self.address}/json').json() # 不要改用cdp
u = f'http://{self.address}/json'
j = self._control_session.get(u).json() # 不要改用cdp
self._control_session.get(u, headers={'Connection': 'close'})
return [i['id'] for i in j if i['type'] == 'page']
@property
@ -155,23 +152,23 @@ class ChromiumPage(ChromiumBase):
self._set = ChromiumPageSetter(self)
return self._set
@property
def download_path(self):
"""返回默认下载路径"""
p = self._download_path or ''
return str(Path(p).absolute())
@property
def download_set(self):
"""返回用于设置下载参数的对象"""
if self._download_set is None:
self._download_set = ChromiumDownloadSetter(self)
return self._download_set
@property
def download(self):
"""返回下载器对象"""
return self.download_set._switched_DownloadKit
# @property
# def download_path(self):
# """返回默认下载路径"""
# p = self._download_path or ''
# return str(Path(p).absolute())
#
# @property
# def download_set(self):
# """返回用于设置下载参数的对象"""
# if self._download_set is None:
# self._download_set = BaseDownloadSetter(self)
# return self._download_set
#
# @property
# def download(self):
# """返回下载器对象"""
# return self.download_set._switched_DownloadKit
@property
def rect(self):
@ -194,24 +191,29 @@ class ChromiumPage(ChromiumBase):
tab_id = tab_id or self.tab_id
return ChromiumTab(self, tab_id)
def find_tabs(self, text=None, by_title=True, by_url=None, special=False):
def find_tabs(self, title=None, url=None, tab_type=None, single=True):
"""查找符合条件的tab返回它们的id组成的列表
:param text: 查询条件
:param by_title: 是否匹配title
:param by_url: 是否匹配url
:param special: 是否匹配特殊tab如打印页
:return: tab id组成的列表
:param title: 要匹配title的文本
:param url: 要匹配url的文本
:param tab_type: tab类型可用列表输入多个
:param single: 是否返回首个结果的id为False返回所有信息
:return: tab id或tab dict
"""
tabs = self._control_session.get(f'http://{self.address}/json').json() # 不要改用cdp
if text is None or not (by_title or by_url):
return [i['id'] for i in tabs if (not special and i['type'] == 'page')
or (special and i['type'] not in ('page', 'iframe'))]
u = f'http://{self.address}/json'
tabs = self._control_session.get(u).json() # 不要改用cdp
self._control_session.get(u, headers={'Connection': 'close'})
if isinstance(tab_type, str):
tab_type = {tab_type}
elif isinstance(tab_type, (list, tuple, set)):
tab_type = set(tab_type)
elif tab_type is not None:
raise TypeError('tab_type只能是set、list、tuple、str、None。')
return [i['id'] for i in tabs if ((not special and i['type'] == 'page')
or (special and i['type'] not in ('page', 'iframe')))
and ((by_url and text in i['url']) or (by_title and text in i['title']))]
r = [i for i in tabs if ((title is None or title in i['title']) and (url is None or url in i['url'])
and (tab_type is None or i['type'] in tab_type))]
return r[0]['id'] if r and single else r
def new_tab(self, url=None, switch_to=True):
def new_tab(self, url=None, switch_to=False):
"""新建一个标签页,该标签页在最后面
:param url: 新标签页跳转到的网址
:param switch_to: 新建标签页后是否把焦点移过去
@ -383,13 +385,6 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
super().__init__(page)
self._listener = None
def download_begin(self, timeout=None):
    """Wait for a browser download to begin.

    Delegates to ``wait_download_begin`` of the driver's ``download_set`` object.

    :param timeout: how long to wait; passed through unchanged (None lets the callee pick its default)
    :return: whatever ``wait_download_begin`` returns (whether the download start was seen)
    """
    setter = self._driver.download_set
    return setter.wait_download_begin(timeout)
def new_tab(self, timeout=None):
"""等待新标签页出现
:param timeout: 等待超时时间为None则使用页面对象timeout属性
@ -400,6 +395,20 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
while self._driver.tab_id == self._driver.latest_tab and perf_counter() < end_time:
sleep(.01)
# def download_begin(self, timeout=1.5):
# """等待浏览器下载开始
# :param timeout: 等待超时时间为None则使用页面对象timeout属性
# :return: 是否等到下载开始
# """
# return self._driver.download_set.wait_download_begin(timeout)
#
# def download_finish(self, timeout=None):
# """等待下载结束
# :param timeout: 等待超时时间为None则使用页面对象timeout属性
# :return: 是否等到下载结束
# """
# return self._driver.download_set.wait_download_finish(timeout)
class ChromiumTabRect(object):
def __init__(self, page):
@ -472,125 +481,247 @@ class ChromiumTabRect(object):
return self._page.browser_driver.Browser.getWindowForTarget(targetId=self._page.tab_id)['bounds']
class ChromiumDownloadSetter(DownloadSetter):
"""用于设置下载参数的类"""
# class BaseDownloadSetter(DownloadSetter):
# """用于设置下载参数的类"""
#
# def __init__(self, page):
# """
# :param page: ChromiumPage对象
# """
# super().__init__(page)
# self._behavior = 'allowAndName'
# self._session = None
# self._save_path = ''
# self._rename = None
# self._waiting_download = False
# self._download_begin = False
# self._browser_missions = {}
# self._browser_downloading_count = 0
# self._show_msg = True
#
# @property
# def session(self):
# """返回用于DownloadKit的Session对象"""
# if self._session is None:
# self._session = Session()
# return self._session
#
# @property
# def browser_missions(self):
# """返回浏览器下载任务"""
# return list(self._browser_missions.values())
#
# @property
# def DownloadKit_missions(self):
# """返回DownloadKit下载任务"""
# return list(self.DownloadKit.missions.values())
#
# @property
# def _switched_DownloadKit(self):
# """返回从浏览器同步cookies后的Session对象"""
# self._cookies_to_session()
# return self.DownloadKit
#
# def save_path(self, path):
# """设置下载路径
# :param path: 下载路径
# :return: None
# """
# path = path or ''
# path = Path(path).absolute()
# path.mkdir(parents=True, exist_ok=True)
# path = str(path)
# self._save_path = path
# self._page._download_path = path
# try:
# self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', downloadPath=path,
# eventsEnabled=True)
# except CDPError:
# warn('\n您的浏览器版本太低用新标签页下载文件可能崩溃建议升级。')
# self._page.run_cdp('Page.setDownloadBehavior', behavior='allowAndName', downloadPath=path)
#
# self.DownloadKit.goal_path = path
#
# def rename(self, name):
# """设置浏览器下一个下载任务的文件名
# :param name: 文件名,不带后缀时自动使用原后缀
# :return: None
# """
# self._rename = name
#
# def by_browser(self):
# """设置使用浏览器下载文件"""
# try:
# self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True,
# downloadPath=self._page.download_path)
# self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin
# self._page.browser_driver.Browser.downloadProgress = self._download_progress
# except CDPError:
# self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path)
# self._page.driver.Page.downloadWillBegin = self._download_will_begin
# self._page.driver.Page.downloadProgress = self._download_progress
#
# self._behavior = 'allowAndName'
#
# def by_DownloadKit(self):
# """设置使用DownloadKit下载文件"""
# try:
# self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
# self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit
# except CDPError:
# raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。')
#
# self._behavior = 'deny'
#
# def wait_download_begin(self, timeout=None):
# """等待浏览器下载开始
# :param timeout: 等待超时时间为None则使用页面对象timeout属性
# :return: 是否等到下载开始
# """
# self._waiting_download = True
# result = False
# timeout = timeout if timeout is not None else self._page.timeout
# end_time = perf_counter() + timeout
# while perf_counter() < end_time:
# if self._download_begin:
# result = True
# break
# sleep(.05)
# self._download_begin = False
# self._waiting_download = False
# return result
#
# def wait_download_finish(self, timeout=None):
# """等待所有下载结束
# :param timeout: 超时时间
# :return: 是否等待到下载完成
# """
# timeout = timeout if timeout is not None else self._page.timeout
# end_time = perf_counter() + timeout
# while perf_counter() < end_time:
# if (self._DownloadKit is None or not self.DownloadKit.is_running) and self._browser_downloading_count == 0:
# return True
# sleep(.5)
# return False
#
# def show_msg(self, on_off=True):
# """是否显示下载信息
# :param on_off: bool表示开或关
# :return: None
# """
# self._show_msg = on_off
#
# def _cookies_to_session(self):
# """把driver对象的cookies复制到session对象"""
# ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
# self.session.headers.update({"User-Agent": ua})
# set_session_cookies(self.session, self._page.get_cookies(as_dict=False, all_info=False))
#
# def _download_by_DownloadKit(self, **kwargs):
# """拦截浏览器下载并用downloadKit下载"""
# url = kwargs['url']
# if url.startswith('blob:'):
# raise TypeError('bolb:开头的链接无法使用DownloadKit下载请用浏览器下载功能。')
#
# self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid'])
#
# if self._rename:
# rename = get_rename(kwargs['suggestedFilename'], self._rename)
# self._rename = None
# else:
# rename = kwargs['suggestedFilename']
#
# mission = self._page.download.add(file_url=url, goal_path=self._page.download_path, rename=rename)
# Thread(target=self._wait_download_complete, args=(mission,), daemon=False).start()
#
# if self._waiting_download:
# self._download_begin = True
#
# self._browser_downloading_count += 1
#
# if self._show_msg:
# print(f'(DownloadKit)开始下载:{Path(self._save_path) / rename}')
#
# def _download_will_begin(self, **kwargs):
# """浏览器下载即将开始时调用"""
# if self._rename:
# rename = get_rename(kwargs['suggestedFilename'], self._rename)
# self._rename = None
# else:
# rename = kwargs['suggestedFilename']
#
# m = BrowserDownloadMission(kwargs['guid'], kwargs['url'], rename)
# self._browser_missions[kwargs['guid']] = m
# aid_path = Path(self._save_path) / rename
#
# if self._show_msg:
# print(f'(Browser)开始下载:{rename}')
# self._browser_downloading_count += 1
#
# if self._file_exists == 'skip' and aid_path.exists():
# m.state = 'skipped'
# m.save_path = aid_path.absolute()
# self._page.browser_driver.call_method('Browser.cancelDownload', guid=kwargs['guid'])
# (Path(self._save_path) / kwargs["guid"]).unlink(missing_ok=True)
# return
#
# if self._waiting_download:
# self._download_begin = True
#
# def _download_progress(self, **kwargs):
# """下载状态产生变化时调用"""
# guid = kwargs['guid']
# m = self._browser_missions.get(guid, None)
# if m:
# m.size = kwargs['totalBytes']
# m.received = kwargs['receivedBytes']
# m.state = kwargs['state']
#
# if m.state == 'completed':
# path = Path(self._save_path) / m.name
# from_path = Path(self._save_path) / guid
# if path.exists():
# if self._file_exists == 'rename':
# path = get_usable_path(path)
# else: # 'overwrite'
# path.unlink()
# from_path.rename(path)
# m.save_path = path.absolute()
#
# if kwargs['state'] != 'inProgress':
# if self._show_msg and m:
# if kwargs['state'] == 'completed':
# print(f'(Browser)下载完成:{m.save_path}')
# elif m.state != 'skipped':
# print(f'(Browser)下载失败:{m.save_path}')
# else:
# print(f'(Browser)已跳过:{m.save_path}')
# self._browser_downloading_count -= 1
#
# def _wait_download_complete(self, mission):
# """等待DownloadKit下载完成"""
# mission.wait(show=False)
# if self._show_msg:
# if mission.result == 'skip':
# print(f'(DownloadKit)已跳过:{mission.path}')
# elif not mission.result:
# print(f'(DownloadKit)下载失败:{mission.path}')
# else:
# print(f'(DownloadKit)下载完成:{mission.path}')
def __init__(self, page):
    """Initialise the download setter.

    :param page: ChromiumPage object
    """
    super().__init__(page)
    # download behaviour currently in effect
    self._behavior = 'allow'
    # worker thread and Session start out unset
    self._download_th = self._session = None
    # flags used by the wait-for-download-start handshake
    self._waiting_download = self._download_begin = False
@property
def session(self):
    """Return the Session object used by DownloadKit, creating it on first access."""
    if self._session is not None:
        return self._session

    self._session = Session()
    return self._session
class BrowserDownloadMission(object):
    """Plain container describing one download mission."""

    def __init__(self, guid, url, name):
        """
        :param guid: identifier of the mission, stored as ``id``
        :param url: url of the mission
        :param name: name of the mission
        """
        self.id, self.url, self.name = guid, url, name
        # the remaining fields all start out as None
        self.save_path = self.state = self.size = self.received = None
@property
def _switched_DownloadKit(self):
    """Return the DownloadKit object after syncing the browser's cookies into the session."""
    self._cookies_to_session()
    kit = self.DownloadKit
    return kit
def save_path(self, path):
"""Set the download path.
:param path: download path; a falsy value is replaced by '' before being made absolute
:return: None
"""
# Normalise to an absolute path string and make sure the directory exists.
path = path or ''
path = Path(path).absolute()
path.mkdir(parents=True, exist_ok=True)
path = str(path)
self._page._download_path = path
try:
# First choice: browser-level CDP call, with download events enabled.
self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', downloadPath=path,
eventsEnabled=True)
except CallMethodError:
# Fallback: warn the user, then issue the page-level CDP command instead.
warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。')
self._page.run_cdp('Page.setDownloadBehavior', behavior='allow', downloadPath=path)
# NOTE(review): indentation is lost in this view; presumably this runs after the try/except — confirm.
self.DownloadKit.goal_path = path
def by_browser(self):
"""Configure downloads to be handled by the browser itself."""
try:
# Browser-level: allow downloads into the page's download path and register the begin callback.
self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', eventsEnabled=True,
downloadPath=self._page.download_path)
self._page.browser_driver.Browser.downloadWillBegin = self._download_by_browser
except CallMethodError:
# Fallback: same setup through the page-level driver.
self._page.driver.Page.setDownloadBehavior(behavior='allow', downloadPath=self._page.download_path)
self._page.driver.Page.downloadWillBegin = self._download_by_browser
# NOTE(review): indentation is lost in this view; presumably this runs after the try/except — confirm.
self._behavior = 'allow'
def by_DownloadKit(self):
"""Configure downloads to be handled by DownloadKit.
:raises RuntimeError: when the browser-level CDP call raises CallMethodError
"""
try:
# Deny browser downloads and route the begin event to the DownloadKit handler.
self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit
except CallMethodError:
raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。')
# NOTE(review): indentation is lost in this view; presumably this runs after the try/except — confirm.
self._behavior = 'deny'
def wait_download_begin(self, timeout=None):
"""Wait for a browser download to begin.
:param timeout: how long to wait; when None the page object's timeout attribute is used
:return: whether a download start was observed before the deadline
"""
# Tell the download callbacks that someone is waiting for the start flag.
self._waiting_download = True
result = False
timeout = timeout if timeout is not None else self._page.timeout
end_time = perf_counter() + timeout
# Poll the flag until the deadline passes.
while perf_counter() < end_time:
if self._download_begin:
result = True
break
sleep(.05)
# NOTE(review): indentation is lost in this view; presumably both resets run once, after the loop — confirm.
self._download_begin = False
self._waiting_download = False
return result
def _cookies_to_session(self):
    """Copy the browser's user agent and cookies into the session object."""
    reply = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')
    ua = reply['result']['value']

    session = self.session
    session.headers.update({"User-Agent": ua})
    set_session_cookies(session, self._page.get_cookies(as_dict=False, all_info=False))
def _download_by_DownloadKit(self, **kwargs):
"""Intercept a browser download and hand it to DownloadKit.
Reads 'url', 'guid' and 'suggestedFilename' from kwargs.
"""
url = kwargs['url']
if url.startswith('blob:'):
# blob: urls are left to the browser: switch to 'allow', pause, then switch back to 'deny'.
self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', eventsEnabled=True,
downloadPath=self._page.download_path)
sleep(2)
self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
else:
# Cancel the browser's own download and queue the url in the page's downloader instead.
self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid'])
self._page.download.add(file_url=url, goal_path=self._page.download_path,
rename=kwargs['suggestedFilename'])
# NOTE(review): indentation is lost in this view; whether the thread start sits inside the else branch — confirm.
if self._download_th is None or not self._download_th.is_alive():
self._download_th = Thread(target=self._wait_download_complete, daemon=False)
self._download_th.start()
# Signal a waiting wait_download_begin() caller that a download has started.
if self._waiting_download:
self._download_begin = True
def _download_by_browser(self, **kwargs):
    """Handler called when downloading with the browser; flags the start if a caller is waiting.

    :param kwargs: event data, unused here
    :return: None
    """
    if not self._waiting_download:
        return
    self._download_begin = True
def _wait_download_complete(self):
    """Wait for downloads to complete by calling ``wait()`` on the page's downloader."""
    downloader = self._page.download
    downloader.wait()
def __repr__(self):
return f'<BrowserDownloadMission {self.save_path}>'
class Alert(object):
@ -788,3 +919,11 @@ def get_chrome_hwnds_from_pid(pid, title):
hwnds = []
EnumWindows(callback, hwnds)
return hwnds
def get_rename(original, rename):
    """Work out the file name to use when a download is renamed.

    :param original: the original file name
    :param rename: the requested new name
    :return: ``rename`` unchanged when it contains a dot; otherwise ``rename`` followed by the
             part of ``original`` from its last dot onward (nothing is added if it has no dot)
    """
    if '.' in rename:
        return rename

    _, dot, ext = original.rpartition('.')
    return f'{rename}{dot}{ext}' if dot else f'{rename}'

View File

@ -5,41 +5,39 @@
"""
from os import popen
from pathlib import Path
from threading import Thread
from typing import Union, Tuple, List
from typing import Union, Tuple, List, Dict
from DownloadKit import DownloadKit
from DownloadKit.mission import Mission
from requests import Session
from .chromium_base import ChromiumBase, ChromiumBaseSetter, ChromiumBaseWaiter, NetworkListener
from .chromium_base import ChromiumBase, ChromiumBaseSetter, ChromiumBaseWaiter
from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab
from .configs.chromium_options import ChromiumOptions
from .configs.driver_options import DriverOptions
from .network_listener import NetworkListener
from .session_page import DownloadSetter
class ChromiumPage(ChromiumBase):
def __init__(self,
addr_driver_opts: Union[str, int, ChromiumOptions, ChromiumDriver, DriverOptions] = None,
addr_driver_opts: Union[str, int, ChromiumOptions, ChromiumDriver] = None,
tab_id: str = None,
timeout: float = None):
self._driver_options: [ChromiumDriver, DriverOptions] = ...
self._driver_options: ChromiumOptions = ...
self._process_id: str = ...
self._window_setter: WindowSetter = ...
self._main_tab: str = ...
self._alert: Alert = ...
self._download_path: str = ...
self._download_set: ChromiumDownloadSetter = ...
self._browser_driver: ChromiumDriver = ...
self._rect: ChromiumTabRect = ...
def _connect_browser(self,
addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None,
addr_driver_opts: Union[str, ChromiumDriver] = None,
tab_id: str = None) -> None: ...
def _set_start_options(self, addr_driver_opts: Union[str, ChromiumDriver, DriverOptions], none) -> None: ...
def _set_start_options(self, addr_driver_opts: Union[str, ChromiumDriver], none) -> None: ...
def _page_init(self) -> None: ...
@ -70,21 +68,12 @@ class ChromiumPage(ChromiumBase):
@property
def set(self) -> ChromiumPageSetter: ...
@property
def download_set(self) -> ChromiumDownloadSetter: ...
@property
def download(self) -> DownloadKit: ...
@property
def download_path(self) -> str: ...
def get_tab(self, tab_id: str = None) -> ChromiumTab: ...
def find_tabs(self, text: str = None, by_title: bool = True, by_url: bool = None,
special: bool = False) -> List[str]: ...
def find_tabs(self, title: str = None, url: str = None,
tab_type: Union[str, list, tuple, set] = None, single: bool = True) -> Union[str, List[str]]: ...
def new_tab(self, url: str = None, switch_to: bool = True) -> str: ...
def new_tab(self, url: str = None, switch_to: bool = False) -> str: ...
def to_main_tab(self) -> None: ...
@ -113,7 +102,9 @@ class ChromiumPageWaiter(ChromiumBaseWaiter):
_driver: ChromiumPage = ...
_listener: Union[NetworkListener, None] = ...
def download_begin(self, timeout: float = None) -> bool: ...
def download_begin(self, timeout: float = 1.5) -> bool: ...
def download_finish(self, timeout: float = None) -> bool: ...
def new_tab(self, timeout: float = None) -> bool: ...
@ -151,36 +142,65 @@ class ChromiumTabRect(object):
def _get_browser_rect(self) -> dict: ...
class ChromiumDownloadSetter(DownloadSetter):
class BaseDownloadSetter(DownloadSetter):
def __init__(self, page: ChromiumPage):
self._page: ChromiumPage = ...
self._behavior: str = ...
self._download_th: Thread = ...
self._session: Session = None
self._session: Session = ...
self._save_path: str = ...
self._rename: str = ...
self._waiting_download: bool = ...
self._download_begin: bool = ...
self._browser_missions: Dict[str, BrowserDownloadMission] = ...
self._browser_downloading_count: int = ...
self._show_msg: bool = ...
@property
def session(self) -> Session: ...
@property
def browser_missions(self) -> List[BrowserDownloadMission]: ...
@property
def DownloadKit_missions(self) -> List[Mission]: ...
@property
def _switched_DownloadKit(self) -> DownloadKit: ...
def save_path(self, path: Union[str, Path]) -> None: ...
def rename(self, name: str) -> None: ...
def by_browser(self) -> None: ...
def by_DownloadKit(self) -> None: ...
def wait_download_begin(self, timeout: float = None) -> bool: ...
def wait_download_finish(self, timeout: float = None) -> bool: ...
def show_msg(self, on_off: bool = True) -> None: ...
def _cookies_to_session(self) -> None: ...
def _download_by_DownloadKit(self, **kwargs) -> None: ...
def _download_by_browser(self, **kwargs) -> None: ...
def _download_will_begin(self, **kwargs) -> None: ...
def _wait_download_complete(self) -> None: ...
def _download_progress(self, **kwargs) -> None: ...
def _wait_download_complete(self, mission: Mission) -> None: ...
class BrowserDownloadMission(object):
    # Type stub for the download mission record.
    # The implementation initialises save_path, state, size and received to None,
    # so those attributes are declared optional here rather than as plain str.
    def __init__(self, guid: str, url: str, name: str):
        self.id: str = ...
        self.url: str = ...
        self.name: str = ...
        # NOTE(review): presumably a str or Path once the download has a destination — confirm against the setter.
        self.save_path: Union[str, Path, None] = ...
        self.state: Union[str, None] = ...
        # NOTE(review): presumably byte counts taken from the CDP download progress event — confirm.
        self.size: Union[int, float, None] = ...
        self.received: Union[int, float, None] = ...
class Alert(object):
@ -239,3 +259,6 @@ class ChromiumPageSetter(ChromiumBaseSetter):
def window(self) -> WindowSetter: ...
def tab_to_front(self, tab_or_id: Union[str, ChromiumTab] = None) -> None: ...
# Stub for get_rename: takes the original file name and the requested new name, returns the name to use.
def get_rename(original: str, rename: str) -> str: ...

View File

@ -7,7 +7,7 @@ from copy import copy
from .chromium_base import ChromiumBase, ChromiumBaseSetter
from .commons.web import set_session_cookies, set_browser_cookies
from .session_page import SessionPage, SessionPageSetter, DownloadSetter
from .session_page import SessionPage, SessionPageSetter
class ChromiumTab(ChromiumBase):
@ -28,6 +28,10 @@ class ChromiumTab(ChromiumBase):
self.retry_interval = self.page.retry_interval
self._page_load_strategy = self.page.page_load_strategy
def close(self):
    """Close this tab by passing its ``tab_id`` to the owning page's ``close_tabs``."""
    close_tabs = self.page.close_tabs
    close_tabs(self.tab_id)
@property
def rect(self):
"""返回获取窗口坐标和大小的对象"""
@ -48,11 +52,12 @@ class WebPageTab(SessionPage, ChromiumTab):
self._has_driver = True
self._has_session = True
self._session = copy(page.session)
self._response = None
self._download_set = None
self._download_path = None
self._set = None
self._download_set = None
self._download_path = page.download_path
self._DownloadKit = None
super(SessionPage, self)._set_runtime_settings()
self._connect_browser(tab_id)
@ -120,6 +125,14 @@ class WebPageTab(SessionPage, ChromiumTab):
"""以dict方式返回cookies"""
return super().cookies
@property
def user_agent(self):
    """Return the user agent for the current mode.

    In 's' mode the value comes from the next class in the MRO; in 'd' mode it comes
    from the class that follows SessionPage in the MRO. Any other mode yields None.
    """
    if self._mode == 's':
        return super().user_agent
    if self._mode == 'd':
        return super(SessionPage, self).user_agent
@property
def session(self):
"""返回Session对象如未初始化则按配置信息创建"""
@ -152,18 +165,6 @@ class WebPageTab(SessionPage, ChromiumTab):
self._set = WebPageTabSetter(self)
return self._set
@property
def download_set(self):
"""返回下载设置对象"""
if self._download_set is None:
self._download_set = WebPageTabDownloadSetter(self)
return self._download_set
@property
def download(self):
"""返回下载器对象"""
return self.download_set._switched_DownloadKit
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs):
"""跳转到一个url
:param url: 目标url
@ -292,17 +293,12 @@ class WebPageTab(SessionPage, ChromiumTab):
selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": selenium_user_agent})
# set_session_cookies(self.session, self._get_driver_cookies(as_dict=True))
# set_session_cookies(self.session, self._get_driver_cookies(all_domains=True))
set_session_cookies(self.session, self._get_driver_cookies())
set_session_cookies(self.session, super(SessionPage, self).get_cookies())
def cookies_to_browser(self):
"""把session对象的cookies复制到浏览器"""
if not self._has_driver:
return
# set_browser_cookies(self, super().get_cookies(as_dict=True))
# set_browser_cookies(self, super().get_cookies(all_domains=True))
set_browser_cookies(self, super().get_cookies())
def get_cookies(self, as_dict=False, all_domains=False, all_info=False):
@ -315,22 +311,7 @@ class WebPageTab(SessionPage, ChromiumTab):
if self._mode == 's':
return super().get_cookies(as_dict, all_domains, all_info)
elif self._mode == 'd':
return self._get_driver_cookies(as_dict, all_info)
def _get_driver_cookies(self, as_dict=False, all_info=False):
"""获取浏览器cookies
:param as_dict: 是否以dict形式返回为True时all_info无效
:param all_info: 是否返回所有信息为False时只返回namevaluedomain
:return: cookies信息
"""
cookies = self.run_cdp('Network.getCookies')['cookies']
if as_dict:
return {cookie['name']: cookie['value'] for cookie in cookies}
elif all_info:
return cookies
else:
return [{'name': cookie['name'], 'value': cookie['value'], 'domain': cookie['domain']}
for cookie in cookies]
return super(SessionPage, self).get_cookies(as_dict, all_domains, all_info)
def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None):
"""返回页面中符合条件的元素、属性或节点文本,默认返回第一个
@ -355,7 +336,7 @@ class WebPageTabSetter(ChromiumBaseSetter):
self._chromium_setter = ChromiumBaseSetter(self._page)
def cookies(self, cookies):
"""添加cookies信息到浏览器或session对象
"""添加多个cookies信息到浏览器或session对象,注意不要传入单个
:param cookies: 可以接收`CookieJar``list``tuple``str``dict`格式的`cookies`
:return: None
"""
@ -382,18 +363,3 @@ class WebPageTabSetter(ChromiumBaseSetter):
self._chromium_setter.user_agent(ua, platform)
class WebPageTabDownloadSetter(DownloadSetter):
"""用于设置下载参数的类"""
def __init__(self, page):
super().__init__(page)
self._session = page.session
@property
def _switched_DownloadKit(self):
"""返回从浏览器同步cookies后的Session对象"""
if self._page.mode == 'd':
ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self._page.session.headers.update({"User-Agent": ua})
set_session_cookies(self._page.session, self._page.get_cookies(as_dict=False, all_domains=False))
return self.DownloadKit

View File

@ -5,7 +5,6 @@
"""
from typing import Union, Tuple, Any, List
from DownloadKit import DownloadKit
from requests import Session, Response
from .chromium_base import ChromiumBase, ChromiumBaseSetter
@ -13,7 +12,7 @@ from .chromium_element import ChromiumElement
from .chromium_frame import ChromiumFrame
from .chromium_page import ChromiumPage, ChromiumTabRect
from .session_element import SessionElement
from .session_page import SessionPage, SessionPageSetter, DownloadSetter
from .session_page import SessionPage, SessionPageSetter
from .web_page import WebPage
@ -24,6 +23,8 @@ class ChromiumTab(ChromiumBase):
def _set_runtime_settings(self) -> None: ...
def close(self) -> None: ...
@property
def rect(self) -> ChromiumTabRect: ...
@ -34,8 +35,6 @@ class WebPageTab(SessionPage, ChromiumTab):
self._mode: str = ...
self._has_driver = ...
self._has_session = ...
self._download_set = ...
self._download_path = ...
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
@ -65,6 +64,9 @@ class WebPageTab(SessionPage, ChromiumTab):
@property
def cookies(self) -> dict: ...
@property
def user_agent(self) -> str: ...
@property
def session(self) -> Session: ...
@ -119,8 +121,6 @@ class WebPageTab(SessionPage, ChromiumTab):
def get_cookies(self, as_dict: bool = False, all_domains: bool = False,
all_info: bool = False) -> Union[dict, list]: ...
def _get_driver_cookies(self, as_dict: bool = False, all_info: bool = False) -> dict: ...
# ----------------重写SessionPage的函数-----------------------
def post(self,
url: str,
@ -145,12 +145,6 @@ class WebPageTab(SessionPage, ChromiumTab):
@property
def set(self) -> WebPageTabSetter: ...
@property
def download(self) -> DownloadKit: ...
@property
def download_set(self) -> WebPageTabDownloadSetter: ...
def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame],
timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \
-> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None, List[Union[SessionElement, str]], List[
@ -167,13 +161,3 @@ class WebPageTabSetter(ChromiumBaseSetter):
def headers(self, headers: dict) -> None: ...
def cookies(self, cookies) -> None: ...
class WebPageTabDownloadSetter(DownloadSetter):
"""用于设置下载参数的类"""
def __init__(self, page: WebPageTab):
self._page: WebPageTab = ...
@property
def _switched_DownloadKit(self) -> DownloadKit: ...

7
DrissionPage/common.pyi Normal file
View File

@ -0,0 +1,7 @@
# -*- coding:utf-8 -*-
from .session_element import make_session_ele as make_session_ele
from .action_chains import ActionChains as ActionChains
from .commons.keys import Keys as Keys
from .commons.by import By as By
from .commons.constants import Settings as Settings

View File

@ -11,14 +11,13 @@ from time import perf_counter, sleep
from requests import get as requests_get
from DrissionPage.configs.chromium_options import ChromiumOptions
from DrissionPage.errors import BrowserConnectError
from .tools import port_is_using
def connect_browser(option):
"""连接或启动浏览器
:param option: DriverOptions对象
:param option: ChromiumOptions对象
:return: chrome 路径和进程对象组成的元组
"""
debugger_address = option.debugger_address.replace('localhost', '127.0.0.1').lstrip('http://').lstrip('https://')
@ -55,8 +54,8 @@ def connect_browser(option):
def get_launch_args(opt):
"""DriverOptions获取命令行启动参数
:param opt: DriverOptions或ChromiumOptions
"""ChromiumOptions获取命令行启动参数
:param opt: ChromiumOptions
:return: 启动参数列表
"""
# ----------处理arguments-----------
@ -87,7 +86,7 @@ def get_launch_args(opt):
result = list(result)
# ----------处理插件extensions-------------
ext = opt.extensions if isinstance(opt, ChromiumOptions) else opt._extension_files
ext = opt.extensions
if ext:
ext = ','.join(set(ext))
ext = f'--load-extension={ext}'
@ -98,15 +97,11 @@ def get_launch_args(opt):
def set_prefs(opt):
"""处理启动配置中的prefs项目前只能对已存在文件夹配置
:param opt: DriverOptions或ChromiumOptions
:param opt: ChromiumOptions
:return: None
"""
if isinstance(opt, ChromiumOptions):
prefs = opt.preferences
del_list = opt._prefs_to_del
else:
prefs = opt.experimental_options.get('prefs', [])
del_list = []
prefs = opt.preferences
del_list = opt._prefs_to_del
if not opt.user_data_path:
return
@ -150,7 +145,9 @@ def test_connect(ip, port):
end_time = perf_counter() + 30
while perf_counter() < end_time:
try:
tabs = requests_get(f'http://{ip}:{port}/json', timeout=10).json()
u = f'http://{ip}:{port}/json'
tabs = requests_get(u, timeout=10, proxies={'http': None, 'https': None}).json()
requests_get(u, headers={'Connection': 'close'}, proxies={'http': None, 'https': None})
for tab in tabs:
if tab['type'] == 'page':
return

View File

@ -3,16 +3,13 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from typing import Union
from DrissionPage.configs.chromium_options import ChromiumOptions
from DrissionPage.configs.driver_options import DriverOptions
def connect_browser(option: Union[ChromiumOptions, DriverOptions]) -> tuple: ...
def connect_browser(option: ChromiumOptions) -> tuple: ...
def get_launch_args(opt: Union[ChromiumOptions, DriverOptions]) -> list: ...
def get_launch_args(opt: ChromiumOptions) -> list: ...
def set_prefs(opt: Union[ChromiumOptions, DriverOptions]) -> None: ...
def set_prefs(opt: ChromiumOptions) -> None: ...

View File

@ -6,47 +6,6 @@
from pathlib import Path
from re import search, sub
from shutil import rmtree
from zipfile import ZipFile
def get_exe_from_port(port):
"""获取端口号第一条进程的可执行文件路径
:param port: 端口号
:return: 可执行文件的绝对路径
"""
from os import popen
pid = get_pid_from_port(port)
if not pid:
return
else:
file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n')
return file_lst[2].strip() if len(file_lst) > 2 else None
def get_pid_from_port(port):
"""获取端口号第一条进程的pid
:param port: 端口号
:return: 进程id
"""
from platform import system
if system().lower() != 'windows' or port is None:
return None
from os import popen
from time import perf_counter
try: # 避免Anaconda中可能产生的报错
process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
t = perf_counter()
while not process and perf_counter() - t < 5:
process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
return process.split(' ')[-1] or None
except Exception:
return None
def get_usable_path(path):
@ -142,11 +101,41 @@ def clean_folder(folder_path, ignore=None):
elif f.is_dir():
rmtree(f, True)
def unzip(zip_path, to_path):
"""解压下载的chromedriver.zip文件"""
if not zip_path:
return
with ZipFile(zip_path, 'r') as f:
return [f.extract(f.namelist()[0], path=to_path)]
# def get_exe_from_port(port):
# """获取端口号第一条进程的可执行文件路径
# :param port: 端口号
# :return: 可执行文件的绝对路径
# """
# from os import popen
#
# pid = get_pid_from_port(port)
# if not pid:
# return
# else:
# file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n')
# return file_lst[2].strip() if len(file_lst) > 2 else None
#
#
# def get_pid_from_port(port):
# """获取端口号第一条进程的pid
# :param port: 端口号
# :return: 进程id
# """
# from platform import system
# if system().lower() != 'windows' or port is None:
# return None
#
# from os import popen
# from time import perf_counter
#
# try: # 避免Anaconda中可能产生的报错
# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
#
# t = perf_counter()
# while not process and perf_counter() - t < 5:
# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
#
# return process.split(' ')[-1] or None
#
# except Exception:
# return None

View File

@ -7,10 +7,10 @@ from pathlib import Path
from typing import Union
def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ...
# def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ...
def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ...
# def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ...
def get_usable_path(path: Union[str, Path]) -> Path: ...
@ -26,6 +26,3 @@ def port_is_using(ip: str, port: Union[str, int]) -> bool: ...
def clean_folder(folder_path: Union[str, Path], ignore: Union[tuple, list] = None) -> None: ...
def unzip(zip_path: str, to_path: str) -> Union[list, None]: ...

View File

@ -3,103 +3,15 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from base64 import b64decode
from html import unescape
from http.cookiejar import Cookie
from json import loads, JSONDecodeError
from re import sub
from urllib.parse import urlparse, urljoin, urlunparse
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from tldextract import extract
class ResponseData(object):
"""返回的数据包管理类"""
__slots__ = ('requestId', 'response', 'rawBody', 'tab', 'target', 'url', 'status', 'statusText', 'securityDetails',
'headersText', 'mimeType', 'requestHeadersText', 'connectionReused', 'connectionId', 'remoteIPAddress',
'remotePort', 'fromDiskCache', 'fromServiceWorker', 'fromPrefetchCache', 'encodedDataLength', 'timing',
'serviceWorkerResponseSource', 'responseTime', 'cacheStorageCacheName', 'protocol', 'securityState',
'_requestHeaders', '_body', '_base64_body', '_rawPostData', '_postData', 'method')
def __init__(self, request_id, response, body, tab, target):
"""
:param response: response的数据
:param body: response包含的内容
:param tab: 产生这个数据包的tab的id
:param target: 监听目标
"""
self.requestId = request_id
self.response = CaseInsensitiveDict(response)
self.rawBody = body
self.tab = tab
self.target = target
self._requestHeaders = None
self._postData = None
self._body = None
self._base64_body = False
self._rawPostData = None
def __getattr__(self, item):
return self.response.get(item, None)
def __getitem__(self, item):
return self.response.get(item, None)
def __repr__(self):
return f'<ResponseData target={self.target} request_id={self.requestId}>'
@property
def headers(self):
"""以大小写不敏感字典返回headers数据"""
headers = self.response.get('headers', None)
return CaseInsensitiveDict(headers) if headers else None
@property
def requestHeaders(self):
"""以大小写不敏感字典返回requestHeaders数据"""
if self._requestHeaders:
return self._requestHeaders
headers = self.response.get('requestHeaders', None)
return CaseInsensitiveDict(headers) if headers else None
@requestHeaders.setter
def requestHeaders(self, val):
"""设置requestHeaders"""
self._requestHeaders = val
@property
def postData(self):
"""返回postData数据"""
if self._postData is None and self._rawPostData:
try:
self._postData = loads(self._rawPostData)
except (JSONDecodeError, TypeError):
self._postData = self._rawPostData
return self._postData
@postData.setter
def postData(self, val):
"""设置postData"""
self._rawPostData = val
@property
def body(self):
"""返回body内容如果是json格式自动进行转换如果时图片格式进行base64转换其它格式直接返回文本"""
if self._body is None:
if self._base64_body:
self._body = b64decode(self.rawBody)
else:
try:
self._body = loads(self.rawBody)
except (JSONDecodeError, TypeError):
self._body = self.rawBody
return self._body
def get_ele_txt(e):
"""获取元素内所有文本
:param e: 元素对象
@ -190,8 +102,6 @@ def location_in_viewport(page, loc_x, loc_y):
if (x< scrollLeft || y < scrollTop || x > vWidth + scrollLeft || y > vHeight + scrollTop){{return false;}}
return true;}}'''
return page.run_js(js)
# const vWidth = window.innerWidth || document.documentElement.clientWidth;
# const vHeight = window.innerHeight || document.documentElement.clientHeight;
def offset_scroll(ele, offset_x, offset_y):
@ -334,8 +244,7 @@ def set_browser_cookies(page, cookies):
:param cookies: cookies信息
:return: None
"""
cookies = cookies_to_tuple(cookies)
for cookie in cookies:
for cookie in cookies_to_tuple(cookies):
if 'expiry' in cookie:
cookie['expires'] = int(cookie['expiry'])
cookie.pop('expiry')
@ -343,6 +252,15 @@ def set_browser_cookies(page, cookies):
cookie['expires'] = int(cookie['expires'])
if cookie['value'] is None:
cookie['value'] = ''
if cookie['name'].startswith('__Secure-'):
cookie['secure'] = True
if cookie['name'].startswith('__Host-'):
cookie['path'] = '/'
cookie['secure'] = True
cookie['url'] = page.url
page.run_cdp_loaded('Network.setCookie', **cookie)
continue # 不用设置域名,可退出
if cookie.get('domain', None):
try:
@ -376,7 +294,13 @@ def is_cookie_in_driver(page, cookie):
:param cookie: dict格式cookie
:return: bool
"""
for c in page.get_cookies():
if cookie['name'] == c['name'] and cookie['value'] == c['value']:
return True
if 'domain' in cookie:
for c in page.get_cookies(all_domains=True):
if cookie['name'] == c['name'] and cookie['value'] == c['value'] and cookie['domain'] == c.get('domain',
None):
return True
else:
for c in page.get_cookies(all_domains=True):
if cookie['name'] == c['name'] and cookie['value'] == c['value']:
return True
return False

View File

@ -8,73 +8,12 @@ from typing import Union
from requests import Session
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from DrissionPage.base import DrissionElement, BasePage
from DrissionPage.chromium_element import ChromiumElement
from DrissionPage.chromium_base import ChromiumBase
class ResponseData(object):
def __init__(self, request_id: str, response: dict, body: str, tab: str, target: str):
self.requestId: str = ...
self.response: CaseInsensitiveDict = ...
self.rawBody: str = ...
self._body: Union[str, dict, bytes] = ...
self._base64_body: bool = ...
self.tab: str = ...
self.target: str = ...
self.method: str = ...
self._postData: dict = ...
self._rawPostData: str = ...
self.url: str = ...
self.status: str = ...
self.statusText: str = ...
self.headersText: str = ...
self.mimeType: str = ...
self.requestHeadersText: str = ...
self.connectionReused: str = ...
self.connectionId: str = ...
self.remoteIPAddress: str = ...
self.remotePort: str = ...
self.fromDiskCache: str = ...
self.fromServiceWorker: str = ...
self.fromPrefetchCache: str = ...
self.encodedDataLength: str = ...
self.timing: str = ...
self.serviceWorkerResponseSource: str = ...
self.responseTime: str = ...
self.cacheStorageCacheName: str = ...
self.protocol: str = ...
self.securityState: str = ...
self.securityDetails: str = ...
def __getattr__(self, item: str) -> Union[str, None]: ...
def __getitem__(self, item: str) -> Union[str, None]: ...
def __repr__(self) -> str: ...
@property
def headers(self) -> Union[CaseInsensitiveDict, None]: ...
@property
def requestHeaders(self) -> Union[CaseInsensitiveDict, None]: ...
@requestHeaders.setter
def requestHeaders(self, val:dict) -> None: ...
@property
def postData(self) -> Union[dict, str, None]: ...
@postData.setter
def postData(self, val: Union[str, dict]) -> None: ...
@property
def body(self) -> Union[str, dict, bytes]: ...
def get_ele_txt(e: DrissionElement) -> str: ...

View File

@ -26,7 +26,7 @@ class ChromiumOptions(object):
self.ini_path = om.ini_path
options = om.chrome_options
self._download_path = om.paths.get('download_path', None)
self._download_path = om.paths.get('download_path', '')
self._arguments = options.get('arguments', [])
self._binary_location = options.get('binary_location', '')
self._extensions = options.get('extensions', [])
@ -62,7 +62,7 @@ class ChromiumOptions(object):
self.ini_path = None
self._binary_location = "chrome"
self._arguments = []
self._download_path = None
self._download_path = ''
self._extensions = []
self._prefs = {}
self._timeouts = {'implicit': 10, 'pageLoad': 30, 'script': 30}

View File

@ -1,11 +1,10 @@
[paths]
chromedriver_path =
download_path =
[chrome_options]
debugger_address = 127.0.0.1:9222
binary_location = chrome
arguments = ['--remote-allow-origins=*', '--no-first-run', '--disable-gpu', '--disable-infobars', '--disable-popup-blocking']
arguments = ['--remote-allow-origins=*', '--no-first-run', '--disable-infobars', '--disable-popup-blocking']
extensions = []
experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}}}
page_load_strategy = normal

View File

@ -21,7 +21,7 @@ class SessionOptions(object):
:param ini_path: ini文件路径
"""
self.ini_path = None
self._download_path = None
self._download_path = ''
self._headers = None
self._cookies = None
self._auth = None
@ -73,7 +73,7 @@ class SessionOptions(object):
self.set_proxies(om.proxies.get('http', None), om.proxies.get('https', None))
self._timeout = om.timeouts.get('implicit', 10)
self._download_path = om.paths.get('download_path', None)
self._download_path = om.paths.get('download_path', '')
# ===========须独立处理的项开始============
@property
@ -110,14 +110,13 @@ class SessionOptions(object):
self._proxies = {}
return self._proxies
def set_proxies(self, http, https=None):
def set_proxies(self, http=None, https=None):
"""设置proxies参数
:param http: http代理地址
:param https: https代理地址
:return: 返回当前对象
"""
proxies = None if http == https is None else {'http': http, 'https': https or http}
self._sets('proxies', proxies)
self._sets('proxies', {'http': http, 'https': https})
return self
# ===========须独立处理的项结束============

View File

@ -6,20 +6,10 @@
from os import popen
from pathlib import Path
from re import search
from typing import Union
from .commons.constants import Settings
from .commons.tools import unzip
from .configs.chromium_options import ChromiumOptions
from .configs.options_manage import OptionsManager
from .session_page import SessionPage
try:
from selenium import webdriver
from DrissionPage.mixpage.drission import Drission
from .configs.driver_options import DriverOptions
except ModuleNotFoundError:
pass
def raise_when_ele_not_found(on_off=True):
@ -48,19 +38,14 @@ def show_settings(ini_path=None):
OptionsManager(ini_path).show()
def set_paths(driver_path=None,
chrome_path=None,
browser_path=None,
def set_paths(browser_path=None,
local_port=None,
debugger_address=None,
download_path=None,
user_data_path=None,
cache_path=None,
ini_path=None,
check_version=False):
ini_path=None):
"""快捷的路径设置函数
:param driver_path: chromedriver.exe路径
:param chrome_path: 浏览器可执行文件路径
:param browser_path: 浏览器可执行文件路径
:param local_port: 本地端口号
:param debugger_address: 调试浏览器地址127.0.0.1:9222
@ -68,7 +53,6 @@ def set_paths(driver_path=None,
:param user_data_path: 用户数据路径
:param cache_path: 缓存路径
:param ini_path: 要修改的ini文件路径
:param check_version: 是否检查chromedriver和chrome是否匹配
:return: None
"""
om = OptionsManager(ini_path)
@ -76,12 +60,6 @@ def set_paths(driver_path=None,
def format_path(path: str) -> str:
return str(path) if path else ''
if driver_path is not None:
om.set_item('paths', 'chromedriver_path', format_path(driver_path))
if chrome_path is not None:
om.set_item('chrome_options', 'binary_location', format_path(chrome_path))
if browser_path is not None:
om.set_item('chrome_options', 'binary_location', format_path(browser_path))
@ -103,9 +81,6 @@ def set_paths(driver_path=None,
if cache_path is not None:
set_argument('--disk-cache-dir', format_path(cache_path), ini_path)
if check_version:
check_driver_version(format_path(driver_path), format_path(browser_path))
def use_auto_port(on_off=True, ini_path=None):
"""设置启动浏览器时使用自动分配的端口和临时文件夹
@ -203,89 +178,6 @@ def set_proxy(proxy, ini_path=None):
set_argument('--proxy-server', proxy, ini_path)
def check_driver_version(driver_path=None, chrome_path=None):
"""检查传入的chrome和chromedriver是否匹配
:param driver_path: chromedriver.exe路径
:param chrome_path: chrome.exe路径
:return: 是否匹配
"""
print('正在检测可用性...')
om = OptionsManager()
driver_path = driver_path or om.get_value('paths', 'chromedriver_path') or 'chromedriver'
chrome_path = str(chrome_path or om.get_value('chrome_options', 'binary_location'))
do = DriverOptions(read_file=False)
do.add_argument('--headless')
if chrome_path:
do.binary_location = chrome_path
try:
driver = webdriver.Chrome(driver_path, options=do)
driver.quit()
print('版本匹配,可正常使用。')
return True
except Exception as e:
print(f'出现异常:\n{e}\n可执行easy_set.get_match_driver()自动下载匹配的版本。\n'
f'或自行从以下网址下载http://npm.taobao.org/mirrors/chromedriver/')
return False
# -------------------------自动识别chrome版本号并下载对应driver------------------------
def get_match_driver(ini_path='default',
save_path=None,
chrome_path=None,
show_msg=True,
check_version=True):
"""自动识别chrome版本并下载匹配的driver
:param ini_path: 要读取和修改的ini文件路径
:param save_path: chromedriver保存路径
:param chrome_path: 指定chrome.exe位置
:param show_msg: 是否打印信息
:param check_version: 是否检查版本匹配
:return: None
"""
save_path = save_path or str(Path(__file__).parent)
chrome_path = chrome_path or get_chrome_path(ini_path, show_msg)
chrome_path = Path(chrome_path).absolute() if chrome_path else None
if show_msg:
print('chrome.exe路径', chrome_path)
ver = _get_chrome_version(str(chrome_path))
if show_msg:
print('version', ver)
zip_path = _download_driver(ver, save_path, show_msg=show_msg)
if not zip_path and show_msg:
print('没有找到对应版本的driver。')
try:
driver_path = unzip(zip_path, save_path)[0]
except TypeError:
driver_path = None
if show_msg:
print('解压路径', driver_path)
if driver_path:
Path(zip_path).unlink()
if ini_path:
set_paths(driver_path=driver_path, chrome_path=str(chrome_path), ini_path=ini_path, check_version=False)
if check_version:
if not check_driver_version(driver_path, chrome_path) and show_msg:
print('获取失败,请手动配置。')
else:
if show_msg:
print('获取失败,请手动配置。')
return driver_path
def get_chrome_path(ini_path=None,
show_msg=True,
from_ini=True,
@ -365,54 +257,3 @@ def get_chrome_path(ini_path=None,
return str(path)
except OSError:
pass
def _get_chrome_version(path: str) -> Union[str, None]:
"""根据文件路径获取版本号
:param path: chrome.exe文件路径
:return: 版本号
"""
if not path:
return
path = str(path).replace('\\', '\\\\')
try:
return (popen(f'wmic datafile where "name=\'{path}\'" get version').read()
.lower().split('\n')[2].replace(' ', ''))
except Exception:
return None
def _download_driver(version: str, save_path: str = None, show_msg: bool = True) -> Union[str, None]:
"""根据传入的版本号到镜像网站查找,下载最相近的
:param version: 本地版本号
:return: 保存地址
"""
if not version:
return
main_ver = version.split('.')[0]
remote_ver = None
page = SessionPage(Drission().session)
page.get('https://registry.npmmirror.com/-/binary/chromedriver/')
for version in page.json:
# 遍历所有版本跳过大版本不一致的如果有完全匹配的获取url如果没有获取最后一个版本的url
if not version['name'].startswith(f'{main_ver}.'):
continue
remote_ver = version['name']
if version['name'] == f'{version}/':
break
if remote_ver:
url = f'https://cdn.npmmirror.com/binaries/chromedriver/{remote_ver}chromedriver_win32.zip'
save_path = save_path or str(Path(__file__).parent)
result = page.download(url, save_path, file_exists='overwrite', show_msg=show_msg)
if result[0]:
return result[1]
return None

View File

@ -16,16 +16,13 @@ def configs_to_here(file_name: Union[Path, str] = None) -> None: ...
def show_settings(ini_path: Union[str, Path] = None) -> None: ...
def set_paths(driver_path: Union[str, Path] = None,
chrome_path: Union[str, Path] = None,
browser_path: Union[str, Path] = None,
def set_paths(browser_path: Union[str, Path] = None,
local_port: Union[int, str] = None,
debugger_address: str = None,
download_path: Union[str, Path] = None,
user_data_path: Union[str, Path] = None,
cache_path: Union[str, Path] = None,
ini_path: Union[str, Path] = None,
check_version: bool = False) -> None: ...
ini_path: Union[str, Path] = None) -> None: ...
def use_auto_port(on_off: bool = True, ini_path: Union[str, Path] = None) -> None: ...
@ -55,17 +52,6 @@ def set_user_agent(user_agent: str, ini_path: Union[str, Path] = None) -> None:
def set_proxy(proxy: str, ini_path: Union[str, Path] = None) -> None: ...
def check_driver_version(driver_path: Union[str, Path] = None, chrome_path: str = None) -> bool: ...
# -------------------------自动识别chrome版本号并下载对应driver------------------------
def get_match_driver(ini_path: Union[str, None] = 'default',
save_path: str = None,
chrome_path: str = None,
show_msg: bool = True,
check_version: bool = True) -> Union[str, None]: ...
def get_chrome_path(ini_path: str = None,
show_msg: bool = True,
from_ini: bool = True,

View File

@ -24,7 +24,7 @@ class ElementLossError(BaseError):
_info = '元素对象因刷新已失效。'
class CallMethodError(BaseError):
class CDPError(BaseError):
_info = '方法调用错误。'
@ -54,3 +54,7 @@ class NoResourceError(BaseError):
class CanNotClickError(BaseError):
_info = '该元素无法滚动到视口或被遮挡,无法点击。'
class GetDocumentError(BaseError):
_info = '获取文档失败。'

View File

@ -0,0 +1,325 @@
# -*- coding:utf-8 -*-
from base64 import b64decode
from json import JSONDecodeError, loads
from queue import Queue
from re import search
from threading import Thread
from time import perf_counter, sleep
from requests.structures import CaseInsensitiveDict
from .errors import CDPError
class NetworkListener(object):
"""监听器基类"""
def __init__(self, page):
"""
:param page: ChromiumBase对象
"""
self._page = page
self._driver = self._page.driver
self._tmp = None # 临存捕捉到的数据
self._request_ids = None # 暂存须要拦截的请求id
self._total_count = None # 当次监听的数量上限
self._caught_count = None # 当次已监听到的数量
self._begin_time = None # 当次监听开始时间
self._timeout = None # 当次监听超时时间
self.listening = False
self._targets = None # 默认监听所有
self.tab_id = None # 当前tab的id
self._results = []
self._is_regex = False
self._method = None
def set_targets(self, targets=True, is_regex=False, method=None):
"""指定要等待的数据包
:param targets: 要匹配的数据包url特征可用list等传入多个为True时获取所有
:param is_regex: 设置的target是否正则表达式
:param method: 设置监听的请求类型可用list等指定多个为None时监听全部
:return: None
"""
if targets is not None:
if not isinstance(targets, (str, list, tuple, set)) and targets is not True:
raise TypeError('targets只能是str、list、tuple、set、True。')
if targets is True:
targets = ''
if isinstance(targets, str):
self._targets = {targets}
else:
self._targets = set(targets)
self._is_regex = is_regex
if method is not None:
if isinstance(method, str):
self._method = {method.upper()}
elif isinstance(method, (list, tuple, set)):
self._method = set(i.upper() for i in method)
else:
raise TypeError('method参数只能是str、list、tuple、set类型。')
def listen(self, targets=None, count=None, timeout=None):
"""拦截目标请求,直到超时或达到拦截个数,每次拦截前清空结果
可监听多个目标请求url包含这些字符串就会被记录
:param targets: 要监听的目标字符串或其组成的列表True监听所有None则保留之前的目标不变
:param count: 要记录的个数到达个数停止监听
:param timeout: 监听最长时间到时间即使未达到记录个数也停止None为无限长
:return: None
"""
if targets:
self.set_targets(targets)
self.listening = True
self._results = []
self._request_ids = {}
self._tmp = Queue(maxsize=0)
self._caught_count = 0
self._begin_time = perf_counter()
self._timeout = timeout
self._set_callback_func()
self._total_count = len(self._targets) if not count else count
Thread(target=self._wait_to_stop).start()
def stop(self):
"""停止监听"""
self._stop()
self.listening = False
def wait(self):
"""等待监听结束"""
while self.listening:
sleep(.2)
return self._results
def get_results(self, target=None):
"""获取结果列表
:param target: 要获取的目标为None时获取全部
:return: 结果数据组成的列表
"""
return self._results if target is None else [i for i in self._results if i.target == target]
def _wait_to_stop(self):
"""当收到停止信号、到达须获取结果数、到时间就停止"""
while self._is_continue():
sleep(.2)
self.stop()
def _is_continue(self):
"""是否继续当前监听"""
return self.listening \
and (self._total_count is None or self._caught_count < self._total_count) \
and (self._timeout is None or perf_counter() - self._begin_time < self._timeout)
def steps(self, gap=1):
"""用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页)
:param gap: 每接收到多少个数据包触发
:return: 用于在接收到监听目标时触发动作的可迭代对象
"""
if not isinstance(gap, int) or gap < 1:
raise ValueError('gap参数必须为大于0的整数。')
while self.listening or not self._tmp.empty():
while self._tmp.qsize() >= gap:
yield self._tmp.get(False) if gap == 1 else [self._tmp.get(False) for _ in range(gap)]
sleep(.1)
def _set_callback_func(self):
"""设置监听请求的回调函数"""
self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent)
self._driver.set_listener('Network.responseReceived', self._response_received)
self._driver.set_listener('Network.loadingFinished', self._loading_finished)
self._driver.set_listener('Network.loadingFailed', self._loading_failed)
self._driver.call_method('Network.enable')
def _stop(self) -> None:
"""停止监听前要做的工作"""
self._driver.set_listener('Network.requestWillBeSent', None)
self._driver.set_listener('Network.responseReceived', None)
self._driver.set_listener('Network.loadingFinished', None)
self._driver.set_listener('Network.loadingFailed', None)
# self._driver.call_method('Network.disable')
def _requestWillBeSent(self, **kwargs):
"""接收到请求时的回调函数"""
for target in self._targets:
if ((self._is_regex and search(target, kwargs['request']['url'])) or
(not self._is_regex and target in kwargs['request']['url'])) and (
not self._method or kwargs['request']['method'] in self._method):
self._request_ids[kwargs['requestId']] = DataPacket(self._page.tab_id, target, kwargs)
if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None):
self._request_ids[kwargs['requestId']]._raw_post_data = \
self._page.run_cdp('Network.getRequestPostData', requestId=kwargs['requestId'])['postData']
break
def _response_received(self, **kwargs):
"""接收到返回信息时处理方法"""
request_id = kwargs['requestId']
if request_id in self._request_ids:
self._request_ids[request_id]._raw_response = kwargs['response']
self._request_ids[request_id]._resource_type = kwargs['type']
def _loading_finished(self, **kwargs):
    """Callback for 'Network.loadingFinished': fetch the body and publish the finished packet.

    Events for request ids that are not being tracked are ignored. If the body
    cannot be fetched (CDPError), an empty, non-base64 body is recorded.
    """
    request_id = kwargs['requestId']
    if request_id not in self._request_ids:
        return

    try:
        reply = self._page.run_cdp('Network.getResponseBody', requestId=request_id)
        body = reply['body']
        is_base64 = reply['base64Encoded']
    except CDPError:
        body, is_base64 = '', False

    packet = self._request_ids[request_id]
    packet._raw_body = body
    packet._base64_body = is_base64

    # Publish: queue for consumers, result list, and caught counter.
    self._tmp.put(packet)
    self._results.append(packet)
    self._caught_count += 1
def _loading_failed(self, **kwargs):
    """Callback for 'Network.loadingFailed': record the error and publish the failed packet.

    Events for request ids that are not being tracked are ignored.
    """
    request_id = kwargs['requestId']
    if request_id not in self._request_ids:
        return

    packet = self._request_ids[request_id]
    packet.errorText = kwargs['errorText']
    packet._resource_type = kwargs['type']

    # Failed packets are published the same way as finished ones.
    self._tmp.put(packet)
    self._results.append(packet)
    self._caught_count += 1
class DataPacket(object):
    """Holds the data captured for one monitored request."""

    def __init__(self, tab, target, raw_request):
        """
        :param tab: id of the tab that produced this packet
        :param target: the listening target this request matched
        :param raw_request: raw request event data obtained from CDP
        """
        self.tab = tab
        self.target = target
        self._raw_request = raw_request
        # Raw pieces; they start empty and are assigned from outside this class.
        self._raw_post_data = None
        self._raw_response = None
        self._raw_body = None
        self._base64_body = False
        # Wrapper objects, created on first access by the properties below.
        self._request = None
        self._response = None
        self.errorText = None
        self._resource_type = None

    @property
    def url(self):
        """The url of the request."""
        return self.request.url

    @property
    def method(self):
        """The method of the request."""
        return self.request.method

    @property
    def frameId(self):
        """The 'frameId' entry of the raw request data, or None when absent."""
        return self._raw_request.get('frameId')

    @property
    def resourceType(self):
        """The recorded resource type (None until it has been set)."""
        return self._resource_type

    @property
    def request(self):
        """Request wrapper, built on first access and reused afterwards."""
        cached = self._request
        if cached is None:
            cached = self._request = Request(self._raw_request['request'], self._raw_post_data)
        return cached

    @property
    def response(self):
        """Response wrapper, built on first access and reused afterwards."""
        cached = self._response
        if cached is None:
            cached = self._response = Response(self._raw_response, self._raw_body, self._base64_body)
        return cached
class Request(object):
    """Read-only view of a captured request.

    Fields of the raw request dict are exposed as attributes; a field that is
    missing from the dict reads as None.
    """

    def __init__(self, raw_request, post_data):
        """
        :param raw_request: raw request dict obtained from CDP
        :param post_data: post data fetched separately, or None
        """
        self._request = raw_request
        self._raw_post_data = post_data
        self._postData = None
        self._headers = None

    def __getattr__(self, item):
        """Fall back to the raw request dict for attributes not found normally.

        :param item: attribute name
        :return: the value stored under that key in the raw request, or None
        :raises AttributeError: for names starting with an underscore
        """
        # Private/dunder names are never raw-request fields. Without this guard
        # a lookup of '_request' on an instance whose __init__ has not run
        # (copy.copy, unpickling) re-enters __getattr__ until RecursionError,
        # and protocol probes such as hasattr(obj, '__setstate__') get None.
        if item.startswith('_'):
            raise AttributeError(item)
        return self._request.get(item, None)

    @property
    def headers(self):
        """Return the request headers as a case-insensitive dict."""
        if self._headers is None:
            self._headers = CaseInsensitiveDict(self._request['headers'])
        return self._headers

    @property
    def postData(self):
        """Return the post data, parsed as JSON when possible.

        :return: the decoded JSON value, the raw post data when it is not valid
                 JSON, or False when the request carries no post data
        """
        if self._postData is None:
            if self._raw_post_data:
                postData = self._raw_post_data
            elif self._request.get('postData', None):
                postData = self._request['postData']
            else:
                postData = False

            try:
                self._postData = loads(postData)
            except (JSONDecodeError, TypeError):
                # Not JSON (or False): keep the value as it is.
                self._postData = postData

        return self._postData
class Response(object):
    """Read-only view of a captured response.

    Fields of the raw response dict are exposed as attributes; a field that is
    missing from the dict reads as None.
    """

    def __init__(self, raw_response, raw_body, base64_body):
        """
        :param raw_response: raw response dict obtained from CDP
        :param raw_body: response body as received
        :param base64_body: whether raw_body is base64 encoded
        """
        self._response = raw_response
        self._raw_body = raw_body
        self._is_base64_body = base64_body
        self._body = None
        self._headers = None

    def __getattr__(self, item):
        """Fall back to the raw response dict for attributes not found normally.

        :param item: attribute name
        :return: the value stored under that key in the raw response, or None
        :raises AttributeError: for names starting with an underscore
        """
        # Private/dunder names are never raw-response fields. Without this guard
        # a lookup of '_response' on an instance whose __init__ has not run
        # (copy.copy, unpickling) re-enters __getattr__ until RecursionError,
        # and protocol probes such as hasattr(obj, '__setstate__') get None.
        if item.startswith('_'):
            raise AttributeError(item)
        return self._response.get(item, None)

    @property
    def headers(self):
        """Return the response headers as a case-insensitive dict."""
        if self._headers is None:
            self._headers = CaseInsensitiveDict(self._response['headers'])
        return self._headers

    @property
    def body(self):
        """Return the body content.

        A base64-encoded body is decoded to bytes; otherwise the body is parsed
        as JSON when possible and returned unchanged when it is not valid JSON.
        """
        if self._body is None:
            if self._is_base64_body:
                self._body = b64decode(self._raw_body)
            else:
                try:
                    self._body = loads(self._raw_body)
                except (JSONDecodeError, TypeError):
                    self._body = self._raw_body

        return self._body

View File

@ -0,0 +1,140 @@
from queue import Queue
from typing import Union, Dict, List, Iterable, Tuple
from requests.structures import CaseInsensitiveDict
from chromium_base import ChromiumBase
from chromium_driver import ChromiumDriver
class NetworkListener(object):
    """Type stub for the network listener."""

    def __init__(self, page: ChromiumBase):
        self._page: ChromiumBase = ...
        self._total_count: int = ...
        self._caught_count: int = ...
        self._targets: Union[str, dict] = ...
        self._results: list = ...
        self._method: set = ...
        self._tmp: Queue = ...
        self._is_regex: bool = ...
        self._driver: ChromiumDriver = ...
        self._request_ids: dict = ...
        self.listening: bool = ...
        self._timeout: float = ...
        self._begin_time: float = ...

    def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False,
                    count: int = None, method: Union[str, list, tuple, set] = None) -> None: ...

    def stop(self) -> None: ...

    # `False` is a value, not a type, so it cannot appear inside Union;
    # `bool` is used for the "nothing caught" result instead.
    @property
    def results(self) -> Union[DataPacket, Dict[str, List[DataPacket]], bool]: ...

    def clear(self) -> None: ...

    def listen(self, targets: Union[str, List[str], Tuple, bool, None] = ..., count: int = ...,
               timeout: float = ...) -> Union[DataPacket, Dict[str, List[DataPacket]], bool]: ...

    def _listen(self, timeout: float = None,
                any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], bool]: ...

    def _requestWillBeSent(self, **kwargs) -> None: ...

    def _response_received(self, **kwargs) -> None: ...

    def _loading_finished(self, **kwargs) -> None: ...

    def _loading_failed(self, **kwargs) -> None: ...

    def _request_paused(self, **kwargs) -> None: ...

    def _wait_to_stop(self) -> None: ...

    def _is_continue(self) -> bool: ...

    def steps(self, gap: int = 1) -> Iterable[Union[DataPacket, List[DataPacket]]]: ...

    def _set_callback_func(self) -> None: ...

    def _stop(self) -> None: ...
class DataPacket(object):
    """Type stub for the data packet class returned by the listener."""

    # The third parameter is named `raw_request`, matching the implementation's
    # signature, so keyword calls type-check.
    def __init__(self, tab: str, target: str, raw_request: dict):
        self.tab: str = ...
        self.target: str = ...
        self._raw_request: dict = ...
        self._raw_response: dict = ...
        self._raw_post_data: str = ...
        self._raw_body: str = ...
        self._base64_body: bool = ...
        self._request: Request = ...
        self._response: Response = ...
        self.errorText: str = ...
        self._resource_type: str = ...

    @property
    def url(self) -> str: ...

    @property
    def method(self) -> str: ...

    @property
    def frameId(self) -> str: ...

    @property
    def resourceType(self) -> str: ...

    @property
    def request(self) -> Request: ...

    @property
    def response(self) -> Response: ...
class Request(object):
    """Type stub for the request wrapper; the class-level names are fields read from the raw request."""

    url: str = ...
    _headers: Union[CaseInsensitiveDict, None] = ...
    method: str = ...

    # Further raw-request fields, currently not declared:
    # urlFragment: str = ...
    # postDataEntries: list = ...
    # mixedContentType: str = ...
    # initialPriority: str = ...
    # referrerPolicy: str = ...
    # isLinkPreload: bool = ...
    # trustTokenParams: dict = ...
    # isSameSite: bool = ...

    def __init__(self, raw_request: dict, post_data: str):
        self._request: dict = ...
        self._raw_post_data: str = ...
        self._postData: Union[str, dict, bool, None] = ...

    # The implementation returns a CaseInsensitiveDict, same as Response.headers.
    @property
    def headers(self) -> CaseInsensitiveDict: ...

    # The implementation returns False when the request carries no post data.
    @property
    def postData(self) -> Union[str, dict, bool]: ...
class Response(object):
    """Type stub for the response wrapper; the class-level names are fields read from the raw response."""

    # CDP Network.Response: `status` is the integer HTTP status code and
    # `statusText` is the status text string (the two types were swapped).
    status: int = ...
    statusText: str = ...
    mimeType: str = ...

    def __init__(self, raw_response: dict, raw_body: str, base64_body: bool):
        self._response: dict = ...
        self._raw_body: str = ...
        self._is_base64_body: bool = ...
        self._body: Union[str, dict, bytes, None] = ...
        self._headers: Union[CaseInsensitiveDict, None] = ...

    @property
    def headers(self) -> CaseInsensitiveDict: ...

    # A base64-encoded body is decoded with b64decode, which yields bytes.
    @property
    def body(self) -> Union[str, dict, bytes, bool]: ...

View File

@ -38,7 +38,7 @@ class SessionElement(DrissionElement):
"""在内部查找元素
ele2 = ele1('@id=ele_id')
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 不起实际作用用于和DriverElement对应便于无差别调用
:param timeout: 不起实际作用
:return: SessionElement对象或属性文本
"""
return self.ele(loc_or_str)
@ -75,12 +75,13 @@ class SessionElement(DrissionElement):
"""返回未格式化处理的元素内文本"""
return str(self._inner_ele.text_content())
def parent(self, level_or_loc=1):
def parent(self, level_or_loc=1, index=1):
"""返回上面某一级父元素,可指定层数或用查询语法定位
:param level_or_loc: 第几级父元素或定位符
:param index: 当level_or_loc传入定位符使用此参数选择第几个结果
:return: 上级元素对象
"""
return super().parent(level_or_loc)
return super().parent(level_or_loc, index)
def child(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回当前元素的一个符合条件的直接子元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -90,7 +91,7 @@ class SessionElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 直接子元素或节点文本
"""
return super().child(index, filter_loc, timeout, ele_only=ele_only)
return super().child(filter_loc, index, timeout, ele_only=ele_only)
def prev(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回当前元素前面一个符合条件的同级元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -100,7 +101,7 @@ class SessionElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 同级元素
"""
return super().prev(index, filter_loc, timeout, ele_only=ele_only)
return super().prev(filter_loc, index, timeout, ele_only=ele_only)
def next(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回当前元素后面一个符合条件的同级元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -110,7 +111,7 @@ class SessionElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 同级元素
"""
return super().next(index, filter_loc, timeout, ele_only=ele_only)
return super().next(filter_loc, index, timeout, ele_only=ele_only)
def before(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中当前元素前面符合条件的第一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -121,7 +122,7 @@ class SessionElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素前面的某个元素或节点
"""
return super().before(index, filter_loc, timeout, ele_only=ele_only)
return super().before(filter_loc, index, timeout, ele_only=ele_only)
def after(self, filter_loc='', index=1, timeout=None, ele_only=True):
"""返回文档中此当前元素后面符合条件的第一个元素,可用查询语法筛选,可指定返回筛选结果的第几个
@ -132,7 +133,7 @@ class SessionElement(DrissionElement):
:param ele_only: 是否只获取元素为False时把文本注释节点也纳入
:return: 本元素后面的某个元素或节点
"""
return super().after(index, filter_loc, timeout, ele_only=ele_only)
return super().after(filter_loc, index, timeout, ele_only=ele_only)
def children(self, filter_loc='', timeout=0, ele_only=True):
"""返回当前元素符合条件的直接子元素或节点组成的列表,可用查询语法筛选
@ -217,7 +218,7 @@ class SessionElement(DrissionElement):
def ele(self, loc_or_str, timeout=None):
"""返回当前元素下级符合条件的第一个元素、属性或节点文本
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 不起实际作用用于和DriverElement对应便于无差别调用
:param timeout: 不起实际作用
:return: SessionElement对象或属性文本
"""
return self._ele(loc_or_str)
@ -225,7 +226,7 @@ class SessionElement(DrissionElement):
def eles(self, loc_or_str, timeout=None):
"""返回当前元素下级所有符合条件的子元素、属性或节点文本
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:param timeout: 不起实际作用用于和DriverElement对应便于无差别调用
:param timeout: 不起实际作用
:return: SessionElement对象或属性文本组成的列表
"""
return self._ele(loc_or_str, single=False)
@ -321,8 +322,7 @@ def make_session_ele(html_or_ele, loc=None, single=True):
loc = loc[0], loc_str
# ChromiumElement, DriverElement
elif the_type.endswith((".ChromiumElement'>", ".DriverElement'>")):
elif the_type.endswith(".ChromiumElement'>"):
loc_str = loc[1]
if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'):
loc_str = f'.{loc[1]}'

View File

@ -12,8 +12,6 @@ from .chromium_base import ChromiumBase
from .chromium_element import ChromiumElement
from .chromium_frame import ChromiumFrame
from .commons.constants import NoneElement
from mixpage.driver_element import DriverElement
from mixpage.driver_page import DriverPage
from .session_page import SessionPage
@ -50,29 +48,29 @@ class SessionElement(DrissionElement):
@property
def raw_text(self) -> str: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['SessionElement', None]: ...
def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union['SessionElement', None]: ...
def child(self, filter_loc: Union[tuple, str] = '',
def child(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union['SessionElement', str, None]: ...
def prev(self, filter_loc: Union[tuple, str] = '',
def prev(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union['SessionElement', str, None]: ...
def next(self, filter_loc: Union[tuple, str] = '',
def next(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union['SessionElement', str, None]: ...
def before(self, filter_loc: Union[tuple, str] = '',
def before(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union['SessionElement', str, None]: ...
def after(self, filter_loc: Union[tuple, str] = '',
def after(self, filter_loc: Union[tuple, str, int] = '',
index: int = 1,
timeout: float = None,
ele_only: bool = True) -> Union['SessionElement', str, None]: ...
@ -124,8 +122,8 @@ class SessionElement(DrissionElement):
def _get_ele_path(self, mode: str) -> str: ...
def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, ChromiumElement, DriverElement, BaseElement,
ChromiumFrame, ChromiumBase, DriverPage],
def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, ChromiumElement, BaseElement, ChromiumFrame,
ChromiumBase],
loc: Union[str, Tuple[str, str]] = None,
single: bool = True) -> Union[
SessionElement, str, NoneElement, List[Union[SessionElement, str]]]: ...

View File

@ -7,7 +7,6 @@ from re import search
from time import sleep
from urllib.parse import urlparse
from DownloadKit import DownloadKit
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
from tldextract import extract
@ -27,7 +26,6 @@ class SessionPage(BasePage):
:param timeout: 连接超时时间为None时从ini文件读取
"""
self._response = None
self._download_set = None
self._session = None
self._set = None
self._set_start_options(session_or_options, None)
@ -99,21 +97,9 @@ class SessionPage(BasePage):
return None
@property
def download_path(self):
"""返回下载路径"""
return self._download_path
@property
def download_set(self):
"""返回用于设置下载参数的对象"""
if self._download_set is None:
self._download_set = DownloadSetter(self)
return self._download_set
@property
def download(self):
"""返回下载器对象"""
return self.download_set.DownloadKit
def user_agent(self):
"""返回user agent"""
return self.session.headers.get('user-agent', '')
@property
def session(self):
@ -337,8 +323,18 @@ class SessionPageSetter(object):
"""
self._page.timeout = second
def cookie(self, cookie):
    """Set a single cookie on the Session object by delegating to cookies().

    :param cookie: cookie information; a str is forwarded unchanged, any other
                   value is wrapped in a one-item list first
    :return: None
    """
    payload = cookie if isinstance(cookie, str) else [cookie]
    self.cookies(payload)
def cookies(self, cookies):
"""为Session对象设置cookies
"""为Session对象设置多个cookie注意不要传入单个
:param cookies: cookies信息
:return: None
"""
@ -366,14 +362,13 @@ class SessionPageSetter(object):
"""
self._page.session.headers['user-agent'] = ua
def proxies(self, http, https=None):
def proxies(self, http=None, https=None):
"""设置proxies参数
:param http: http代理地址
:param https: https代理地址
:return: None
"""
proxies = None if http == https is None else {'http': http, 'https': https or http}
self._page.session.proxies = proxies
self._page.session.proxies = {'http': http, 'https': https}
def auth(self, auth):
"""设置认证元组或对象
@ -440,68 +435,6 @@ class SessionPageSetter(object):
self._page.session.mount(url, adapter)
class DownloadSetter(object):
    """Helper object for configuring download parameters of a page."""

    def __init__(self, page):
        """
        :param page: the page object whose downloads are being configured
        """
        self._page = page
        self._DownloadKit = None

    @property
    def DownloadKit(self):
        """Return the DownloadKit object, creating it on first access."""
        kit = self._DownloadKit
        if kit is None:
            kit = self._DownloadKit = DownloadKit(session=self._page, goal_path=self._page.download_path)
        return kit

    @property
    def if_file_exists(self):
        """Return an object for choosing what happens when a same-named file exists."""
        return FileExists(self)

    def split(self, on_off):
        """Set whether large files may be split and downloaded with multiple threads.

        :param on_off: whether to enable multi-threaded download of large files
        :return: None
        """
        self.DownloadKit.split = on_off

    def save_path(self, path):
        """Set the path downloads are saved to.

        :param path: download save path; None is passed through, anything else is converted to str
        :return: None
        """
        if path is not None:
            path = str(path)
        self._page._download_path = path
        self.DownloadKit.goal_path = path
class FileExists(object):
    """Chooses what happens when a file with the same name already exists.

    Every entry point writes the same public ``file_exists`` attribute of the
    DownloadKit object, so calling the instance with a mode and calling the
    named shortcut are equivalent.
    """

    def __init__(self, setter):
        """
        :param setter: DownloadSetter object
        """
        self._setter = setter

    def __call__(self, mode):
        """Set the handling mode by name.

        :param mode: one of 'skip', 'rename', 'overwrite'
        :return: None
        :raises ValueError: if mode is not one of the three accepted values
        """
        if mode not in ('skip', 'rename', 'overwrite'):
            raise ValueError("mode参数只能是'skip', 'rename', 'overwrite'")
        self._setter.DownloadKit.file_exists = mode

    def skip(self):
        """Skip the download when the file exists."""
        self._setter.DownloadKit.file_exists = 'skip'

    def rename(self):
        """Rename the new file by appending a serial number to its name."""
        # Previously wrote the private `_file_exists`, unlike __call__/skip.
        self._setter.DownloadKit.file_exists = 'rename'

    def overwrite(self):
        """Overwrite the existing file."""
        # Previously wrote the private `_file_exists`, unlike __call__/skip.
        self._setter.DownloadKit.file_exists = 'overwrite'
def check_headers(kwargs, headers, arg) -> bool:
    """Tell whether ``arg`` is present in ``kwargs['headers']`` or in ``headers``.

    :param kwargs: mapping that must contain a 'headers' entry
    :param headers: second collection of headers to look in
    :param arg: the name to look for
    :return: True if found in either place, otherwise False
    """
    if arg in kwargs['headers']:
        return True
    return arg in headers

View File

@ -3,22 +3,20 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from http.cookiejar import Cookie
from typing import Any, Union, Tuple, List
from DownloadKit import DownloadKit
# from DownloadKit import DownloadKit
from requests import Session, Response
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from .commons.constants import NoneElement
from .base import BasePage
from .chromium_page import ChromiumPage
from .commons.constants import NoneElement
from .configs.session_options import SessionOptions
from .session_element import SessionElement
from .web_page import WebPage
class SessionPage(BasePage):
@ -29,8 +27,8 @@ class SessionPage(BasePage):
self._session_options: SessionOptions = ...
self._url: str = ...
self._response: Response = ...
self._download_path: str = ...
self._download_set: DownloadSetter = ...
# self._download_path: str = ...
# self._DownloadKit: DownloadKit = ...
self._url_available: bool = ...
self.timeout: float = ...
self.retry_times: int = ...
@ -64,10 +62,10 @@ class SessionPage(BasePage):
def json(self) -> Union[dict, None]: ...
@property
def download_path(self) -> str: ...
def user_agent(self) -> str: ...
@property
def download_set(self) -> DownloadSetter: ...
def download_path(self) -> str: ...
def get(self,
url: str,
@ -120,8 +118,8 @@ class SessionPage(BasePage):
@property
def set(self) -> SessionPageSetter: ...
@property
def download(self) -> DownloadKit: ...
# @property
# def download(self) -> DownloadKit: ...
def post(self,
url: str,
@ -172,6 +170,8 @@ class SessionPageSetter(object):
def timeout(self, second: float) -> None: ...
def cookie(self, cookie: Union[Cookie, str, dict]) -> None: ...
def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
def headers(self, headers: dict) -> None: ...
@ -180,7 +180,7 @@ class SessionPageSetter(object):
def user_agent(self, ua: str) -> None: ...
def proxies(self, http, https=None) -> None: ...
def proxies(self, http: str = None, https: str = None) -> None: ...
def auth(self, auth: Union[Tuple[str, str], HTTPBasicAuth, None]) -> None: ...
@ -201,35 +201,6 @@ class SessionPageSetter(object):
def add_adapter(self, url: str, adapter: HTTPAdapter) -> None: ...
class DownloadSetter(object):
    """Type stub for the object that configures download parameters."""

    def __init__(self, page: Union[SessionPage, WebPage, ChromiumPage]):
        self._page: SessionPage = ...
        self._DownloadKit: DownloadKit = ...

    # The downloader object.
    @property
    def DownloadKit(self) -> DownloadKit: ...

    # Object for choosing how an already existing file is handled.
    @property
    def if_file_exists(self) -> FileExists: ...

    def split(self, on_off: bool) -> None: ...

    def save_path(self, path: Union[str, Path]): ...
class FileExists(object):
    """Type stub for the object that selects the handling of an already existing file."""

    def __init__(self, setter: DownloadSetter):
        self._setter: DownloadSetter = ...

    # Select the handling mode by name.
    def __call__(self, mode: str) -> None: ...

    def skip(self) -> None: ...

    def rename(self) -> None: ...

    def overwrite(self) -> None: ...
# Type stub: reports whether `arg` is present in the given kwargs or headers.
def check_headers(kwargs: Union[dict, CaseInsensitiveDict], headers: Union[dict, CaseInsensitiveDict],
                  arg: str) -> bool: ...

View File

@ -3,20 +3,16 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from warnings import warn
from requests import Session
from .base import BasePage
from .chromium_base import ChromiumBase, Timeout
from .chromium_driver import ChromiumDriver
from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter
from .chromium_page import ChromiumPage, ChromiumPageSetter
from .chromium_tab import WebPageTab
from .commons.web import set_session_cookies, set_browser_cookies
from .configs.chromium_options import ChromiumOptions
from .configs.session_options import SessionOptions
from .errors import CallMethodError
from .session_page import SessionPage, SessionPageSetter
@ -27,7 +23,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""初始化函数
:param mode: 'd' 's'即driver模式和session模式
:param timeout: 超时时间d模式时为寻找元素时间s模式时为连接时间默认10秒
:param driver_or_options: ChromiumDriver对象或DriverOptions对象只使用s模式时应传入False
:param driver_or_options: ChromiumDriver对象只使用s模式时应传入False
:param session_or_options: Session对象或SessionOptions对象只使用d模式时应传入False
"""
self._mode = mode.lower()
@ -45,7 +41,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._driver_options = None
self._session_options = None
self._response = None
self._download_set = None
self._set = None
self._screencast = None
@ -59,7 +54,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def _set_start_options(self, dr_opt, se_opt):
"""处理两种模式的设置
:param dr_opt: ChromiumDriver或DriverOptions对象为None则从ini读取为False用默认信息创建
:param dr_opt: ChromiumDriver或ChromiumOptions对象为None则从ini读取为False用默认信息创建
:param se_opt: SessionSessionOptions对象或配置信息为None则从ini读取为False用默认信息创建
:return: None
"""
@ -77,7 +72,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
elif dr_opt is False:
self._driver_options = ChromiumOptions(read_file=False)
elif str(type(dr_opt)).endswith(("ChromiumOptions'>", "DriverOptions'>")):
elif isinstance(dr_opt, ChromiumOptions):
self._driver_options = dr_opt
else:
@ -107,7 +102,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._timeouts = Timeout(self)
self._page_load_strategy = self._driver_options.page_load_strategy
self._download_path = None
if se_opt is not False:
self.set.timeouts(implicit=self._session_options.timeout)
@ -186,6 +180,14 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""以dict方式返回cookies"""
return super().cookies
@property
def user_agent(self):
    """Return the user agent of whichever mode is currently active.

    In 's' mode the next class after this one in the MRO supplies the value;
    in 'd' mode the lookup starts after SessionPage instead. Any other mode
    yields None.
    """
    mode = self._mode
    if mode == 's':
        return super().user_agent
    if mode == 'd':
        return super(SessionPage, self).user_agent
@property
def session(self):
"""返回Session对象如未初始化则按配置信息创建"""
@ -211,23 +213,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""
self.set.timeouts(implicit=second)
@property
def download_path(self):
"""返回默认下载路径"""
return super(SessionPage, self).download_path
@property
def download_set(self):
"""返回下载设置对象"""
if self._download_set is None:
self._download_set = WebPageDownloadSetter(self)
return self._download_set
@property
def download(self):
"""返回下载器对象"""
return self.download_set._switched_DownloadKit
@property
def set(self):
"""返回用于等待的对象"""
@ -360,20 +345,15 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
return
if copy_user_agent:
selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": selenium_user_agent})
user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": user_agent})
# set_session_cookies(self.session, self._get_driver_cookies(as_dict=True))
# set_session_cookies(self.session, self._get_driver_cookies(all_domains=True))
set_session_cookies(self.session, self._get_driver_cookies())
set_session_cookies(self.session, super(SessionPage, self).get_cookies())
def cookies_to_browser(self):
"""把session对象的cookies复制到浏览器"""
if not self._has_driver:
return
# set_browser_cookies(self, super().get_cookies(as_dict=True))
# set_browser_cookies(self, super().get_cookies(all_domains=True))
set_browser_cookies(self, super().get_cookies())
def get_cookies(self, as_dict=False, all_domains=False, all_info=False):
@ -386,7 +366,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
if self._mode == 's':
return super().get_cookies(as_dict, all_domains, all_info)
elif self._mode == 'd':
return self._get_driver_cookies(as_dict, all_info)
return super(SessionPage, self).get_cookies(as_dict, all_domains, all_info)
def get_tab(self, tab_id=None):
"""获取一个标签页对象
@ -396,21 +376,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
tab_id = tab_id or self.tab_id
return WebPageTab(self, tab_id)
def _get_driver_cookies(self, as_dict=False, all_info=False):
    """Fetch the browser cookies through the 'Network.getCookies' CDP command.

    :param as_dict: return a {name: value} dict; when True, all_info is ignored
    :param all_info: return the cookie entries exactly as CDP reports them
    :return: dict when as_dict is True; the raw list when all_info is True;
             otherwise a list of dicts with only 'name', 'value' and 'domain'
    """
    cookies = self.run_cdp('Network.getCookies')['cookies']

    if as_dict:
        return {item['name']: item['value'] for item in cookies}

    if all_info:
        return cookies

    wanted = ('name', 'value', 'domain')
    return [{key: item[key] for key in wanted} for item in cookies]
def close_driver(self):
"""关闭driver及浏览器"""
if self._has_driver:
@ -493,66 +458,3 @@ class WebPageSetter(ChromiumPageSetter):
self._session_setter.user_agent(ua)
else:
self._chromium_setter.user_agent(ua, platform)
class WebPageDownloadSetter(ChromiumDownloadSetter):
    """Class used to set download parameters for a WebPage."""

    def __init__(self, page):
        """
        :param page: the page object; its ``session`` attribute is stored on this setter
        """
        super().__init__(page)
        self._session = page.session

    @property
    def _switched_DownloadKit(self):
        """Return the DownloadKit object; in 'd' mode ``_cookies_to_session()`` is called first."""
        if self._page.mode == 'd':
            self._cookies_to_session()
        return self.DownloadKit

    def save_path(self, path):
        """Set the download path.

        The path is made absolute and created if missing, then stored on the
        page and on the DownloadKit object. When a browser is connected, the
        browser's download behavior is updated as well.

        :param path: download path; a falsy value is treated as ''
        :return: None
        """
        path = path or ''
        path = Path(path).absolute()
        path.mkdir(parents=True, exist_ok=True)
        path = str(path)

        self._page._download_path = path
        self.DownloadKit.goal_path = path

        if self._page._has_driver:
            try:
                self._page.browser_driver.Browser.setDownloadBehavior(behavior=self._behavior, downloadPath=path,
                                                                      eventsEnabled=True)
            except CallMethodError:
                # Browser-level call failed: warn, then fall back to the page-level command.
                warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。')
                self._page.run_cdp('Page.setDownloadBehavior', behavior=self._behavior, downloadPath=path)

    def by_browser(self):
        """Set files to be downloaded by the browser.

        :return: None
        :raises RuntimeError: if no browser is connected
        """
        if not self._page._has_driver:
            raise RuntimeError('浏览器未连接。')

        try:
            self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', eventsEnabled=True,
                                                                  downloadPath=self._page.download_path)
            self._page.browser_driver.Browser.downloadWillBegin = self._download_by_browser

        except CallMethodError:
            # Browser-level call failed: warn, then fall back to the page-level equivalents.
            warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。')
            self._page.driver.Page.setDownloadBehavior(behavior='allow', downloadPath=self._page.download_path)
            self._page.driver.Page.downloadWillBegin = self._download_by_browser

        self._behavior = 'allow'

    def by_DownloadKit(self):
        """Set files to be downloaded by DownloadKit.

        :return: None
        :raises RuntimeError: if the browser rejects the required call
        """
        if self._page._has_driver:
            try:
                self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True)
                self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit
            except CallMethodError:
                raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。')

        # NOTE(review): placement at method level is reconstructed from unindented source - confirm.
        self._behavior = 'deny'

View File

@ -12,10 +12,9 @@ from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement
from .chromium_frame import ChromiumFrame
from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter
from .chromium_page import ChromiumPage, ChromiumPageSetter
from .chromium_tab import WebPageTab
from .configs.chromium_options import ChromiumOptions
from .configs.driver_options import DriverOptions
from .configs.session_options import SessionOptions
from .session_element import SessionElement
from .session_page import SessionPage, SessionPageSetter
@ -26,15 +25,15 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def __init__(self,
mode: str = 'd',
timeout: float = None,
driver_or_options: Union[ChromiumDriver, ChromiumOptions, DriverOptions, bool] = None,
driver_or_options: Union[ChromiumDriver, ChromiumOptions, bool] = None,
session_or_options: Union[Session, SessionOptions, bool] = None) -> None:
self._mode: str = ...
self._has_driver: bool = ...
self._has_session: bool = ...
self.address: str = ...
self._session_options: Union[SessionOptions, None] = ...
self._driver_options: Union[ChromiumOptions, DriverOptions, None] = ...
self._download_set: WebPageDownloadSetter = ...
self._driver_options: Union[ChromiumOptions, None] = ...
self._DownloadKit: DownloadKit = ...
self._download_path: str = ...
self._tab_obj: ChromiumDriver = ...
@ -67,6 +66,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
@property
def cookies(self) -> dict: ...
@property
def user_agent(self) -> str: ...
@property
def session(self) -> Session: ...
@ -79,12 +81,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
@timeout.setter
def timeout(self, second: float) -> None: ...
@property
def download_path(self) -> str: ...
@property
def download_set(self) -> WebPageDownloadSetter: ...
def get(self,
url: str,
show_errmsg: bool = False,
@ -129,8 +125,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def get_tab(self, tab_id: str = None) -> WebPageTab: ...
def _get_driver_cookies(self, as_dict: bool = False, all_info: bool = False) -> dict: ...
def close_driver(self) -> None: ...
def close_session(self) -> None: ...
@ -156,9 +150,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
verify: Any | None = ...,
cert: Any | None = ...) -> bool: ...
@property
def download(self) -> DownloadKit: ...
@property
def set(self) -> WebPageSetter: ...
@ -167,7 +158,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
-> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None, List[Union[SessionElement, str]], List[
Union[ChromiumElement, str, ChromiumFrame]]]: ...
def _set_start_options(self, dr_opt: Union[ChromiumDriver, DriverOptions, bool, None],
def _set_start_options(self, dr_opt: Union[ChromiumDriver, bool, None],
se_opt: Union[Session, SessionOptions, bool, None]) -> None: ...
def quit(self) -> None: ...
@ -185,21 +176,3 @@ class WebPageSetter(ChromiumPageSetter):
def headers(self, headers: dict) -> None: ...
def cookies(self, cookies) -> None: ...
class WebPageDownloadSetter(ChromiumDownloadSetter):
    """Type stub for the WebPage download setter."""

    def __init__(self, page: WebPage):
        self._page: WebPage = ...
        self._behavior: str = ...
        self._session: Session = None

    @property
    def _switched_DownloadKit(self) -> DownloadKit: ...

    def save_path(self, path) -> None: ...

    def by_browser(self) -> None: ...

    def by_DownloadKit(self) -> None: ...

    # Handler taking the event data as keyword arguments.
    def _download_by_DownloadKit(self, **kwargs) -> None: ...