元素滚动添加to_center(),状态增加is_whole_in_viewport;页面对象增加actions属性;修复元素截图问题;get_usable_path()增加is_file, parents参数

This commit is contained in:
g1879 2023-09-22 19:42:53 +08:00
parent 3eee7132d5
commit 1015d6c076
13 changed files with 78 additions and 115 deletions

View File

@ -14,7 +14,7 @@ class ActionChains:
def __init__(self, page):
"""
:param page: ChromiumPage对象
:param page: ChromiumBase对象
"""
self.page = page
self._dr = page.driver

View File

@ -14,7 +14,7 @@ from .chromium_page import ChromiumPage
class ActionChains:
def __init__(self, page: ChromiumBase):
self.page: ChromiumPage = ...
self.page: ChromiumBase = ...
self._dr: ChromiumDriver = ...
self.modifier: int = ...
self.curr_x: int = ...

View File

@ -12,18 +12,20 @@ from time import perf_counter, sleep, time
from requests import Session
from .action_chains import ActionChains
from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumScroll, ChromiumElement, run_js, make_chromium_ele
from .commons.constants import HANDLE_ALERT_METHOD, ERROR, NoneElement
from .commons.locator import get_loc
from .commons.tools import get_usable_path, clean_folder
from .commons.web import location_in_viewport
from .errors import ContextLossError, ElementLossError, AlertExistsError, CDPError, TabClosedError, \
NoRectError, BrowserConnectError, GetDocumentError
from .network_listener import NetworkListener
from .session_element import make_session_ele
from .setter import ChromiumBaseSetter
from .waiter import ChromiumBaseWaiter, DownloadMission
from .waiter import ChromiumBaseWaiter
class ChromiumBase(BasePage):
@ -43,13 +45,10 @@ class ChromiumBase(BasePage):
self._tab_obj = None
self._set = None
self._screencast = None
self._actions = None
self._listener = None
self._wait_download_flag = None
self._download_rename = None
self._download_path = ''
self._when_download_file_exists = 'rename'
self._download_missions = set()
if isinstance(address, int) or (isinstance(address, str) and address.isdigit()):
address = f'127.0.0.1:{address}'
@ -252,7 +251,7 @@ class ChromiumBase(BasePage):
def _onDownloadWillBegin(self, **kwargs):
"""下载即将开始时执行"""
handle_download(self, kwargs)
self._page._dl_mgr.set_mission(self.tab_id, kwargs['guid'])
def __call__(self, loc_or_str, timeout=None):
"""在内部查找元素
@ -398,6 +397,14 @@ class ChromiumBase(BasePage):
self._screencast = Screencast(self)
return self._screencast
@property
def actions(self):
"""返回用于执行动作链的对象"""
if self._actions is None:
self._actions = ActionChains(self)
self.wait.load_complete()
return self._actions
@property
def listener(self):
"""返回用于聆听数据包的对象"""
@ -894,9 +901,17 @@ class ChromiumBase(BasePage):
x, y = left_top
w = right_bottom[0] - x
h = right_bottom[1] - y
v = not (location_in_viewport(self, x, y) and
location_in_viewport(self, right_bottom[0], right_bottom[1]))
if v and (self.run_js('return document.body.scrollHeight > window.innerHeight;') and
not self.run_js('return document.body.scrollWidth > window.innerWidth;')):
x += 10
vp = {'x': x, 'y': y, 'width': w, 'height': h, 'scale': 1}
png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type,
captureBeyondViewport=True, clip=vp)['data']
captureBeyondViewport=v, clip=vp)['data']
else:
png = self.run_cdp_loaded('Page.captureScreenshot', format=pic_type)['data']
@ -1132,34 +1147,3 @@ class ScreencastMode(object):
def imgs_mode(self):
self._screencast._mode = 'imgs'
def handle_download(tab, kwargs):
"""在下载开始前处理任务
:param tab: 触发任务的tab对象
:param kwargs: 浏览器返回的数据
:return: None
"""
tab._page._dl_mgr._missions[kwargs['guid']] = None
if tab._download_rename:
tmp = kwargs['suggestedFilename'].rsplit('.', 1)
ext_name = tmp[-1] if len(tmp) > 1 else ''
tmp = tab._download_rename.rsplit('.', 1)
ext_rename = tmp[-1] if len(tmp) > 1 else ''
n = tab._download_rename if ext_rename == ext_name else f'{tab._download_rename}.{ext_name}'
tab._download_rename = None
else:
n = kwargs['suggestedFilename']
m = DownloadMission(tab, kwargs['guid'], tab.download_path, n, kwargs['url'])
tab._page._dl_mgr.add_mission(m)
tab._wait_download_flag = m
tab._download_missions.add(m)
if tab._wait_download_flag is False: # 取消该任务
m._set_done('canceled', True)
if tab._when_download_file_exists == 'skip' and (Path(m.path) / m.name).exists():
m._set_done('skipped', True)

View File

@ -9,6 +9,7 @@ from typing import Union, Tuple, List, Any
from DataRecorder import Recorder
from requests import Session
from .action_chains import ActionChains
from .base import BasePage
from .chromium_driver import ChromiumDriver
from .chromium_element import ChromiumElement, ChromiumScroll
@ -44,11 +45,12 @@ class ChromiumBase(BasePage):
self._wait: ChromiumBaseWaiter = ...
self._set: ChromiumBaseSetter = ...
self._screencast: Screencast = ...
self._actions: ActionChains = ...
self._listener: NetworkListener = ...
self._wait_download_flag: bool = ...
self._download_rename: str = ...
self._when_download_file_exists: str = ...
self._download_missions: set = ...
# self._wait_download_flag: bool = ...
# self._download_rename: str = ...
# self._when_download_file_exists: str = ...
# self._download_missions: set = ...
def _connect_browser(self, tab_id: str = None) -> None: ...
@ -144,6 +146,9 @@ class ChromiumBase(BasePage):
@property
def screencast(self) -> Screencast: ...
@property
def actions(self) -> ActionChains: ...
@property
def listener(self) -> NetworkListener: ...
@ -277,6 +282,3 @@ class ScreencastMode(object):
def frugal_imgs_mode(self) -> None: ...
def imgs_mode(self) -> None: ...
def handle_download(tab: ChromiumBase, kwargs: dict) -> None: ...

View File

@ -1435,6 +1435,14 @@ class ChromiumElementStates(object):
x, y = self._ele.locations.click_point
return location_in_viewport(self._ele.page, x, y) if x else False
@property
def is_whole_in_viewport(self):
"""返回元素是否整个都在视口内"""
x1, y1 = self._ele.location
w, h = self._ele.size
x2, y2 = x1 + w, y1 + h
return location_in_viewport(self._ele.page, x1, y1) and location_in_viewport(self._ele.page, x2, y2)
@property
def is_covered(self):
"""返回元素是否被覆盖,与是否在视口中无关"""
@ -1766,6 +1774,10 @@ class ChromiumElementScroll(ChromiumScroll):
"""
self._driver.page.scroll.to_see(self._driver, center=center)
def to_center(self):
"""元素尽量滚动到视口中间"""
self._driver.page.scroll.to_see(self._driver, center=True)
class ChromiumSelect(object):
"""ChromiumSelect 类专门用于处理 d 模式下 select 标签"""
@ -1936,10 +1948,10 @@ class ChromiumSelect(object):
mode = 'false' if cancel else 'true'
timeout = timeout if timeout is not None else self._ele.page.timeout
condition = {condition} if isinstance(condition, (str, int)) else set(condition)
condition = set(condition) if isinstance(condition, (list, tuple)) else {condition}
if para_type in ('text', 'value'):
return self._text_value(condition, para_type, mode, timeout)
return self._text_value([str(i) for i in condition], para_type, mode, timeout)
elif para_type == 'index':
return self._index(condition, mode, timeout)

View File

@ -232,6 +232,9 @@ class ChromiumElementStates(object):
@property
def is_in_viewport(self) -> bool: ...
@property
def is_whole_in_viewport(self) -> bool: ...
@property
def is_covered(self) -> bool: ...
@ -489,6 +492,8 @@ class ChromiumElementScroll(ChromiumScroll):
def to_see(self, center: Union[bool, None] = None) -> None: ...
def to_center(self) -> None: ...
class ChromiumSelect(object):
def __init__(self, ele: ChromiumElement):
@ -538,7 +543,7 @@ class ChromiumSelect(object):
cancel: bool = False,
timeout: float = None) -> bool: ...
def _text_value(self, condition: set, para_type: str, mode: str, timeout: float) -> bool: ...
def _text_value(self, condition: Union[list, set], para_type: str, mode: str, timeout: float) -> bool: ...
def _index(self, condition: set, mode: str, timeout: float) -> bool: ...

View File

@ -4,14 +4,12 @@
@Contact : g1879@qq.com
"""
from copy import copy
from os.path import sep
from re import search
from threading import Thread
from time import sleep, perf_counter
from .chromium_base import ChromiumBase, ChromiumPageScroll
from .chromium_element import ChromiumElement
from .commons.tools import get_usable_path
from .errors import ContextLossError
from .setter import ChromiumFrameSetter
from .waiter import FrameWaiter
@ -24,7 +22,7 @@ class ChromiumFrame(ChromiumBase):
:param ele: frame所在元素
"""
page_type = str(type(page))
if 'ChromiumPage' in page_type or 'WebPage' in page:
if 'ChromiumPage' in page_type or 'WebPage' in page_type:
self._page = self._target_page = self.tab = page
else: # Tab、Frame
self._page = page.page
@ -79,7 +77,7 @@ class ChromiumFrame(ChromiumBase):
self.retry_interval = self._target_page.retry_interval
self._page_load_strategy = self._target_page.page_load_strategy
self._download_path = self._target_page.download_path
self._when_download_file_exists = self._target_page._when_download_file_exists
# self._when_download_file_exists = self._target_page._when_download_file_exists
def _driver_init(self, tab_id):
"""避免出现服务器500错误
@ -543,16 +541,16 @@ class ChromiumFrame(ChromiumBase):
name = f'{self.title}.jpg'
if not name.endswith(('.jpg', '.jpeg', '.png', '.webp')):
name = f'{name}.jpg'
path = get_usable_path(f'{path}{sep}{name}')
pic_type = path.suffix.lower()
pic_type = 'jpeg' if pic_type == '.jpg' else pic_type[1:]
pic_type = name.split('.')[-1]
if pic_type == 'jpg':
pic_type = 'jpeg'
self.frame_ele.scroll.to_see(center=True)
self.scroll.to_see(ele, center=True)
cx, cy = ele.locations.viewport_location
w, h = ele.size
img_data = f'data:image/{pic_type};base64,{self.frame_ele.get_screenshot(as_base64=True)}'
body = self._target_page('t:body')
body = self.tab('t:body')
first_child = body('c::first-child')
if not isinstance(first_child, ChromiumElement):
first_child = first_child.frame_ele
@ -564,12 +562,17 @@ class ChromiumFrame(ChromiumBase):
arguments[0].insertBefore(img, this);
return img;'''
new_ele = first_child.run_js(js, body)
new_ele.scroll.to_see(True)
new_ele.scroll.to_see(center=True)
top = int(self.frame_ele.style('border-top').split('px')[0])
left = int(self.frame_ele.style('border-left').split('px')[0])
r = self._target_page.get_screenshot(path=path, name=name, as_bytes=as_bytes, as_base64=as_base64,
left_top=(cx + left, cy + top), right_bottom=(cx + w + left, cy + h + top))
self._target_page.remove_ele(new_ele)
r = self.tab.run_cdp('Page.getLayoutMetrics')['visualViewport']
sx = r['pageX']
sy = r['pageY']
r = self.tab.get_screenshot(path=path, name=name, as_bytes=as_bytes, as_base64=as_base64,
left_top=(cx + left + sx, cy + top + sy),
right_bottom=(cx + w + left + sx, cy + h + top + sy))
self.tab.remove_ele(new_ele)
return r
def _find_elements(self, loc_or_ele, timeout=None, single=True, relative=False, raise_err=None):

View File

@ -24,7 +24,6 @@ class ChromiumFrame(ChromiumBase):
self.frame_id: str = ...
self._frame_ele: ChromiumElement = ...
self._backend_id: str = ...
self.frame_page: ChromiumBase = ...
self._doc_ele: ChromiumElement = ...
self._is_diff_domain: bool = ...
self.doc_ele: ChromiumElement = ...

View File

@ -10,21 +10,24 @@ from shutil import rmtree
from time import perf_counter, sleep
def get_usable_path(path):
def get_usable_path(path, is_file=True, parents=True):
"""检查文件或文件夹是否有重名,并返回可以使用的路径
:param path: 文件或文件夹路径
:param is_file: 目标是文件还是文件夹
:param parents: 是否创建目标路径
:return: 可用的路径Path对象
"""
path = Path(path)
parent = path.parent
parent.mkdir(parents=True, exist_ok=True)
if parents:
parent.mkdir(parents=True, exist_ok=True)
path = parent / make_valid_name(path.name)
name = path.stem if path.is_file() else path.name
ext = path.suffix if path.is_file() else ''
first_time = True
while path.exists():
while path.exists() and path.is_file() == is_file:
r = search(r'(.*)_(\d+)$', name)
if not r or (r and first_time):
@ -213,42 +216,3 @@ def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True):
raise TimeoutError('等待超时')
else:
return False
# def get_exe_from_port(port):
# """获取端口号第一条进程的可执行文件路径
# :param port: 端口号
# :return: 可执行文件的绝对路径
# """
# from os import popen
#
# pid = get_pid_from_port(port)
# if not pid:
# return
# else:
# file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n')
# return file_lst[2].strip() if len(file_lst) > 2 else None
#
#
# def get_pid_from_port(port):
# """获取端口号第一条进程的pid
# :param port: 端口号
# :return: 进程id
# """
# from platform import system
# if system().lower() != 'windows' or port is None:
# return None
#
# from os import popen
# from time import perf_counter
#
# try: # 避免Anaconda中可能产生的报错
# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
#
# t = perf_counter()
# while not process and perf_counter() - t < 5:
# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
#
# return process.split(' ')[-1] or None
#
# except Exception:
# return None

View File

@ -11,13 +11,7 @@ from types import FunctionType
from chromium_page import ChromiumPage
# def get_exe_from_port(port: Union[str, int]) -> Union[str, None]: ...
# def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: ...
def get_usable_path(path: Union[str, Path]) -> Path: ...
def get_usable_path(path: Union[str, Path], is_file: bool = True, parents: bool = True) -> Path: ...
def make_valid_name(full_name: str) -> str: ...

View File

@ -92,7 +92,7 @@ def location_in_viewport(page, loc_x, loc_y):
:param page: ChromePage对象
:param loc_x: 页面绝对坐标x
:param loc_y: 页面绝对坐标y
:return:
:return: bool
"""
js = f'''function(){{var x = {loc_x}; var y = {loc_y};
const scrollLeft = document.documentElement.scrollLeft;

View File

@ -20,7 +20,7 @@ def get_ele_txt(e: DrissionElement) -> str: ...
def format_html(text: str) -> str: ...
def location_in_viewport(page, loc_x: int, loc_y: int) -> bool: ...
def location_in_viewport(page: ChromiumBase, loc_x: int, loc_y: int) -> bool: ...
def offset_scroll(ele: ChromiumElement, offset_x: int, offset_y: int) -> tuple: ...

View File

@ -198,7 +198,7 @@ def get_chrome_path(ini_path=None,
from platform import system
sys = system().lower()
if sys == 'macos':
if sys in ('macos', 'darwin'):
return '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'
elif sys == 'linux':