255 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
@Author : g1879
@Contact : g1879@qq.com
"""
from platform import system
from pathlib import Path
from re import search, sub
from shutil import rmtree
from time import perf_counter, sleep
def get_usable_path(path):
    """Return a path that does not collide with an existing file or folder.

    The parent folder is created when missing and the last path component is
    passed through make_valid_name(). If the resulting path already exists, a
    numeric suffix (``_1``, ``_2``, ...) is appended until a free path is found.

    :param path: file or folder path (str or Path)
    :return: Path object that does not exist yet
    """
    path = Path(path)
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    path = folder / make_valid_name(path.name)

    # Only existing files have their extension split off; for folders (and
    # for paths that do not exist) the whole component is treated as the name.
    if path.is_file():
        base, suffix = path.stem, path.suffix
    else:
        base, suffix = path.name, ''

    untouched = True
    while path.exists():
        matched = search(r'(.*)_(\d+)$', base)
        if matched and not untouched:
            # A counter added by a previous iteration: bump it.
            base = f'{matched.group(1)}_{int(matched.group(2)) + 1}'
        else:
            # First collision: a trailing "_<n>" in the original name is part
            # of the name itself, so start a fresh counter after it.
            base = f'{base}_1'
        path = folder / f'{base}{suffix}'
        untouched = False

    return path
def make_valid_name(full_name):
    """Return a file name that is safe to use as a path component.

    The name is stripped of surrounding whitespace, shortened so that its
    width as measured by get_long() (wide characters count as 2) is at most
    255, and finally cleared of characters that are not allowed in file names.

    :param full_name: file name, with or without an extension
    :return: the sanitised file name
    """
    max_width = 255

    # Remove leading and trailing whitespace.
    full_name = full_name.strip()

    # Split the stem from the extension so truncation only shortens the stem.
    r = search(r'(.*)(\.[^.]+$)', full_name)
    if r:
        name, ext = r.group(1), r.group(2)
    else:
        name, ext = full_name, ''

    # Measure the extension with the same metric as the stem.
    ext_long = get_long(ext)
    if ext_long > max_width:
        # An over-long "extension" would make the budget negative and the
        # truncation loop below could never finish; treat the whole string
        # as a plain name instead.
        name, ext, ext_long = full_name, '', 0

    while get_long(name) > max_width - ext_long:
        name = name[:-1]

    # Drop characters that are illegal in file names (the double quote is
    # also reserved on Windows).
    return sub(r'[<>/\\|:*?\n"]', '', f'{name}{ext}')
def get_long(txt):
    """Return the width of a string, counting a Chinese character as 2.

    The width is the character count plus half (rounded down) of the extra
    bytes the string needs when encoded as UTF-8.

    :param txt: string to measure
    :return: width as an int
    """
    char_count = len(txt)
    extra_bytes = len(txt.encode('utf-8')) - char_count
    return char_count + extra_bytes // 2
def port_is_using(ip, port):
    """Check whether something accepts TCP connections on the given port.

    :param ip: address to probe
    :param port: port to probe (int or numeric string)
    :return: True if a connection attempt succeeds within 0.1 s, else False
    """
    from socket import socket, AF_INET, SOCK_STREAM

    # The context manager closes the socket even when int(port) or
    # connect_ex() raises (e.g. for an unresolvable host name).
    with socket(AF_INET, SOCK_STREAM) as s:
        s.settimeout(.1)
        return s.connect_ex((ip, int(port))) == 0
def clean_folder(folder_path, ignore=None):
    """Empty a folder, keeping the entries whose names are listed in ignore.

    :param folder_path: path of the folder to empty
    :param ignore: names of files and folders to keep
    :return: None
    """
    kept_names = ignore if ignore else []

    for entry in Path(folder_path).iterdir():
        if entry.name in kept_names:
            continue

        if entry.is_file():
            entry.unlink()
        elif entry.is_dir():
            # Second positional argument is ignore_errors: failures while
            # removing a sub-folder are skipped silently.
            rmtree(entry, True)
def show_or_hide_browser(page, hide=True):
    """Show or hide the windows of a locally running browser.

    Nothing is done when the page address is not local or when the page has
    no process id.

    :param page: page object exposing address, process_id and title
    :param hide: True to hide the windows, False to show them
    :return: None
    :raises OSError: when the browser is local but the OS is not Windows
    :raises ImportError: when the pywin32 modules cannot be imported
    """
    is_local = page.address.startswith(('127.0.0.1', 'localhost'))
    if not is_local:
        return

    if system().lower() != 'windows':
        raise OSError('该方法只能在Windows系统使用。')

    try:
        from win32gui import ShowWindow
        from win32con import SW_HIDE, SW_SHOW
    except ImportError:
        raise ImportError('请先安装pip install pypiwin32')

    pid = page.process_id
    if not pid:
        return None

    handles = get_chrome_hwnds_from_pid(pid, page.title)
    command = SW_HIDE if hide else SW_SHOW
    for handle in handles:
        ShowWindow(handle, command)
def get_browser_progress_id(progress, address):
    """Return the process id of the browser.

    When a process object is supplied its ``pid`` is returned directly.
    Otherwise the port is taken from the address and the listening process is
    looked up with the Windows ``netstat`` / ``findstr`` commands.

    :param progress: known process object, or None when there is none
    :param address: browser management address, including the port
    :return: process id (str when parsed from netstat output), or None
    """
    if progress:
        return progress.pid

    from os import popen

    port = address.split(':')[-1].strip()
    if not port.isdigit():
        # No usable port; also keeps arbitrary text out of the shell command.
        return None

    # The context manager closes the pipe once the output has been read.
    with popen(f'netstat -nao | findstr :{port}') as pipe:
        lines = pipe.read().split('\n')

    suffix = f':{port}'
    for line in lines:
        fields = line.split()
        # findstr matches substrings (":80" also hits ":8080" and foreign
        # addresses), so require the local-address column to end with the
        # exact port before trusting the line.
        if len(fields) >= 2 and 'LISTENING' in fields and fields[1].endswith(suffix):
            return fields[-1]

    return None
def get_chrome_hwnds_from_pid(pid, title):
    """Collect the handles of windows owned by a process.

    A window is selected when its text contains the given title and the
    process id that owns it equals pid (compared as strings).

    :param pid: process id
    :param title: window title to look for
    :return: list of matching window handles
    :raises ImportError: when the pywin32 modules cannot be imported
    """
    try:
        from win32gui import IsWindow, GetWindowText, EnumWindows
        from win32process import GetWindowThreadProcessId
    except ImportError:
        raise ImportError('请先安装win32guipip install pypiwin32')

    def collect(hwnd, found):
        # Always return True so that the enumeration keeps going.
        if not IsWindow(hwnd):
            return True
        if title not in GetWindowText(hwnd):
            return True
        _, owner_pid = GetWindowThreadProcessId(hwnd)
        if str(owner_pid) == str(pid):
            found.append(hwnd)
        return True

    matches = []
    EnumWindows(collect, matches)
    return matches
def wait_until(page, condition, timeout=10, poll=0.1, raise_err=True):
    """Poll a condition until it returns a truthy value or the timeout expires.

    The condition is always evaluated at least once, even when timeout is 0.
    Exceptions raised by the condition are treated as "not ready yet".

    :param page: page object; passed to the condition, and must provide an
                 ``s_ele`` method when condition is a str or tuple
    :param condition: callable taking the page, or a str / tuple that is
                      passed to ``page.s_ele``
    :param timeout: maximum time to wait, in seconds
    :param poll: interval between two evaluations, in seconds
    :param raise_err: raise TimeoutError on timeout instead of returning False
    :return: the first truthy value returned by the condition, or False on
             timeout when raise_err is False
    :raises AttributeError: condition is a str / tuple but page has no s_ele
    :raises ValueError: condition is neither callable, str nor tuple
    :raises TimeoutError: the wait timed out and raise_err is True
    """
    if isinstance(condition, (str, tuple)):
        if not callable(getattr(page, 's_ele', None)):
            raise AttributeError('page对象缺少s_ele方法')

        def condition_method(target):
            return target.s_ele(condition)
    elif callable(condition):
        condition_method = condition
    else:
        raise ValueError('condition必须是函数或者字符串或者元组')

    end_time = perf_counter() + timeout
    while True:
        try:
            value = condition_method(page)
        except Exception:
            # Deliberate best effort: a failing condition just means the
            # awaited state has not been reached yet.
            value = None
        if value:
            return value

        remaining = end_time - perf_counter()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        sleep(min(poll, remaining))

    if raise_err:
        raise TimeoutError('等待超时')
    return False
# def get_exe_from_port(port):
# """获取端口号第一条进程的可执行文件路径
# :param port: 端口号
# :return: 可执行文件的绝对路径
# """
# from os import popen
#
# pid = get_pid_from_port(port)
# if not pid:
# return
# else:
# file_lst = popen(f'wmic process where processid={pid} get executablepath').read().split('\n')
# return file_lst[2].strip() if len(file_lst) > 2 else None
#
#
# def get_pid_from_port(port):
# """获取端口号第一条进程的pid
# :param port: 端口号
# :return: 进程id
# """
# from platform import system
# if system().lower() != 'windows' or port is None:
# return None
#
# from os import popen
# from time import perf_counter
#
# try: # 避免Anaconda中可能产生的报错
# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
#
# t = perf_counter()
# while not process and perf_counter() - t < 5:
# process = popen(f'netstat -ano |findstr {port}').read().split('\n')[0]
#
# return process.split(' ')[-1] or None
#
# except Exception:
# return None