页面对象的设置方法归类到set属性

This commit is contained in:
g1879 2023-02-14 00:36:41 +08:00
parent b3fc6b35e3
commit 01b107154e
13 changed files with 497 additions and 235 deletions

View File

@ -14,6 +14,7 @@ from .base import BasePage
from .chromium_driver import ChromiumDriver from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, \ from .chromium_element import ChromiumWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele, \
ChromiumElementWaiter ChromiumElementWaiter
from .functions.constants import HANDLE_ALERT_METHOD
from .functions.errors import ContextLossError, ElementLossError, AlertExistsError from .functions.errors import ContextLossError, ElementLossError, AlertExistsError
from .functions.locator import get_loc from .functions.locator import get_loc
from .functions.tools import get_usable_path from .functions.tools import get_usable_path
@ -75,6 +76,7 @@ class ChromiumBase(BasePage):
self._is_reading = False self._is_reading = False
self._upload_list = None self._upload_list = None
self._wait = None self._wait = None
self._set = None
def _driver_init(self, tab_id): def _driver_init(self, tab_id):
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
@ -210,19 +212,6 @@ class ChromiumBase(BasePage):
self.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=kwargs['backendNodeId']) self.run_cdp('DOM.setFileInputFiles', files=files, backendNodeId=kwargs['backendNodeId'])
self._upload_list = [] self._upload_list = []
def set_upload_files(self, files):
"""等待上传的文件路径
:param files: 文件路径列表或字符串字符串时多个文件用回车分隔
:return: None
"""
if self._upload_list is None:
self._tab_obj.Page.fileChooserOpened = self._onFileChooserOpened
self.run_cdp('Page.setInterceptFileChooserDialog', enabled=True)
if isinstance(files, str):
files = files.split('\n')
self._upload_list = [str(Path(i).absolute()) for i in files]
def __call__(self, loc_or_str, timeout=None): def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素 """在内部查找元素
ele = page('@id=ele_id') ele = page('@id=ele_id')
@ -316,11 +305,6 @@ class ChromiumBase(BasePage):
"""返回timeouts设置""" """返回timeouts设置"""
return self._timeouts return self._timeouts
@property
def set_page_load_strategy(self):
"""返回用于设置页面加载策略的对象"""
return PageLoadStrategy(self)
@property @property
def upload_list(self): def upload_list(self):
"""返回等待上传文件列表""" """返回等待上传文件列表"""
@ -333,21 +317,12 @@ class ChromiumBase(BasePage):
self._wait = ChromiumPageWaiter(self) self._wait = ChromiumPageWaiter(self)
return self._wait return self._wait
def set_timeouts(self, implicit=None, page_load=None, script=None): @property
"""设置超时时间,单位为秒 def set(self):
:param implicit: 查找元素超时时间 """返回用于等待的对象"""
:param page_load: 页面加载超时时间 if self._set is None:
:param script: 脚本运行超时时间 self._set = ChromiumBaseSetter(self)
:return: None return self._set
"""
if implicit is not None:
self._timeouts.implicit = implicit
if page_load is not None:
self._timeouts.page_load = page_load
if script is not None:
self._timeouts.script = script
def run_cdp(self, cmd, **cmd_args): def run_cdp(self, cmd, **cmd_args):
"""执行Chrome DevTools Protocol语句 """执行Chrome DevTools Protocol语句
@ -355,7 +330,7 @@ class ChromiumBase(BasePage):
:param cmd_args: 参数 :param cmd_args: 参数
:return: 执行的结果 :return: 执行的结果
""" """
if self.driver.has_alert and cmd != 'Page.handleJavaScriptDialog': if self.driver.has_alert and cmd != HANDLE_ALERT_METHOD:
raise AlertExistsError('存在未处理的提示框。') raise AlertExistsError('存在未处理的提示框。')
r = self.driver.call_method(cmd, **cmd_args) r = self.driver.call_method(cmd, **cmd_args)
@ -439,29 +414,6 @@ class ChromiumBase(BasePage):
else: else:
return cookies return cookies
def set_cookies(self, cookies):
"""设置cookies值
:param cookies: cookies信息
:return: None
"""
cookies = cookies_to_tuple(cookies)
result_cookies = []
for cookie in cookies:
if not cookie.get('domain', None):
continue
c = {'value': '' if cookie['value'] is None else cookie['value'],
'name': cookie['name'],
'domain': cookie['domain']}
result_cookies.append(c)
self.run_cdp_loaded('Network.setCookies', cookies=result_cookies)
def set_headers(self, headers: dict) -> None:
"""设置固定发送的headers
:param headers: dict格式的headers数据
:return: None
"""
self.run_cdp('Network.setExtraHTTPHeaders', headers=headers)
def ele(self, loc_or_ele, timeout=None): def ele(self, loc_or_ele, timeout=None):
"""获取第一个符合条件的元素对象 """获取第一个符合条件的元素对象
:param loc_or_ele: 定位符或元素对象 :param loc_or_ele: 定位符或元素对象
@ -599,17 +551,6 @@ class ChromiumBase(BasePage):
while self.ready_state != 'complete': while self.ready_state != 'complete':
sleep(.1) sleep(.1)
def set_user_agent(self, ua, platform=None):
"""为当前tab设置user agent只在当前tab有效
:param ua: user agent字符串
:param platform: platform字符串
:return: None
"""
keys = {'userAgent': ua}
if platform:
keys['platform'] = platform
self.run_cdp('Emulation.setUserAgentOverride', **keys)
def get_session_storage(self, item=None): def get_session_storage(self, item=None):
"""获取sessionStorage信息不设置item则获取全部 """获取sessionStorage信息不设置item则获取全部
:param item: 要获取的项不设置则返回全部 :param item: 要获取的项不设置则返回全部
@ -626,25 +567,6 @@ class ChromiumBase(BasePage):
js = f'localStorage.getItem("{item}");' if item else 'localStorage;' js = f'localStorage.getItem("{item}");' if item else 'localStorage;'
return self.run_js_loaded(js, as_expr=True) return self.run_js_loaded(js, as_expr=True)
def set_session_storage(self, item, value):
"""设置或删除某项sessionStorage信息
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
js = f'sessionStorage.removeItem("{item}");' if item is False \
else f'sessionStorage.setItem("{item}","{value}");'
return self.run_js_loaded(js, as_expr=True)
def set_local_storage(self, item, value):
"""设置或删除某项localStorage信息
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self.run_js_loaded(js, as_expr=True)
def get_screenshot(self, path=None, as_bytes=None, full_page=False, left_top=None, right_bottom=None): def get_screenshot(self, path=None, as_bytes=None, full_page=False, left_top=None, right_bottom=None):
"""对页面进行截图可对整个网页、可见网页、指定范围截图。对可视范围外截图需要90以上版本浏览器支持 """对页面进行截图可对整个网页、可见网页、指定范围截图。对可视范围外截图需要90以上版本浏览器支持
:param path: 完整路径后缀可选 'jpg','jpeg','png','webp' :param path: 完整路径后缀可选 'jpg','jpeg','png','webp'
@ -786,6 +708,164 @@ class ChromiumBase(BasePage):
warn("此方法即将弃用请用scroll.to_see()方法代替。", DeprecationWarning) warn("此方法即将弃用请用scroll.to_see()方法代替。", DeprecationWarning)
self.scroll.to_see(loc_or_ele) self.scroll.to_see(loc_or_ele)
def set_timeouts(self, implicit=None, page_load=None, script=None):
"""设置超时时间,单位为秒
:param implicit: 查找元素超时时间
:param page_load: 页面加载超时时间
:param script: 脚本运行超时时间
:return: None
"""
warn("此方法即将弃用请用set.timeouts()方法代替。", DeprecationWarning)
self.set.timeouts(implicit, page_load, script)
def set_session_storage(self, item, value):
"""设置或删除某项sessionStorage信息
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
warn("此方法即将弃用请用set.session_storage()方法代替。", DeprecationWarning)
return self.set.session_storage(item, value)
def set_local_storage(self, item, value):
"""设置或删除某项localStorage信息
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
warn("此方法即将弃用请用set.local_storage()方法代替。", DeprecationWarning)
return self.set.local_storage(item, value)
def set_user_agent(self, ua, platform=None):
"""为当前tab设置user agent只在当前tab有效
:param ua: user agent字符串
:param platform: platform字符串
:return: None
"""
warn("此方法即将弃用请用set.user_agent()方法代替。", DeprecationWarning)
self.set.user_agent(ua, platform)
def set_cookies(self, cookies):
"""设置cookies值
:param cookies: cookies信息
:return: None
"""
warn("此方法即将弃用请用set.cookies()方法代替。", DeprecationWarning)
self.set.cookies(cookies)
def set_upload_files(self, files):
"""等待上传的文件路径
:param files: 文件路径列表或字符串字符串时多个文件用回车分隔
:return: None
"""
warn("此方法即将弃用请用set.upload_files()方法代替。", DeprecationWarning)
self.set.upload_files(files)
def set_headers(self, headers: dict) -> None:
"""设置固定发送的headers
:param headers: dict格式的headers数据
:return: None
"""
warn("此方法即将弃用请用set.headers()方法代替。", DeprecationWarning)
self.set.headers(headers)
@property
def set_page_load_strategy(self):
"""返回用于设置页面加载策略的对象"""
warn("此方法即将弃用请用set.load_strategy.xxxx()方法代替。", DeprecationWarning)
return self.set.load_strategy
class ChromiumBaseSetter(object):
def __init__(self, page):
self._page = page
@property
def load_strategy(self):
"""返回用于设置页面加载策略的对象"""
return PageLoadStrategy(self._page)
def timeouts(self, implicit=None, page_load=None, script=None):
"""设置超时时间,单位为秒
:param implicit: 查找元素超时时间
:param page_load: 页面加载超时时间
:param script: 脚本运行超时时间
:return: None
"""
if implicit is not None:
self._page.timeouts.implicit = implicit
if page_load is not None:
self._page.timeouts.page_load = page_load
if script is not None:
self._page.timeouts.script = script
def user_agent(self, ua, platform=None):
"""为当前tab设置user agent只在当前tab有效
:param ua: user agent字符串
:param platform: platform字符串
:return: None
"""
keys = {'userAgent': ua}
if platform:
keys['platform'] = platform
self._page.run_cdp('Emulation.setUserAgentOverride', **keys)
def session_storage(self, item, value):
"""设置或删除某项sessionStorage信息
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
js = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");'
return self._page.run_js_loaded(js, as_expr=True)
def local_storage(self, item, value):
"""设置或删除某项localStorage信息
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self._page.run_js_loaded(js, as_expr=True)
def cookies(self, cookies):
"""设置cookies值
:param cookies: cookies信息
:return: None
"""
cookies = cookies_to_tuple(cookies)
result_cookies = []
for cookie in cookies:
if not cookie.get('domain', None):
continue
c = {'value': '' if cookie['value'] is None else cookie['value'],
'name': cookie['name'],
'domain': cookie['domain']}
result_cookies.append(c)
self._page.run_cdp_loaded('Network.setCookies', cookies=result_cookies)
def upload_files(self, files):
"""等待上传的文件路径
:param files: 文件路径列表或字符串字符串时多个文件用回车分隔
:return: None
"""
if self._page._upload_list is None:
self._page.driver.Page.fileChooserOpened = self._page._onFileChooserOpened
self._page.run_cdp('Page.setInterceptFileChooserDialog', enabled=True)
if isinstance(files, str):
files = files.split('\n')
self._page._upload_list = [str(Path(i).absolute()) for i in files]
def headers(self, headers: dict) -> None:
"""设置固定发送的headers
:param headers: dict格式的headers数据
:return: None
"""
self._page.run_cdp('Network.setExtraHTTPHeaders', headers=headers)
class ChromiumPageWaiter(ChromiumWaiter): class ChromiumPageWaiter(ChromiumWaiter):
def __init__(self, page): def __init__(self, page):

View File

@ -38,6 +38,7 @@ class ChromiumBase(BasePage):
self._debug_recorder: Recorder = ... self._debug_recorder: Recorder = ...
self._upload_list: list = ... self._upload_list: list = ...
self._wait: ChromiumPageWaiter = ... self._wait: ChromiumPageWaiter = ...
self._set: ChromiumBaseSetter = ...
def _connect_browser(self, tab_id: str = None) -> None: ... def _connect_browser(self, tab_id: str = None) -> None: ...
@ -76,9 +77,6 @@ class ChromiumBase(BasePage):
@property @property
def driver(self) -> ChromiumDriver: ... def driver(self) -> ChromiumDriver: ...
# @property
# def _driver(self) -> ChromiumDriver: ...
@property @property
def _wait_driver(self) -> ChromiumDriver: ... def _wait_driver(self) -> ChromiumDriver: ...
@ -115,15 +113,22 @@ class ChromiumBase(BasePage):
@property @property
def timeouts(self) -> Timeout: ... def timeouts(self) -> Timeout: ...
@property
def set_page_load_strategy(self) -> PageLoadStrategy: ...
@property @property
def upload_list(self) -> list: ... def upload_list(self) -> list: ...
@property @property
def wait(self) -> ChromiumPageWaiter: ... def wait(self) -> ChromiumPageWaiter: ...
@property
def set(self) -> ChromiumBaseSetter: ...
@property
def set_page_load_strategy(self) -> PageLoadStrategy: ...
def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
def set_headers(self, headers: dict) -> None: ...
def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: ... def set_timeouts(self, implicit: float = None, page_load: float = None, script: float = None) -> None: ...
def run_js(self, script: str, as_expr: bool = False, *args: Any) -> Any: ... def run_js(self, script: str, as_expr: bool = False, *args: Any) -> Any: ...
@ -143,10 +148,6 @@ class ChromiumBase(BasePage):
def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: ... def get_cookies(self, as_dict: bool = False) -> Union[list, dict]: ...
def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
def set_headers(self, headers: dict) -> None: ...
def ele(self, def ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame], loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
timeout: float = None) -> Union[ChromiumElement, ChromiumFrame, None]: ... timeout: float = None) -> Union[ChromiumElement, ChromiumFrame, None]: ...
@ -232,6 +233,29 @@ class ChromiumPageScroll(ChromiumScroll):
def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: ... def to_see(self, loc_or_ele: Union[str, tuple, ChromiumElement]) -> None: ...
class ChromiumBaseSetter(object):
def __init__(self, page):
self._page: ChromiumBase = ...
@property
def load_strategy(self) -> PageLoadStrategy: ...
def timeouts(self, implicit: Union[int, float] = None, page_load: Union[int, float] = None,
script: Union[int, float] = None): ...
def user_agent(self, ua: str, platform: str = None) -> None: ...
def session_storage(self, item: str, value: Union[str, bool]) -> None: ...
def local_storage(self, item: str, value: Union[str, bool]) -> None: ...
def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
def headers(self, headers: dict) -> None: ...
def upload_files(self, files: Union[str, list, tuple]) -> None: ...
class Timeout(object): class Timeout(object):
def __init__(self, page: ChromiumBase, implicit=None, page_load=None, script=None): def __init__(self, page: ChromiumBase, implicit=None, page_load=None, script=None):

View File

@ -10,14 +10,13 @@ from time import perf_counter, sleep
from warnings import warn from warnings import warn
from .base import DrissionElement, BaseElement from .base import DrissionElement, BaseElement
from .functions.constants import FRAME_ELEMENT
from .functions.errors import ContextLossError, ElementLossError from .functions.errors import ContextLossError, ElementLossError
from .functions.locator import get_loc from .functions.locator import get_loc
from .functions.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll from .functions.web import make_absolute_link, get_ele_txt, format_html, is_js_func, location_in_viewport, offset_scroll
from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions from .keys import _keys_to_typing, _keyDescriptionForString, _keyDefinitions
from .session_element import make_session_ele from .session_element import make_session_ele
__FRAME_ELEMENT__ = ('iframe', 'frame')
class ChromiumElement(DrissionElement): class ChromiumElement(DrissionElement):
"""ChromePage页面对象中的元素对象""" """ChromePage页面对象中的元素对象"""
@ -71,8 +70,7 @@ class ChromiumElement(DrissionElement):
def tag(self): def tag(self):
"""返回元素tag""" """返回元素tag"""
if self._tag is None: if self._tag is None:
self._tag = self.page.run_cdp('DOM.describeNode', nodeId=self._node_id)['node'][ self._tag = self.page.run_cdp('DOM.describeNode', nodeId=self._node_id)['node']['localName'].lower()
'localName'].lower()
return self._tag return self._tag
@property @property
@ -438,7 +436,7 @@ class ChromiumElement(DrissionElement):
:param loc_or_str: 元素的定位信息可以是loc元组或查询字符串 :param loc_or_str: 元素的定位信息可以是loc元组或查询字符串
:return: SessionElement对象或属性文本 :return: SessionElement对象或属性文本
""" """
if self.tag in ('iframe', 'frame'): if self.tag in FRAME_ELEMENT:
return make_session_ele(self.inner_html, loc_or_str) return make_session_ele(self.inner_html, loc_or_str)
return make_session_ele(self, loc_or_str) return make_session_ele(self, loc_or_str)
@ -447,7 +445,7 @@ class ChromiumElement(DrissionElement):
:param loc_or_str: 定位符 :param loc_or_str: 定位符
:return: SessionElement或属性文本组成的列表 :return: SessionElement或属性文本组成的列表
""" """
if self.tag in ('iframe', 'frame'): if self.tag in FRAME_ELEMENT:
return make_session_ele(self.inner_html, loc_or_str, single=False) return make_session_ele(self.inner_html, loc_or_str, single=False)
return make_session_ele(self, loc_or_str, single=False) return make_session_ele(self, loc_or_str, single=False)
@ -565,12 +563,12 @@ class ChromiumElement(DrissionElement):
if modifier != 0: # 包含修饰符 if modifier != 0: # 包含修饰符
for key in vals: for key in vals:
_send_key(self, modifier, key) send_key(self, modifier, key)
return return
if vals.endswith('\n'): if vals.endswith('\n'):
self.page.run_cdp('Input.insertText', text=vals[:-1]) self.page.run_cdp('Input.insertText', text=vals[:-1])
_send_key(self, modifier, '\n') send_key(self, modifier, '\n')
else: else:
self.page.run_cdp('Input.insertText', text=vals) self.page.run_cdp('Input.insertText', text=vals)
@ -1131,23 +1129,24 @@ def find_in_chromium_ele(ele, loc, single=True, timeout=None, relative=True):
# ---------------执行查找----------------- # ---------------执行查找-----------------
if loc[0] == 'xpath': if loc[0] == 'xpath':
return _find_by_xpath(ele, loc[1], single, timeout, relative=relative) return find_by_xpath(ele, loc[1], single, timeout, relative=relative)
else: else:
return _find_by_css(ele, loc[1], single, timeout) return find_by_css(ele, loc[1], single, timeout)
def _find_by_xpath(ele, xpath, single, timeout, relative=True): def find_by_xpath(ele, xpath, single, timeout, relative=True):
"""执行用xpath在元素中查找元素 """执行用xpath在元素中查找元素
:param ele: 在此元素中查找 :param ele: 在此元素中查找
:param xpath: 查找语句 :param xpath: 查找语句
:param single: 是否只返回第一个结果 :param single: 是否只返回第一个结果
:param timeout: 超时时间 :param timeout: 超时时间
:param relative: 是否相对定位
:return: ChromiumElement或其组成的列表 :return: ChromiumElement或其组成的列表
""" """
type_txt = '9' if single else '7' type_txt = '9' if single else '7'
node_txt = 'this.contentDocument' if ele.tag in ('iframe', 'frame') and not relative else 'this' node_txt = 'this.contentDocument' if ele.tag in FRAME_ELEMENT and not relative else 'this'
js = _make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt) js = make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt)
r = ele.page.run_cdp('Runtime.callFunctionOn', r = ele.page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True) userGesture=True)
@ -1156,7 +1155,7 @@ def _find_by_xpath(ele, xpath, single, timeout, relative=True):
if 'exceptionDetails' in r: if 'exceptionDetails' in r:
if 'The result is not a node set' in r['result']['description']: if 'The result is not a node set' in r['result']['description']:
js = _make_js_for_find_ele_by_xpath(xpath, '1', node_txt) js = make_js_for_find_ele_by_xpath(xpath, '1', node_txt)
r = ele.page.run_cdp('Runtime.callFunctionOn', r = ele.page.run_cdp('Runtime.callFunctionOn',
functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True, functionDeclaration=js, objectId=ele.obj_id, returnByValue=False, awaitPromise=True,
userGesture=True) userGesture=True)
@ -1183,7 +1182,7 @@ def _find_by_xpath(ele, xpath, single, timeout, relative=True):
for i in r[:-1]] for i in r[:-1]]
def _find_by_css(ele, selector, single, timeout): def find_by_css(ele, selector, single, timeout):
"""执行用css selector在元素中查找元素 """执行用css selector在元素中查找元素
:param ele: 在此元素中查找 :param ele: 在此元素中查找
:param selector: 查找语句 :param selector: 查找语句
@ -1226,14 +1225,14 @@ def make_chromium_ele(page, node_id=None, obj_id=None):
:return: ChromiumElement对象或ChromiumFrame对象 :return: ChromiumElement对象或ChromiumFrame对象
""" """
ele = ChromiumElement(page, obj_id=obj_id, node_id=node_id) ele = ChromiumElement(page, obj_id=obj_id, node_id=node_id)
if ele.tag in ('iframe', 'frame'): if ele.tag in FRAME_ELEMENT:
from .chromium_frame import ChromiumFrame from .chromium_frame import ChromiumFrame
ele = ChromiumFrame(page, ele) ele = ChromiumFrame(page, ele)
return ele return ele
def _make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt): def make_js_for_find_ele_by_xpath(xpath, type_txt, node_txt):
"""生成用xpath在元素中查找元素的js文本 """生成用xpath在元素中查找元素的js文本
:param xpath: xpath文本 :param xpath: xpath文本
:param type_txt: 查找类型 :param type_txt: 查找类型
@ -1302,7 +1301,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
if not is_js_func(script): if not is_js_func(script):
script = f'function(){{{script}}}' script = f'function(){{{script}}}'
res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id, res = page.run_cdp('Runtime.callFunctionOn', functionDeclaration=script, objectId=obj_id,
arguments=[_convert_argument(arg) for arg in args], returnByValue=False, arguments=[convert_argument(arg) for arg in args], returnByValue=False,
awaitPromise=True, userGesture=True) awaitPromise=True, userGesture=True)
except ContextLossError: except ContextLossError:
@ -1319,12 +1318,12 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
raise RuntimeError(f'javascript{script}\n错误信息: {exceptionDetails}') raise RuntimeError(f'javascript{script}\n错误信息: {exceptionDetails}')
try: try:
return _parse_js_result(page, page_or_ele, res.get('result')) return parse_js_result(page, page_or_ele, res.get('result'))
except Exception: except Exception:
return res return res
def _parse_js_result(page, ele, result): def parse_js_result(page, ele, result):
"""解析js返回的结果""" """解析js返回的结果"""
if 'unserializableValue' in result: if 'unserializableValue' in result:
return result['unserializableValue'] return result['unserializableValue']
@ -1348,7 +1347,7 @@ def _parse_js_result(page, ele, result):
elif sub_type == 'array': elif sub_type == 'array':
r = page.run_cdp('Runtime.getProperties', objectId=result['result']['objectId'], r = page.run_cdp('Runtime.getProperties', objectId=result['result']['objectId'],
ownProperties=True)['result'] ownProperties=True)['result']
return [_parse_js_result(page, ele, result=i['value']) for i in r] return [parse_js_result(page, ele, result=i['value']) for i in r]
else: else:
return result['value'] return result['value']
@ -1360,7 +1359,7 @@ def _parse_js_result(page, ele, result):
return result['value'] return result['value']
def _convert_argument(arg): def convert_argument(arg):
"""把参数转换成js能够接收的形式""" """把参数转换成js能够接收的形式"""
if isinstance(arg, ChromiumElement): if isinstance(arg, ChromiumElement):
return {'objectId': arg.obj_id} return {'objectId': arg.obj_id}
@ -1375,7 +1374,7 @@ def _convert_argument(arg):
return {'unserializableValue': '-Infinity'} return {'unserializableValue': '-Infinity'}
def _send_enter(ele): def send_enter(ele):
"""发送回车""" """发送回车"""
data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter', data = {'type': 'keyDown', 'modifiers': 0, 'windowsVirtualKeyCode': 13, 'code': 'Enter', 'key': 'Enter',
'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False} 'text': '\r', 'autoRepeat': False, 'unmodifiedText': '\r', 'location': 0, 'isKeypad': False}
@ -1385,7 +1384,7 @@ def _send_enter(ele):
ele.page.run_cdp('Input.dispatchKeyEvent', **data) ele.page.run_cdp('Input.dispatchKeyEvent', **data)
def _send_key(ele, modifier, key): def send_key(ele, modifier, key):
"""发送一个字,在键盘中的字符触发按键,其它直接发送文本""" """发送一个字,在键盘中的字符触发按键,其它直接发送文本"""
if key not in _keyDefinitions: if key not in _keyDefinitions:
ele.page.run_cdp('Input.insertText', text=key) ele.page.run_cdp('Input.insertText', text=key)

View File

@ -346,40 +346,40 @@ def find_in_chromium_ele(ele: ChromiumElement,
ChromiumElement, str, None, List[Union[ChromiumElement, str]]]: ... ChromiumElement, str, None, List[Union[ChromiumElement, str]]]: ...
def _find_by_xpath(ele: ChromiumElement, def find_by_xpath(ele: ChromiumElement,
xpath: str, xpath: str,
single: bool, single: bool,
timeout: float, timeout: float,
relative: bool = True) -> Union[ChromiumElement, List[ChromiumElement], None]: ... relative: bool = True) -> Union[ChromiumElement, List[ChromiumElement], None]: ...
def _find_by_css(ele: ChromiumElement, def find_by_css(ele: ChromiumElement,
selector: str, selector: str,
single: bool, single: bool,
timeout: float) -> Union[ChromiumElement, List[ChromiumElement], None]: ... timeout: float) -> Union[ChromiumElement, List[ChromiumElement], None]: ...
def make_chromium_ele(page: ChromiumBase, node_id: str = ..., obj_id: str = ...) -> Union[ def make_chromium_ele(page: ChromiumBase, node_id: str = ..., obj_id: str = ...) -> Union[
ChromiumElement, ChromiumFrame]: ... ChromiumElement, ChromiumFrame]: ...
def _make_js_for_find_ele_by_xpath(xpath: str, type_txt: str, node_txt: str) -> str: ... def make_js_for_find_ele_by_xpath(xpath: str, type_txt: str, node_txt: str) -> str: ...
def run_js(page_or_ele: Union[ChromiumBase, ChromiumElement, ChromiumShadowRootElement], script: str, def run_js(page_or_ele: Union[ChromiumBase, ChromiumElement, ChromiumShadowRootElement], script: str,
as_expr: bool = False, timeout: float = None, args: tuple = ...) -> Any: ... as_expr: bool = False, timeout: float = None, args: tuple = ...) -> Any: ...
def _parse_js_result(page: ChromiumBase, ele: ChromiumElement, result: dict): ... def parse_js_result(page: ChromiumBase, ele: ChromiumElement, result: dict): ...
def _convert_argument(arg: Any) -> dict: ... def convert_argument(arg: Any) -> dict: ...
def _send_enter(ele: ChromiumElement) -> None: ... def send_enter(ele: ChromiumElement) -> None: ...
def _send_key(ele: ChromiumElement, modifier: int, key: str) -> None: ... def send_key(ele: ChromiumElement, modifier: int, key: str) -> None: ...
class ChromiumScroll(object): class ChromiumScroll(object):

View File

@ -5,8 +5,9 @@
""" """
from re import search from re import search
from time import sleep from time import sleep
from warnings import warn
from .chromium_base import ChromiumBase, ChromiumPageScroll from .chromium_base import ChromiumBase, ChromiumPageScroll, ChromiumBaseSetter
from .chromium_element import ChromiumElement from .chromium_element import ChromiumElement
@ -267,6 +268,13 @@ class ChromiumFrame(ChromiumBase):
"""返回用于等待的对象""" """返回用于等待的对象"""
return ChromiumFrameScroll(self) return ChromiumFrameScroll(self)
@property
def set(self):
"""返回用于等待的对象"""
if self._set is None:
self._set = ChromiumFrameSetter(self)
return self._set
def refresh(self): def refresh(self):
"""刷新frame页面""" """刷新frame页面"""
self._check_ok() self._check_ok()
@ -280,15 +288,6 @@ class ChromiumFrame(ChromiumBase):
self._check_ok() self._check_ok()
return self.frame_ele.attr(attr) return self.frame_ele.attr(attr)
def set_attr(self, attr, value):
"""设置frame元素attribute属性
:param attr: 属性名
:param value: 属性值
:return: None
"""
self._check_ok()
self.frame_ele.set_attr(attr, value)
def remove_attr(self, attr): def remove_attr(self, attr):
"""删除frame元素attribute属性 """删除frame元素attribute属性
:param attr: 属性名 :param attr: 属性名
@ -454,6 +453,15 @@ class ChromiumFrame(ChromiumBase):
"""返回当前frame是否同域""" """返回当前frame是否同域"""
return self.frame_id in str(self.page.run_cdp('Page.getFrameTree')['frameTree']) return self.frame_id in str(self.page.run_cdp('Page.getFrameTree')['frameTree'])
def set_attr(self, attr, value):
"""设置frame元素attribute属性
:param attr: 属性名
:param value: 属性值
:return: None
"""
warn("此方法即将弃用请用set.attr()方法代替。", DeprecationWarning)
self.set.attr(attr, value)
class ChromiumFrameScroll(ChromiumPageScroll): class ChromiumFrameScroll(ChromiumPageScroll):
def __init__(self, frame): def __init__(self, frame):
@ -462,3 +470,14 @@ class ChromiumFrameScroll(ChromiumPageScroll):
""" """
self._driver = frame.doc_ele self._driver = frame.doc_ele
self.t1 = self.t2 = 'this.documentElement' self.t1 = self.t2 = 'this.documentElement'
class ChromiumFrameSetter(ChromiumBaseSetter):
def attr(self, attr, value):
"""设置frame元素attribute属性
:param attr: 属性名
:param value: 属性值
:return: None
"""
self._page._check_ok()
self._page.frame_ele.set_attr(attr, value)

View File

@ -5,8 +5,8 @@
""" """
from typing import Union, Tuple, List, Any from typing import Union, Tuple, List, Any
from .chromium_element import ChromiumElement, ChromiumScroll from .chromium_base import ChromiumBase, ChromiumPageScroll, ChromiumBaseSetter
from .chromium_base import ChromiumBase, ChromiumPageScroll from .chromium_element import ChromiumElement
class ChromiumFrame(ChromiumBase): class ChromiumFrame(ChromiumBase):
@ -105,6 +105,9 @@ class ChromiumFrame(ChromiumBase):
@property @property
def scroll(self) -> ChromiumFrameScroll: ... def scroll(self) -> ChromiumFrameScroll: ...
@property
def set(self) -> ChromiumFrameSetter: ...
def refresh(self) -> None: ... def refresh(self) -> None: ...
def attr(self, attr: str) -> Union[str, None]: ... def attr(self, attr: str) -> Union[str, None]: ...
@ -170,3 +173,9 @@ class ChromiumFrame(ChromiumBase):
class ChromiumFrameScroll(ChromiumPageScroll): class ChromiumFrameScroll(ChromiumPageScroll):
def __init__(self, frame: ChromiumFrame) -> None: ... def __init__(self, frame: ChromiumFrame) -> None: ...
class ChromiumFrameSetter(ChromiumBaseSetter):
_page: ChromiumFrame = ...
def attr(self, attr: str, value: str) -> None: ...

View File

@ -11,7 +11,7 @@ from warnings import warn
from requests import Session from requests import Session
from .chromium_base import ChromiumBase, Timeout from .chromium_base import ChromiumBase, Timeout, ChromiumBaseSetter
from .chromium_driver import ChromiumDriver, CallMethodException from .chromium_driver import ChromiumDriver, CallMethodException
from .chromium_tab import ChromiumTab from .chromium_tab import ChromiumTab
from .configs.chromium_options import ChromiumOptions from .configs.chromium_options import ChromiumOptions
@ -91,7 +91,6 @@ class ChromiumPage(ChromiumBase):
"""添加ChromiumPage独有的运行配置""" """添加ChromiumPage独有的运行配置"""
super()._chromium_init() super()._chromium_init()
self._alert = Alert() self._alert = Alert()
self._window_setter = None
def _driver_init(self, tab_id): def _driver_init(self, tab_id):
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化 """新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
@ -148,11 +147,11 @@ class ChromiumPage(ChromiumBase):
return None return None
@property @property
def set_window(self): def set(self):
"""返回用于设置窗口大小的对象""" """返回用于等待的对象"""
if self._window_setter is None: if self._set is None:
self._window_setter = WindowSetter(self) self._set = ChromiumPageSetter(self)
return self._window_setter return self._set
@property @property
def download_path(self): def download_path(self):
@ -211,13 +210,6 @@ class ChromiumPage(ChromiumBase):
else: else:
self.run_cdp('Target.createTarget', url='') self.run_cdp('Target.createTarget', url='')
def set_main_tab(self, tab_id=None):
"""设置主tab
:param tab_id: 标签页id不传入则设置当前tab
:return: None
"""
self._main_tab = tab_id or self.tab_id
def to_main_tab(self): def to_main_tab(self):
"""跳转到主标签页""" """跳转到主标签页"""
self.to_tab(self._main_tab) self.to_tab(self._main_tab)
@ -358,6 +350,20 @@ class ChromiumPage(ChromiumBase):
self._alert.response_text = None self._alert.response_text = None
self._tab_obj.has_alert = True self._tab_obj.has_alert = True
def set_main_tab(self, tab_id=None):
"""设置主tab
:param tab_id: 标签页id不传入则设置当前tab
:return: None
"""
warn("此方法即将弃用请用set.main_tab()方法代替。", DeprecationWarning)
self.set.main_tab(tab_id)
@property
def set_window(self):
"""返回用于设置窗口大小的对象"""
warn("此方法即将弃用请用set.window.xxxx()方法代替。", DeprecationWarning)
return WindowSetter(self)
class ChromiumDownloadSetter(DownloadSetter): class ChromiumDownloadSetter(DownloadSetter):
"""用于设置下载参数的类""" """用于设置下载参数的类"""
@ -558,6 +564,20 @@ class WindowSetter(object):
self._page.run_cdp('Browser.setWindowBounds', windowId=self._window_id, bounds=bounds) self._page.run_cdp('Browser.setWindowBounds', windowId=self._window_id, bounds=bounds)
class ChromiumPageSetter(ChromiumBaseSetter):
def main_tab(self, tab_id=None):
"""设置主tab
:param tab_id: 标签页id不传入则设置当前tab
:return: None
"""
self._page._main_tab = tab_id or self._page.tab_id
@property
def windows(self):
"""返回用于设置浏览器窗口的对象"""
return WindowSetter(self._page)
def show_or_hide_browser(page, hide=True): def show_or_hide_browser(page, hide=True):
"""执行显示或隐藏浏览器窗口 """执行显示或隐藏浏览器窗口
:param page: ChromePage对象 :param page: ChromePage对象

View File

@ -11,12 +11,12 @@ from typing import Union, Tuple, List
from DownloadKit import DownloadKit from DownloadKit import DownloadKit
from requests import Session from requests import Session
from session_page import DownloadSetter from .chromium_base import ChromiumBase, ChromiumBaseSetter
from .configs.chromium_options import ChromiumOptions
from .chromium_base import ChromiumBase
from .chromium_driver import ChromiumDriver from .chromium_driver import ChromiumDriver
from .chromium_tab import ChromiumTab from .chromium_tab import ChromiumTab
from .configs.chromium_options import ChromiumOptions
from .configs.driver_options import DriverOptions from .configs.driver_options import DriverOptions
from .session_page import DownloadSetter
class ChromiumPage(ChromiumBase): class ChromiumPage(ChromiumBase):
@ -60,6 +60,9 @@ class ChromiumPage(ChromiumBase):
@property @property
def process_id(self) -> Union[None, int]: ... def process_id(self) -> Union[None, int]: ...
@property
def set(self) -> ChromiumPageSetter: ...
@property @property
def set_window(self) -> WindowSetter: ... def set_window(self) -> WindowSetter: ...
@ -178,3 +181,12 @@ def get_browser_progress_id(progress: Union[popen, None], address: str) -> Union
def get_chrome_hwnds_from_pid(pid: str, title: str) -> list: ... def get_chrome_hwnds_from_pid(pid: str, title: str) -> list: ...
class ChromiumPageSetter(ChromiumBaseSetter):
_page: ChromiumPage = ...
def main_tab(self, tab_id: str = None) -> None: ...
@property
def windows(self) -> WindowSetter: ...

View File

@ -0,0 +1,3 @@
# -*- coding:utf-8 -*-
HANDLE_ALERT_METHOD = 'Page.handleJavaScriptDialog'
FRAME_ELEMENT = ('iframe', 'frame')

View File

@ -6,6 +6,7 @@
from re import search from re import search
from time import sleep from time import sleep
from urllib.parse import urlparse from urllib.parse import urlparse
from warnings import warn
from DownloadKit import DownloadKit from DownloadKit import DownloadKit
from requests import Session, Response from requests import Session, Response
@ -52,6 +53,7 @@ class SessionPage(BasePage):
"""设置运行时用到的属性""" """设置运行时用到的属性"""
self._timeout = self._session_options.timeout self._timeout = self._session_options.timeout
self._download_path = self._session_options.download_path self._download_path = self._session_options.download_path
self._set = None
def _create_session(self): def _create_session(self):
"""创建内建Session对象""" """创建内建Session对象"""
@ -68,7 +70,7 @@ class SessionPage(BasePage):
if opt.headers: if opt.headers:
self._session.headers = CaseInsensitiveDict(opt.headers) self._session.headers = CaseInsensitiveDict(opt.headers)
if opt.cookies: if opt.cookies:
self.set_cookies(opt.cookies) self.set.cookies(opt.cookies)
if opt.adapters: if opt.adapters:
for url, adapter in opt.adapters: for url, adapter in opt.adapters:
self._session.mount(url, adapter) self._session.mount(url, adapter)
@ -80,26 +82,6 @@ class SessionPage(BasePage):
if attr: if attr:
self._session.__setattr__(i, attr) self._session.__setattr__(i, attr)
def set_cookies(self, cookies):
"""为Session对象设置cookies
:param cookies: cookies信息
:return: None
"""
set_session_cookies(self.session, cookies)
def set_headers(self, headers):
"""设置通用的headers设置的headers值回逐个覆盖原有的不会清理原来的
:param headers: dict形式的headers
:return: None
"""
headers = CaseInsensitiveDict(headers)
for i in headers:
self.session.headers[i] = headers[i]
def set_user_agent(self, ua):
"""设置user agent"""
self.session.headers['user-agent'] = ua
def __call__(self, loc_or_str, timeout=None): def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素 """在内部查找元素
ele2 = ele1('@id=ele_id') ele2 = ele1('@id=ele_id')
@ -161,6 +143,13 @@ class SessionPage(BasePage):
"""返回访问url得到的response对象""" """返回访问url得到的response对象"""
return self._response return self._response
@property
def set(self):
"""返回用于等待的对象"""
if self._set is None:
self._set = SessionPageSetter(self)
return self._set
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs): def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs):
"""用get方式跳转到url """用get方式跳转到url
:param url: 目标url :param url: 目标url
@ -340,6 +329,52 @@ class SessionPage(BasePage):
raise ConnectionError(f'状态码:{r.status_code}') raise ConnectionError(f'状态码:{r.status_code}')
return r, f'状态码:{r.status_code}' return r, f'状态码:{r.status_code}'
def set_cookies(self, cookies):
"""为Session对象设置cookies
:param cookies: cookies信息
:return: None
"""
warn("此方法即将弃用请用set.load_strategy.xxxx()方法代替。", DeprecationWarning)
self.set.cookies(cookies)
def set_headers(self, headers):
"""设置通用的headers设置的headers值回逐个覆盖原有的不会清理原来的
:param headers: dict形式的headers
:return: None
"""
warn("此方法即将弃用请用set.load_strategy.xxxx()方法代替。", DeprecationWarning)
self.set.headers(headers)
def set_user_agent(self, ua):
"""设置user agent"""
warn("此方法即将弃用请用set.load_strategy.xxxx()方法代替。", DeprecationWarning)
self.set.user_agent(ua)
class SessionPageSetter(object):
def __init__(self, page):
self._page = page
def cookies(self, cookies):
"""为Session对象设置cookies
:param cookies: cookies信息
:return: None
"""
set_session_cookies(self._page.session, cookies)
def headers(self, headers):
"""设置通用的headers设置的headers值回逐个覆盖原有的不会清理原来的
:param headers: dict形式的headers
:return: None
"""
headers = CaseInsensitiveDict(headers)
for i in headers:
self._page.session.headers[i] = headers[i]
def user_agent(self, ua):
"""设置user agent"""
self._page.session.headers['user-agent'] = ua
class DownloadSetter(object): class DownloadSetter(object):
"""用于设置下载参数的类""" """用于设置下载参数的类"""

View File

@ -32,6 +32,7 @@ class SessionPage(BasePage):
self.timeout: float = ... self.timeout: float = ...
self.retry_times: int = ... self.retry_times: int = ...
self.retry_interval: float = ... self.retry_interval: float = ...
self._set: SessionPageSetter = ...
def _set_start_options(self, session_or_options, none) -> None: ... def _set_start_options(self, session_or_options, none) -> None: ...
@ -119,6 +120,9 @@ class SessionPage(BasePage):
@property @property
def response(self) -> Response: ... def response(self) -> Response: ...
@property
def set(self) -> SessionPageSetter: ...
@property @property
def download(self) -> DownloadKit: ... def download(self) -> DownloadKit: ...
@ -161,6 +165,17 @@ class SessionPage(BasePage):
**kwargs) -> tuple: ... **kwargs) -> tuple: ...
class SessionPageSetter(object):
def __init__(self, page: SessionPage):
self._page: SessionPage = ...
def cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
def headers(self, headers: dict) -> None: ...
def user_agent(self, ua: str) -> None: ...
class DownloadSetter(object): class DownloadSetter(object):
def __init__(self, page: Union[SessionPage, WebPage, ChromiumPage]): def __init__(self, page: Union[SessionPage, WebPage, ChromiumPage]):
self._page: SessionPage = ... self._page: SessionPage = ...

View File

@ -12,12 +12,11 @@ from tldextract import extract
from .base import BasePage from .base import BasePage
from .chromium_base import ChromiumBase, Timeout from .chromium_base import ChromiumBase, Timeout
from .chromium_driver import ChromiumDriver, CallMethodException from .chromium_driver import ChromiumDriver, CallMethodException
from .chromium_page import ChromiumPage, ChromiumDownloadSetter from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter
from .configs.chromium_options import ChromiumOptions from .configs.chromium_options import ChromiumOptions
from .configs.driver_options import DriverOptions from .configs.driver_options import DriverOptions
from .configs.session_options import SessionOptions from .configs.session_options import SessionOptions
from .functions.web import cookies_to_tuple from .session_page import SessionPage, SessionPageSetter
from .session_page import SessionPage
class WebPage(SessionPage, ChromiumPage, BasePage): class WebPage(SessionPage, ChromiumPage, BasePage):
@ -107,12 +106,12 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._download_path = None self._download_path = None
if se_opt is not False: if se_opt is not False:
self.set_timeouts(implicit=self._session_options.timeout) self.set.timeouts(implicit=self._session_options.timeout)
self._download_path = self._session_options.download_path self._download_path = self._session_options.download_path
if dr_opt is not False: if dr_opt is not False:
t = self._driver_options.timeouts t = self._driver_options.timeouts
self.set_timeouts(t['implicit'], t['pageLoad'], t['script']) self.set.timeouts(t['implicit'], t['pageLoad'], t['script'])
self._download_path = self._driver_options.download_path self._download_path = self._driver_options.download_path
def _set_runtime_settings(self): def _set_runtime_settings(self):
@ -203,7 +202,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
:param second: 秒数 :param second: 秒数
:return: None :return: None
""" """
self.set_timeouts(implicit=second) self.set.timeouts(implicit=second)
@property @property
def download_path(self): def download_path(self):
@ -222,6 +221,13 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""返回下载器对象""" """返回下载器对象"""
return self.download_set._switched_DownloadKit return self.download_set._switched_DownloadKit
@property
def set(self):
"""返回用于等待的对象"""
if self._set is None:
self._set = WebPageSetter(self)
return self._set
def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs): def get(self, url, show_errmsg=False, retry=None, interval=None, timeout=None, **kwargs):
"""跳转到一个url """跳转到一个url
:param url: 目标url :param url: 目标url
@ -249,6 +255,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
:param kwargs: 连接参数 :param kwargs: 连接参数
:return: url是否可用 :return: url是否可用
""" """
if self.mode == 'd':
self.cookies_to_session()
return super().post(url, data, show_errmsg, retry, interval, **kwargs) return super().post(url, data, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None): def ele(self, loc_or_ele, timeout=None):
@ -345,7 +353,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value'] selenium_user_agent = self.run_cdp('Runtime.evaluate', expression='navigator.userAgent;')['result']['value']
self.session.headers.update({"User-Agent": selenium_user_agent}) self.session.headers.update({"User-Agent": selenium_user_agent})
self.set_cookies(self._get_driver_cookies(as_dict=True), set_session=True) self.set.cookies(self._get_driver_cookies(as_dict=True), set_session=True)
def cookies_to_driver(self): def cookies_to_driver(self):
"""把session对象的cookies复制到driver对象""" """把session对象的cookies复制到driver对象"""
@ -358,7 +366,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
if domain in cookie['domain']: if domain in cookie['domain']:
cookies.append(cookie) cookies.append(cookie)
self.set_cookies(cookies, set_driver=True) self.set.cookies(cookies, set_driver=True)
def get_cookies(self, as_dict=False, all_domains=False): def get_cookies(self, as_dict=False, all_domains=False):
"""返回cookies """返回cookies
@ -371,11 +379,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
elif self._mode == 'd': elif self._mode == 'd':
return self._get_driver_cookies(as_dict) return self._get_driver_cookies(as_dict)
def set_user_agent(self, ua, platform=None):
"""设置user agentd模式下只有当前tab有效"""
super().set_user_agent(ua)
super(SessionPage, self).set_user_agent(ua, platform)
def _get_driver_cookies(self, as_dict=False): def _get_driver_cookies(self, as_dict=False):
"""获取浏览器cookies """获取浏览器cookies
:param as_dict: 以dict形式返回 :param as_dict: 以dict形式返回
@ -387,40 +390,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
else: else:
return cookies return cookies
def set_cookies(self, cookies, set_session=False, set_driver=False):
"""添加cookies信息到浏览器或session对象
:param cookies: 可以接收`CookieJar``list``tuple``str``dict`格式的`cookies`
:param set_session: 是否设置到Session对象
:param set_driver: 是否设置到浏览器
:return: None
"""
# 添加cookie到driver
if set_driver:
cookies = cookies_to_tuple(cookies)
result_cookies = []
for cookie in cookies:
if not cookie.get('domain', None):
continue
c = {'value': '' if cookie['value'] is None else cookie['value'],
'name': cookie['name'],
'domain': cookie['domain']}
result_cookies.append(c)
self.run_cdp('Network.setCookies', cookies=result_cookies)
# 添加cookie到session
if set_session:
super().set_cookies(cookies)
def set_headers(self, headers: dict) -> None:
"""设置固定发送的headers
:param headers: dict格式的headers数据
:return: None
"""
if self._has_session:
return super().set_headers(headers)
if self._has_driver:
super(SessionPage, self).set_headers(headers)
def close_driver(self): def close_driver(self):
"""关闭driver及浏览器""" """关闭driver及浏览器"""
if self._has_driver: if self._has_driver:
@ -466,6 +435,66 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._tab_obj = None self._tab_obj = None
self._has_driver = None self._has_driver = None
def set_cookies(self, cookies, set_session=False, set_driver=False):
"""添加cookies信息到浏览器或session对象
:param cookies: 可以接收`CookieJar``list``tuple``str``dict`格式的`cookies`
:param set_session: 是否设置到Session对象
:param set_driver: 是否设置到浏览器
:return: None
"""
# 添加cookie到driver
warn("此方法即将弃用请用set.user_agent()方法代替。", DeprecationWarning)
self.set.cookies(cookies, set_session, set_driver)
def set_headers(self, headers) -> None:
"""设置固定发送的headers
:param headers: dict格式的headers数据
:return: None
"""
warn("此方法即将弃用请用set.headers()方法代替。", DeprecationWarning)
self.set.headers(headers)
def set_user_agent(self, ua, platform=None):
"""设置user agentd模式下只有当前tab有效"""
warn("此方法即将弃用请用set.user_agent()方法代替。", DeprecationWarning)
self.set.user_agent(ua, platform)
class WebPageSetter(ChromiumPageSetter):
def __init__(self, page):
super().__init__(page)
self._session_setter = SessionPageSetter(self._page)
self._chromium_setter = ChromiumPageSetter(self._page)
def cookies(self, cookies, set_session=False, set_driver=False):
"""添加cookies信息到浏览器或session对象
:param cookies: 可以接收`CookieJar``list``tuple``str``dict`格式的`cookies`
:param set_session: 是否设置到Session对象
:param set_driver: 是否设置到浏览器
:return: None
"""
if set_driver and self._page._has_driver:
self._chromium_setter.cookies(cookies)
if set_session and self._page._has_session:
self._session_setter.cookies(cookies)
def headers(self, headers) -> None:
"""设置固定发送的headers
:param headers: dict格式的headers数据
:return: None
"""
if self._page._has_session:
self._session_setter.headers(headers)
if self._page._has_driver:
self._chromium_setter.headers(headers)
def user_agent(self, ua, platform=None):
"""设置user agentd模式下只有当前tab有效"""
if self._page._has_session:
self._session_setter.user_agent(ua)
if self._page._has_driver:
self._chromium_setter.user_agent(ua, platform)
class WebPageDownloadSetter(ChromiumDownloadSetter): class WebPageDownloadSetter(ChromiumDownloadSetter):
"""用于设置下载参数的类""" """用于设置下载参数的类"""

View File

@ -12,12 +12,12 @@ from .base import BasePage
from .chromium_driver import ChromiumDriver from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement from .chromium_element import ChromiumElement
from .chromium_frame import ChromiumFrame from .chromium_frame import ChromiumFrame
from .chromium_page import ChromiumPage, ChromiumDownloadSetter from .chromium_page import ChromiumPage, ChromiumDownloadSetter, ChromiumPageSetter
from .configs.chromium_options import ChromiumOptions from .configs.chromium_options import ChromiumOptions
from .configs.driver_options import DriverOptions from .configs.driver_options import DriverOptions
from .configs.session_options import SessionOptions from .configs.session_options import SessionOptions
from .session_element import SessionElement from .session_element import SessionElement
from .session_page import SessionPage from .session_page import SessionPage, SessionPageSetter
class WebPage(SessionPage, ChromiumPage, BasePage): class WebPage(SessionPage, ChromiumPage, BasePage):
@ -124,10 +124,12 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def set_user_agent(self, ua: str, platform: str = None) -> None: ... def set_user_agent(self, ua: str, platform: str = None) -> None: ...
def _get_driver_cookies(self, as_dict: bool = False) -> dict: ... def set_headers(self, headers: dict) -> None: ...
def set_cookies(self, cookies, set_session: bool = False, set_driver: bool = False) -> None: ... def set_cookies(self, cookies, set_session: bool = False, set_driver: bool = False) -> None: ...
def _get_driver_cookies(self, as_dict: bool = False) -> dict: ...
def close_driver(self) -> None: ... def close_driver(self) -> None: ...
def close_session(self) -> None: ... def close_session(self) -> None: ...
@ -156,6 +158,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
@property @property
def download(self) -> DownloadKit: ... def download(self) -> DownloadKit: ...
@property
def set(self) -> WebPageSetter: ...
def _ele(self, def _ele(self,
loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame], loc_or_ele: Union[Tuple[str, str], str, ChromiumElement, SessionElement, ChromiumFrame],
timeout: float = None, single: bool = True, relative: bool = False) \ timeout: float = None, single: bool = True, relative: bool = False) \
@ -170,6 +175,18 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def _on_download_begin(self, **kwargs): ... def _on_download_begin(self, **kwargs): ...
class WebPageSetter(ChromiumPageSetter):
_page: WebPage = ...
_session_setter: SessionPageSetter = ...
_chromium_setter: ChromiumPageSetter = ...
def user_agent(self, ua: str, platform: str = None) -> None: ...
def headers(self, headers: dict) -> None: ...
def cookies(self, cookies, set_session: bool = False, set_driver: bool = False) -> None: ...
class WebPageDownloadSetter(ChromiumDownloadSetter): class WebPageDownloadSetter(ChromiumDownloadSetter):
def __init__(self, page: WebPage): def __init__(self, page: WebPage):
self._page: WebPage = ... self._page: WebPage = ...