4.0.4.2(+)

增加add_ele()
run_js()可读取文件
修复旧版python中get()报错问题
click.multiple()改为click.multi()
This commit is contained in:
g1879 2024-01-29 23:06:11 +08:00
parent 1a6418918c
commit 7e08dea72e
10 changed files with 91 additions and 63 deletions

View File

@ -6,7 +6,9 @@
@License : BSD 3-Clause.
"""
from abc import abstractmethod
from pathlib import Path
from re import sub
from urllib.parse import quote
from DownloadKit import DownloadKit
@ -395,6 +397,24 @@ class BasePage(BaseParser):
self._DownloadKit = DownloadKit(driver=self, goal_path=self.download_path)
return self._DownloadKit
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
:param retry: 重试次数
:param interval: 重试间隔
:return: 重试次数间隔是否文件组成的tuple
"""
is_file = False
if isinstance(url, Path) or '://' not in url or ':\\\\' not in url:
p = Path(url)
if p.exists():
url = str(p.absolute())
is_file = True
self._url = quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%')
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval, is_file
# ----------------以下属性或方法由后代实现----------------
@property
def url(self):

View File

@ -218,6 +218,8 @@ class BasePage(BaseParser):
@property
def download(self) -> DownloadKit: ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
# ----------------以下属性或方法由后代实现----------------
@property
def url(self) -> str: ...

View File

@ -1409,6 +1409,13 @@ def run_js(page_or_ele, script, as_expr, timeout, args=None):
if page.states.has_alert:
raise AlertExistsError
try:
if Path(script).exists():
with open(script, 'r', encoding='utf-8') as f:
script = f.read()
except OSError:
pass
end_time = perf_counter() + timeout
try:
if as_expr:

View File

@ -11,7 +11,6 @@ from pathlib import Path
from re import findall
from threading import Thread
from time import perf_counter, sleep
from urllib.parse import quote
from DataRecorder.tools import make_valid_name
@ -431,7 +430,7 @@ class ChromiumBase(BasePage):
def run_js(self, script, *args, as_expr=False, timeout=None):
"""运行javascript代码
:param script: js文本
:param script: js文本或js文件路径
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script设置
@ -441,7 +440,7 @@ class ChromiumBase(BasePage):
def run_js_loaded(self, script, *args, as_expr=False, timeout=None):
"""运行javascript代码执行前等待页面加载完毕
:param script: js文本
:param script: js文本或js文件路径
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script属性值
@ -451,7 +450,7 @@ class ChromiumBase(BasePage):
return run_js(self, script, as_expr, self.timeouts.script if timeout is None else timeout, args)
def run_async_js(self, script, *args, as_expr=False):
"""以异步方式执行js代码
"""以异步方式执行js代码或js文件路径
:param script: js文本
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
@ -468,7 +467,7 @@ class ChromiumBase(BasePage):
:param timeout: 连接超时时间为None时使用页面对象timeouts.page_load属性值
:return: 目标url是否可用
"""
retry, interval = self._before_connect(url, retry, interval)
retry, interval, is_file = self._before_connect(url, retry, interval)
self._url_available = self._d_connect(self._url, times=retry, interval=interval,
show_errmsg=show_errmsg, timeout=timeout)
return self._url_available
@ -669,6 +668,33 @@ class ChromiumBase(BasePage):
if ele:
self.run_cdp('DOM.removeNode', nodeId=ele._node_id)
def add_ele(self, outerHTML, insert_to, before=None):
"""新建一个元素
:param outerHTML: 新元素的html文本
:param insert_to: 插入到哪个元素中可接收元素对象和定位符为None添加到body
:param before: 在哪个子节点前面插入可接收对象和定位符为None插入到父元素末尾
:return: 元素对象
"""
insert_to = self.ele(insert_to) if insert_to else self.ele('t:body')
args = [outerHTML, insert_to]
if before:
args.append(self.ele(before))
js = '''
ele = document.createElement(null);
arguments[1].insertBefore(ele, arguments[2]);
ele.outerHTML = arguments[0];
return arguments[2].previousElementSibling;
'''
else:
js = '''
ele = document.createElement(null);
arguments[1].appendChild(ele);
ele.outerHTML = arguments[0];
return arguments[1].lastElementChild;
'''
ele = self.run_js(js, *args)
return ele
def get_frame(self, loc_ind_ele, timeout=None):
"""获取页面中一个frame对象
:param loc_ind_ele: 定位符iframe序号ChromiumFrame对象序号从1开始可传入负数获取倒数第几个
@ -933,22 +959,6 @@ class ChromiumBase(BasePage):
pass
return False
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
:param retry: 重试次数
:param interval: 重试间隔
:return: 重试次数和间隔组成的tuple
"""
p = Path(url)
if p.exists():
self._url = str(p.absolute())
else:
self._url = quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%') or 'chrome://newtab/'
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval
def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None):
"""尝试连接,重试若干次
:param to_url: 要访问的url

View File

@ -167,11 +167,11 @@ class ChromiumBase(BasePage):
@property
def states(self) -> PageStates: ...
def run_js(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_js(self, script: Union[str, Path], *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_js_loaded(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_js_loaded(self, script: Union[str, Path], *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_async_js(self, script: str, *args, as_expr: bool = False) -> None: ...
def run_async_js(self, script: Union[str, Path], *args, as_expr: bool = False) -> None: ...
def get(self, url: str, show_errmsg: bool = False, retry: int = None,
interval: float = None, timeout: float = None) -> Union[None, bool]: ...
@ -214,6 +214,11 @@ class ChromiumBase(BasePage):
def remove_ele(self, loc_or_ele: Union[ChromiumElement, ChromiumFrame, str, Tuple[str, str]]) -> None: ...
def add_ele(self,
outerHTML: str,
insert_to: Optional[ChromiumElement, str, Tuple[str, str]],
before: Optional[ChromiumElement, str, Tuple[str, str]] = None) -> ChromiumElement: ...
def get_frame(self, loc_ind_ele: Union[str, int, tuple, ChromiumFrame], timeout: float = None) -> ChromiumFrame: ...
def get_frames(self, locator: Union[str, tuple] = None, timeout: float = None) -> List[ChromiumFrame]: ...
@ -260,8 +265,6 @@ class ChromiumBase(BasePage):
def _on_alert_open(self, **kwargs): ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
def _d_connect(self, to_url: str, times: int = 0, interval: float = 1, show_errmsg: bool = False,
timeout: float = None) -> Union[bool, None]: ...

View File

@ -8,7 +8,7 @@
from pathlib import Path
from re import search, DOTALL
from time import sleep
from urllib.parse import urlparse, quote
from urllib.parse import urlparse
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
@ -154,19 +154,16 @@ class SessionPage(BasePage):
:param kwargs: 连接参数
:return: url是否可用
"""
if isinstance(url, Path):
url = str(url.absolute())
if not url.lower().startswith('http'):
if url.startswith('file:///'):
url = url[8:]
if Path(url).exists():
with open(url, 'rb') as f:
r = Response()
r._content = f.read()
r.status_code = 200
self._response = r
return
return self._s_connect(url, 'get', show_errmsg, retry, interval, **kwargs)
retry, interval, is_file = self._before_connect(url.lstrip('file:///'), retry, interval)
if is_file:
with open(self._url, 'rb') as f:
r = Response()
r._content = f.read()
r.status_code = 200
r.url = self._url
self._response = r
return True
return self._s_connect(self._url, 'get', show_errmsg, retry, interval, **kwargs)
def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
"""用post方式跳转到url
@ -256,18 +253,6 @@ class SessionPage(BasePage):
if self._response is not None:
self._response.close()
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
:param retry: 重试次数
:param interval: 重试间隔
:return: 重试次数和间隔组成的tuple
"""
self._url = quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%')
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval
def _s_connect(self, url, mode, show_errmsg=False, retry=None, interval=None, **kwargs):
"""执行get或post连接
:param url: 目标url
@ -278,7 +263,7 @@ class SessionPage(BasePage):
:param kwargs: 连接参数
:return: url是否可用
"""
retry, interval = self._before_connect(url, retry, interval)
retry, interval, is_file = self._before_connect(url, retry, interval)
self._response, info = self._make_response(self._url, mode, retry, interval, show_errmsg, **kwargs)
if self._response is None:

View File

@ -152,8 +152,6 @@ class SessionPage(BasePage):
def close(self) -> None: ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
def _s_connect(self,
url: str,
mode: str,

View File

@ -273,12 +273,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
if copy_cookies:
self.cookies_to_session()
if go:
url = super(SessionPage, self).url
if url.startswith('http'):
r = self.get(url)
if not r:
raise ConnectionError('s模式访问失败请设置go=False自行构造连接参数进行访问。')
if go and not self.get(super(SessionPage, self).url):
raise ConnectionError('s模式访问失败请设置go=False自行构造连接参数进行访问。')
def cookies_to_session(self, copy_user_agent=True):
"""把driver对象的cookies复制到session对象

View File

@ -133,7 +133,7 @@ class Clicker(object):
x, y = offset_scroll(self._ele, offset_x, offset_y)
self._click(x, y, button, count)
def multiple(self, times=2):
def multi(self, times=2):
"""多次点击
:param times: 默认双击
:return: None
@ -159,3 +159,10 @@ class Clicker(object):
def twice(self):
"""双击元素"""
self.at(count=2)
def multiple(self, times=2):
"""多次点击
:param times: 默认双击
:return: None
"""
self.at(count=times)

View File

@ -24,6 +24,6 @@ class Clicker(object):
def at(self, offset_x: float = None, offset_y: float = None, button: str = 'left', count: int = 1) -> None: ...
def multiple(self, times: int = 2) -> None: ...
def multi(self, times: int = 2) -> None: ...
def _click(self, client_x: float, client_y: float, button: str = 'left', count: int = 1) -> None: ...