mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
4.0.4.22(+)
动作链scroll参数位置变化; tab_ids忽略插件tab; 修复全局代理时无法连接浏览器的问题; 修复截图文件名过长时的问题; page.save()可根据后缀自动判断类型; click.middle()增加get_tab参数,返回Tab对象; 修复带html节点的sr获取不到子元素问题; 优化cookies设置逻辑; Frame对象初始化时不再等待url变化
This commit is contained in:
parent
6de716159f
commit
c14a3af41b
@ -14,4 +14,4 @@ from ._configs.chromium_options import ChromiumOptions
|
||||
from ._configs.session_options import SessionOptions
|
||||
|
||||
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
|
||||
__version__ = '4.0.4.21'
|
||||
__version__ = '4.0.4.22'
|
||||
|
@ -5,7 +5,6 @@
|
||||
@Copyright: (c) 2024 by g1879, Inc. All Rights Reserved.
|
||||
@License : BSD 3-Clause.
|
||||
"""
|
||||
from os import waitpid
|
||||
from pathlib import Path
|
||||
from shutil import rmtree
|
||||
from time import perf_counter, sleep
|
||||
@ -135,7 +134,8 @@ class Browser(object):
|
||||
def tab_ids(self):
|
||||
"""返回所有标签页id组成的列表"""
|
||||
j = self._driver.get(f'http://{self.address}/json').json() # 不要改用cdp,因为顺序不对
|
||||
return [i['id'] for i in j if i['type'] in ('page', 'webview') and not i['url'].startswith('devtools://')]
|
||||
return [i['id'] for i in j if i['type'] in ('page', 'webview')
|
||||
and not i['url'].startswith(('devtools://', 'chrome-extension://'))]
|
||||
|
||||
@property
|
||||
def process_id(self):
|
||||
@ -143,7 +143,7 @@ class Browser(object):
|
||||
return self._process_id
|
||||
|
||||
def find_tabs(self, title=None, url=None, tab_type=None):
|
||||
"""查找符合条件的tab,返回它们组成的列表
|
||||
"""查找符合条件的tab,返回它们组成的列表,title和url是与关系
|
||||
:param title: 要匹配title的文本
|
||||
:param url: 要匹配url的文本
|
||||
:param tab_type: tab类型,可用列表输入多个
|
||||
@ -274,10 +274,6 @@ class Browser(object):
|
||||
|
||||
if ok:
|
||||
break
|
||||
sleep(.05)
|
||||
|
||||
if self.process_id:
|
||||
waitpid(self.process_id, 0)
|
||||
|
||||
def _on_disconnect(self):
|
||||
self.page._on_disconnect()
|
||||
@ -293,4 +289,4 @@ class Browser(object):
|
||||
break
|
||||
except (PermissionError, FileNotFoundError, OSError):
|
||||
pass
|
||||
sleep(.05)
|
||||
sleep(.03)
|
||||
|
@ -10,12 +10,12 @@ from queue import Queue, Empty
|
||||
from threading import Thread, Event
|
||||
from time import perf_counter, sleep
|
||||
|
||||
from requests import get
|
||||
from requests import Session
|
||||
from websocket import (WebSocketTimeoutException, WebSocketConnectionClosedException, create_connection,
|
||||
WebSocketException, WebSocketBadStatusException)
|
||||
|
||||
from .._functions.settings import Settings
|
||||
from ..errors import PageDisconnectedError, TargetNotFoundError
|
||||
from ..errors import PageDisconnectedError
|
||||
|
||||
|
||||
class Driver(object):
|
||||
@ -201,13 +201,10 @@ class Driver(object):
|
||||
try:
|
||||
self._ws = create_connection(self._websocket_url, enable_multithread=True, suppress_origin=True)
|
||||
except WebSocketBadStatusException as e:
|
||||
txt = str(e)
|
||||
if 'No such target id' in txt:
|
||||
raise TargetNotFoundError(f'找不到页面:{self.id}。')
|
||||
elif 'Handshake status 403 Forbidden' in txt:
|
||||
if 'Handshake status 403 Forbidden' in str(e):
|
||||
raise RuntimeError('请升级websocket-client库。')
|
||||
else:
|
||||
raise e
|
||||
return
|
||||
self._recv_th.start()
|
||||
self._handle_event_th.start()
|
||||
return True
|
||||
@ -274,11 +271,13 @@ class BrowserDriver(Driver):
|
||||
self._created = True
|
||||
BrowserDriver.BROWSERS[tab_id] = self
|
||||
super().__init__(tab_id, tab_type, address, owner)
|
||||
self._control_session = Session()
|
||||
self._control_session.trust_env = False
|
||||
|
||||
def __repr__(self):
|
||||
return f'<BrowserDriver {self.id}>'
|
||||
|
||||
def get(self, url):
|
||||
r = get(url, headers={'Connection': 'close'})
|
||||
r = self._control_session.get(url, headers={'Connection': 'close'})
|
||||
r.close()
|
||||
return r
|
||||
|
@ -9,7 +9,7 @@ from queue import Queue
|
||||
from threading import Thread, Event
|
||||
from typing import Union, Callable, Dict, Optional
|
||||
|
||||
from requests import Response
|
||||
from requests import Response, Session
|
||||
from websocket import WebSocket
|
||||
|
||||
from .browser import Browser
|
||||
@ -68,14 +68,10 @@ class Driver(object):
|
||||
class BrowserDriver(Driver):
|
||||
BROWSERS: Dict[str, Driver] = ...
|
||||
owner: Browser = ...
|
||||
_control_session: Session = ...
|
||||
|
||||
def __new__(cls, tab_id: str, tab_type: str, address: str, owner: Browser): ...
|
||||
|
||||
def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser):
|
||||
"""
|
||||
|
||||
:rtype: object
|
||||
"""
|
||||
...
|
||||
def __init__(self, tab_id: str, tab_type: str, address: str, owner: Browser): ...
|
||||
|
||||
def get(self, url) -> Response: ...
|
||||
|
@ -375,14 +375,14 @@ class ChromiumElement(DrissionElement):
|
||||
"""
|
||||
attrs = self.attrs
|
||||
if attr == 'href': # 获取href属性时返回绝对url
|
||||
link = attrs.get('href', None)
|
||||
link = attrs.get('href')
|
||||
if not link or link.lower().startswith(('javascript:', 'mailto:')):
|
||||
return link
|
||||
else:
|
||||
return make_absolute_link(link, self.property('baseURI'))
|
||||
|
||||
elif attr == 'src':
|
||||
return make_absolute_link(attrs.get('src', None), self.property('baseURI'))
|
||||
return make_absolute_link(attrs.get('src'), self.property('baseURI'))
|
||||
|
||||
elif attr == 'text':
|
||||
return self.text
|
||||
@ -686,7 +686,7 @@ class ChromiumElement(DrissionElement):
|
||||
"""拖拽当前元素到相对位置
|
||||
:param offset_x: x变化值
|
||||
:param offset_y: y变化值
|
||||
:param duration: 拖动用时,传入0即瞬间到j达
|
||||
:param duration: 拖动用时,传入0即瞬间到达
|
||||
:return: None
|
||||
"""
|
||||
curr_x, curr_y = self.rect.midpoint
|
||||
@ -1115,8 +1115,8 @@ class ShadowRoot(BaseElement):
|
||||
r = make_chromium_eles(self.owner, _ids=node_id, is_obj_id=False)
|
||||
return None if r is False else r
|
||||
else:
|
||||
node_ids = [self.owner.run_cdp('DOM.querySelector',
|
||||
nodeId=self._node_id, selector=i)['nodeId'] for i in css]
|
||||
node_ids = [self.owner.run_cdp('DOM.querySelector', nodeId=self._node_id, selector=i)['nodeId']
|
||||
for i in css]
|
||||
if 0 in node_ids:
|
||||
return None
|
||||
r = make_chromium_eles(self.owner, _ids=node_ids, index=index, is_obj_id=False)
|
||||
|
@ -6,7 +6,7 @@
|
||||
@License : BSD 3-Clause.
|
||||
"""
|
||||
from html import unescape
|
||||
from re import match, sub, DOTALL
|
||||
from re import match, sub, DOTALL, search
|
||||
|
||||
from lxml.etree import tostring
|
||||
from lxml.html import HtmlElement, fromstring
|
||||
@ -373,7 +373,11 @@ def make_session_ele(html_or_ele, loc=None, index=1):
|
||||
# ShadowRoot
|
||||
elif isinstance(html_or_ele, BaseElement):
|
||||
page = html_or_ele.owner
|
||||
html_or_ele = fromstring(html_or_ele.html)
|
||||
html = html_or_ele.html
|
||||
r = search(r'^<shadow_root>[ \n]*?<html>[ \n]*?(.*?)[ \n]*?</html>[ \n]*?</shadow_root>$', html)
|
||||
if r:
|
||||
html = r.group(1)
|
||||
html_or_ele = fromstring(html)
|
||||
|
||||
else:
|
||||
raise TypeError('html_or_ele参数只能是元素、页面对象或html文本。')
|
||||
|
@ -8,12 +8,11 @@
|
||||
from json import load, dump, JSONDecodeError
|
||||
from os import environ
|
||||
from pathlib import Path
|
||||
from platform import system
|
||||
from subprocess import Popen, DEVNULL
|
||||
from tempfile import gettempdir
|
||||
from time import perf_counter, sleep
|
||||
|
||||
from requests import get as requests_get
|
||||
from requests import Session
|
||||
|
||||
from .tools import port_is_using
|
||||
from .._configs.options_manage import OptionsManager
|
||||
@ -200,16 +199,21 @@ def test_connect(ip, port, timeout=30):
|
||||
:return: None
|
||||
"""
|
||||
end_time = perf_counter() + timeout
|
||||
s = Session()
|
||||
s.trust_env = False
|
||||
while perf_counter() < end_time:
|
||||
try:
|
||||
tabs = requests_get(f'http://{ip}:{port}/json', timeout=10, headers={'Connection': 'close'},
|
||||
proxies={'http': None, 'https': None}).json()
|
||||
for tab in tabs:
|
||||
r = s.get(f'http://{ip}:{port}/json', timeout=10, headers={'Connection': 'close'})
|
||||
for tab in r.json():
|
||||
if tab['type'] in ('page', 'webview'):
|
||||
r.close()
|
||||
s.close()
|
||||
return
|
||||
r.close()
|
||||
except Exception:
|
||||
sleep(.2)
|
||||
|
||||
s.close()
|
||||
raise BrowserConnectError(f'\n{ip}:{port}浏览器无法链接。\n请确认:\n1、该端口为浏览器\n'
|
||||
f'2、已添加\'--remote-debugging-port={port}\'启动项\n'
|
||||
f'3、用户文件夹没有和已打开的浏览器冲突\n'
|
||||
|
@ -10,7 +10,6 @@ from pathlib import Path
|
||||
from threading import Lock
|
||||
from typing import Union, Tuple
|
||||
|
||||
from ..errors import BaseError
|
||||
from .._pages.chromium_base import ChromiumBase
|
||||
|
||||
|
||||
@ -46,4 +45,4 @@ def wait_until(function: callable, kwargs: dict = None, timeout: float = 10): ..
|
||||
def configs_to_here(file_name: Union[Path, str] = None) -> None: ...
|
||||
|
||||
|
||||
def raise_error(result: dict, ignore: BaseError = None) -> None: ...
|
||||
def raise_error(result: dict, ignore=None) -> None: ...
|
||||
|
@ -8,9 +8,12 @@
|
||||
from datetime import datetime
|
||||
from html import unescape
|
||||
from http.cookiejar import Cookie, CookieJar
|
||||
from re import sub
|
||||
from os.path import sep
|
||||
from pathlib import Path
|
||||
from re import sub, match
|
||||
from urllib.parse import urlparse, urljoin, urlunparse
|
||||
|
||||
from DataRecorder.tools import make_valid_name
|
||||
from tldextract import extract
|
||||
|
||||
|
||||
@ -138,8 +141,11 @@ def make_absolute_link(link, baseURI=None):
|
||||
if not link:
|
||||
return link
|
||||
|
||||
link = link.strip()
|
||||
link = link.strip().replace('\\', '/')
|
||||
parsed = urlparse(link)._asdict()
|
||||
if baseURI:
|
||||
p = urlparse(baseURI)._asdict()
|
||||
baseURI = f'{p["scheme"]}://{p["netloc"]}'
|
||||
|
||||
# 是相对路径,与页面url拼接并返回
|
||||
if not parsed['netloc']:
|
||||
@ -207,7 +213,14 @@ def cookies_to_tuple(cookies):
|
||||
|
||||
elif isinstance(cookies, str):
|
||||
c_dict = {}
|
||||
for attr in cookies.strip().rstrip(';, ').split(',' if ',' in cookies else ';'):
|
||||
r = match(r'.*?=([^=]+)=', cookies)
|
||||
if not r: # 只有一个
|
||||
cookies = [cookies.rstrip(',;')]
|
||||
else:
|
||||
s = match(r'.*([,;]).*', r.group(1)).group(1)
|
||||
cookies = cookies.rstrip(s).split(s)
|
||||
|
||||
for attr in cookies:
|
||||
attr_val = attr.strip().split('=', 1)
|
||||
c_dict[attr_val[0]] = attr_val[1] if len(attr_val) == 2 else True
|
||||
cookies = _dict_cookies_to_tuple(c_dict)
|
||||
@ -314,8 +327,7 @@ def set_browser_cookies(page, cookies):
|
||||
tmp.append(i)
|
||||
|
||||
for i in range(len(tmp)):
|
||||
d = ''.join(tmp[i:])
|
||||
cookie['domain'] = d
|
||||
cookie['domain'] = ''.join(tmp[i:])
|
||||
page.run_cdp_loaded('Network.setCookie', **cookie)
|
||||
if is_cookie_in_driver(page, cookie):
|
||||
break
|
||||
@ -374,6 +386,86 @@ def get_blob(page, url, as_bytes=True):
|
||||
return result
|
||||
|
||||
|
||||
def save_page(tab, path=None, name=None, as_pdf=False, kwargs=None):
|
||||
"""把当前页面保存为文件,如果path和name参数都为None,只返回文本
|
||||
:param tab: Tab或Page对象
|
||||
:param path: 保存路径,为None且name不为None时保存在当前路径
|
||||
:param name: 文件名,为None且path不为None时用title属性值
|
||||
:param as_pdf: 为Ture保存为pdf,否则为mhtml且忽略kwargs参数
|
||||
:param kwargs: pdf生成参数
|
||||
:return: as_pdf为True时返回bytes,否则返回文件文本
|
||||
"""
|
||||
if name:
|
||||
if name.endswith('.pdf'):
|
||||
name = name[:-4]
|
||||
as_pdf = True
|
||||
elif name.endswith('.mhtml'):
|
||||
name = name[:-6]
|
||||
as_pdf = False
|
||||
|
||||
if path:
|
||||
path = Path(path)
|
||||
if path.suffix.lower() == '.mhtml':
|
||||
name = path.stem
|
||||
path = path.parent
|
||||
as_pdf = False
|
||||
elif path.suffix.lower() == '.pdf':
|
||||
name = path.stem
|
||||
path = path.parent
|
||||
as_pdf = True
|
||||
|
||||
return get_pdf(tab, path, name, kwargs) if as_pdf else get_mhtml(tab, path, name)
|
||||
|
||||
|
||||
def get_mhtml(page, path=None, name=None):
|
||||
"""把当前页面保存为mhtml文件,如果path和name参数都为None,只返回mhtml文本
|
||||
:param page: 要保存的页面对象
|
||||
:param path: 保存路径,为None且name不为None时保存在当前路径
|
||||
:param name: 文件名,为None且path不为None时用title属性值
|
||||
:return: mhtml文本
|
||||
"""
|
||||
r = page.run_cdp('Page.captureSnapshot')['data']
|
||||
if path is None and name is None:
|
||||
return r
|
||||
|
||||
path = path or '.'
|
||||
Path(path).mkdir(parents=True, exist_ok=True)
|
||||
name = make_valid_name(name or page.title)
|
||||
with open(f'{path}{sep}{name}.mhtml', 'w', encoding='utf-8') as f:
|
||||
f.write(r.replace('\r\n', '\n'))
|
||||
return r
|
||||
|
||||
|
||||
def get_pdf(page, path=None, name=None, kwargs=None):
|
||||
"""把当前页面保存为pdf文件,如果path和name参数都为None,只返回字节
|
||||
:param page: 要保存的页面对象
|
||||
:param path: 保存路径,为None且name不为None时保存在当前路径
|
||||
:param name: 文件名,为None且path不为None时用title属性值
|
||||
:param kwargs: pdf生成参数
|
||||
:return: pdf文本
|
||||
"""
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
kwargs['transferMode'] = 'ReturnAsBase64'
|
||||
if 'printBackground' not in kwargs:
|
||||
kwargs['printBackground'] = True
|
||||
try:
|
||||
r = page.run_cdp('Page.printToPDF', **kwargs)['data']
|
||||
except:
|
||||
raise RuntimeError('保存失败,可能浏览器版本不支持。')
|
||||
from base64 import b64decode
|
||||
r = b64decode(r)
|
||||
if path is None and name is None:
|
||||
return r
|
||||
|
||||
path = path or '.'
|
||||
Path(path).mkdir(parents=True, exist_ok=True)
|
||||
name = make_valid_name(name or page.title)
|
||||
with open(f'{path}{sep}{name}.pdf', 'wb') as f:
|
||||
f.write(r)
|
||||
return r
|
||||
|
||||
|
||||
def tree(ele_or_page):
|
||||
"""把页面或元素对象DOM结构打印出来
|
||||
:param ele_or_page: 页面或元素对象
|
||||
|
@ -6,7 +6,8 @@
|
||||
@License : BSD 3-Clause.
|
||||
"""
|
||||
from http.cookiejar import Cookie
|
||||
from typing import Union
|
||||
from pathlib import Path
|
||||
from typing import Union, Optional
|
||||
|
||||
from requests import Session
|
||||
from requests.cookies import RequestsCookieJar
|
||||
@ -14,6 +15,8 @@ from requests.cookies import RequestsCookieJar
|
||||
from .._base.base import DrissionElement, BaseParser
|
||||
from .._elements.chromium_element import ChromiumElement
|
||||
from .._pages.chromium_base import ChromiumBase
|
||||
from .._pages.chromium_page import ChromiumPage
|
||||
from .._pages.chromium_tab import ChromiumTab
|
||||
|
||||
|
||||
def get_ele_txt(e: DrissionElement) -> str: ...
|
||||
@ -52,6 +55,24 @@ def is_cookie_in_driver(page: ChromiumBase, cookie: dict) -> bool: ...
|
||||
def get_blob(page: ChromiumBase, url: str, as_bytes: bool = True) -> bytes: ...
|
||||
|
||||
|
||||
def save_page(tab: Union[ChromiumPage, ChromiumTab],
|
||||
path: Union[Path, str, None] = None,
|
||||
name: Optional[str] = None,
|
||||
as_pdf: bool = False,
|
||||
kwargs: dict = None) -> Union[bytes, str]: ...
|
||||
|
||||
|
||||
def get_mhtml(page: Union[ChromiumPage, ChromiumTab],
|
||||
path: Optional[Path] = None,
|
||||
name: Optional[str] = None) -> Union[bytes, str]: ...
|
||||
|
||||
|
||||
def get_pdf(page: Union[ChromiumPage, ChromiumTab],
|
||||
path: Optional[Path] = None,
|
||||
name: Optional[str] = None,
|
||||
kwargs: dict = None) -> Union[bytes, str]: ...
|
||||
|
||||
|
||||
def tree(ele_or_page: BaseParser) -> None: ...
|
||||
|
||||
|
||||
|
@ -1065,7 +1065,7 @@ class ChromiumBase(BasePage):
|
||||
name = f'{self.title}.jpg'
|
||||
elif not name.endswith(('.jpg', '.jpeg', '.png', '.webp')):
|
||||
name = f'{name}.jpg'
|
||||
path = f'{path}{sep}{name}'
|
||||
path = f'{path}{sep}{make_valid_name(name)}'
|
||||
|
||||
path = Path(path)
|
||||
pic_type = path.suffix.lower()
|
||||
@ -1228,50 +1228,3 @@ def close_privacy_dialog(page, tid):
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def get_mhtml(page, path=None, name=None):
|
||||
"""把当前页面保存为mhtml文件,如果path和name参数都为None,只返回mhtml文本
|
||||
:param page: 要保存的页面对象
|
||||
:param path: 保存路径,为None且name不为None时保存在当前路径
|
||||
:param name: 文件名,为None且path不为None时用title属性值
|
||||
:return: mhtml文本
|
||||
"""
|
||||
r = page.run_cdp('Page.captureSnapshot')['data']
|
||||
if path is None and name is None:
|
||||
return r
|
||||
path = path or '.'
|
||||
Path(path).mkdir(parents=True, exist_ok=True)
|
||||
name = make_valid_name(name or page.title)
|
||||
with open(f'{path}{sep}{name}.mhtml', 'w', encoding='utf-8') as f:
|
||||
f.write(r.replace('\r\n', '\n'))
|
||||
return r
|
||||
|
||||
|
||||
def get_pdf(page, path=None, name=None, kwargs=None):
|
||||
"""把当前页面保存为pdf文件,如果path和name参数都为None,只返回字节
|
||||
:param page: 要保存的页面对象
|
||||
:param path: 保存路径,为None且name不为None时保存在当前路径
|
||||
:param name: 文件名,为None且path不为None时用title属性值
|
||||
:param kwargs: pdf生成参数
|
||||
:return: pdf文本
|
||||
"""
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
kwargs['transferMode'] = 'ReturnAsBase64'
|
||||
if 'printBackground' not in kwargs:
|
||||
kwargs['printBackground'] = True
|
||||
try:
|
||||
r = page.run_cdp('Page.printToPDF', **kwargs)['data']
|
||||
except:
|
||||
raise RuntimeError('保存失败,可能浏览器版本不支持。')
|
||||
from base64 import b64decode
|
||||
r = b64decode(r)
|
||||
if path is None and name is None:
|
||||
return r
|
||||
path = path or '.'
|
||||
Path(path).mkdir(parents=True, exist_ok=True)
|
||||
name = make_valid_name(name or page.title)
|
||||
with open(f'{path}{sep}{name}.pdf', 'wb') as f:
|
||||
f.write(r)
|
||||
return r
|
||||
|
@ -58,11 +58,11 @@ class ChromiumFrame(ChromiumBase):
|
||||
|
||||
self._rect = None
|
||||
self._type = 'ChromiumFrame'
|
||||
end_time = perf_counter() + 2
|
||||
while perf_counter() < end_time: # todo: 优化
|
||||
if self.url not in (None, 'about:blank'):
|
||||
break
|
||||
sleep(.1)
|
||||
# end_time = perf_counter() + 2
|
||||
# while perf_counter() < end_time:
|
||||
# if self.url not in (None, 'about:blank'):
|
||||
# break
|
||||
# sleep(.1)
|
||||
|
||||
def __call__(self, locator, index=1, timeout=None):
|
||||
"""在内部查找元素
|
||||
|
@ -9,14 +9,15 @@ from pathlib import Path
|
||||
from threading import Lock
|
||||
from time import sleep, perf_counter
|
||||
|
||||
from requests import get
|
||||
from requests import Session
|
||||
|
||||
from .._base.browser import Browser
|
||||
from .._configs.chromium_options import ChromiumOptions
|
||||
from .._functions.browser import connect_browser
|
||||
from .._functions.settings import Settings
|
||||
from .._functions.tools import PortFinder
|
||||
from .._pages.chromium_base import ChromiumBase, get_mhtml, get_pdf, Timeout
|
||||
from .._functions.web import save_page
|
||||
from .._pages.chromium_base import ChromiumBase, Timeout
|
||||
from .._pages.chromium_tab import ChromiumTab
|
||||
from .._units.setter import ChromiumPageSetter
|
||||
from .._units.waiter import PageWaiter
|
||||
@ -76,9 +77,13 @@ class ChromiumPage(ChromiumBase):
|
||||
if self._is_exist and self._chromium_options._headless is False and 'headless' in r['userAgent'].lower():
|
||||
self._browser.quit(3)
|
||||
connect_browser(self._chromium_options)
|
||||
ws = get(f'http://{self._chromium_options.address}/json/version', headers={'Connection': 'close'})
|
||||
ws = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
|
||||
self._browser = Browser(self._chromium_options.address, ws, self)
|
||||
s = Session()
|
||||
s.trust_env = False
|
||||
ws = s.get(f'http://{self._chromium_options.address}/json/version', headers={'Connection': 'close'})
|
||||
bid = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
|
||||
self._browser = Browser(self._chromium_options.address, bid, self)
|
||||
ws.close()
|
||||
s.close()
|
||||
|
||||
def _d_set_runtime_settings(self):
|
||||
"""设置运行时用到的属性"""
|
||||
@ -154,7 +159,7 @@ class ChromiumPage(ChromiumBase):
|
||||
:param kwargs: pdf生成参数
|
||||
:return: as_pdf为True时返回bytes,否则返回文件文本
|
||||
"""
|
||||
return get_pdf(self, path, name, kwargs) if as_pdf else get_mhtml(self, path, name)
|
||||
return save_page(self, path, name, as_pdf, kwargs)
|
||||
|
||||
def get_tab(self, id_or_num=None, title=None, url=None, tab_type='page', as_id=False):
|
||||
"""获取一个标签页对象,id_or_num不为None时,后面几个参数无效
|
||||
@ -336,12 +341,16 @@ def run_browser(chromium_options):
|
||||
"""连接浏览器"""
|
||||
is_exist = connect_browser(chromium_options)
|
||||
try:
|
||||
ws = get(f'http://{chromium_options.address}/json/version', headers={'Connection': 'close'})
|
||||
s = Session()
|
||||
s.trust_env = False
|
||||
ws = s.get(f'http://{chromium_options.address}/json/version', headers={'Connection': 'close'})
|
||||
if not ws:
|
||||
raise BrowserConnectError('\n浏览器连接失败,如使用全局代理,须设置不代理127.0.0.1地址。')
|
||||
browser_id = ws.json()['webSocketDebuggerUrl'].split('/')[-1]
|
||||
ws.close()
|
||||
s.close()
|
||||
except KeyError:
|
||||
raise BrowserConnectError('浏览器版本太旧,请升级。')
|
||||
raise BrowserConnectError('浏览器版本太旧或此浏览器不支持接管。')
|
||||
except:
|
||||
raise BrowserConnectError('\n浏览器连接失败,如使用全局代理,须设置不代理127.0.0.1地址。')
|
||||
return is_exist, browser_id
|
||||
|
@ -11,8 +11,8 @@ from time import sleep
|
||||
from .._base.base import BasePage
|
||||
from .._configs.session_options import SessionOptions
|
||||
from .._functions.settings import Settings
|
||||
from .._functions.web import set_session_cookies, set_browser_cookies
|
||||
from .._pages.chromium_base import ChromiumBase, get_mhtml, get_pdf
|
||||
from .._functions.web import set_session_cookies, set_browser_cookies, save_page
|
||||
from .._pages.chromium_base import ChromiumBase
|
||||
from .._pages.session_page import SessionPage
|
||||
from .._units.setter import TabSetter, WebPageTabSetter
|
||||
from .._units.waiter import TabWaiter
|
||||
@ -91,7 +91,7 @@ class ChromiumTab(ChromiumBase):
|
||||
:param kwargs: pdf生成参数
|
||||
:return: as_pdf为True时返回bytes,否则返回文件文本
|
||||
"""
|
||||
return get_pdf(self, path, name, kwargs) if as_pdf else get_mhtml(self, path, name)
|
||||
return save_page(self, path, name, as_pdf, kwargs)
|
||||
|
||||
def __repr__(self):
|
||||
return f'<ChromiumTab browser_id={self.browser.id} tab_id={self.tab_id}>'
|
||||
|
@ -24,6 +24,7 @@ class Actions:
|
||||
self.modifier = 0 # 修饰符,Alt=1, Ctrl=2, Meta/Command=4, Shift=8
|
||||
self.curr_x = 0 # 视口坐标
|
||||
self.curr_y = 0
|
||||
self._holding = 'left'
|
||||
|
||||
def move_to(self, ele_or_loc, offset_x=0, offset_y=0, duration=.5):
|
||||
"""鼠标移动到元素中点,或页面上的某个绝对坐标。可设置偏移量
|
||||
@ -86,7 +87,7 @@ class Actions:
|
||||
t = perf_counter()
|
||||
self.curr_x = x
|
||||
self.curr_y = y
|
||||
self._dr.run('Input.dispatchMouseEvent', type='mouseMoved',
|
||||
self._dr.run('Input.dispatchMouseEvent', type='mouseMoved', button=self._holding,
|
||||
x=self.curr_x, y=self.curr_y, modifiers=self.modifier)
|
||||
ss = .02 - perf_counter() + t
|
||||
if ss > 0:
|
||||
@ -191,6 +192,7 @@ class Actions:
|
||||
self.move_to(on_ele, duration=0)
|
||||
self._dr.run('Input.dispatchMouseEvent', type='mousePressed', button=button, clickCount=count,
|
||||
x=self.curr_x, y=self.curr_y, modifiers=self.modifier)
|
||||
self._holding = button
|
||||
return self
|
||||
|
||||
def _release(self, button):
|
||||
@ -200,12 +202,13 @@ class Actions:
|
||||
"""
|
||||
self._dr.run('Input.dispatchMouseEvent', type='mouseReleased', button=button, clickCount=1,
|
||||
x=self.curr_x, y=self.curr_y, modifiers=self.modifier)
|
||||
self._holding = 'left'
|
||||
return self
|
||||
|
||||
def scroll(self, delta_x=0, delta_y=0, on_ele=None):
|
||||
def scroll(self, delta_y=0, delta_x=0, on_ele=None):
|
||||
"""滚动鼠标滚轮,可先移动到元素上
|
||||
:param delta_x: 滚轮变化值x
|
||||
:param delta_y: 滚轮变化值y
|
||||
:param delta_x: 滚轮变化值x
|
||||
:param on_ele: ChromiumElement元素
|
||||
:return: self
|
||||
"""
|
||||
|
@ -50,11 +50,12 @@ class Actions:
|
||||
self.modifier: int = ...
|
||||
self.curr_x: int = ...
|
||||
self.curr_y: int = ...
|
||||
self._holding: str = ...
|
||||
|
||||
def move_to(self, ele_or_loc: Union[ChromiumElement, Tuple[int, int], str],
|
||||
offset_x: int = 0, offset_y: int = 0, duration: float = .5) -> Actions: ...
|
||||
def move_to(self, ele_or_loc: Union[ChromiumElement, Tuple[float, float], str],
|
||||
offset_x: float = 0, offset_y: float = 0, duration: float = .5) -> Actions: ...
|
||||
|
||||
def move(self, offset_x: int = 0, offset_y: int = 0, duration: float = .5) -> Actions: ...
|
||||
def move(self, offset_x: float = 0, offset_y: float = 0, duration: float = .5) -> Actions: ...
|
||||
|
||||
def click(self, on_ele: Union[ChromiumElement, str] = None) -> Actions: ...
|
||||
|
||||
@ -81,7 +82,7 @@ class Actions:
|
||||
|
||||
def _release(self, button: str) -> Actions: ...
|
||||
|
||||
def scroll(self, delta_x: int = 0, delta_y: int = 0,
|
||||
def scroll(self, delta_y: int = 0, delta_x: int = 0,
|
||||
on_ele: Union[ChromiumElement, str] = None) -> Actions: ...
|
||||
|
||||
def up(self, pixel: int) -> Actions: ...
|
||||
|
@ -87,8 +87,8 @@ class Clicker(object):
|
||||
x = rect[1][0] - (rect[1][0] - rect[0][0]) / 2
|
||||
y = rect[0][0] + 3
|
||||
try:
|
||||
r = self._ele.owner.run_cdp('DOM.getNodeForLocation', x=x, y=y, includeUserAgentShadowDOM=True,
|
||||
ignorePointerEventsNone=True)
|
||||
r = self._ele.owner.run_cdp('DOM.getNodeForLocation', x=int(x), y=int(y),
|
||||
includeUserAgentShadowDOM=True, ignorePointerEventsNone=True)
|
||||
if r['backendNodeId'] != self._ele._backend_id:
|
||||
vx, vy = self._ele.rect.viewport_midpoint
|
||||
else:
|
||||
@ -113,11 +113,19 @@ class Clicker(object):
|
||||
x, y = self._ele.rect.viewport_click_point
|
||||
self._click(x, y, 'right')
|
||||
|
||||
def middle(self):
|
||||
"""中键单击"""
|
||||
def middle(self, get_tab=True):
|
||||
"""中键单击,默认返回新出现的tab对象
|
||||
:param get_tab: 是否返回新tab对象,为False则返回None
|
||||
:return: Tab对象或None
|
||||
"""
|
||||
self._ele.owner.scroll.to_see(self._ele)
|
||||
x, y = self._ele.rect.viewport_click_point
|
||||
self._click(x, y, 'middle')
|
||||
if get_tab:
|
||||
tid = self._ele.page.wait.new_tab()
|
||||
if not tid:
|
||||
raise RuntimeError('没有出现新标签页。')
|
||||
return self._ele.page.get_tab(tid)
|
||||
|
||||
def at(self, offset_x=None, offset_y=None, button='left', count=1):
|
||||
"""带偏移量点击本元素,相对于左上角坐标。不传入x或y值时点击元素中间点
|
||||
|
@ -23,7 +23,7 @@ class Clicker(object):
|
||||
|
||||
def right(self) -> None: ...
|
||||
|
||||
def middle(self) -> None: ...
|
||||
def middle(self, get_tab: bool = True) -> Union[ChromiumTab, WebPageTab, None]: ...
|
||||
|
||||
def at(self,
|
||||
offset_x: float = None,
|
||||
@ -39,10 +39,10 @@ class Clicker(object):
|
||||
suffix: str = None,
|
||||
new_tab: bool = False,
|
||||
by_js: bool = False,
|
||||
timeout:float=None) -> DownloadMission: ...
|
||||
timeout: float = None) -> DownloadMission: ...
|
||||
|
||||
def to_upload(self, file_paths: Union[str, Path, list, tuple], by_js: bool = False) -> None: ...
|
||||
|
||||
def for_new_tab(self, by_js:bool=False)->Union[ChromiumTab, WebPageTab]:...
|
||||
def for_new_tab(self, by_js: bool = False) -> Union[ChromiumTab, WebPageTab]: ...
|
||||
|
||||
def _click(self, client_x: float, client_y: float, button: str = 'left', count: int = 1) -> None: ...
|
||||
|
@ -128,7 +128,7 @@ class Listener(object):
|
||||
raise RuntimeError('监听未启动或已暂停。')
|
||||
if not timeout:
|
||||
while self._caught.qsize() < count:
|
||||
sleep(.05)
|
||||
sleep(.03)
|
||||
fail = False
|
||||
|
||||
else:
|
||||
@ -140,6 +140,7 @@ class Listener(object):
|
||||
if self._caught.qsize() >= count:
|
||||
fail = False
|
||||
break
|
||||
sleep(.03)
|
||||
|
||||
if fail:
|
||||
if fit_count or not self._caught.qsize():
|
||||
@ -177,7 +178,7 @@ class Listener(object):
|
||||
caught += gap
|
||||
if caught >= count:
|
||||
return
|
||||
sleep(.05)
|
||||
sleep(.03)
|
||||
|
||||
def stop(self):
|
||||
"""停止监听,清空已监听到的列表"""
|
||||
|
@ -16,7 +16,7 @@ class ElementRect(object):
|
||||
|
||||
@property
|
||||
def corners(self):
|
||||
"""返回元素四个角坐标,顺序:坐上、右上、右下、左下,没有大小的元素抛出NoRectError"""
|
||||
"""返回元素四个角坐标,顺序:左上、右上、右下、左下,没有大小的元素抛出NoRectError"""
|
||||
vr = self._get_viewport_rect('border')
|
||||
r = self._ele.owner.run_cdp_loaded('Page.getLayoutMetrics')['visualViewport']
|
||||
sx = r['pageX']
|
||||
@ -25,7 +25,7 @@ class ElementRect(object):
|
||||
|
||||
@property
|
||||
def viewport_corners(self):
|
||||
"""返回元素四个角视口坐标,顺序:坐上、右上、右下、左下,没有大小的元素抛出NoRectError"""
|
||||
"""返回元素四个角视口坐标,顺序:左上、右上、右下、左下,没有大小的元素抛出NoRectError"""
|
||||
r = self._get_viewport_rect('border')
|
||||
return (r[0], r[1]), (r[2], r[3]), (r[4], r[5]), (r[6], r[7])
|
||||
|
||||
@ -225,10 +225,10 @@ class FrameRect(object):
|
||||
|
||||
@property
|
||||
def corners(self):
|
||||
"""返回元素四个角坐标,顺序:坐上、右上、右下、左下"""
|
||||
"""返回元素四个角坐标,顺序:左上、右上、右下、左下"""
|
||||
return self._frame.frame_ele.rect.corners
|
||||
|
||||
@property
|
||||
def viewport_corners(self):
|
||||
"""返回元素四个角视口坐标,顺序:坐上、右上、右下、左下"""
|
||||
"""返回元素四个角视口坐标,顺序:左上、右上、右下、左下"""
|
||||
return self._frame.frame_ele.rect.viewport_corners
|
||||
|
Loading…
x
Reference in New Issue
Block a user