diff --git a/DrissionPage/__init__.py b/DrissionPage/__init__.py index 1f553d7..335c6a4 100644 --- a/DrissionPage/__init__.py +++ b/DrissionPage/__init__.py @@ -11,3 +11,11 @@ from .web_page import WebPage # 启动配置类 from .configs.chromium_options import ChromiumOptions from .configs.session_options import SessionOptions + +# 旧版页面类和启动配置类 +try: + from .mixpage.mix_page import MixPage + from .mixpage.drission import Drission + from .configs.driver_options import DriverOptions +except ModuleNotFoundError: + pass diff --git a/DrissionPage/base.py b/DrissionPage/base.py index 21692cb..58ff3f3 100644 --- a/DrissionPage/base.py +++ b/DrissionPage/base.py @@ -4,12 +4,9 @@ @Contact : g1879@qq.com """ from abc import abstractmethod -from pathlib import Path from re import sub from urllib.parse import quote -from DownloadKit import DownloadKit - from .commons.constants import Settings, NoneElement from .commons.locator import get_loc from .commons.web import format_html @@ -62,10 +59,10 @@ class BaseElement(BaseParser): pass def prev(self, index=1): - return None # ChromiumShadowRoot直接继承 + return None # ShadowRootElement直接继承 def prevs(self) -> None: - return None # ChromiumShadowRoot直接继承 + return None # ShadowRootElement直接继承 def next(self, index=1): pass @@ -87,7 +84,7 @@ class BaseElement(BaseParser): class DrissionElement(BaseElement): - """ChromiumElement 和 SessionElement的基类 + """DriverElement、ChromiumElement 和 SessionElement的基类 但不是ShadowRootElement的基类""" @property @@ -122,10 +119,9 @@ class DrissionElement(BaseElement): return [format_html(x.strip(' ').rstrip('\n')) for x in texts if x and sub('[\r\n\t ]', '', x) != ''] - def parent(self, level_or_loc=1, index=1): + def parent(self, level_or_loc=1): """返回上面某一级父元素,可指定层数或用查询语法定位 :param level_or_loc: 第几级父元素,或定位符 - :param index: 当level_or_loc传入定位符,使用此参数选择第几个结果 :return: 上级元素对象 """ if isinstance(level_or_loc, int): @@ -137,7 +133,7 @@ class DrissionElement(BaseElement): if loc[0] == 'css selector': raise ValueError('此css selector语法不受支持,请换成xpath。') - loc = f'xpath:./ancestor::{loc[1].lstrip(". / ")}[{index}]' + loc = f'xpath:./ancestor::{loc[1].lstrip(". / ")}' else: raise TypeError('level_or_loc参数只能是tuple、int或str。') @@ -296,7 +292,7 @@ class DrissionElement(BaseElement): :param direction: 'following' 或 'preceding',查找的方向 :param brother: 查找范围,在同级查找还是整个dom前后查找 :param timeout: 查找等待时间 - :return: 元素对象或字符串 + :return: DriverElement对象或字符串 """ if index is not None and index < 1: raise ValueError('index必须大于等于1。') @@ -357,8 +353,6 @@ class BasePage(BaseParser): self.retry_times = 3 self.retry_interval = 2 self._url_available = None - self._download_path = '' - self._DownloadKit = None @property def title(self): @@ -386,18 +380,6 @@ class BasePage(BaseParser): """返回当前访问的url有效性""" return self._url_available - @property - def download_path(self): - """返回默认下载路径""" - return str(Path(self._download_path).absolute()) - - @property - def download(self): - """返回下载器对象""" - if self._DownloadKit is None: - self._DownloadKit = DownloadKit(session=self, goal_path=self.download_path) - return self._DownloadKit - def _before_connect(self, url, retry, interval): """连接前的准备 :param url: 要访问的url @@ -405,7 +387,7 @@ class BasePage(BaseParser): :param interval: 重试间隔 :return: 重试次数和间隔组成的tuple """ - self._url = quote(url, safe='/:&?=%;#@+![]') + self._url = quote(url, safe='/:&?=%;#@+!') retry = retry if retry is not None else self.retry_times interval = interval if interval is not None else self.retry_interval return retry, interval diff --git a/DrissionPage/base.pyi b/DrissionPage/base.pyi index 3d9ec12..690241d 100644 --- a/DrissionPage/base.pyi +++ b/DrissionPage/base.pyi @@ -6,8 +6,6 @@ from abc import abstractmethod from typing import Union, Tuple, List -from DownloadKit import DownloadKit - from .commons.constants import NoneElement @@ -80,7 +78,7 @@ class DrissionElement(BaseElement): def texts(self, text_node_only: bool = False) -> list: ... - def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[DrissionElement, None]: ... + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[DrissionElement, None]: ... def child(self, index: int = 1, filter_loc: Union[tuple, str] = '', @@ -156,9 +154,7 @@ class BasePage(BaseParser): self._url_available: bool = ... self.retry_times: int = ... self.retry_interval: float = ... - self._timeout: float = ... - self._download_path: str = ... - self._DownloadKit: DownloadKit = ... + self._timeout = float = ... @property def title(self) -> Union[str, None]: ... @@ -175,12 +171,6 @@ class BasePage(BaseParser): @property def url_available(self) -> bool: ... - @property - def download_path(self) -> str: ... - - @property - def download(self) -> DownloadKit: ... - def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ... # ----------------以下属性或方法由后代实现---------------- diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py index e972da5..b87fb7a 100644 --- a/DrissionPage/chromium_base.py +++ b/DrissionPage/chromium_base.py @@ -7,6 +7,7 @@ from base64 import b64decode from json import loads, JSONDecodeError from os import sep from pathlib import Path +from re import search from threading import Thread from time import perf_counter, sleep, time @@ -18,10 +19,9 @@ from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chro from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement from .commons.locator import get_loc from .commons.tools import get_usable_path, clean_folder -from .commons.web import set_browser_cookies -from .errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \ - NoRectError, BrowserConnectError, GetDocumentError -from .network_listener import NetworkListener +from .commons.web import set_browser_cookies, ResponseData +from .errors import ContextLossError, ElementLossError, AlertExistsError, CallMethodError, TabClosedError, \ + NoRectError, BrowserConnectError from .session_element import make_session_ele @@ -41,7 +41,6 @@ class ChromiumBase(BasePage): self._tab_obj = None self._set = None self._screencast = None - self._listener = None if isinstance(address, int) or (isinstance(address, str) and address.isdigit()): address = f'127.0.0.1:{address}' @@ -71,9 +70,7 @@ class ChromiumBase(BasePage): """ self._chromium_init() if not tab_id: - u = f'http://{self.address}/json' - json = self._control_session.get(u).json() - self._control_session.get(u, headers={'Connection': 'close'}) + json = self._control_session.get(f'http://{self.address}/json').json() tab_id = [i['id'] for i in json if i['type'] == 'page'] if not tab_id: raise BrowserConnectError('浏览器连接失败,可能是浏览器版本原因。') @@ -86,7 +83,6 @@ class ChromiumBase(BasePage): """浏览器初始设置""" self._control_session = Session() self._control_session.keep_alive = False - self._control_session.proxies = {'http': None, 'https': None} self._first_run = True self._is_reading = False self._upload_list = None @@ -135,8 +131,7 @@ class ChromiumBase(BasePage): self._debug_recorder.add_data((perf_counter(), '信息', f'root_id:{self._root_id}')) break - except CDPError as e: - err = e + except Exception: if self._debug: print('重试获取document') if self._debug_recorder: @@ -145,9 +140,7 @@ class ChromiumBase(BasePage): sleep(.1) else: - txt = f'请检查是否创建了过多页面对象同时操作浏览器。\n如无法解决,请把以下信息报告作者。\n{err._info}\n' \ - f'报告网址:https://gitee.com/g1879/DrissionPage/issues' - raise GetDocumentError(txt) + raise RuntimeError('获取document失败。') if self._debug: print('获取document结束') @@ -332,11 +325,6 @@ class ChromiumBase(BasePage): """返回页面加载策略,有3种:'none'、'normal'、'eager'""" return self._page_load_strategy - @property - def user_agent(self): - """返回user agent""" - return self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] - @property def scroll(self): """返回用于滚动滚动条的对象""" @@ -376,13 +364,6 @@ class ChromiumBase(BasePage): self._screencast = Screencast(self) return self._screencast - @property - def listener(self): - """返回用于聆听数据包的对象""" - if self._listener is None: - self._listener = NetworkListener(self) - return self._listener - def run_cdp(self, cmd, **cmd_args): """执行Chrome DevTools Protocol语句 :param cmd: 协议项目 @@ -410,7 +391,7 @@ class ChromiumBase(BasePage): elif error in ('Node does not have a layout object', 'Could not compute box model.'): raise NoRectError elif r['type'] == 'call_method_error': - raise CDPError(f'\n错误:{r["error"]}\nmethod:{r["method"]}\nargs:{r["args"]}') + raise CallMethodError(f'\n错误:{r["error"]}\nmethod:{r["method"]}\nargs:{r["args"]}') else: raise RuntimeError(r) @@ -561,12 +542,9 @@ class ChromiumBase(BasePage): if ok: try: if single: - r = make_chromium_ele(self, node_id=nodeIds['nodeIds'][0]) - break - + return make_chromium_ele(self, node_id=nodeIds['nodeIds'][0]) else: - r = [make_chromium_ele(self, node_id=i) for i in nodeIds['nodeIds']] - break + return [make_chromium_ele(self, node_id=i) for i in nodeIds['nodeIds']] except ElementLossError: ok = False @@ -582,12 +560,6 @@ class ChromiumBase(BasePage): sleep(.1) - try: - self.run_cdp('DOM.discardSearchResults', searchId=search_result['searchId']) - except: - pass - return r - def refresh(self, ignore_cache=False): """刷新当前页面 :param ignore_cache: 是否忽略缓存 @@ -812,7 +784,7 @@ class ChromiumBase(BasePage): while self.ready_state not in ('complete', None): sleep(.1) if self._debug or show_errmsg: - print(f'重试{t + 1} {to_url}') + print(f'重试 {to_url}') if err: if show_errmsg: @@ -956,18 +928,8 @@ class ChromiumBaseSetter(object): js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' return self._page.run_js_loaded(js, as_expr=True) - def cookie(self, cookie): - """设置单个cookie - :param cookie: cookie信息 - :return: None - """ - if isinstance(cookie, str): - self.cookies(cookie) - else: - self.cookies([cookie]) - def cookies(self, cookies): - """设置多个cookie,注意不要传入单个 + """设置cookies值 :param cookies: cookies信息 :return: None """ @@ -1001,6 +963,7 @@ class ChromiumBaseWaiter(object): :param page_or_ele: 页面对象或元素对象 """ self._driver = page_or_ele + self._listener = None def ele_delete(self, loc_or_ele, timeout=None): """等待元素从DOM中删除 @@ -1008,8 +971,10 @@ class ChromiumBaseWaiter(object): :param timeout: 超时时间,默认读取页面超时时间 :return: 是否等待成功 """ - ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0) - return ele.wait.delete(timeout) if ele else True + if isinstance(loc_or_ele, (str, tuple)): + ele = self._driver._ele(loc_or_ele, timeout=.3, raise_err=False) + return ele.wait.delete(timeout) if ele else True + return loc_or_ele.wait.delete(timeout) def ele_display(self, loc_or_ele, timeout=None): """等待元素变成显示状态 @@ -1017,8 +982,8 @@ class ChromiumBaseWaiter(object): :param timeout: 超时时间,默认读取页面超时时间 :return: 是否等待成功 """ - ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0) - return ele.wait.display(timeout) + ele = self._driver._ele(loc_or_ele, raise_err=False) + return ele.wait.display(timeout) if ele else False def ele_hidden(self, loc_or_ele, timeout=None): """等待元素变成隐藏状态 @@ -1026,18 +991,9 @@ class ChromiumBaseWaiter(object): :param timeout: 超时时间,默认读取页面超时时间 :return: 是否等待成功 """ - ele = self._driver._ele(loc_or_ele, raise_err=False, timeout=0) + ele = self._driver._ele(loc_or_ele, raise_err=False) return ele.wait.hidden(timeout) - def ele_load(self, loc, timeout=None): - """等待元素加载到DOM - :param loc: 要等待的元素,输入定位符 - :param timeout: 超时时间,默认读取页面超时时间 - :return: 成功返回元素对象,失败返回False - """ - ele = self._driver._ele(loc, raise_err=False, timeout=timeout) - return ele if ele else False - def load_start(self, timeout=None): """等待页面开始加载 :param timeout: 超时时间,为None时使用页面timeout属性 @@ -1065,8 +1021,7 @@ class ChromiumBaseWaiter(object): :return: 是否等待成功 """ if timeout != 0: - if timeout is None or timeout is True: - timeout = self._driver.timeout + timeout = self._driver.timeout if timeout in (None, True) else timeout end_time = perf_counter() + timeout while perf_counter() < end_time: if self._driver.is_loading == start: @@ -1074,6 +1029,132 @@ class ChromiumBaseWaiter(object): sleep(gap) return False + def set_targets(self, targets, is_regex=False): + """指定要等待的数据包 + :param targets: 要匹配的数据包url特征,可用list等传入多个 + :param is_regex: 设置的target是否正则表达式 + :return: None + """ + if not self._listener: + self._listener = NetworkListener(self._driver) + self._listener.set_targets(targets, is_regex) + + def data_packets(self, timeout=None, any_one=False): + """等待指定数据包加载完成 + :param timeout: 超时时间,为None则使用页面对象timeout + :param any_one: 多个target时,是否全部监听到才结束,为True时监听到一个目标就结束 + :return: ResponseData对象或监听结果字典 + """ + if not self._listener: + self._listener = NetworkListener(self._driver) + return self._listener.listen(timeout, any_one) + + def stop_listening(self): + """停止监听数据包""" + if not self._listener: + self._listener = NetworkListener(self._driver) + self._listener.stop() + + +class NetworkListener(object): + def __init__(self, page): + self._page = page + self._targets = None + self._is_regex = False + self._results = {} + self._single = False + self._requests = {} + + def set_targets(self, targets, is_regex=False): + """指定要等待的数据包 + :param targets: 要匹配的数据包url特征,可用list等传入多个 + :param is_regex: 设置的target是否正则表达式 + :return: None + """ + if not isinstance(targets, (str, list, tuple, set)): + raise TypeError('targets只能是str、list、tuple、set。') + self._is_regex = is_regex + if isinstance(targets, str): + self._targets = {targets} + self._single = True + else: + self._targets = set(targets) + self._single = False + self._page.run_cdp('Network.enable') + if targets is not None: + self._page.driver.Network.requestWillBeSent = self._requestWillBeSent + self._page.driver.Network.responseReceived = self._response_received + self._page.driver.Network.loadingFinished = self._loading_finished + else: + self.stop() + + def stop(self): + """停止监听数据包""" + self._page.run_cdp('Network.disable') + self._page.driver.Network.requestWillBeSent = None + self._page.driver.Network.responseReceived = None + self._page.driver.Network.loadingFinished = None + + def listen(self, timeout=None, any_one=False): + """等待指定数据包加载完成 + :param timeout: 超时时间,为None则使用页面对象timeout + :param any_one: 多个target时,是否全部监听到才结束,为True时监听到一个目标就结束 + :return: ResponseData对象或监听结果字典 + """ + if self._targets is None: + raise RuntimeError('必须先用set_targets()设置等待目标。') + + timeout = timeout if timeout is not None else self._page.timeout + end_time = perf_counter() + timeout + while perf_counter() < end_time: + if self._results and (any_one or set(self._results) == self._targets): + break + sleep(.1) + + self._requests = {} + if not self._results: + return False + r = list(self._results.values())[0] if self._single else self._results + self._results = {} + return r + + def _response_received(self, **kwargs): + """接收到返回信息时处理方法""" + if kwargs['requestId'] in self._requests: + self._requests[kwargs['requestId']]['response'] = kwargs['response'] + + def _loading_finished(self, **kwargs): + """请求完成时处理方法""" + request_id = kwargs['requestId'] + if request_id in self._requests: + try: + r = self._page.run_cdp('Network.getResponseBody', requestId=request_id) + body = r['body'] + is_base64 = r['base64Encoded'] + except CallMethodError: + body = '' + is_base64 = False + + request = self._requests[request_id] + target = request['target'] + rd = ResponseData(request_id, request['response'], body, self._page.tab_id, target) + rd.method = request['method'] + rd.postData = request['post_data'] + rd._base64_body = is_base64 + rd.requestHeaders = request['request_headers'] + self._results[target] = rd + + def _requestWillBeSent(self, **kwargs): + """接收到请求时的回调函数""" + for target in self._targets: + if (self._is_regex and search(target, kwargs['request']['url'])) or ( + not self._is_regex and target in kwargs['request']['url']): + self._requests[kwargs['requestId']] = {'target': target, + 'method': kwargs['request']['method'], + 'post_data': kwargs['request'].get('postData', None), + 'request_headers': kwargs['request']['headers']} + break + class ChromiumPageScroll(ChromiumScroll): def __init__(self, page): @@ -1084,10 +1165,10 @@ class ChromiumPageScroll(ChromiumScroll): self.t1 = 'window' self.t2 = 'document.documentElement' - def to_see(self, loc_or_ele, center=None): + def to_see(self, loc_or_ele, center=False): """滚动页面直到元素可见 :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :param center: 是否尽量滚动到页面正中,为None时如果被遮挡,则滚动到页面正中 + :param center: 是否尽量滚动到页面正中 :return: None """ ele = self._driver._ele(loc_or_ele) @@ -1096,22 +1177,17 @@ class ChromiumPageScroll(ChromiumScroll): def _to_see(self, ele, center): """执行滚动页面直到元素可见 :param ele: 元素对象 - :param center: 是否尽量滚动到页面正中,为None时如果被遮挡,则滚动到页面正中 + :param center: 是否尽量滚动到页面正中 :return: None """ - txt = 'true' if center else 'false' - ele.run_js(f'this.scrollIntoViewIfNeeded({txt});') - if center or (center is not False and ele.states.is_covered): - ele.run_js('''function getWindowScrollTop() {var scroll_top = 0; - if (document.documentElement && document.documentElement.scrollTop) { - scroll_top = document.documentElement.scrollTop; - } else if (document.body) {scroll_top = document.body.scrollTop;} - return scroll_top;} - const { top, height } = this.getBoundingClientRect(); - const elCenter = top + height / 2; - const center = window.innerHeight / 2; - window.scrollTo({top: getWindowScrollTop() - (center - elCenter), - behavior: 'instant'});''') + if center: + ele.run_js('this.scrollIntoViewIfNeeded();') + self._wait_scrolled() + return + + ele.run_js('this.scrollIntoViewIfNeeded(false);') + if ele.states.is_covered: + ele.run_js('this.scrollIntoViewIfNeeded();') self._wait_scrolled() diff --git a/DrissionPage/chromium_base.pyi b/DrissionPage/chromium_base.pyi index 11c5878..9638dc8 100644 --- a/DrissionPage/chromium_base.pyi +++ b/DrissionPage/chromium_base.pyi @@ -4,7 +4,7 @@ @Contact : g1879@qq.com """ from pathlib import Path -from typing import Union, Tuple, List, Any +from typing import Union, Tuple, List, Any, Dict from DataRecorder import Recorder from requests import Session @@ -15,11 +15,12 @@ from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumElement, ChromiumScroll from .chromium_frame import ChromiumFrame from .commons.constants import NoneElement -from .network_listener import NetworkListener +from .commons.web import ResponseData from .session_element import SessionElement class ChromiumBase(BasePage): + def __init__(self, address: Union[str, int], tab_id: str = None, @@ -41,7 +42,6 @@ class ChromiumBase(BasePage): self._wait: ChromiumBaseWaiter = ... self._set: ChromiumBaseSetter = ... self._screencast: Screencast = ... - self._listener: NetworkListener = ... def _connect_browser(self, tab_id: str = None) -> None: ... @@ -111,9 +111,6 @@ class ChromiumBase(BasePage): @property def page_load_strategy(self) -> str: ... - @property - def user_agent(self) -> str: ... - @property def scroll(self) -> ChromiumPageScroll: ... @@ -132,33 +129,37 @@ class ChromiumBase(BasePage): @property def screencast(self) -> Screencast: ... - @property - def listener(self) -> NetworkListener: ... - def run_js(self, script: str, *args: Any, as_expr: bool = False) -> Any: ... def run_js_loaded(self, script: str, *args: Any, as_expr: bool = False) -> Any: ... def run_async_js(self, script: str, *args: Any, as_expr: bool = False) -> None: ... - def get(self, url: str, show_errmsg: bool = False, retry: int = None, - interval: float = None, timeout: float = None) -> Union[None, bool]: ... + def get(self, + url: str, + show_errmsg: bool = False, + retry: int = None, + interval: float = None, + timeout: float = None) -> Union[None, bool]: ... - def get_cookies(self, as_dict: bool = False, all_domains: bool = False, - all_info: bool = False) -> Union[list, dict]: ... + def get_cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[ + list, dict]: ... - def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], - timeout: float = None) -> Union[ChromiumElement, str]: ... + def ele(self, + loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], + timeout: float = None) -> ChromiumElement: ... - def eles(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> List[Union[ChromiumElement, str]]: ... + def eles(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> List[ChromiumElement]: ... def s_ele(self, loc_or_ele: Union[Tuple[str, str], str] = None) \ -> Union[SessionElement, str, NoneElement]: ... def s_eles(self, loc_or_str: Union[Tuple[str, str], str]) -> List[Union[SessionElement, str]]: ... - def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], + def _find_elements(self, + loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \ -> Union[ChromiumElement, ChromiumFrame, NoneElement, List[Union[ChromiumElement, ChromiumFrame]]]: ... @@ -216,6 +217,7 @@ class ChromiumBase(BasePage): class ChromiumBaseWaiter(object): def __init__(self, page: ChromiumBase): self._driver: ChromiumBase = ... + self._listener: NetworkListener = ... def ele_delete(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... @@ -223,23 +225,51 @@ class ChromiumBaseWaiter(object): def ele_hidden(self, loc_or_ele: Union[str, tuple, ChromiumElement], timeout: float = None) -> bool: ... - def ele_load(self, loc: Union[str, tuple], timeout: float = None) -> Union[bool, ChromiumElement]: ... - def _loading(self, timeout: float = None, start: bool = True, gap: float = .01) -> bool: ... def load_start(self, timeout: float = None) -> bool: ... def load_complete(self, timeout: float = None) -> bool: ... + def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... + + def stop_listening(self) -> None: ... + + def data_packets(self, timeout: float = None, + any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... + def upload_paths_inputted(self) -> None: ... +class NetworkListener(object): + def __init__(self, page): + self._page: ChromiumBase = ... + self._targets: Union[str, dict] = ... + self._single: bool = ... + self._results: Union[ResponseData, Dict[str, ResponseData], False] = ... + self._is_regex: bool = ... + self._requests: dict = ... + + def set_targets(self, targets: Union[str, list, tuple, set], is_regex: bool = False) -> None: ... + + def stop(self) -> None: ... + + def listen(self, timeout: float = None, + any_one: bool = False) -> Union[ResponseData, Dict[str, ResponseData], False]: ... + + def _response_received(self, **kwargs) -> None: ... + + def _loading_finished(self, **kwargs) -> None: ... + + def _requestWillBeSent(self, **kwargs) -> None: ... + + class ChromiumPageScroll(ChromiumScroll): def __init__(self, page: ChromiumBase): ... - def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: Union[bool, None] = None) -> None: ... + def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: bool = False) -> None: ... - def _to_see(self, ele: ChromiumElement, center: Union[bool, None]) -> None: ... + def _to_see(self, ele: ChromiumElement, center: bool) -> None: ... class ChromiumBaseSetter(object): @@ -264,8 +294,6 @@ class ChromiumBaseSetter(object): def local_storage(self, item: str, value: Union[str, bool]) -> None: ... - def cookie(self, cookies: Union[RequestsCookieJar, str, dict]) -> None: ... - def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ... def headers(self, headers: dict) -> None: ... @@ -338,4 +366,4 @@ class ScreencastMode(object): def frugal_imgs_mode(self) -> None: ... - def imgs_mode(self) -> None: ... + def imgs_mode(self) -> None: ... \ No newline at end of file diff --git a/DrissionPage/chromium_driver.py b/DrissionPage/chromium_driver.py index 9d053f3..fbe08e1 100644 --- a/DrissionPage/chromium_driver.py +++ b/DrissionPage/chromium_driver.py @@ -11,7 +11,7 @@ from threading import Thread, Event from websocket import WebSocketTimeoutException, WebSocketException, WebSocketConnectionClosedException, \ create_connection -from .errors import CDPError +from .errors import CallMethodError class GenericAttr(object): @@ -79,13 +79,7 @@ class ChromiumDriver(object): message_json = dumps(message) if self.debug: - if self.debug is True or (isinstance(self.debug, str) and message.get('method', '').startswith(self.debug)): - print(f'发> {message_json}') - elif isinstance(self.debug, (list, tuple, set)): - for m in self.debug: - if message.get('method', '').startswith(m): - print(f'发> {message_json}') - break + print(f"发> {message_json}") if not isinstance(timeout, (int, float)) or timeout > 1: q_timeout = 1 @@ -123,7 +117,7 @@ class ChromiumDriver(object): try: self._ws.settimeout(1) message_json = self._ws.recv() - mes = loads(message_json) + message = loads(message_json) except WebSocketTimeoutException: continue except (WebSocketException, OSError, WebSocketConnectionClosedException): @@ -131,24 +125,17 @@ class ChromiumDriver(object): return if self.debug: - if self.debug is True or 'id' in mes or (isinstance(self.debug, str) - and mes.get('method', '').startswith(self.debug)): - print(f'<收 {message_json}') - elif isinstance(self.debug, (list, tuple, set)): - for m in self.debug: - if mes.get('method', '').startswith(m): - print(f'<收 {message_json}') - break + print(f'<收 {message_json}') - if "method" in mes: - self.event_queue.put(mes) + if "method" in message: + self.event_queue.put(message) - elif "id" in mes: - if mes["id"] in self.method_results: - self.method_results[mes['id']].put(mes) + elif "id" in message: + if message["id"] in self.method_results: + self.method_results[message['id']].put(message) elif self.debug: - print(f'未知信息:{mes}') + print(f'未知信息:{message}') def _handle_event_loop(self): """当接收到浏览器信息,执行已绑定的方法""" @@ -183,7 +170,7 @@ class ChromiumDriver(object): self.start() # raise RuntimeError("不能在启动前调用方法。") if args: - raise CDPError("参数必须是key=value形式。") + raise CallMethodError("参数必须是key=value形式。") if self._stopped.is_set(): return {'error': 'tab closed', 'type': 'tab_closed'} diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index bf75627..59f4eb0 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -14,7 +14,7 @@ from .commons.keys import keys_to_typing, keyDescriptionForString, keyDefinition from .commons.locator import get_loc from .commons.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll from .errors import ContextLossError, ElementLossError, JavaScriptError, NoRectError, ElementNotFoundError, \ - CDPError, NoResourceError, CanNotClickError + CallMethodError, NoResourceError, CanNotClickError from .session_element import make_session_ele @@ -99,7 +99,7 @@ class ChromiumElement(DrissionElement): try: attrs = self.page.run_cdp('DOM.getAttributes', nodeId=self._node_id)['attributes'] return {attrs[i]: attrs[i + 1] for i in range(0, len(attrs), 2)} - except CDPError: # 文档根元素不能调用此方法 + except CallMethodError: # 文档根元素不能调用此方法 return {} @property @@ -203,13 +203,12 @@ class ChromiumElement(DrissionElement): return self._select - def parent(self, level_or_loc=1, index=1): + def parent(self, level_or_loc=1): """返回上面某一级父元素,可指定层数或用查询语法定位 :param level_or_loc: 第几级父元素,或定位符 - :param index: 当level_or_loc传入定位符,使用此参数选择第几个结果 :return: 上级元素对象 """ - return super().parent(level_or_loc, index) + return super().parent(level_or_loc) def child(self, filter_loc='', index=1, timeout=0, ele_only=True): """返回当前元素的一个符合条件的直接子元素,可用查询语法筛选,可指定返回筛选结果的第几个 @@ -465,7 +464,7 @@ class ChromiumElement(DrissionElement): try: result = self.page.run_cdp('Page.getResourceContent', frameId=frame, url=src) break - except CDPError: + except CallMethodError: sleep(.1) if not result: @@ -523,24 +522,15 @@ class ChromiumElement(DrissionElement): return self.page._get_screenshot(path, as_bytes=as_bytes, as_base64=as_base64, full_page=False, left_top=left_top, right_bottom=right_bottom, ele=self) - def input(self, vals, clear=True, by_js=False): + def input(self, vals, clear=True): """输入文本或组合键,也可用于输入文件路径到input元素(路径间用\n间隔) :param vals: 文本值或按键组合 :param clear: 输入前是否清空文本框 - :param by_js: 是否用js方式输入,不能输入组合键 :return: None """ if self.tag == 'input' and self.attr('type') == 'file': return self._set_file_input(vals) - if by_js: - if clear: - self.clear(True) - if isinstance(vals, (list, tuple)): - vals = ''.join([str(i) for i in vals]) - self.set.prop('value', str(vals)) - return - if clear and vals not in ('\n', '\ue007'): self.clear(by_js=False) else: @@ -759,7 +749,7 @@ class ChromiumShadowRoot(BaseElement): 例:ele2 = ele1('@id=ele_id') :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 超时时间 - :return: 元素对象或属性、文本 + :return: DriverElement对象或属性、文本 """ return self.ele(loc_or_str, timeout) @@ -809,10 +799,9 @@ class ChromiumShadowRoot(BaseElement): from threading import Thread Thread(target=run_js, args=(self, script, as_expr, self.page.timeouts.script, args)).start() - def parent(self, level_or_loc=1, index=1): + def parent(self, level_or_loc=1): """返回上面某一级父元素,可指定层数或用查询语法定位 :param level_or_loc: 第几级父元素,或定位符 - :param index: 当level_or_loc传入定位符,使用此参数选择第几个结果 :return: ChromiumElement对象 """ if isinstance(level_or_loc, int): @@ -824,7 +813,7 @@ class ChromiumShadowRoot(BaseElement): if loc[0] == 'css selector': raise ValueError('此css selector语法不受支持,请换成xpath。') - loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}[{index}]' + loc = f'xpath:./ancestor-or-self::{loc[1].lstrip(". / ")}' else: raise TypeError('level_or_loc参数只能是tuple、int或str。') @@ -1435,7 +1424,7 @@ class ChromiumElementStates(object): lx, ly = self._ele.locations.click_point try: r = self._ele.page.run_cdp('DOM.getNodeForLocation', x=lx, y=ly) - except CDPError: + except CallMethodError: return False if r.get('backendNodeId') != self._ele.ids.backend_id: @@ -1782,9 +1771,9 @@ class ChromiumScroll(object): class ChromiumElementScroll(ChromiumScroll): - def to_see(self, center=None): + def to_see(self, center=False): """滚动页面直到元素可见 - :param center: 是否尽量滚动到页面正中,为None时如果被遮挡,则滚动到页面正中 + :param center: 是否尽量滚动到页面正中 :return: None """ self._driver.page.scroll.to_see(self._driver, center=center) diff --git a/DrissionPage/chromium_element.pyi b/DrissionPage/chromium_element.pyi index c074c20..53538ff 100644 --- a/DrissionPage/chromium_element.pyi +++ b/DrissionPage/chromium_element.pyi @@ -94,7 +94,7 @@ class ChromiumElement(DrissionElement): @property def click(self) -> Click: ... - def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union[ChromiumElement, None]: ... + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union[ChromiumElement, None]: ... def child(self, filter_loc: Union[tuple, str] = '', index: int = 1, @@ -183,7 +183,7 @@ class ChromiumElement(DrissionElement): def get_screenshot(self, path: [str, Path] = None, as_bytes: [bool, str] = None, as_base64: [bool, str] = None) -> Union[str, bytes]: ... - def input(self, vals: Any, clear: bool = True, by_js: bool = False) -> None: ... + def input(self, vals: Any, clear: bool = True) -> None: ... def _set_file_input(self, files: Union[str, list, tuple]) -> None: ... @@ -273,7 +273,7 @@ class ChromiumShadowRoot(BaseElement): def run_async_js(self, script: str, *args: Any, as_expr: bool = False) -> None: ... - def parent(self, level_or_loc: Union[str, int] = 1, index: int = 1) -> ChromiumElement: ... + def parent(self, level_or_loc: Union[str, int] = 1) -> ChromiumElement: ... def child(self, filter_loc: Union[tuple, str] = '', index: int = 1) -> Union[ChromiumElement, str, None]: ... @@ -496,7 +496,7 @@ class ChromiumScroll(object): class ChromiumElementScroll(ChromiumScroll): - def to_see(self, center: Union[bool, None] = None) -> None: ... + def to_see(self, center: bool = False) -> None: ... class ChromiumSelect(object): diff --git a/DrissionPage/chromium_frame.py b/DrissionPage/chromium_frame.py index 58ea1e8..f9118c4 100644 --- a/DrissionPage/chromium_frame.py +++ b/DrissionPage/chromium_frame.py @@ -69,9 +69,7 @@ class ChromiumFrame(ChromiumBase): try: super()._driver_init(tab_id) except: - u = f'http://{self.address}/json' - self._control_session.get(u) - self._control_session.get(u, headers={'Connection': 'close'}) + self._control_session.get(f'http://{self.address}/json') super()._driver_init(tab_id) def _reload(self): @@ -640,10 +638,10 @@ class ChromiumFrameScroll(ChromiumPageScroll): self.t1 = self.t2 = 'this.documentElement' self._wait_complete = False - def to_see(self, loc_or_ele, center=None): + def to_see(self, loc_or_ele, center=False): """滚动页面直到元素可见 :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :param center: 是否尽量滚动到页面正中,为None时如果被遮挡,则滚动到页面正中 + :param center: 是否尽量滚动到页面正中 :return: None """ ele = loc_or_ele if isinstance(loc_or_ele, ChromiumElement) else self._driver._ele(loc_or_ele) diff --git a/DrissionPage/chromium_frame.pyi b/DrissionPage/chromium_frame.pyi index 5326e53..631fb5f 100644 --- a/DrissionPage/chromium_frame.pyi +++ b/DrissionPage/chromium_frame.pyi @@ -203,7 +203,7 @@ class ChromiumFrameIds(object): class ChromiumFrameScroll(ChromiumPageScroll): def __init__(self, frame: ChromiumFrame) -> None: ... - def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: Union[None, bool] = None) -> None: ... + def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement], center: bool = False) -> None: ... class ChromiumFrameSetter(ChromiumBaseSetter): diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index 00ec09d..16d7a7b 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -3,16 +3,23 @@ @Author : g1879 @Contact : g1879@qq.com """ +from pathlib import Path from platform import system +from threading import Thread from time import perf_counter, sleep +from warnings import warn + +from requests import Session from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter, ChromiumBaseWaiter from .chromium_driver import ChromiumDriver from .chromium_tab import ChromiumTab from .commons.browser import connect_browser from .commons.tools import port_is_using +from .commons.web import set_session_cookies from .configs.chromium_options import ChromiumOptions -from .errors import BrowserConnectError +from .errors import CallMethodError, BrowserConnectError +from .session_page import DownloadSetter class ChromiumPage(ChromiumBase): @@ -24,15 +31,17 @@ class ChromiumPage(ChromiumBase): :param tab_id: 要控制的标签页id,不指定默认为激活的 :param timeout: 超时时间 """ + self._download_set = None + self._download_path = None super().__init__(addr_driver_opts, tab_id, timeout) def _set_start_options(self, addr_driver_opts, none): """设置浏览器启动属性 - :param addr_driver_opts: 'ip:port'、ChromiumOptions + :param addr_driver_opts: 'ip:port'、ChromiumDriver、ChromiumOptions :param none: 用于后代继承 :return: None """ - if not addr_driver_opts or isinstance(addr_driver_opts, ChromiumOptions): + if not addr_driver_opts or str(type(addr_driver_opts)).endswith(("ChromiumOptions'>", "DriverOptions'>")): self._driver_options = addr_driver_opts or ChromiumOptions(addr_driver_opts) # 接收浏览器地址和端口 @@ -71,9 +80,7 @@ class ChromiumPage(ChromiumBase): if not self._tab_obj: # 不是传入driver的情况 connect_browser(self._driver_options) if not tab_id: - u = f'http://{self.address}/json' - json = self._control_session.get(u).json() - self._control_session.get(u, headers={'Connection': 'close'}) + json = self._control_session.get(f'http://{self.address}/json').json() tab_id = [i['id'] for i in json if i['type'] == 'page'] if not tab_id: raise BrowserConnectError('浏览器连接失败,可能是浏览器版本原因。') @@ -87,9 +94,7 @@ class ChromiumPage(ChromiumBase): def _page_init(self): """页面相关设置""" - u = f'http://{self.address}/json/version' - ws = self._control_session.get(u).json()['webSocketDebuggerUrl'] - self._control_session.get(u, headers={'Connection': 'close'}) + ws = self._control_session.get(f'http://{self.address}/json/version').json()['webSocketDebuggerUrl'] self._browser_driver = ChromiumDriver(ws.split('/')[-1], 'browser', self.address) self._browser_driver.start() @@ -99,10 +104,10 @@ class ChromiumPage(ChromiumBase): self._rect = None self._main_tab = self.tab_id - # try: - # self.download_set.by_browser() - # except CDPError: - # pass + try: + self.download_set.by_browser() + except CallMethodError: + pass self._process_id = None r = self.browser_driver.SystemInfo.getProcessInfo() @@ -126,9 +131,7 @@ class ChromiumPage(ChromiumBase): @property def tabs(self): """返回所有标签页id组成的列表""" - u = f'http://{self.address}/json' - j = self._control_session.get(u).json() # 不要改用cdp - self._control_session.get(u, headers={'Connection': 'close'}) + j = self._control_session.get(f'http://{self.address}/json').json() # 不要改用cdp return [i['id'] for i in j if i['type'] == 'page'] @property @@ -152,23 +155,23 @@ class ChromiumPage(ChromiumBase): self._set = ChromiumPageSetter(self) return self._set - # @property - # def download_path(self): - # """返回默认下载路径""" - # p = self._download_path or '' - # return str(Path(p).absolute()) - # - # @property - # def download_set(self): - # """返回用于设置下载参数的对象""" - # if self._download_set is None: - # self._download_set = BaseDownloadSetter(self) - # return self._download_set - # - # @property - # def download(self): - # """返回下载器对象""" - # return self.download_set._switched_DownloadKit + @property + def download_path(self): + """返回默认下载路径""" + p = self._download_path or '' + return str(Path(p).absolute()) + + @property + def download_set(self): + """返回用于设置下载参数的对象""" + if self._download_set is None: + self._download_set = ChromiumDownloadSetter(self) + return self._download_set + + @property + def download(self): + """返回下载器对象""" + return self.download_set._switched_DownloadKit @property def rect(self): @@ -191,29 +194,24 @@ class ChromiumPage(ChromiumBase): tab_id = tab_id or self.tab_id return ChromiumTab(self, tab_id) - def find_tabs(self, title=None, url=None, tab_type=None, single=True): + def find_tabs(self, text=None, by_title=True, by_url=None, special=False): """查找符合条件的tab,返回它们的id组成的列表 - :param title: 要匹配title的文本 - :param url: 要匹配url的文本 - :param tab_type: tab类型,可用列表输入多个 - :param single: 是否返回首个结果的id,为False返回所有信息 - :return: tab id或tab dict + :param text: 查询条件 + :param by_title: 是否匹配title + :param by_url: 是否匹配url + :param special: 是否匹配特殊tab,如打印页 + :return: tab id组成的列表 """ - u = f'http://{self.address}/json' - tabs = self._control_session.get(u).json() # 不要改用cdp - self._control_session.get(u, headers={'Connection': 'close'}) - if isinstance(tab_type, str): - tab_type = {tab_type} - elif isinstance(tab_type, (list, tuple, set)): - tab_type = set(tab_type) - elif tab_type is not None: - raise TypeError('tab_type只能是set、list、tuple、str、None。') + tabs = self._control_session.get(f'http://{self.address}/json').json() # 不要改用cdp + if text is None or not (by_title or by_url): + return [i['id'] for i in tabs if (not special and i['type'] == 'page') + or (special and i['type'] not in ('page', 'iframe'))] - r = [i for i in tabs if ((title is None or title in i['title']) and (url is None or url in i['url']) - and (tab_type is None or i['type'] in tab_type))] - return r[0]['id'] if r and single else r + return [i['id'] for i in tabs if ((not special and i['type'] == 'page') + or (special and i['type'] not in ('page', 'iframe'))) + and ((by_url and text in i['url']) or (by_title and text in i['title']))] - def new_tab(self, url=None, switch_to=False): + def new_tab(self, url=None, switch_to=True): """新建一个标签页,该标签页在最后面 :param url: 新标签页跳转到的网址 :param switch_to: 新建标签页后是否把焦点移过去 @@ -385,6 +383,13 @@ class ChromiumPageWaiter(ChromiumBaseWaiter): super().__init__(page) self._listener = None + def download_begin(self, timeout=None): + """等待浏览器下载开始 + :param timeout: 等待超时时间,为None则使用页面对象timeout属性 + :return: 是否等到下载开始 + """ + return self._driver.download_set.wait_download_begin(timeout) + def new_tab(self, timeout=None): """等待新标签页出现 :param timeout: 等待超时时间,为None则使用页面对象timeout属性 @@ -395,20 +400,6 @@ class ChromiumPageWaiter(ChromiumBaseWaiter): while self._driver.tab_id == self._driver.latest_tab and perf_counter() < end_time: sleep(.01) - # def download_begin(self, timeout=1.5): - # """等待浏览器下载开始 - # :param timeout: 等待超时时间,为None则使用页面对象timeout属性 - # :return: 是否等到下载开始 - # """ - # return self._driver.download_set.wait_download_begin(timeout) - # - # def download_finish(self, timeout=None): - # """等待下载结束 - # :param timeout: 等待超时时间,为None则使用页面对象timeout属性 - # :return: 是否等到下载结束 - # """ - # return self._driver.download_set.wait_download_finish(timeout) - class ChromiumTabRect(object): def __init__(self, page): @@ -481,247 +472,125 @@ class ChromiumTabRect(object): return self._page.browser_driver.Browser.getWindowForTarget(targetId=self._page.tab_id)['bounds'] -# class BaseDownloadSetter(DownloadSetter): -# """用于设置下载参数的类""" -# -# def __init__(self, page): -# """ -# :param page: ChromiumPage对象 -# """ -# super().__init__(page) -# self._behavior = 'allowAndName' -# self._session = None -# self._save_path = '' -# self._rename = None -# self._waiting_download = False -# self._download_begin = False -# self._browser_missions = {} -# self._browser_downloading_count = 0 -# self._show_msg = True -# -# @property -# def session(self): -# """返回用于DownloadKit的Session对象""" -# if self._session is None: -# self._session = Session() -# return self._session -# -# @property -# def browser_missions(self): -# """返回浏览器下载任务""" -# return list(self._browser_missions.values()) -# -# @property -# def DownloadKit_missions(self): -# """返回DownloadKit下载任务""" -# return list(self.DownloadKit.missions.values()) -# -# @property -# def _switched_DownloadKit(self): -# """返回从浏览器同步cookies后的Session对象""" -# self._cookies_to_session() -# return self.DownloadKit -# -# def save_path(self, path): -# """设置下载路径 -# :param path: 下载路径 -# :return: None -# """ -# path = path or '' -# path = Path(path).absolute() -# path.mkdir(parents=True, exist_ok=True) -# path = str(path) -# self._save_path = path -# self._page._download_path = path -# try: -# self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', downloadPath=path, -# eventsEnabled=True) -# except CDPError: -# warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') -# self._page.run_cdp('Page.setDownloadBehavior', behavior='allowAndName', downloadPath=path) -# -# self.DownloadKit.goal_path = path -# -# def rename(self, name): -# """设置浏览器下一个下载任务的文件名 -# :param name: 文件名,不带后缀时自动使用原后缀 -# :return: None -# """ -# self._rename = name -# -# def by_browser(self): -# """设置使用浏览器下载文件""" -# try: -# self._page.browser_driver.Browser.setDownloadBehavior(behavior='allowAndName', eventsEnabled=True, -# downloadPath=self._page.download_path) -# self._page.browser_driver.Browser.downloadWillBegin = self._download_will_begin -# self._page.browser_driver.Browser.downloadProgress = self._download_progress -# except CDPError: -# self._page.driver.Page.setDownloadBehavior(behavior='allowAndName', downloadPath=self._page.download_path) -# self._page.driver.Page.downloadWillBegin = self._download_will_begin -# self._page.driver.Page.downloadProgress = self._download_progress -# -# self._behavior = 'allowAndName' -# -# def by_DownloadKit(self): -# """设置使用DownloadKit下载文件""" -# try: -# self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True) -# self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit -# except CDPError: -# raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。') -# -# self._behavior = 'deny' -# -# def wait_download_begin(self, timeout=None): -# """等待浏览器下载开始 -# :param timeout: 等待超时时间,为None则使用页面对象timeout属性 -# :return: 是否等到下载开始 -# """ -# self._waiting_download = True -# result = False -# timeout = timeout if timeout is not None else self._page.timeout -# end_time = perf_counter() + timeout -# while perf_counter() < end_time: -# if self._download_begin: -# result = True -# break -# sleep(.05) -# self._download_begin = False -# self._waiting_download = False -# return result -# -# def wait_download_finish(self, timeout=None): -# """等待所有下载结束 -# :param timeout: 超时时间 -# :return: 是否等待到下载完成 -# """ -# timeout = timeout if timeout is not None else self._page.timeout -# end_time = perf_counter() + timeout -# while perf_counter() < end_time: -# if (self._DownloadKit is None or not self.DownloadKit.is_running) and self._browser_downloading_count == 0: -# return True -# sleep(.5) -# return False -# -# def show_msg(self, on_off=True): -# """是否显示下载信息 -# :param on_off: bool表示开或关 -# :return: None -# """ -# self._show_msg = on_off -# -# def _cookies_to_session(self): -# """把driver对象的cookies复制到session对象""" -# ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] -# self.session.headers.update({"User-Agent": ua}) -# set_session_cookies(self.session, self._page.get_cookies(as_dict=False, all_info=False)) -# -# def _download_by_DownloadKit(self, **kwargs): -# """拦截浏览器下载并用downloadKit下载""" -# url = kwargs['url'] -# if url.startswith('blob:'): -# raise TypeError('bolb:开头的链接无法使用DownloadKit下载,请用浏览器下载功能。') -# -# self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid']) -# -# if self._rename: -# rename = get_rename(kwargs['suggestedFilename'], self._rename) -# self._rename = None -# else: -# rename = kwargs['suggestedFilename'] -# -# mission = self._page.download.add(file_url=url, goal_path=self._page.download_path, rename=rename) -# Thread(target=self._wait_download_complete, args=(mission,), daemon=False).start() -# -# if self._waiting_download: -# self._download_begin = True -# -# self._browser_downloading_count += 1 -# -# if self._show_msg: -# print(f'(DownloadKit)开始下载:{Path(self._save_path) / rename}') -# -# def _download_will_begin(self, **kwargs): -# """浏览器下载即将开始时调用""" -# if self._rename: -# rename = get_rename(kwargs['suggestedFilename'], self._rename) -# self._rename = None -# else: -# rename = kwargs['suggestedFilename'] -# -# m = BrowserDownloadMission(kwargs['guid'], kwargs['url'], rename) -# self._browser_missions[kwargs['guid']] = m -# aid_path = Path(self._save_path) / rename -# -# if self._show_msg: -# print(f'(Browser)开始下载:{rename}') -# self._browser_downloading_count += 1 -# -# if self._file_exists == 'skip' and aid_path.exists(): -# m.state = 'skipped' -# m.save_path = aid_path.absolute() -# self._page.browser_driver.call_method('Browser.cancelDownload', guid=kwargs['guid']) -# (Path(self._save_path) / kwargs["guid"]).unlink(missing_ok=True) -# return -# -# if self._waiting_download: -# self._download_begin = True -# -# def _download_progress(self, **kwargs): -# """下载状态产生变化时调用""" -# guid = kwargs['guid'] -# m = self._browser_missions.get(guid, None) -# if m: -# m.size = kwargs['totalBytes'] -# m.received = kwargs['receivedBytes'] -# m.state = kwargs['state'] -# -# if m.state == 'completed': -# path = Path(self._save_path) / m.name -# from_path = Path(self._save_path) / guid -# if path.exists(): -# if self._file_exists == 'rename': -# path = get_usable_path(path) -# else: # 'overwrite' -# path.unlink() -# from_path.rename(path) -# m.save_path = path.absolute() -# -# if kwargs['state'] != 'inProgress': -# if self._show_msg and m: -# if kwargs['state'] == 'completed': -# print(f'(Browser)下载完成:{m.save_path}') -# elif m.state != 'skipped': -# print(f'(Browser)下载失败:{m.save_path}') -# else: -# print(f'(Browser)已跳过:{m.save_path}') -# self._browser_downloading_count -= 1 -# -# def _wait_download_complete(self, mission): -# """等待DownloadKit下载完成""" -# mission.wait(show=False) -# if self._show_msg: -# if mission.result == 'skip': -# print(f'(DownloadKit)已跳过:{mission.path}') -# elif not mission.result: -# print(f'(DownloadKit)下载失败:{mission.path}') -# else: -# print(f'(DownloadKit)下载完成:{mission.path}') +class ChromiumDownloadSetter(DownloadSetter): + """用于设置下载参数的类""" + def __init__(self, page): + """ + :param page: ChromiumPage对象 + """ + super().__init__(page) + self._behavior = 'allow' + self._download_th = None + self._session = None + self._waiting_download = False + self._download_begin = False -class BrowserDownloadMission(object): - def __init__(self, guid, url, name): - self.id = guid - self.url = url - self.name = name - self.save_path = None - self.state = None - self.size = None - self.received = None + @property + def session(self): + """返回用于DownloadKit的Session对象""" + if self._session is None: + self._session = Session() + return self._session - def __repr__(self): - return f'' + @property + def _switched_DownloadKit(self): + """返回从浏览器同步cookies后的Session对象""" + self._cookies_to_session() + return self.DownloadKit + + def save_path(self, path): + """设置下载路径 + :param path: 下载路径 + :return: None + """ + path = path or '' + path = Path(path).absolute() + path.mkdir(parents=True, exist_ok=True) + path = str(path) + self._page._download_path = path + try: + self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', downloadPath=path, + eventsEnabled=True) + except CallMethodError: + warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') + self._page.run_cdp('Page.setDownloadBehavior', behavior='allow', downloadPath=path) + + self.DownloadKit.goal_path = path + + def by_browser(self): + """设置使用浏览器下载文件""" + try: + self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', eventsEnabled=True, + downloadPath=self._page.download_path) + self._page.browser_driver.Browser.downloadWillBegin = self._download_by_browser + except CallMethodError: + self._page.driver.Page.setDownloadBehavior(behavior='allow', downloadPath=self._page.download_path) + self._page.driver.Page.downloadWillBegin = self._download_by_browser + + self._behavior = 'allow' + + def by_DownloadKit(self): + """设置使用DownloadKit下载文件""" + try: + self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True) + self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit + except CallMethodError: + raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。') + self._behavior = 'deny' + + def wait_download_begin(self, timeout=None): + """等待浏览器下载开始 + :param timeout: 等待超时时间,为None则使用页面对象timeout属性 + :return: 是否等到下载开始 + """ + self._waiting_download = True + result = False + timeout = timeout if timeout is not None else self._page.timeout + end_time = perf_counter() + timeout + while perf_counter() < end_time: + if self._download_begin: + result = True + break + sleep(.05) + self._download_begin = False + self._waiting_download = False + return result + + def _cookies_to_session(self): + """把driver对象的cookies复制到session对象""" + ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] + self.session.headers.update({"User-Agent": ua}) + set_session_cookies(self.session, self._page.get_cookies(as_dict=False, all_info=False)) + + def _download_by_DownloadKit(self, **kwargs): + """拦截浏览器下载并用downloadKit下载""" + url = kwargs['url'] + if url.startswith('blob:'): + self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', eventsEnabled=True, + downloadPath=self._page.download_path) + sleep(2) + self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True) + + else: + self._page.browser_driver.Browser.cancelDownload(guid=kwargs['guid']) + self._page.download.add(file_url=url, goal_path=self._page.download_path, + rename=kwargs['suggestedFilename']) + if self._download_th is None or not self._download_th.is_alive(): + self._download_th = Thread(target=self._wait_download_complete, daemon=False) + self._download_th.start() + + if self._waiting_download: + self._download_begin = True + + def _download_by_browser(self, **kwargs): + """使用浏览器下载时调用""" + if self._waiting_download: + self._download_begin = True + + def _wait_download_complete(self): + """等待下载完成""" + self._page.download.wait() class Alert(object): @@ -919,11 +788,3 @@ def get_chrome_hwnds_from_pid(pid, title): hwnds = [] EnumWindows(callback, hwnds) return hwnds - - -def get_rename(original, rename): - if '.' in rename: - return rename - else: - suffix = original[original.rfind('.'):] if '.' in original else '' - return f'{rename}{suffix}' diff --git a/DrissionPage/chromium_page.pyi b/DrissionPage/chromium_page.pyi index cfcb25e..d4ceb86 100644 --- a/DrissionPage/chromium_page.pyi +++ b/DrissionPage/chromium_page.pyi @@ -5,39 +5,41 @@ """ from os import popen from pathlib import Path -from typing import Union, Tuple, List, Dict +from threading import Thread +from typing import Union, Tuple, List from DownloadKit import DownloadKit -from DownloadKit.mission import Mission from requests import Session -from .chromium_base import ChromiumBase, ChromiumBaseSetter, ChromiumBaseWaiter +from .chromium_base import ChromiumBase, ChromiumBaseSetter, ChromiumBaseWaiter, NetworkListener from .chromium_driver import ChromiumDriver from .chromium_tab import ChromiumTab from .configs.chromium_options import ChromiumOptions -from .network_listener import NetworkListener +from .configs.driver_options import DriverOptions from .session_page import DownloadSetter class ChromiumPage(ChromiumBase): def __init__(self, - addr_driver_opts: Union[str, int, ChromiumOptions, ChromiumDriver] = None, + addr_driver_opts: Union[str, int, ChromiumOptions, ChromiumDriver, DriverOptions] = None, tab_id: str = None, timeout: float = None): - self._driver_options: ChromiumOptions = ... + self._driver_options: [ChromiumDriver, DriverOptions] = ... self._process_id: str = ... self._window_setter: WindowSetter = ... self._main_tab: str = ... self._alert: Alert = ... + self._download_path: str = ... + self._download_set: ChromiumDownloadSetter = ... self._browser_driver: ChromiumDriver = ... self._rect: ChromiumTabRect = ... def _connect_browser(self, - addr_driver_opts: Union[str, ChromiumDriver] = None, + addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None, tab_id: str = None) -> None: ... - def _set_start_options(self, addr_driver_opts: Union[str, ChromiumDriver], none) -> None: ... + def _set_start_options(self, addr_driver_opts: Union[str, ChromiumDriver, DriverOptions], none) -> None: ... def _page_init(self) -> None: ... @@ -68,12 +70,21 @@ class ChromiumPage(ChromiumBase): @property def set(self) -> ChromiumPageSetter: ... + @property + def download_set(self) -> ChromiumDownloadSetter: ... + + @property + def download(self) -> DownloadKit: ... + + @property + def download_path(self) -> str: ... + def get_tab(self, tab_id: str = None) -> ChromiumTab: ... - def find_tabs(self, title: str = None, url: str = None, - tab_type: Union[str, list, tuple, set] = None, single: bool = True) -> Union[str, List[str]]: ... + def find_tabs(self, text: str = None, by_title: bool = True, by_url: bool = None, + special: bool = False) -> List[str]: ... - def new_tab(self, url: str = None, switch_to: bool = False) -> str: ... + def new_tab(self, url: str = None, switch_to: bool = True) -> str: ... def to_main_tab(self) -> None: ... @@ -102,9 +113,7 @@ class ChromiumPageWaiter(ChromiumBaseWaiter): _driver: ChromiumPage = ... _listener: Union[NetworkListener, None] = ... - def download_begin(self, timeout: float = 1.5) -> bool: ... - - def download_finish(self, timeout: float = None) -> bool: ... + def download_begin(self, timeout: float = None) -> bool: ... def new_tab(self, timeout: float = None) -> bool: ... @@ -142,65 +151,36 @@ class ChromiumTabRect(object): def _get_browser_rect(self) -> dict: ... -class BaseDownloadSetter(DownloadSetter): +class ChromiumDownloadSetter(DownloadSetter): def __init__(self, page: ChromiumPage): self._page: ChromiumPage = ... self._behavior: str = ... - self._session: Session = ... - self._save_path: str = ... - self._rename: str = ... + self._download_th: Thread = ... + self._session: Session = None self._waiting_download: bool = ... self._download_begin: bool = ... - self._browser_missions: Dict[str, BrowserDownloadMission] = ... - self._browser_downloading_count: int = ... - self._show_msg: bool = ... @property def session(self) -> Session: ... - @property - def browser_missions(self) -> List[BrowserDownloadMission]: ... - - @property - def DownloadKit_missions(self) -> List[Mission]: ... - @property def _switched_DownloadKit(self) -> DownloadKit: ... def save_path(self, path: Union[str, Path]) -> None: ... - def rename(self, name: str) -> None: ... - def by_browser(self) -> None: ... def by_DownloadKit(self) -> None: ... def wait_download_begin(self, timeout: float = None) -> bool: ... - def wait_download_finish(self, timeout: float = None) -> bool: ... - - def show_msg(self, on_off: bool = True) -> None: ... - def _cookies_to_session(self) -> None: ... def _download_by_DownloadKit(self, **kwargs) -> None: ... - def _download_will_begin(self, **kwargs) -> None: ... + def _download_by_browser(self, **kwargs) -> None: ... - def _download_progress(self, **kwargs) -> None: ... - - def _wait_download_complete(self, mission: Mission) -> None: ... - - -class BrowserDownloadMission(object): - def __init__(self, guid: str, url: str, name: str): - self.id: str = ... - self.url: str = ... - self.name: str = ... - self.save_path: str = ... - self.state: str = ... - self.size: str = ... - self.received: str = ... + def _wait_download_complete(self) -> None: ... class Alert(object): @@ -259,6 +239,3 @@ class ChromiumPageSetter(ChromiumBaseSetter): def window(self) -> WindowSetter: ... def tab_to_front(self, tab_or_id: Union[str, ChromiumTab] = None) -> None: ... - - -def get_rename(original: str, rename: str) -> str: ... diff --git a/DrissionPage/chromium_tab.py b/DrissionPage/chromium_tab.py index 256569b..a97c9e7 100644 --- a/DrissionPage/chromium_tab.py +++ b/DrissionPage/chromium_tab.py @@ -7,7 +7,7 @@ from copy import copy from .chromium_base import ChromiumBase, ChromiumBaseSetter from .commons.web import set_session_cookies, set_browser_cookies -from .session_page import SessionPage, SessionPageSetter +from .session_page import SessionPage, SessionPageSetter, DownloadSetter class ChromiumTab(ChromiumBase): @@ -28,10 +28,6 @@ class ChromiumTab(ChromiumBase): self.retry_interval = self.page.retry_interval self._page_load_strategy = self.page.page_load_strategy - def close(self): - """关闭当前标签页""" - self.page.close_tabs(self.tab_id) - @property def rect(self): """返回获取窗口坐标和大小的对象""" @@ -52,12 +48,11 @@ class WebPageTab(SessionPage, ChromiumTab): self._has_driver = True self._has_session = True self._session = copy(page.session) - self._response = None - self._set = None + self._response = None self._download_set = None - self._download_path = page.download_path - self._DownloadKit = None + self._download_path = None + self._set = None super(SessionPage, self)._set_runtime_settings() self._connect_browser(tab_id) @@ -125,14 +120,6 @@ class WebPageTab(SessionPage, ChromiumTab): """以dict方式返回cookies""" return super().cookies - @property - def user_agent(self): - """返回user agent""" - if self._mode == 's': - return super().user_agent - elif self._mode == 'd': - return super(SessionPage, self).user_agent - @property def session(self): """返回Session对象,如未初始化则按配置信息创建""" @@ -165,6 +152,18 @@ class WebPageTab(SessionPage, ChromiumTab): self._set = WebPageTabSetter(self) return self._set + @property + def download_set(self): + """返回下载设置对象""" + if self._download_set is None: + self._download_set = WebPageTabDownloadSetter(self) + return self._download_set + + @property + def download(self): + """返回下载器对象""" + return self.download_set._switched_DownloadKit + def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs): """跳转到一个url :param url: 目标url @@ -293,12 +292,17 @@ class WebPageTab(SessionPage, ChromiumTab): selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] self.session.headers.update({"User-Agent": selenium_user_agent}) - set_session_cookies(self.session, super(SessionPage, self).get_cookies()) + # set_session_cookies(self.session, self._get_driver_cookies(as_dict=True)) + # set_session_cookies(self.session, self._get_driver_cookies(all_domains=True)) + set_session_cookies(self.session, self._get_driver_cookies()) def cookies_to_browser(self): """把session对象的cookies复制到浏览器""" if not self._has_driver: return + + # set_browser_cookies(self, super().get_cookies(as_dict=True)) + # set_browser_cookies(self, super().get_cookies(all_domains=True)) set_browser_cookies(self, super().get_cookies()) def get_cookies(self, as_dict=False, all_domains=False, all_info=False): @@ -311,7 +315,22 @@ class WebPageTab(SessionPage, ChromiumTab): if self._mode == 's': return super().get_cookies(as_dict, all_domains, all_info) elif self._mode == 'd': - return super(SessionPage, self).get_cookies(as_dict, all_domains, all_info) + return self._get_driver_cookies(as_dict, all_info) + + def _get_driver_cookies(self, as_dict=False, all_info=False): + """获取浏览器cookies + :param as_dict: 是否以dict形式返回,为True时all_info无效 + :param all_info: 是否返回所有信息,为False时只返回name、value、domain + :return: cookies信息 + """ + cookies = self.run_cdp('Network.getCookies')['cookies'] + if as_dict: + return {cookie['name']: cookie['value'] for cookie in cookies} + elif all_info: + return cookies + else: + return [{'name': cookie['name'], 'value': cookie['value'], 'domain': cookie['domain']} + for cookie in cookies] def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None): """返回页面中符合条件的元素、属性或节点文本,默认返回第一个 @@ -336,7 +355,7 @@ class WebPageTabSetter(ChromiumBaseSetter): self._chromium_setter = ChromiumBaseSetter(self._page) def cookies(self, cookies): - """添加多个cookies信息到浏览器或session对象,注意不要传入单个 + """添加cookies信息到浏览器或session对象 :param cookies: 可以接收`CookieJar`、`list`、`tuple`、`str`、`dict`格式的`cookies` :return: None """ @@ -363,3 +382,18 @@ class WebPageTabSetter(ChromiumBaseSetter): self._chromium_setter.user_agent(ua, platform) +class WebPageTabDownloadSetter(DownloadSetter): + """用于设置下载参数的类""" + + def __init__(self, page): + super().__init__(page) + self._session = page.session + + @property + def _switched_DownloadKit(self): + """返回从浏览器同步cookies后的Session对象""" + if self._page.mode == 'd': + ua = self._page.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] + self._page.session.headers.update({"User-Agent": ua}) + set_session_cookies(self._page.session, self._page.get_cookies(as_dict=False, all_domains=False)) + return self.DownloadKit diff --git a/DrissionPage/chromium_tab.pyi b/DrissionPage/chromium_tab.pyi index 6a99610..9def1d7 100644 --- a/DrissionPage/chromium_tab.pyi +++ b/DrissionPage/chromium_tab.pyi @@ -5,6 +5,7 @@ """ from typing import Union, Tuple, Any, List +from DownloadKit import DownloadKit from requests import Session, Response from .chromium_base import ChromiumBase, ChromiumBaseSetter @@ -12,7 +13,7 @@ from .chromium_element import ChromiumElement from .chromium_frame import ChromiumFrame from .chromium_page import ChromiumPage, ChromiumTabRect from .session_element import SessionElement -from .session_page import SessionPage, SessionPageSetter +from .session_page import SessionPage, SessionPageSetter, DownloadSetter from .web_page import WebPage @@ -23,8 +24,6 @@ class ChromiumTab(ChromiumBase): def _set_runtime_settings(self) -> None: ... - def close(self) -> None: ... - @property def rect(self) -> ChromiumTabRect: ... @@ -35,6 +34,8 @@ class WebPageTab(SessionPage, ChromiumTab): self._mode: str = ... self._has_driver = ... self._has_session = ... + self._download_set = ... + self._download_path = ... def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement], @@ -64,9 +65,6 @@ class WebPageTab(SessionPage, ChromiumTab): @property def cookies(self) -> dict: ... - @property - def user_agent(self) -> str: ... - @property def session(self) -> Session: ... @@ -121,6 +119,8 @@ class WebPageTab(SessionPage, ChromiumTab): def get_cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[dict, list]: ... + def _get_driver_cookies(self, as_dict: bool = False, all_info: bool = False) -> dict: ... + # ----------------重写SessionPage的函数----------------------- def post(self, url: str, @@ -145,6 +145,12 @@ class WebPageTab(SessionPage, ChromiumTab): @property def set(self) -> WebPageTabSetter: ... + @property + def download(self) -> DownloadKit: ... + + @property + def download_set(self) -> WebPageTabDownloadSetter: ... + def _find_elements(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame], timeout: float = None, single: bool = True, relative: bool = False, raise_err: bool = None) \ -> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None, List[Union[SessionElement, str]], List[ @@ -161,3 +167,13 @@ class WebPageTabSetter(ChromiumBaseSetter): def headers(self, headers: dict) -> None: ... def cookies(self, cookies) -> None: ... + + +class WebPageTabDownloadSetter(DownloadSetter): + """用于设置下载参数的类""" + + def __init__(self, page: WebPageTab): + self._page: WebPageTab = ... + + @property + def _switched_DownloadKit(self) -> DownloadKit: ... diff --git a/DrissionPage/common.pyi b/DrissionPage/common.pyi deleted file mode 100644 index 54677db..0000000 --- a/DrissionPage/common.pyi +++ /dev/null @@ -1,7 +0,0 @@ -# -*- coding:utf-8 -*- -from .session_element import make_session_ele as make_session_ele - -from .action_chains import ActionChains as ActionChains -from .commons.keys import Keys as Keys -from .commons.by import By as By -from .commons.constants import Settings as Settings diff --git a/DrissionPage/commons/browser.py b/DrissionPage/commons/browser.py index 86b3e9e..73349aa 100644 --- a/DrissionPage/commons/browser.py +++ b/DrissionPage/commons/browser.py @@ -11,13 +11,14 @@ from time import perf_counter, sleep from requests import get as requests_get +from DrissionPage.configs.chromium_options import ChromiumOptions from DrissionPage.errors import BrowserConnectError from .tools import port_is_using def connect_browser(option): """连接或启动浏览器 - :param option: ChromiumOptions对象 + :param option: DriverOptions对象 :return: chrome 路径和进程对象组成的元组 """ debugger_address = option.debugger_address.replace('localhost', '127.0.0.1').lstrip('http://').lstrip('https://') @@ -54,8 +55,8 @@ def connect_browser(option): def get_launch_args(opt): - """从ChromiumOptions获取命令行启动参数 - :param opt: ChromiumOptions + """从DriverOptions获取命令行启动参数 + :param opt: DriverOptions或ChromiumOptions :return: 启动参数列表 """ # ----------处理arguments----------- @@ -86,7 +87,7 @@ def get_launch_args(opt): result = list(result) # ----------处理插件extensions------------- - ext = opt.extensions + ext = opt.extensions if isinstance(opt, ChromiumOptions) else opt._extension_files if ext: ext = ','.join(set(ext)) ext = f'--load-extension={ext}' @@ -97,11 +98,15 @@ def get_launch_args(opt): def set_prefs(opt): """处理启动配置中的prefs项,目前只能对已存在文件夹配置 - :param opt: ChromiumOptions + :param opt: DriverOptions或ChromiumOptions :return: None """ - prefs = opt.preferences - del_list = opt._prefs_to_del + if isinstance(opt, ChromiumOptions): + prefs = opt.preferences + del_list = opt._prefs_to_del + else: + prefs = opt.experimental_options.get('prefs', []) + del_list = [] if not opt.user_data_path: return @@ -145,9 +150,7 @@ def test_connect(ip, port): end_time = perf_counter() + 30 while perf_counter() < end_time: try: - u = f'http://{ip}:{port}/json' - tabs = requests_get(u, timeout=10, proxies={'http': None, 'https': None}).json() - requests_get(u, headers={'Connection': 'close'}, proxies={'http': None, 'https': None}) + tabs = requests_get(f'http://{ip}:{port}/json', timeout=10).json() for tab in tabs: if tab['type'] == 'page': return diff --git a/DrissionPage/commons/browser.pyi b/DrissionPage/commons/browser.pyi index ede46db..2324000 100644 --- a/DrissionPage/commons/browser.pyi +++ b/DrissionPage/commons/browser.pyi @@ -3,13 +3,16 @@ @Author : g1879 @Contact : g1879@qq.com """ +from typing import Union + from DrissionPage.configs.chromium_options import ChromiumOptions +from DrissionPage.configs.driver_options import DriverOptions -def connect_browser(option: ChromiumOptions) -> tuple: ... +def connect_browser(option: Union[ChromiumOptions, DriverOptions]) -> tuple: ... -def get_launch_args(opt: ChromiumOptions) -> list: ... +def get_launch_args(opt: Union[ChromiumOptions, DriverOptions]) -> list: ... -def set_prefs(opt: ChromiumOptions) -> None: ... +def set_prefs(opt: Union[ChromiumOptions, DriverOptions]) -> None: ... diff --git a/DrissionPage/commons/tools.py b/DrissionPage/commons/tools.py index a95dc7d..1a70f15 100644 --- a/DrissionPage/commons/tools.py +++ b/DrissionPage/commons/tools.py @@ -6,6 +6,47 @@ from pathlib import Path from re import search, sub from shutil import rmtree +from zipfile import ZipFile + + +def get_exe_from_port(port): + """获取端口号第一条进程的可执行文件路径 + :param port: 端口号 + :return: 可执行文件的绝对路径 + """ + from os import popen + + pid = get_pid_from_port(port) + if not pid: + return + else: + file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n') + return file_lst[2].strip() if len(file_lst) > 2 else None + + +def get_pid_from_port(port): + """获取端口号第一条进程的pid + :param port: 端口号 + :return: 进程id + """ + from platform import system + if system().lower() != 'windows' or port is None: + return None + + from os import popen + from time import perf_counter + + try: # 避免Anaconda中可能产生的报错 + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + t = perf_counter() + while not process and perf_counter() - t < 5: + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + return process.split(' ')[-1] or None + + except Exception: + return None def get_usable_path(path): @@ -101,41 +142,11 @@ def clean_folder(folder_path, ignore=None): elif f.is_dir(): rmtree(f, True) -# def get_exe_from_port(port): -# """获取端口号第一条进程的可执行文件路径 -# :param port: 端口号 -# :return: 可执行文件的绝对路径 -# """ -# from os import popen -# -# pid = get_pid_from_port(port) -# if not pid: -# return -# else: -# file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n') -# return file_lst[2].strip() if len(file_lst) > 2 else None -# -# -# def get_pid_from_port(port): -# """获取端口号第一条进程的pid -# :param port: 端口号 -# :return: 进程id -# """ -# from platform import system -# if system().lower() != 'windows' or port is None: -# return None -# -# from os import popen -# from time import perf_counter -# -# try: # 避免Anaconda中可能产生的报错 -# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] -# -# t = perf_counter() -# while not process and perf_counter() - t < 5: -# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] -# -# return process.split(' ')[-1] or None -# -# except Exception: -# return None + +def unzip(zip_path, to_path): + """解压下载的chromedriver.zip文件""" + if not zip_path: + return + + with ZipFile(zip_path, 'r') as f: + return [f.extract(f.namelist()[0], path=to_path)] diff --git a/DrissionPage/commons/tools.pyi b/DrissionPage/commons/tools.pyi index f7b91e8..a95722d 100644 --- a/DrissionPage/commons/tools.pyi +++ b/DrissionPage/commons/tools.pyi @@ -7,10 +7,10 @@ from pathlib import Path from typing import Union -# def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ... +def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ... -# def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ... +def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ... def get_usable_path(path: Union[str, Path]) -> Path: ... @@ -26,3 +26,6 @@ def port_is_using(ip: str, port: Union[str, int]) -> bool: ... def clean_folder(folder_path: Union[str, Path], ignore: Union[tuple, list] = None) -> None: ... + + +def unzip(zip_path: str, to_path: str) -> Union[list, None]: ... diff --git a/DrissionPage/commons/web.py b/DrissionPage/commons/web.py index 2e54997..0a7cd14 100644 --- a/DrissionPage/commons/web.py +++ b/DrissionPage/commons/web.py @@ -3,15 +3,103 @@ @Author : g1879 @Contact : g1879@qq.com """ +from base64 import b64decode from html import unescape from http.cookiejar import Cookie +from json import loads, JSONDecodeError from re import sub from urllib.parse import urlparse, urljoin, urlunparse from requests.cookies import RequestsCookieJar +from requests.structures import CaseInsensitiveDict from tldextract import extract +class ResponseData(object): + """返回的数据包管理类""" + __slots__ = ('requestId', 'response', 'rawBody', 'tab', 'target', 'url', 'status', 'statusText', 'securityDetails', + 'headersText', 'mimeType', 'requestHeadersText', 'connectionReused', 'connectionId', 'remoteIPAddress', + 'remotePort', 'fromDiskCache', 'fromServiceWorker', 'fromPrefetchCache', 'encodedDataLength', 'timing', + 'serviceWorkerResponseSource', 'responseTime', 'cacheStorageCacheName', 'protocol', 'securityState', + '_requestHeaders', '_body', '_base64_body', '_rawPostData', '_postData', 'method') + + def __init__(self, request_id, response, body, tab, target): + """ + :param response: response的数据 + :param body: response包含的内容 + :param tab: 产生这个数据包的tab的id + :param target: 监听目标 + """ + self.requestId = request_id + self.response = CaseInsensitiveDict(response) + self.rawBody = body + self.tab = tab + self.target = target + self._requestHeaders = None + self._postData = None + self._body = None + self._base64_body = False + self._rawPostData = None + + def __getattr__(self, item): + return self.response.get(item, None) + + def __getitem__(self, item): + return self.response.get(item, None) + + def __repr__(self): + return f'' + + @property + def headers(self): + """以大小写不敏感字典返回headers数据""" + headers = self.response.get('headers', None) + return CaseInsensitiveDict(headers) if headers else None + + @property + def requestHeaders(self): + """以大小写不敏感字典返回requestHeaders数据""" + if self._requestHeaders: + return self._requestHeaders + headers = self.response.get('requestHeaders', None) + return CaseInsensitiveDict(headers) if headers else None + + @requestHeaders.setter + def requestHeaders(self, val): + """设置requestHeaders""" + self._requestHeaders = val + + @property + def postData(self): + """返回postData数据""" + if self._postData is None and self._rawPostData: + try: + self._postData = loads(self._rawPostData) + except (JSONDecodeError, TypeError): + self._postData = self._rawPostData + return self._postData + + @postData.setter + def postData(self, val): + """设置postData""" + self._rawPostData = val + + @property + def body(self): + """返回body内容,如果是json格式,自动进行转换,如果时图片格式,进行base64转换,其它格式直接返回文本""" + if self._body is None: + if self._base64_body: + self._body = b64decode(self.rawBody) + + else: + try: + self._body = loads(self.rawBody) + except (JSONDecodeError, TypeError): + self._body = self.rawBody + + return self._body + + def get_ele_txt(e): """获取元素内所有文本 :param e: 元素对象 @@ -255,21 +343,14 @@ def set_browser_cookies(page, cookies): cookie['expires'] = int(cookie['expires']) if cookie['value'] is None: cookie['value'] = '' - if cookie['name'].startswith('__Secure-'): - cookie['secure'] = True - if cookie['name'].startswith('__Host-'): - cookie['path'] = '/' - cookie['secure'] = True - - else: - if cookie.get('domain', None): - try: - page.run_cdp_loaded('Network.setCookie', **cookie) - if is_cookie_in_driver(page, cookie): - continue - except Exception: - pass + if cookie.get('domain', None): + try: + page.run_cdp_loaded('Network.setCookie', **cookie) + if is_cookie_in_driver(page, cookie): + continue + except Exception: + pass ex_url = extract(page._browser_url) d_list = ex_url.subdomain.split('.') @@ -295,13 +376,7 @@ def is_cookie_in_driver(page, cookie): :param cookie: dict格式cookie :return: bool """ - if 'domain' in cookie: - for c in page.get_cookies(all_domains=True): - if cookie['name'] == c['name'] and cookie['value'] == c['value'] and cookie['domain'] == c.get('domain', - None): - return True - else: - for c in page.get_cookies(all_domains=True): - if cookie['name'] == c['name'] and cookie['value'] == c['value']: - return True + for c in page.get_cookies(): + if cookie['name'] == c['name'] and cookie['value'] == c['value']: + return True return False diff --git a/DrissionPage/commons/web.pyi b/DrissionPage/commons/web.pyi index b91ba71..b57ed66 100644 --- a/DrissionPage/commons/web.pyi +++ b/DrissionPage/commons/web.pyi @@ -8,12 +8,73 @@ from typing import Union from requests import Session from requests.cookies import RequestsCookieJar +from requests.structures import CaseInsensitiveDict from DrissionPage.base import DrissionElement, BasePage from DrissionPage.chromium_element import ChromiumElement from DrissionPage.chromium_base import ChromiumBase +class ResponseData(object): + + def __init__(self, request_id: str, response: dict, body: str, tab: str, target: str): + self.requestId: str = ... + self.response: CaseInsensitiveDict = ... + self.rawBody: str = ... + self._body: Union[str, dict, bytes] = ... + self._base64_body: bool = ... + self.tab: str = ... + self.target: str = ... + self.method: str = ... + self._postData: dict = ... + self._rawPostData: str = ... + self.url: str = ... + self.status: str = ... + self.statusText: str = ... + self.headersText: str = ... + self.mimeType: str = ... + self.requestHeadersText: str = ... + self.connectionReused: str = ... + self.connectionId: str = ... + self.remoteIPAddress: str = ... + self.remotePort: str = ... + self.fromDiskCache: str = ... + self.fromServiceWorker: str = ... + self.fromPrefetchCache: str = ... + self.encodedDataLength: str = ... + self.timing: str = ... + self.serviceWorkerResponseSource: str = ... + self.responseTime: str = ... + self.cacheStorageCacheName: str = ... + self.protocol: str = ... + self.securityState: str = ... + self.securityDetails: str = ... + + def __getattr__(self, item: str) -> Union[str, None]: ... + + def __getitem__(self, item: str) -> Union[str, None]: ... + + def __repr__(self) -> str: ... + + @property + def headers(self) -> Union[CaseInsensitiveDict, None]: ... + + @property + def requestHeaders(self) -> Union[CaseInsensitiveDict, None]: ... + + @requestHeaders.setter + def requestHeaders(self, val:dict) -> None: ... + + @property + def postData(self) -> Union[dict, str, None]: ... + + @postData.setter + def postData(self, val: Union[str, dict]) -> None: ... + + @property + def body(self) -> Union[str, dict, bytes]: ... + + def get_ele_txt(e: DrissionElement) -> str: ... diff --git a/DrissionPage/configs/chromium_options.py b/DrissionPage/configs/chromium_options.py index 7a6b0f5..e31d97e 100644 --- a/DrissionPage/configs/chromium_options.py +++ b/DrissionPage/configs/chromium_options.py @@ -26,7 +26,7 @@ class ChromiumOptions(object): self.ini_path = om.ini_path options = om.chrome_options - self._download_path = om.paths.get('download_path', '') + self._download_path = om.paths.get('download_path', None) self._arguments = options.get('arguments', []) self._binary_location = options.get('binary_location', '') self._extensions = options.get('extensions', []) @@ -62,7 +62,7 @@ class ChromiumOptions(object): self.ini_path = None self._binary_location = "chrome" self._arguments = [] - self._download_path = '' + self._download_path = None self._extensions = [] self._prefs = {} self._timeouts = {'implicit': 10, 'pageLoad': 30, 'script': 30} diff --git a/DrissionPage/configs/configs.ini b/DrissionPage/configs/configs.ini index 54d20ab..6591d2f 100644 --- a/DrissionPage/configs/configs.ini +++ b/DrissionPage/configs/configs.ini @@ -1,10 +1,11 @@ [paths] +chromedriver_path = download_path = [chrome_options] debugger_address = 127.0.0.1:9222 binary_location = chrome -arguments = ['--remote-allow-origins=*', '--no-first-run', '--disable-infobars', '--disable-popup-blocking'] +arguments = ['--remote-allow-origins=*', '--no-first-run', '--disable-gpu', '--disable-infobars', '--disable-popup-blocking'] extensions = [] experimental_options = {'prefs': {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}}} page_load_strategy = normal diff --git a/DrissionPage/configs/session_options.py b/DrissionPage/configs/session_options.py index 03fc800..eaa763b 100644 --- a/DrissionPage/configs/session_options.py +++ b/DrissionPage/configs/session_options.py @@ -21,7 +21,7 @@ class SessionOptions(object): :param ini_path: ini文件路径 """ self.ini_path = None - self._download_path = '' + self._download_path = None self._headers = None self._cookies = None self._auth = None @@ -73,7 +73,7 @@ class SessionOptions(object): self.set_proxies(om.proxies.get('http', None), om.proxies.get('https', None)) self._timeout = om.timeouts.get('implicit', 10) - self._download_path = om.paths.get('download_path', '') + self._download_path = om.paths.get('download_path', None) # ===========须独立处理的项开始============ @property @@ -110,13 +110,14 @@ class SessionOptions(object): self._proxies = {} return self._proxies - def set_proxies(self, http=None, https=None): + def set_proxies(self, http, https=None): """设置proxies参数 :param http: http代理地址 :param https: https代理地址 :return: 返回当前对象 """ - self._sets('proxies', {'http': http, 'https': https}) + proxies = None if http == https is None else {'http': http, 'https': https or http} + self._sets('proxies', proxies) return self # ===========须独立处理的项结束============ diff --git a/DrissionPage/easy_set.py b/DrissionPage/easy_set.py index 91fcacb..98e1260 100644 --- a/DrissionPage/easy_set.py +++ b/DrissionPage/easy_set.py @@ -6,10 +6,20 @@ from os import popen from pathlib import Path from re import search +from typing import Union from .commons.constants import Settings +from .commons.tools import unzip from .configs.chromium_options import ChromiumOptions from .configs.options_manage import OptionsManager +from .session_page import SessionPage + +try: + from selenium import webdriver + from DrissionPage.mixpage.drission import Drission + from .configs.driver_options import DriverOptions +except ModuleNotFoundError: + pass def raise_when_ele_not_found(on_off=True): @@ -38,14 +48,19 @@ def show_settings(ini_path=None): OptionsManager(ini_path).show() -def set_paths(browser_path=None, +def set_paths(driver_path=None, + chrome_path=None, + browser_path=None, local_port=None, debugger_address=None, download_path=None, user_data_path=None, cache_path=None, - ini_path=None): + ini_path=None, + check_version=False): """快捷的路径设置函数 + :param driver_path: chromedriver.exe路径 + :param chrome_path: 浏览器可执行文件路径 :param browser_path: 浏览器可执行文件路径 :param local_port: 本地端口号 :param debugger_address: 调试浏览器地址,例:127.0.0.1:9222 @@ -53,6 +68,7 @@ def set_paths(browser_path=None, :param user_data_path: 用户数据路径 :param cache_path: 缓存路径 :param ini_path: 要修改的ini文件路径 + :param check_version: 是否检查chromedriver和chrome是否匹配 :return: None """ om = OptionsManager(ini_path) @@ -60,6 +76,12 @@ def set_paths(browser_path=None, def format_path(path: str) -> str: return str(path) if path else '' + if driver_path is not None: + om.set_item('paths', 'chromedriver_path', format_path(driver_path)) + + if chrome_path is not None: + om.set_item('chrome_options', 'binary_location', format_path(chrome_path)) + if browser_path is not None: om.set_item('chrome_options', 'binary_location', format_path(browser_path)) @@ -81,6 +103,9 @@ def set_paths(browser_path=None, if cache_path is not None: set_argument('--disk-cache-dir', format_path(cache_path), ini_path) + if check_version: + check_driver_version(format_path(driver_path), format_path(browser_path)) + def use_auto_port(on_off=True, ini_path=None): """设置启动浏览器时使用自动分配的端口和临时文件夹 @@ -178,6 +203,89 @@ def set_proxy(proxy, ini_path=None): set_argument('--proxy-server', proxy, ini_path) +def check_driver_version(driver_path=None, chrome_path=None): + """检查传入的chrome和chromedriver是否匹配 + :param driver_path: chromedriver.exe路径 + :param chrome_path: chrome.exe路径 + :return: 是否匹配 + """ + print('正在检测可用性...') + om = OptionsManager() + driver_path = driver_path or om.get_value('paths', 'chromedriver_path') or 'chromedriver' + chrome_path = str(chrome_path or om.get_value('chrome_options', 'binary_location')) + do = DriverOptions(read_file=False) + do.add_argument('--headless') + + if chrome_path: + do.binary_location = chrome_path + + try: + driver = webdriver.Chrome(driver_path, options=do) + driver.quit() + print('版本匹配,可正常使用。') + + return True + + except Exception as e: + print(f'出现异常:\n{e}\n可执行easy_set.get_match_driver()自动下载匹配的版本。\n' + f'或自行从以下网址下载:http://npm.taobao.org/mirrors/chromedriver/') + + return False + + +# -------------------------自动识别chrome版本号并下载对应driver------------------------ +def get_match_driver(ini_path='default', + save_path=None, + chrome_path=None, + show_msg=True, + check_version=True): + """自动识别chrome版本并下载匹配的driver + :param ini_path: 要读取和修改的ini文件路径 + :param save_path: chromedriver保存路径 + :param chrome_path: 指定chrome.exe位置 + :param show_msg: 是否打印信息 + :param check_version: 是否检查版本匹配 + :return: None + """ + save_path = save_path or str(Path(__file__).parent) + + chrome_path = chrome_path or get_chrome_path(ini_path, show_msg) + chrome_path = Path(chrome_path).absolute() if chrome_path else None + if show_msg: + print('chrome.exe路径', chrome_path) + + ver = _get_chrome_version(str(chrome_path)) + if show_msg: + print('version', ver) + + zip_path = _download_driver(ver, save_path, show_msg=show_msg) + + if not zip_path and show_msg: + print('没有找到对应版本的driver。') + + try: + driver_path = unzip(zip_path, save_path)[0] + except TypeError: + driver_path = None + + if show_msg: + print('解压路径', driver_path) + + if driver_path: + Path(zip_path).unlink() + if ini_path: + set_paths(driver_path=driver_path, chrome_path=str(chrome_path), ini_path=ini_path, check_version=False) + + if check_version: + if not check_driver_version(driver_path, chrome_path) and show_msg: + print('获取失败,请手动配置。') + else: + if show_msg: + print('获取失败,请手动配置。') + + return driver_path + + def get_chrome_path(ini_path=None, show_msg=True, from_ini=True, @@ -257,3 +365,54 @@ def get_chrome_path(ini_path=None, return str(path) except OSError: pass + + +def _get_chrome_version(path: str) -> Union[str, None]: + """根据文件路径获取版本号 + :param path: chrome.exe文件路径 + :return: 版本号 + """ + if not path: + return + + path = str(path).replace('\\', '\\\\') + + try: + return (popen(f'wmic datafile where "name=\'{path}\'" get version').read() + .lower().split('\n')[2].replace(' ', '')) + except Exception: + return None + + +def _download_driver(version: str, save_path: str = None, show_msg: bool = True) -> Union[str, None]: + """根据传入的版本号到镜像网站查找,下载最相近的 + :param version: 本地版本号 + :return: 保存地址 + """ + if not version: + return + + main_ver = version.split('.')[0] + remote_ver = None + + page = SessionPage(Drission().session) + page.get('https://registry.npmmirror.com/-/binary/chromedriver/') + + for version in page.json: + # 遍历所有版本,跳过大版本不一致的,如果有完全匹配的,获取url,如果没有,获取最后一个版本的url + if not version['name'].startswith(f'{main_ver}.'): + continue + + remote_ver = version['name'] + if version['name'] == f'{version}/': + break + + if remote_ver: + url = f'https://cdn.npmmirror.com/binaries/chromedriver/{remote_ver}chromedriver_win32.zip' + save_path = save_path or str(Path(__file__).parent) + result = page.download(url, save_path, file_exists='overwrite', show_msg=show_msg) + + if result[0]: + return result[1] + + return None diff --git a/DrissionPage/easy_set.pyi b/DrissionPage/easy_set.pyi index d70e8b9..7ea52ca 100644 --- a/DrissionPage/easy_set.pyi +++ b/DrissionPage/easy_set.pyi @@ -16,13 +16,16 @@ def configs_to_here(file_name: Union[Path, str] = None) -> None: ... def show_settings(ini_path: Union[str, Path] = None) -> None: ... -def set_paths(browser_path: Union[str, Path] = None, +def set_paths(driver_path: Union[str, Path] = None, + chrome_path: Union[str, Path] = None, + browser_path: Union[str, Path] = None, local_port: Union[int, str] = None, debugger_address: str = None, download_path: Union[str, Path] = None, user_data_path: Union[str, Path] = None, cache_path: Union[str, Path] = None, - ini_path: Union[str, Path] = None) -> None: ... + ini_path: Union[str, Path] = None, + check_version: bool = False) -> None: ... def use_auto_port(on_off: bool = True, ini_path: Union[str, Path] = None) -> None: ... @@ -52,6 +55,17 @@ def set_user_agent(user_agent: str, ini_path: Union[str, Path] = None) -> None: def set_proxy(proxy: str, ini_path: Union[str, Path] = None) -> None: ... +def check_driver_version(driver_path: Union[str, Path] = None, chrome_path: str = None) -> bool: ... + + +# -------------------------自动识别chrome版本号并下载对应driver------------------------ +def get_match_driver(ini_path: Union[str, None] = 'default', + save_path: str = None, + chrome_path: str = None, + show_msg: bool = True, + check_version: bool = True) -> Union[str, None]: ... + + def get_chrome_path(ini_path: str = None, show_msg: bool = True, from_ini: bool = True, diff --git a/DrissionPage/errors.py b/DrissionPage/errors.py index 4eabfa6..3bcfca0 100644 --- a/DrissionPage/errors.py +++ b/DrissionPage/errors.py @@ -24,7 +24,7 @@ class ElementLossError(BaseError): _info = '元素对象因刷新已失效。' -class CDPError(BaseError): +class CallMethodError(BaseError): _info = '方法调用错误。' @@ -54,7 +54,3 @@ class NoResourceError(BaseError): class CanNotClickError(BaseError): _info = '该元素无法滚动到视口或被遮挡,无法点击。' - - -class GetDocumentError(BaseError): - _info = '获取文档失败。' diff --git a/DrissionPage/network_listener.py b/DrissionPage/network_listener.py deleted file mode 100644 index 92f473b..0000000 --- a/DrissionPage/network_listener.py +++ /dev/null @@ -1,325 +0,0 @@ -# -*- coding:utf-8 -*- -from base64 import b64decode -from json import JSONDecodeError, loads -from queue import Queue -from re import search -from threading import Thread -from time import perf_counter, sleep - -from requests.structures import CaseInsensitiveDict - -from .errors import CDPError - - -class NetworkListener(object): - """监听器基类""" - - def __init__(self, page): - """ - :param page: ChromiumBase对象 - """ - self._page = page - self._driver = self._page.driver - - self._tmp = None # 临存捕捉到的数据 - self._request_ids = None # 暂存须要拦截的请求id - - self._total_count = None # 当次监听的数量上限 - self._caught_count = None # 当次已监听到的数量 - self._begin_time = None # 当次监听开始时间 - self._timeout = None # 当次监听超时时间 - - self.listening = False - self._targets = None # 默认监听所有 - self.tab_id = None # 当前tab的id - self._results = [] - - self._is_regex = False - self._method = None - - def set_targets(self, targets=True, is_regex=False, method=None): - """指定要等待的数据包 - :param targets: 要匹配的数据包url特征,可用list等传入多个,为True时获取所有 - :param is_regex: 设置的target是否正则表达式 - :param method: 设置监听的请求类型,可用list等指定多个,为None时监听全部 - :return: None - """ - if targets is not None: - if not isinstance(targets, (str, list, tuple, set)) and targets is not True: - raise TypeError('targets只能是str、list、tuple、set、True。') - if targets is True: - targets = '' - - if isinstance(targets, str): - self._targets = {targets} - else: - self._targets = set(targets) - - self._is_regex = is_regex - - if method is not None: - if isinstance(method, str): - self._method = {method.upper()} - elif isinstance(method, (list, tuple, set)): - self._method = set(i.upper() for i in method) - else: - raise TypeError('method参数只能是str、list、tuple、set类型。') - - def listen(self, targets=None, count=None, timeout=None): - """拦截目标请求,直到超时或达到拦截个数,每次拦截前清空结果 - 可监听多个目标,请求url包含这些字符串就会被记录 - :param targets: 要监听的目标字符串或其组成的列表,True监听所有,None则保留之前的目标不变 - :param count: 要记录的个数,到达个数停止监听 - :param timeout: 监听最长时间,到时间即使未达到记录个数也停止,None为无限长 - :return: None - """ - if targets: - self.set_targets(targets) - - self.listening = True - self._results = [] - self._request_ids = {} - self._tmp = Queue(maxsize=0) - - self._caught_count = 0 - self._begin_time = perf_counter() - self._timeout = timeout - - self._set_callback_func() - - self._total_count = len(self._targets) if not count else count - - Thread(target=self._wait_to_stop).start() - - def stop(self): - """停止监听""" - self._stop() - self.listening = False - - def wait(self): - """等待监听结束""" - while self.listening: - sleep(.2) - return self._results - - def get_results(self, target=None): - """获取结果列表 - :param target: 要获取的目标,为None时获取全部 - :return: 结果数据组成的列表 - """ - return self._results if target is None else [i for i in self._results if i.target == target] - - def _wait_to_stop(self): - """当收到停止信号、到达须获取结果数、到时间就停止""" - while self._is_continue(): - sleep(.2) - self.stop() - - def _is_continue(self): - """是否继续当前监听""" - return self.listening \ - and (self._total_count is None or self._caught_count < self._total_count) \ - and (self._timeout is None or perf_counter() - self._begin_time < self._timeout) - - def steps(self, gap=1): - """用于单步操作,可实现没收到若干个数据包执行一步操作(如翻页) - :param gap: 每接收到多少个数据包触发 - :return: 用于在接收到监听目标时触发动作的可迭代对象 - """ - if not isinstance(gap, int) or gap < 1: - raise ValueError('gap参数必须为大于0的整数。') - while self.listening or not self._tmp.empty(): - while self._tmp.qsize() >= gap: - yield self._tmp.get(False) if gap == 1 else [self._tmp.get(False) for _ in range(gap)] - - sleep(.1) - - def _set_callback_func(self): - """设置监听请求的回调函数""" - self._driver.set_listener('Network.requestWillBeSent', self._requestWillBeSent) - self._driver.set_listener('Network.responseReceived', self._response_received) - self._driver.set_listener('Network.loadingFinished', self._loading_finished) - self._driver.set_listener('Network.loadingFailed', self._loading_failed) - self._driver.call_method('Network.enable') - - def _stop(self) -> None: - """停止监听前要做的工作""" - self._driver.set_listener('Network.requestWillBeSent', None) - self._driver.set_listener('Network.responseReceived', None) - self._driver.set_listener('Network.loadingFinished', None) - self._driver.set_listener('Network.loadingFailed', None) - # self._driver.call_method('Network.disable') - - def _requestWillBeSent(self, **kwargs): - """接收到请求时的回调函数""" - for target in self._targets: - if ((self._is_regex and search(target, kwargs['request']['url'])) or - (not self._is_regex and target in kwargs['request']['url'])) and ( - not self._method or kwargs['request']['method'] in self._method): - self._request_ids[kwargs['requestId']] = DataPacket(self._page.tab_id, target, kwargs) - - if kwargs['request'].get('hasPostData', None) and not kwargs['request'].get('postData', None): - self._request_ids[kwargs['requestId']]._raw_post_data = \ - self._page.run_cdp('Network.getRequestPostData', requestId=kwargs['requestId'])['postData'] - - break - - def _response_received(self, **kwargs): - """接收到返回信息时处理方法""" - request_id = kwargs['requestId'] - if request_id in self._request_ids: - self._request_ids[request_id]._raw_response = kwargs['response'] - self._request_ids[request_id]._resource_type = kwargs['type'] - - def _loading_finished(self, **kwargs): - """请求完成时处理方法""" - request_id = kwargs['requestId'] - if request_id in self._request_ids: - try: - r = self._page.run_cdp('Network.getResponseBody', requestId=request_id) - body = r['body'] - is_base64 = r['base64Encoded'] - except CDPError: - body = '' - is_base64 = False - - dp = self._request_ids[request_id] - dp._raw_body = body - dp._base64_body = is_base64 - - self._tmp.put(dp) - self._results.append(dp) - self._caught_count += 1 - - def _loading_failed(self, **kwargs): - """请求失败时的回调方法""" - request_id = kwargs['requestId'] - if request_id in self._request_ids: - dp = self._request_ids[request_id] - dp.errorText = kwargs['errorText'] - dp._resource_type = kwargs['type'] - - self._tmp.put(dp) - self._results.append(dp) - self._caught_count += 1 - - -class DataPacket(object): - """返回的数据包管理类""" - - def __init__(self, tab, target, raw_request): - """ - :param tab: 产生这个数据包的tab的id - :param target: 监听目标 - :param raw_request: 原始request数据,从cdp获得 - """ - self.tab = tab - self.target = target - - self._raw_request = raw_request - self._raw_post_data = None - - self._raw_response = None - self._raw_body = None - self._base64_body = False - - self._request = None - self._response = None - self.errorText = None - self._resource_type = None - - @property - def url(self): - return self.request.url - - @property - def method(self): - return self.request.method - - @property - def frameId(self): - return self._raw_request.get('frameId') - - @property - def resourceType(self): - return self._resource_type - - @property - def request(self): - if self._request is None: - self._request = Request(self._raw_request['request'], self._raw_post_data) - return self._request - - @property - def response(self): - if self._response is None: - self._response = Response(self._raw_response, self._raw_body, self._base64_body) - return self._response - - -class Request(object): - def __init__(self, raw_request, post_data): - self._request = raw_request - self._raw_post_data = post_data - self._postData = None - self._headers = None - - def __getattr__(self, item): - return self._request.get(item, None) - - @property - def headers(self): - """以大小写不敏感字典返回headers数据""" - if self._headers is None: - self._headers = CaseInsensitiveDict(self._request['headers']) - return self._headers - - @property - def postData(self): - """返回postData数据""" - if self._postData is None: - if self._raw_post_data: - postData = self._raw_post_data - elif self._request.get('postData', None): - postData = self._request['postData'] - else: - postData = False - try: - self._postData = loads(postData) - except (JSONDecodeError, TypeError): - self._postData = postData - return self._postData - - -class Response(object): - def __init__(self, raw_response, raw_body, base64_body): - self._response = raw_response - self._raw_body = raw_body - self._is_base64_body = base64_body - self._body = None - self._headers = None - - def __getattr__(self, item): - return self._response.get(item, None) - - @property - def headers(self): - """以大小写不敏感字典返回headers数据""" - if self._headers is None: - self._headers = CaseInsensitiveDict(self._response['headers']) - return self._headers - - @property - def body(self): - """返回body内容,如果是json格式,自动进行转换,如果时图片格式,进行base64转换,其它格式直接返回文本""" - if self._body is None: - if self._is_base64_body: - self._body = b64decode(self._raw_body) - - else: - try: - self._body = loads(self._raw_body) - except (JSONDecodeError, TypeError): - self._body = self._raw_body - - return self._body diff --git a/DrissionPage/network_listener.pyi b/DrissionPage/network_listener.pyi deleted file mode 100644 index 759f7b2..0000000 --- a/DrissionPage/network_listener.pyi +++ /dev/null @@ -1,140 +0,0 @@ -from queue import Queue -from typing import Union, Dict, List, Iterable, Tuple - -from requests.structures import CaseInsensitiveDict - -from chromium_base import ChromiumBase -from chromium_driver import ChromiumDriver - - -class NetworkListener(object): - def __init__(self, page: ChromiumBase): - self._page: ChromiumBase = ... - self._total_count: int = ... - self._caught_count: int = ... - self._targets: Union[str, dict] = ... - self._results: list = ... - self._method: set = ... - self._tmp: Queue = ... - self._is_regex: bool = ... - self._driver: ChromiumDriver = ... - self._request_ids: dict = ... - self.listening: bool = ... - self._timeout: float = ... - self._begin_time: float = ... - - def set_targets(self, targets: Union[str, list, tuple, set, None] = None, is_regex: bool = False, - count: int = None, method: Union[str, list, tuple, set] = None) -> None: ... - - def stop(self) -> None: ... - - @property - def results(self) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... - - def clear(self) -> None: ... - - def listen(self, targets: Union[str, List[str], Tuple, bool, None] = ..., count: int = ..., - timeout: float = ...) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... - - def _listen(self, timeout: float = None, - any_one: bool = False) -> Union[DataPacket, Dict[str, List[DataPacket]], False]: ... - - def _requestWillBeSent(self, **kwargs) -> None: ... - - def _response_received(self, **kwargs) -> None: ... - - def _loading_finished(self, **kwargs) -> None: ... - - def _loading_failed(self, **kwargs) -> None: ... - - def _request_paused(self, **kwargs) -> None: ... - - def _wait_to_stop(self) -> None: ... - - def _is_continue(self) -> bool: ... - - def steps(self, gap=1) -> Iterable[Union[DataPacket, List[DataPacket]]]: ... - - def _set_callback_func(self) -> None: ... - - def _stop(self) -> None: ... - - -class DataPacket(object): - """返回的数据包管理类""" - - def __init__(self, tab: str, target: str, raw_info: dict): - self.tab: str = ... - self.target: str = ... - self._raw_request: dict = ... - self._raw_response: dict = ... - self._raw_post_data: str = ... - self._raw_body: str = ... - self._base64_body: bool = ... - self._request: Request = ... - self._response: Response = ... - self.errorText: str = ... - self._resource_type: str = ... - - @property - def url(self) -> str: ... - - @property - def method(self) -> str: ... - - @property - def frameId(self) -> str: ... - - @property - def resourceType(self) -> str: ... - - @property - def request(self) -> Request: ... - - @property - def response(self) -> Response: ... - - -class Request(object): - url: str = ... - _headers: Union[CaseInsensitiveDict, None] = ... - method: str = ... - - # urlFragment: str = ... - # postDataEntries: list = ... - # mixedContentType: str = ... - # initialPriority: str = ... - # referrerPolicy: str = ... - # isLinkPreload: bool = ... - # trustTokenParams: dict = ... - # isSameSite: bool = ... - - def __init__(self, raw_request: dict, post_data: str): - self._request: dict = ... - self._raw_post_data: str = ... - self._postData: str = ... - - @property - def headers(self) -> dict: ... - - @property - def postData(self) -> Union[str, dict]: ... - - -class Response(object): - status: str = ... - statusText: int = ... - mimeType: str = ... - - def __init__(self, raw_response: dict, raw_body: str, base64_body: bool): - self._response: dict = ... - self._raw_body: str = ... - self._is_base64_body: bool = ... - self._body: Union[str, dict] = ... - self._headers: dict = ... - - @property - def headers(self) -> CaseInsensitiveDict: ... - - @property - def body(self) -> Union[str, dict, bool]: ... diff --git a/DrissionPage/session_element.py b/DrissionPage/session_element.py index 5142222..e7d375b 100644 --- a/DrissionPage/session_element.py +++ b/DrissionPage/session_element.py @@ -38,7 +38,7 @@ class SessionElement(DrissionElement): """在内部查找元素 例:ele2 = ele1('@id=ele_id') :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 不起实际作用 + :param timeout: 不起实际作用,用于和DriverElement对应,便于无差别调用 :return: SessionElement对象或属性、文本 """ return self.ele(loc_or_str) @@ -75,13 +75,12 @@ class SessionElement(DrissionElement): """返回未格式化处理的元素内文本""" return str(self._inner_ele.text_content()) - def parent(self, level_or_loc=1, index=1): + def parent(self, level_or_loc=1): """返回上面某一级父元素,可指定层数或用查询语法定位 :param level_or_loc: 第几级父元素,或定位符 - :param index: 当level_or_loc传入定位符,使用此参数选择第几个结果 :return: 上级元素对象 """ - return super().parent(level_or_loc, index) + return super().parent(level_or_loc) def child(self, filter_loc='', index=1, timeout=None, ele_only=True): """返回当前元素的一个符合条件的直接子元素,可用查询语法筛选,可指定返回筛选结果的第几个 @@ -218,7 +217,7 @@ class SessionElement(DrissionElement): def ele(self, loc_or_str, timeout=None): """返回当前元素下级符合条件的第一个元素、属性或节点文本 :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 不起实际作用 + :param timeout: 不起实际作用,用于和DriverElement对应,便于无差别调用 :return: SessionElement对象或属性、文本 """ return self._ele(loc_or_str) @@ -226,7 +225,7 @@ class SessionElement(DrissionElement): def eles(self, loc_or_str, timeout=None): """返回当前元素下级所有符合条件的子元素、属性或节点文本 :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 不起实际作用 + :param timeout: 不起实际作用,用于和DriverElement对应,便于无差别调用 :return: SessionElement对象或属性、文本组成的列表 """ return self._ele(loc_or_str, single=False) @@ -322,7 +321,8 @@ def make_session_ele(html_or_ele, loc=None, single=True): loc = loc[0], loc_str - elif the_type.endswith(".ChromiumElement'>"): + # ChromiumElement, DriverElement + elif the_type.endswith((".ChromiumElement'>", ".DriverElement'>")): loc_str = loc[1] if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'): loc_str = f'.{loc[1]}' diff --git a/DrissionPage/session_element.pyi b/DrissionPage/session_element.pyi index 2959c57..4d455e1 100644 --- a/DrissionPage/session_element.pyi +++ b/DrissionPage/session_element.pyi @@ -12,6 +12,8 @@ from .chromium_base import ChromiumBase from .chromium_element import ChromiumElement from .chromium_frame import ChromiumFrame from .commons.constants import NoneElement +from mixpage.driver_element import DriverElement +from mixpage.driver_page import DriverPage from .session_page import SessionPage @@ -48,7 +50,7 @@ class SessionElement(DrissionElement): @property def raw_text(self) -> str: ... - def parent(self, level_or_loc: Union[tuple, str, int] = 1, index: int = 1) -> Union['SessionElement', None]: ... + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['SessionElement', None]: ... def child(self, filter_loc: Union[tuple, str] = '', index: int = 1, @@ -122,8 +124,8 @@ class SessionElement(DrissionElement): def _get_ele_path(self, mode: str) -> str: ... -def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, ChromiumElement, BaseElement, ChromiumFrame, -ChromiumBase], +def make_session_ele(html_or_ele: Union[str, SessionElement, SessionPage, ChromiumElement, DriverElement, BaseElement, +ChromiumFrame, ChromiumBase, DriverPage], loc: Union[str, Tuple[str, str]] = None, single: bool = True) -> Union[ SessionElement, str, NoneElement, List[Union[SessionElement, str]]]: ... diff --git a/DrissionPage/session_page.py b/DrissionPage/session_page.py index cc67f6e..3d934b9 100644 --- a/DrissionPage/session_page.py +++ b/DrissionPage/session_page.py @@ -7,6 +7,7 @@ from re import search from time import sleep from urllib.parse import urlparse +from DownloadKit import DownloadKit from requests import Session, Response from requests.structures import CaseInsensitiveDict from tldextract import extract @@ -26,6 +27,7 @@ class SessionPage(BasePage): :param timeout: 连接超时时间,为None时从ini文件读取 """ self._response = None + self._download_set = None self._session = None self._set = None self._set_start_options(session_or_options, None) @@ -97,9 +99,21 @@ class SessionPage(BasePage): return None @property - def user_agent(self): - """返回user agent""" - return self.session.headers.get('user-agent', '') + def download_path(self): + """返回下载路径""" + return self._download_path + + @property + def download_set(self): + """返回用于设置下载参数的对象""" + if self._download_set is None: + self._download_set = DownloadSetter(self) + return self._download_set + + @property + def download(self): + """返回下载器对象""" + return self.download_set.DownloadKit @property def session(self): @@ -323,18 +337,8 @@ class SessionPageSetter(object): """ self._page.timeout = second - def cookie(self, cookie): - """为Session对象设置单个cookie - :param cookie: cookie信息 - :return: None - """ - if isinstance(cookie, str): - self.cookies(cookie) - else: - self.cookies([cookie]) - def cookies(self, cookies): - """为Session对象设置多个cookie,注意不要传入单个 + """为Session对象设置cookies :param cookies: cookies信息 :return: None """ @@ -362,13 +366,14 @@ class SessionPageSetter(object): """ self._page.session.headers['user-agent'] = ua - def proxies(self, http=None, https=None): + def proxies(self, http, https=None): """设置proxies参数 :param http: http代理地址 :param https: https代理地址 :return: None """ - self._page.session.proxies = {'http': http, 'https': https} + proxies = None if http == https is None else {'http': http, 'https': https or http} + self._page.session.proxies = proxies def auth(self, auth): """设置认证元组或对象 @@ -435,6 +440,68 @@ class SessionPageSetter(object): self._page.session.mount(url, adapter) +class DownloadSetter(object): + """用于设置下载参数的类""" + + def __init__(self, page): + self._page = page + self._DownloadKit = None + + @property + def DownloadKit(self): + if self._DownloadKit is None: + self._DownloadKit = DownloadKit(session=self._page, goal_path=self._page.download_path) + return self._DownloadKit + + @property + def if_file_exists(self): + """返回用于设置存在同名文件时处理方法的对象""" + return FileExists(self) + + def split(self, on_off): + """设置是否允许拆分大文件用多线程下载 + :param on_off: 是否启用多线程下载大文件 + :return: None + """ + self.DownloadKit.split = on_off + + def save_path(self, path): + """设置下载保存路径 + :param path: 下载保存路径 + :return: None + """ + path = path if path is None else str(path) + self._page._download_path = path + self.DownloadKit.goal_path = path + + +class FileExists(object): + """用于设置存在同名文件时处理方法""" + + def __init__(self, setter): + """ + :param setter: DownloadSetter对象 + """ + self._setter = setter + + def __call__(self, mode): + if mode not in ('skip', 'rename', 'overwrite'): + raise ValueError("mode参数只能是'skip', 'rename', 'overwrite'") + self._setter.DownloadKit.file_exists = mode + + def skip(self): + """设为跳过""" + self._setter.DownloadKit.file_exists = 'skip' + + def rename(self): + """设为重命名,文件名后加序号""" + self._setter.DownloadKit._file_exists = 'rename' + + def overwrite(self): + """设为覆盖""" + self._setter.DownloadKit._file_exists = 'overwrite' + + def check_headers(kwargs, headers, arg) -> bool: """检查kwargs或headers中是否有arg所示属性""" return arg in kwargs['headers'] or arg in headers diff --git a/DrissionPage/session_page.pyi b/DrissionPage/session_page.pyi index bb803f0..c551834 100644 --- a/DrissionPage/session_page.pyi +++ b/DrissionPage/session_page.pyi @@ -3,20 +3,22 @@ @Author : g1879 @Contact : g1879@qq.com """ -from http.cookiejar import Cookie +from pathlib import Path from typing import Any, Union, Tuple, List -# from DownloadKit import DownloadKit +from DownloadKit import DownloadKit from requests import Session, Response from requests.adapters import HTTPAdapter from requests.auth import HTTPBasicAuth from requests.cookies import RequestsCookieJar from requests.structures import CaseInsensitiveDict -from .base import BasePage from .commons.constants import NoneElement +from .base import BasePage +from .chromium_page import ChromiumPage from .configs.session_options import SessionOptions from .session_element import SessionElement +from .web_page import WebPage class SessionPage(BasePage): @@ -27,8 +29,8 @@ class SessionPage(BasePage): self._session_options: SessionOptions = ... self._url: str = ... self._response: Response = ... - # self._download_path: str = ... - # self._DownloadKit: DownloadKit = ... + self._download_path: str = ... + self._download_set: DownloadSetter = ... self._url_available: bool = ... self.timeout: float = ... self.retry_times: int = ... @@ -62,10 +64,10 @@ class SessionPage(BasePage): def json(self) -> Union[dict, None]: ... @property - def user_agent(self) -> str: ... + def download_path(self) -> str: ... @property - def download_path(self) -> str: ... + def download_set(self) -> DownloadSetter: ... def get(self, url: str, @@ -118,8 +120,8 @@ class SessionPage(BasePage): @property def set(self) -> SessionPageSetter: ... - # @property - # def download(self) -> DownloadKit: ... + @property + def download(self) -> DownloadKit: ... def post(self, url: str, @@ -170,8 +172,6 @@ class SessionPageSetter(object): def timeout(self, second: float) -> None: ... - def cookie(self, cookie: Union[Cookie, str, dict]) -> None: ... - def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ... def headers(self, headers: dict) -> None: ... @@ -180,7 +180,7 @@ class SessionPageSetter(object): def user_agent(self, ua: str) -> None: ... - def proxies(self, http: str = None, https: str = None) -> None: ... + def proxies(self, http, https=None) -> None: ... def auth(self, auth: Union[Tuple[str, str], HTTPBasicAuth, None]) -> None: ... @@ -201,6 +201,35 @@ class SessionPageSetter(object): def add_adapter(self, url: str, adapter: HTTPAdapter) -> None: ... +class DownloadSetter(object): + def __init__(self, page: Union[SessionPage, WebPage, ChromiumPage]): + self._page: SessionPage = ... + self._DownloadKit: DownloadKit = ... + + @property + def DownloadKit(self) -> DownloadKit: ... + + @property + def if_file_exists(self) -> FileExists: ... + + def split(self, on_off: bool) -> None: ... + + def save_path(self, path: Union[str, Path]): ... + + +class FileExists(object): + def __init__(self, setter: DownloadSetter): + self._setter: DownloadSetter = ... + + def __call__(self, mode: str) -> None: ... + + def skip(self) -> None: ... + + def rename(self) -> None: ... + + def overwrite(self) -> None: ... + + def check_headers(kwargs: Union[dict, CaseInsensitiveDict], headers: Union[dict, CaseInsensitiveDict], arg: str) -> bool: ... diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index 059597b..62e1a1b 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -3,16 +3,20 @@ @Author : g1879 @Contact : g1879@qq.com """ +from pathlib import Path +from warnings import warn + from requests import Session from .base import BasePage from .chromium_base import ChromiumBase, Timeout from .chromium_driver import ChromiumDriver -from .chromium_page import ChromiumPage, ChromiumPageSetter +from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter from .chromium_tab import WebPageTab from .commons.web import set_session_cookies, set_browser_cookies from .configs.chromium_options import ChromiumOptions from .configs.session_options import SessionOptions +from .errors import CallMethodError from .session_page import SessionPage, SessionPageSetter @@ -23,7 +27,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): """初始化函数 :param mode: 'd' 或 's',即driver模式和session模式 :param timeout: 超时时间,d模式时为寻找元素时间,s模式时为连接时间,默认10秒 - :param driver_or_options: ChromiumDriver对象,只使用s模式时应传入False + :param driver_or_options: ChromiumDriver对象或DriverOptions对象,只使用s模式时应传入False :param session_or_options: Session对象或SessionOptions对象,只使用d模式时应传入False """ self._mode = mode.lower() @@ -41,6 +45,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): self._driver_options = None self._session_options = None self._response = None + self._download_set = None self._set = None self._screencast = None @@ -54,7 +59,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def _set_start_options(self, dr_opt, se_opt): """处理两种模式的设置 - :param dr_opt: ChromiumDriver或ChromiumOptions对象,为None则从ini读取,为False用默认信息创建 + :param dr_opt: ChromiumDriver或DriverOptions对象,为None则从ini读取,为False用默认信息创建 :param se_opt: Session、SessionOptions对象或配置信息,为None则从ini读取,为False用默认信息创建 :return: None """ @@ -72,7 +77,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): elif dr_opt is False: self._driver_options = ChromiumOptions(read_file=False) - elif isinstance(dr_opt, ChromiumOptions): + elif str(type(dr_opt)).endswith(("ChromiumOptions'>", "DriverOptions'>")): self._driver_options = dr_opt else: @@ -102,6 +107,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): self._timeouts = Timeout(self) self._page_load_strategy = self._driver_options.page_load_strategy + self._download_path = None if se_opt is not False: self.set.timeouts(implicit=self._session_options.timeout) @@ -180,14 +186,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage): """以dict方式返回cookies""" return super().cookies - @property - def user_agent(self): - """返回user agent""" - if self._mode == 's': - return super().user_agent - elif self._mode == 'd': - return super(SessionPage, self).user_agent - @property def session(self): """返回Session对象,如未初始化则按配置信息创建""" @@ -213,6 +211,23 @@ class WebPage(SessionPage, ChromiumPage, BasePage): """ self.set.timeouts(implicit=second) + @property + def download_path(self): + """返回默认下载路径""" + return super(SessionPage, self).download_path + + @property + def download_set(self): + """返回下载设置对象""" + if self._download_set is None: + self._download_set = WebPageDownloadSetter(self) + return self._download_set + + @property + def download(self): + """返回下载器对象""" + return self.download_set._switched_DownloadKit + @property def set(self): """返回用于等待的对象""" @@ -345,15 +360,20 @@ class WebPage(SessionPage, ChromiumPage, BasePage): return if copy_user_agent: - user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] - self.session.headers.update({"User-Agent": user_agent}) + selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] + self.session.headers.update({"User-Agent": selenium_user_agent}) - set_session_cookies(self.session, super(SessionPage, self).get_cookies()) + # set_session_cookies(self.session, self._get_driver_cookies(as_dict=True)) + # set_session_cookies(self.session, self._get_driver_cookies(all_domains=True)) + set_session_cookies(self.session, self._get_driver_cookies()) def cookies_to_browser(self): """把session对象的cookies复制到浏览器""" if not self._has_driver: return + + # set_browser_cookies(self, super().get_cookies(as_dict=True)) + # set_browser_cookies(self, super().get_cookies(all_domains=True)) set_browser_cookies(self, super().get_cookies()) def get_cookies(self, as_dict=False, all_domains=False, all_info=False): @@ -366,7 +386,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): if self._mode == 's': return super().get_cookies(as_dict, all_domains, all_info) elif self._mode == 'd': - return super(SessionPage, self).get_cookies(as_dict, all_domains, all_info) + return self._get_driver_cookies(as_dict, all_info) def get_tab(self, tab_id=None): """获取一个标签页对象 @@ -376,6 +396,21 @@ class WebPage(SessionPage, ChromiumPage, BasePage): tab_id = tab_id or self.tab_id return WebPageTab(self, tab_id) + def _get_driver_cookies(self, as_dict=False, all_info=False): + """获取浏览器cookies + :param as_dict: 是否以dict形式返回,为True时all_info无效 + :param all_info: 是否返回所有信息 + :return: cookies信息 + """ + cookies = self.run_cdp('Network.getCookies')['cookies'] + if as_dict: + return {cookie['name']: cookie['value'] for cookie in cookies} + elif all_info: + return cookies + else: + return [{'name': cookie['name'], 'value': cookie['value'], 'domain': cookie['domain']} + for cookie in cookies] + def close_driver(self): """关闭driver及浏览器""" if self._has_driver: @@ -458,3 +493,66 @@ class WebPageSetter(ChromiumPageSetter): self._session_setter.user_agent(ua) else: self._chromium_setter.user_agent(ua, platform) + + +class WebPageDownloadSetter(ChromiumDownloadSetter): + """用于设置下载参数的类""" + + def __init__(self, page): + super().__init__(page) + self._session = page.session + + @property + def _switched_DownloadKit(self): + """返回从浏览器同步cookies后的Session对象""" + if self._page.mode == 'd': + self._cookies_to_session() + return self.DownloadKit + + def save_path(self, path): + """设置下载路径 + :param path: 下载路径 + :return: None + """ + path = path or '' + path = Path(path).absolute() + path.mkdir(parents=True, exist_ok=True) + path = str(path) + self._page._download_path = path + self.DownloadKit.goal_path = path + + if self._page._has_driver: + try: + self._page.browser_driver.Browser.setDownloadBehavior(behavior=self._behavior, downloadPath=path, + eventsEnabled=True) + except CallMethodError: + warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') + self._page.run_cdp('Page.setDownloadBehavior', behavior=self._behavior, downloadPath=path) + + def by_browser(self): + """设置使用浏览器下载文件""" + if not self._page._has_driver: + raise RuntimeError('浏览器未连接。') + + try: + self._page.browser_driver.Browser.setDownloadBehavior(behavior='allow', eventsEnabled=True, + downloadPath=self._page.download_path) + self._page.browser_driver.Browser.downloadWillBegin = self._download_by_browser + + except CallMethodError: + warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。') + self._page.driver.Page.setDownloadBehavior(behavior='allow', downloadPath=self._page.download_path) + self._page.driver.Page.downloadWillBegin = self._download_by_browser + + self._behavior = 'allow' + + def by_DownloadKit(self): + """设置使用DownloadKit下载文件""" + if self._page._has_driver: + try: + self._page.browser_driver.Browser.setDownloadBehavior(behavior='deny', eventsEnabled=True) + self._page.browser_driver.Browser.downloadWillBegin = self._download_by_DownloadKit + except CallMethodError: + raise RuntimeError('您的浏览器版本太低,不支持此方法,请升级。') + + self._behavior = 'deny' diff --git a/DrissionPage/web_page.pyi b/DrissionPage/web_page.pyi index d153c30..bc4eb58 100644 --- a/DrissionPage/web_page.pyi +++ b/DrissionPage/web_page.pyi @@ -12,9 +12,10 @@ from .base import BasePage from .chromium_driver import ChromiumDriver from .chromium_element import ChromiumElement from .chromium_frame import ChromiumFrame -from .chromium_page import ChromiumPage, ChromiumPageSetter +from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter from .chromium_tab import WebPageTab from .configs.chromium_options import ChromiumOptions +from .configs.driver_options import DriverOptions from .configs.session_options import SessionOptions from .session_element import SessionElement from .session_page import SessionPage, SessionPageSetter @@ -25,15 +26,15 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def __init__(self, mode: str = 'd', timeout: float = None, - driver_or_options: Union[ChromiumDriver, ChromiumOptions, bool] = None, + driver_or_options: Union[ChromiumDriver, ChromiumOptions, DriverOptions, bool] = None, session_or_options: Union[Session, SessionOptions, bool] = None) -> None: self._mode: str = ... self._has_driver: bool = ... self._has_session: bool = ... self.address: str = ... self._session_options: Union[SessionOptions, None] = ... - self._driver_options: Union[ChromiumOptions, None] = ... - self._DownloadKit: DownloadKit = ... + self._driver_options: Union[ChromiumOptions, DriverOptions, None] = ... + self._download_set: WebPageDownloadSetter = ... self._download_path: str = ... self._tab_obj: ChromiumDriver = ... @@ -66,9 +67,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage): @property def cookies(self) -> dict: ... - @property - def user_agent(self) -> str: ... - @property def session(self) -> Session: ... @@ -81,6 +79,12 @@ class WebPage(SessionPage, ChromiumPage, BasePage): @timeout.setter def timeout(self, second: float) -> None: ... + @property + def download_path(self) -> str: ... + + @property + def download_set(self) -> WebPageDownloadSetter: ... + def get(self, url: str, show_errmsg: bool = False, @@ -125,6 +129,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage): def get_tab(self, tab_id: str = None) -> WebPageTab: ... + def _get_driver_cookies(self, as_dict: bool = False, all_info: bool = False) -> dict: ... + def close_driver(self) -> None: ... def close_session(self) -> None: ... @@ -150,6 +156,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage): verify: Any | None = ..., cert: Any | None = ...) -> bool: ... + @property + def download(self) -> DownloadKit: ... + @property def set(self) -> WebPageSetter: ... @@ -158,7 +167,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage): -> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None, List[Union[SessionElement, str]], List[ Union[ChromiumElement, str, ChromiumFrame]]]: ... - def _set_start_options(self, dr_opt: Union[ChromiumDriver, bool, None], + def _set_start_options(self, dr_opt: Union[ChromiumDriver, DriverOptions, bool, None], se_opt: Union[Session, SessionOptions, bool, None]) -> None: ... def quit(self) -> None: ... @@ -176,3 +185,21 @@ class WebPageSetter(ChromiumPageSetter): def headers(self, headers: dict) -> None: ... def cookies(self, cookies) -> None: ... + + +class WebPageDownloadSetter(ChromiumDownloadSetter): + def __init__(self, page: WebPage): + self._page: WebPage = ... + self._behavior: str = ... + self._session: Session = None + + @property + def _switched_DownloadKit(self) -> DownloadKit: ... + + def save_path(self, path) -> None: ... + + def by_browser(self) -> None: ... + + def by_DownloadKit(self) -> None: ... + + def _download_by_DownloadKit(self, **kwargs) -> None: ... diff --git a/setup.py b/setup.py index d1d38c5..d621cb6 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh: setup( name="DrissionPage", - version="3.2.28", + version="3.2.30", author="g1879", author_email="g1879@qq.com", description="Python based web automation tool. It can control the browser and send and receive data packets.",