Mirror of https://gitee.com/g1879/DrissionPage.git (synced 2024-12-10 04:00:23 +08:00)
完善url处理
This commit is contained in:
parent 8c02cb99f8
commit 7648c91de8
@@ -5,14 +5,15 @@
|
||||
@File : driver_page.py
|
||||
"""
|
||||
from glob import glob
|
||||
from time import sleep
|
||||
from typing import Union, List, Any
|
||||
from urllib import parse
|
||||
from urllib.parse import quote
|
||||
|
||||
from selenium.common.exceptions import NoAlertPresentException
|
||||
from selenium.webdriver.chrome.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
|
||||
from .common import get_loc_from_str
|
||||
from .common import get_loc_from_str, clean_folder, avoid_duplicate_name
|
||||
from .config import OptionsManager
|
||||
from .driver_element import DriverElement, execute_driver_find
|
||||
|
||||
@@ -59,9 +60,9 @@ class DriverPage(object):
|
||||
"""获取网页title"""
|
||||
return self._driver.title
|
||||
|
||||
def get(self, url: str, params: dict = None, go_anyway: bool = False) -> Union[None, bool]:
|
||||
def get(self, url: str, go_anyway: bool = False) -> Union[None, bool]:
|
||||
"""跳转到url"""
|
||||
to_url = f'{url}?{parse.urlencode(params)}' if params else url
|
||||
to_url = quote(url, safe='/:&?=%;#@')
|
||||
if not url or (not go_anyway and self.url == to_url):
|
||||
return
|
||||
self._url = to_url
|
||||
@@ -100,11 +101,7 @@ class DriverPage(object):
|
||||
|
||||
def run_script(self, script: str) -> Any:
|
||||
"""执行js脚本"""
|
||||
ele = self.ele(('css selector', 'html'))
|
||||
try:
|
||||
return ele.run_script(script)
|
||||
except:
|
||||
raise
|
||||
return self.driver.execute_script(script)
|
||||
|
||||
def get_tabs_sum(self) -> int:
|
||||
"""获取标签页数量"""
|
||||
@@ -165,16 +162,40 @@ class DriverPage(object):
|
||||
ele = self.ele(loc_or_ele)
|
||||
self.driver.switch_to.frame(ele.inner_ele)
|
||||
|
||||
def screenshot(self, path: str = None, filename: str = None) -> str:
|
||||
def screenshot(self, path: str, filename: str = None) -> str:
|
||||
"""获取网页截图"""
|
||||
ele = self.ele(('css selector', 'html'))
|
||||
path = path or OptionsManager().get_value('paths', 'global_tmp_path')
|
||||
if not path:
|
||||
raise IOError('No path specified.')
|
||||
# tmp_path = OptionsManager().get_value('paths', 'global_tmp_path')
|
||||
# clean_folder(tmp_path)
|
||||
name = filename or self.title
|
||||
img_path = f'{path}\\{name}.png'
|
||||
ele.screenshot(path, name)
|
||||
return img_path
|
||||
name = avoid_duplicate_name(path, f'{name}.png')
|
||||
img_path = f'{path}\\{name}'
|
||||
|
||||
self.driver.save_screenshot(img_path)
|
||||
# TODO: 实现全页截图
|
||||
# self.set_window_size()
|
||||
# self.scroll_to('top')
|
||||
# window_height = self.driver.get_window_size()['height'] # 窗口高度
|
||||
#
|
||||
# page_height = self.driver.execute_script('return document.documentElement.scrollHeight') # 页面高度
|
||||
#
|
||||
# if page_height <= window_height:
|
||||
# self.driver.save_screenshot(img_path)
|
||||
# else:
|
||||
# from PIL import Image
|
||||
# import numpy as np
|
||||
# self.driver.save_screenshot(f'{tmp_path}\\{name}.png')
|
||||
# n = page_height // window_height # 需要滚动的次数
|
||||
# base_mat = np.atleast_2d(Image.open(f'{tmp_path}\\{name}.png')) # 打开截图并转为二维矩阵
|
||||
#
|
||||
# for i in range(n):
|
||||
# self.driver.execute_script(f'document.documentElement.scrollTop={window_height * (i + 1)};')
|
||||
# sleep(.5)
|
||||
# self.driver.save_screenshot(f'{tmp_path}\\{name}_{i}.png') # 保存截图
|
||||
# mat = np.atleast_2d(Image.open(f'{tmp_path}\\{name}_{i}.png')) # 打开截图并转为二维矩阵
|
||||
# base_mat = np.append(base_mat, mat, axis=0) # 拼接图片的二维矩阵
|
||||
# Image.fromarray(base_mat).save(img_path)
|
||||
# clean_folder(tmp_path)
|
||||
return name
|
||||
|
||||
def scroll_to_see(self, loc_or_ele: Union[str, tuple, WebElement, DriverElement]) -> None:
|
||||
"""滚动直到元素可见"""
|
||||
|
@@ -6,6 +6,7 @@
|
||||
"""
|
||||
from typing import Union, List
|
||||
from urllib import parse
|
||||
from urllib.parse import quote
|
||||
|
||||
from requests import Response
|
||||
from requests_html import HTMLSession
|
||||
@@ -179,21 +180,21 @@ class MixPage(Null, SessionPage, DriverPage):
|
||||
|
||||
# ----------------以下为共用函数-----------------------
|
||||
|
||||
def get(self, url: str, params: dict = None, go_anyway=False, **kwargs) -> Union[bool, None]:
|
||||
def get(self, url: str, go_anyway=False, **kwargs) -> Union[bool, None]:
|
||||
"""跳转到一个url,跳转前先同步cookies,跳转后判断目标url是否可用"""
|
||||
to_url = f'{url}?{parse.urlencode(params)}' if params else url
|
||||
if not url or (not go_anyway and self.url == to_url):
|
||||
return
|
||||
# to_url = quote(url, safe='/:&?=%;#@')
|
||||
# if not url or (not go_anyway and self.url == to_url):
|
||||
# return
|
||||
if self._mode == 'd':
|
||||
super(SessionPage, self).get(url=to_url, go_anyway=go_anyway)
|
||||
if super(SessionPage, self).get(url=url, go_anyway=go_anyway) is None:
|
||||
return
|
||||
if self.session_url == self.url:
|
||||
self._url_available = True if self._response and self._response.ok else False
|
||||
else:
|
||||
self._url_available = self.check_page()
|
||||
return self._url_available
|
||||
elif self._mode == 's':
|
||||
super().get(url=to_url, go_anyway=go_anyway, **kwargs)
|
||||
return self._url_available
|
||||
return None if super().get(url=url, go_anyway=go_anyway, **kwargs) is None else self._url_available
|
||||
|
||||
def ele(self, loc_or_ele: Union[tuple, str, DriverElement, SessionElement], mode: str = None, timeout: float = None,
|
||||
show_errmsg: bool = False) -> Union[DriverElement, SessionElement]:
|
||||
|
@@ -10,15 +10,13 @@ from pathlib import Path
|
||||
from random import random
|
||||
from time import time
|
||||
from typing import Union, List
|
||||
from urllib import parse
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import urlparse, quote
|
||||
|
||||
from requests_html import HTMLSession, HTMLResponse
|
||||
|
||||
from .common import get_loc_from_str, translate_loc_to_xpath, avoid_duplicate_name
|
||||
from .config import OptionsManager
|
||||
from .session_element import SessionElement, execute_session_find
|
||||
from html import unescape
|
||||
|
||||
|
||||
class SessionPage(object):
|
||||
@@ -87,9 +85,9 @@ class SessionPage(object):
|
||||
"""查找符合条件的所有元素"""
|
||||
return self.ele(loc, mode='all', show_errmsg=True)
|
||||
|
||||
def get(self, url: str, params: dict = None, go_anyway: bool = False, **kwargs) -> Union[bool, None]:
|
||||
def get(self, url: str, go_anyway: bool = False, **kwargs) -> Union[bool, None]:
|
||||
"""用get方式跳转到url,调用_make_response()函数生成response对象"""
|
||||
to_url = f'{url}?{parse.urlencode(params)}' if params else url
|
||||
to_url = quote(url, safe='/:&?=%;#@')
|
||||
if not url or (not go_anyway and self.url == to_url):
|
||||
return
|
||||
self._url = url
|
||||
@@ -99,10 +97,10 @@ class SessionPage(object):
|
||||
self._url_available = True if self._response and self._response.ok else False
|
||||
return self._url_available
|
||||
|
||||
def post(self, url: str, params: dict = None, data: dict = None, go_anyway: bool = False, **kwargs) \
|
||||
def post(self, url: str, data: dict = None, go_anyway: bool = False, **kwargs) \
|
||||
-> Union[bool, None]:
|
||||
"""用post方式跳转到url,调用_make_response()函数生成response对象"""
|
||||
to_url = f'{url}?{parse.urlencode(params)}' if params else url
|
||||
to_url = quote(url, safe='/:&?=%;#@')
|
||||
if not url or (not go_anyway and self._url == to_url):
|
||||
return
|
||||
self._url = url
|
||||
|
Loading…
x
Reference in New Issue
Block a user