Merge 53b17bd2d28cb7f45363e49614a013d3d504a6e8 into 2d9193b0d3c0eae0c3a14d8c68a839f1bae157dc

This commit is contained in:
__kaning123__ 2026-02-25 07:31:23 +00:00 committed by GitHub
commit d02ca0131c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 385 additions and 15 deletions

View File

@ -0,0 +1,143 @@
import zipfile
from . import file_lib as fl
from . import time_lib as tl
from . import info_lib as il
import os
from typing import Union
import numpy as np
import torch
POOL:set = set()
def get_unique_name(name,MySet:set=set()):
_id = 1
if name not in POOL and name not in MySet:
POOL.add(name)
return name
while name in POOL or name in MySet:
_id += 1
name = f'{name}_{_id}'
POOL.add(name)
return name
TEMP_DIR = fl.merge_dir_txt2(fl.get_my_dir(), "Temp")
TEMP_ZIP_DIR = fl.merge_dir_txt2(TEMP_DIR, "ZipTemp")
def _tensor_to_numpy(tensor: torch.Tensor) -> np.ndarray:
cloned = tensor.clone().detach()
np_array = cloned.cpu().numpy()
return np_array
def save_np(path: str, np_array: np.ndarray) -> None:
np.save(path, np_array)
class ZIP_File:
def __init__(self, path: str,name:str,MySet:set=set()):
self.path = path
if not os.path.exists(self.path):
with zipfile.ZipFile(self.path, 'w') as zipf:
pass
self.name = get_unique_name(name,MySet=MySet)#MySet用于补充命名集合防止文件夹混淆
self.temp_write = fl.merge_dir_txt2(TEMP_ZIP_DIR, self.name)
if not os.path.exists(self.temp_write):
os.makedirs(self.temp_write)
def release(self):
'''relaese the zip file, extract it to temp dir'''
if os.path.exists(self.temp_write):
fl.delete_dir(self.temp_write)
fl.create_dir(self.temp_write)
with zipfile.ZipFile(self.path, 'r') as zipf:
zipf.extractall(self.temp_write)
#fl.delete_file(self.path)
def create_dir(self, dir_:str):
dir_path = fl.merge_dir_txt2(self.temp_write, dir_)
if not os.path.exists(dir_path):
os.makedirs(dir_path,exist_ok=True)
def create_file(self, file_name:str,location:str=''):
if location == '':
file_path = fl.merge_dir_txt2(self.temp_write,file_name)
else:
file_path = fl.merge_dir_txt2(self.temp_write, location, file_name)
if not os.path.exists(file_path):
os.makedirs(os.path.dirname(file_path),exist_ok=True)
with open(file_path, 'w') as f:
pass
def get_file_path(self, file_name:str,location:str=''):
if location == '':
file_path = fl.merge_dir_txt2(self.temp_write,file_name)
else:
file_path = fl.merge_dir_txt2(self.temp_write, location, file_name)
if not os.path.exists(file_path):
raise FileNotFoundError(f"File {file_path} does not exist.")
return file_path
def get_file_obj(self, file_name:str,location:str='',mode:str='r'):
if location == '':
file_path = fl.merge_dir_txt2(self.temp_write,file_name)
else:
file_path = fl.merge_dir_txt2(self.temp_write, location, file_name)
if not os.path.exists(file_path):
raise FileNotFoundError(f"File {file_path} does not exist.")
return open(file_path, mode)
def save_file(self, obj):
obj.close()
def save_zip(self):
with zipfile.ZipFile(self.path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(self.temp_write):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, self.temp_write)
zipf.write(file_path, arcname)
#fl.delete_dir(self.temp_write)
def close(self):
self.save_zip()
fl.delete_dir(self.temp_write)
POOL.remove(self.name)
def save_tensor(path: str, tensors: Union[torch.Tensor, list],name:str,MySet:set=set(),file_names:Union[str,list,None]=None,**info_save) -> None:
if isinstance(tensors, torch.Tensor):
tensors = [tensors]
if not file_names:
return
if isinstance(file_names, str):
files = [file_names]
else:
files = file_names
if len(tensors) != len(files):
raise ValueError("The number of tensors and files must be the same.")
np_arrays = []
for tensor in tensors:
np_array = _tensor_to_numpy(tensor)
np_arrays.append(np_array)
zf = ZIP_File(path, name, MySet=MySet)
zf.create_file("voice.json")
info = {'name': name}
info.update(info_save)
il.save_info(info, str(zf.get_file_path("voice.json")))
for i in range(len(files)):
file_name = files[i]
np_array = np_arrays[i]
zf.create_file(file_name)
save_np(str(zf.get_file_path(file_name)), np_array)
zf.close()
del zf
def load_tensor(path: str,name:str,find_func,MySet:set=set()) -> list[torch.Tensor]:
zf = ZIP_File(path, name, MySet=MySet)
zf.release()
voice_path = find_func(zf,il)
tensors = []
for i in range(len(voice_path)):
v = voice_path[i]
np_array = np.load(v,allow_pickle=True)
tensor = torch.from_numpy(np_array)
tensors.append(tensor)
zf.close()
del zf
return tensors

View File

@ -0,0 +1,35 @@
import os
import shutil
from pathlib import Path
def get_my_dir():
return os.path.dirname(os.path.abspath(__file__))
def get_parent_dir(dir_path,depth=1):
parent_path = Path(dir_path)
for _ in range(depth):
parent_path = parent_path.parent
return parent_path
def merge_dir_txt(a,b):
c=os.path.join(a,b)
return c
def merge_dir_txt2(*TXT):
return Path(os.path.join(*TXT))
def create_dir(path: Path, overwrite=False) -> bool:
if overwrite and path.exists():
shutil.rmtree(path)
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path.exists()
def get_dir_children_dirs(path: Path):
return [item for item in path.iterdir() if item.is_dir()]
def get_dir_children_files(path: Path):
return [item for item in path.iterdir() if item.is_file()]
def delete_dir(path: Path):
return shutil.rmtree(path)
def delete_file(path: Path):
return os.remove(path)
def file_exists(path: Path):
path = Path(path)
return path.exists()

View File

@ -0,0 +1,10 @@
import json
def load_info(info_path):
with open(info_path, 'r', encoding='utf-8') as f:
info = json.load(f)
return info
def save_info(info, info_path):
with open(info_path, 'w', encoding='utf-8') as f:
json.dump(info, f, ensure_ascii=False, indent=4)

View File

@ -0,0 +1,38 @@
import time
#time styles
STYLE_Y = "%Y"
STYLE_M = "%m"
STYLE_D = "%d"
STYLE_H = "%H"
STYLE_MIN = "%M"
STYLE_S = "%S"
STYLE_FULL = "%Y-%m-%d_%H.%M.%S"
#quick calls
def get_time_y(STYLE = STYLE_Y):
return time.strftime(STYLE, time.localtime())
def get_time_m(STYLE = STYLE_M):
return time.strftime(STYLE, time.localtime())
def get_time_d(STYLE = STYLE_D):
return time.strftime(STYLE, time.localtime())
def get_time_h(STYLE = STYLE_H):
return time.strftime(STYLE, time.localtime())
def get_time_min(STYLE = STYLE_MIN):
return time.strftime(STYLE, time.localtime())
def get_time_s(STYLE = STYLE_S):
return time.strftime(STYLE, time.localtime())
def get_time_full(STYLE = STYLE_FULL):
return time.strftime(STYLE, time.localtime())
def s(t:float):
time.sleep(t)
return
###
if __name__ == '__main__':
print(get_time_y())
print(get_time_m())
print(get_time_d())
print(get_time_h())
print(get_time_min())
print(get_time_s())
print(get_time_full())

View File

@ -8,6 +8,51 @@
""" """
import psutil import psutil
import os import os
import sys
from pathlib import Path
def get_my_dir():
return os.path.dirname(os.path.abspath(__file__))
def get_parent_dir(dir_path,depth=1):
parent_path = Path(dir_path)
for _ in range(depth):
parent_path = parent_path.parent
return parent_path
def merge_dir_txt2(*TXT):
return Path(os.path.join(*TXT))
ROOT_DIR = str(get_parent_dir(get_my_dir()))
sys.path.append(get_my_dir())
import VoiceSave
POOL:set = set()
def _get_unique_name(name,MySet:set=set()):
_id = 1
if name not in POOL and name not in MySet:
POOL.add(name)
return name
while name in POOL or name in MySet:
_id += 1
name = f'{name}_{_id}'
POOL.add(name)
return name
def find_func(zf,il):
f = zf.get_file_path("voice.json")
info = il.load_info(f)
if info is None:
return None
list_names = info["access_list"]
ret = []
for name in list_names:
try:
a = zf.get_file_path(name)
ret.append(a)
except FileNotFoundError:
continue
return ret
def set_high_priority(): def set_high_priority():
"""把当前 Python 进程设为 HIGH_PRIORITY_CLASS""" """把当前 Python 进程设为 HIGH_PRIORITY_CLASS"""
@ -765,6 +810,18 @@ def get_tts_wav(
sample_steps=8, sample_steps=8,
if_sr=False, if_sr=False,
pause_second=0.3, pause_second=0.3,
SaveSvEmb=False,
SaveRefers=False,
SaveSvEmbName="sv_emb.voice",
SaveRefersName="refers.voice",
InjectSvEmb=False,
InjectRefers=False,
InjectSvEmbName="sv_emb.voice",
InjectRefersName="refers.voice",
EnableAudioLoad=True,
): ):
global cache global cache
if ref_wav_path: if ref_wav_path:
@ -898,20 +955,92 @@ def get_tts_wav(
sv_emb = [] sv_emb = []
if sv_cn_model == None: if sv_cn_model == None:
init_sv_cn() init_sv_cn()
if inp_refs:
for path in inp_refs: try:
try: #####这里加上提取sv的逻辑要么一堆sv一堆refer要么单个sv单个refer if EnableAudioLoad:
refer, audio_tensor = get_spepc(hps, path.name, dtype, device, is_v2pro) if inp_refs:
refers.append(refer) for path in inp_refs:
try: #####这里加上提取sv的逻辑要么一堆sv一堆refer要么单个sv单个refer
refer, audio_tensor = get_spepc(hps, path.name, dtype, device, is_v2pro)
refers.append(refer)
if is_v2pro:
sv_emb.append(sv_cn_model.compute_embedding3(audio_tensor))
#print("refer:", refer.shape)
except:
traceback.print_exc()
if len(refers) == 0:
refers, audio_tensor = get_spepc(hps, ref_wav_path, dtype, device, is_v2pro)
refers = [refers]
if is_v2pro: if is_v2pro:
sv_emb.append(sv_cn_model.compute_embedding3(audio_tensor)) sv_emb = [sv_cn_model.compute_embedding3(audio_tensor)]
except: else:
traceback.print_exc() refers = []
if len(refers) == 0: sv_emb = []
refers, audio_tensor = get_spepc(hps, ref_wav_path, dtype, device, is_v2pro) except:
refers = [refers] traceback.print_exc()
if is_v2pro:
sv_emb = [sv_cn_model.compute_embedding3(audio_tensor)] try:
if SaveSvEmb and is_v2pro:
names = []
for i in sv_emb:
names.append(_get_unique_name(str(i.shape))+".npy")
sv_path = merge_dir_txt2(ROOT_DIR,"output","sv_emb_opt")
if not os.path.exists(sv_path):
os.makedirs(sv_path,exist_ok=True)
if not os.path.exists(SaveSvEmbName):
_pth_ = str(merge_dir_txt2(ROOT_DIR,"output","sv_emb_opt",SaveSvEmbName))
else:
_pth_ = SaveSvEmbName
VoiceSave.save_tensor(_pth_,sv_emb,SaveSvEmbName,file_names=names,access_list=names)
except:
traceback.print_exc()
try:
if SaveRefers:
names = []
for i in refers:
names.append(_get_unique_name(str(i.shape))+".npy")
refers_path = merge_dir_txt2(ROOT_DIR,"output","refers_opt")
if not os.path.exists(refers_path):
os.makedirs(refers_path,exist_ok=True)
if not os.path.exists(SaveRefersName):
_pth_ = str(merge_dir_txt2(ROOT_DIR,"output","refers_opt",SaveRefersName))
else:
_pth_ = SaveRefersName
VoiceSave.save_tensor(_pth_,refers,SaveRefersName,file_names=names,access_list=names)
except:
traceback.print_exc()
#print("refers数量:", len(refers))
#print("sv_emb数量:", len(sv_emb) if is_v2pro else "无sv_emb")
try:
if InjectSvEmb and is_v2pro:
if not os.path.exists(InjectSvEmbName):
_pth_ = str(merge_dir_txt2(ROOT_DIR,"output","sv_emb_opt",InjectSvEmbName))
else:
_pth_ = InjectSvEmbName
_sv_emb = VoiceSave.load_tensor(_pth_,InjectSvEmbName,find_func)
for i in range(len(_sv_emb)):
sv_emb.append(_sv_emb[i].to(device))
except:
traceback.print_exc()
try:
if InjectRefers:
if not os.path.exists(InjectRefersName):
_pth_ = str(merge_dir_txt2(ROOT_DIR,"output","refers_opt",InjectRefersName))
else:
_pth_ = InjectRefersName
_refers = VoiceSave.load_tensor(_pth_,InjectRefersName,find_func)
for i in range(len(_refers)):
refers.append(_refers[i].to(device))
except:
traceback.print_exc()
#print("注入后refers数量:", len(refers))
#print("注入后sv_emb数量:", len(sv_emb) if is_v2pro else "无sv_emb")
if is_v2pro: if is_v2pro:
audio = vq_model.decode( audio = vq_model.decode(
pred_semantic, torch.LongTensor(phones2).to(device).unsqueeze(0), refers, speed=speed, sv_emb=sv_emb pred_semantic, torch.LongTensor(phones2).to(device).unsqueeze(0), refers, speed=speed, sv_emb=sv_emb

5
config.json Normal file
View File

@ -0,0 +1,5 @@
{
"GPU_CHECK":{
"DisableGPUMemCheck":false
}
}

View File

@ -1,11 +1,20 @@
import os import os
import re import re
import sys import sys
import json
from pathlib import Path
import torch import torch
from tools.i18n.i18n import I18nAuto from tools.i18n.i18n import I18nAuto
current_dir = str(Path(__file__).parent)
def merge_dir_txt2(*TXT):
return Path(os.path.join(*TXT))
config_json_location = merge_dir_txt2(current_dir,"config.json")
with open(str(config_json_location),"r") as f:
__info__ = f.read()
i18n = I18nAuto(language=os.environ.get("language", "Auto")) i18n = I18nAuto(language=os.environ.get("language", "Auto"))
@ -159,8 +168,9 @@ def get_device_dtype_sm(idx: int) -> tuple[torch.device, torch.dtype, float, flo
major, minor = capability major, minor = capability
sm_version = major + minor / 10.0 sm_version = major + minor / 10.0
is_16_series = bool(re.search(r"16\d{2}", name)) and sm_version == 7.5 is_16_series = bool(re.search(r"16\d{2}", name)) and sm_version == 7.5
if mem_gb < 4 or sm_version < 5.3: if not __info__["GPU_CHECK"]["DisableGPUMemCheck"]:
return cpu, torch.float32, 0.0, 0.0 if mem_gb < 4 or sm_version < 5.3:
return cpu, torch.float32, 0.0, 0.0
if sm_version == 6.1 or is_16_series == True: if sm_version == 6.1 or is_16_series == True:
return cuda, torch.float32, sm_version, mem_gb return cuda, torch.float32, sm_version, mem_gb
if sm_version > 6.1: if sm_version > 6.1: