diff --git a/DrissionPage/base.py b/DrissionPage/base.py index 2f5e497..5fc4669 100644 --- a/DrissionPage/base.py +++ b/DrissionPage/base.py @@ -46,14 +46,9 @@ class BaseParser(object): class BaseElement(BaseParser): """各元素类的基类""" - def __init__(self, ele: Union[WebElement, HtmlElement], page=None): - self._inner_ele = ele + def __init__(self, page=None): self.page = page - @property - def inner_ele(self) -> Union[WebElement, HtmlElement]: - return self._inner_ele - # ----------------以下属性或方法由后代实现---------------- @property def tag(self): diff --git a/DrissionPage/chrome_element.py b/DrissionPage/chrome_element.py index 8157a6a..005b17b 100644 --- a/DrissionPage/chrome_element.py +++ b/DrissionPage/chrome_element.py @@ -1,13 +1,17 @@ # -*- coding:utf-8 -*- # 问题:跨iframe查找元素可能出现同名元素如何解决 # 须用DOM.documentUpdated检测元素有效性 +from typing import Union, Tuple, List + +from .base import DrissionElement +from .common import make_absolute_link, get_loc -class ChromeElement(object): +class ChromeElement(DrissionElement): def __init__(self, page, node_id: str = None, obj_id: str = None): - self.page = page + super().__init__(page) if not node_id and not obj_id: - raise TypeError('node_id或obj_id必须传入一个') + raise TypeError('node_id或obj_id必须传入一个。') if node_id: self._node_id = node_id @@ -17,18 +21,86 @@ class ChromeElement(object): self._obj_id = obj_id @property - def html(self): + def html(self) -> str: + """返回元素outerHTML文本""" return self.page.driver.DOM.getOuterHTML(nodeId=self._node_id)['outerHTML'] - def ele(self, xpath: str): - # todo: 引号记得转码 - js = f'''function(){{ - frame=this.contentDocument; - return document.evaluate("{xpath}", frame, null, 9, null).singleNodeValue; - }}''' - r = self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, - objectId=self._obj_id)['result'].get('objectId', None) - return r if not r else _ele(self.page, obj_id=r) + @property + def inner_html(self) -> str: + """返回元素innerHTML文本""" + return self.page.driver.Runtime.callFunctionOn('function(){this.innerHTML;}') + + @property + def attrs(self) -> dict: + attrs = self.page.driver.DOM.getAttributes(nodeId=self._node_id)['attributes'] + attrs_len = len(attrs) + return {attrs[i]: attrs[i + 1] for i in range(0, attrs_len, 2)} + + def ele(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> Union['ChromeElement', str, None]: + """返回当前元素下级符合条件的第一个元素、属性或节点文本 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 + :return: DriverElement对象或属性、文本 + """ + return self._ele(loc_or_str, timeout) + + def eles(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None) -> List[Union['ChromeElement', str]]: + """返回当前元素下级所有符合条件的子元素、属性或节点文本 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 查找元素超时时间,默认与元素所在页面等待时间一致 + :return: DriverElement对象或属性、文本组成的列表 + """ + return self._ele(loc_or_str, timeout=timeout, single=False) + + def _ele(self, + loc_or_str: Union[Tuple[str, str], str], + timeout: float = None, + single: bool = True) -> Union['ChromeElement', str, None, List[Union['ChromeElement', str]]]: + """返回当前元素下级符合条件的子元素、属性或节点文本,默认返回第一个 \n + :param loc_or_str: 元素的定位信息,可以是loc元组,或查询字符串 + :param timeout: 查找元素超时时间 + :param single: True则返回第一个,False则返回全部 + :return: DriverElement对象 + """ + return make_chrome_ele(self, loc_or_str, single, timeout) + + def attr(self, attr: str) -> Union[str, None]: + """返回attribute属性值 \n + :param attr: 属性名 + :return: 属性值文本,没有该属性返回None + """ + # 获取href属性时返回绝对url + attrs = self.attrs + if attr == 'href': + link = attrs['href'] + # 若为链接为None、js或邮件,直接返回 + if not link or link.lower().startswith(('javascript:', 'mailto:')): + return link + + else: # 其它情况直接返回绝对url + return make_absolute_link(link, self.page) + + elif attr == 'src': + return make_absolute_link(attrs['src'], self.page) + + elif attr == 'text': + return self.text + + elif attr == 'innerText': + return self.raw_text + + elif attr in ('html', 'outerHTML'): + return self.html + + elif attr == 'innerHTML': + return self.inner_html + + else: + return attrs[attr] def click(self, by_js: bool = True): if by_js: @@ -41,6 +113,237 @@ class ChromeElement(object): def _get_node_id(self, obj_id): return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId'] + @property + def tag(self) -> str: + return self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']['localName'] -def _ele(page, node_id=None, obj_id=None) -> ChromeElement: - return ChromeElement(page=page, node_id=node_id, obj_id=obj_id) + @property + def is_valid(self): + return True + + @property + def text(self): + return + + @property + def raw_text(self): + return + + def _get_ele_path(self, mode): + return '' + + +def make_chrome_ele(ele: ChromeElement, + loc: Union[str, Tuple[str, str]], + single: bool = True, + timeout: float = None) -> Union[ChromeElement, str, None, List[Union[ChromeElement, str]]]: + """在chrome元素中查找 \n + :param ele: ChromeElement对象 + :param loc: 元素定位元组 + :param single: True则返回第一个,False则返回全部 + :param timeout: 查找元素超时时间 + :return: 返回DriverElement元素或它们组成的列表 + """ + # ---------------处理定位符--------------- + if isinstance(loc, (str, tuple)): + loc = get_loc(loc) + else: + raise ValueError("定位符必须为str或长度为2的tuple对象。") + + loc_str = loc[1] + if loc[0] == 'xpath' and loc[1].lstrip().startswith('/'): + loc_str = f'.{loc_str}' + elif loc[0] == 'css selector' and loc[1].lstrip().startswith('>'): + loc_str = f'{ele.css_path}{loc[1]}' + loc = loc[0], loc_str + + timeout = timeout if timeout is not None else ele.page.timeout + + # ---------------执行查找----------------- + if loc[0] == 'xpath': + type_txt = '9' if single else '7' + node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') else 'this' + js = _make_js(loc[1], type_txt, node_txt) + print(js) + r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele._obj_id,) + # print(r) + if r['result']['type'] == 'string': + return r['result']['value'] + if r['result']['subtype'] == 'null': + return None if single else [] + if r['result']['className'] == 'TypeError': + if 'The result is not a node set' in r['result']['description']: + js = _make_js(loc[1], '1', node_txt) + r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=ele._obj_id) + return r['result']['value'] + + else: + raise RuntimeError(r['result']['description']) + + elif 'objectId' in r['result']: + if not single: + r = ele.page.driver.Runtime.getProperties(objectId=r['result']['objectId'])['result'] + result = [] + for i in r: + if not i['enumerable']: + break + result.append(ChromeElement(ele.page, obj_id=i['value']['objectId'])) + r = result + + return r + + # try: + # # 使用xpath查找 + # if loc[0] == 'xpath': + # js = _make_js() + # r = ele.page.driver.Runtime.callFunctionOn(functionDeclaration=js, + # objectId=self._obj_id)['result'].get('objectId', None) + # return r if not r else _ele(self.page, obj_id=r) + # + # return wait.until(ElementsByXpath(page, loc[1], single, timeout)) + # + # # 使用css selector查找 + # else: + # if single: + # return DriverElement(wait.until(ec.presence_of_element_located(loc)), page) + # else: + # eles = wait.until(ec.presence_of_all_elements_located(loc)) + # return [DriverElement(ele, page) for ele in eles] + # + # except TimeoutException: + # return [] if not single else None + # + # except InvalidElementStateException: + # raise ValueError(f'无效的查找语句:{loc}') + + +def _make_js(xpath: str, type_txt: str, node_txt: str): + for_txt = '' + + # 获取第一个元素、节点或属性 + if type_txt == '9': + return_txt = ''' +if(e.singleNodeValue==null){return null;} +else if(e.singleNodeValue.constructor.name=="Text"){return e.singleNodeValue.data;} +else if(e.singleNodeValue.constructor.name=="Attr"){return e.singleNodeValue.nodeValue;} +else if(e.singleNodeValue.constructor.name=="Comment"){return e.singleNodeValue.nodeValue;} +else{return e.singleNodeValue;}''' + + # 按顺序获取所有元素、节点或属性 + elif type_txt == '7': + for_txt = """ +var a=new Array(); +for(var i = 0; i Union[str, DriverElement, None, List[str or DriverElement]]: +# +# def get_nodes(node=None, xpath_txt=None, type_txt='7'): +# """用js通过xpath获取元素、节点或属性 +# :param node: 'document' 或 元素对象 +# :param xpath_txt: xpath语句 +# :param type_txt: resultType,参考 https://developer.mozilla.org/zh-CN/docs/Web/API/Document/evaluate +# :return: 元素对象或属性、文本字符串 +# """ +# node_txt = 'document' if not node or node == 'document' else 'arguments[0]' +# for_txt = '' +# +# # 获取第一个元素、节点或属性 +# if type_txt == '9': +# return_txt = ''' +# if(e.singleNodeValue.constructor.name=="Text"){return e.singleNodeValue.data;} +# else if(e.singleNodeValue.constructor.name=="Attr"){return e.singleNodeValue.nodeValue;} +# else if(e.singleNodeValue.constructor.name=="Comment"){return e.singleNodeValue.nodeValue;} +# else{return e.singleNodeValue;} +# ''' +# +# # 按顺序获取所有元素、节点或属性 +# elif type_txt == '7': +# for_txt = """ +# var a=new Array(); +# for(var i = 0; i None: + """为当前tab设置user agent,只在当前tab有效 \n + :param ua: user agent字符串 + :return: None + """ + self.driver.Network.setUserAgentOverride(userAgent=ua) + + def get_session_storage(self, item: str = None) -> Union[str, dict, None]: + """获取sessionStorage信息,不设置item则获取全部 \n + :param item: 要获取的项,不设置则返回全部 + :return: sessionStorage一个或所有项内容 + """ + js = f'sessionStorage.getItem("{item}");' if item else 'sessionStorage;' + return self.driver.Runtime.evaluate(js) + + def get_local_storage(self, item: str = None) -> Union[str, dict, None]: + """获取localStorage信息,不设置item则获取全部 \n + :param item: 要获取的项目,不设置则返回全部 + :return: localStorage一个或所有项内容 + """ + js = f'localStorage.getItem("{item}");' if item else 'localStorage;' + return self.driver.Runtime.evaluate(js) + + def set_session_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项sessionStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + s = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' + return self.driver.Runtime.evaluate(s) + + def set_local_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项localStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + s = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' + return self.driver.Runtime.evaluate(s) + def create_tab(self, url: str = None) -> None: """新建并定位到一个标签页,该标签页在最后面 \n :param url: 新标签页跳转到的网址 @@ -194,6 +246,10 @@ class ChromePage(BasePage): if activate: requests_get(f'http://{self.debugger_address}/json/activate/{tab}') + def to_front(self) -> None: + """激活当前标签页使其处于最前面""" + requests_get(f'http://{self.debugger_address}/json/activate/{self.current_tab_handle}') + def close_tabs(self, num_or_handles: Union[int, str, list, tuple, set] = None, others: bool = False) -> None: """关闭传入的标签页,默认关闭当前页。可传入多个 \n 注意:当程序使用的是接管的浏览器,获取到的 handle 顺序和视觉效果不一致,不能按序号关闭。 \n diff --git a/DrissionPage/common.py b/DrissionPage/common.py index 16bc2e5..815305f 100644 --- a/DrissionPage/common.py +++ b/DrissionPage/common.py @@ -10,6 +10,7 @@ from re import split, search, sub from shutil import rmtree from typing import Union from zipfile import ZipFile +from urllib.parse import urlparse, urljoin, urlunparse def get_ele_txt(e) -> str: @@ -451,3 +452,28 @@ def get_long(txt) -> int: """ txt_len = len(txt) return int((len(txt.encode('utf-8')) - txt_len) / 2 + txt_len) + + +def make_absolute_link(link, page=None) -> str: + """获取绝对url + :param link: 超链接 + :param page: 页面对象 + :return: 绝对链接 + """ + if not link: + return link + + parsed = urlparse(link)._asdict() + + # 是相对路径,与页面url拼接并返回 + if not parsed['netloc']: + return urljoin(page.url, link) if page else link + + # 是绝对路径但缺少协议,从页面url获取协议并修复 + if not parsed['scheme'] and page: + parsed['scheme'] = urlparse(page.url).scheme + parsed = tuple(v for v in parsed.values()) + return urlunparse(parsed) + + # 绝对路径且不缺协议,直接返回 + return link diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index 74c7aba..88f6f35 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -1,567 +1,570 @@ -# -*- encoding: utf-8 -*- -""" -@Author : g1879 -@Contact : g1879@qq.com -@File : drission.py -""" -from subprocess import Popen -from sys import exit -from typing import Union - -from platform import system -from requests import Session, get as requests_get -from requests.cookies import RequestsCookieJar -from requests.structures import CaseInsensitiveDict -from selenium import webdriver -from selenium.common.exceptions import SessionNotCreatedException, WebDriverException -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.chrome.webdriver import WebDriver -from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver -from tldextract import extract - -from .common import get_pid_from_port, get_exe_path_from_port -from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple - - -class Drission(object): - """Drission类用于管理WebDriver对象和Session对象,是驱动器的角色""" - - def __init__(self, - driver_or_options: Union[RemoteWebDriver, Options, DriverOptions, bool] = None, - session_or_options: Union[Session, dict, SessionOptions, bool] = None, - ini_path: str = None, - proxy: dict = None): - """初始化,可接收现成的WebDriver和Session对象,或接收它们的配置信息生成对象 \n - :param driver_or_options: driver对象或DriverOptions、Options类,传入False则创建空配置对象 - :param session_or_options: Session对象或设置字典,传入False则创建空配置对象 - :param ini_path: ini文件路径 - :param proxy: 代理设置 - """ - self._session = None - self._driver = None - self._session_options = None - self._driver_options = None - self._debugger = None - self._proxy = proxy - - # ------------------处理session options---------------------- - if session_or_options is None: - self._session_options = SessionOptions(ini_path=ini_path).as_dict() - - elif session_or_options is False: - self._session_options = SessionOptions(read_file=False).as_dict() - - elif isinstance(session_or_options, Session): - self._session = session_or_options - - elif isinstance(session_or_options, SessionOptions): - self._session_options = session_or_options.as_dict() - - elif isinstance(session_or_options, dict): - self._session_options = session_or_options - - else: - raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。') - - # ------------------处理driver options---------------------- - if driver_or_options is None: - self._driver_options = DriverOptions(ini_path=ini_path) - - elif driver_or_options is False: - self._driver_options = DriverOptions(read_file=False) - - elif isinstance(driver_or_options, RemoteWebDriver): - self._driver = driver_or_options - - elif isinstance(driver_or_options, (Options, DriverOptions)): - self._driver_options = driver_or_options - - else: - raise TypeError('driver_or_options参数只能接收WebDriver, Options, DriverOptions或False。') - - def __del__(self): - """关闭对象时关闭浏览器和Session""" - try: - self.close() - except ImportError: - pass - - @property - def session(self) -> Session: - """返回Session对象,如未初始化则按配置信息创建""" - if self._session is None: - self._set_session(self._session_options) - - if self._proxy: - self._session.proxies = self._proxy - - return self._session - - @property - def driver(self) -> WebDriver: - """返回WebDriver对象,如未初始化则按配置信息创建。 \n - 如设置了本地调试浏览器,可自动接入或打开浏览器进程。 - """ - if self._driver is None: - if not self.driver_options.debugger_address and self._proxy: - self.driver_options.add_argument(f'--proxy-server={self._proxy["http"]}') - - driver_path = self.driver_options.driver_path or 'chromedriver' - chrome_path = self.driver_options.binary_location or 'chrome.exe' - - # -----------若指定debug端口且该端口未在使用中,则先启动浏览器进程----------- - if self.driver_options.debugger_address: - # 启动浏览器进程,同时返回该进程使用的 chrome.exe 路径 - chrome_path, self._debugger = connect_chrome(chrome_path, self.driver_options.debugger_address, - self.driver_options.arguments, self._proxy) - - # -----------创建WebDriver对象----------- - self._driver = _create_driver(chrome_path, driver_path, self.driver_options) - - # -----------解决接管新版浏览器不能定位到正确的标签页的问题----------- - active_tab = self._driver.window_handles[0] - if active_tab != self._driver.current_window_handle: - self._driver.switch_to.window(active_tab) - - # 反反爬设置 - try: - self._driver.execute_script('Object.defineProperty(navigator,"webdriver",{get:() => undefined,});') - except Exception: - pass - - return self._driver - - @property - def driver_options(self) -> Union[DriverOptions, Options]: - """返回driver配置信息""" - return self._driver_options - - @property - def session_options(self) -> dict: - """返回session配置信息""" - return self._session_options - - @session_options.setter - def session_options(self, options: Union[dict, SessionOptions]) -> None: - """设置session配置 \n - :param options: session配置字典 - :return: None - """ - self._session_options = _session_options_to_dict(options) - self._set_session(self._session_options) - - @property - def proxy(self) -> Union[None, dict]: - """返回代理信息""" - return self._proxy - - @proxy.setter - def proxy(self, proxies: dict = None) -> None: - """设置代理信息 \n - :param proxies: 代理信息字典 - :return: None - """ - self._proxy = proxies - - if self._session: - self._session.proxies = proxies - - if self._driver: - cookies = self._driver.get_cookies() - url = self._driver.current_url - self._driver.quit() - self._driver = None - self._driver = self.driver - self._driver.get(url) - - for cookie in cookies: - self.set_cookies(cookie, set_driver=True) - - @property - def debugger_progress(self): - """调试浏览器进程""" - return self._debugger - - def kill_browser(self) -> None: - """关闭浏览器进程(如果可以)""" - pid = self.get_browser_progress_id() - if not _kill_progress(pid): - self._driver.quit() - - def get_browser_progress_id(self) -> Union[str, None]: - """获取浏览器进程id""" - if self.debugger_progress: - return self.debugger_progress.pid - - address = str(self.driver_options.debugger_address).split(':') - if len(address) == 2: - ip, port = address - if ip not in ('127.0.0.1', 'localhost') or not port.isdigit(): - return None - - from os import popen - txt = '' - progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n') - for progress in progresses: - if 'LISTENING' in progress: - txt = progress - break - if not txt: - return None - - return txt.split(' ')[-1] - - def hide_browser(self) -> None: - """隐藏浏览器界面""" - self._show_or_hide_browser() - - def show_browser(self) -> None: - """显示浏览器界面""" - self._show_or_hide_browser(False) - - def _show_or_hide_browser(self, hide: bool = True) -> None: - if system().lower() != 'windows': - raise OSError('该方法只能在Windows系统使用。') - - try: - from win32gui import ShowWindow - from win32con import SW_HIDE, SW_SHOW - except ImportError: - raise ImportError('请先安装:pip install pypiwin32') - - pid = self.get_browser_progress_id() - if not pid: - print('只有设置了debugger_address参数才能使用 show_browser() 和 hide_browser()') - return - hds = _get_chrome_hwnds_from_pid(pid) - sw = SW_HIDE if hide else SW_SHOW - for hd in hds: - ShowWindow(hd, sw) - - def set_cookies(self, - cookies: Union[RequestsCookieJar, list, tuple, str, dict], - set_session: bool = False, - set_driver: bool = False) -> None: - """设置cookies \n - :param cookies: cookies信息,可为CookieJar, list, tuple, str, dict - :param set_session: 是否设置session的cookies - :param set_driver: 是否设置driver的cookies - :return: None - """ - cookies = _cookies_to_tuple(cookies) - - for cookie in cookies: - if cookie['value'] is None: - cookie['value'] = '' - - # 添加cookie到session - if set_session: - kwargs = {x: cookie[x] for x in cookie - if x.lower() not in ('name', 'value', 'httponly', 'expiry', 'samesite')} - - if 'expiry' in cookie: - kwargs['expires'] = cookie['expiry'] - - self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) - - # 添加cookie到driver - if set_driver: - if 'expiry' in cookie: - cookie['expiry'] = int(cookie['expiry']) - - try: - browser_domain = extract(self.driver.current_url).fqdn - except AttributeError: - browser_domain = '' - - if not cookie.get('domain', None): - if browser_domain: - url = extract(browser_domain) - cookie_domain = f'{url.domain}.{url.suffix}' - else: - raise ValueError('cookie中没有域名或浏览器未访问过URL。') - - cookie['domain'] = cookie_domain - - else: - cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:] - - if cookie_domain not in browser_domain: - self.driver.get(cookie_domain if cookie_domain.startswith('http://') - else f'http://{cookie_domain}') - - # 避免selenium自动添加.后无法正确覆盖已有cookie - if cookie['domain'][0] != '.': - c = self.driver.get_cookie(cookie['name']) - if c and c['domain'] == cookie['domain']: - self.driver.delete_cookie(cookie['name']) - - self.driver.add_cookie(cookie) - - def _set_session(self, data: dict) -> None: - """根据传入字典对session进行设置 \n - :param data: session配置字典 - :return: None - """ - if self._session is None: - self._session = Session() - - if 'headers' in data: - self._session.headers = CaseInsensitiveDict(data['headers']) - if 'cookies' in data: - self.set_cookies(data['cookies'], set_session=True) - - attrs = ['auth', 'proxies', 'hooks', 'params', 'verify', - 'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters' - for i in attrs: - if i in data: - self._session.__setattr__(i, data[i]) - - def cookies_to_session(self, copy_user_agent: bool = False) -> None: - """把driver对象的cookies复制到session对象 \n - :param copy_user_agent: 是否复制ua信息 - :return: None - """ - if copy_user_agent: - user_agent_to_session(self.driver, self.session) - - self.set_cookies(self.driver.get_cookies(), set_session=True) - - def cookies_to_driver(self, url: str) -> None: - """把session对象的cookies复制到driver对象 \n - :param url: 作用域 - :return: None - """ - browser_domain = extract(self.driver.current_url).fqdn - ex_url = extract(url) - - if ex_url.fqdn not in browser_domain: - self.driver.get(url) - - domain = f'{ex_url.domain}.{ex_url.suffix}' - - cookies = [] - for cookie in self.session.cookies: - if cookie.domain == '': - cookie.domain = domain - - if domain in cookie.domain: - cookies.append(cookie) - - self.set_cookies(cookies, set_driver=True) - - def close_driver(self, kill: bool = False) -> None: - """关闭driver和浏览器""" - if self._driver: - _kill_progress(port=self._driver.service.port) # 关闭chromedriver.exe进程 - - if kill: - self.kill_browser() - else: - self._driver.quit() - - self._driver = None - - def close_session(self) -> None: - """关闭session""" - if self._session: - self._session.close() - self._session = None - - def close(self) -> None: - """关闭session、driver和浏览器""" - if self._driver: - self.close_driver() - - if self._session: - self.close_session() - - -def user_agent_to_session(driver: RemoteWebDriver, session: Session) -> None: - """把driver的user-agent复制到session \n - :param driver: 来源driver对象 - :param session: 目标session对象 - :return: None - """ - driver = driver - session = session - selenium_user_agent = driver.execute_script("return navigator.userAgent;") - session.headers.update({"User-Agent": selenium_user_agent}) - - -def _port_is_using(ip: str, port: str) -> Union[bool, None]: - """检查端口是否被占用 \n - :param ip: 浏览器地址 - :param port: 浏览器端口 - :return: bool - """ - import socket - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - try: - s.connect((ip, int(port))) - s.shutdown(2) - return True - except socket.error: - return False - finally: - if s: - s.close() - - -def connect_chrome(chrome_path: str, debugger_address: str, args: list = None, proxy: dict = None) -> tuple: - """连接或启动chrome \n - :param chrome_path: chrome.exe 路径 - :param debugger_address: 进程运行的ip和端口号 - :param args: chrome 配置参数 - :param proxy: 代理配置 - :return: chrome 路径和进程对象组成的元组 - """ - debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address - ip, port = debugger_address.split(':') - if ip not in ('127.0.0.1', 'localhost'): - return None, None - - if _port_is_using(ip, port): - chrome_path = get_exe_path_from_port(port) if chrome_path == 'chrome.exe' else chrome_path - return chrome_path, None - - # ----------为路径加上双引号,避免路径中的空格产生异常---------- - args = [] if args is None else args - args1 = [] - for arg in args: - if arg.startswith(('--user-data-dir', '--disk-cache-dir')): - index = arg.find('=') + 1 - args1.append(f'{arg[:index]}"{arg[index:].strip()}"') - elif arg.startswith('--user-agent='): - args1.append(f'--user-agent="{arg[13:]}"') - else: - args1.append(arg) - - args = set(args1) - - if proxy: - args.add(f'--proxy-server={proxy["http"]}') - - # ----------创建浏览器进程---------- - try: - debugger = _run_browser(port, chrome_path, args) - if chrome_path == 'chrome.exe': - chrome_path = get_exe_path_from_port(port) - - # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 - except FileNotFoundError: - from DrissionPage.easy_set import _get_chrome_path - chrome_path = _get_chrome_path(show_msg=False) - - if not chrome_path: - raise FileNotFoundError('无法找到chrome.exe路径,请手动配置。') - - debugger = _run_browser(port, chrome_path, args) - - return chrome_path, debugger - - -def _run_browser(port, path: str, args: set) -> Popen: - """创建chrome进程 \n - :param port: 端口号 - :param path: 浏览器地址 - :param args: 启动参数 - :return: 进程对象 - """ - sys = system().lower() - if sys == 'windows': - args = ' '.join(args) - debugger = Popen(f'"{path}" --remote-debugging-port={port} {args}', shell=False) - elif sys == 'linux': - arguments = [path, f'--remote-debugging-port={port}'] + list(args) - debugger = Popen(arguments, shell=False) - else: - raise OSError('只支持Windows和Linux系统。') - - while True: - try: - requests_get(f'http://127.0.0.1:{port}/json') - break - except ConnectionError: - pass - - return debugger - - -def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver: - """创建 WebDriver 对象 \n - :param chrome_path: chrome.exe 路径 - :param driver_path: chromedriver.exe 路径 - :param options: Options 对象 - :return: WebDriver 对象 - """ - try: - debugger_address = options.debugger_address - if options.debugger_address: - options = Options() - options.debugger_address = debugger_address - - return webdriver.Chrome(driver_path, options=options) - - # 若版本不对,获取对应 chromedriver 再试 - except (WebDriverException, SessionNotCreatedException): - print('打开失败,尝试获取driver。\n') - from .easy_set import get_match_driver - from DrissionPage.easy_set import _get_chrome_path - - if chrome_path == 'chrome.exe': - chrome_path = _get_chrome_path(show_msg=False, from_ini=False) - - if chrome_path: - driver_path = get_match_driver(chrome_path=chrome_path, check_version=False, show_msg=True) - if driver_path: - try: - options.binary_location = chrome_path - return webdriver.Chrome(driver_path, options=options) - except Exception: - pass - - print('无法启动,请检查浏览器路径,或手动设置chromedriver。\n下载地址:http://npm.taobao.org/mirrors/chromedriver/') - exit(0) - - -def _get_chrome_hwnds_from_pid(pid) -> list: - """通过PID查询句柄ID""" - try: - from win32gui import IsWindow, GetWindowText, EnumWindows - from win32process import GetWindowThreadProcessId - except ImportError: - raise ImportError('请先安装win32gui,pip install pypiwin32') - - def callback(hwnd, hds): - if IsWindow(hwnd) and '- Google Chrome' in GetWindowText(hwnd): - _, found_pid = GetWindowThreadProcessId(hwnd) - if str(found_pid) == str(pid): - hds.append(hwnd) - return True - - hwnds = [] - EnumWindows(callback, hwnds) - return hwnds - - -def _kill_progress(pid: str = None, port: int = None) -> bool: - """关闭浏览器进程 \n - :param pid: 进程id - :param port: 端口号,如没有进程id,从端口号获取 - :return: 是否成功 - """ - from os import popen - if system().lower() != 'windows': - return False - - pid = pid or get_pid_from_port(port) - if not pid: - return False - - if popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'): - popen(f'taskkill /pid {pid} /F') - return True - else: - return False +# -*- encoding: utf-8 -*- +""" +@Author : g1879 +@Contact : g1879@qq.com +@File : drission.py +""" +from subprocess import Popen +from sys import exit +from typing import Union + +from platform import system +from requests import Session, get as requests_get +from requests.cookies import RequestsCookieJar +from requests.structures import CaseInsensitiveDict +from requests.exceptions import ConnectionError as requests_connection_err +from selenium import webdriver +from selenium.common.exceptions import SessionNotCreatedException, WebDriverException +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver +from time import perf_counter +from tldextract import extract + +from .common import get_pid_from_port, get_exe_path_from_port +from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple + + +class Drission(object): + """Drission类用于管理WebDriver对象和Session对象,是驱动器的角色""" + + def __init__(self, + driver_or_options: Union[RemoteWebDriver, Options, DriverOptions, bool] = None, + session_or_options: Union[Session, dict, SessionOptions, bool] = None, + ini_path: str = None, + proxy: dict = None): + """初始化,可接收现成的WebDriver和Session对象,或接收它们的配置信息生成对象 \n + :param driver_or_options: driver对象或DriverOptions、Options类,传入False则创建空配置对象 + :param session_or_options: Session对象或设置字典,传入False则创建空配置对象 + :param ini_path: ini文件路径 + :param proxy: 代理设置 + """ + self._session = None + self._driver = None + self._session_options = None + self._driver_options = None + self._debugger = None + self._proxy = proxy + + # ------------------处理session options---------------------- + if session_or_options is None: + self._session_options = SessionOptions(ini_path=ini_path).as_dict() + + elif session_or_options is False: + self._session_options = SessionOptions(read_file=False).as_dict() + + elif isinstance(session_or_options, Session): + self._session = session_or_options + + elif isinstance(session_or_options, SessionOptions): + self._session_options = session_or_options.as_dict() + + elif isinstance(session_or_options, dict): + self._session_options = session_or_options + + else: + raise TypeError('session_or_options参数只能接收Session, dict, SessionOptions或False。') + + # ------------------处理driver options---------------------- + if driver_or_options is None: + self._driver_options = DriverOptions(ini_path=ini_path) + + elif driver_or_options is False: + self._driver_options = DriverOptions(read_file=False) + + elif isinstance(driver_or_options, RemoteWebDriver): + self._driver = driver_or_options + + elif isinstance(driver_or_options, (Options, DriverOptions)): + self._driver_options = driver_or_options + + else: + raise TypeError('driver_or_options参数只能接收WebDriver, Options, DriverOptions或False。') + + def __del__(self): + """关闭对象时关闭浏览器和Session""" + try: + self.close() + except ImportError: + pass + + @property + def session(self) -> Session: + """返回Session对象,如未初始化则按配置信息创建""" + if self._session is None: + self._set_session(self._session_options) + + if self._proxy: + self._session.proxies = self._proxy + + return self._session + + @property + def driver(self) -> WebDriver: + """返回WebDriver对象,如未初始化则按配置信息创建。 \n + 如设置了本地调试浏览器,可自动接入或打开浏览器进程。 + """ + if self._driver is None: + if not self.driver_options.debugger_address and self._proxy: + self.driver_options.add_argument(f'--proxy-server={self._proxy["http"]}') + + driver_path = self.driver_options.driver_path or 'chromedriver' + chrome_path = self.driver_options.binary_location or 'chrome.exe' + + # -----------若指定debug端口且该端口未在使用中,则先启动浏览器进程----------- + if self.driver_options.debugger_address: + # 启动浏览器进程,同时返回该进程使用的 chrome.exe 路径 + chrome_path, self._debugger = connect_chrome(chrome_path, self.driver_options.debugger_address, + self.driver_options.arguments, self._proxy) + + # -----------创建WebDriver对象----------- + self._driver = _create_driver(chrome_path, driver_path, self.driver_options) + + # -----------解决接管新版浏览器不能定位到正确的标签页的问题----------- + active_tab = self._driver.window_handles[0] + if active_tab != self._driver.current_window_handle: + self._driver.switch_to.window(active_tab) + + # 反反爬设置 + try: + self._driver.execute_script('Object.defineProperty(navigator,"webdriver",{get:() => undefined,});') + except Exception: + pass + + return self._driver + + @property + def driver_options(self) -> Union[DriverOptions, Options]: + """返回driver配置信息""" + return self._driver_options + + @property + def session_options(self) -> dict: + """返回session配置信息""" + return self._session_options + + @session_options.setter + def session_options(self, options: Union[dict, SessionOptions]) -> None: + """设置session配置 \n + :param options: session配置字典 + :return: None + """ + self._session_options = _session_options_to_dict(options) + self._set_session(self._session_options) + + @property + def proxy(self) -> Union[None, dict]: + """返回代理信息""" + return self._proxy + + @proxy.setter + def proxy(self, proxies: dict = None) -> None: + """设置代理信息 \n + :param proxies: 代理信息字典 + :return: None + """ + self._proxy = proxies + + if self._session: + self._session.proxies = proxies + + if self._driver: + cookies = self._driver.get_cookies() + url = self._driver.current_url + self._driver.quit() + self._driver = None + self._driver = self.driver + self._driver.get(url) + + for cookie in cookies: + self.set_cookies(cookie, set_driver=True) + + @property + def debugger_progress(self): + """调试浏览器进程""" + return self._debugger + + def kill_browser(self) -> None: + """关闭浏览器进程(如果可以)""" + pid = self.get_browser_progress_id() + if not _kill_progress(pid): + self._driver.quit() + + def get_browser_progress_id(self) -> Union[str, None]: + """获取浏览器进程id""" + if self.debugger_progress: + return self.debugger_progress.pid + + address = str(self.driver_options.debugger_address).split(':') + if len(address) == 2: + ip, port = address + if ip not in ('127.0.0.1', 'localhost') or not port.isdigit(): + return None + + from os import popen + txt = '' + progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n') + for progress in progresses: + if 'LISTENING' in progress: + txt = progress + break + if not txt: + return None + + return txt.split(' ')[-1] + + def hide_browser(self) -> None: + """隐藏浏览器界面""" + self._show_or_hide_browser() + + def show_browser(self) -> None: + """显示浏览器界面""" + self._show_or_hide_browser(False) + + def _show_or_hide_browser(self, hide: bool = True) -> None: + if system().lower() != 'windows': + raise OSError('该方法只能在Windows系统使用。') + + try: + from win32gui import ShowWindow + from win32con import SW_HIDE, SW_SHOW + except ImportError: + raise ImportError('请先安装:pip install pypiwin32') + + pid = self.get_browser_progress_id() + if not pid: + print('只有设置了debugger_address参数才能使用 show_browser() 和 hide_browser()') + return + hds = _get_chrome_hwnds_from_pid(pid) + sw = SW_HIDE if hide else SW_SHOW + for hd in hds: + ShowWindow(hd, sw) + + def set_cookies(self, + cookies: Union[RequestsCookieJar, list, tuple, str, dict], + set_session: bool = False, + set_driver: bool = False) -> None: + """设置cookies \n + :param cookies: cookies信息,可为CookieJar, list, tuple, str, dict + :param set_session: 是否设置session的cookies + :param set_driver: 是否设置driver的cookies + :return: None + """ + cookies = _cookies_to_tuple(cookies) + + for cookie in cookies: + if cookie['value'] is None: + cookie['value'] = '' + + # 添加cookie到session + if set_session: + kwargs = {x: cookie[x] for x in cookie + if x.lower() not in ('name', 'value', 'httponly', 'expiry', 'samesite')} + + if 'expiry' in cookie: + kwargs['expires'] = cookie['expiry'] + + self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) + + # 添加cookie到driver + if set_driver: + if 'expiry' in cookie: + cookie['expiry'] = int(cookie['expiry']) + + try: + browser_domain = extract(self.driver.current_url).fqdn + except AttributeError: + browser_domain = '' + + if not cookie.get('domain', None): + if browser_domain: + url = extract(browser_domain) + cookie_domain = f'{url.domain}.{url.suffix}' + else: + raise ValueError('cookie中没有域名或浏览器未访问过URL。') + + cookie['domain'] = cookie_domain + + else: + cookie_domain = cookie['domain'] if cookie['domain'][0] != '.' else cookie['domain'][1:] + + if cookie_domain not in browser_domain: + self.driver.get(cookie_domain if cookie_domain.startswith('http://') + else f'http://{cookie_domain}') + + # 避免selenium自动添加.后无法正确覆盖已有cookie + if cookie['domain'][0] != '.': + c = self.driver.get_cookie(cookie['name']) + if c and c['domain'] == cookie['domain']: + self.driver.delete_cookie(cookie['name']) + + self.driver.add_cookie(cookie) + + def _set_session(self, data: dict) -> None: + """根据传入字典对session进行设置 \n + :param data: session配置字典 + :return: None + """ + if self._session is None: + self._session = Session() + + if 'headers' in data: + self._session.headers = CaseInsensitiveDict(data['headers']) + if 'cookies' in data: + self.set_cookies(data['cookies'], set_session=True) + + attrs = ['auth', 'proxies', 'hooks', 'params', 'verify', + 'cert', 'stream', 'trust_env', 'max_redirects'] # , 'adapters' + for i in attrs: + if i in data: + self._session.__setattr__(i, data[i]) + + def cookies_to_session(self, copy_user_agent: bool = False) -> None: + """把driver对象的cookies复制到session对象 \n + :param copy_user_agent: 是否复制ua信息 + :return: None + """ + if copy_user_agent: + user_agent_to_session(self.driver, self.session) + + self.set_cookies(self.driver.get_cookies(), set_session=True) + + def cookies_to_driver(self, url: str) -> None: + """把session对象的cookies复制到driver对象 \n + :param url: 作用域 + :return: None + """ + browser_domain = extract(self.driver.current_url).fqdn + ex_url = extract(url) + + if ex_url.fqdn not in browser_domain: + self.driver.get(url) + + domain = f'{ex_url.domain}.{ex_url.suffix}' + + cookies = [] + for cookie in self.session.cookies: + if cookie.domain == '': + cookie.domain = domain + + if domain in cookie.domain: + cookies.append(cookie) + + self.set_cookies(cookies, set_driver=True) + + def close_driver(self, kill: bool = False) -> None: + """关闭driver和浏览器""" + if self._driver: + _kill_progress(port=self._driver.service.port) # 关闭chromedriver.exe进程 + + if kill: + self.kill_browser() + else: + self._driver.quit() + + self._driver = None + + def close_session(self) -> None: + """关闭session""" + if self._session: + self._session.close() + self._session = None + + def close(self) -> None: + """关闭session、driver和浏览器""" + if self._driver: + self.close_driver() + + if self._session: + self.close_session() + + +def user_agent_to_session(driver: RemoteWebDriver, session: Session) -> None: + """把driver的user-agent复制到session \n + :param driver: 来源driver对象 + :param session: 目标session对象 + :return: None + """ + driver = driver + session = session + selenium_user_agent = driver.execute_script("return navigator.userAgent;") + session.headers.update({"User-Agent": selenium_user_agent}) + + +def _port_is_using(ip: str, port: str) -> Union[bool, None]: + """检查端口是否被占用 \n + :param ip: 浏览器地址 + :param port: 浏览器端口 + :return: bool + """ + import socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + s.connect((ip, int(port))) + s.shutdown(2) + return True + except socket.error: + return False + finally: + if s: + s.close() + + +def connect_chrome(chrome_path: str, debugger_address: str, args: list = None, proxy: dict = None) -> tuple: + """连接或启动chrome \n + :param chrome_path: chrome.exe 路径 + :param debugger_address: 进程运行的ip和端口号 + :param args: chrome 配置参数 + :param proxy: 代理配置 + :return: chrome 路径和进程对象组成的元组 + """ + debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address + ip, port = debugger_address.split(':') + if ip not in ('127.0.0.1', 'localhost'): + return None, None + + if _port_is_using(ip, port): + chrome_path = get_exe_path_from_port(port) if chrome_path == 'chrome.exe' else chrome_path + return chrome_path, None + + # ----------为路径加上双引号,避免路径中的空格产生异常---------- + args = [] if args is None else args + args1 = [] + for arg in args: + if arg.startswith(('--user-data-dir', '--disk-cache-dir')): + index = arg.find('=') + 1 + args1.append(f'{arg[:index]}"{arg[index:].strip()}"') + elif arg.startswith('--user-agent='): + args1.append(f'--user-agent="{arg[13:]}"') + else: + args1.append(arg) + + args = set(args1) + + if proxy: + args.add(f'--proxy-server={proxy["http"]}') + + # ----------创建浏览器进程---------- + try: + debugger = _run_browser(port, chrome_path, args) + if chrome_path == 'chrome.exe': + chrome_path = get_exe_path_from_port(port) + + # 传入的路径找不到,主动在ini文件、注册表、系统变量中找 + except FileNotFoundError: + from DrissionPage.easy_set import _get_chrome_path + chrome_path = _get_chrome_path(show_msg=False) + + if not chrome_path: + raise FileNotFoundError('无法找到chrome.exe路径,请手动配置。') + + debugger = _run_browser(port, chrome_path, args) + + return chrome_path, debugger + + +def _run_browser(port, path: str, args: set) -> Popen: + """创建chrome进程 \n + :param port: 端口号 + :param path: 浏览器地址 + :param args: 启动参数 + :return: 进程对象 + """ + sys = system().lower() + if sys == 'windows': + args = ' '.join(args) + debugger = Popen(f'"{path}" --remote-debugging-port={port} {args}', shell=False) + elif sys == 'linux': + arguments = [path, f'--remote-debugging-port={port}'] + list(args) + debugger = Popen(arguments, shell=False) + else: + raise OSError('只支持Windows和Linux系统。') + + t1 = perf_counter() + while perf_counter() - t1 < 10: + try: + requests_get(f'http://127.0.0.1:{port}/json') + return debugger + except requests_connection_err: + pass + + raise ConnectionError('无法连接浏览器。') + + +def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver: + """创建 WebDriver 对象 \n + :param chrome_path: chrome.exe 路径 + :param driver_path: chromedriver.exe 路径 + :param options: Options 对象 + :return: WebDriver 对象 + """ + try: + debugger_address = options.debugger_address + if options.debugger_address: + options = Options() + options.debugger_address = debugger_address + + return webdriver.Chrome(driver_path, options=options) + + # 若版本不对,获取对应 chromedriver 再试 + except (WebDriverException, SessionNotCreatedException): + print('打开失败,尝试获取driver。\n') + from .easy_set import get_match_driver + from DrissionPage.easy_set import _get_chrome_path + + if chrome_path == 'chrome.exe': + chrome_path = _get_chrome_path(show_msg=False, from_ini=False) + + if chrome_path: + driver_path = get_match_driver(chrome_path=chrome_path, check_version=False, show_msg=True) + if driver_path: + try: + options.binary_location = chrome_path + return webdriver.Chrome(driver_path, options=options) + except Exception: + pass + + print('无法启动,请检查浏览器路径,或手动设置chromedriver。\n下载地址:http://npm.taobao.org/mirrors/chromedriver/') + exit(0) + + +def _get_chrome_hwnds_from_pid(pid) -> list: + """通过PID查询句柄ID""" + try: + from win32gui import IsWindow, GetWindowText, EnumWindows + from win32process import GetWindowThreadProcessId + except ImportError: + raise ImportError('请先安装win32gui,pip install pypiwin32') + + def callback(hwnd, hds): + if IsWindow(hwnd) and '- Google Chrome' in GetWindowText(hwnd): + _, found_pid = GetWindowThreadProcessId(hwnd) + if str(found_pid) == str(pid): + hds.append(hwnd) + return True + + hwnds = [] + EnumWindows(callback, hwnds) + return hwnds + + +def _kill_progress(pid: str = None, port: int = None) -> bool: + """关闭浏览器进程 \n + :param pid: 进程id + :param port: 端口号,如没有进程id,从端口号获取 + :return: 是否成功 + """ + from os import popen + if system().lower() != 'windows': + return False + + pid = pid or get_pid_from_port(port) + if not pid: + return False + + if popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'): + popen(f'taskkill /pid {pid} /F') + return True + else: + return False diff --git a/DrissionPage/driver_element.py b/DrissionPage/driver_element.py index 1c98028..262bc2a 100644 --- a/DrissionPage/driver_element.py +++ b/DrissionPage/driver_element.py @@ -30,9 +30,10 @@ class DriverElement(DrissionElement): :param ele: 被包装的WebElement元素 :param page: 元素所在页面 """ - super().__init__(ele, page) + super().__init__(page) self._select = None self._scroll = None + self._inner_ele = ele def __repr__(self) -> str: attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] @@ -50,6 +51,10 @@ class DriverElement(DrissionElement): return self.ele(loc_or_str, timeout) # -----------------共有属性和方法------------------- + @property + def inner_ele(self) -> WebElement: + return self._inner_ele + @property def tag(self) -> str: """返回元素类型""" diff --git a/DrissionPage/session_element.py b/DrissionPage/session_element.py index 6b329b8..093700b 100644 --- a/DrissionPage/session_element.py +++ b/DrissionPage/session_element.py @@ -12,7 +12,7 @@ from lxml.etree import tostring from lxml.html import HtmlElement, fromstring from .base import DrissionElement, BasePage, BaseElement -from .common import get_ele_txt, get_loc +from .common import get_ele_txt, get_loc, make_absolute_link class SessionElement(DrissionElement): @@ -23,7 +23,12 @@ class SessionElement(DrissionElement): :param ele: 被包装的HtmlElement元素 :param page: 元素所在页面对象,如果是从 html 文本生成的元素,则为 None """ - super().__init__(ele, page) + super().__init__(page) + self._inner_ele = ele + + @property + def inner_ele(self) -> HtmlElement: + return self._inner_ele def __repr__(self) -> str: attrs = [f"{attr}='{self.attrs[attr]}'" for attr in self.attrs] @@ -180,10 +185,10 @@ class SessionElement(DrissionElement): return link else: # 其它情况直接返回绝对url - return self._make_absolute(link) + return make_absolute_link(link, self.page) elif attr == 'src': - return self._make_absolute(self.inner_ele.get('src')) + return make_absolute_link(self.inner_ele.get('src'), self.page) elif attr == 'text': return self.text @@ -268,30 +273,6 @@ class SessionElement(DrissionElement): return f':root{path_str[1:]}' if mode == 'css' else path_str - # ----------------session独有方法----------------------- - def _make_absolute(self, link) -> str: - """获取绝对url - :param link: 超链接 - :return: 绝对链接 - """ - if not link: - return link - - parsed = urlparse(link)._asdict() - - # 是相对路径,与页面url拼接并返回 - if not parsed['netloc']: - return urljoin(self.page.url, link) if self.page else link - - # 是绝对路径但缺少协议,从页面url获取协议并修复 - if not parsed['scheme'] and self.page: - parsed['scheme'] = urlparse(self.page.url).scheme - parsed = tuple(v for v in parsed.values()) - return urlunparse(parsed) - - # 绝对路径且不缺协议,直接返回 - return link - def make_session_ele(html_or_ele: Union[str, BaseElement, BasePage], loc: Union[str, Tuple[str, str]] = None, diff --git a/DrissionPage/shadow_root_element.py b/DrissionPage/shadow_root_element.py index d67c042..3b87897 100644 --- a/DrissionPage/shadow_root_element.py +++ b/DrissionPage/shadow_root_element.py @@ -19,8 +19,13 @@ class ShadowRootElement(BaseElement): """ShadowRootElement是用于处理ShadowRoot的类,使用方法和DriverElement基本一致""" def __init__(self, inner_ele: WebElement, parent_ele: DriverElement): - super().__init__(inner_ele, parent_ele.page) + super().__init__(parent_ele.page) self.parent_ele = parent_ele + self._inner_ele = inner_ele + + @property + def inner_ele(self) -> WebElement: + return self._inner_ele def __repr__(self) -> str: return f''