Pre Merge pull request !40 from g1879/dev

This commit is contained in:
g1879 2024-01-29 15:06:58 +00:00 committed by Gitee
commit ee0b80002d
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
15 changed files with 141 additions and 69 deletions

View File

@ -14,4 +14,4 @@ from ._configs.chromium_options import ChromiumOptions
from ._configs.session_options import SessionOptions
__all__ = ['ChromiumPage', 'ChromiumOptions', 'SessionOptions', 'SessionPage', 'WebPage', '__version__']
__version__ = '4.0.4.1'
__version__ = '4.0.4.2'

View File

@ -6,7 +6,9 @@
@License : BSD 3-Clause.
"""
from abc import abstractmethod
from pathlib import Path
from re import sub
from urllib.parse import quote
from DownloadKit import DownloadKit
@ -395,6 +397,24 @@ class BasePage(BaseParser):
self._DownloadKit = DownloadKit(driver=self, goal_path=self.download_path)
return self._DownloadKit
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
:param retry: 重试次数
:param interval: 重试间隔
:return: 重试次数间隔是否文件组成的tuple
"""
is_file = False
if isinstance(url, Path) or '://' not in url or ':\\\\' not in url:
p = Path(url)
if p.exists():
url = str(p.absolute())
is_file = True
self._url = quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%')
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval, is_file
# ----------------以下属性或方法由后代实现----------------
@property
def url(self):

View File

@ -218,6 +218,8 @@ class BasePage(BaseParser):
@property
def download(self) -> DownloadKit: ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
# ----------------以下属性或方法由后代实现----------------
@property
def url(self) -> str: ...

View File

@ -267,6 +267,21 @@ class ChromiumOptions(object):
self.clear_file_flags = True
return self
def clear_flags(self):
"""清空本对象已设置的flag参数"""
self._flags = {}
return self
def clear_arguments(self):
"""清空本对象已设置的argument参数"""
self._arguments = []
return self
def clear_prefs(self):
"""清空本对象已设置的pref参数"""
self._prefs = {}
return self
def set_timeouts(self, base=None, page_load=None, script=None, implicit=None):
"""设置超时时间,单位为秒
:param base: 默认超时时间

View File

@ -109,8 +109,14 @@ class ChromiumOptions(object):
def clear_flags_in_file(self) -> ChromiumOptions: ...
def set_timeouts(self,
base: float = None,
def clear_flags(self) -> ChromiumOptions: ...
def clear_arguments(self) -> ChromiumOptions: ...
def clear_prefs(self) -> ChromiumOptions: ...
def set_timeouts(self,
base: float = None,
page_load: float = None,
script: float = None) -> ChromiumOptions: ...

View File

@ -198,6 +198,11 @@ class SessionOptions(object):
return self
def clear_headers(self):
"""清空已设置的header参数"""
self._headers = None
self._del_set.add('headers')
@property
def cookies(self):
"""以list形式返回cookies"""

View File

@ -55,6 +55,8 @@ class SessionOptions(object):
def remove_a_header(self, name: str) -> SessionOptions: ...
def clear_headers(self) -> SessionOptions: ...
@property
def cookies(self) -> list: ...

View File

@ -1409,6 +1409,13 @@ def run_js(page_or_ele, script, as_expr, timeout, args=None):
if page.states.has_alert:
raise AlertExistsError
try:
if Path(script).exists():
with open(script, 'r', encoding='utf-8') as f:
script = f.read()
except OSError:
pass
end_time = perf_counter() + timeout
try:
if as_expr:

View File

@ -11,7 +11,6 @@ from pathlib import Path
from re import findall
from threading import Thread
from time import perf_counter, sleep
from urllib.parse import quote
from DataRecorder.tools import make_valid_name
@ -264,7 +263,6 @@ class ChromiumBase(BasePage):
self.stop_loading()
# ----------挂件----------
@property
def wait(self):
"""返回用于等待的对象"""
@ -329,7 +327,7 @@ class ChromiumBase(BasePage):
"""返回timeouts设置"""
return self._timeouts
# ----------挂件----------
# ----------挂件结束----------
@property
def browser(self):
@ -432,7 +430,7 @@ class ChromiumBase(BasePage):
def run_js(self, script, *args, as_expr=False, timeout=None):
"""运行javascript代码
:param script: js文本
:param script: js文本或js文件路径
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script设置
@ -442,7 +440,7 @@ class ChromiumBase(BasePage):
def run_js_loaded(self, script, *args, as_expr=False, timeout=None):
"""运行javascript代码执行前等待页面加载完毕
:param script: js文本
:param script: js文本或js文件路径
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
:param timeout: js超时时间为None则使用页面timeouts.script属性值
@ -452,7 +450,7 @@ class ChromiumBase(BasePage):
return run_js(self, script, as_expr, self.timeouts.script if timeout is None else timeout, args)
def run_async_js(self, script, *args, as_expr=False):
"""以异步方式执行js代码
"""以异步方式执行js代码或js文件路径
:param script: js文本
:param args: 参数按顺序在js文本中对应arguments[0]arguments[1]...
:param as_expr: 是否作为表达式运行为True时args无效
@ -469,7 +467,7 @@ class ChromiumBase(BasePage):
:param timeout: 连接超时时间为None时使用页面对象timeouts.page_load属性值
:return: 目标url是否可用
"""
retry, interval = self._before_connect(url, retry, interval)
retry, interval, is_file = self._before_connect(url, retry, interval)
self._url_available = self._d_connect(self._url, times=retry, interval=interval,
show_errmsg=show_errmsg, timeout=timeout)
return self._url_available
@ -670,6 +668,33 @@ class ChromiumBase(BasePage):
if ele:
self.run_cdp('DOM.removeNode', nodeId=ele._node_id)
def add_ele(self, outerHTML, insert_to, before=None):
"""新建一个元素
:param outerHTML: 新元素的html文本
:param insert_to: 插入到哪个元素中可接收元素对象和定位符为None添加到body
:param before: 在哪个子节点前面插入可接收对象和定位符为None插入到父元素末尾
:return: 元素对象
"""
insert_to = self.ele(insert_to) if insert_to else self.ele('t:body')
args = [outerHTML, insert_to]
if before:
args.append(self.ele(before))
js = '''
ele = document.createElement(null);
arguments[1].insertBefore(ele, arguments[2]);
ele.outerHTML = arguments[0];
return arguments[2].previousElementSibling;
'''
else:
js = '''
ele = document.createElement(null);
arguments[1].appendChild(ele);
ele.outerHTML = arguments[0];
return arguments[1].lastElementChild;
'''
ele = self.run_js(js, *args)
return ele
def get_frame(self, loc_ind_ele, timeout=None):
"""获取页面中一个frame对象
:param loc_ind_ele: 定位符iframe序号ChromiumFrame对象序号从1开始可传入负数获取倒数第几个
@ -722,6 +747,17 @@ class ChromiumBase(BasePage):
frames = self._ele(locator, timeout=timeout, index=None, raise_err=False)
return [i for i in frames if i._type == 'ChromiumFrame']
def upload(self, loc_or_ele, file_paths, by_js=False):
"""触发上传文件选择框并自动填入指定路径
:param loc_or_ele: 被点击后会触发文件选择框的元素或它的定位符
:param file_paths: 文件路径如果上传框支持多文件可传入列表或字符串字符串时多个文件用回车分隔
:param by_js: 是否用js方式点击
:return: None
"""
self.set.upload_files(file_paths)
self.ele(loc_or_ele).click(by_js=by_js)
self.wait.upload_paths_inputted()
def session_storage(self, item=None):
"""返回sessionStorage信息不设置item则获取全部
:param item: 要获取的项不设置则返回全部
@ -923,22 +959,6 @@ class ChromiumBase(BasePage):
pass
return False
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
:param retry: 重试次数
:param interval: 重试间隔
:return: 重试次数和间隔组成的tuple
"""
p = Path(url)
if p.exists():
self._url = str(p.absolute())
else:
self._url = quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%') or 'chrome://newtab/'
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval
def _d_connect(self, to_url, times=0, interval=1, show_errmsg=False, timeout=None):
"""尝试连接,重试若干次
:param to_url: 要访问的url

View File

@ -167,16 +167,17 @@ class ChromiumBase(BasePage):
@property
def states(self) -> PageStates: ...
def run_js(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_js(self, script: Union[str, Path], *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_js_loaded(self, script: str, *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_js_loaded(self, script: Union[str, Path], *args, as_expr: bool = False, timeout: float = None) -> Any: ...
def run_async_js(self, script: str, *args, as_expr: bool = False) -> None: ...
def run_async_js(self, script: Union[str, Path], *args, as_expr: bool = False) -> None: ...
def get(self, url: str, show_errmsg: bool = False, retry: int = None,
interval: float = None, timeout: float = None) -> Union[None, bool]: ...
def cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[list, dict]: ...
def cookies(self, as_dict: bool = False, all_domains: bool = False, all_info: bool = False) -> Union[
list, dict]: ...
def ele(self,
locator: Union[Tuple[str, str], str, ChromiumElement, ChromiumFrame],
@ -213,10 +214,20 @@ class ChromiumBase(BasePage):
def remove_ele(self, loc_or_ele: Union[ChromiumElement, ChromiumFrame, str, Tuple[str, str]]) -> None: ...
def add_ele(self,
outerHTML: str,
insert_to: Optional[ChromiumElement, str, Tuple[str, str]],
before: Optional[ChromiumElement, str, Tuple[str, str]] = None) -> ChromiumElement: ...
def get_frame(self, loc_ind_ele: Union[str, int, tuple, ChromiumFrame], timeout: float = None) -> ChromiumFrame: ...
def get_frames(self, locator: Union[str, tuple] = None, timeout: float = None) -> List[ChromiumFrame]: ...
def upload(self,
loc_or_ele: Union[str, Tuple[str, str], ChromiumElement],
file_paths: Union[str, list, tuple],
by_js: bool = False) -> None: ...
def run_cdp(self, cmd: str, **cmd_args) -> dict: ...
def run_cdp_loaded(self, cmd: str, **cmd_args) -> dict: ...
@ -254,8 +265,6 @@ class ChromiumBase(BasePage):
def _on_alert_open(self, **kwargs): ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
def _d_connect(self, to_url: str, times: int = 0, interval: float = 1, show_errmsg: bool = False,
timeout: float = None) -> Union[bool, None]: ...

View File

@ -8,7 +8,7 @@
from pathlib import Path
from re import search, DOTALL
from time import sleep
from urllib.parse import urlparse, quote
from urllib.parse import urlparse
from requests import Session, Response
from requests.structures import CaseInsensitiveDict
@ -154,19 +154,16 @@ class SessionPage(BasePage):
:param kwargs: 连接参数
:return: url是否可用
"""
if isinstance(url, Path):
url = str(url.absolute())
if not url.lower().startswith('http'):
if url.startswith('file:///'):
url = url[8:]
if Path(url).exists():
with open(url, 'rb') as f:
r = Response()
r._content = f.read()
r.status_code = 200
self._response = r
return
return self._s_connect(url, 'get', show_errmsg, retry, interval, **kwargs)
retry, interval, is_file = self._before_connect(url.lstrip('file:///'), retry, interval)
if is_file:
with open(self._url, 'rb') as f:
r = Response()
r._content = f.read()
r.status_code = 200
r.url = self._url
self._response = r
return True
return self._s_connect(self._url, 'get', show_errmsg, retry, interval, **kwargs)
def post(self, url, show_errmsg=False, retry=None, interval=None, **kwargs):
"""用post方式跳转到url
@ -256,18 +253,6 @@ class SessionPage(BasePage):
if self._response is not None:
self._response.close()
def _before_connect(self, url, retry, interval):
"""连接前的准备
:param url: 要访问的url
:param retry: 重试次数
:param interval: 重试间隔
:return: 重试次数和间隔组成的tuple
"""
self._url = quote(url, safe='-_.~!*\'"();:@&=+$,/\\?#[]%')
retry = retry if retry is not None else self.retry_times
interval = interval if interval is not None else self.retry_interval
return retry, interval
def _s_connect(self, url, mode, show_errmsg=False, retry=None, interval=None, **kwargs):
"""执行get或post连接
:param url: 目标url
@ -278,7 +263,7 @@ class SessionPage(BasePage):
:param kwargs: 连接参数
:return: url是否可用
"""
retry, interval = self._before_connect(url, retry, interval)
retry, interval, is_file = self._before_connect(url, retry, interval)
self._response, info = self._make_response(self._url, mode, retry, interval, show_errmsg, **kwargs)
if self._response is None:

View File

@ -152,8 +152,6 @@ class SessionPage(BasePage):
def close(self) -> None: ...
def _before_connect(self, url: str, retry: int, interval: float) -> tuple: ...
def _s_connect(self,
url: str,
mode: str,

View File

@ -273,12 +273,8 @@ class WebPage(SessionPage, ChromiumPage, BasePage):
if copy_cookies:
self.cookies_to_session()
if go:
url = super(SessionPage, self).url
if url.startswith('http'):
r = self.get(url)
if not r:
raise ConnectionError('s模式访问失败请设置go=False自行构造连接参数进行访问。')
if go and not self.get(super(SessionPage, self).url):
raise ConnectionError('s模式访问失败请设置go=False自行构造连接参数进行访问。')
def cookies_to_session(self, copy_user_agent=True):
"""把driver对象的cookies复制到session对象

View File

@ -133,7 +133,7 @@ class Clicker(object):
x, y = offset_scroll(self._ele, offset_x, offset_y)
self._click(x, y, button, count)
def multiple(self, times=2):
def multi(self, times=2):
"""多次点击
:param times: 默认双击
:return: None
@ -159,3 +159,10 @@ class Clicker(object):
def twice(self):
"""双击元素"""
self.at(count=2)
def multiple(self, times=2):
"""多次点击
:param times: 默认双击
:return: None
"""
self.at(count=times)

View File

@ -24,6 +24,6 @@ class Clicker(object):
def at(self, offset_x: float = None, offset_y: float = None, button: str = 'left', count: int = 1) -> None: ...
def multiple(self, times: int = 2) -> None: ...
def multi(self, times: int = 2) -> None: ...
def _click(self, client_x: float, client_y: float, button: str = 'left', count: int = 1) -> None: ...