driver_page:增加获取、设置sessionStorage、localStorage方法,新增清空缓存方法;run_cdp()的cmd_args参数改为**cmd_args。

关闭driver时会主动关闭chromedriver进程。
优化关闭浏览器进程逻辑
This commit is contained in:
g1879 2022-05-28 18:24:23 +08:00
parent c2f13213d7
commit 575231f06a
4 changed files with 186 additions and 25 deletions

View File

@ -356,22 +356,36 @@ def get_exe_path_from_port(port: Union[str, int]) -> Union[str, None]:
:return: 可执行文件的绝对路径
"""
from os import popen
from time import perf_counter
process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
t = perf_counter()
while not process and perf_counter() - t < 10:
process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
processid = process.split(' ')[-1]
if not processid:
pid = get_pid_from_port(port)
if not pid:
return
else:
file_lst = popen(f'wmic process where processid={processid} get executablepath').read().split('\n')
file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n')
return file_lst[2].strip() if len(file_lst) > 2 else None
def get_pid_from_port(port: Union[str, int]) -> Union[str, None]:
"""获取端口号第一条进程的pid \n
:param port: 端口号
:return: 进程id
"""
from os import popen
from platform import system
from time import perf_counter
if system().lower() != 'windows' or port is None:
return
process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
t = perf_counter()
while not process and perf_counter() - t < 5:
process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
return process.split(' ')[-1] or None
def get_usable_path(path: Union[str, Path]) -> Path:
"""检查文件或文件夹是否有重名,并返回可以使用的路径 \n
:param path: 文件或文件夹路径

View File

@ -17,6 +17,7 @@ from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from tldextract import extract
from .common import get_pid_from_port
from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple
@ -188,19 +189,8 @@ class Drission(object):
def kill_browser(self) -> None:
"""关闭浏览器进程(如果可以)"""
if self.debugger_progress:
self.debugger_progress.kill()
return
pid = self.get_browser_progress_id()
from os import popen
from platform import system
if pid and system().lower() == 'windows' \
and popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'):
popen(f'taskkill /pid {pid} /F')
else:
if not _kill_progress(pid):
self._driver.quit()
def get_browser_progress_id(self) -> Union[str, None]:
@ -369,6 +359,8 @@ class Drission(object):
def close_driver(self, kill: bool = False) -> None:
"""关闭driver和浏览器"""
if self._driver:
_kill_progress(port=self._driver.service.port) # 关闭chromedriver.exe进程
if kill:
self.kill_browser()
else:
@ -455,7 +447,7 @@ def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tupl
# ----------创建浏览器进程----------
try:
debugger = Popen(f'"{chrome_path}" --remote-debugging-port={port} {args}', shell=False)
debugger = Popen(f'{chrome_path} --remote-debugging-port={port} {args}', shell=False)
if chrome_path == 'chrome.exe':
from .common import get_exe_path_from_port
@ -529,3 +521,25 @@ def _get_chrome_hwnds_from_pid(pid) -> list:
hwnds = []
EnumWindows(callback, hwnds)
return hwnds
def _kill_progress(pid: str = None, port: int = None) -> bool:
"""获取端口号第一条进程的pid \n
:param pid: 进程id
:param port: 端口号如没有进程id从端口号获取
:return: 是否成功
"""
from os import popen
from platform import system
if system().lower() != 'windows':
return False
pid = pid or get_pid_from_port(port)
if not pid:
return False
if popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'):
popen(f'taskkill /pid {pid} /F')
return True
else:
return False

View File

@ -321,7 +321,7 @@ class DriverPage(BasePage):
"""
return self.driver.execute_async_script(script, *args)
def run_cdp(self, cmd: str, cmd_args: dict) -> Any:
def run_cdp(self, cmd: str, **cmd_args) -> Any:
"""执行Chrome DevTools Protocol语句
:param cmd: 协议项目
:param cmd_args: 参数
@ -387,6 +387,61 @@ class DriverPage(BasePage):
"""
self.driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": ua})
def get_session_storage(self, item: str = None) -> Union[str, dict, None]:
"""获取sessionStorage信息不设置item则获取全部 \n
:param item: 要获取的项不设置则返回全部
:return: sessionStorage一个或所有项内容
"""
js = f'return sessionStorage.getItem("{item}");' if item else 'return sessionStorage;'
return self.run_script(js)
def get_local_storage(self, item: str = None) -> Union[str, dict, None]:
"""获取localStorage信息不设置item则获取全部 \n
:param item: 要获取的项目不设置则返回全部
:return: localStorage一个或所有项内容
"""
js = f'return localStorage.getItem("{item}");' if item else 'return localStorage;'
return self.run_script(js)
def set_session_storage(self, item: str, value: Union[str, bool]) -> None:
"""设置或删除某项sessionStorage信息 \n
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
s = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");'
self.run_script(s)
def set_local_storage(self, item: str, value: Union[str, bool]) -> None:
"""设置或删除某项localStorage信息 \n
:param item: 要设置的项
:param value: 项的值设置为False时删除该项
:return: None
"""
s = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");'
self.run_script(s)
def clean_cache(self,
session_storage: bool = True,
local_storage: bool = True,
cache: bool = True,
cookies: bool = True) -> None:
"""清除缓存,可选要清除的项 \n
:param session_storage: 是否清除sessionStorage
:param local_storage: 是否清除localStorage
:param cache: 是否清除cache
:param cookies: 是否清除cookies
:return: None
"""
if session_storage:
self.run_script('sessionStorage.clear();')
if local_storage:
self.run_script('localStorage.clear();')
if cache:
self.run_cdp('Network.clearBrowserCache')
if cookies:
self.run_cdp('Network.clearBrowserCookies')
def screenshot(self, path: str, filename: str = None) -> str:
"""截取页面可见范围截图 \n
:param path: 保存路径
@ -416,7 +471,7 @@ class DriverPage(BasePage):
def stop_loading(self) -> None:
"""强制停止页面加载"""
self.run_cdp('Page.stopLoading', {})
self.run_cdp('Page.stopLoading')
def back(self) -> None:
"""在浏览历史中后退一步"""

View File

@ -202,6 +202,24 @@ page.run_script('alert(arguments[0]+arguments[1])', 'Hello', ' world!')
返回:脚本执行结果
## run_cdp()
此方法用于执行 Chrome DevTools Protocol 语句。cdp
用法详见[Chrome DevTools Protocol](https://chromedevtools.github.io/devtools-protocol/)。
参数:
- cmd协议项目方法
- **cmd_args项目参数
返回:该语句返回的值
```python
# 停止页面加载
page.run_cdp('Page.stopLoading')
```
## set_timeouts()
此方法用于设置三种超时时间selenium 4 以上版本生效。
@ -224,6 +242,66 @@ page.run_script('alert(arguments[0]+arguments[1])', 'Hello', ' world!')
返回None
## get_session_storage()
此方法用于获取 sessionStorage 信息,可获取全部或单个项。
参数:
- item要获取的项不设置则返回全部
返回sessionStorage 一个或所有项内容
## get_local_storage()
此方法用于获取 localStorage 信息,可获取全部或单个项。
参数:
- item要获取的项不设置则返回全部
返回localStorage 一个或所有项内容
## set_session_storage()
此方法用于设置或删除某项 sessionStorage 信息。
参数:
- item要设置的项
- value项的值设置为False时删除该项
返回:`None`
## set_local_storage()
此方法用于设置或删除某项 localStorage 信息。
参数:
- item要设置的项
- value项的值设置为False时删除该项
返回:`None`
## clean_cache()
此方法用于清除缓存,可选要清除的项。
参数:
- session_storage是否清除 sessionstorage
- local_storage是否清除 localStorage
- cache是否清除 cache
- cookies是否清除 cookies
返回:`None`
## to_frame
此属性用于将页面焦点移到某个`frame``iframe`