diff --git a/DrissionPage/common.py b/DrissionPage/common.py index 32d7686..2727456 100644 --- a/DrissionPage/common.py +++ b/DrissionPage/common.py @@ -356,22 +356,36 @@ def get_exe_path_from_port(port: Union[str, int]) -> Union[str, None]: :return: 可执行文件的绝对路径 """ from os import popen - from time import perf_counter - process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] - t = perf_counter() - while not process and perf_counter() - t < 10: - process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] - - processid = process.split(' ')[-1] - - if not processid: + pid = get_pid_from_port(port) + if not pid: return else: - file_lst = popen(f'wmic process where processid={processid} get executablepath').read().split('\n') + file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n') return file_lst[2].strip() if len(file_lst) > 2 else None +def get_pid_from_port(port: Union[str, int]) -> Union[str, None]: + """获取端口号第一条进程的pid \n + :param port: 端口号 + :return: 进程id + """ + from os import popen + from platform import system + from time import perf_counter + + if system().lower() != 'windows' or port is None: + return + + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + t = perf_counter() + while not process and perf_counter() - t < 5: + process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0] + + return process.split(' ')[-1] or None + + def get_usable_path(path: Union[str, Path]) -> Path: """检查文件或文件夹是否有重名,并返回可以使用的路径 \n :param path: 文件或文件夹路径 diff --git a/DrissionPage/drission.py b/DrissionPage/drission.py index 1583c40..80764d9 100644 --- a/DrissionPage/drission.py +++ b/DrissionPage/drission.py @@ -17,6 +17,7 @@ from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver from tldextract import extract +from .common import get_pid_from_port from .config import _session_options_to_dict, SessionOptions, DriverOptions, _cookies_to_tuple @@ -188,19 +189,8 @@ class Drission(object): def kill_browser(self) -> None: """关闭浏览器进程(如果可以)""" - if self.debugger_progress: - self.debugger_progress.kill() - return - pid = self.get_browser_progress_id() - from os import popen - from platform import system - - if pid and system().lower() == 'windows' \ - and popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'): - popen(f'taskkill /pid {pid} /F') - - else: + if not _kill_progress(pid): self._driver.quit() def get_browser_progress_id(self) -> Union[str, None]: @@ -369,6 +359,8 @@ class Drission(object): def close_driver(self, kill: bool = False) -> None: """关闭driver和浏览器""" if self._driver: + _kill_progress(port=self._driver.service.port) # 关闭chromedriver.exe进程 + if kill: self.kill_browser() else: @@ -455,7 +447,7 @@ def _create_chrome(chrome_path: str, port: str, args: list, proxy: dict) -> tupl # ----------创建浏览器进程---------- try: - debugger = Popen(f'"{chrome_path}" --remote-debugging-port={port} {args}', shell=False) + debugger = Popen(f'{chrome_path} --remote-debugging-port={port} {args}', shell=False) if chrome_path == 'chrome.exe': from .common import get_exe_path_from_port @@ -529,3 +521,25 @@ def _get_chrome_hwnds_from_pid(pid) -> list: hwnds = [] EnumWindows(callback, hwnds) return hwnds + + +def _kill_progress(pid: str = None, port: int = None) -> bool: + """获取端口号第一条进程的pid \n + :param pid: 进程id + :param port: 端口号,如没有进程id,从端口号获取 + :return: 是否成功 + """ + from os import popen + from platform import system + if system().lower() != 'windows': + return False + + pid = pid or get_pid_from_port(port) + if not pid: + return False + + if popen(f'tasklist | findstr {pid}').read().lower().startswith('chrome.exe'): + popen(f'taskkill /pid {pid} /F') + return True + else: + return False diff --git a/DrissionPage/driver_page.py b/DrissionPage/driver_page.py index f7176d8..7fbed73 100644 --- a/DrissionPage/driver_page.py +++ b/DrissionPage/driver_page.py @@ -321,7 +321,7 @@ class DriverPage(BasePage): """ return self.driver.execute_async_script(script, *args) - def run_cdp(self, cmd: str, cmd_args: dict) -> Any: + def run_cdp(self, cmd: str, **cmd_args) -> Any: """执行Chrome DevTools Protocol语句 :param cmd: 协议项目 :param cmd_args: 参数 @@ -387,6 +387,61 @@ class DriverPage(BasePage): """ self.driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": ua}) + def get_session_storage(self, item: str = None) -> Union[str, dict, None]: + """获取sessionStorage信息,不设置item则获取全部 \n + :param item: 要获取的项,不设置则返回全部 + :return: sessionStorage一个或所有项内容 + """ + js = f'return sessionStorage.getItem("{item}");' if item else 'return sessionStorage;' + return self.run_script(js) + + def get_local_storage(self, item: str = None) -> Union[str, dict, None]: + """获取localStorage信息,不设置item则获取全部 \n + :param item: 要获取的项目,不设置则返回全部 + :return: localStorage一个或所有项内容 + """ + js = f'return localStorage.getItem("{item}");' if item else 'return localStorage;' + return self.run_script(js) + + def set_session_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项sessionStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + s = f'sessionStorage.removeItem("{item}");' if item is False else f'sessionStorage.setItem("{item}","{value}");' + self.run_script(s) + + def set_local_storage(self, item: str, value: Union[str, bool]) -> None: + """设置或删除某项localStorage信息 \n + :param item: 要设置的项 + :param value: 项的值,设置为False时,删除该项 + :return: None + """ + s = f'localStorage.removeItem("{item}");' if item is False else f'localStorage.setItem("{item}","{value}");' + self.run_script(s) + + def clean_cache(self, + session_storage: bool = True, + local_storage: bool = True, + cache: bool = True, + cookies: bool = True) -> None: + """清除缓存,可选要清除的项 \n + :param session_storage: 是否清除sessionStorage + :param local_storage: 是否清除localStorage + :param cache: 是否清除cache + :param cookies: 是否清除cookies + :return: None + """ + if session_storage: + self.run_script('sessionStorage.clear();') + if local_storage: + self.run_script('localStorage.clear();') + if cache: + self.run_cdp('Network.clearBrowserCache') + if cookies: + self.run_cdp('Network.clearBrowserCookies') + def screenshot(self, path: str, filename: str = None) -> str: """截取页面可见范围截图 \n :param path: 保存路径 @@ -416,7 +471,7 @@ class DriverPage(BasePage): def stop_loading(self) -> None: """强制停止页面加载""" - self.run_cdp('Page.stopLoading', {}) + self.run_cdp('Page.stopLoading') def back(self) -> None: """在浏览历史中后退一步""" diff --git a/docs/使用方法/页面操作.md b/docs/使用方法/页面操作.md index 8bf16f5..65f3bf4 100644 --- a/docs/使用方法/页面操作.md +++ b/docs/使用方法/页面操作.md @@ -202,6 +202,24 @@ page.run_script('alert(arguments[0]+arguments[1])', 'Hello', ' world!') 返回:脚本执行结果 +## run_cdp() + +此方法用于执行 Chrome DevTools Protocol 语句。cdp +用法详见[Chrome DevTools Protocol](https://chromedevtools.github.io/devtools-protocol/)。 + +参数: + +- cmd:协议项目方法 + +- **cmd_args:项目参数 + +返回:该语句返回的值 + +```python +# 停止页面加载 +page.run_cdp('Page.stopLoading') +``` + ## set_timeouts() 此方法用于设置三种超时时间,selenium 4 以上版本生效。 @@ -224,6 +242,66 @@ page.run_script('alert(arguments[0]+arguments[1])', 'Hello', ' world!') 返回:None +## get_session_storage() + +此方法用于获取 sessionStorage 信息,可获取全部或单个项。 + +参数: + +- item:要获取的项,不设置则返回全部 + +返回:sessionStorage 一个或所有项内容 + +## get_local_storage() + +此方法用于获取 localStorage 信息,可获取全部或单个项。 + +参数: + +- item:要获取的项,不设置则返回全部 + +返回:localStorage 一个或所有项内容 + +## set_session_storage() + +此方法用于设置或删除某项 sessionStorage 信息。 + +参数: + +- item:要设置的项 + +- value:项的值,设置为False时,删除该项 + +返回:`None` + +## set_local_storage() + +此方法用于设置或删除某项 localStorage 信息。 + +参数: + +- item:要设置的项 + +- value:项的值,设置为False时,删除该项 + +返回:`None` + +## clean_cache() + +此方法用于清除缓存,可选要清除的项。 + +参数: + +- session_storage:是否清除 sessionstorage + +- local_storage:是否清除 localStorage + +- cache:是否清除 cache + +- cookies:是否清除 cookies + +返回:`None` + ## to_frame 此属性用于将页面焦点移到某个`frame`或`iframe`。