开始重写底层,放弃依赖selenium

This commit is contained in:
g1879 2022-10-28 00:40:24 +08:00
parent 661604997b
commit 35f19aa174
6 changed files with 427 additions and 36 deletions

View File

@ -0,0 +1,46 @@
# -*- coding:utf-8 -*-
# 问题跨iframe查找元素可能出现同名元素如何解决
# 须用DOM.documentUpdated检测元素有效性
class ChromeElement(object):
    """Handle to one DOM node, operated through the Chrome DevTools Protocol.

    The element is addressed both by its DOM node id and by its Runtime
    remote-object id; whichever one is not supplied is resolved from the other
    through ``page.driver``.
    """

    def __init__(self, page, node_id: str = None, obj_id: str = None):
        """Create the element from a node id or a remote object id.

        :param page: owning page object; must expose a ``driver`` with ``DOM`` and ``Runtime`` domains
        :param node_id: DOM node id of the element (takes precedence when both ids are given)
        :param obj_id: Runtime remote object id of the element
        :raises TypeError: when neither ``node_id`` nor ``obj_id`` is given
        """
        self.page = page
        if not node_id and not obj_id:
            raise TypeError('node_id或obj_id必须传入一个')
        if node_id:
            self._node_id = node_id
            self._obj_id = self._get_obj_id(node_id)
        else:
            self._node_id = self._get_node_id(obj_id)
            self._obj_id = obj_id

    @property
    def html(self):
        """Return the outer HTML of this element."""
        return self.page.driver.DOM.getOuterHTML(nodeId=self._node_id)['outerHTML']

    def ele(self, xpath: str):
        """Find the first node matching ``xpath`` inside this element's ``contentDocument``.

        :param xpath: XPath expression evaluated with ``document.evaluate``
        :return: a ChromeElement for the match, or None when the call yields no object id
        """
        # Local import: this module has no import section of its own.
        from json import dumps

        # json.dumps produces a quoted, fully escaped string literal that is also
        # valid JavaScript, so quotes or backslashes in the xpath can no longer
        # terminate the string and break (or inject into) the script.
        js = f'''function(){{
    frame=this.contentDocument;
    return document.evaluate({dumps(xpath)}, frame, null, 9, null).singleNodeValue;
}}'''
        r = self.page.driver.Runtime.callFunctionOn(functionDeclaration=js,
                                                    objectId=self._obj_id)['result'].get('objectId', None)
        return r if not r else _ele(self.page, obj_id=r)

    def click(self, by_js: bool = True):
        """Click the element by calling its JavaScript ``click()`` method.

        :param by_js: when True the click is issued through JavaScript; when False
                      nothing happens (no other click strategy is implemented here)
        """
        if by_js:
            js = 'function(){this.click();}'
            self.page.driver.Runtime.callFunctionOn(functionDeclaration=js, objectId=self._obj_id)

    def _get_obj_id(self, node_id):
        """Resolve a DOM node id into its Runtime remote object id."""
        return self.page.driver.DOM.resolveNode(nodeId=node_id)['object']['objectId']

    def _get_node_id(self, obj_id):
        """Resolve a Runtime remote object id into its DOM node id."""
        return self.page.driver.DOM.requestNode(objectId=obj_id)['nodeId']
def _ele(page, node_id=None, obj_id=None) -> ChromeElement:
    """Build a ChromeElement for ``page`` from a node id or a remote object id.

    :param page: page object that owns the element
    :param node_id: DOM node id, if known
    :param obj_id: Runtime remote object id, if known
    :return: the new ChromeElement
    """
    element = ChromeElement(page, node_id, obj_id)
    return element

318
DrissionPage/chrome_page.py Normal file
View File

@ -0,0 +1,318 @@
# -*- coding:utf-8 -*-
from time import perf_counter, sleep
from typing import Union, Tuple
from pychrome import Tab
from requests import get as requests_get
from json import loads
from .base import BasePage
from .common import get_loc
from .drission import connect_chrome
from .chrome_element import ChromeElement
class ChromePage(BasePage):
    """Page object that drives one Chrome tab over the DevTools Protocol (pychrome ``Tab``)."""

    def __init__(self, address: str,
                 path: str = 'chrome',
                 tab_handle: str = None,
                 timeout: float = 10):
        """Connect to (or start) the browser and attach to one of its tabs.

        :param address: debugger address as 'ip:port'; a leading 'http://' is stripped
        :param path: browser executable path handed to ``connect_chrome``
        :param tab_handle: id of the tab to attach to; the first listed page tab when omitted
        :param timeout: timeout value forwarded to ``BasePage.__init__``
        """
        super().__init__(timeout)
        self.debugger_address = address[7:] if address.startswith('http://') else address
        connect_chrome(path, self.debugger_address)
        tab_handle = self.tab_handles[0] if not tab_handle else tab_handle
        self._connect_debugger(tab_handle)

    def _connect_debugger(self, tab_handle: str):
        """Attach ``self.driver`` to the tab with id ``tab_handle`` and enable the DOM domain.

        :param tab_handle: id of the target tab
        """
        self.driver = Tab(id=tab_handle, type='page',
                          webSocketDebuggerUrl=f'ws://{self.debugger_address}/devtools/page/{tab_handle}')
        self.driver.start()
        self.driver.DOM.enable()
        # Fetch the document once so later DOM commands have a root to work from.
        self.driver.DOM.getDocument()

    @property
    def url(self) -> str:
        """Return the url of the current tab, read from the browser's /json tab list."""
        # todo: is there a better way?
        tabs = loads(requests_get(f'http://{self.debugger_address}/json').text)
        return [i['url'] for i in tabs if i['id'] == self.driver.id][0]

    @property
    def html(self) -> str:
        """Return the outer HTML of the current document."""
        node_id = self.driver.DOM.getDocument()['root']['nodeId']
        return self.driver.DOM.getOuterHTML(nodeId=node_id)['outerHTML']

    @property
    def json(self) -> dict:
        """Parse the text of the page's <pre> element as JSON and return the result.

        NOTE(review): relies on ``BasePage`` making the page callable and on the
        returned element exposing ``.text`` - confirm both exist.
        """
        return loads(self('t:pre').text)

    @property
    def tab_handles(self) -> list:
        """Return the ids of all tabs of type 'page', in the order the browser lists them."""
        tabs = loads(requests_get(f'http://{self.debugger_address}/json').text)
        return [i['id'] for i in tabs if i['type'] == 'page']

    @property
    def current_tab_handle(self) -> str:
        """Return the id of the tab this object is attached to."""
        return self.driver.id

    @property
    def current_tab_index(self) -> int:
        """Return the position of the current tab within ``tab_handles``."""
        return self.tab_handles.index(self.current_tab_handle)

    @property
    def ready_state(self) -> str:
        """Return ``document.readyState`` of the current page."""
        return self.driver.Runtime.evaluate(expression='document.readyState;')['result']['value']

    def get(self,
            url: str,
            show_errmsg: bool = False,
            retry: int = None,
            interval: float = None,
            timeout: float = None) -> Union[None, bool]:
        """Navigate to a url.

        :param url: target url
        :param show_errmsg: whether to print retries and raise on failure
        :param retry: number of retries
        :param interval: seconds between retries
        :param timeout: seconds to wait for the page to finish loading
        :return: whether the url was reachable; None means undetermined
        """
        # NOTE(review): _before_connect comes from BasePage; presumably it also sets self._url - confirm.
        retry, interval = self._before_connect(url, retry, interval)
        self._url_available = self._d_connect(self._url,
                                              times=retry,
                                              interval=interval,
                                              show_errmsg=show_errmsg,
                                              timeout=timeout)
        return self._url_available

    def get_cookies(self, as_dict: bool = False):
        """Return the cookies reported by ``Network.getCookies``.

        :param as_dict: when True return a ``{name: value}`` dict; when False return
                        the raw protocol response (a dict holding a 'cookies' list)
        :return: cookies in the requested form
        """
        response = self.driver.Network.getCookies()
        if as_dict:
            return {cookie['name']: cookie['value'] for cookie in response['cookies']}
        return response

    def ele(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], timeout: float = None):
        """Return the first element matching the locator, or None when nothing matches.

        :param loc_or_ele: locator tuple, locator string, or an existing ChromeElement
        :param timeout: seconds to keep searching; the page default when None
        :return: a ChromeElement or None
        """
        return self._ele(loc_or_ele, timeout=timeout)

    def eles(self, loc_or_ele: Union[Tuple[str, str], str, ChromeElement], timeout: float = None):
        """Return all elements matching the locator as a list (empty when nothing matches).

        :param loc_or_ele: locator tuple, locator string, or an existing ChromeElement
        :param timeout: seconds to keep searching; the page default when None
        :return: list of ChromeElement (a ChromeElement passed in is returned unchanged)
        """
        return self._ele(loc_or_ele, timeout=timeout, single=False)

    def _ele(self,
             loc_or_ele: Union[Tuple[str, str], str, ChromeElement],
             timeout: float = None,
             single: bool = True):
        """Search the document with ``DOM.performSearch`` until a match appears or time runs out.

        :param loc_or_ele: locator tuple, locator string, or an existing ChromeElement
        :param timeout: seconds to keep searching; ``self.timeout`` when None
        :param single: True to return only the first match, False to return every match
        :return: ChromeElement or None when ``single``; otherwise a list of ChromeElement
        :raises ValueError: when ``loc_or_ele`` has an unsupported type
        """
        if isinstance(loc_or_ele, ChromeElement):
            return loc_or_ele
        if not isinstance(loc_or_ele, (str, tuple)):
            raise ValueError('loc_or_ele参数只能是tuple、str、ChromeElement类型。')

        loc = get_loc(loc_or_ele)[1]
        timeout = timeout if timeout is not None else self.timeout
        dom = self.driver.DOM

        search = dom.performSearch(query=loc)
        t1 = perf_counter()
        while search['resultCount'] == 0 and perf_counter() - t1 < timeout:
            # Release the empty search session before starting a new one so
            # sessions do not pile up in the browser while polling.
            dom.discardSearchResults(searchId=search['searchId'])
            search = dom.performSearch(query=loc)

        try:
            count = search['resultCount']
            if count == 0:
                return None if single else []
            end = 1 if single else count
            node_ids = dom.getSearchResults(searchId=search['searchId'], fromIndex=0, toIndex=end)['nodeIds']
        finally:
            dom.discardSearchResults(searchId=search['searchId'])

        # Decide on `single`, not on the number of hits, so that a multi-element
        # search with exactly one match still returns a list.
        if single:
            return ChromeElement(self, node_id=node_ids[0])
        return [ChromeElement(self, node_id=i) for i in node_ids]

    def refresh(self, ignore_cache: bool = False) -> None:
        """Reload the current page.

        :param ignore_cache: whether to bypass the cache
        :return: None
        """
        self.driver.Page.reload(ignoreCache=ignore_cache)

    def forward(self, steps: int = 1) -> None:
        """Go forward in the browsing history.

        :param steps: number of steps
        :return: None
        """
        self.driver.Runtime.evaluate(expression=f'window.history.go({steps});')

    def back(self, steps: int = 1) -> None:
        """Go back in the browsing history.

        :param steps: number of steps
        :return: None
        """
        self.driver.Runtime.evaluate(expression=f'window.history.go({-steps});')

    def stop_loading(self) -> None:
        """Stop loading the current page."""
        self.driver.Page.stopLoading()

    def run_cdp(self, cmd: str, **cmd_args):
        """Execute a Chrome DevTools Protocol command.

        :param cmd: protocol method name
        :param cmd_args: method arguments
        :return: result of the call
        """
        return self.driver.call_method(cmd, **cmd_args)

    def create_tab(self, url: str = None) -> None:
        """Ask the browser to open a new tab through its /json/new endpoint.

        This object stays attached to its current tab.

        :param url: url the new tab should open
        :return: None
        """
        url = f'?{url}' if url else ''
        requests_get(f'http://{self.debugger_address}/json/new{url}')

    def to_tab(self, num_or_handle: Union[int, str] = 0, activate: bool = True) -> None:
        """Attach to another tab.

        Note: when the browser was taken over rather than started by the program,
        the handle order may differ from the visual tab order.

        :param num_or_handle: tab index or handle string; index 0 is the first, -1 the last
        :param activate: whether to make the tab the active one after switching
        :return: None
        """
        try:
            tab = int(num_or_handle)
        except (ValueError, TypeError):
            tab = num_or_handle

        # Read the tab list once so the emptiness check and the index lookup
        # see the same snapshot (each read is an HTTP request).
        handles = self.tab_handles
        if not handles:
            return

        tab = handles[tab] if isinstance(tab, int) else tab
        self.driver.stop()
        self._connect_debugger(tab)

        if activate:
            requests_get(f'http://{self.debugger_address}/json/activate/{tab}')

    def close_tabs(self, num_or_handles: Union[int, str, list, tuple, set] = None, others: bool = False) -> None:
        """Close the given tabs; closes the current tab by default.

        Note: when the browser was taken over rather than started by the program,
        the handle order may differ from the visual order, so closing by index is unreliable.

        :param num_or_handles: index or handle, or a list/tuple/set of them; None means the current tab
        :param others: when True, close every tab except the given ones
        :return: None
        :raises ValueError: when more tabs are requested than exist
        """
        # One snapshot of the tab list is used for resolving and for counting.
        all_tabs = self.tab_handles
        if others:
            reserve_tabs = {self.current_tab_handle} if num_or_handles is None else _get_tabs(all_tabs, num_or_handles)
            tabs = set(all_tabs) - reserve_tabs
        else:
            tabs = (self.current_tab_handle,) if num_or_handles is None else _get_tabs(all_tabs, num_or_handles)

        tabs_len = len(tabs)
        all_len = len(all_tabs)
        if tabs_len > all_len:
            raise ValueError('要关闭的页面数量不能大于总数量。')

        # When every tab is about to go away, drop the debugger connection first
        # and do not try to re-attach afterwards.
        closing_all = tabs_len == all_len
        if closing_all:
            self.driver.stop()

        for tab in tabs:
            requests_get(f'http://{self.debugger_address}/json/close/{tab}')

        if not closing_all:
            self.to_tab(0)

    def close_other_tabs(self, num_or_handles: Union[int, str, list, tuple] = None) -> None:
        """Close every tab except the given ones; keeps the current tab by default.

        Note: when the browser was taken over rather than started by the program,
        the handle order may differ from the visual order, so selecting by index is unreliable.

        :param num_or_handles: index or handle, or a list/tuple of them, to keep; None keeps the current tab
        :return: None
        """
        self.close_tabs(num_or_handles, True)

    def clean_cache(self,
                    session_storage: bool = True,
                    local_storage: bool = True,
                    cache: bool = True,
                    cookies: bool = True) -> None:
        """Clear cached data; each kind can be switched off.

        :param session_storage: whether to clear sessionStorage
        :param local_storage: whether to clear localStorage
        :param cache: whether to clear the browser cache
        :param cookies: whether to clear the browser cookies
        :return: None
        """
        if session_storage:
            self.driver.Runtime.evaluate(expression='sessionStorage.clear();')
        if local_storage:
            self.driver.Runtime.evaluate(expression='localStorage.clear();')
        if cache:
            self.driver.Network.clearBrowserCache()
        if cookies:
            self.driver.Network.clearBrowserCookies()

    def _d_connect(self,
                   to_url: str,
                   times: int = 0,
                   interval: float = 1,
                   show_errmsg: bool = False,
                   timeout: float = None) -> Union[bool, None]:
        """Navigate to a url, retrying on failure.

        :param to_url: url to open
        :param times: number of retries after the first attempt
        :param interval: seconds between retries
        :param show_errmsg: whether to print retries and raise after the final failure
        :param timeout: seconds to wait for readyState 'complete'; ``self.timeout`` when None
        :return: whether navigation succeeded; None means undetermined
        """
        err = None
        is_ok = False
        timeout = timeout if timeout is not None else self.timeout

        for attempt in range(times + 1):
            try:
                result = self.driver.Page.navigate(url=to_url)

                t1 = perf_counter()
                while self.ready_state != 'complete' and perf_counter() - t1 < timeout:
                    sleep(.5)
                if self.ready_state != 'complete':
                    raise TimeoutError
                if 'errorText' in result:
                    raise ConnectionError(result['errorText'])

                go_ok = True
            except Exception as e:
                # Any failure counts as a failed attempt; the last one is re-raised
                # below when show_errmsg is set.
                err = e
                go_ok = False

            is_ok = self.check_page() if go_ok else False

            # check_page may return None ("undetermined"); only an explicit False triggers a retry.
            if is_ok is not False:
                break

            if attempt < times:
                sleep(interval)
                if show_errmsg:
                    print(f'重试 {to_url}')

        if is_ok is False and show_errmsg:
            raise err if err is not None else ConnectionError('连接异常。')

        return is_ok

    def check_page(self):
        """Placeholder page check; returns None, which ``_d_connect`` treats as undetermined."""
        pass
def _get_tabs(handles: list, num_or_handles: Union[int, str, list, tuple, set]) -> set:
"""返回指定标签页handle组成的set \n
:param handles: handles列表
:param num_or_handles: 指定的标签页可以是多个
:return: 指定标签页组成的set
"""
if isinstance(num_or_handles, (int, str)):
num_or_handles = (num_or_handles,)
elif not isinstance(num_or_handles, (list, tuple, set)):
raise TypeError('num_or_handle参数只能是int、str、list、set 或 tuple类型。')
return set(i if isinstance(i, str) else handles[i] for i in num_or_handles)

View File

@ -4,10 +4,12 @@
@Contact : g1879@qq.com
@File : drission.py
"""
from subprocess import Popen
from sys import exit
from typing import Union
from requests import Session
from platform import system
from requests import Session, get as requests_get
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from selenium import webdriver
@ -17,7 +19,7 @@ from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from tldextract import extract
from .common import get_pid_from_port
from .common import get_pid_from_port, get_exe_path_from_port
from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple
@ -108,12 +110,9 @@ class Drission(object):
chrome_path = self.driver_options.binary_location or 'chrome.exe'
# -----------若指定debug端口且该端口未在使用中则先启动浏览器进程-----------
if self.driver_options.debugger_address and _check_port(self.driver_options.debugger_address) is False:
from subprocess import Popen
port = self.driver_options.debugger_address.split(':')[-1]
if self.driver_options.debugger_address:
# 启动浏览器进程,同时返回该进程使用的 chrome.exe 路径
chrome_path, self._debugger = _create_chrome(chrome_path, port,
chrome_path, self._debugger = connect_chrome(chrome_path, self.driver_options.debugger_address,
self.driver_options.arguments, self._proxy)
# -----------创建WebDriver对象-----------
@ -130,10 +129,6 @@ class Drission(object):
except Exception:
pass
# self._driver.execute_cdp_cmd(
# 'Page.addScriptToEvaluateOnNewDocument',
# {'source': 'Object.defineProperty(navigator,"webdriver",{get:() => Chrome,});'})
return self._driver
@property
@ -225,7 +220,6 @@ class Drission(object):
self._show_or_hide_browser(False)
def _show_or_hide_browser(self, hide: bool = True) -> None:
from platform import system
if system().lower() != 'windows':
raise OSError('该方法只能在Windows系统使用。')
@ -395,18 +389,13 @@ def user_agent_to_session(driver: RemoteWebDriver, session: Session) -> None:
session.headers.update({"User-Agent": selenium_user_agent})
def _check_port(debugger_address: str) -> Union[bool, None]:
"""检查端口是否被占用 \n
:param debugger_address: 浏览器地址及端口
def _port_is_using(ip: str, port: str) -> Union[bool, None]:
"""检查端口是否被占用 \n
:param ip: 浏览器地址
:param port: 浏览器端口
:return: bool
"""
import socket
ip, port = debugger_address.split(':')
if ip not in ('127.0.0.1', 'localhost'):
return
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
@ -420,16 +409,25 @@ def _check_port(debugger_address: str) -> Union[bool, None]:
s.close()
def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tuple:
"""创建 chrome 进程 \n
def connect_chrome(chrome_path: str, debugger_address: str, args: list = None, proxy: dict = None) -> tuple:
"""连接或启动chrome \n
:param chrome_path: chrome.exe 路径
:param port: 进程运行的端口号
:param debugger_address: 进程运行的ip和端口号
:param args: chrome 配置参数
:return: chrome.exe 路径和进程对象组成的元组
:param proxy: 代理配置
:return: chrome 路径和进程对象组成的元组
"""
from subprocess import Popen
debugger_address = debugger_address[7:] if debugger_address.startswith('http://') else debugger_address
ip, port = debugger_address.split(':')
if ip not in ('127.0.0.1', 'localhost'):
return None, None
if _port_is_using(ip, port):
chrome_path = get_exe_path_from_port(port) if chrome_path == 'chrome.exe' else chrome_path
return chrome_path, None
# ----------为路径加上双引号,避免路径中的空格产生异常----------
args = [] if args is None else args
args1 = []
for arg in args:
if arg.startswith(('--user-data-dir', '--disk-cache-dir')):
@ -440,17 +438,15 @@ def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tupl
else:
args1.append(arg)
args = ' '.join(set(args1))
args = set(args1)
if proxy:
args = f'{args} --proxy-server={proxy["http"]}'
args.add(f'--proxy-server={proxy["http"]}')
# ----------创建浏览器进程----------
try:
debugger = Popen(f'"{chrome_path}" --remote-debugging-port={port} {args}', shell=False)
debugger = _run_browser(port, chrome_path, args)
if chrome_path == 'chrome.exe':
from .common import get_exe_path_from_port
chrome_path = get_exe_path_from_port(port)
# 传入的路径找不到主动在ini文件、注册表、系统变量中找
@ -461,11 +457,38 @@ def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tupl
if not chrome_path:
raise FileNotFoundError('无法找到chrome.exe路径请手动配置。')
debugger = Popen(f'"{chrome_path}" --remote-debugging-port={port} {args}', shell=False)
debugger = _run_browser(port, chrome_path, args)
return chrome_path, debugger
def _run_browser(port, path: str, args: set) -> Popen:
    """Start a browser process and block until its debugging endpoint answers.

    :param port: remote debugging port
    :param path: browser executable path
    :param args: launch arguments
    :return: the started process object
    :raises OSError: on systems other than Windows and Linux
    """
    from time import sleep
    # requests' ConnectionError does not derive from the builtin ConnectionError,
    # so it has to be named explicitly for the readiness poll below to retry.
    from requests.exceptions import ConnectionError as RequestsConnectionError

    os_name = system().lower()
    if os_name == 'windows':
        joined_args = ' '.join(args)
        debugger = Popen(f'"{path}" --remote-debugging-port={port} {joined_args}', shell=False)
    elif os_name == 'linux':
        arguments = [path, f'--remote-debugging-port={port}'] + list(args)
        debugger = Popen(arguments, shell=False)
    else:
        raise OSError('只支持Windows和Linux系统。')

    # Poll the debugging endpoint until the browser accepts connections.
    while True:
        try:
            requests_get(f'http://127.0.0.1:{port}/json')
            break
        except (ConnectionError, RequestsConnectionError):
            # Not listening yet; pause briefly instead of spinning on refused connections.
            sleep(.1)

    return debugger
def _create_driver(chrome_path: str, driver_path: str, options: Options) -> WebDriver:
"""创建 WebDriver 对象 \n
:param chrome_path: chrome.exe 路径
@ -530,7 +553,6 @@ def _kill_progress(pid: str = None, port: int = None) -> bool:
:return: 是否成功
"""
from os import popen
from platform import system
if system().lower() != 'windows':
return False

View File

@ -270,6 +270,10 @@ def _get_chrome_path(ini_path: str = None,
print('ini文件中', end='')
return str(path)
from platform import system
if system().lower() != 'windows':
return None
# -----------从注册表中获取--------------
if from_regedit:
import winreg

View File

@ -103,7 +103,7 @@ page = MixPage('s')
session = page.session
# 以 head 方式发送请求
response = session.head('https://www.baidu.com')
print(r.headers)
print(response.headers)
```
输出:

View File

@ -1,7 +1,8 @@
selenium>=4.1
selenium
requests
tldextract
lxml
cssselect
DownloadKit
FlowViewer
pychrome