diff --git a/DrissionPage/chromium_base.py b/DrissionPage/chromium_base.py new file mode 100644 index 0000000..56b0247 --- /dev/null +++ b/DrissionPage/chromium_base.py @@ -0,0 +1,735 @@ +# -*- coding:utf-8 -*- +from json import loads +from re import search +from time import perf_counter, sleep +from typing import Union, Tuple, List, Any +from urllib.parse import urlparse + +from requests import Session +from requests.cookies import RequestsCookieJar + +from .chromium_element import ChromiumElementWaiter, ChromeScroll +from .common import get_loc +from .session_element import SessionElement, make_session_ele +from .chromium_element import ChromiumElement, _run_script +from .config import DriverOptions, _cookies_to_tuple +from .base import BasePage +from .tab import Tab + + +class ChromiumBase(BasePage): + """标签页、frame、页面基类""" + + def __init__(self, + address: str, + tab_id: str = None, + timeout: float = None): + """初始化 \n + :param address: 浏览器地址:端口 + :param tab_id: 要控制的标签页id,不指定默认为激活的 + :param timeout: 超时时间 + """ + super().__init__(timeout) + self._is_loading = None + self._root_id = None + self._connect_browser(address, tab_id) + + def _connect_browser(self, + addr_tab_opts: Union[str, Tab, DriverOptions] = None, + tab_id: str = None) -> None: + """连接浏览器,在第一次时运行 \n + :param addr_tab_opts: 浏览器地址、Tab对象或DriverOptions对象 + :param tab_id: 要控制的标签页id,不指定默认为激活的 + :return: None + """ + self._root_id = None + self.timeouts = Timeout(self) + self._control_session = Session() + self._control_session.keep_alive = False + self._first_run = True + self._is_reading = False # 用于避免不同线程重复读取document + + self.address = addr_tab_opts + if not tab_id: + json = self._control_session.get(f'http://{self.address}/json').json() + tab_id = [i['id'] for i in json if i['type'] == 'page'][0] + self._set_options() + self._init_page(tab_id) + self._get_document() + + def _init_page(self, tab_id: str = None) -> None: + """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 + :param tab_id: 要跳转到的标签页id + :return: None + """ + self._is_loading = True + if tab_id: + self._tab_obj = Tab(id=tab_id, type='page', + webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') + + self._tab_obj.start() + self._tab_obj.DOM.enable() + self._tab_obj.Page.enable() + + self._tab_obj.Page.frameNavigated = self._onFrameNavigated + self._tab_obj.Page.loadEventFired = self._onLoadEventFired + + def _get_document(self) -> None: + """刷新cdp使用的document数据""" + if not self._is_reading: + self._is_reading = True + self._wait_loading() + root_id = self._tab_obj.DOM.getDocument()['root']['nodeId'] + self._root_id = self._tab_obj.DOM.resolveNode(nodeId=root_id)['object']['objectId'] + self._is_loading = False + self._is_reading = False + + def _wait_loading(self, timeout: float = None) -> bool: + """等待页面加载完成 + :param timeout: 超时时间 + :return: 是否成功,超时返回False + """ + timeout = timeout if timeout is not None else self.timeouts.page_load + + end_time = perf_counter() + timeout + while perf_counter() < end_time: + state = self.ready_state + if state == 'complete': + return True + elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'): + self.stop_loading() + return True + elif self.page_load_strategy == 'none': + self.stop_loading() + return True + sleep(.1) + + self.stop_loading() + return False + + def _onLoadEventFired(self, **kwargs): + """在页面刷新、变化后重新读取页面内容""" + if self._first_run is False and self._is_loading: + self._get_document() + + def _onFrameNavigated(self, **kwargs): + """页面跳转时触发""" + if not kwargs['frame'].get('parentId', None): + self._is_loading = True + + def _set_options(self) -> None: + pass + + def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'], + timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', None]: + """在内部查找元素 \n + 例:ele = page('@id=ele_id') \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 超时时间 + :return: ChromiumElement对象 + """ + return self.ele(loc_or_str, timeout) + + @property + def driver(self) -> Tab: + """返回用于控制浏览器的Tab对象""" + return self._tab_obj + + @property + def _driver(self): + return self._tab_obj + + @property + def _wait_driver(self) -> Tab: + """返回用于控制浏览器的Tab对象,会先等待页面加载完毕""" + while self._is_loading: + sleep(.1) + self._wait_loading() + return self._tab_obj + + @property + def is_loading(self) -> bool: + """返回页面是否正在加载状态""" + return self._is_loading + + @property + def url(self) -> str: + """返回当前页面url""" + json = self._control_session.get(f'http://{self.address}/json').json() + return [i['url'] for i in json if i['id'] == self._tab_obj.id][0] # change_mode要调用,不能用_driver + + @property + def html(self) -> str: + """返回当前页面html文本""" + node_id = self._wait_driver.DOM.getDocument()['root']['nodeId'] + return self._wait_driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML'] + + @property + def json(self) -> dict: + """当返回内容是json格式时,返回对应的字典""" + return loads(self('t:pre').text) + + @property + def tab_id(self) -> str: + """返回当前标签页id""" + return self.driver.id if self.driver.status == 'started' else '' + + @property + def ready_state(self) -> str: + """返回当前页面加载状态,'loading' 'interactive' 'complete'""" + return self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] + + @property + def size(self) -> dict: + """返回页面总长宽,{'height': int, 'width': int}""" + w = self.run_script('document.body.scrollWidth;', as_expr=True) + h = self.run_script('document.body.scrollHeight;', as_expr=True) + return {'height': h, 'width': w} + + @property + def active_ele(self) -> ChromiumElement: + """返回当前焦点所在元素""" + return self.run_script('return document.activeElement;') + + @property + def page_load_strategy(self) -> str: + """返回页面加载策略""" + return self._page_load_strategy + + @property + def scroll(self) -> 'ChromeScroll': + """返回用于滚动滚动条的对象""" + if not hasattr(self, '_scroll'): + self._scroll = ChromeScroll(self) + return self._scroll + + def set_page_load_strategy(self, value: str) -> None: + """设置页面加载策略 \n + :param value: 可选'normal', 'eager', 'none' + :return: None + """ + if value not in ('normal', 'eager', 'none'): + raise ValueError("只能选择'normal', 'eager', 'none'。") + self._page_load_strategy = value + + def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: + """设置超时时间,单位为秒 \n + :param implicit: 查找元素超时时间 + :param page_load: 页面加载超时时间 + :param script: 脚本运行超时时间 + :return: None + """ + if implicit is not None: + self.timeout = implicit + + if page_load is not None: + self.timeouts.page_load = page_load + + if script is not None: + self.timeouts.script = script + + def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any: + """运行javascript代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: 运行的结果 + """ + return _run_script(self, script, as_expr, self.timeouts.script, args) + + def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: + """以异步方式执行js代码 \n + :param script: js文本 + :param as_expr: 是否作为表达式运行,为True时args无效 + :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... + :return: None + """ + from threading import Thread + Thread(target=_run_script, args=(self, script, as_expr, self.timeouts.script, args)).start() + + def get(self, + url: str, + show_errmsg: bool = False, + retry: int = None, + interval: float = None, + timeout: float = None) -> Union[None, bool]: + """访问url \n + :param url: 目标url + :param show_errmsg: 是否显示和抛出异常 + :param retry: 重试次数 + :param interval: 重试间隔(秒) + :param timeout: 连接超时时间 + :return: 目标url是否可用,返回None表示不确定 + """ + retry, interval = self._before_connect(url, retry, interval) + self._url_available = self._d_connect(self._url, + times=retry, + interval=interval, + show_errmsg=show_errmsg, + timeout=timeout) + return self._url_available + + def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: + """获取cookies信息 \n + :param as_dict: 为True时返回由{name: value}键值对组成的dict + :return: cookies信息 + """ + cookies = self._wait_driver.Network.getCookies()['cookies'] + if as_dict: + return {cookie['name']: cookie['value'] for cookie in cookies} + else: + return cookies + + def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: + """设置cookies值 \n + :param cookies: cookies信息 + :return: None + """ + cookies = _cookies_to_tuple(cookies) + result_cookies = [] + for cookie in cookies: + if not cookie.get('domain', None): + continue + c = {'value': '' if cookie['value'] is None else cookie['value'], + 'name': cookie['name'], + 'domain': cookie['domain']} + result_cookies.append(c) + self._wait_driver.Network.setCookies(cookies=result_cookies) + + def ele(self, + loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'], + timeout: float = None) -> Union[ChromiumElement, 'ChromiumFrame', None]: + """获取第一个符合条件的元素对象 \n + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :return: ChromiumElement对象 + """ + return self._ele(loc_or_ele, timeout=timeout) + + def eles(self, + loc_or_ele: Union[Tuple[str, str], str], + timeout: float = None) -> List[Union[ChromiumElement, 'ChromiumFrame']]: + """获取所有符合条件的元素对象 \n + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :return: ChromiumElement对象组成的列表 + """ + return self._ele(loc_or_ele, timeout=timeout, single=False) + + def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \ + -> Union[SessionElement, str, None]: + """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象或属性、文本 + """ + if isinstance(loc_or_ele, ChromiumElement): + return make_session_ele(loc_or_ele) + else: + return make_session_ele(self, loc_or_ele) + + def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: + """查找所有符合条件的元素以SessionElement列表形式返回 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :return: SessionElement对象组成的列表 + """ + return make_session_ele(self, loc_or_str, single=False) + + def _ele(self, + loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'], + timeout: float = None, single: bool = True, relative: bool = False) \ + -> Union[ChromiumElement, 'ChromiumFrame', None, List[Union[ChromiumElement, 'ChromiumFrame']]]: + """执行元素查找 + :param loc_or_ele: 定位符或元素对象 + :param timeout: 查找超时时间 + :param single: 是否只返回第一个 + :return: ChromiumElement对象或元素对象组成的列表 + """ + if isinstance(loc_or_ele, (str, tuple)): + loc = get_loc(loc_or_ele)[1] + elif isinstance(loc_or_ele, ChromiumElement): + return loc_or_ele + else: + raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。') + + timeout = timeout if timeout is not None else self.timeout + search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) + count = search_result['resultCount'] + + end_time = perf_counter() + timeout + while count == 0 and perf_counter() < end_time: + search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) + count = search_result['resultCount'] + + if count == 0: + return None if single else [] + + count = 1 if single else count + nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, + toIndex=count) + eles = [] + for i in nodeIds['nodeIds']: + ele = ChromiumElement(self, node_id=i) + if ele.tag in ('iframe', 'frame'): + src = ele.attr('src') + if src: + netloc1 = urlparse(src).netloc + netloc2 = urlparse(self.url).netloc + if netloc1 != netloc2: + ele = ChromiumFrame(self, ele) + eles.append(ele) + + return eles[0] if single else eles + + def wait_ele(self, + loc_or_ele: Union[str, tuple, ChromiumElement], + timeout: float = None) -> 'ChromiumElementWaiter': + """返回用于等待元素到达某个状态的等待器对象 \n + :param loc_or_ele: 可以是元素、查询字符串、loc元组 + :param timeout: 等待超时时间 + :return: 用于等待的ElementWaiter对象 + """ + return ChromiumElementWaiter(self, loc_or_ele, timeout) + + def scroll_to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: + """滚动页面直到元素可见 \n + :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) + :return: None + """ + node_id = self.ele(loc_or_ele).node_id + try: + self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) + except Exception: + self.ele(loc_or_ele).run_script("this.scrollIntoView();") + + def refresh(self, ignore_cache: bool = False) -> None: + """刷新当前页面 \n + :param ignore_cache: 是否忽略缓存 + :return: None + """ + self._driver.Page.reload(ignoreCache=ignore_cache) + + def forward(self, steps: int = 1) -> None: + """在浏览历史中前进若干步 \n + :param steps: 前进步数 + :return: None + """ + self.run_script(f'window.history.go({steps});', as_expr=True) + + def back(self, steps: int = 1) -> None: + """在浏览历史中后退若干步 \n + :param steps: 后退步数 + :return: None + """ + self.run_script(f'window.history.go({-steps});', as_expr=True) + + def stop_loading(self) -> None: + """页面停止加载""" + self._driver.Page.stopLoading() + self._get_document() + + def run_cdp(self, cmd: str, **cmd_args) -> dict: + """执行Chrome DevTools Protocol语句 \n + :param cmd: 协议项目 + :param cmd_args: 参数 + :return: 执行的结果 + """ + try: + return self._driver.call_method(cmd, **cmd_args) + except Exception as e: + if 'Could not find node with given id' in str(e): + raise RuntimeError('该元素已不在当前页面中。') + raise + + def set_user_agent(self, ua: str) -> None: + """为当前tab设置user agent,只在当前tab有效 \n + :param ua: user agent字符串 + :return: None + """ + self._wait_driver.Network.setUserAgentOverride(userAgent=ua) + + def get_session_storage(self, item: str = None) -> Union[str, dict, None]: + """获取sessionStorage信息,不设置item则获取全部 \n + :param item: 要获取的项,不设置则返回全部 + :return: sessionStorage一个或所有项内容 + """ + js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' + return self.run_script(js, as_expr=True) + + def get_local_storage(self, item: str = None) -> Union[str, dict, None]: + """获取localStorage信息,不设置item则获取全部 \n + :param item: 要获取的项目,不设置则返回全部 + :return: localStorage一个或所有项内容 + """ + js = f'localStorage.getItem("{item}");' if item else 'localStorage;' + return self.run_script(js, as_expr=True) + + def set_session_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项sessionStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' + return self.run_script(js, as_expr=True) + + def set_local_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项localStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' + return self.run_script(js, as_expr=True) + + def clear_cache(self, + session_storage: bool = True, + local_storage: bool = True, + cache: bool = True, + cookies: bool = True) -> None: + """清除缓存,可选要清除的项 \n + :param session_storage: 是否清除sessionStorage + :param local_storage: 是否清除localStorage + :param cache: 是否清除cache + :param cookies: 是否清除cookies + :return: None + """ + if session_storage: + self.run_script('sessionStorage.clear();', as_expr=True) + if local_storage: + self.run_script('localStorage.clear();', as_expr=True) + if cache: + self._wait_driver.Network.clearBrowserCache() + if cookies: + self._wait_driver.Network.clearBrowserCookies() + + def _d_connect(self, + to_url: str, + times: int = 0, + interval: float = 1, + show_errmsg: bool = False, + timeout: float = None) -> Union[bool, None]: + """尝试连接,重试若干次 \n + :param to_url: 要访问的url + :param times: 重试次数 + :param interval: 重试间隔(秒) + :param show_errmsg: 是否抛出异常 + :param timeout: 连接超时时间 + :return: 是否成功,返回None表示不确定 + """ + err = None + timeout = timeout if timeout is not None else self.timeouts.page_load + + for _ in range(times + 1): + result = self._driver.Page.navigate(url=to_url) + is_timeout = not self._wait_loading(timeout) + + if is_timeout: + err = TimeoutError('页面连接超时。') + if 'errorText' in result: + err = ConnectionError(result['errorText']) + + if not err: + break + + if _ < times: + sleep(interval) + if show_errmsg: + print(f'重试 {to_url}') + + if err and show_errmsg: + raise err if err is not None else ConnectionError('连接异常。') + + return False if err else True + + +class ChromiumFrame(ChromiumBase): + """实现浏览器frame的类""" + + def __init__(self, page, + ele: ChromiumElement): + """初始化 \n + :param page: 浏览器地址:端口、Tab对象或DriverOptions对象 + :param ele: 页面上的frame元素 + """ + self.page = page + self._inner_ele = ele + frame_id = page.run_cdp('DOM.describeNode', nodeId=ele.node_id)['node'].get('frameId', None) + super().__init__(page.address, frame_id, page.timeout) + + def __repr__(self) -> str: + attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] + return f'' + + @property + def tag(self) -> str: + """返回元素tag""" + return self._inner_ele.tag + + @property + def html(self) -> str: + """返回元素outerHTML文本""" + tag = self.tag + out_html = self.page.run_cdp('DOM.getOuterHTML', nodeId=self._inner_ele.node_id)['outerHTML'] + in_html = super().html + sign = search(rf'<{tag}.*?>', out_html).group(0) + return f'{sign}{in_html}' + + @property + def inner_html(self) -> str: + """返回元素innerHTML文本""" + return super().html + + @property + def attrs(self) -> dict: + return self._inner_ele.attrs + + @property + def frame_size(self) -> dict: + """返回frame元素大小""" + return self._inner_ele.size + + def _set_options(self) -> None: + self.set_timeouts(page_load=self.page.timeouts.page_load, + script=self.page.timeouts.script, + implicit=self.page.timeouts.implicit if self.timeout is None else self.timeout) + self._page_load_strategy = self.page.page_load_strategy + + @property + def obj_id(self) -> str: + """返回js中的object id""" + return self._inner_ele.obj_id + + @property + def node_id(self) -> str: + """返回cdp中的node id""" + return self._inner_ele.node_id + + @property + def location(self) -> dict: + """返回frame元素左上角的绝对坐标""" + return self._inner_ele.location + + @property + def is_displayed(self) -> bool: + """返回frame元素是否显示""" + return self._inner_ele.is_displayed + + def attr(self, attr: str) -> Union[str, None]: + """返回frame元素attribute属性值 \n + :param attr: 属性名 + :return: 属性值文本,没有该属性返回None + """ + return self._inner_ele.attr(attr) + + def set_attr(self, attr: str, value: str) -> None: + """设置frame元素attribute属性 \n + :param attr: 属性名 + :param value: 属性值 + :return: None + """ + self._inner_ele.set_attr(attr, value) + + def remove_attr(self, attr: str) -> None: + """删除frame元素attribute属性 \n + :param attr: 属性名 + :return: None + """ + self._inner_ele.remove_attr(attr) + + def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['ChromiumElement', None]: + """返回上面某一级父元素,可指定层数或用查询语法定位 \n + :param level_or_loc: 第几级父元素,或定位符 + :return: 上级元素对象 + """ + return self._inner_ele.parent(level_or_loc) + + def prev(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> Union['ChromiumElement', str, None]: + """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + :param index: 前面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + return self._inner_ele.prev(index, filter_loc, timeout) + + def next(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> Union['ChromiumElement', str, None]: + """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n + :param index: 后面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素 + """ + return self._inner_ele.next(index, filter_loc, timeout) + + def before(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> Union['ChromiumElement', str, None]: + """返回当前元素前面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n + :param index: 前面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的某个元素或节点 + """ + return self._inner_ele.before(index, filter_loc, timeout) + + def after(self, + index: int = 1, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> Union['ChromiumElement', str, None]: + """返回当前元素后面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n + :param index: 后面第几个查询结果元素 + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素后面的某个元素或节点 + """ + return self._inner_ele.after(index, filter_loc, timeout) + + def prevs(self, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> List[Union['ChromiumElement', str]]: + """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return self._inner_ele.prevs(filter_loc, timeout) + + def nexts(self, + filter_loc: Union[tuple, str] = '', + timeout: float = 0) -> List[Union['ChromiumElement', str]]: + """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 兄弟元素或节点文本组成的列表 + """ + return self._inner_ele.nexts(filter_loc, timeout) + + def befores(self, + filter_loc: Union[tuple, str] = '', + timeout: float = None) -> List[Union['ChromiumElement', str]]: + """返回当前元素后面符合条件的全部兄弟元素或节点组成的列表,可用查询语法筛选。查找范围不限兄弟元素,而是整个DOM文档 \n + :param filter_loc: 用于筛选元素的查询语法 + :param timeout: 查找元素的超时时间 + :return: 本元素前面的元素或节点组成的列表 + """ + return self._inner_ele.befores(filter_loc, timeout) + + +class Timeout(object): + """用于保存d模式timeout信息的类""" + + def __init__(self, page): + self.page = page + self.page_load = 30 + self.script = 30 + + @property + def implicit(self): + return self.page.timeout \ No newline at end of file diff --git a/DrissionPage/chromium_element.py b/DrissionPage/chromium_element.py index 1522d6b..703659e 100644 --- a/DrissionPage/chromium_element.py +++ b/DrissionPage/chromium_element.py @@ -4,7 +4,6 @@ @Contact : g1879@qq.com @File : chromium_element.py """ -from json import loads from os import sep from os.path import basename from pathlib import Path @@ -13,15 +12,10 @@ from time import perf_counter, sleep from typing import Union, Tuple, List, Any from urllib.parse import urlparse -from requests import Session -from requests.cookies import RequestsCookieJar - -from .base import DrissionElement, BaseElement, BasePage +from .base import DrissionElement, BaseElement from .common import make_absolute_link, get_loc, get_ele_txt, format_html, is_js_func, _location_in_viewport -from .config import DriverOptions, _cookies_to_tuple from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions from .session_element import make_session_ele, SessionElement -from .tab import Tab class ChromiumElement(DrissionElement): @@ -53,7 +47,7 @@ class ChromiumElement(DrissionElement): def __call__(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', str, None]: + timeout: float = None) -> Union['ChromiumElement', str, None]: """在内部查找元素 \n 例:ele2 = ele1('@id=ele_id') \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 @@ -426,7 +420,7 @@ class ChromiumElement(DrissionElement): def ele(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', str, None]: + timeout: float = None) -> Union['ChromiumElement', str, None]: """返回当前元素下级符合条件的第一个元素、属性或节点文本 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 @@ -436,7 +430,7 @@ class ChromiumElement(DrissionElement): def eles(self, loc_or_str: Union[Tuple[str, str], str], - timeout: float = None) -> List[Union['ChromiumElement', 'ChromiumFrame', str]]: + timeout: float = None) -> List[Union['ChromiumElement', str]]: """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 @@ -465,8 +459,8 @@ class ChromiumElement(DrissionElement): def _ele(self, loc_or_str: Union[Tuple[str, str], str], timeout: float = None, single: bool = True, relative=False) \ - -> Union['ChromiumElement', 'ChromiumFrame', str, None, - List[Union['ChromiumElement', 'ChromiumFrame', str]]]: + -> Union['ChromiumElement', str, None, + List[Union['ChromiumElement', str]]]: """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 :param timeout: 查找元素超时时间 @@ -1107,724 +1101,6 @@ class ChromiumShadowRootElement(BaseElement): return self.page.run_cdp('DOM.describeNode', nodeId=node_id)['node']['backendNodeId'] -class ChromiumBase(BasePage): - """标签页、frame、页面基类""" - - def __init__(self, - address: str, - tab_id: str = None, - timeout: float = None): - """初始化 \n - :param address: 浏览器地址:端口 - :param tab_id: 要控制的标签页id,不指定默认为激活的 - :param timeout: 超时时间 - """ - super().__init__(timeout) - self._is_loading = None - self._root_id = None - self._connect_browser(address, tab_id) - - def _connect_browser(self, - addr_tab_opts: Union[str, Tab, DriverOptions] = None, - tab_id: str = None) -> None: - """连接浏览器,在第一次时运行 \n - :param addr_tab_opts: 浏览器地址、Tab对象或DriverOptions对象 - :param tab_id: 要控制的标签页id,不指定默认为激活的 - :return: None - """ - self._root_id = None - self.timeouts = Timeout(self) - self._control_session = Session() - self._control_session.keep_alive = False - self._first_run = True - self._is_reading = False # 用于避免不同线程重复读取document - - self.address = addr_tab_opts - if not tab_id: - json = loads(self._control_session.get(f'http://{self.address}/json').text) - tab_id = [i['id'] for i in json if i['type'] == 'page'][0] - self._set_options() - self._init_page(tab_id) - self._get_document() - - def _init_page(self, tab_id: str = None) -> None: - """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 - :param tab_id: 要跳转到的标签页id - :return: None - """ - self._is_loading = True - if tab_id: - self._tab_obj = Tab(id=tab_id, type='page', - webSocketDebuggerUrl=f'ws://{self.address}/devtools/page/{tab_id}') - - self._tab_obj.start() - self._tab_obj.DOM.enable() - self._tab_obj.Page.enable() - - self._tab_obj.Page.frameNavigated = self._onFrameNavigated - self._tab_obj.Page.loadEventFired = self._onLoadEventFired - - def _get_document(self) -> None: - """刷新cdp使用的document数据""" - if not self._is_reading: - self._is_reading = True - self._wait_loading() - root_id = self._tab_obj.DOM.getDocument()['root']['nodeId'] - self._root_id = self._tab_obj.DOM.resolveNode(nodeId=root_id)['object']['objectId'] - self._is_loading = False - self._is_reading = False - - def _wait_loading(self, timeout: float = None) -> bool: - """等待页面加载完成 - :param timeout: 超时时间 - :return: 是否成功,超时返回False - """ - timeout = timeout if timeout is not None else self.timeouts.page_load - - end_time = perf_counter() + timeout - while perf_counter() < end_time: - state = self.ready_state - if state == 'complete': - return True - elif self.page_load_strategy == 'eager' and state in ('interactive', 'complete'): - self.stop_loading() - return True - elif self.page_load_strategy == 'none': - self.stop_loading() - return True - sleep(.1) - - self.stop_loading() - return False - - def _onLoadEventFired(self, **kwargs): - """在页面刷新、变化后重新读取页面内容""" - if self._first_run is False and self._is_loading: - self._get_document() - - def _onFrameNavigated(self, **kwargs): - """页面跳转时触发""" - if not kwargs['frame'].get('parentId', None): - self._is_loading = True - - def _set_options(self) -> None: - pass - - def __call__(self, loc_or_str: Union[Tuple[str, str], str, 'ChromiumElement'], - timeout: float = None) -> Union['ChromiumElement', 'ChromiumFrame', None]: - """在内部查找元素 \n - 例:ele = page('@id=ele_id') \n - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :param timeout: 超时时间 - :return: ChromiumElement对象 - """ - return self.ele(loc_or_str, timeout) - - @property - def driver(self) -> Tab: - """返回用于控制浏览器的Tab对象""" - return self._tab_obj - - @property - def _driver(self): - return self._tab_obj - - @property - def _wait_driver(self) -> Tab: - """返回用于控制浏览器的Tab对象,会先等待页面加载完毕""" - while self._is_loading: - sleep(.1) - self._wait_loading() - return self._tab_obj - - @property - def is_loading(self) -> bool: - """返回页面是否正在加载状态""" - return self._is_loading - - @property - def url(self) -> str: - """返回当前页面url""" - json = self._control_session.get(f'http://{self.address}/json').json() - return [i['url'] for i in json if i['id'] == self._tab_obj.id][0] # change_mode要调用,不能用_driver - - @property - def html(self) -> str: - """返回当前页面html文本""" - node_id = self._wait_driver.DOM.getDocument()['root']['nodeId'] - return self._wait_driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML'] - - @property - def json(self) -> dict: - """当返回内容是json格式时,返回对应的字典""" - return loads(self('t:pre').text) - - @property - def tab_id(self) -> str: - """返回当前标签页id""" - return self.driver.id if self.driver.status == 'started' else '' - - @property - def ready_state(self) -> str: - """返回当前页面加载状态,'loading' 'interactive' 'complete'""" - return self._driver.Runtime.evaluate(expression='document.readyState;')['result']['value'] - - @property - def size(self) -> dict: - """返回页面总长宽,{'height': int, 'width': int}""" - w = self.run_script('document.body.scrollWidth;', as_expr=True) - h = self.run_script('document.body.scrollHeight;', as_expr=True) - return {'height': h, 'width': w} - - @property - def active_ele(self) -> ChromiumElement: - """返回当前焦点所在元素""" - return self.run_script('return document.activeElement;') - - @property - def page_load_strategy(self) -> str: - """返回页面加载策略""" - return self._page_load_strategy - - @property - def scroll(self) -> 'ChromeScroll': - """返回用于滚动滚动条的对象""" - if not hasattr(self, '_scroll'): - self._scroll = ChromeScroll(self) - return self._scroll - - def set_page_load_strategy(self, value: str) -> None: - """设置页面加载策略 \n - :param value: 可选'normal', 'eager', 'none' - :return: None - """ - if value not in ('normal', 'eager', 'none'): - raise ValueError("只能选择'normal', 'eager', 'none'。") - self._page_load_strategy = value - - def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: - """设置超时时间,单位为秒 \n - :param implicit: 查找元素超时时间 - :param page_load: 页面加载超时时间 - :param script: 脚本运行超时时间 - :return: None - """ - if implicit is not None: - self.timeout = implicit - - if page_load is not None: - self.timeouts.page_load = page_load - - if script is not None: - self.timeouts.script = script - - def run_script(self, script: str, as_expr: bool = False, *args: Any) -> Any: - """运行javascript代码 \n - :param script: js文本 - :param as_expr: 是否作为表达式运行,为True时args无效 - :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: 运行的结果 - """ - return _run_script(self, script, as_expr, self.timeouts.script, args) - - def run_async_script(self, script: str, as_expr: bool = False, *args: Any) -> None: - """以异步方式执行js代码 \n - :param script: js文本 - :param as_expr: 是否作为表达式运行,为True时args无效 - :param args: 参数,按顺序在js文本中对应argument[0]、argument[2]... - :return: None - """ - from threading import Thread - Thread(target=_run_script, args=(self, script, as_expr, self.timeouts.script, args)).start() - - def get(self, - url: str, - show_errmsg: bool = False, - retry: int = None, - interval: float = None, - timeout: float = None) -> Union[None, bool]: - """访问url \n - :param url: 目标url - :param show_errmsg: 是否显示和抛出异常 - :param retry: 重试次数 - :param interval: 重试间隔(秒) - :param timeout: 连接超时时间 - :return: 目标url是否可用,返回None表示不确定 - """ - retry, interval = self._before_connect(url, retry, interval) - self._url_available = self._d_connect(self._url, - times=retry, - interval=interval, - show_errmsg=show_errmsg, - timeout=timeout) - return self._url_available - - def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: - """获取cookies信息 \n - :param as_dict: 为True时返回由{name: value}键值对组成的dict - :return: cookies信息 - """ - cookies = self._wait_driver.Network.getCookies()['cookies'] - if as_dict: - return {cookie['name']: cookie['value'] for cookie in cookies} - else: - return cookies - - def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: - """设置cookies值 \n - :param cookies: cookies信息 - :return: None - """ - cookies = _cookies_to_tuple(cookies) - result_cookies = [] - for cookie in cookies: - if not cookie.get('domain', None): - continue - c = {'value': '' if cookie['value'] is None else cookie['value'], - 'name': cookie['name'], - 'domain': cookie['domain']} - result_cookies.append(c) - self._wait_driver.Network.setCookies(cookies=result_cookies) - - def ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'], - timeout: float = None) -> Union[ChromiumElement, 'ChromiumFrame', None]: - """获取第一个符合条件的元素对象 \n - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :return: ChromiumElement对象 - """ - return self._ele(loc_or_ele, timeout=timeout) - - def eles(self, - loc_or_ele: Union[Tuple[str, str], str], - timeout: float = None) -> List[Union[ChromiumElement, 'ChromiumFrame']]: - """获取所有符合条件的元素对象 \n - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :return: ChromiumElement对象组成的列表 - """ - return self._ele(loc_or_ele, timeout=timeout, single=False) - - def s_ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromiumElement] = None) \ - -> Union[SessionElement, str, None]: - """查找第一个符合条件的元素以SessionElement形式返回,处理复杂页面时效率很高 \n - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象或属性、文本 - """ - if isinstance(loc_or_ele, ChromiumElement): - return make_session_ele(loc_or_ele) - else: - return make_session_ele(self, loc_or_ele) - - def s_eles(self, loc_or_str: Union[Tuple[str, str], str] = None) -> List[Union[SessionElement, str]]: - """查找所有符合条件的元素以SessionElement列表形式返回 \n - :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 - :return: SessionElement对象组成的列表 - """ - return make_session_ele(self, loc_or_str, single=False) - - def _ele(self, - loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, 'ChromiumFrame'], - timeout: float = None, single: bool = True, relative: bool = False) \ - -> Union[ChromiumElement, 'ChromiumFrame', None, List[Union[ChromiumElement, 'ChromiumFrame']]]: - """执行元素查找 - :param loc_or_ele: 定位符或元素对象 - :param timeout: 查找超时时间 - :param single: 是否只返回第一个 - :return: ChromiumElement对象或元素对象组成的列表 - """ - if isinstance(loc_or_ele, (str, tuple)): - loc = get_loc(loc_or_ele)[1] - elif isinstance(loc_or_ele, ChromiumElement): - return loc_or_ele - else: - raise ValueError('loc_or_str参数只能是tuple、str、ChromiumElement类型。') - - timeout = timeout if timeout is not None else self.timeout - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) - count = search_result['resultCount'] - - end_time = perf_counter() + timeout - while count == 0 and perf_counter() < end_time: - search_result = self._wait_driver.DOM.performSearch(query=loc, includeUserAgentShadowDOM=True) - count = search_result['resultCount'] - - if count == 0: - return None if single else [] - - count = 1 if single else count - nodeIds = self._wait_driver.DOM.getSearchResults(searchId=search_result['searchId'], fromIndex=0, - toIndex=count) - eles = [] - for i in nodeIds['nodeIds']: - ele = ChromiumElement(self, node_id=i) - if ele.tag in ('iframe', 'frame'): - src = ele.attr('src') - if src: - netloc1 = urlparse(src).netloc - netloc2 = urlparse(self.url).netloc - if netloc1 != netloc2: - ele = ChromiumFrame(self, ele) - eles.append(ele) - - return eles[0] if single else eles - - def wait_ele(self, - loc_or_ele: Union[str, tuple, ChromiumElement], - timeout: float = None) -> 'ChromiumElementWaiter': - """返回用于等待元素到达某个状态的等待器对象 \n - :param loc_or_ele: 可以是元素、查询字符串、loc元组 - :param timeout: 等待超时时间 - :return: 用于等待的ElementWaiter对象 - """ - return ChromiumElementWaiter(self, loc_or_ele, timeout) - - def scroll_to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: - """滚动页面直到元素可见 \n - :param loc_or_ele: 元素的定位信息,可以是loc元组,或查询字符串(详见ele函数注释) - :return: None - """ - node_id = self.ele(loc_or_ele).node_id - try: - self._wait_driver.DOM.scrollIntoViewIfNeeded(nodeId=node_id) - except Exception: - self.ele(loc_or_ele).run_script("this.scrollIntoView();") - - def refresh(self, ignore_cache: bool = False) -> None: - """刷新当前页面 \n - :param ignore_cache: 是否忽略缓存 - :return: None - """ - self._driver.Page.reload(ignoreCache=ignore_cache) - - def forward(self, steps: int = 1) -> None: - """在浏览历史中前进若干步 \n - :param steps: 前进步数 - :return: None - """ - self.run_script(f'window.history.go({steps});', as_expr=True) - - def back(self, steps: int = 1) -> None: - """在浏览历史中后退若干步 \n - :param steps: 后退步数 - :return: None - """ - self.run_script(f'window.history.go({-steps});', as_expr=True) - - def stop_loading(self) -> None: - """页面停止加载""" - self._driver.Page.stopLoading() - self._get_document() - - def run_cdp(self, cmd: str, **cmd_args) -> dict: - """执行Chrome DevTools Protocol语句 \n - :param cmd: 协议项目 - :param cmd_args: 参数 - :return: 执行的结果 - """ - try: - return self._driver.call_method(cmd, **cmd_args) - except Exception as e: - if 'Could not find node with given id' in str(e): - raise RuntimeError('该元素已不在当前页面中。') - raise - - def set_user_agent(self, ua: str) -> None: - """为当前tab设置user agent,只在当前tab有效 \n - :param ua: user agent字符串 - :return: None - """ - self._wait_driver.Network.setUserAgentOverride(userAgent=ua) - - def get_session_storage(self, item: str = None) -> Union[str, dict, None]: - """获取sessionStorage信息,不设置item则获取全部 \n - :param item: 要获取的项,不设置则返回全部 - :return: sessionStorage一个或所有项内容 - """ - js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' - return self.run_script(js, as_expr=True) - - def get_local_storage(self, item: str = None) -> Union[str, dict, None]: - """获取localStorage信息,不设置item则获取全部 \n - :param item: 要获取的项目,不设置则返回全部 - :return: localStorage一个或所有项内容 - """ - js = f'localStorage.getItem("{item}");' if item else 'localStorage;' - return self.run_script(js, as_expr=True) - - def set_session_storage(self, item: str, value: Union[str, bool]) -> None: - """设置或删除某项sessionStorage信息 \n - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' - return self.run_script(js, as_expr=True) - - def set_local_storage(self, item: str, value: Union[str, bool]) -> None: - """设置或删除某项localStorage信息 \n - :param item: 要设置的项 - :param value: 项的值,设置为False时,删除该项 - :return: None - """ - js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' - return self.run_script(js, as_expr=True) - - def clear_cache(self, - session_storage: bool = True, - local_storage: bool = True, - cache: bool = True, - cookies: bool = True) -> None: - """清除缓存,可选要清除的项 \n - :param session_storage: 是否清除sessionStorage - :param local_storage: 是否清除localStorage - :param cache: 是否清除cache - :param cookies: 是否清除cookies - :return: None - """ - if session_storage: - self.run_script('sessionStorage.clear();', as_expr=True) - if local_storage: - self.run_script('localStorage.clear();', as_expr=True) - if cache: - self._wait_driver.Network.clearBrowserCache() - if cookies: - self._wait_driver.Network.clearBrowserCookies() - - def _d_connect(self, - to_url: str, - times: int = 0, - interval: float = 1, - show_errmsg: bool = False, - timeout: float = None) -> Union[bool, None]: - """尝试连接,重试若干次 \n - :param to_url: 要访问的url - :param times: 重试次数 - :param interval: 重试间隔(秒) - :param show_errmsg: 是否抛出异常 - :param timeout: 连接超时时间 - :return: 是否成功,返回None表示不确定 - """ - err = None - timeout = timeout if timeout is not None else self.timeouts.page_load - - for _ in range(times + 1): - result = self._driver.Page.navigate(url=to_url) - is_timeout = not self._wait_loading(timeout) - - if is_timeout: - err = TimeoutError('页面连接超时。') - if 'errorText' in result: - err = ConnectionError(result['errorText']) - - if not err: - break - - if _ < times: - sleep(interval) - if show_errmsg: - print(f'重试 {to_url}') - - if err and show_errmsg: - raise err if err is not None else ConnectionError('连接异常。') - - return False if err else True - - -class ChromiumFrame(ChromiumBase): - """实现浏览器frame的类""" - - def __init__(self, page, - ele: ChromiumElement): - """初始化 \n - :param page: 浏览器地址:端口、Tab对象或DriverOptions对象 - :param ele: 页面上的frame元素 - """ - self.page = page - self._inner_ele = ele - frame_id = page.run_cdp('DOM.describeNode', nodeId=ele.node_id)['node'].get('frameId', None) - super().__init__(page.address, frame_id, page.timeout) - - def __repr__(self) -> str: - attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] - return f'' - - @property - def tag(self) -> str: - """返回元素tag""" - return self._inner_ele.tag - - @property - def html(self) -> str: - """返回元素outerHTML文本""" - tag = self.tag - out_html = self.page.run_cdp('DOM.getOuterHTML', nodeId=self._inner_ele.node_id)['outerHTML'] - in_html = super().html - sign = search(rf'<{tag}.*?>', out_html).group(0) - return f'{sign}{in_html}' - - @property - def inner_html(self) -> str: - """返回元素innerHTML文本""" - return super().html - - @property - def attrs(self) -> dict: - return self._inner_ele.attrs - - @property - def frame_size(self) -> dict: - """返回frame元素大小""" - return self._inner_ele.size - - def _set_options(self) -> None: - self.set_timeouts(page_load=self.page.timeouts.page_load, - script=self.page.timeouts.script, - implicit=self.page.timeouts.implicit if self.timeout is None else self.timeout) - self._page_load_strategy = self.page.page_load_strategy - - @property - def obj_id(self) -> str: - """返回js中的object id""" - return self._inner_ele.obj_id - - @property - def node_id(self) -> str: - """返回cdp中的node id""" - return self._inner_ele.node_id - - @property - def location(self) -> dict: - """返回frame元素左上角的绝对坐标""" - return self._inner_ele.location - - @property - def is_displayed(self) -> bool: - """返回frame元素是否显示""" - return self._inner_ele.is_displayed - - def attr(self, attr: str) -> Union[str, None]: - """返回frame元素attribute属性值 \n - :param attr: 属性名 - :return: 属性值文本,没有该属性返回None - """ - return self._inner_ele.attr(attr) - - def set_attr(self, attr: str, value: str) -> None: - """设置frame元素attribute属性 \n - :param attr: 属性名 - :param value: 属性值 - :return: None - """ - self._inner_ele.set_attr(attr, value) - - def remove_attr(self, attr: str) -> None: - """删除frame元素attribute属性 \n - :param attr: 属性名 - :return: None - """ - self._inner_ele.remove_attr(attr) - - def parent(self, level_or_loc: Union[tuple, str, int] = 1) -> Union['ChromiumElement', None]: - """返回上面某一级父元素,可指定层数或用查询语法定位 \n - :param level_or_loc: 第几级父元素,或定位符 - :return: 上级元素对象 - """ - return self._inner_ele.parent(level_or_loc) - - def prev(self, - index: int = 1, - filter_loc: Union[tuple, str] = '', - timeout: float = 0) -> Union['ChromiumElement', str, None]: - """返回前面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n - :param index: 前面第几个查询结果元素 - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 兄弟元素 - """ - return self._inner_ele.prev(index, filter_loc, timeout) - - def next(self, - index: int = 1, - filter_loc: Union[tuple, str] = '', - timeout: float = 0) -> Union['ChromiumElement', str, None]: - """返回后面的一个兄弟元素,可用查询语法筛选,可指定返回筛选结果的第几个 \n - :param index: 后面第几个查询结果元素 - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 兄弟元素 - """ - return self._inner_ele.next(index, filter_loc, timeout) - - def before(self, - index: int = 1, - filter_loc: Union[tuple, str] = '', - timeout: float = None) -> Union['ChromiumElement', str, None]: - """返回当前元素前面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n - :param index: 前面第几个查询结果元素 - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 本元素前面的某个元素或节点 - """ - return self._inner_ele.before(index, filter_loc, timeout) - - def after(self, - index: int = 1, - filter_loc: Union[tuple, str] = '', - timeout: float = None) -> Union['ChromiumElement', str, None]: - """返回当前元素后面的一个元素,可指定筛选条件和第几个。查找范围不限兄弟元素,而是整个DOM文档 \n - :param index: 后面第几个查询结果元素 - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 本元素后面的某个元素或节点 - """ - return self._inner_ele.after(index, filter_loc, timeout) - - def prevs(self, - filter_loc: Union[tuple, str] = '', - timeout: float = 0) -> List[Union['ChromiumElement', str]]: - """返回前面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 兄弟元素或节点文本组成的列表 - """ - return self._inner_ele.prevs(filter_loc, timeout) - - def nexts(self, - filter_loc: Union[tuple, str] = '', - timeout: float = 0) -> List[Union['ChromiumElement', str]]: - """返回后面全部兄弟元素或节点组成的列表,可用查询语法筛选 \n - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 兄弟元素或节点文本组成的列表 - """ - return self._inner_ele.nexts(filter_loc, timeout) - - def befores(self, - filter_loc: Union[tuple, str] = '', - timeout: float = None) -> List[Union['ChromiumElement', str]]: - """返回当前元素后面符合条件的全部兄弟元素或节点组成的列表,可用查询语法筛选。查找范围不限兄弟元素,而是整个DOM文档 \n - :param filter_loc: 用于筛选元素的查询语法 - :param timeout: 查找元素的超时时间 - :return: 本元素前面的元素或节点组成的列表 - """ - return self._inner_ele.befores(filter_loc, timeout) - - -class Timeout(object): - """用于保存d模式timeout信息的类""" - - def __init__(self, page): - self.page = page - self.page_load = 30 - self.script = 30 - - @property - def implicit(self): - return self.page.timeout - - def make_chromium_ele(ele: ChromiumElement, loc: Union[str, Tuple[str, str]], single: bool = True, @@ -1962,7 +1238,14 @@ def _make_chromium_ele(page, node_id: str = None, obj_id: str = None): """根据node id或object id生成相应元素对象""" ele = ChromiumElement(page, obj_id=obj_id, node_id=node_id) if ele.tag in ('iframe', 'frame') and ele.attr('src'): - ele = ChromiumFrame(page, ele) + src = ele.attr('src') + if src: + netloc1 = urlparse(src).netloc + netloc2 = urlparse(page.url).netloc + if netloc1 != netloc2: + from .chromium_base import ChromiumFrame + ele = ChromiumFrame(page, ele) + return ele diff --git a/DrissionPage/chromium_page.py b/DrissionPage/chromium_page.py index fbaa444..7012a90 100644 --- a/DrissionPage/chromium_page.py +++ b/DrissionPage/chromium_page.py @@ -7,7 +7,7 @@ from typing import Union, Tuple, List from requests import Session -from .chromium_element import Timeout, ChromiumBase +from .chromium_base import Timeout, ChromiumBase from .chromium_tab import ChromiumTab from .config import DriverOptions from .drission import connect_chrome diff --git a/DrissionPage/chromium_tab.py b/DrissionPage/chromium_tab.py index 5f03dd0..64462c4 100644 --- a/DrissionPage/chromium_tab.py +++ b/DrissionPage/chromium_tab.py @@ -1,5 +1,5 @@ # -*- coding:utf-8 -*- -from .chromium_element import ChromiumBase +from .chromium_base import ChromiumBase class ChromiumTab(ChromiumBase): diff --git a/DrissionPage/web_page.py b/DrissionPage/web_page.py index d22e6cb..acd0ca3 100644 --- a/DrissionPage/web_page.py +++ b/DrissionPage/web_page.py @@ -6,8 +6,9 @@ from DownloadKit import DownloadKit from requests import Session, Response from tldextract import extract +from .chromium_base import ChromiumBase, ChromiumFrame from .base import BasePage -from .chromium_element import ChromiumElement, ChromiumFrame, ChromiumBase +from .chromium_element import ChromiumElement # , ChromiumBase from .chromium_page import ChromiumPage from .config import DriverOptions, SessionOptions, _cookies_to_tuple from .session_element import SessionElement @@ -56,10 +57,10 @@ class WebPage(SessionPage, ChromiumPage, BasePage): :param timeout: 超时时间 :return: 子元素对象 """ - if self._mode == 's': - return super().__call__(loc_or_str) - elif self._mode == 'd': + if self._mode == 'd': return super(SessionPage, self).__call__(loc_or_str, timeout) + elif self._mode == 's': + return super().__call__(loc_or_str) # -----------------共有属性和方法------------------- @property diff --git a/setup.py b/setup.py index dd2e82d..f7df1a8 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ with open("README.md", "r", encoding='utf-8') as fh: setup( name="DrissionPage", - version="3.0.11", + version="3.0.12", author="g1879", author_email="g1879@qq.com", description="A module that integrates selenium and requests session, encapsulates common page operations.",