mirror of
https://gitee.com/g1879/DrissionPage.git
synced 2024-12-10 04:00:23 +08:00
288 lines
9.0 KiB
Python
288 lines
9.0 KiB
Python
# -*- coding:utf-8 -*-
|
||
"""
|
||
@Author : g1879
|
||
@Contact : g1879@qq.com
|
||
"""
|
||
from pathlib import Path
|
||
from platform import system
|
||
from re import search, sub
|
||
from shutil import rmtree
|
||
from time import perf_counter, sleep
|
||
|
||
from psutil import process_iter, AccessDenied, NoSuchProcess, ZombieProcess
|
||
|
||
from .._configs.options_manage import OptionsManager
|
||
from ..errors import (ContextLostError, ElementLostError, CDPError, PageClosedError, NoRectError, AlertExistsError,
|
||
WrongURLError, StorageError, CookieFormatError)
|
||
|
||
|
||
def get_usable_path(path, is_file=True, parents=True):
|
||
"""检查文件或文件夹是否有重名,并返回可以使用的路径
|
||
:param path: 文件或文件夹路径
|
||
:param is_file: 目标是文件还是文件夹
|
||
:param parents: 是否创建目标路径
|
||
:return: 可用的路径,Path对象
|
||
"""
|
||
path = Path(path)
|
||
parent = path.parent
|
||
if parents:
|
||
parent.mkdir(parents=True, exist_ok=True)
|
||
path = parent / make_valid_name(path.name)
|
||
name = path.stem if path.is_file() else path.name
|
||
ext = path.suffix if path.is_file() else ''
|
||
|
||
first_time = True
|
||
|
||
while path.exists() and path.is_file() == is_file:
|
||
r = search(r'(.*)_(\d+)$', name)
|
||
|
||
if not r or (r and first_time):
|
||
src_name, num = name, '1'
|
||
else:
|
||
src_name, num = r.group(1), int(r.group(2)) + 1
|
||
|
||
name = f'{src_name}_{num}'
|
||
path = parent / f'{name}{ext}'
|
||
first_time = None
|
||
|
||
return path
|
||
|
||
|
||
def make_valid_name(full_name):
|
||
"""获取有效的文件名
|
||
:param full_name: 文件名
|
||
:return: 可用的文件名
|
||
"""
|
||
# ----------------去除前后空格----------------
|
||
full_name = full_name.strip()
|
||
|
||
# ----------------使总长度不大于255个字符(一个汉字是2个字符)----------------
|
||
r = search(r'(.*)(\.[^.]+$)', full_name) # 拆分文件名和后缀名
|
||
if r:
|
||
name, ext = r.group(1), r.group(2)
|
||
ext_long = len(ext)
|
||
else:
|
||
name, ext = full_name, ''
|
||
ext_long = 0
|
||
|
||
while get_long(name) > 255 - ext_long:
|
||
name = name[:-1]
|
||
|
||
full_name = f'{name}{ext}'
|
||
|
||
# ----------------去除不允许存在的字符----------------
|
||
return sub(r'[<>/\\|:*?\n]', '', full_name)
|
||
|
||
|
||
def get_long(txt):
|
||
"""返回字符串中字符个数(一个汉字是2个字符)
|
||
:param txt: 字符串
|
||
:return: 字符个数
|
||
"""
|
||
txt_len = len(txt)
|
||
return int((len(txt.encode('utf-8')) - txt_len) / 2 + txt_len)
|
||
|
||
|
||
def port_is_using(ip, port):
|
||
"""检查端口是否被占用
|
||
:param ip: 浏览器地址
|
||
:param port: 浏览器端口
|
||
:return: bool
|
||
"""
|
||
from socket import socket, AF_INET, SOCK_STREAM
|
||
s = socket(AF_INET, SOCK_STREAM)
|
||
s.settimeout(.1)
|
||
result = s.connect_ex((ip, int(port)))
|
||
s.close()
|
||
return result == 0
|
||
|
||
|
||
def clean_folder(folder_path, ignore=None):
|
||
"""清空一个文件夹,除了ignore里的文件和文件夹
|
||
:param folder_path: 要清空的文件夹路径
|
||
:param ignore: 忽略列表
|
||
:return: None
|
||
"""
|
||
ignore = [] if not ignore else ignore
|
||
p = Path(folder_path)
|
||
|
||
for f in p.iterdir():
|
||
if f.name not in ignore:
|
||
if f.is_file():
|
||
f.unlink()
|
||
elif f.is_dir():
|
||
rmtree(f, True)
|
||
|
||
|
||
def show_or_hide_browser(page, hide=True):
|
||
"""执行显示或隐藏浏览器窗口
|
||
:param page: ChromePage对象
|
||
:param hide: 是否隐藏
|
||
:return: None
|
||
"""
|
||
if not page.address.startswith(('127.0.0.1', 'localhost')):
|
||
return
|
||
|
||
if system().lower() != 'windows':
|
||
raise OSError('该方法只能在Windows系统使用。')
|
||
|
||
try:
|
||
from win32gui import ShowWindow
|
||
from win32con import SW_HIDE, SW_SHOW
|
||
except ImportError:
|
||
raise ImportError('请先安装:pip install pypiwin32')
|
||
|
||
pid = page.process_id
|
||
if not pid:
|
||
return None
|
||
hds = get_chrome_hwnds_from_pid(pid, page.title)
|
||
sw = SW_HIDE if hide else SW_SHOW
|
||
for hd in hds:
|
||
ShowWindow(hd, sw)
|
||
|
||
|
||
def get_browser_progress_id(progress, address):
|
||
"""获取浏览器进程id
|
||
:param progress: 已知的进程对象,没有时传入None
|
||
:param address: 浏览器管理地址,含端口
|
||
:return: 进程id或None
|
||
"""
|
||
if progress:
|
||
return progress.pid
|
||
|
||
from os import popen
|
||
port = address.split(':')[-1]
|
||
txt = ''
|
||
progresses = popen(f'netstat -nao | findstr :{port}').read().split('\n')
|
||
for progress in progresses:
|
||
if 'LISTENING' in progress:
|
||
txt = progress
|
||
break
|
||
if not txt:
|
||
return None
|
||
|
||
return txt.split(' ')[-1]
|
||
|
||
|
||
def get_chrome_hwnds_from_pid(pid, title):
|
||
"""通过PID查询句柄ID
|
||
:param pid: 进程id
|
||
:param title: 窗口标题
|
||
:return: 进程句柄组成的列表
|
||
"""
|
||
try:
|
||
from win32gui import IsWindow, GetWindowText, EnumWindows
|
||
from win32process import GetWindowThreadProcessId
|
||
except ImportError:
|
||
raise ImportError('请先安装win32gui,pip install pypiwin32')
|
||
|
||
def callback(hwnd, hds):
|
||
if IsWindow(hwnd) and title in GetWindowText(hwnd):
|
||
_, found_pid = GetWindowThreadProcessId(hwnd)
|
||
if str(found_pid) == str(pid):
|
||
hds.append(hwnd)
|
||
return True
|
||
|
||
hwnds = []
|
||
EnumWindows(callback, hwnds)
|
||
return hwnds
|
||
|
||
|
||
def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True):
|
||
"""等待返回值不为False或空,直到超时
|
||
:param page: DrissionPage对象
|
||
:param condition: 等待条件,返回值不为False则停止等待
|
||
:param timeout: 超时时间
|
||
:param poll: 轮询间隔
|
||
:param raise_err: 是否抛出异常
|
||
:return: DP Element or bool
|
||
"""
|
||
end_time = perf_counter() + timeout
|
||
if isinstance(condition, str) or isinstance(condition, tuple):
|
||
if not callable(getattr(page, 's_ele', None)):
|
||
raise AttributeError('page对象缺少s_ele方法')
|
||
condition_method = lambda page: page.s_ele(condition)
|
||
elif callable(condition):
|
||
condition_method = condition
|
||
else:
|
||
raise ValueError('condition必须是函数或者字符串或者元组')
|
||
while perf_counter() < end_time:
|
||
try:
|
||
value = condition_method(page)
|
||
if value:
|
||
return value
|
||
except Exception as exc:
|
||
pass
|
||
|
||
sleep(poll)
|
||
if perf_counter() > end_time:
|
||
break
|
||
|
||
if raise_err:
|
||
raise TimeoutError('等待超时')
|
||
else:
|
||
return False
|
||
|
||
|
||
def stop_process_on_port(port):
|
||
"""强制关闭某个端口内的进程
|
||
:param port: 端口号
|
||
:return: None
|
||
"""
|
||
for proc in process_iter(['pid', 'connections']):
|
||
try:
|
||
connections = proc.connections()
|
||
except (AccessDenied, NoSuchProcess):
|
||
continue
|
||
for conn in connections:
|
||
if conn.laddr.port == int(port):
|
||
try:
|
||
proc.terminate()
|
||
except (NoSuchProcess, AccessDenied, ZombieProcess):
|
||
pass
|
||
except Exception as e:
|
||
print(f"{proc.pid} {port}: {e}")
|
||
|
||
|
||
def configs_to_here(save_name=None):
|
||
"""把默认ini文件复制到当前目录
|
||
:param save_name: 指定文件名,为None则命名为'dp_configs.ini'
|
||
:return: None
|
||
"""
|
||
om = OptionsManager('default')
|
||
save_name = f'{save_name}.ini' if save_name is not None else 'dp_configs.ini'
|
||
om.save(save_name)
|
||
|
||
|
||
def raise_error(r):
|
||
"""抛出error对应报错
|
||
:param r: 包含error的dict
|
||
:return: None
|
||
"""
|
||
error = r['error']
|
||
if error in ('Cannot find context with specified id', 'Inspected target navigated or closed'):
|
||
raise ContextLostError
|
||
elif error in ('Could not find node with given id', 'Could not find object with given id',
|
||
'No node with given id found', 'Node with given id does not belong to the document',
|
||
'No node found for given backend id'):
|
||
raise ElementLostError
|
||
elif error == 'tab closed':
|
||
raise PageClosedError
|
||
elif error == 'timeout':
|
||
raise TimeoutError
|
||
elif error == 'alert exists.':
|
||
raise AlertExistsError
|
||
elif error in ('Node does not have a layout object', 'Could not compute box model.'):
|
||
raise NoRectError
|
||
elif error == 'Cannot navigate to invalid URL':
|
||
raise WrongURLError(f'无效的url:{r["args"]["url"]}。也许要加上"http://"?')
|
||
elif error == 'Frame corresponds to an opaque origin and its storage key cannot be serialized':
|
||
raise StorageError
|
||
elif error == 'Sanitizing cookie failed':
|
||
raise CookieFormatError(f'cookie格式不正确:{r["args"]}')
|
||
elif r['type'] == 'call_method_error':
|
||
raise CDPError(f'\n错误:{r["error"]}\nmethod:{r["method"]}\nargs:{r["args"]}\n出现这个错误可能意味着程序有bug,'
|
||
'请把错误信息和重现方法告知作者,谢谢。\n报告网站:https://gitee.com/g1879/DrissionPage/issues')
|
||
else:
|
||
raise RuntimeError(r)
|