继续完善,即将完成

This commit is contained in:
g1879 2022-11-10 00:11:45 +08:00
parent dbbb989937
commit 0b3fca70e4
5 changed files with 176 additions and 27 deletions

View File

@ -1,6 +0,0 @@
# -*- coding:utf-8 -*-
class ChromeDriver(object):
def __init__(self,
address: str = 'localhost:9222',
path: str = 'chrome'):
self.address = address[7:] if address.startswith('http://') else address

View File

@ -4,6 +4,8 @@
@Contact : g1879@qq.com @Contact : g1879@qq.com
@File : chrome_element.py @File : chrome_element.py
""" """
from os import sep
from os.path import basename
from pathlib import Path from pathlib import Path
from re import search from re import search
from typing import Union, Tuple, List, Any from typing import Union, Tuple, List, Any
@ -320,14 +322,14 @@ class ChromeElement(DrissionElement):
# 获取href属性时返回绝对url # 获取href属性时返回绝对url
attrs = self.attrs attrs = self.attrs
if attr == 'href': if attr == 'href':
link = attrs['href'] link = attrs.get('href', None)
if not link or link.lower().startswith(('javascript:', 'mailto:')): if not link or link.lower().startswith(('javascript:', 'mailto:')):
return link return link
else: else:
return make_absolute_link(link, self.page) return make_absolute_link(link, self.page)
elif attr == 'src': elif attr == 'src':
return make_absolute_link(attrs['src'], self.page) return make_absolute_link(attrs.get('src', None), self.page)
elif attr == 'text': elif attr == 'text':
return self.text return self.text
@ -451,6 +453,37 @@ class ChromeElement(DrissionElement):
js = f'return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");' js = f'return window.getComputedStyle(this{pseudo_ele}).getPropertyValue("{style}");'
return self.run_script(js) return self.run_script(js)
def save(self, path: [str, bool] = None, rename: str = None) -> Union[bytes, str, bool]:
"""保存图片或其它有src属性的元素的资源 \n
:param path: 文件保存路径为None时保存到当前文件夹为False时不保存
:param rename: 文件名称为None时从资源url获取
:return: 资源内容文本
"""
src = self.attr('src')
if not src:
return False
path = path or '.'
node = self.page.driver.DOM.describeNode(nodeId=self._node_id)['node']
frame = node.get('frameId', None)
frame = frame or self.page.current_tab_handle
result = self.page.driver.Page.getResourceContent(frameId=frame, url=src)
if result['base64Encoded']:
from base64 import b64decode
data = b64decode(result['content'])
write_type = 'wb'
else:
data = result['content']
write_type = 'w'
if path:
rename = rename or basename(src)
Path(path).mkdir(parents=True, exist_ok=True)
with open(f'{path}{sep}{rename}', write_type) as f:
f.write(data)
return data
def get_screenshot(self, path: [str, Path] = None, def get_screenshot(self, path: [str, Path] = None,
as_bytes: [bool, str] = None) -> Union[str, bytes]: as_bytes: [bool, str] = None) -> Union[str, bytes]:
"""对当前元素截图 \n """对当前元素截图 \n
@ -509,7 +542,7 @@ class ChromeElement(DrissionElement):
def _set_file_input(self, files: Union[str, list, tuple]) -> None: def _set_file_input(self, files: Union[str, list, tuple]) -> None:
"""设置上传控件值""" """设置上传控件值"""
if isinstance(files): if isinstance(files, str):
files = files.split('\n') files = files.split('\n')
self.page.driver.DOM.setFileInputFiles(files=files, nodeId=self._node_id) self.page.driver.DOM.setFileInputFiles(files=files, nodeId=self._node_id)
@ -821,7 +854,6 @@ class ChromeShadowRootElement(BaseElement):
xpath = f'xpath:./following::{loc}' xpath = f'xpath:./following::{loc}'
return eles1 + self.parent_ele.eles(xpath, timeout=0.1) return eles1 + self.parent_ele.eles(xpath, timeout=0.1)
def ele(self, def ele(self,
loc_or_str: Union[Tuple[str, str], str], loc_or_str: Union[Tuple[str, str], str],
timeout: float = None) -> Union[ChromeElement, str, None]: timeout: float = None) -> Union[ChromeElement, str, None]:
@ -1048,7 +1080,7 @@ def _run_script(page_or_ele, script: str, as_expr: bool = False, timeout: float
:param args: 参数按顺序在js文本中对应argument[0]argument[2]... :param args: 参数按顺序在js文本中对应argument[0]argument[2]...
:return: :return:
""" """
if isinstance(page_or_ele, BaseElement): if isinstance(page_or_ele, (ChromeElement, ChromeShadowRootElement)):
page = page_or_ele.page page = page_or_ele.page
obj_id = page_or_ele.obj_id obj_id = page_or_ele.obj_id
else: else:

View File

@ -1,5 +1,6 @@
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
from pathlib import Path from pathlib import Path
from platform import system
from re import search from re import search
from time import perf_counter, sleep from time import perf_counter, sleep
from typing import Union, Tuple, List, Any from typing import Union, Tuple, List, Any
@ -33,13 +34,14 @@ class ChromePage(BasePage):
self._driver = Tab_or_Options self._driver = Tab_or_Options
self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1) self.address = search(r'ws://(.*?)/dev', Tab_or_Options._websocket_url).group(1)
self.options = None self.options = None
self.process = None
elif isinstance(Tab_or_Options, DriverOptions): elif isinstance(Tab_or_Options, DriverOptions):
self.options = Tab_or_Options or DriverOptions() # 从ini文件读取 self.options = Tab_or_Options or DriverOptions() # 从ini文件读取
self.set_timeouts(page_load=self.options.timeouts['pageLoad'], self.set_timeouts(page_load=self.options.timeouts['pageLoad'],
script=self.options.timeouts['script']) script=self.options.timeouts['script'])
self._page_load_strategy = self.options.page_load_strategy self._page_load_strategy = self.options.page_load_strategy
connect_chrome(self.options) self.process = connect_chrome(self.options)[1]
self.address = self.options.debugger_address self.address = self.options.debugger_address
tab_handle = self.tab_handles[0] if not tab_handle else tab_handle tab_handle = self.tab_handles[0] if not tab_handle else tab_handle
self._driver = Tab(id=tab_handle, type='page', self._driver = Tab(id=tab_handle, type='page',
@ -489,6 +491,29 @@ class ChromePage(BasePage):
""" """
self.close_tabs(num_or_handles, True) self.close_tabs(num_or_handles, True)
def set_window_size(self, width: int = None, height: int = None) -> None:
"""设置浏览器窗口大小默认最大化任一参数为0最小化 \n
:param width: 浏览器窗口高
:param height: 浏览器窗口宽
:return: None
"""
self.driver.Emulation.setDeviceMetricsOverride(width=500, height=500,
deviceScaleFactor=0, mobile=False,
)
# if width is None and height is None:
# self.driver.maximize_window()
#
# elif width == 0 or height == 0:
# self.driver.minimize_window()
#
# else:
# if width < 0 or height < 0:
# raise ValueError('x 和 y参数必须大于0。')
#
# new_x = width or self.driver.get_window_size()['width']
# new_y = height or self.driver.get_window_size()['height']
# self.driver.set_window_size(new_x, new_y)
def clear_cache(self, def clear_cache(self,
session_storage: bool = True, session_storage: bool = True,
local_storage: bool = True, local_storage: bool = True,
@ -531,6 +556,14 @@ class ChromePage(BasePage):
self.driver.Page.handleJavaScriptDialog(accept=accept) self.driver.Page.handleJavaScriptDialog(accept=accept)
return res_text return res_text
def hide_browser(self) -> None:
"""隐藏浏览器窗口只在Windows系统可用"""
_show_or_hide_browser(self, hide=True)
def show_browser(self) -> None:
"""显示浏览器窗口只在Windows系统可用"""
_show_or_hide_browser(self, hide=False)
def check_page(self) -> Union[bool, None]: def check_page(self) -> Union[bool, None]:
"""检查页面是否符合预期 \n """检查页面是否符合预期 \n
由子类自行实现各页面的判定规则 由子类自行实现各页面的判定规则
@ -553,7 +586,7 @@ class ChromePage(BasePage):
""" """
err = None err = None
is_ok = False is_ok = False
timeout = timeout if timeout is not None else self.timeout timeout = timeout if timeout is not None else self.timeouts.page_load
for _ in range(times + 1): for _ in range(times + 1):
result = self.driver.Page.navigate(url=to_url) result = self.driver.Page.navigate(url=to_url)
@ -595,7 +628,7 @@ class ChromePage(BasePage):
self._alert.text = None self._alert.text = None
self._alert.type = None self._alert.type = None
self._alert.defaultPrompt = None self._alert.defaultPrompt = None
self._alert.response_accept = kwargs.get['result'] self._alert.response_accept = kwargs.get('result')
self._alert.response_text = kwargs['userInput'] self._alert.response_text = kwargs['userInput']
def _on_alert_open(self, **kwargs): def _on_alert_open(self, **kwargs):
@ -645,3 +678,68 @@ def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set])
raise TypeError('num_or_handle参数只能是int、str、list、set 或 tuple类型。') raise TypeError('num_or_handle参数只能是int、str、list、set 或 tuple类型。')
return set(i if isinstance(i, str) else handles[i] for i in num_or_handles) return set(i if isinstance(i, str) else handles[i] for i in num_or_handles)
def _show_or_hide_browser(page: ChromePage, hide: bool = True) -> None:
if system().lower() != 'windows':
raise OSError('该方法只能在Windows系统使用。')
try:
from win32gui import ShowWindow
from win32con import SW_HIDE, SW_SHOW
except ImportError:
raise ImportError('请先安装pip install pypiwin32')
pid = _get_browser_progress_id(page.process, page.address)
if not pid:
return None
hds = _get_chrome_hwnds_from_pid(pid, page.title)
sw = SW_HIDE if hide else SW_SHOW
for hd in hds:
ShowWindow(hd, sw)
def _get_browser_progress_id(progress, address: str) -> Union[str, None]:
"""获取浏览器进程id"""
if progress:
return progress.pid
address = address.split(':')
if len(address) != 2:
return None
ip, port = address
if ip not in ('127.0.0.1', 'localhost') or not port.isdigit():
return None
from os import popen
txt = ''
progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n')
for progress in progresses:
if 'LISTENING' in progress:
txt = progress
break
if not txt:
return None
return txt.split(' ')[-1]
def _get_chrome_hwnds_from_pid(pid, title) -> list:
"""通过PID查询句柄ID"""
try:
from win32gui import IsWindow, GetWindowText, EnumWindows
from win32process import GetWindowThreadProcessId
except ImportError:
raise ImportError('请先安装win32guipip install pypiwin32')
def callback(hwnd, hds):
if IsWindow(hwnd) and title in GetWindowText(hwnd):
_, found_pid = GetWindowThreadProcessId(hwnd)
if str(found_pid) == str(pid):
hds.append(hwnd)
return True
hwnds = []
EnumWindows(callback, hwnds)
return hwnds

View File

@ -334,6 +334,8 @@ _modifierBit = {'\ue00a': 1,
def _keys_to_typing(value) -> Tuple[int, str]: def _keys_to_typing(value) -> Tuple[int, str]:
"""把要输入的内容连成字符串,去掉其中 ctrl 等键。
返回的modifier表示是否有按下组合键"""
typing: List[str] = [] typing: List[str] = []
modifier = 0 modifier = 0
for val in value: for val in value:

View File

@ -43,16 +43,6 @@ class WebPage(SessionPage, ChromePage, BasePage):
if self._mode == 'd': if self._mode == 'd':
self.driver self.driver
# self._ready()
# if self._mode == 'd':
# try:
# timeouts = self.drission.driver_options.timeouts
# t = timeout if timeout is not None else timeouts['implicit'] / 1000
# self.set_timeouts(t, timeouts['pageLoad'] / 1000, timeouts['script'] / 1000)
#
# except Exception:
# self.timeout = timeout if timeout is not None else 10
def __call__(self, def __call__(self,
loc_or_str: Union[Tuple[str, str], str, ChromeElement, SessionElement], loc_or_str: Union[Tuple[str, str], str, ChromeElement, SessionElement],
@ -221,10 +211,10 @@ class WebPage(SessionPage, ChromePage, BasePage):
# s模式转d模式 # s模式转d模式
if self._mode == 'd': if self._mode == 'd':
# if not self._has_driver: if not self._has_driver:
# self._ready() self.driver
self._has_driver = True
self._url = None if not self._has_driver else super(SessionPage, self).url self._url = None if not self._has_driver else super(SessionPage, self).url
self._has_driver = True
if self._session_url: if self._session_url:
self.cookies_to_driver() self.cookies_to_driver()
@ -301,6 +291,39 @@ class WebPage(SessionPage, ChromePage, BasePage):
self.session.cookies.set(cookie['name'], cookie['value'], **kwargs) self.session.cookies.set(cookie['name'], cookie['value'], **kwargs)
def check_page(self, by_requests: bool = False) -> Union[bool, None]:
"""d模式时检查网页是否符合预期 \n
默认由response状态检查可重载实现针对性检查 \n
:param by_requests: 是否用内置response检查
:return: bool或NoneNone代表不知道结果
"""
if self._session_url and self._session_url == self.url:
return self._response.ok
# 使用requests访问url并判断可用性
if by_requests:
self.cookies_to_session()
r = self._make_response(self.url, retry=0)[0]
return r.ok if r else False
def close_driver(self) -> None:
"""关闭driver及浏览器"""
if self._has_driver:
self.change_mode('s')
try:
self.driver.Browser.close()
except Exception:
pass
self._has_driver = None
def close_session(self) -> None:
"""关闭session"""
if self._has_session:
self.change_mode('d')
self._session = None
self._response = None
self._has_session = None
# ----------------重写SessionPage的函数----------------------- # ----------------重写SessionPage的函数-----------------------
def post(self, def post(self,
url: str, url: str,