diff --git a/GPT_SoVITS/VoiceSave/__init__.py b/GPT_SoVITS/VoiceSave/__init__.py new file mode 100644 index 00000000..dadc19e7 --- /dev/null +++ b/GPT_SoVITS/VoiceSave/__init__.py @@ -0,0 +1,143 @@ +import zipfile +from . import file_lib as fl +from . import time_lib as tl +from . import info_lib as il +import os +from typing import Union +import numpy as np +import torch + +POOL:set = set() +def get_unique_name(name,MySet:set=set()): + _id = 1 + if name not in POOL and name not in MySet: + POOL.add(name) + return name + while name in POOL or name in MySet: + _id += 1 + name = f'{name}_{_id}' + POOL.add(name) + return name + +TEMP_DIR = fl.merge_dir_txt2(fl.get_my_dir(), "Temp") +TEMP_ZIP_DIR = fl.merge_dir_txt2(TEMP_DIR, "ZipTemp") +def _tensor_to_numpy(tensor: torch.Tensor) -> np.ndarray: + cloned = tensor.clone().detach() + np_array = cloned.cpu().numpy() + return np_array + +def save_np(path: str, np_array: np.ndarray) -> None: + np.save(path, np_array) + +class ZIP_File: + def __init__(self, path: str,name:str,MySet:set=set()): + self.path = path + if not os.path.exists(self.path): + with zipfile.ZipFile(self.path, 'w') as zipf: + pass + self.name = get_unique_name(name,MySet=MySet)#MySet用于补充命名集合,防止文件夹混淆 + self.temp_write = fl.merge_dir_txt2(TEMP_ZIP_DIR, self.name) + + if not os.path.exists(self.temp_write): + os.makedirs(self.temp_write) + + def release(self): + '''relaese the zip file, extract it to temp dir''' + if os.path.exists(self.temp_write): + fl.delete_dir(self.temp_write) + fl.create_dir(self.temp_write) + with zipfile.ZipFile(self.path, 'r') as zipf: + zipf.extractall(self.temp_write) + #fl.delete_file(self.path) + def create_dir(self, dir_:str): + dir_path = fl.merge_dir_txt2(self.temp_write, dir_) + if not os.path.exists(dir_path): + os.makedirs(dir_path,exist_ok=True) + + def create_file(self, file_name:str,location:str=''): + if location == '': + file_path = fl.merge_dir_txt2(self.temp_write,file_name) + else: + file_path = 
fl.merge_dir_txt2(self.temp_write, location, file_name) + if not os.path.exists(file_path): + os.makedirs(os.path.dirname(file_path),exist_ok=True) + with open(file_path, 'w') as f: + pass + + def get_file_path(self, file_name:str,location:str=''): + if location == '': + file_path = fl.merge_dir_txt2(self.temp_write,file_name) + else: + file_path = fl.merge_dir_txt2(self.temp_write, location, file_name) + if not os.path.exists(file_path): + raise FileNotFoundError(f"File {file_path} does not exist.") + return file_path + + def get_file_obj(self, file_name:str,location:str='',mode:str='r'): + if location == '': + file_path = fl.merge_dir_txt2(self.temp_write,file_name) + else: + file_path = fl.merge_dir_txt2(self.temp_write, location, file_name) + if not os.path.exists(file_path): + raise FileNotFoundError(f"File {file_path} does not exist.") + return open(file_path, mode) + + def save_file(self, obj): + obj.close() + + def save_zip(self): + with zipfile.ZipFile(self.path, 'w', zipfile.ZIP_DEFLATED) as zipf: + for root, dirs, files in os.walk(self.temp_write): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, self.temp_write) + zipf.write(file_path, arcname) + #fl.delete_dir(self.temp_write) + + def close(self): + self.save_zip() + fl.delete_dir(self.temp_write) + POOL.remove(self.name) + +def save_tensor(path: str, tensors: Union[torch.Tensor, list],name:str,MySet:set=set(),file_names:Union[str,list,None]=None,**info_save) -> None: + if isinstance(tensors, torch.Tensor): + tensors = [tensors] + if not file_names: + return + if isinstance(file_names, str): + files = [file_names] + else: + files = file_names + + if len(tensors) != len(files): + raise ValueError("The number of tensors and files must be the same.") + np_arrays = [] + for tensor in tensors: + np_array = _tensor_to_numpy(tensor) + np_arrays.append(np_array) + zf = ZIP_File(path, name, MySet=MySet) + zf.create_file("voice.json") + info = {'name': name} + 
info.update(info_save) + il.save_info(info, str(zf.get_file_path("voice.json"))) + for i in range(len(files)): + file_name = files[i] + np_array = np_arrays[i] + zf.create_file(file_name) + save_np(str(zf.get_file_path(file_name)), np_array) + zf.close() + del zf + +def load_tensor(path: str,name:str,find_func,MySet:set=set()) -> list[torch.Tensor]: + zf = ZIP_File(path, name, MySet=MySet) + zf.release() + voice_path = find_func(zf,il) + tensors = [] + for i in range(len(voice_path)): + v = voice_path[i] + np_array = np.load(v,allow_pickle=True) + tensor = torch.from_numpy(np_array) + tensors.append(tensor) + zf.close() + del zf + return tensors \ No newline at end of file diff --git a/GPT_SoVITS/VoiceSave/file_lib.py b/GPT_SoVITS/VoiceSave/file_lib.py new file mode 100644 index 00000000..e26b7c5a --- /dev/null +++ b/GPT_SoVITS/VoiceSave/file_lib.py @@ -0,0 +1,35 @@ +import os +import shutil +from pathlib import Path + +def get_my_dir(): + return os.path.dirname(os.path.abspath(__file__)) + +def get_parent_dir(dir_path,depth=1): + parent_path = Path(dir_path) + for _ in range(depth): + parent_path = parent_path.parent + return parent_path + +def merge_dir_txt(a,b): + c=os.path.join(a,b) + return c +def merge_dir_txt2(*TXT): + return Path(os.path.join(*TXT)) +def create_dir(path: Path, overwrite=False) -> bool: + if overwrite and path.exists(): + shutil.rmtree(path) + path = Path(path) + path.mkdir(parents=True, exist_ok=True) + return path.exists() +def get_dir_children_dirs(path: Path): + return [item for item in path.iterdir() if item.is_dir()] +def get_dir_children_files(path: Path): + return [item for item in path.iterdir() if item.is_file()] +def delete_dir(path: Path): + return shutil.rmtree(path) +def delete_file(path: Path): + return os.remove(path) +def file_exists(path: Path): + path = Path(path) + return path.exists() \ No newline at end of file diff --git a/GPT_SoVITS/VoiceSave/info_lib.py b/GPT_SoVITS/VoiceSave/info_lib.py new file mode 100644 index 
00000000..c8f825b0 --- /dev/null +++ b/GPT_SoVITS/VoiceSave/info_lib.py @@ -0,0 +1,10 @@ +import json + +def load_info(info_path): + with open(info_path, 'r', encoding='utf-8') as f: + info = json.load(f) + return info + +def save_info(info, info_path): + with open(info_path, 'w', encoding='utf-8') as f: + json.dump(info, f, ensure_ascii=False, indent=4) \ No newline at end of file diff --git a/GPT_SoVITS/VoiceSave/time_lib.py b/GPT_SoVITS/VoiceSave/time_lib.py new file mode 100644 index 00000000..819f90f1 --- /dev/null +++ b/GPT_SoVITS/VoiceSave/time_lib.py @@ -0,0 +1,38 @@ +import time +#time styles +STYLE_Y = "%Y" +STYLE_M = "%m" +STYLE_D = "%d" +STYLE_H = "%H" +STYLE_MIN = "%M" +STYLE_S = "%S" +STYLE_FULL = "%Y-%m-%d_%H.%M.%S" +#quick calls +def get_time_y(STYLE = STYLE_Y): + return time.strftime(STYLE, time.localtime()) +def get_time_m(STYLE = STYLE_M): + return time.strftime(STYLE, time.localtime()) +def get_time_d(STYLE = STYLE_D): + return time.strftime(STYLE, time.localtime()) +def get_time_h(STYLE = STYLE_H): + return time.strftime(STYLE, time.localtime()) +def get_time_min(STYLE = STYLE_MIN): + return time.strftime(STYLE, time.localtime()) +def get_time_s(STYLE = STYLE_S): + return time.strftime(STYLE, time.localtime()) +def get_time_full(STYLE = STYLE_FULL): + return time.strftime(STYLE, time.localtime()) + +def s(t:float): + time.sleep(t) + return +### + +if __name__ == '__main__': + print(get_time_y()) + print(get_time_m()) + print(get_time_d()) + print(get_time_h()) + print(get_time_min()) + print(get_time_s()) + print(get_time_full()) \ No newline at end of file diff --git a/GPT_SoVITS/inference_webui.py b/GPT_SoVITS/inference_webui.py index a361ed58..3031b9ba 100644 --- a/GPT_SoVITS/inference_webui.py +++ b/GPT_SoVITS/inference_webui.py @@ -8,6 +8,51 @@ """ import psutil import os +import sys +from pathlib import Path + +def get_my_dir(): + return os.path.dirname(os.path.abspath(__file__)) + +def get_parent_dir(dir_path,depth=1): + parent_path = 
Path(dir_path) + for _ in range(depth): + parent_path = parent_path.parent + return parent_path + +def merge_dir_txt2(*TXT): + return Path(os.path.join(*TXT)) + +ROOT_DIR = str(get_parent_dir(get_my_dir())) +sys.path.append(get_my_dir()) +import VoiceSave + +POOL:set = set() +def _get_unique_name(name,MySet:set=set()): + _id = 1 + if name not in POOL and name not in MySet: + POOL.add(name) + return name + while name in POOL or name in MySet: + _id += 1 + name = f'{name}_{_id}' + POOL.add(name) + return name + +def find_func(zf,il): + f = zf.get_file_path("voice.json") + info = il.load_info(f) + if info is None: + return None + list_names = info["access_list"] + ret = [] + for name in list_names: + try: + a = zf.get_file_path(name) + ret.append(a) + except FileNotFoundError: + continue + return ret def set_high_priority(): """把当前 Python 进程设为 HIGH_PRIORITY_CLASS""" @@ -765,6 +810,18 @@ def get_tts_wav( sample_steps=8, if_sr=False, pause_second=0.3, + + SaveSvEmb=False, + SaveRefers=False, + SaveSvEmbName="sv_emb.voice", + SaveRefersName="refers.voice", + + InjectSvEmb=False, + InjectRefers=False, + InjectSvEmbName="sv_emb.voice", + InjectRefersName="refers.voice", + + EnableAudioLoad=True, ): global cache if ref_wav_path: @@ -898,20 +955,92 @@ def get_tts_wav( sv_emb = [] if sv_cn_model == None: init_sv_cn() - if inp_refs: - for path in inp_refs: - try: #####这里加上提取sv的逻辑,要么一堆sv一堆refer,要么单个sv单个refer - refer, audio_tensor = get_spepc(hps, path.name, dtype, device, is_v2pro) - refers.append(refer) + + try: + if EnableAudioLoad: + if inp_refs: + for path in inp_refs: + try: #####这里加上提取sv的逻辑,要么一堆sv一堆refer,要么单个sv单个refer + refer, audio_tensor = get_spepc(hps, path.name, dtype, device, is_v2pro) + refers.append(refer) + if is_v2pro: + sv_emb.append(sv_cn_model.compute_embedding3(audio_tensor)) + #print("refer:", refer.shape) + except: + traceback.print_exc() + if len(refers) == 0: + refers, audio_tensor = get_spepc(hps, ref_wav_path, dtype, device, is_v2pro) + refers = 
[refers] if is_v2pro: - sv_emb.append(sv_cn_model.compute_embedding3(audio_tensor)) - except: - traceback.print_exc() - if len(refers) == 0: - refers, audio_tensor = get_spepc(hps, ref_wav_path, dtype, device, is_v2pro) - refers = [refers] - if is_v2pro: - sv_emb = [sv_cn_model.compute_embedding3(audio_tensor)] + sv_emb = [sv_cn_model.compute_embedding3(audio_tensor)] + else: + refers = [] + sv_emb = [] + except: + traceback.print_exc() + + try: + if SaveSvEmb and is_v2pro: + names = [] + for i in sv_emb: + names.append(_get_unique_name(str(i.shape))+".npy") + sv_path = merge_dir_txt2(ROOT_DIR,"output","sv_emb_opt") + if not os.path.exists(sv_path): + os.makedirs(sv_path,exist_ok=True) + if not os.path.exists(SaveSvEmbName): + _pth_ = str(merge_dir_txt2(ROOT_DIR,"output","sv_emb_opt",SaveSvEmbName)) + else: + _pth_ = SaveSvEmbName + VoiceSave.save_tensor(_pth_,sv_emb,SaveSvEmbName,file_names=names,access_list=names) + except: + traceback.print_exc() + + try: + if SaveRefers: + names = [] + for i in refers: + names.append(_get_unique_name(str(i.shape))+".npy") + refers_path = merge_dir_txt2(ROOT_DIR,"output","refers_opt") + if not os.path.exists(refers_path): + os.makedirs(refers_path,exist_ok=True) + if not os.path.exists(SaveRefersName): + _pth_ = str(merge_dir_txt2(ROOT_DIR,"output","refers_opt",SaveRefersName)) + else: + _pth_ = SaveRefersName + VoiceSave.save_tensor(_pth_,refers,SaveRefersName,file_names=names,access_list=names) + except: + traceback.print_exc() + + #print("refers数量:", len(refers)) + #print("sv_emb数量:", len(sv_emb) if is_v2pro else "无sv_emb") + + try: + if InjectSvEmb and is_v2pro: + if not os.path.exists(InjectSvEmbName): + _pth_ = str(merge_dir_txt2(ROOT_DIR,"output","sv_emb_opt",InjectSvEmbName)) + else: + _pth_ = InjectSvEmbName + _sv_emb = VoiceSave.load_tensor(_pth_,InjectSvEmbName,find_func) + for i in range(len(_sv_emb)): + sv_emb.append(_sv_emb[i].to(device)) + except: + traceback.print_exc() + + try: + if InjectRefers: + if not 
os.path.exists(InjectRefersName): + _pth_ = str(merge_dir_txt2(ROOT_DIR,"output","refers_opt",InjectRefersName)) + else: + _pth_ = InjectRefersName + _refers = VoiceSave.load_tensor(_pth_,InjectRefersName,find_func) + for i in range(len(_refers)): + refers.append(_refers[i].to(device)) + except: + traceback.print_exc() + + #print("注入后refers数量:", len(refers)) + #print("注入后sv_emb数量:", len(sv_emb) if is_v2pro else "无sv_emb") + if is_v2pro: audio = vq_model.decode( pred_semantic, torch.LongTensor(phones2).to(device).unsqueeze(0), refers, speed=speed, sv_emb=sv_emb diff --git a/config.json b/config.json new file mode 100644 index 00000000..09a5d85e --- /dev/null +++ b/config.json @@ -0,0 +1,5 @@ +{ + "GPU_CHECK":{ + "DisableGPUMemCheck":false + } +} \ No newline at end of file diff --git a/config.py b/config.py index fdc11c0a..76965027 100644 --- a/config.py +++ b/config.py @@ -1,11 +1,20 @@ import os import re import sys +import json +from pathlib import Path import torch from tools.i18n.i18n import I18nAuto +current_dir = str(Path(__file__).parent) +def merge_dir_txt2(*TXT): + return Path(os.path.join(*TXT)) +config_json_location = merge_dir_txt2(current_dir,"config.json") +with open(str(config_json_location),"r") as f: + __info__ = json.load(f) + i18n = I18nAuto(language=os.environ.get("language", "Auto")) @@ -159,8 +168,9 @@ def get_device_dtype_sm(idx: int) -> tuple[torch.device, torch.dtype, float, flo major, minor = capability sm_version = major + minor / 10.0 is_16_series = bool(re.search(r"16\d{2}", name)) and sm_version == 7.5 - if mem_gb < 4 or sm_version < 5.3: - return cpu, torch.float32, 0.0, 0.0 + if not __info__["GPU_CHECK"]["DisableGPUMemCheck"]: + if mem_gb < 4 or sm_version < 5.3: + return cpu, torch.float32, 0.0, 0.0 if sm_version == 6.1 or is_16_series == True: return cuda, torch.float32, sm_version, mem_gb if sm_version > 6.1: