优化页面对象启动逻辑;WebPage取消自动切换模式功能;WebPage现在创建时会同时连接浏览器和Session;截图移到ChromiumBase。未完成

This commit is contained in:
g1879 2023-02-12 19:39:27 +08:00
parent 7636ab98f3
commit 82ac13fe16
19 changed files with 274 additions and 290 deletions

View File

@ -9,6 +9,7 @@ from time import perf_counter, sleep
from requests import Session
from .functions.tools import get_usable_path
from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElementWaiter, ChromiumScroll, ChromiumElement, run_js, make_chromium_ele
@ -30,27 +31,36 @@ class ChromiumBase(BasePage):
self._root_id = None
self._debug = False
self._debug_recorder = None
self._tab_obj = None
self._timeouts = None
self._page_load_strategy = None
self._connect_browser(address, tab_id)
self._set_start_options(address, None)
self._set_runtime_settings()
self._connect_browser(tab_id)
timeout = timeout if timeout is not None else self.timeouts.implicit
super().__init__(timeout)
def _connect_browser(self, addr_driver_opts=None, tab_id=None):
def _set_start_options(self, address, none):
    """Store the browser debugging address this page object will connect to.

    :param address: browser address in 'ip:port' form
    :param none: not used here; kept so that subclasses overriding this hook can take a second argument
    :return: None
    """
    self.address = address
def _set_runtime_settings(self):
    """Create the default runtime settings: a fresh Timeout holder and the 'normal' load strategy.

    :return: None
    """
    # Timeout is built first, exactly as before, so it sees the same object state.
    self._timeouts = Timeout(self)
    self._page_load_strategy = 'normal'
def _connect_browser(self, tab_id=None):
"""连接浏览器,在第一次时运行
:param addr_driver_opts: 浏览器地址ChromiumDriver对象或DriverOptions对象
:param tab_id: 要控制的标签页id不指定默认为激活的
:return: None
"""
self._chromium_init()
self.address = addr_driver_opts
if not tab_id:
json = self._control_session.get(f'http://{self.address}/json').json()
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
self._init_page(tab_id)
self._set_options()
self._driver_init(tab_id)
self._get_document()
self._first_run = False
@ -62,18 +72,12 @@ class ChromiumBase(BasePage):
self._is_reading = False
self._upload_list = None
def _set_options(self):
"""设置与s模式共用的运行参数便于被子类覆盖"""
self._timeouts = Timeout(self)
self._page_load_strategy = 'normal'
def _init_page(self, tab_id=None):
def _driver_init(self, tab_id):
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
:param tab_id: 要跳转到的标签页id
:return: None
"""
self._is_loading = True
if tab_id:
self._tab_obj = ChromiumDriver(tab_id=tab_id, tab_type='page', address=self.address)
self._tab_obj.start()
@ -231,11 +235,8 @@ class ChromiumBase(BasePage):
@property
def driver(self):
"""返回用于控制浏览器的ChromiumDriver对象"""
return self._tab_obj
@property
def _driver(self):
"""返回用于控制浏览器的ChromiumDriver对象"""
if self._tab_obj is None:
raise RuntimeError('浏览器已关闭或链接已断开。')
return self._tab_obj
@property
@ -243,7 +244,7 @@ class ChromiumBase(BasePage):
"""返回用于控制浏览器的ChromiumDriver对象会先等待页面加载完毕"""
while self._is_loading:
sleep(.1)
return self._tab_obj
return self.driver
@property
def is_loading(self):
@ -289,7 +290,7 @@ class ChromiumBase(BasePage):
# w = self.run_js('document.body.scrollWidth;', as_expr=True)
# h = self.run_js('document.body.scrollHeight;', as_expr=True)
# return w, h
r = self.run_cdp('Page.getLayoutMetrics', not_change=False)['contentSize']
r = self.run_cdp('Page.getLayoutMetrics')['contentSize']
return r['width'], r['height']
@property
@ -305,6 +306,7 @@ class ChromiumBase(BasePage):
@property
def scroll(self):
    """Return the object used to scroll the page, creating and caching it on first access.

    self._wait_loaded() is called before the helper is looked up or created.
    """
    self._wait_loaded()
    try:
        return self._scroll
    except AttributeError:
        # First access: build the helper once and keep it on the instance.
        self._scroll = ChromiumScroll(self)
        return self._scroll
@ -347,7 +349,6 @@ class ChromiumBase(BasePage):
:param args: 参数按顺序在js文本中对应argument[0]argument[1]...
:return: 运行的结果
"""
self._to_d_mode()
return run_js(self, script, as_expr, self.timeouts.script, args)
def run_async_js(self, script, as_expr=False, *args):
@ -357,7 +358,6 @@ class ChromiumBase(BasePage):
:param args: 参数按顺序在js文本中对应argument[0]argument[1]...
:return: None
"""
self._to_d_mode()
from threading import Thread
Thread(target=run_js, args=(self, script, as_expr, self.timeouts.script, args)).start()
@ -424,7 +424,7 @@ class ChromiumBase(BasePage):
:param headers: dict格式的headers数据
:return: None
"""
self.run_cdp('Network.setExtraHTTPHeaders', headers=headers, not_change=True)
self.run_cdp('Network.setExtraHTTPHeaders', headers=headers)
def ele(self, loc_or_ele, timeout=None):
"""获取第一个符合条件的元素对象
@ -532,7 +532,8 @@ class ChromiumBase(BasePage):
:return: None
"""
self._is_loading = True
self._driver.Page.reload(ignoreCache=ignore_cache)
self.driver.Page.reload(ignoreCache=ignore_cache)
self.wait_loading()
def forward(self, steps=1):
"""在浏览历史中前进若干步
@ -591,18 +592,18 @@ class ChromiumBase(BasePage):
:param cmd_args: 参数
:return: 执行的结果
"""
if cmd_args.get('not_change', None):
driver = self._tab_obj
cmd_args.pop('not_change')
else:
driver = self._driver
r = self.driver.call_method(cmd, **cmd_args)
if 'error' not in r:
return r
try:
return driver.call_method(cmd, **cmd_args)
except Exception as e:
if 'Could not find node with given id' in str(e):
if 'Cannot find context with specified id' in r['error']:
raise RuntimeError('页面被刷新请操作前尝试等待页面刷新或加载完成可尝试wait.load_complete()方法。')
elif 'Could not find node with given id' in r['error']:
raise RuntimeError('该元素已不在当前页面中。')
raise
elif 'tab closed' in r['error']:
raise RuntimeError('标签页已关闭。')
else:
raise RuntimeError(r)
def set_user_agent(self, ua, platform=None):
"""为当前tab设置user agent只在当前tab有效
@ -650,6 +651,58 @@ class ChromiumBase(BasePage):
js = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
return self.run_js(js, as_expr=True)
def get_screenshot(self, path=None, as_bytes=None, full_page=False, left_top=None, right_bottom=None):
    """Take a screenshot of the whole page, the visible viewport, or a rectangular region.

    Capturing outside the visible viewport needs a browser of version 90 or later.

    :param path: full path of the image file; suffix may be 'jpg', 'jpeg', 'png' or 'webp'.
                 When empty, '<page title>.jpg' is used
    :param as_bytes: True (png) or one of 'jpg', 'jpeg', 'png', 'webp' to get the image back as bytes;
                     path is ignored when this is set
    :param full_page: True captures the entire page, False only the visible viewport
    :param left_top: (x, y) of the top-left corner of the region to capture
    :param right_bottom: (x, y) of the bottom-right corner of the region to capture
    :return: absolute path of the saved file, or the raw image bytes
    :raises ValueError: as_bytes is a value other than True or a supported format name
    :raises TypeError: the suffix of path is not a supported image format
    """
    if as_bytes:
        if as_bytes is True:
            pic_type = 'png'
        elif as_bytes in ('jpg', 'jpeg', 'png', 'webp'):
            # CDP names the format 'jpeg'; 'jpg' is accepted as a convenience.
            pic_type = 'jpeg' if as_bytes == 'jpg' else as_bytes
        else:
            raise ValueError("只能接收'jpg', 'jpeg', 'png', 'webp'四种格式。")

    else:
        path = get_usable_path(path or f'{self.title}.jpg')
        suffix = path.suffix.lower()
        if suffix not in ('.jpg', '.jpeg', '.png', '.webp'):
            raise TypeError(f'不支持的文件格式:{suffix}')
        pic_type = 'jpeg' if suffix == '.jpg' else suffix[1:]

    # Build the CDP arguments once instead of repeating the call per branch.
    capture_args = {'format': pic_type}
    if full_page:
        # The page size is only needed here, so it is not fetched for viewport/region shots.
        width, height = self.size
        capture_args['captureBeyondViewport'] = True
        capture_args['clip'] = {'x': 0, 'y': 0, 'width': width, 'height': height, 'scale': 1}

    elif left_top and right_bottom:
        x, y = left_top
        capture_args['captureBeyondViewport'] = True
        capture_args['clip'] = {'x': x, 'y': y,
                                'width': right_bottom[0] - x,
                                'height': right_bottom[1] - y,
                                'scale': 1}

    from base64 import b64decode
    image = b64decode(self._wait_driver.Page.captureScreenshot(**capture_args)['data'])

    if as_bytes:
        return image

    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, 'wb') as f:
        f.write(image)
    return str(path.absolute())
def clear_cache(self, session_storage=True, local_storage=True, cache=True, cookies=True):
"""清除缓存,可选要清除的项
:param session_storage: 是否清除sessionStorage
@ -681,7 +734,7 @@ class ChromiumBase(BasePage):
for t in range(times + 1):
err = None
result = self._driver.Page.navigate(url=to_url)
result = self.driver.Page.navigate(url=to_url)
is_timeout = not self._wait_loaded(timeout)
while self.is_loading:
@ -711,10 +764,6 @@ class ChromiumBase(BasePage):
return True
def _to_d_mode(self):
"""用于使WebPage切换到d模式"""
return self._driver
class Timeout(object):
"""用于保存d模式timeout信息的类"""

View File

@ -3,6 +3,7 @@
@Author : g1879
@Contact : g1879@qq.com
"""
from pathlib import Path
from typing import Union, Tuple, List, Any
from DataRecorder import Recorder
@ -13,7 +14,6 @@ from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement, ChromiumElementWaiter, ChromiumScroll
from .chromium_frame import ChromiumFrame
from .configs.driver_options import DriverOptions
from .session_element import SessionElement
@ -38,13 +38,11 @@ class ChromiumBase(BasePage):
self._debug_recorder: Recorder = ...
self._upload_list: list = ...
def _connect_browser(self,
addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None,
tab_id: str = None) -> None: ...
def _connect_browser(self, tab_id: str = None) -> None: ...
def _chromium_init(self): ...
def _init_page(self, tab_id: str = None) -> None: ...
def _driver_init(self, tab_id: str) -> None: ...
def _get_document(self) -> None: ...
@ -64,7 +62,9 @@ class ChromiumBase(BasePage):
def set_upload_files(self, files: Union[str, list, tuple]) -> None: ...
def _set_options(self) -> None: ...
def _set_start_options(self, address, none) -> None: ...
def _set_runtime_settings(self) -> None: ...
def __call__(self, loc_or_str: Union[Tuple[str, str], str, ChromiumElement],
timeout: float = None) -> Union[ChromiumElement, ChromiumFrame, None]: ...
@ -75,8 +75,8 @@ class ChromiumBase(BasePage):
@property
def driver(self) -> ChromiumDriver: ...
@property
def _driver(self) -> ChromiumDriver: ...
# @property
# def _driver(self) -> ChromiumDriver: ...
@property
def _wait_driver(self) -> ChromiumDriver: ...
@ -187,6 +187,12 @@ class ChromiumBase(BasePage):
def set_local_storage(self, item: str, value: Union[str, bool]) -> None: ...
def get_screenshot(self, path: [str, Path] = None,
as_bytes: [bool, str] = None,
full_page: bool = False,
left_top: Tuple[int, int] = None,
right_bottom: Tuple[int, int] = None) -> Union[str, bytes]: ...
def clear_cache(self,
session_storage: bool = True,
local_storage: bool = True,
@ -200,8 +206,6 @@ class ChromiumBase(BasePage):
show_errmsg: bool = False,
timeout: float = None) -> Union[bool, None]: ...
def _to_d_mode(self): ...
class Timeout(object):

View File

@ -165,19 +165,23 @@ class ChromiumDriver(object):
:return: 执行结果
"""
if not self._started:
raise RuntimeError("不能在启动前调用方法。")
self.start()
# raise RuntimeError("不能在启动前调用方法。")
if args:
raise CallMethodException("参数必须是key=value形式。")
if self._stopped.is_set():
return {'tab_closed': True}
return {'error': 'tab closed', 'type': 'tab_closed'}
timeout = kwargs.pop("_timeout", None)
result = self._send({"method": _method, "params": kwargs}, timeout=timeout)
if result is None:
return {'tab_closed': True}
return {'error': 'tab closed', 'type': 'tab_closed'}
if 'result' not in result and 'error' in result:
raise CallMethodException(f"\n调用方法:{_method}\n参数:{kwargs}\n错误:{result['error']['message']}")
return {'error': result['error']['message'],
'type': 'call_method_error',
'method': _method,
'args': kwargs}
return result['result']
@ -201,7 +205,7 @@ class ChromiumDriver(object):
if self._stopped.is_set():
return False
if not self._started:
raise RuntimeError("Driver在运行。")
raise RuntimeError("Driver在运行。")
self.status = self._STOPPED_
self._stopped.set()

View File

@ -68,14 +68,14 @@ class ChromiumElement(DrissionElement):
def tag(self):
"""返回元素tag"""
if self._tag is None:
self._tag = self.page.run_cdp('DOM.describeNode', nodeId=self._node_id, not_change=True)['node'][
self._tag = self.page.run_cdp('DOM.describeNode', nodeId=self._node_id)['node'][
'localName'].lower()
return self._tag
@property
def html(self):
"""返回元素outerHTML文本"""
return self.page.run_cdp('DOM.getOuterHTML', nodeId=self._node_id, not_change=True)['outerHTML']
return self.page.run_cdp('DOM.getOuterHTML', nodeId=self._node_id)['outerHTML']
@property
def inner_html(self):
@ -85,7 +85,7 @@ class ChromiumElement(DrissionElement):
@property
def attrs(self):
"""返回元素所有attribute属性"""
attrs = self.page.run_cdp('DOM.getAttributes', nodeId=self._node_id, not_change=True)['attributes']
attrs = self.page.run_cdp('DOM.getAttributes', nodeId=self._node_id)['attributes']
return {attrs[i]: attrs[i + 1] for i in range(0, len(attrs), 2)}
@property
@ -123,7 +123,7 @@ class ChromiumElement(DrissionElement):
def size(self):
"""返回元素宽和高"""
try:
model = self.page.run_cdp('DOM.getBoxModel', nodeId=self._node_id, not_change=True)['model']
model = self.page.run_cdp('DOM.getBoxModel', nodeId=self._node_id)['model']
return model['height'], model['width']
except Exception:
return 0, 0
@ -167,7 +167,7 @@ class ChromiumElement(DrissionElement):
@property
def shadow_root(self):
"""返回当前元素的shadow_root元素对象"""
info = self.page.run_cdp('DOM.describeNode', nodeId=self.node_id, not_change=True)['node']
info = self.page.run_cdp('DOM.describeNode', nodeId=self.node_id)['node']
if not info.get('shadowRoots', None):
return None
@ -372,7 +372,7 @@ class ChromiumElement(DrissionElement):
:param prop: 属性名
:return: 属性值文本
"""
p = self.page.run_cdp('Runtime.getProperties', objectId=self._obj_id, not_change=True)['result']
p = self.page.run_cdp('Runtime.getProperties', objectId=self._obj_id)['result']
for i in p:
if i['name'] == prop:
if 'value' not in i or 'value' not in i['value']:
@ -403,7 +403,7 @@ class ChromiumElement(DrissionElement):
:param args: 参数按顺序在js文本中对应argument[0]argument[1]...
:return: 运行的结果
"""
return run_js(self, script, as_expr, self.page.timeouts.script, args, True)
return run_js(self, script, as_expr, self.page.timeouts.script, args)
def run_async_js(self, script, as_expr=False, *args):
"""以异步方式执行js代码
@ -483,7 +483,7 @@ class ChromiumElement(DrissionElement):
while not self.run_js(js) and perf_counter() < end_time:
sleep(.1)
node = self.page.run_cdp('DOM.describeNode', nodeId=self._node_id, not_change=True)['node']
node = self.page.run_cdp('DOM.describeNode', nodeId=self._node_id)['node']
frame = node.get('frameId', None)
frame = frame or self.page.tab_id
try:
@ -534,6 +534,8 @@ class ChromiumElement(DrissionElement):
height, width = self.size
left_top = (left, top)
right_bottom = (left + width, top + height)
if not path:
path = f'{self.tag}.jpg'
return self.page.get_screenshot(path, as_bytes=as_bytes, full_page=False,
left_top=left_top, right_bottom=right_bottom)
@ -547,7 +549,7 @@ class ChromiumElement(DrissionElement):
return self._set_file_input(vals)
try:
self.page.run_cdp('DOM.focus', nodeId=self._node_id, not_change=True)
self.page.run_cdp('DOM.focus', nodeId=self._node_id)
except Exception:
self.click(by_js=True)
@ -578,7 +580,7 @@ class ChromiumElement(DrissionElement):
if isinstance(files, str):
files = files.split('\n')
files = [str(Path(i).absolute()) for i in files]
self.page.run_cdp('DOM.setFileInputFiles', files=files, nodeId=self._node_id, not_change=True)
self.page.run_cdp('DOM.setFileInputFiles', files=files, nodeId=self._node_id)
def clear(self, by_js=False):
"""清空元素文本
@ -754,9 +756,9 @@ class ChromiumElement(DrissionElement):
:return: js中的object id
"""
if node_id:
return self.page.run_cdp('DOM.resolveNode', nodeId=node_id, not_change=True)['object']['objectId']
return self.page.run_cdp('DOM.resolveNode', nodeId=node_id)['object']['objectId']
else:
return self.page.run_cdp('DOM.resolveNode', backendNodeId=backend_id, not_change=True)['object']['objectId']
return self.page.run_cdp('DOM.resolveNode', backendNodeId=backend_id)['object']['objectId']
def _get_node_id(self, obj_id=None, backend_id=None):
"""根据传入object id获取cdp中的node id
@ -765,16 +767,16 @@ class ChromiumElement(DrissionElement):
:return: cdp中的node id
"""
if obj_id:
return self.page.run_cdp('DOM.requestNode', objectId=obj_id, not_change=True)['nodeId']
return self.page.run_cdp('DOM.requestNode', objectId=obj_id)['nodeId']
else:
return self.page.run_cdp('DOM.describeNode', backendNodeId=backend_id, not_change=True)['node']['nodeId']
return self.page.run_cdp('DOM.describeNode', backendNodeId=backend_id)['node']['nodeId']
def _get_backend_id(self, node_id):
"""根据传入node id获取backend id
:param node_id:
:return: backend id
"""
return self.page.run_cdp('DOM.describeNode', nodeId=node_id, not_change=True)['node']['backendNodeId']
return self.page.run_cdp('DOM.describeNode', nodeId=node_id)['node']['backendNodeId']
def _get_ele_path(self, mode):
"""返获取css路径或xpath路径"""
@ -822,7 +824,7 @@ class ChromiumElement(DrissionElement):
:return: 四个角坐标大小为0时返回None
"""
try:
return self.page.run_cdp('DOM.getBoxModel', nodeId=self.node_id, not_change=True)['model'][quad]
return self.page.run_cdp('DOM.getBoxModel', nodeId=self.node_id)['model'][quad]
except Exception:
return None
@ -875,7 +877,7 @@ class ChromiumShadowRootElement(BaseElement):
def is_alive(self):
"""返回元素是否仍在DOM中"""
try:
self.page.run_cdp('DOM.describeNode', nodeId=self._node_id, not_change=True)
self.page.run_cdp('DOM.describeNode', nodeId=self._node_id)
return True
except Exception:
return False
@ -1066,29 +1068,29 @@ class ChromiumShadowRootElement(BaseElement):
css_paths = [i.css_path[47:] for i in eles]
if single:
node_id = self.page.run_cdp('DOM.querySelector',
nodeId=self._node_id, selector=css_paths[0], not_change=True)['nodeId']
nodeId=self._node_id, selector=css_paths[0])['nodeId']
return make_chromium_ele(self.page, node_id=node_id) if node_id else None
else:
results = []
for i in css_paths:
node_id = self.page.run_cdp('DOM.querySelector',
nodeId=self._node_id, selector=i, not_change=True)['nodeId']
nodeId=self._node_id, selector=i)['nodeId']
if node_id:
results.append(make_chromium_ele(self.page, node_id=node_id))
return results
def _get_node_id(self, obj_id):
"""返回元素node id"""
return self.page.run_cdp('DOM.requestNode', objectId=obj_id, not_change=True)['nodeId']
return self.page.run_cdp('DOM.requestNode', objectId=obj_id)['nodeId']
def _get_obj_id(self, back_id):
"""返回元素object id"""
return self.page.run_cdp('DOM.resolveNode', backendNodeId=back_id, not_change=True)['object']['objectId']
return self.page.run_cdp('DOM.resolveNode', backendNodeId=back_id)['object']['objectId']
def _get_backend_id(self, node_id):
"""返回元素object id"""
return self.page.run_cdp('DOM.describeNode', nodeId=node_id, not_change=True)['node']['backendNodeId']
return self.page.run_cdp('DOM.describeNode', nodeId=node_id)['node']['backendNodeId']
def find_in_chromium_ele(ele, loc, single=True, timeout=None, relative=True):
@ -1260,14 +1262,13 @@ else{a.push(e.snapshotItem(i));}}"""
return js
def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None, not_change=False):
def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None):
"""运行javascript代码
:param page_or_ele: 页面对象或元素对象
:param script: js文本
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: 超时时间
:param args: 参数按顺序在js文本中对应argument[0]argument[1]...
:param not_change: 执行时是否切换页面对象模式
:return: js执行结果
"""
if isinstance(page_or_ele, (ChromiumElement, ChromiumShadowRootElement)):
@ -1283,8 +1284,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None, not_chan
returnByValue=False,
awaitPromise=True,
userGesture=True,
timeout=timeout * 1000,
not_change=not_change)
timeout=timeout * 1000)
else:
args = args or ()
@ -1296,8 +1296,7 @@ def run_js(page_or_ele, script, as_expr=False, timeout=None, args=None, not_chan
arguments=[_convert_argument(arg) for arg in args],
returnByValue=False,
awaitPromise=True,
userGesture=True,
not_change=not_change)
userGesture=True)
exceptionDetails = res.get('exceptionDetails')
if exceptionDetails:

View File

@ -363,7 +363,7 @@ def _make_js_for_find_ele_by_xpath(xpath: str, type_txt: str, node_txt: str) ->
def run_js(page_or_ele: Union[ChromiumBase, ChromiumElement, ChromiumShadowRootElement], script: str,
as_expr: bool = False, timeout: float = None, args: tuple = ..., not_change: bool = ...) -> Any: ...
as_expr: bool = False, timeout: float = None, args: tuple = ...) -> Any: ...
def _parse_js_result(page: ChromiumBase, ele: ChromiumElement, result: dict): ...

View File

@ -14,7 +14,7 @@ class ChromiumFrame(ChromiumBase):
def __init__(self, page, ele):
self.page = page
self.address = page.address
node = page.run_cdp('DOM.describeNode', nodeId=ele.node_id, not_change=True)['node']
node = page.run_cdp('DOM.describeNode', nodeId=ele.node_id)['node']
self.frame_id = node['frameId']
self._backend_id = ele.backend_id
self._frame_ele = ele
@ -43,23 +43,23 @@ class ChromiumFrame(ChromiumBase):
attrs = [f"{attr}='{attrs[attr]}'" for attr in attrs]
return f'<ChromiumFrame {self.frame_ele.tag} {" ".join(attrs)}>'
def _set_runtime_settings(self):
    """Reuse the owning page's timeouts and page load strategy instead of creating new ones.

    The method must be named _set_runtime_settings: that is the hook ChromiumBase.__init__
    calls, so an override under any other name is never run.

    :return: None
    """
    self._timeouts = self.page.timeouts
    self._page_load_strategy = self.page.page_load_strategy


# Alias for the name this override briefly carried, so existing references keep working.
_runtime_settings = _set_runtime_settings
def _init_page(self, tab_id=None):
def _driver_init(self, tab_id):
"""避免出现服务器500错误
:param tab_id: 要跳转到的标签页id
:return: None
"""
self._control_session.get(f'http://{self.address}/json')
super()._init_page(tab_id)
super()._driver_init(tab_id)
def _reload(self):
"""重新获取document"""
self._frame_ele = ChromiumElement(self.page, backend_id=self._backend_id)
node = self.page.run_cdp('DOM.describeNode', nodeId=self._frame_ele.node_id, not_change=True)['node']
node = self.page.run_cdp('DOM.describeNode', nodeId=self._frame_ele.node_id)['node']
if self._is_inner_frame():
self._is_diff_domain = False
@ -95,7 +95,7 @@ class ChromiumFrame(ChromiumBase):
try:
if self._is_diff_domain is False:
node = self.page.run_cdp('DOM.describeNode',
backendNodeId=self.backend_id, not_change=True)['node']
backendNodeId=self.backend_id)['node']
self.doc_ele = ChromiumElement(self.page, backend_id=node['contentDocument']['backendNodeId'])
else:
@ -171,7 +171,7 @@ class ChromiumFrame(ChromiumBase):
self._check_ok()
tag = self.tag
out_html = self.page.run_cdp('DOM.getOuterHTML',
nodeId=self.frame_ele.node_id, not_change=True)['outerHTML']
nodeId=self.frame_ele.node_id)['outerHTML']
sign = search(rf'<{tag}.*?>', out_html).group(0)
return f'{sign}{self.inner_html}</{tag}>'
@ -412,7 +412,7 @@ class ChromiumFrame(ChromiumBase):
for t in range(times + 1):
err = None
result = self._driver.Page.navigate(url=to_url, frameId=self.frame_id)
result = self.driver.Page.navigate(url=to_url, frameId=self.frame_id)
is_timeout = not self._wait_loaded(timeout)
while self.is_loading:
@ -444,4 +444,4 @@ class ChromiumFrame(ChromiumBase):
def _is_inner_frame(self):
"""返回当前frame是否同域"""
return self.frame_id in str(self.page.run_cdp('Page.getFrameTree', not_change=True)['frameTree'])
return self.frame_id in str(self.page.run_cdp('Page.getFrameTree')['frameTree'])

View File

@ -28,9 +28,9 @@ class ChromiumFrame(ChromiumBase):
def __repr__(self) -> str: ...
# Stub for the runtime-settings hook; named to match the hook ChromiumBase declares.
def _set_runtime_settings(self) -> None: ...

# Earlier name of the same hook, still declared for backward compatibility.
def _runtime_settings(self) -> None: ...
def _init_page(self, tab_id: str = None) -> None: ...
def _driver_init(self, tab_id: str) -> None: ...
def _reload(self) -> None: ...

View File

@ -34,67 +34,71 @@ class ChromiumPage(ChromiumBase):
self._download_path = None
super().__init__(addr_driver_opts, tab_id, timeout)
def _connect_browser(self, addr_driver_opts=None, tab_id=None):
"""连接浏览器,在第一次时运行
:param addr_driver_opts: 浏览器地址ChromiumDriver对象或DriverOptions对象
:param tab_id: 要控制的标签页id不指定默认为激活的
def _set_start_options(self, addr_driver_opts, none):
"""设置浏览器启动属性
:param addr_driver_opts: 'ip:port'ChromiumDriverChromiumOptions
:param none: 用于后代继承
:return: None
"""
# 接管或启动浏览器
self._chromium_init()
if addr_driver_opts is None or isinstance(addr_driver_opts, (ChromiumOptions, DriverOptions)):
self._driver_options = addr_driver_opts or ChromiumOptions() # 从ini文件读取
self.address = self._driver_options.debugger_address
self.process = connect_browser(self._driver_options)[1]
json = self._control_session.get(f'http://{self.address}/json').json()
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
if not addr_driver_opts or isinstance(addr_driver_opts, (ChromiumOptions, DriverOptions)):
self._driver_options = addr_driver_opts or ChromiumOptions(addr_driver_opts)
# 接收浏览器地址和端口
elif isinstance(addr_driver_opts, str):
self.address = addr_driver_opts
self._driver_options = ChromiumOptions()
self._driver_options.debugger_address = addr_driver_opts
self.process = connect_browser(self._driver_options)[1]
if not tab_id:
json = self._control_session.get(f'http://{self.address}/json').json()
tab_id = [i['id'] for i in json if i['type'] == 'page'][0]
# 接收传递过来的ChromiumDriver浏览器
elif isinstance(addr_driver_opts, ChromiumDriver):
self._tab_obj = addr_driver_opts
self.address = addr_driver_opts.address
self.process = None
self._driver_options = ChromiumOptions(read_file=False)
self._driver_options.debugger_address = addr_driver_opts.address
self._tab_obj = addr_driver_opts
else:
raise TypeError('只能接收ChromiumDriver或ChromiumOptions类型参数。')
self._set_options()
self._set_chromium_options()
self._init_page(tab_id)
self._get_document()
self._first_run = False
self.address = self._driver_options.debugger_address
def _set_options(self):
"""设置WebPage中与s模式共用的配置便于WebPage覆盖掉"""
def _set_runtime_settings(self):
    """Copy the runtime settings (timeouts, page load strategy, download path) from the driver options.

    :return: None
    """
    options = self._driver_options
    limits = options.timeouts
    self._timeouts = Timeout(self,
                             page_load=limits['pageLoad'],
                             script=limits['script'],
                             implicit=limits['implicit'])
    self._page_load_strategy = options.page_load_strategy
    self._download_path = options.download_path
def _set_chromium_options(self):
"""设置浏览器专有的配置"""
def _connect_browser(self, tab_id=None):
    """Connect to the browser; run when the page object is first set up.

    :param tab_id: id of the tab to control; when empty, the first entry of type 'page'
                   returned by the browser's /json endpoint is used
    :return: None
    """
    self._chromium_init()  # todo: is this still needed when a driver object was passed in?

    # A driver handed in by the caller means no browser process is launched here.
    self.process = None if self._tab_obj else connect_browser(self._driver_options)[1]

    if not tab_id:
        targets = self._control_session.get(f'http://{self.address}/json').json()
        tab_id = [target['id'] for target in targets if target['type'] == 'page'][0]

    self._driver_init(tab_id)
    self._get_document()
    self._first_run = False
def _chromium_init(self):
"""添加ChromiumPage独有的运行配置"""
super()._chromium_init()
self._alert = Alert()
self._window_setter = None
def _init_page(self, tab_id=None):
def _driver_init(self, tab_id):
"""新建页面、页面刷新、切换标签页后要进行的cdp参数初始化
:param tab_id: 要跳转到的标签页id
:return: None
"""
super()._init_page(tab_id)
super()._driver_init(tab_id)
ws = self._control_session.get(f'http://{self.address}/json/version').json()['webSocketDebuggerUrl']
self._browser_driver = ChromiumDriver(ws.split('/')[-1], 'browser', self.address)
self._browser_driver.start()
@ -135,7 +139,7 @@ class ChromiumPage(ChromiumBase):
def process_id(self):
"""返回浏览器进程id"""
try:
return self._driver.SystemInfo.getProcessInfo()['id']
return self.driver.SystemInfo.getProcessInfo()['id']
except Exception:
return None
@ -172,58 +176,6 @@ class ChromiumPage(ChromiumBase):
tab_id = tab_id or self.tab_id
return ChromiumTab(self, tab_id)
def get_screenshot(self, path=None, as_bytes=None, full_page=False, left_top=None, right_bottom=None):
"""对页面进行截图可对整个网页、可见网页、指定范围截图。对可视范围外截图需要90以上版本浏览器支持
:param path: 完整路径后缀可选 'jpg','jpeg','png','webp'
:param as_bytes: 是否已字节形式返回图片可选 'jpg','jpeg','png','webp'生效时path参数无效
:param full_page: 是否整页截图为True截取整个网页为False截取可视窗口
:param left_top: 截取范围左上角坐标
:param right_bottom: 截取范围右下角角坐标
:return: 图片完整路径或字节文本
"""
if as_bytes:
if as_bytes is True:
pic_type = 'png'
else:
if as_bytes not in ('jpg', 'jpeg', 'png', 'webp'):
raise ValueError("只能接收'jpg', 'jpeg', 'png', 'webp'四种格式。")
pic_type = 'jpeg' if as_bytes == 'jpg' else as_bytes
else:
if not path:
raise ValueError('保存为文件时必须传入路径。')
path = Path(path)
pic_type = path.suffix.lower()
if pic_type not in ('.jpg', '.jpeg', '.png', '.webp'):
raise TypeError(f'不支持的文件格式:{pic_type}')
pic_type = 'jpeg' if pic_type == '.jpg' else pic_type[1:]
width, height = self.size
if full_page:
vp = {'x': 0, 'y': 0, 'width': width, 'height': height, 'scale': 1}
png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)['data']
else:
if left_top and right_bottom:
x, y = left_top
w = right_bottom[0] - x
h = right_bottom[1] - y
vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1}
png = self._wait_driver.Page.captureScreenshot(format=pic_type, captureBeyondViewport=True, clip=vp)[
'data']
else:
png = self._wait_driver.Page.captureScreenshot(format=pic_type)['data']
from base64 import b64decode
png = b64decode(png)
if as_bytes:
return png
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'wb') as f:
f.write(png)
return str(path.absolute())
def to_front(self):
"""激活当前标签页使其处于最前面"""
self._control_session.get(f'http://{self.address}/json/activate/{self.tab_id}')
@ -293,8 +245,8 @@ class ChromiumPage(ChromiumBase):
if tab_id == self.tab_id:
return
self._driver.stop()
self._init_page(tab_id)
self.driver.stop()
self._driver_init(tab_id)
if read_doc and self.ready_state == 'complete':
self._get_document()
@ -328,7 +280,7 @@ class ChromiumPage(ChromiumBase):
return
if self.tab_id in tabs:
self._driver.stop()
self.driver.stop()
for tab in tabs:
self._control_session.get(f'http://{self.address}/json/close/{tab}')
@ -364,9 +316,9 @@ class ChromiumPage(ChromiumBase):
res_text = self._alert.text
if self._alert.type == 'prompt':
self._driver.Page.handleJavaScriptDialog(accept=accept, promptText=send)
self.driver.Page.handleJavaScriptDialog(accept=accept, promptText=send)
else:
self._driver.Page.handleJavaScriptDialog(accept=accept)
self.driver.Page.handleJavaScriptDialog(accept=accept)
return res_text
def hide_browser(self):
@ -445,7 +397,7 @@ class ChromiumDownloadSetter(DownloadSetter):
eventsEnabled=True)
except CallMethodException:
warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。')
self._page.run_cdp('Page.setDownloadBehavior', behavior='allow', downloadPath=path, not_change=True)
self._page.run_cdp('Page.setDownloadBehavior', behavior='allow', downloadPath=path)
self.DownloadKit.goal_path = path

View File

@ -38,11 +38,9 @@ class ChromiumPage(ChromiumBase):
addr_driver_opts: Union[str, ChromiumDriver, DriverOptions] = None,
tab_id: str = None) -> None: ...
def _set_options(self) -> None: ...
def _set_start_options(self, addr_driver_opts: Union[str, ChromiumDriver, DriverOptions], none) -> None: ...
def _set_chromium_options(self) -> None: ...
def _init_page(self, tab_id: str = None) -> None: ...
def _driver_init(self, tab_id: str) -> None: ...
@property
def browser_driver(self) -> ChromiumDriver: ...
@ -76,12 +74,6 @@ class ChromiumPage(ChromiumBase):
def get_tab(self, tab_id: str = None) -> ChromiumTab: ...
def get_screenshot(self, path: [str, Path] = None,
as_bytes: [bool, str] = None,
full_page: bool = False,
left_top: Tuple[int, int] = None,
right_bottom: Tuple[int, int] = None) -> Union[str, bytes]: ...
def to_front(self) -> None: ...
def new_tab(self, url: str = None, switch_to: bool = True) -> None: ...

View File

@ -17,7 +17,7 @@ class ChromiumTab(ChromiumBase):
self.page = page
super().__init__(page.address, tab_id, page.timeout)
def _set_options(self):
def _set_runtime_settings(self):
"""重写设置浏览器运行参数方法"""
self._timeouts = self.page.timeouts
self._page_load_strategy = self.page.page_load_strategy

View File

@ -12,4 +12,4 @@ class ChromiumTab(ChromiumBase):
def __init__(self, page: ChromiumPage, tab_id: str = None):
self.page: ChromiumPage = ...
def _set_options(self) -> None: ...
def _set_runtime_settings(self) -> None: ...

View File

@ -20,7 +20,7 @@ class ChromiumOptions(object):
self._user = 'Default'
self._prefs_to_del = []
if read_file:
if read_file is not False:
self.ini_path = str(ini_path) if ini_path else str(Path(__file__).parent / 'configs.ini')
om = OptionsManager(self.ini_path)
options = om.chrome_options

View File

@ -8,7 +8,7 @@ from typing import Union, Tuple, Any
class ChromiumOptions(object):
def __init__(self, read_file: bool = True, ini_path: Union[str, Path] = None):
def __init__(self, read_file: [bool, None] = True, ini_path: Union[str, Path] = None):
self.ini_path: str = ...
self._driver_path: str = ...
self._user_data_path: str = ...

View File

@ -35,7 +35,7 @@ class SessionOptions(object):
self._del_set = set() # 记录要从ini文件删除的参数
if read_file:
if read_file is not False:
self.ini_path = str(ini_path) if ini_path else str(Path(__file__).parent / 'configs.ini')
om = OptionsManager(self.ini_path)
options_dict = om.session_options

View File

@ -12,7 +12,7 @@ from requests.cookies import RequestsCookieJar
class SessionOptions(object):
def __init__(self, read_file: bool = True, ini_path: Union[str, Path] = None):
def __init__(self, read_file: [bool, None] = True, ini_path: Union[str, Path] = None):
self.ini_path: str = ...
self._download_path: str = ...
self._headers: dict = ...

View File

@ -28,25 +28,36 @@ class SessionPage(BasePage):
"""
self._response = None
self._download_set = None
self._create_session(session_or_options)
self._session = None
self._set_start_options(session_or_options, None)
self._set_runtime_settings()
self._create_session()
timeout = timeout if timeout is not None else self.timeout
super().__init__(timeout)
def _create_session(self, Session_or_Options):
"""创建内建Session对象
:param Session_or_Options: Session对象或SessionOptions对象
def _set_start_options(self, session_or_options, none):
"""启动配置
:param session_or_options: SessionSessionOptions
:param none: 用于后代继承
:return: None
"""
if Session_or_Options is None or isinstance(Session_or_Options, SessionOptions):
self._session_options = Session_or_Options or SessionOptions()
if not session_or_options or isinstance(session_or_options, SessionOptions):
self._session_options = session_or_options or SessionOptions(session_or_options)
elif isinstance(session_or_options, Session):
self._session_options = SessionOptions()
self._session = session_or_options
def _set_runtime_settings(self):
"""设置运行时用到的属性"""
self._timeout = self._session_options.timeout
self._download_path = self._session_options.download_path
def _create_session(self):
"""创建内建Session对象"""
if not self._session:
self._set_session(self._session_options)
elif isinstance(Session_or_Options, Session):
self._session = Session_or_Options
self._session_options = SessionOptions(read_file=False)
self._set_options()
def _set_session(self, opt):
"""根据传入字典对session进行设置
:param opt: session配置字典
@ -69,11 +80,6 @@ class SessionPage(BasePage):
if attr:
self._session.__setattr__(i, attr)
def _set_options(self):
"""Set the configuration that WebPage shares with d mode, so that WebPage can override this method"""
# Both values are read from the stored session options.
self._timeout = self._session_options.timeout
self._download_path = self._session_options.download_path
def set_cookies(self, cookies):
"""为Session对象设置cookies
:param cookies: cookies信息
@ -104,6 +110,12 @@ class SessionPage(BasePage):
return self.ele(loc_or_str)
# -----------------共有属性和方法-------------------
@property
def title(self):
    """Return the text of the page's title element, or None when no such element is found."""
    title_ele = self.ele('xpath://title')
    if title_ele:
        return title_ele.text
    return None
@property
def url(self):
"""返回当前访问url"""

View File

@ -33,11 +33,13 @@ class SessionPage(BasePage):
self.retry_times: int = ...
self.retry_interval: float = ...
def _create_session(self, Session_or_Options: Union[Session, SessionOptions]) -> None: ...
# `none` is a placeholder second argument; the base class always passes None for it.
def _set_start_options(self, session_or_options: Union[Session, SessionOptions, bool, None], none: None) -> None: ...
def _create_session(self) -> None: ...
def _set_session(self, opt: SessionOptions) -> None: ...
def _set_options(self) -> None: ...
def _set_runtime_settings(self) -> None: ...
def set_cookies(self, cookies: Union[RequestsCookieJar, list, tuple, str, dict]) -> None: ...
@ -50,6 +52,8 @@ class SessionPage(BasePage):
timeout: float = None) -> Union[SessionElement, str, None]: ...
# -----------------共有属性和方法-------------------
@property
# The implementation returns None when no title element is found.
def title(self) -> Union[str, None]: ...
@property
def url(self) -> str: ...

View File

@ -4,7 +4,6 @@
@Contact : g1879@qq.com
"""
from pathlib import Path
from time import sleep
from warnings import warn
from requests import Session
@ -34,7 +33,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._mode = mode.lower()
if self._mode not in ('s', 'd'):
raise ValueError('mode参数只能是s或d。')
self._has_driver, self._has_session = (None, True) if self._mode == 's' else (True, None)
self._has_driver = True
self._has_session = True
self._debug = False
self._debug_recorder = None
@ -47,15 +47,15 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._response = None
self._download_set = None
self._set_both_options(driver_or_options, session_or_options)
if self._mode == 'd':
self._to_d_mode()
self._set_start_options(driver_or_options, session_or_options)
self._set_runtime_settings()
self._connect_browser()
self._create_session()
t = timeout if isinstance(timeout, (int, float)) else self.timeouts.implicit
super(ChromiumBase, self).__init__(t) # 调用Base的__init__()
def _set_both_options(self, dr_opt, se_opt):
def _set_start_options(self, dr_opt, se_opt):
"""处理两种模式的设置
:param dr_opt: ChromiumDriver或DriverOptions对象为None则从ini读取为False用默认信息创建
:param se_opt: SessionSessionOptions对象或配置信息为None则从ini读取为False用默认信息创建
@ -63,9 +63,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""
# 浏览器配置
if isinstance(dr_opt, ChromiumDriver):
self._connect_browser(dr_opt)
self._has_driver = True
# self._driver_options = None
self._tab_obj = dr_opt
self._driver_options = ChromiumOptions()
self._driver_options.debugger_address = dr_opt.address
dr_opt = False
else:
@ -81,11 +81,12 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
else:
raise TypeError('driver_or_options参数只能接收ChromiumDriver, ChromiumOptions、None或False。')
self.address = self._driver_options.debugger_address
# Session配置
if isinstance(se_opt, Session):
self._session = se_opt
self._has_session = True
self._session_options = SessionOptions(read_file=False)
self._session_options = SessionOptions()
se_opt = False
else:
@ -101,10 +102,10 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
else:
raise TypeError('session_or_options参数只能接收Session, SessionOptions、None或False。')
# 通用配置
self._timeouts = Timeout(self)
self._page_load_strategy = self._driver_options.page_load_strategy
self._download_path = None
if se_opt is not False:
self.set_timeouts(implicit=self._session_options.timeout)
self._download_path = self._session_options.download_path
@ -114,8 +115,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self.set_timeouts(t['implicit'], t['pageLoad'], t['script'])
self._download_path = self._driver_options.download_path
def _set_options(self):
"""Override the parent-class method of the same name"""
def _set_runtime_settings(self):
"""Set the attributes used at run time"""
# Nothing to do here: WebPage assigns _timeouts, _page_load_strategy and
# _download_path inside _set_start_options.
pass
def __call__(self, loc_or_str, timeout=None):
@ -138,6 +139,14 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
elif self._mode == 's':
return self._session_url
@property
def title(self):
    """Return the title of the current page, using the parent implementation that matches the mode.

    Returns None when the mode is neither 's' nor 'd'.
    """
    mode = self._mode
    if mode == 's':
        # Plain super(): the next `title` implementation after WebPage in the MRO.
        return super().title
    if mode == 'd':
        # Start the lookup after SessionPage so its `title` is skipped.
        return super(SessionPage, self).title
    return None
@property
def html(self):
"""返回页面html文本"""
@ -157,7 +166,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
@property
def response(self):
"""Return the Response object obtained in s mode, switching to s mode"""
self.change_mode('s')
return self._response
@property
@ -177,33 +185,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
"""返回Session对象如未初始化则按配置信息创建"""
if self._session is None:
self._set_session(self._session_options)
return self._session
@property
def driver(self):
"""Return the plain ChromiumDriver object"""
return self._tab_obj
@property
def _wait_driver(self):
"""Return the ChromiumDriver object used to control the browser, waiting first until the page has finished loading"""
# Poll every 0.1 s for as long as _is_loading is truthy.
while self._is_loading:
sleep(.1)
return self._driver
@property
def _driver(self):
"""Return the plain ChromiumDriver object; calling this switches to d mode and connects to the browser"""
self.change_mode('d')
# Connect lazily: only when no driver object has been stored yet.
if self._tab_obj is None:
self._connect_browser(self._driver_options)
return self._tab_obj
@_driver.setter
def _driver(self, tab):
# Store the given driver object as the current tab object.
self._tab_obj = tab
@property
def _session_url(self):
"""返回 session 保存的url"""
@ -266,7 +249,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
:param kwargs: 连接参数
:return: url是否可用
"""
self.change_mode('s', go=False)
return super().post(url, data, show_errmsg, retry, interval, **kwargs)
def ele(self, loc_or_ele, timeout=None):
@ -447,6 +429,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self.driver.Browser.close()
except Exception:
pass
self._tab_obj = None
self._has_driver = None
def close_session(self):
@ -516,8 +499,7 @@ class WebPageDownloadSetter(ChromiumDownloadSetter):
eventsEnabled=True)
except CallMethodException:
warn('\n您的浏览器版本太低,用新标签页下载文件可能崩溃,建议升级。')
self._page.run_cdp('Page.setDownloadBehavior', behavior=self._behavior, downloadPath=path,
not_change=True)
self._page.run_cdp('Page.setDownloadBehavior', behavior=self._behavior, downloadPath=path)
def by_browser(self):
"""设置使用浏览器下载文件"""

View File

@ -25,8 +25,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
def __init__(self,
mode: str = 'd',
timeout: float = None,
driver_or_options: Union[ChromiumDriver, ChromiumOptions, DriverOptions, bool] = None,
session_or_options: Union[Session, SessionOptions, bool] = None) -> None:
driver_or_options: Union[ChromiumDriver, ChromiumOptions, DriverOptions] = None,
session_or_options: Union[Session, SessionOptions] = None) -> None:
self._mode: str = ...
self._has_driver: bool = ...
self._has_session: bool = ...
@ -35,6 +35,7 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
self._driver_options: Union[ChromiumOptions, DriverOptions, None] = ...
self._download_set: WebPageDownloadSetter = ...
self._download_path: str = ...
self._tab_obj: ChromiumDriver = ...
def __call__(self,
loc_or_str: Union[Tuple[str, str], str, ChromiumElement, SessionElement],
@ -44,6 +45,9 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
@property
def url(self) -> Union[str, None]: ...
@property
# In s mode this delegates to the SessionPage title, which can be None.
def title(self) -> Union[str, None]: ...
@property
def html(self) -> str: ...
@ -62,18 +66,6 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
@property
def session(self) -> Session: ...
@property
def driver(self) -> ChromiumDriver: ...
@property
def _wait_driver(self) -> ChromiumDriver: ...
@property
def _driver(self) -> ChromiumDriver: ...
@_driver.setter
def _driver(self, tab: ChromiumDriver): ...
@property
def _session_url(self) -> str: ...
@ -170,14 +162,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
-> Union[ChromiumElement, SessionElement, ChromiumFrame, str, None, List[Union[SessionElement, str]], List[
Union[ChromiumElement, str, ChromiumFrame]]]: ...
def _set_both_options(self, dr_opt: Union[ChromiumDriver, DriverOptions],
se_opt: Union[Session, SessionOptions, dict, bool, None]) -> None: ...
def _set_options(self) -> None: ...
def _set_driver_options(self, driver_or_Options: Union[ChromiumDriver, DriverOptions]) -> None: ...
def _set_session_options(self, Session_or_Options: Union[Session, SessionOptions]) -> None: ...
# dr_opt also accepts ChromiumOptions, matching the driver_or_options parameter of __init__.
def _set_start_options(self, dr_opt: Union[ChromiumDriver, ChromiumOptions, DriverOptions, bool, None],
se_opt: Union[Session, SessionOptions, bool, None]) -> None: ...
def quit(self) -> None: ...