mirror of
https://github.com/RVC-Boss/GPT-SoVITS.git
synced 2026-04-29 21:00:42 +08:00
Compare commits
8 Commits
a9c9b34549
...
65f902ca80
| Author | SHA1 | Date | |
|---|---|---|---|
| | 65f902ca80 | | |
| | 938f05fce8 | | |
| | 445d18ccce | | |
| | 00ce973412 | | |
| | 14191901cd | | |
| | 780383d5bd | | |
| | ba8de9b760 | | |
| | 62ee3c2aa0 | | |
@ -67,8 +67,10 @@ class Text2SemanticDataset(Dataset):
|
||||
)
|
||||
) # "%s/3-bert"%exp_dir#bert_dir
|
||||
self.path6 = semantic_path # "%s/6-name2semantic.tsv"%exp_dir#semantic_path
|
||||
assert os.path.exists(self.path2)
|
||||
assert os.path.exists(self.path6)
|
||||
if not os.path.exists(self.path2):
|
||||
raise FileNotFoundError(f"Phoneme data file not found: {self.path2}")
|
||||
if not os.path.exists(self.path6):
|
||||
raise FileNotFoundError(f"Semantic data file not found: {self.path6}")
|
||||
self.phoneme_data = {}
|
||||
with open(self.path2, "r", encoding="utf8") as f:
|
||||
lines = f.read().strip("\n").split("\n")
|
||||
@ -131,7 +133,7 @@ class Text2SemanticDataset(Dataset):
|
||||
phoneme, word2ph, text = self.phoneme_data[item_name]
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
# print(f"{item_name} not in self.phoneme_data !")
|
||||
print(f"Warning: File \"{item_name}\" not in self.phoneme_data! Skipped. ")
|
||||
num_not_in += 1
|
||||
continue
|
||||
|
||||
@ -152,7 +154,7 @@ class Text2SemanticDataset(Dataset):
|
||||
phoneme_ids = cleaned_text_to_sequence(phoneme, version)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
# print(f"{item_name} not in self.phoneme_data !")
|
||||
print(f"Warning: Failed to convert phonemes to sequence for file \"{item_name}\"! Skipped. ")
|
||||
num_not_in += 1
|
||||
continue
|
||||
# if len(phoneme_ids) >400:###########2:改为恒定限制为semantic/2.5就行
|
||||
@ -228,7 +230,11 @@ class Text2SemanticDataset(Dataset):
|
||||
# bert_feature=torch.zeros_like(phoneme_ids,dtype=torch.float32)
|
||||
bert_feature = None
|
||||
else:
|
||||
assert bert_feature.shape[-1] == len(phoneme_ids)
|
||||
try:
|
||||
assert bert_feature.shape[-1] == len(phoneme_ids)
|
||||
except AssertionError:
|
||||
print(f"AssertionError: The BERT feature dimension ({bert_feature.shape[-1]}) of the file '{item_name}' does not match the length of the phoneme sequence ({len(phoneme_ids)}).")
|
||||
raise
|
||||
return {
|
||||
"idx": idx,
|
||||
"phoneme_ids": phoneme_ids,
|
||||
|
||||
@ -262,7 +262,7 @@ def make_reject_y(y_o, y_lens):
|
||||
reject_y = []
|
||||
reject_y_lens = []
|
||||
for b in range(bs):
|
||||
process_item_idx = torch.randint(0, 1, size=(1,))[0]
|
||||
process_item_idx = torch.randint(0, 2, size=(1,))[0]
|
||||
if process_item_idx == 0:
|
||||
new_y = repeat_P(y_o[b])
|
||||
reject_y.append(new_y)
|
||||
|
||||
@ -499,7 +499,7 @@ class TTS:
|
||||
|
||||
if if_lora_v3 == True and os.path.exists(path_sovits) == False:
|
||||
info = path_sovits + i18n("SoVITS %s 底模缺失,无法加载相应 LoRA 权重" % model_version)
|
||||
raise FileExistsError(info)
|
||||
raise FileNotFoundError(info)
|
||||
|
||||
# dict_s2 = torch.load(weights_path, map_location=self.configs.device,weights_only=False)
|
||||
dict_s2 = load_sovits_new(weights_path)
|
||||
@ -1578,16 +1578,15 @@ class TTS:
|
||||
max_audio = np.abs(audio).max()
|
||||
if max_audio > 1:
|
||||
audio /= max_audio
|
||||
audio = (audio * 32768).astype(np.int16)
|
||||
audio = (audio * 32768).astype(np.int16)
|
||||
else:
|
||||
audio = audio.cpu().numpy()
|
||||
audio = (audio * 32768).astype(np.int16)
|
||||
t2 = time.perf_counter()
|
||||
print(f"超采样用时:{t2 - t1:.3f}s")
|
||||
else:
|
||||
# audio = audio.float() * 32768
|
||||
# audio = audio.to(dtype=torch.int16).clamp(-32768, 32767).cpu().numpy()
|
||||
|
||||
audio = audio.cpu().numpy()
|
||||
|
||||
audio = (audio * 32768).astype(np.int16)
|
||||
audio = (audio * 32768).astype(np.int16)
|
||||
|
||||
|
||||
# try:
|
||||
@ -1768,7 +1767,10 @@ class TTS:
|
||||
pos += chunk_len * upsample_rate
|
||||
|
||||
audio = self.sola_algorithm(audio_fragments, overlapped_len * upsample_rate)
|
||||
audio = audio[overlapped_len * upsample_rate : -padding_len * upsample_rate]
|
||||
if padding_len > 0:
|
||||
audio = audio[overlapped_len * upsample_rate : -padding_len * upsample_rate]
|
||||
else:
|
||||
audio = audio[overlapped_len * upsample_rate :]
|
||||
|
||||
audio_fragments = []
|
||||
for feat_len in feat_lens:
|
||||
|
||||
@ -92,7 +92,7 @@ def cut0(inp):
|
||||
if not set(inp).issubset(punctuation):
|
||||
return inp
|
||||
else:
|
||||
return "/n"
|
||||
return "\n"
|
||||
|
||||
|
||||
# 凑四句一切
|
||||
|
||||
@ -6,20 +6,7 @@
|
||||
全部按英文识别
|
||||
全部按日文识别
|
||||
"""
|
||||
import psutil
|
||||
import os
|
||||
|
||||
def set_high_priority():
|
||||
"""把当前 Python 进程设为 HIGH_PRIORITY_CLASS"""
|
||||
if os.name != "nt":
|
||||
return # 仅 Windows 有效
|
||||
p = psutil.Process(os.getpid())
|
||||
try:
|
||||
p.nice(psutil.HIGH_PRIORITY_CLASS)
|
||||
print("已将进程优先级设为 High")
|
||||
except psutil.AccessDenied:
|
||||
print("权限不足,无法修改优先级(请用管理员运行)")
|
||||
set_high_priority()
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@ -60,7 +47,6 @@ import gradio as gr
|
||||
from TTS_infer_pack.text_segmentation_method import get_method
|
||||
from TTS_infer_pack.TTS import NO_PROMPT_ERROR, TTS, TTS_Config
|
||||
|
||||
from tools.assets import css, js, top_html
|
||||
from tools.i18n.i18n import I18nAuto, scan_language_list
|
||||
|
||||
language = os.environ.get("language", "Auto")
|
||||
@ -112,28 +98,66 @@ cut_method = {
|
||||
i18n("按标点符号切"): "cut5",
|
||||
}
|
||||
|
||||
from config import change_choices, get_weights_names, name2gpt_path, name2sovits_path
|
||||
# 推理参数预设系统
|
||||
INFERENCE_PRESETS = {
|
||||
i18n("快速合成"): {
|
||||
"batch_size": 1,
|
||||
"sample_steps": 8,
|
||||
"top_k": 5,
|
||||
"top_p": 1,
|
||||
"temperature": 1,
|
||||
"repetition_penalty": 1.35,
|
||||
"parallel_infer": True,
|
||||
"split_bucket": True,
|
||||
},
|
||||
i18n("高质量"): {
|
||||
"batch_size": 1,
|
||||
"sample_steps": 64,
|
||||
"top_k": 15,
|
||||
"top_p": 0.8,
|
||||
"temperature": 0.8,
|
||||
"repetition_penalty": 1.35,
|
||||
"parallel_infer": False,
|
||||
"split_bucket": False,
|
||||
},
|
||||
i18n("平衡"): {
|
||||
"batch_size": 20,
|
||||
"sample_steps": 32,
|
||||
"top_k": 5,
|
||||
"top_p": 1,
|
||||
"temperature": 1,
|
||||
"repetition_penalty": 1.35,
|
||||
"parallel_infer": True,
|
||||
"split_bucket": True,
|
||||
},
|
||||
i18n("自定义"): None, # 不应用任何预设
|
||||
}
|
||||
|
||||
SoVITS_names, GPT_names = get_weights_names()
|
||||
from config import pretrained_sovits_name
|
||||
|
||||
path_sovits_v3 = pretrained_sovits_name["v3"]
|
||||
path_sovits_v4 = pretrained_sovits_name["v4"]
|
||||
is_exist_s2gv3 = os.path.exists(path_sovits_v3)
|
||||
is_exist_s2gv4 = os.path.exists(path_sovits_v4)
|
||||
def apply_preset(preset_name):
    """Translate a preset name into updates for the inference parameter widgets.

    Args:
        preset_name: Key of ``INFERENCE_PRESETS`` selected in the preset dropdown.

    Returns:
        A list of ``gr.update`` objects, one per parameter in this order:
        batch_size, sample_steps, top_k, top_p, temperature,
        repetition_penalty, parallel_infer, split_bucket. If the name is not a
        known preset, or the preset maps to ``None`` (the "custom" entry), every
        update is empty so the current widget values stay unchanged.
    """
    # Single source of truth for both the number and the order of the updates;
    # it has to line up with the outputs list passed to preset_dropdown.change.
    preset_keys = (
        "batch_size",
        "sample_steps",
        "top_k",
        "top_p",
        "temperature",
        "repetition_penalty",
        "parallel_infer",
        "split_bucket",
    )

    # .get() covers unknown names; the None check covers the "custom" entry
    # (and any other preset deliberately left without values).
    preset = INFERENCE_PRESETS.get(preset_name)
    if preset is None:
        return [gr.update() for _ in preset_keys]

    return [gr.update(value=preset[key]) for key in preset_keys]
|
||||
|
||||
tts_config = TTS_Config("GPT_SoVITS/configs/tts_infer.yaml")
|
||||
tts_config.device = device
|
||||
tts_config.is_half = is_half
|
||||
# tts_config.version = version
|
||||
tts_config.update_version(version)
|
||||
tts_config.version = version
|
||||
if gpt_path is not None:
|
||||
if "!" in gpt_path or "!" in gpt_path:
|
||||
gpt_path = name2gpt_path[gpt_path]
|
||||
tts_config.t2s_weights_path = gpt_path
|
||||
if sovits_path is not None:
|
||||
if "!" in sovits_path or "!" in sovits_path:
|
||||
sovits_path = name2sovits_path[sovits_path]
|
||||
tts_config.vits_weights_path = sovits_path
|
||||
if cnhubert_base_path is not None:
|
||||
tts_config.cnhuhbert_base_path = cnhubert_base_path
|
||||
@ -209,6 +233,40 @@ def custom_sort_key(s):
|
||||
return parts
|
||||
|
||||
|
||||
def change_choices():
|
||||
SoVITS_names, GPT_names = get_weights_names(GPT_weight_root, SoVITS_weight_root)
|
||||
return {"choices": sorted(SoVITS_names, key=custom_sort_key), "__type__": "update"}, {
|
||||
"choices": sorted(GPT_names, key=custom_sort_key),
|
||||
"__type__": "update",
|
||||
}
|
||||
|
||||
|
||||
path_sovits_v3 = "GPT_SoVITS/pretrained_models/s2Gv3.pth"
|
||||
path_sovits_v4 = "GPT_SoVITS/pretrained_models/gsv-v4-pretrained/s2Gv4.pth"
|
||||
is_exist_s2gv3 = os.path.exists(path_sovits_v3)
|
||||
is_exist_s2gv4 = os.path.exists(path_sovits_v4)
|
||||
pretrained_sovits_name = [
|
||||
"GPT_SoVITS/pretrained_models/s2G488k.pth",
|
||||
"GPT_SoVITS/pretrained_models/gsv-v2final-pretrained/s2G2333k.pth",
|
||||
"GPT_SoVITS/pretrained_models/s2Gv3.pth",
|
||||
"GPT_SoVITS/pretrained_models/gsv-v4-pretrained/s2Gv4.pth",
|
||||
]
|
||||
pretrained_gpt_name = [
|
||||
"GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt",
|
||||
"GPT_SoVITS/pretrained_models/gsv-v2final-pretrained/s1bert25hz-5kh-longer-epoch=12-step=369668.ckpt",
|
||||
"GPT_SoVITS/pretrained_models/s1v3.ckpt",
|
||||
"GPT_SoVITS/pretrained_models/s1v3.ckpt",
|
||||
]
|
||||
|
||||
|
||||
_ = [[], []]
|
||||
for i in range(4):
|
||||
if os.path.exists(pretrained_gpt_name[i]):
|
||||
_[0].append(pretrained_gpt_name[i])
|
||||
if os.path.exists(pretrained_sovits_name[i]):
|
||||
_[-1].append(pretrained_sovits_name[i])
|
||||
pretrained_gpt_name, pretrained_sovits_name = _
|
||||
|
||||
if os.path.exists("./weight.json"):
|
||||
pass
|
||||
else:
|
||||
@ -218,28 +276,50 @@ else:
|
||||
with open("./weight.json", "r", encoding="utf-8") as file:
|
||||
weight_data = file.read()
|
||||
weight_data = json.loads(weight_data)
|
||||
gpt_path = os.environ.get("gpt_path", weight_data.get("GPT", {}).get(version, GPT_names[-1]))
|
||||
sovits_path = os.environ.get("sovits_path", weight_data.get("SoVITS", {}).get(version, SoVITS_names[0]))
|
||||
gpt_path = os.environ.get("gpt_path", weight_data.get("GPT", {}).get(version, pretrained_gpt_name))
|
||||
sovits_path = os.environ.get("sovits_path", weight_data.get("SoVITS", {}).get(version, pretrained_sovits_name))
|
||||
if isinstance(gpt_path, list):
|
||||
gpt_path = gpt_path[0]
|
||||
if isinstance(sovits_path, list):
|
||||
sovits_path = sovits_path[0]
|
||||
|
||||
|
||||
SoVITS_weight_root = ["SoVITS_weights", "SoVITS_weights_v2", "SoVITS_weights_v3", "SoVITS_weights_v4"]
|
||||
GPT_weight_root = ["GPT_weights", "GPT_weights_v2", "GPT_weights_v3", "GPT_weights_v4"]
|
||||
for path in SoVITS_weight_root + GPT_weight_root:
|
||||
os.makedirs(path, exist_ok=True)
|
||||
|
||||
|
||||
def get_weights_names(GPT_weight_root, SoVITS_weight_root):
    """Collect the selectable SoVITS and GPT weight paths.

    Each returned list starts with a copy of the corresponding module-level
    pretrained list and is followed by every file with the matching suffix
    found directly inside the given root directories.

    Args:
        GPT_weight_root: Iterable of directories scanned for ``.ckpt`` files.
        SoVITS_weight_root: Iterable of directories scanned for ``.pth`` files.

    Returns:
        A ``(SoVITS_names, GPT_names)`` tuple of lists of path strings.
    """

    def _scan(roots, suffix):
        # List "<root>/<name>" for every entry of each root ending in `suffix`.
        found = []
        for root in roots:
            # A root that is missing (or is not a directory) contributes
            # nothing instead of aborting the whole discovery.
            if not os.path.isdir(root):
                continue
            for name in os.listdir(root):
                if name.endswith(suffix):
                    found.append("%s/%s" % (root, name))
        return found

    SoVITS_names = list(pretrained_sovits_name) + _scan(SoVITS_weight_root, ".pth")
    GPT_names = list(pretrained_gpt_name) + _scan(GPT_weight_root, ".ckpt")
    return SoVITS_names, GPT_names
|
||||
|
||||
|
||||
SoVITS_names, GPT_names = get_weights_names(GPT_weight_root, SoVITS_weight_root)
|
||||
|
||||
|
||||
from process_ckpt import get_sovits_version_from_path_fast
|
||||
|
||||
v3v4set = {"v3", "v4"}
|
||||
|
||||
|
||||
def change_sovits_weights(sovits_path, prompt_language=None, text_language=None):
|
||||
if "!" in sovits_path or "!" in sovits_path:
|
||||
sovits_path = name2sovits_path[sovits_path]
|
||||
global version, model_version, dict_language, if_lora_v3
|
||||
version, model_version, if_lora_v3 = get_sovits_version_from_path_fast(sovits_path)
|
||||
# print(sovits_path,version, model_version, if_lora_v3)
|
||||
is_exist = is_exist_s2gv3 if model_version == "v3" else is_exist_s2gv4
|
||||
path_sovits = path_sovits_v3 if model_version == "v3" else path_sovits_v4
|
||||
if if_lora_v3 == True and is_exist == False:
|
||||
info = path_sovits + "SoVITS %s" % model_version + i18n("底模缺失,无法加载相应 LoRA 权重")
|
||||
info = path_sovits + i18n("SoVITS %s 底模缺失,无法加载相应 LoRA 权重" % model_version)
|
||||
gr.Warning(info)
|
||||
raise FileExistsError(info)
|
||||
dict_language = dict_language_v1 if version == "v1" else dict_language_v2
|
||||
@ -297,19 +377,11 @@ def change_sovits_weights(sovits_path, prompt_language=None, text_language=None)
|
||||
f.write(json.dumps(data))
|
||||
|
||||
|
||||
def change_gpt_weights(gpt_path):
|
||||
if "!" in gpt_path or "!" in gpt_path:
|
||||
gpt_path = name2gpt_path[gpt_path]
|
||||
tts_pipeline.init_t2s_weights(gpt_path)
|
||||
|
||||
|
||||
with gr.Blocks(title="GPT-SoVITS WebUI", analytics_enabled=False, js=js, css=css) as app:
|
||||
gr.HTML(
|
||||
top_html.format(
|
||||
i18n("本软件以MIT协议开源, 作者不对软件具备任何控制力, 使用软件者、传播软件导出的声音者自负全责.")
|
||||
+ i18n("如不认可该条款, 则不能使用或引用软件包内任何代码和文件. 详见根目录LICENSE.")
|
||||
),
|
||||
elem_classes="markdown",
|
||||
with gr.Blocks(title="GPT-SoVITS WebUI", analytics_enabled=False) as app:
|
||||
gr.Markdown(
|
||||
value=i18n("本软件以MIT协议开源, 作者不对软件具备任何控制力, 使用软件者、传播软件导出的声音者自负全责.")
|
||||
+ "<br>"
|
||||
+ i18n("如不认可该条款, 则不能使用或引用软件包内任何代码和文件. 详见根目录LICENSE.")
|
||||
)
|
||||
|
||||
with gr.Column():
|
||||
@ -368,6 +440,14 @@ with gr.Blocks(title="GPT-SoVITS WebUI", analytics_enabled=False, js=js, css=css
|
||||
|
||||
with gr.Group():
|
||||
gr.Markdown(value=i18n("推理设置"))
|
||||
with gr.Row():
|
||||
preset_dropdown = gr.Dropdown(
|
||||
label=i18n("参数预设"),
|
||||
choices=list(INFERENCE_PRESETS.keys()),
|
||||
value=i18n("平衡"),
|
||||
interactive=True,
|
||||
info=i18n("选择预设可快速配置推理参数")
|
||||
)
|
||||
with gr.Row():
|
||||
with gr.Column():
|
||||
with gr.Row():
|
||||
@ -382,10 +462,10 @@ with gr.Blocks(title="GPT-SoVITS WebUI", analytics_enabled=False, js=js, css=css
|
||||
minimum=0.01, maximum=1, step=0.01, label=i18n("分段间隔(秒)"), value=0.3, interactive=True
|
||||
)
|
||||
speed_factor = gr.Slider(
|
||||
minimum=0.6, maximum=1.65, step=0.05, label="语速", value=1.0, interactive=True
|
||||
minimum=0.6, maximum=1.65, step=0.05, label=i18n("语速"), value=1.0, interactive=True
|
||||
)
|
||||
with gr.Row():
|
||||
top_k = gr.Slider(minimum=1, maximum=100, step=1, label=i18n("top_k"), value=15, interactive=True)
|
||||
top_k = gr.Slider(minimum=1, maximum=100, step=1, label=i18n("top_k"), value=5, interactive=True)
|
||||
top_p = gr.Slider(minimum=0, maximum=1, step=0.05, label=i18n("top_p"), value=1, interactive=True)
|
||||
with gr.Row():
|
||||
temperature = gr.Slider(
|
||||
@ -477,7 +557,14 @@ with gr.Blocks(title="GPT-SoVITS WebUI", analytics_enabled=False, js=js, css=css
|
||||
inference_button,
|
||||
],
|
||||
) #
|
||||
GPT_dropdown.change(change_gpt_weights, [GPT_dropdown], [])
|
||||
GPT_dropdown.change(tts_pipeline.init_t2s_weights, [GPT_dropdown], [])
|
||||
|
||||
# 预设选择事件绑定
|
||||
preset_dropdown.change(
|
||||
apply_preset,
|
||||
[preset_dropdown],
|
||||
[batch_size, sample_steps, top_k, top_p, temperature, repetition_penalty, parallel_infer, split_bucket]
|
||||
)
|
||||
|
||||
with gr.Group():
|
||||
gr.Markdown(
|
||||
|
||||
@ -87,7 +87,7 @@ def sync_buffer(buffers, average=True):
|
||||
for buffer, handle in handles:
|
||||
handle.wait()
|
||||
if average:
|
||||
buffer.data /= world_size
|
||||
buffer.data /= world_size()
|
||||
|
||||
|
||||
def sync_grad(params):
|
||||
|
||||
@ -55,6 +55,10 @@ def main():
|
||||
n_gpus = torch.cuda.device_count()
|
||||
else:
|
||||
n_gpus = 1
|
||||
if n_gpus <= 1:
|
||||
run(0, n_gpus, hps)
|
||||
return
|
||||
|
||||
os.environ["MASTER_ADDR"] = "localhost"
|
||||
os.environ["MASTER_PORT"] = str(randint(20000, 55555))
|
||||
|
||||
@ -77,12 +81,14 @@ def run(rank, n_gpus, hps):
|
||||
writer = SummaryWriter(log_dir=hps.s2_ckpt_dir)
|
||||
writer_eval = SummaryWriter(log_dir=os.path.join(hps.s2_ckpt_dir, "eval"))
|
||||
|
||||
dist.init_process_group(
|
||||
backend="gloo" if os.name == "nt" or not torch.cuda.is_available() else "nccl",
|
||||
init_method="env://?use_libuv=False",
|
||||
world_size=n_gpus,
|
||||
rank=rank,
|
||||
)
|
||||
use_ddp = n_gpus > 1
|
||||
if use_ddp:
|
||||
dist.init_process_group(
|
||||
backend="gloo" if os.name == "nt" or not torch.cuda.is_available() else "nccl",
|
||||
init_method="env://?use_libuv=False",
|
||||
world_size=n_gpus,
|
||||
rank=rank,
|
||||
)
|
||||
torch.manual_seed(hps.train.seed)
|
||||
if torch.cuda.is_available():
|
||||
torch.cuda.set_device(rank)
|
||||
@ -118,15 +124,20 @@ def run(rank, n_gpus, hps):
|
||||
shuffle=True,
|
||||
)
|
||||
collate_fn = TextAudioSpeakerCollate()
|
||||
train_loader = DataLoader(
|
||||
train_dataset,
|
||||
num_workers=5,
|
||||
worker_count = 0 if os.name == "nt" and n_gpus <= 1 else min(2 if os.name == "nt" else 5, os.cpu_count() or 1)
|
||||
loader_kwargs = dict(
|
||||
num_workers=worker_count,
|
||||
shuffle=False,
|
||||
pin_memory=True,
|
||||
pin_memory=torch.cuda.is_available(),
|
||||
collate_fn=collate_fn,
|
||||
batch_sampler=train_sampler,
|
||||
persistent_workers=True,
|
||||
prefetch_factor=3,
|
||||
)
|
||||
if worker_count > 0:
|
||||
loader_kwargs["persistent_workers"] = True
|
||||
loader_kwargs["prefetch_factor"] = 2 if os.name == "nt" else 3
|
||||
train_loader = DataLoader(
|
||||
train_dataset,
|
||||
**loader_kwargs,
|
||||
)
|
||||
save_root = "%s/logs_s2_%s_lora_%s" % (hps.data.exp_dir, hps.model.version, hps.train.lora_rank)
|
||||
os.makedirs(save_root, exist_ok=True)
|
||||
@ -156,7 +167,9 @@ def run(rank, n_gpus, hps):
|
||||
|
||||
def model2cuda(net_g, rank):
|
||||
if torch.cuda.is_available():
|
||||
net_g = DDP(net_g.cuda(rank), device_ids=[rank], find_unused_parameters=True)
|
||||
net_g = net_g.cuda(rank)
|
||||
if use_ddp:
|
||||
net_g = DDP(net_g, device_ids=[rank], find_unused_parameters=True)
|
||||
else:
|
||||
net_g = net_g.to(device)
|
||||
return net_g
|
||||
@ -242,6 +255,8 @@ def run(rank, n_gpus, hps):
|
||||
None,
|
||||
)
|
||||
scheduler_g.step()
|
||||
if use_ddp and dist.is_initialized():
|
||||
dist.destroy_process_group()
|
||||
print("training done")
|
||||
|
||||
|
||||
|
||||
@ -180,10 +180,15 @@ def _merge_erhua(initials: list[str], finals: list[str], word: str, pos: str) ->
|
||||
def _g2p(segments):
|
||||
phones_list = []
|
||||
word2ph = []
|
||||
for seg in segments:
|
||||
g2pw_batch_results = []
|
||||
g2pw_batch_cursor = 0
|
||||
processed_segments = [re.sub("[a-zA-Z]+", "", seg) for seg in segments]
|
||||
if is_g2pw:
|
||||
batch_inputs = [seg for seg in processed_segments if seg]
|
||||
g2pw_batch_results = g2pw._g2pw(batch_inputs) if batch_inputs else []
|
||||
|
||||
for seg in processed_segments:
|
||||
pinyins = []
|
||||
# Replace all English words in the sentence
|
||||
seg = re.sub("[a-zA-Z]+", "", seg)
|
||||
seg_cut = psg.lcut(seg)
|
||||
seg_cut = tone_modifier.pre_merge_for_modify(seg_cut)
|
||||
initials = []
|
||||
@ -204,8 +209,10 @@ def _g2p(segments):
|
||||
finals = sum(finals, [])
|
||||
print("pypinyin结果", initials, finals)
|
||||
else:
|
||||
# g2pw采用整句推理
|
||||
pinyins = g2pw.lazy_pinyin(seg, neutral_tone_with_five=True, style=Style.TONE3)
|
||||
# g2pw采用整句推理(批量推理,逐句取结果)
|
||||
if seg:
|
||||
pinyins = g2pw_batch_results[g2pw_batch_cursor]
|
||||
g2pw_batch_cursor += 1
|
||||
|
||||
pre_word_length = 0
|
||||
for word, pos in seg_cut:
|
||||
|
||||
@ -18,6 +18,7 @@ Credits
|
||||
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import numpy as np
|
||||
@ -37,6 +38,8 @@ def prepare_onnx_input(
|
||||
use_mask: bool = False,
|
||||
window_size: int = None,
|
||||
max_len: int = 512,
|
||||
char2id: Optional[Dict[str, int]] = None,
|
||||
char_phoneme_masks: Optional[Dict[str, List[int]]] = None,
|
||||
) -> Dict[str, np.array]:
|
||||
if window_size is not None:
|
||||
truncated_texts, truncated_query_ids = _truncate_texts(
|
||||
@ -48,33 +51,88 @@ def prepare_onnx_input(
|
||||
phoneme_masks = []
|
||||
char_ids = []
|
||||
position_ids = []
|
||||
tokenized_cache = {}
|
||||
|
||||
if char2id is None:
|
||||
char2id = {char: idx for idx, char in enumerate(chars)}
|
||||
if use_mask:
|
||||
if char_phoneme_masks is None:
|
||||
char_phoneme_masks = {
|
||||
char: [1 if i in char2phonemes[char] else 0 for i in range(len(labels))]
|
||||
for char in char2phonemes
|
||||
}
|
||||
else:
|
||||
full_phoneme_mask = [1] * len(labels)
|
||||
|
||||
for idx in range(len(texts)):
|
||||
text = (truncated_texts if window_size else texts)[idx].lower()
|
||||
query_id = (truncated_query_ids if window_size else query_ids)[idx]
|
||||
|
||||
try:
|
||||
tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text)
|
||||
except Exception:
|
||||
print(f'warning: text "{text}" is invalid')
|
||||
return {}
|
||||
cached = tokenized_cache.get(text)
|
||||
if cached is None:
|
||||
try:
|
||||
tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text)
|
||||
except Exception:
|
||||
print(f'warning: text "{text}" is invalid')
|
||||
return {}
|
||||
|
||||
text, query_id, tokens, text2token, token2text = _truncate(
|
||||
max_len=max_len, text=text, query_id=query_id, tokens=tokens, text2token=text2token, token2text=token2text
|
||||
)
|
||||
if len(tokens) <= max_len - 2:
|
||||
processed_tokens = ["[CLS]"] + tokens + ["[SEP]"]
|
||||
shared_input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
|
||||
shared_token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
|
||||
shared_attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
|
||||
cached = {
|
||||
"is_short": True,
|
||||
"tokens": tokens,
|
||||
"text2token": text2token,
|
||||
"token2text": token2text,
|
||||
"input_id": shared_input_id,
|
||||
"token_type_id": shared_token_type_id,
|
||||
"attention_mask": shared_attention_mask,
|
||||
}
|
||||
else:
|
||||
cached = {
|
||||
"is_short": False,
|
||||
"tokens": tokens,
|
||||
"text2token": text2token,
|
||||
"token2text": token2text,
|
||||
}
|
||||
tokenized_cache[text] = cached
|
||||
|
||||
processed_tokens = ["[CLS]"] + tokens + ["[SEP]"]
|
||||
if cached["is_short"]:
|
||||
text_for_query = text
|
||||
query_id_for_query = query_id
|
||||
text2token_for_query = cached["text2token"]
|
||||
input_id = cached["input_id"]
|
||||
token_type_id = cached["token_type_id"]
|
||||
attention_mask = cached["attention_mask"]
|
||||
else:
|
||||
(
|
||||
text_for_query,
|
||||
query_id_for_query,
|
||||
tokens_for_query,
|
||||
text2token_for_query,
|
||||
_token2text_for_query,
|
||||
) = _truncate(
|
||||
max_len=max_len,
|
||||
text=text,
|
||||
query_id=query_id,
|
||||
tokens=cached["tokens"],
|
||||
text2token=cached["text2token"],
|
||||
token2text=cached["token2text"],
|
||||
)
|
||||
processed_tokens = ["[CLS]"] + tokens_for_query + ["[SEP]"]
|
||||
input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
|
||||
token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
|
||||
attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
|
||||
|
||||
input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
|
||||
token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
|
||||
attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
|
||||
|
||||
query_char = text[query_id]
|
||||
phoneme_mask = (
|
||||
[1 if i in char2phonemes[query_char] else 0 for i in range(len(labels))] if use_mask else [1] * len(labels)
|
||||
)
|
||||
char_id = chars.index(query_char)
|
||||
position_id = text2token[query_id] + 1 # [CLS] token locate at first place
|
||||
query_char = text_for_query[query_id_for_query]
|
||||
if use_mask:
|
||||
phoneme_mask = char_phoneme_masks[query_char]
|
||||
else:
|
||||
phoneme_mask = full_phoneme_mask
|
||||
char_id = char2id[query_char]
|
||||
position_id = text2token_for_query[query_id_for_query] + 1 # [CLS] token locate at first place
|
||||
|
||||
input_ids.append(input_id)
|
||||
token_type_ids.append(token_type_id)
|
||||
@ -83,10 +141,15 @@ def prepare_onnx_input(
|
||||
char_ids.append(char_id)
|
||||
position_ids.append(position_id)
|
||||
|
||||
max_token_length = max(len(seq) for seq in input_ids)
|
||||
|
||||
def _pad_sequences(sequences, pad_value=0):
|
||||
return [seq + [pad_value] * (max_token_length - len(seq)) for seq in sequences]
|
||||
|
||||
outputs = {
|
||||
"input_ids": np.array(input_ids).astype(np.int64),
|
||||
"token_type_ids": np.array(token_type_ids).astype(np.int64),
|
||||
"attention_masks": np.array(attention_masks).astype(np.int64),
|
||||
"input_ids": np.array(_pad_sequences(input_ids, pad_value=0)).astype(np.int64),
|
||||
"token_type_ids": np.array(_pad_sequences(token_type_ids, pad_value=0)).astype(np.int64),
|
||||
"attention_masks": np.array(_pad_sequences(attention_masks, pad_value=0)).astype(np.int64),
|
||||
"phoneme_masks": np.array(phoneme_masks).astype(np.float32),
|
||||
"char_ids": np.array(char_ids).astype(np.int64),
|
||||
"position_ids": np.array(position_ids).astype(np.int64),
|
||||
|
||||
@ -10,7 +10,6 @@ from typing import Any, Dict, List, Tuple
|
||||
import numpy as np
|
||||
import onnxruntime
|
||||
import requests
|
||||
import torch
|
||||
from opencc import OpenCC
|
||||
from pypinyin import Style, pinyin
|
||||
from transformers.models.auto.tokenization_auto import AutoTokenizer
|
||||
@ -22,9 +21,8 @@ from .utils import load_config
|
||||
onnxruntime.set_default_logger_severity(3)
|
||||
try:
|
||||
onnxruntime.preload_dlls()
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
# traceback.print_exc()
|
||||
warnings.filterwarnings("ignore")
|
||||
|
||||
model_version = "1.1"
|
||||
@ -55,6 +53,24 @@ def predict(session, onnx_input: Dict[str, Any], labels: List[str]) -> Tuple[Lis
|
||||
return all_preds, all_confidences
|
||||
|
||||
|
||||
def _load_json_from_candidates(filename: str, candidate_dirs: List[str]) -> Dict[str, Any]:
    """Load ``filename`` from the first candidate directory that contains it.

    Args:
        filename: Name of the JSON file to look for.
        candidate_dirs: Directories to try, in order. Falsy entries (empty
            string, ``None``) are skipped.

    Returns:
        The parsed JSON content of the first matching file.

    Raises:
        FileNotFoundError: If none of the candidate directories holds the file.
    """
    for candidate_dir in candidate_dirs:
        if not candidate_dir:
            continue
        json_path = os.path.join(candidate_dir, filename)
        # isfile (not exists): a directory that happens to carry the same name
        # must not stop the search with an error from open().
        if os.path.isfile(json_path):
            with open(json_path, "r", encoding="utf-8") as fr:
                return json.load(fr)
    raise FileNotFoundError(f"Cannot locate {filename} in candidate dirs: {candidate_dirs}")
|
||||
|
||||
|
||||
def _find_first_existing_file(*paths: str) -> str:
    """Return the first of ``paths`` that names an existing regular file.

    Args:
        *paths: Candidate paths, checked in order. Falsy entries (empty
            string, ``None``) are skipped.

    Returns:
        The first candidate that exists as a file.

    Raises:
        FileNotFoundError: If no candidate is an existing file.
    """
    for path in paths:
        # isfile (not exists): a directory is never a valid result for a
        # helper whose contract is to hand back a file path.
        if path and os.path.isfile(path):
            return path
    raise FileNotFoundError(f"Files not found: {paths}")
|
||||
|
||||
|
||||
def download_and_decompress(model_dir: str = "G2PWModel/"):
|
||||
if not os.path.exists(model_dir):
|
||||
parent_directory = os.path.dirname(model_dir)
|
||||
@ -62,7 +78,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"):
|
||||
extract_dir = os.path.join(parent_directory, "G2PWModel_1.1")
|
||||
extract_dir_new = os.path.join(parent_directory, "G2PWModel")
|
||||
print("Downloading g2pw model...")
|
||||
modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip" # "https://paddlespeech.cdn.bcebos.com/Parakeet/released_models/g2p/G2PWModel_1.1.zip"
|
||||
modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip"
|
||||
with requests.get(modelscope_url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
with open(zip_dir, "wb") as f:
|
||||
@ -79,7 +95,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"):
|
||||
return model_dir
|
||||
|
||||
|
||||
class G2PWOnnxConverter:
|
||||
class _G2PWBaseOnnxConverter:
|
||||
def __init__(
|
||||
self,
|
||||
model_dir: str = "G2PWModel/",
|
||||
@ -87,33 +103,16 @@ class G2PWOnnxConverter:
|
||||
model_source: str = None,
|
||||
enable_non_tradional_chinese: bool = False,
|
||||
):
|
||||
uncompress_path = download_and_decompress(model_dir)
|
||||
|
||||
sess_options = onnxruntime.SessionOptions()
|
||||
sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
|
||||
sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
|
||||
sess_options.intra_op_num_threads = 2 if torch.cuda.is_available() else 0
|
||||
if "CUDAExecutionProvider" in onnxruntime.get_available_providers():
|
||||
self.session_g2pW = onnxruntime.InferenceSession(
|
||||
os.path.join(uncompress_path, "g2pW.onnx"),
|
||||
sess_options=sess_options,
|
||||
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
|
||||
)
|
||||
else:
|
||||
self.session_g2pW = onnxruntime.InferenceSession(
|
||||
os.path.join(uncompress_path, "g2pW.onnx"),
|
||||
sess_options=sess_options,
|
||||
providers=["CPUExecutionProvider"],
|
||||
)
|
||||
self.config = load_config(config_path=os.path.join(uncompress_path, "config.py"), use_default=True)
|
||||
self.model_dir = download_and_decompress(model_dir)
|
||||
self.config = load_config(config_path=os.path.join(self.model_dir, "config.py"), use_default=True)
|
||||
|
||||
self.model_source = model_source if model_source else self.config.model_source
|
||||
self.enable_opencc = enable_non_tradional_chinese
|
||||
|
||||
self.tokenizer = AutoTokenizer.from_pretrained(self.model_source)
|
||||
|
||||
polyphonic_chars_path = os.path.join(uncompress_path, "POLYPHONIC_CHARS.txt")
|
||||
monophonic_chars_path = os.path.join(uncompress_path, "MONOPHONIC_CHARS.txt")
|
||||
polyphonic_chars_path = os.path.join(self.model_dir, "POLYPHONIC_CHARS.txt")
|
||||
monophonic_chars_path = os.path.join(self.model_dir, "MONOPHONIC_CHARS.txt")
|
||||
|
||||
self.polyphonic_chars = [
|
||||
line.split("\t") for line in open(polyphonic_chars_path, encoding="utf-8").read().strip().split("\n")
|
||||
]
|
||||
@ -149,31 +148,47 @@ class G2PWOnnxConverter:
|
||||
)
|
||||
|
||||
self.chars = sorted(list(self.char2phonemes.keys()))
|
||||
self.char2id = {char: idx for idx, char in enumerate(self.chars)}
|
||||
self.char_phoneme_masks = (
|
||||
{
|
||||
char: [1 if i in self.char2phonemes[char] else 0 for i in range(len(self.labels))]
|
||||
for char in self.char2phonemes
|
||||
}
|
||||
if self.config.use_mask
|
||||
else None
|
||||
)
|
||||
|
||||
self.polyphonic_chars_new = set(self.chars)
|
||||
for char in self.non_polyphonic:
|
||||
if char in self.polyphonic_chars_new:
|
||||
self.polyphonic_chars_new.remove(char)
|
||||
self.polyphonic_chars_new.discard(char)
|
||||
|
||||
self.monophonic_chars_dict = {char: phoneme for char, phoneme in self.monophonic_chars}
|
||||
for char in self.non_monophonic:
|
||||
if char in self.monophonic_chars_dict:
|
||||
self.monophonic_chars_dict.pop(char)
|
||||
self.monophonic_chars_dict.pop(char, None)
|
||||
|
||||
self.pos_tags = ["UNK", "A", "C", "D", "I", "N", "P", "T", "V", "DE", "SHI"]
|
||||
default_asset_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "G2PWModel"))
|
||||
candidate_asset_dirs = [self.model_dir, default_asset_dir]
|
||||
self.bopomofo_convert_dict = _load_json_from_candidates(
|
||||
"bopomofo_to_pinyin_wo_tune_dict.json", candidate_asset_dirs
|
||||
)
|
||||
self.char_bopomofo_dict = _load_json_from_candidates("char_bopomofo_dict.json", candidate_asset_dirs)
|
||||
|
||||
with open(os.path.join(uncompress_path, "bopomofo_to_pinyin_wo_tune_dict.json"), "r", encoding="utf-8") as fr:
|
||||
self.bopomofo_convert_dict = json.load(fr)
|
||||
self.style_convert_func = {
|
||||
"bopomofo": lambda x: x,
|
||||
"pinyin": self._convert_bopomofo_to_pinyin,
|
||||
}[style]
|
||||
|
||||
with open(os.path.join(uncompress_path, "char_bopomofo_dict.json"), "r", encoding="utf-8") as fr:
|
||||
self.char_bopomofo_dict = json.load(fr)
|
||||
|
||||
if self.enable_opencc:
|
||||
self.cc = OpenCC("s2tw")
|
||||
self.enable_sentence_dedup = os.getenv("g2pw_sentence_dedup", "true").strip().lower() in {
|
||||
"1",
|
||||
"true",
|
||||
"yes",
|
||||
"y",
|
||||
"on",
|
||||
}
|
||||
# 聚焦到多音字附近上下文,默认左右各16字;设为0表示关闭裁剪(整句)。
|
||||
self.polyphonic_context_chars = max(0, int(os.getenv("g2pw_polyphonic_context_chars", "16")))
|
||||
|
||||
def _convert_bopomofo_to_pinyin(self, bopomofo: str) -> str:
|
||||
tone = bopomofo[-1]
|
||||
@ -181,9 +196,8 @@ class G2PWOnnxConverter:
|
||||
component = self.bopomofo_convert_dict.get(bopomofo[:-1])
|
||||
if component:
|
||||
return component + tone
|
||||
else:
|
||||
print(f'Warning: "{bopomofo}" cannot convert to pinyin')
|
||||
return None
|
||||
print(f'Warning: "{bopomofo}" cannot convert to pinyin')
|
||||
return None
|
||||
|
||||
def __call__(self, sentences: List[str]) -> List[List[str]]:
|
||||
if isinstance(sentences, str):
|
||||
@ -197,51 +211,147 @@ class G2PWOnnxConverter:
|
||||
translated_sentences.append(translated_sent)
|
||||
sentences = translated_sentences
|
||||
|
||||
texts, query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences)
|
||||
texts, model_query_ids, result_query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences)
|
||||
if len(texts) == 0:
|
||||
# sentences no polyphonic words
|
||||
return partial_results
|
||||
|
||||
onnx_input = prepare_onnx_input(
|
||||
model_input = prepare_onnx_input(
|
||||
tokenizer=self.tokenizer,
|
||||
labels=self.labels,
|
||||
char2phonemes=self.char2phonemes,
|
||||
chars=self.chars,
|
||||
texts=texts,
|
||||
query_ids=query_ids,
|
||||
query_ids=model_query_ids,
|
||||
use_mask=self.config.use_mask,
|
||||
window_size=None,
|
||||
char2id=self.char2id,
|
||||
char_phoneme_masks=self.char_phoneme_masks,
|
||||
)
|
||||
|
||||
preds, confidences = predict(session=self.session_g2pW, onnx_input=onnx_input, labels=self.labels)
|
||||
if not model_input:
|
||||
return partial_results
|
||||
|
||||
if self.enable_sentence_dedup:
|
||||
preds, _confidences = self._predict_with_sentence_dedup(model_input=model_input, texts=texts)
|
||||
else:
|
||||
preds, _confidences = self._predict(model_input=model_input)
|
||||
|
||||
if self.config.use_char_phoneme:
|
||||
preds = [pred.split(" ")[1] for pred in preds]
|
||||
|
||||
results = partial_results
|
||||
for sent_id, query_id, pred in zip(sent_ids, query_ids, preds):
|
||||
for sent_id, query_id, pred in zip(sent_ids, result_query_ids, preds):
|
||||
results[sent_id][query_id] = self.style_convert_func(pred)
|
||||
|
||||
return results
|
||||
|
||||
def _prepare_data(self, sentences: List[str]) -> Tuple[List[str], List[int], List[int], List[List[str]]]:
|
||||
texts, query_ids, sent_ids, partial_results = [], [], [], []
|
||||
def _prepare_data(
|
||||
self, sentences: List[str]
|
||||
) -> Tuple[List[str], List[int], List[int], List[int], List[List[str]]]:
|
||||
texts, model_query_ids, result_query_ids, sent_ids, partial_results = [], [], [], [], []
|
||||
for sent_id, sent in enumerate(sentences):
|
||||
# pypinyin works well for Simplified Chinese than Traditional Chinese
|
||||
sent_s = tranditional_to_simplified(sent)
|
||||
pypinyin_result = pinyin(sent_s, neutral_tone_with_five=True, style=Style.TONE3)
|
||||
partial_result = [None] * len(sent)
|
||||
polyphonic_indices: List[int] = []
|
||||
for i, char in enumerate(sent):
|
||||
if char in self.polyphonic_chars_new:
|
||||
texts.append(sent)
|
||||
query_ids.append(i)
|
||||
sent_ids.append(sent_id)
|
||||
polyphonic_indices.append(i)
|
||||
elif char in self.monophonic_chars_dict:
|
||||
partial_result[i] = self.style_convert_func(self.monophonic_chars_dict[char])
|
||||
elif char in self.char_bopomofo_dict:
|
||||
partial_result[i] = pypinyin_result[i][0]
|
||||
# partial_result[i] = self.style_convert_func(self.char_bopomofo_dict[char][0])
|
||||
else:
|
||||
partial_result[i] = pypinyin_result[i][0]
|
||||
|
||||
if polyphonic_indices:
|
||||
if self.polyphonic_context_chars > 0:
|
||||
left = max(0, polyphonic_indices[0] - self.polyphonic_context_chars)
|
||||
right = min(len(sent), polyphonic_indices[-1] + self.polyphonic_context_chars + 1)
|
||||
sent_for_predict = sent[left:right]
|
||||
query_offset = left
|
||||
else:
|
||||
sent_for_predict = sent
|
||||
query_offset = 0
|
||||
|
||||
for index in polyphonic_indices:
|
||||
texts.append(sent_for_predict)
|
||||
model_query_ids.append(index - query_offset)
|
||||
result_query_ids.append(index)
|
||||
sent_ids.append(sent_id)
|
||||
|
||||
partial_results.append(partial_result)
|
||||
return texts, query_ids, sent_ids, partial_results
|
||||
return texts, model_query_ids, result_query_ids, sent_ids, partial_results
|
||||
|
||||
def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]:
    """Run the model on ``model_input`` and return ``(predictions, confidences)``.

    This base implementation is only a hook: it always raises
    ``NotImplementedError``. Subclasses supply the real inference call.
    """
    raise NotImplementedError
|
||||
|
||||
def _predict_with_sentence_dedup(
    self, model_input: Dict[str, Any], texts: List[str]
) -> Tuple[List[str], List[float]]:
    """Predict once per distinct text instead of once per query row.

    Rows of ``model_input`` that share the same entry in ``texts`` are sent to
    ``self._predict`` together. For such a group only the first row of
    ``input_ids``, ``token_type_ids`` and ``attention_masks`` is kept, while
    every other input keeps one row per query.
    NOTE(review): this presumably relies on the model broadcasting the single
    encoded sentence across the remaining per-query rows -- confirm.

    Args:
        model_input: mapping of input name to an array indexable by a list of
            row indices (one row per entry of ``texts``).
        texts: the text each row was built from.

    Returns:
        ``(preds, confidences)`` aligned with ``texts``.
    """
    total = len(texts)
    if total <= 1:
        return self._predict(model_input=model_input)

    # Bucket row numbers by the text they were generated from.
    rows_by_text: Dict[str, List[int]] = {}
    for row, sentence in enumerate(texts):
        rows_by_text.setdefault(sentence, []).append(row)

    # As many buckets as rows means nothing is duplicated: use the plain path.
    if len(rows_by_text) == total:
        return self._predict(model_input=model_input)

    sentence_level_inputs = ("input_ids", "token_type_ids", "attention_masks")
    preds: List[str] = [""] * total
    confidences: List[float] = [0.0] * total

    for rows in rows_by_text.values():
        batch = {}
        for field, array in model_input.items():
            batch[field] = array[rows]

        if len(rows) > 1:
            # All rows carry the same sentence encoding; keep one copy only.
            for field in sentence_level_inputs:
                batch[field] = batch[field][:1]

        batch_preds, batch_confidences = self._predict(model_input=batch)
        # Scatter the group's outputs back to their original positions.
        for row, label, score in zip(rows, batch_preds, batch_confidences):
            preds[row] = label
            confidences[row] = score

    return preds, confidences
|
||||
|
||||
|
||||
class G2PWOnnxConverter(_G2PWBaseOnnxConverter):
    """Converter whose ``_predict`` runs through an onnxruntime session."""

    def __init__(
        self,
        model_dir: str = "G2PWModel/",
        style: str = "bopomofo",
        model_source: str = None,
        enable_non_tradional_chinese: bool = False,
    ):
        """Initialise the base converter, then open the ONNX inference session.

        All arguments are forwarded unchanged to the base class. The session
        is stored on ``self.session_g2pw``.
        """
        super().__init__(
            model_dir=model_dir,
            style=style,
            model_source=model_source,
            enable_non_tradional_chinese=enable_non_tradional_chinese,
        )

        session_options = onnxruntime.SessionOptions()
        session_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
        session_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
        session_options.intra_op_num_threads = 2

        # Both spellings of the model file name are offered to the helper.
        # NOTE(review): presumably it returns the first path that exists -- confirm.
        model_file = _find_first_existing_file(
            os.path.join(self.model_dir, "g2pW.onnx"),
            os.path.join(self.model_dir, "g2pw.onnx"),
        )

        # CPU is always listed; CUDA is placed in front of it when onnxruntime reports it.
        providers = ["CPUExecutionProvider"]
        if "CUDAExecutionProvider" in onnxruntime.get_available_providers():
            providers = ["CUDAExecutionProvider", *providers]

        self.session_g2pw = onnxruntime.InferenceSession(
            model_file,
            sess_options=session_options,
            providers=providers,
        )

    def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]:
        """Delegate to ``predict`` using this instance's session and labels."""
        return predict(session=self.session_g2pw, onnx_input=model_input, labels=self.labels)
|
||||
|
||||
454
api.py
454
api.py
@ -163,7 +163,7 @@ from transformers import AutoModelForMaskedLM, AutoTokenizer
|
||||
import numpy as np
|
||||
from feature_extractor import cnhubert
|
||||
from io import BytesIO
|
||||
from module.models import Generator, SynthesizerTrn, SynthesizerTrnV3
|
||||
from module.models import SynthesizerTrn, SynthesizerTrnV3
|
||||
from peft import LoraConfig, get_peft_model
|
||||
from AR.models.t2s_lightning_module import Text2SemanticLightningModule
|
||||
from text import cleaned_text_to_sequence
|
||||
@ -198,44 +198,39 @@ def is_full(*items): # 任意一项为空返回False
|
||||
return True
|
||||
|
||||
|
||||
bigvgan_model = hifigan_model = sv_cn_model = None
|
||||
|
||||
|
||||
def clean_hifigan_model():
    """Release the module-level ``hifigan_model``, if one is loaded.

    The model is moved to CPU, the global is reset to ``None``, and the CUDA
    caching allocator is asked to release unused memory. Does nothing when no
    model is loaded.
    """
    global hifigan_model
    if hifigan_model is not None:
        # NOTE(review): moving to CPU before dropping the reference is
        # presumably what lets empty_cache() reclaim the accelerator memory.
        hifigan_model.cpu()
        hifigan_model = None
        try:
            torch.cuda.empty_cache()
        except Exception:
            # Best effort only: cache cleanup must never break the caller.
            # A bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
            pass
|
||||
|
||||
|
||||
def clean_bigvgan_model():
    """Release the module-level ``bigvgan_model``, if one is loaded.

    The model is moved to CPU, the global is reset to ``None``, and the CUDA
    caching allocator is asked to release unused memory. Does nothing when no
    model is loaded.
    """
    global bigvgan_model
    if bigvgan_model is not None:
        # NOTE(review): moving to CPU before dropping the reference is
        # presumably what lets empty_cache() reclaim the accelerator memory.
        bigvgan_model.cpu()
        bigvgan_model = None
        try:
            torch.cuda.empty_cache()
        except Exception:
            # Best effort only: cache cleanup must never break the caller.
            # A bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
            pass
|
||||
|
||||
|
||||
def clean_sv_cn_model():
    """Release the module-level ``sv_cn_model``, if one is loaded.

    The wrapper's ``embedding_model`` is moved to CPU, the global is reset to
    ``None``, and the CUDA caching allocator is asked to release unused
    memory. Does nothing when no model is loaded.
    """
    global sv_cn_model
    if sv_cn_model is not None:
        # Only the wrapped embedding model is moved; the wrapper object itself
        # is simply dropped afterwards.
        sv_cn_model.embedding_model = sv_cn_model.embedding_model.cpu()
        sv_cn_model = None
        try:
            torch.cuda.empty_cache()
        except Exception:
            # Best effort only: cache cleanup must never break the caller.
            # A bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
            pass
|
||||
def normalize_api_params(params: dict) -> dict:
    """Return a copy of ``params`` with v2-style keys renamed to v1-style keys.

    This gives backward compatibility between the two API naming conventions.

    Mapping (v2 -> v1):
        ref_audio_path      -> refer_wav_path
        text_lang           -> text_language
        prompt_lang         -> prompt_language
        speed_factor        -> speed
        aux_ref_audio_paths -> inp_refs

    Keys outside the mapping are copied through unchanged. When a request
    carries both spellings of the same parameter, the explicit v1 key always
    wins, regardless of the order in which the keys appear in ``params``.

    Args:
        params: request parameters keyed by either naming convention.

    Returns:
        A new dict keyed by v1 names; ``params`` itself is not modified.
    """
    param_mappings = {
        # v2 style -> v1 style
        "ref_audio_path": "refer_wav_path",
        "text_lang": "text_language",
        "prompt_lang": "prompt_language",
        "speed_factor": "speed",
        "aux_ref_audio_paths": "inp_refs",
    }

    # First pass: keep every key that is not a v2 alias. Explicit v1 names are
    # therefore in place before any alias is considered.
    normalized = {key: value for key, value in params.items() if key not in param_mappings}

    # Second pass: an alias only fills a slot that is still empty. A single
    # pass would let an alias win whenever it came first in the dict.
    for key, value in params.items():
        v1_key = param_mappings.get(key)
        if v1_key is not None and v1_key not in normalized:
            normalized[v1_key] = value

    return normalized
|
||||
|
||||
|
||||
def init_bigvgan():
|
||||
global bigvgan_model, hifigan_model, sv_cn_model
|
||||
global bigvgan_model
|
||||
from BigVGAN import bigvgan
|
||||
|
||||
bigvgan_model = bigvgan.BigVGAN.from_pretrained(
|
||||
@ -245,57 +240,20 @@ def init_bigvgan():
|
||||
# remove weight norm in the model and set to eval mode
|
||||
bigvgan_model.remove_weight_norm()
|
||||
bigvgan_model = bigvgan_model.eval()
|
||||
|
||||
if is_half == True:
|
||||
bigvgan_model = bigvgan_model.half().to(device)
|
||||
else:
|
||||
bigvgan_model = bigvgan_model.to(device)
|
||||
|
||||
|
||||
def init_hifigan():
    """Build the vocoder ``Generator``, load its checkpoint and publish it as ``hifigan_model``.

    The checkpoint is read from
    ``<now_dir>/GPT_SoVITS/pretrained_models/gsv-v4-pretrained/vocoder.pth``.
    The model is put in eval mode with weight norm removed, cast to half
    precision when ``is_half`` is set, and moved to ``device``.
    """
    global hifigan_model, bigvgan_model, sv_cn_model

    generator_kwargs = {
        "initial_channel": 100,
        "resblock": "1",
        "resblock_kernel_sizes": [3, 7, 11],
        "resblock_dilation_sizes": [[1, 3, 5], [1, 3, 5], [1, 3, 5]],
        "upsample_rates": [10, 6, 2, 2, 2],
        "upsample_initial_channel": 512,
        "upsample_kernel_sizes": [20, 12, 4, 4, 4],
        "gin_channels": 0,
        "is_bias": True,
    }
    hifigan_model = Generator(**generator_kwargs)
    hifigan_model.eval()
    hifigan_model.remove_weight_norm()

    checkpoint_path = "%s/GPT_SoVITS/pretrained_models/gsv-v4-pretrained/vocoder.pth" % (now_dir,)
    # NOTE(review): weights_only=False lets torch.load unpickle arbitrary
    # objects, so this must only ever point at a trusted checkpoint.
    vocoder_weights = torch.load(
        checkpoint_path,
        map_location="cpu",
        weights_only=False,
    )
    load_report = hifigan_model.load_state_dict(vocoder_weights)
    print("loading vocoder", load_report)

    if is_half == True:
        hifigan_model = hifigan_model.half()
    hifigan_model = hifigan_model.to(device)
|
||||
|
||||
|
||||
from sv import SV
|
||||
|
||||
|
||||
def init_sv_cn():
    """Create the ``SV`` instance and store it in the module-level ``sv_cn_model``.

    ``SV`` is constructed from the module-level ``device`` and ``is_half``.
    """
    # Only sv_cn_model is assigned here, so it is the only name declared global.
    global sv_cn_model
    sv_cn_model = SV(device, is_half)
|
||||
|
||||
|
||||
resample_transform_dict = {}
|
||||
|
||||
|
||||
def resample(audio_tensor, sr0, sr1, device):
|
||||
def resample(audio_tensor, sr0):
|
||||
global resample_transform_dict
|
||||
key = "%s-%s-%s" % (sr0, sr1, str(device))
|
||||
if key not in resample_transform_dict:
|
||||
resample_transform_dict[key] = torchaudio.transforms.Resample(sr0, sr1).to(device)
|
||||
return resample_transform_dict[key](audio_tensor)
|
||||
if sr0 not in resample_transform_dict:
|
||||
resample_transform_dict[sr0] = torchaudio.transforms.Resample(sr0, 24000).to(device)
|
||||
return resample_transform_dict[sr0](audio_tensor)
|
||||
|
||||
|
||||
from module.mel_processing import mel_spectrogram_torch
|
||||
@ -325,19 +283,6 @@ mel_fn = lambda x: mel_spectrogram_torch(
|
||||
"center": False,
|
||||
},
|
||||
)
|
||||
mel_fn_v4 = lambda x: mel_spectrogram_torch(
|
||||
x,
|
||||
**{
|
||||
"n_fft": 1280,
|
||||
"win_size": 1280,
|
||||
"hop_size": 320,
|
||||
"num_mels": 100,
|
||||
"sampling_rate": 32000,
|
||||
"fmin": 0,
|
||||
"fmax": None,
|
||||
"center": False,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
sr_model = None
|
||||
@ -379,19 +324,12 @@ from process_ckpt import get_sovits_version_from_path_fast, load_sovits_new
|
||||
|
||||
|
||||
def get_sovits_weights(sovits_path):
|
||||
from config import pretrained_sovits_name
|
||||
|
||||
path_sovits_v3 = pretrained_sovits_name["v3"]
|
||||
path_sovits_v4 = pretrained_sovits_name["v4"]
|
||||
path_sovits_v3 = "GPT_SoVITS/pretrained_models/s2Gv3.pth"
|
||||
is_exist_s2gv3 = os.path.exists(path_sovits_v3)
|
||||
is_exist_s2gv4 = os.path.exists(path_sovits_v4)
|
||||
|
||||
version, model_version, if_lora_v3 = get_sovits_version_from_path_fast(sovits_path)
|
||||
is_exist = is_exist_s2gv3 if model_version == "v3" else is_exist_s2gv4
|
||||
path_sovits = path_sovits_v3 if model_version == "v3" else path_sovits_v4
|
||||
|
||||
if if_lora_v3 == True and is_exist == False:
|
||||
logger.info("SoVITS %s 底模缺失,无法加载相应 LoRA 权重" % model_version)
|
||||
if if_lora_v3 == True and is_exist_s2gv3 == False:
|
||||
logger.info("SoVITS V3 底模缺失,无法加载相应 LoRA 权重")
|
||||
|
||||
dict_s2 = load_sovits_new(sovits_path)
|
||||
hps = dict_s2["config"]
|
||||
@ -404,13 +342,11 @@ def get_sovits_weights(sovits_path):
|
||||
else:
|
||||
hps.model.version = "v2"
|
||||
|
||||
model_params_dict = vars(hps.model)
|
||||
if model_version not in {"v3", "v4"}:
|
||||
if "Pro" in model_version:
|
||||
hps.model.version = model_version
|
||||
if sv_cn_model == None:
|
||||
init_sv_cn()
|
||||
if model_version == "v3":
|
||||
hps.model.version = "v3"
|
||||
|
||||
model_params_dict = vars(hps.model)
|
||||
if model_version != "v3":
|
||||
vq_model = SynthesizerTrn(
|
||||
hps.data.filter_length // 2 + 1,
|
||||
hps.train.segment_size // hps.data.hop_length,
|
||||
@ -418,18 +354,13 @@ def get_sovits_weights(sovits_path):
|
||||
**model_params_dict,
|
||||
)
|
||||
else:
|
||||
hps.model.version = model_version
|
||||
vq_model = SynthesizerTrnV3(
|
||||
hps.data.filter_length // 2 + 1,
|
||||
hps.train.segment_size // hps.data.hop_length,
|
||||
n_speakers=hps.data.n_speakers,
|
||||
**model_params_dict,
|
||||
)
|
||||
if model_version == "v3":
|
||||
init_bigvgan()
|
||||
if model_version == "v4":
|
||||
init_hifigan()
|
||||
|
||||
init_bigvgan()
|
||||
model_version = hps.model.version
|
||||
logger.info(f"模型版本: {model_version}")
|
||||
if "pretrained" not in sovits_path:
|
||||
@ -445,8 +376,7 @@ def get_sovits_weights(sovits_path):
|
||||
if if_lora_v3 == False:
|
||||
vq_model.load_state_dict(dict_s2["weight"], strict=False)
|
||||
else:
|
||||
path_sovits = path_sovits_v3 if model_version == "v3" else path_sovits_v4
|
||||
vq_model.load_state_dict(load_sovits_new(path_sovits)["weight"], strict=False)
|
||||
vq_model.load_state_dict(load_sovits_new(path_sovits_v3)["weight"], strict=False)
|
||||
lora_rank = dict_s2["lora_rank"]
|
||||
lora_config = LoraConfig(
|
||||
target_modules=["to_k", "to_q", "to_v", "to_out.0"],
|
||||
@ -475,7 +405,7 @@ hz = 50
|
||||
|
||||
|
||||
def get_gpt_weights(gpt_path):
|
||||
dict_s1 = torch.load(gpt_path, map_location="cpu", weights_only=False)
|
||||
dict_s1 = torch.load(gpt_path, map_location="cpu")
|
||||
config = dict_s1["config"]
|
||||
max_sec = config["data"]["max_sec"]
|
||||
t2s_model = Text2SemanticLightningModule(config, "****", is_train=False)
|
||||
@ -543,65 +473,62 @@ from text import chinese
|
||||
|
||||
|
||||
def get_phones_and_bert(text, language, version, final=False):
|
||||
text = re.sub(r' {2,}', ' ', text)
|
||||
textlist = []
|
||||
langlist = []
|
||||
if language == "all_zh":
|
||||
for tmp in LangSegmenter.getTexts(text,"zh"):
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
elif language == "all_yue":
|
||||
for tmp in LangSegmenter.getTexts(text,"zh"):
|
||||
if tmp["lang"] == "zh":
|
||||
tmp["lang"] = "yue"
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
elif language == "all_ja":
|
||||
for tmp in LangSegmenter.getTexts(text,"ja"):
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
elif language == "all_ko":
|
||||
for tmp in LangSegmenter.getTexts(text,"ko"):
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
elif language == "en":
|
||||
langlist.append("en")
|
||||
textlist.append(text)
|
||||
elif language == "auto":
|
||||
for tmp in LangSegmenter.getTexts(text):
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
elif language == "auto_yue":
|
||||
for tmp in LangSegmenter.getTexts(text):
|
||||
if tmp["lang"] == "zh":
|
||||
tmp["lang"] = "yue"
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
else:
|
||||
for tmp in LangSegmenter.getTexts(text):
|
||||
if langlist:
|
||||
if (tmp["lang"] == "en" and langlist[-1] == "en") or (tmp["lang"] != "en" and langlist[-1] != "en"):
|
||||
textlist[-1] += tmp["text"]
|
||||
continue
|
||||
if tmp["lang"] == "en":
|
||||
langlist.append(tmp["lang"])
|
||||
if language in {"en", "all_zh", "all_ja", "all_ko", "all_yue"}:
|
||||
formattext = text
|
||||
while " " in formattext:
|
||||
formattext = formattext.replace(" ", " ")
|
||||
if language == "all_zh":
|
||||
if re.search(r"[A-Za-z]", formattext):
|
||||
formattext = re.sub(r"[a-z]", lambda x: x.group(0).upper(), formattext)
|
||||
formattext = chinese.mix_text_normalize(formattext)
|
||||
return get_phones_and_bert(formattext, "zh", version)
|
||||
else:
|
||||
# 因无法区别中日韩文汉字,以用户输入为准
|
||||
langlist.append(language)
|
||||
textlist.append(tmp["text"])
|
||||
phones_list = []
|
||||
bert_list = []
|
||||
norm_text_list = []
|
||||
for i in range(len(textlist)):
|
||||
lang = langlist[i]
|
||||
phones, word2ph, norm_text = clean_text_inf(textlist[i], lang, version)
|
||||
bert = get_bert_inf(phones, word2ph, norm_text, lang)
|
||||
phones_list.append(phones)
|
||||
norm_text_list.append(norm_text)
|
||||
bert_list.append(bert)
|
||||
bert = torch.cat(bert_list, dim=1)
|
||||
phones = sum(phones_list, [])
|
||||
norm_text = "".join(norm_text_list)
|
||||
phones, word2ph, norm_text = clean_text_inf(formattext, language, version)
|
||||
bert = get_bert_feature(norm_text, word2ph).to(device)
|
||||
elif language == "all_yue" and re.search(r"[A-Za-z]", formattext):
|
||||
formattext = re.sub(r"[a-z]", lambda x: x.group(0).upper(), formattext)
|
||||
formattext = chinese.mix_text_normalize(formattext)
|
||||
return get_phones_and_bert(formattext, "yue", version)
|
||||
else:
|
||||
phones, word2ph, norm_text = clean_text_inf(formattext, language, version)
|
||||
bert = torch.zeros(
|
||||
(1024, len(phones)),
|
||||
dtype=torch.float16 if is_half == True else torch.float32,
|
||||
).to(device)
|
||||
elif language in {"zh", "ja", "ko", "yue", "auto", "auto_yue"}:
|
||||
textlist = []
|
||||
langlist = []
|
||||
if language == "auto":
|
||||
for tmp in LangSegmenter.getTexts(text):
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
elif language == "auto_yue":
|
||||
for tmp in LangSegmenter.getTexts(text):
|
||||
if tmp["lang"] == "zh":
|
||||
tmp["lang"] = "yue"
|
||||
langlist.append(tmp["lang"])
|
||||
textlist.append(tmp["text"])
|
||||
else:
|
||||
for tmp in LangSegmenter.getTexts(text):
|
||||
if tmp["lang"] == "en":
|
||||
langlist.append(tmp["lang"])
|
||||
else:
|
||||
# 因无法区别中日韩文汉字,以用户输入为准
|
||||
langlist.append(language)
|
||||
textlist.append(tmp["text"])
|
||||
phones_list = []
|
||||
bert_list = []
|
||||
norm_text_list = []
|
||||
for i in range(len(textlist)):
|
||||
lang = langlist[i]
|
||||
phones, word2ph, norm_text = clean_text_inf(textlist[i], lang, version)
|
||||
bert = get_bert_inf(phones, word2ph, norm_text, lang)
|
||||
phones_list.append(phones)
|
||||
norm_text_list.append(norm_text)
|
||||
bert_list.append(bert)
|
||||
bert = torch.cat(bert_list, dim=1)
|
||||
phones = sum(phones_list, [])
|
||||
norm_text = "".join(norm_text_list)
|
||||
|
||||
if not final and len(phones) < 6:
|
||||
return get_phones_and_bert("." + text, language, version, final=True)
|
||||
@ -637,34 +564,23 @@ class DictToAttrRecursive(dict):
|
||||
raise AttributeError(f"Attribute {item} not found")
|
||||
|
||||
|
||||
def get_spepc(hps, filename, dtype, device, is_v2pro=False):
|
||||
sr1 = int(hps.data.sampling_rate)
|
||||
audio, sr0 = torchaudio.load(filename)
|
||||
if sr0 != sr1:
|
||||
audio = audio.to(device)
|
||||
if audio.shape[0] == 2:
|
||||
audio = audio.mean(0).unsqueeze(0)
|
||||
audio = resample(audio, sr0, sr1, device)
|
||||
else:
|
||||
audio = audio.to(device)
|
||||
if audio.shape[0] == 2:
|
||||
audio = audio.mean(0).unsqueeze(0)
|
||||
|
||||
def get_spepc(hps, filename):
|
||||
audio, _ = librosa.load(filename, sr=int(hps.data.sampling_rate))
|
||||
audio = torch.FloatTensor(audio)
|
||||
maxx = audio.abs().max()
|
||||
if maxx > 1:
|
||||
audio /= min(2, maxx)
|
||||
audio_norm = audio
|
||||
audio_norm = audio_norm.unsqueeze(0)
|
||||
spec = spectrogram_torch(
|
||||
audio,
|
||||
audio_norm,
|
||||
hps.data.filter_length,
|
||||
hps.data.sampling_rate,
|
||||
hps.data.hop_length,
|
||||
hps.data.win_length,
|
||||
center=False,
|
||||
)
|
||||
spec = spec.to(dtype)
|
||||
if is_v2pro == True:
|
||||
audio = resample(audio, sr1, 16000, device).to(dtype)
|
||||
return spec, audio
|
||||
return spec
|
||||
|
||||
|
||||
def pack_audio(audio_bytes, data, rate):
|
||||
@ -851,16 +767,6 @@ def get_tts_wav(
|
||||
t2s_model = infer_gpt.t2s_model
|
||||
max_sec = infer_gpt.max_sec
|
||||
|
||||
if version == "v3":
|
||||
if sample_steps not in [4, 8, 16, 32, 64, 128]:
|
||||
sample_steps = 32
|
||||
elif version == "v4":
|
||||
if sample_steps not in [4, 8, 16, 32]:
|
||||
sample_steps = 8
|
||||
|
||||
if if_sr and version != "v3":
|
||||
if_sr = False
|
||||
|
||||
t0 = ttime()
|
||||
prompt_text = prompt_text.strip("\n")
|
||||
if prompt_text[-1] not in splits:
|
||||
@ -884,29 +790,19 @@ def get_tts_wav(
|
||||
prompt_semantic = codes[0, 0]
|
||||
prompt = prompt_semantic.unsqueeze(0).to(device)
|
||||
|
||||
is_v2pro = version in {"v2Pro", "v2ProPlus"}
|
||||
if version not in {"v3", "v4"}:
|
||||
if version != "v3":
|
||||
refers = []
|
||||
if is_v2pro:
|
||||
sv_emb = []
|
||||
if sv_cn_model == None:
|
||||
init_sv_cn()
|
||||
if inp_refs:
|
||||
for path in inp_refs:
|
||||
try: #####这里加上提取sv的逻辑,要么一堆sv一堆refer,要么单个sv单个refer
|
||||
refer, audio_tensor = get_spepc(hps, path.name, dtype, device, is_v2pro)
|
||||
try:
|
||||
refer = get_spepc(hps, path).to(dtype).to(device)
|
||||
refers.append(refer)
|
||||
if is_v2pro:
|
||||
sv_emb.append(sv_cn_model.compute_embedding3(audio_tensor))
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
if len(refers) == 0:
|
||||
refers, audio_tensor = get_spepc(hps, ref_wav_path, dtype, device, is_v2pro)
|
||||
refers = [refers]
|
||||
if is_v2pro:
|
||||
sv_emb = [sv_cn_model.compute_embedding3(audio_tensor)]
|
||||
refers = [get_spepc(hps, ref_wav_path).to(dtype).to(device)]
|
||||
else:
|
||||
refer, audio_tensor = get_spepc(hps, ref_wav_path, dtype, device)
|
||||
refer = get_spepc(hps, ref_wav_path).to(device).to(dtype)
|
||||
|
||||
t1 = ttime()
|
||||
# os.environ['version'] = version
|
||||
@ -946,56 +842,41 @@ def get_tts_wav(
|
||||
pred_semantic = pred_semantic[:, -idx:].unsqueeze(0)
|
||||
t3 = ttime()
|
||||
|
||||
if version not in {"v3", "v4"}:
|
||||
if is_v2pro:
|
||||
audio = (
|
||||
vq_model.decode(
|
||||
pred_semantic,
|
||||
torch.LongTensor(phones2).to(device).unsqueeze(0),
|
||||
refers,
|
||||
speed=speed,
|
||||
sv_emb=sv_emb,
|
||||
)
|
||||
.detach()
|
||||
.cpu()
|
||||
.numpy()[0, 0]
|
||||
)
|
||||
else:
|
||||
audio = (
|
||||
vq_model.decode(
|
||||
pred_semantic, torch.LongTensor(phones2).to(device).unsqueeze(0), refers, speed=speed
|
||||
)
|
||||
.detach()
|
||||
.cpu()
|
||||
.numpy()[0, 0]
|
||||
)
|
||||
if version != "v3":
|
||||
audio = (
|
||||
vq_model.decode(pred_semantic, torch.LongTensor(phones2).to(device).unsqueeze(0), refers, speed=speed)
|
||||
.detach()
|
||||
.cpu()
|
||||
.numpy()[0, 0]
|
||||
) ###试试重建不带上prompt部分
|
||||
else:
|
||||
phoneme_ids0 = torch.LongTensor(phones1).to(device).unsqueeze(0)
|
||||
phoneme_ids1 = torch.LongTensor(phones2).to(device).unsqueeze(0)
|
||||
|
||||
# print(11111111, phoneme_ids0, phoneme_ids1)
|
||||
fea_ref, ge = vq_model.decode_encp(prompt.unsqueeze(0), phoneme_ids0, refer)
|
||||
ref_audio, sr = torchaudio.load(ref_wav_path)
|
||||
ref_audio = ref_audio.to(device).float()
|
||||
if ref_audio.shape[0] == 2:
|
||||
ref_audio = ref_audio.mean(0).unsqueeze(0)
|
||||
|
||||
tgt_sr = 24000 if version == "v3" else 32000
|
||||
if sr != tgt_sr:
|
||||
ref_audio = resample(ref_audio, sr, tgt_sr, device)
|
||||
mel2 = mel_fn(ref_audio) if version == "v3" else mel_fn_v4(ref_audio)
|
||||
if sr != 24000:
|
||||
ref_audio = resample(ref_audio, sr)
|
||||
# print("ref_audio",ref_audio.abs().mean())
|
||||
mel2 = mel_fn(ref_audio)
|
||||
mel2 = norm_spec(mel2)
|
||||
T_min = min(mel2.shape[2], fea_ref.shape[2])
|
||||
mel2 = mel2[:, :, :T_min]
|
||||
fea_ref = fea_ref[:, :, :T_min]
|
||||
Tref = 468 if version == "v3" else 500
|
||||
Tchunk = 934 if version == "v3" else 1000
|
||||
if T_min > Tref:
|
||||
mel2 = mel2[:, :, -Tref:]
|
||||
fea_ref = fea_ref[:, :, -Tref:]
|
||||
T_min = Tref
|
||||
chunk_len = Tchunk - T_min
|
||||
if T_min > 468:
|
||||
mel2 = mel2[:, :, -468:]
|
||||
fea_ref = fea_ref[:, :, -468:]
|
||||
T_min = 468
|
||||
chunk_len = 934 - T_min
|
||||
# print("fea_ref",fea_ref,fea_ref.shape)
|
||||
# print("mel2",mel2)
|
||||
mel2 = mel2.to(dtype)
|
||||
fea_todo, ge = vq_model.decode_encp(pred_semantic, phoneme_ids1, refer, ge, speed)
|
||||
# print("fea_todo",fea_todo)
|
||||
# print("ge",ge.abs().mean())
|
||||
cfm_resss = []
|
||||
idx = 0
|
||||
while 1:
|
||||
@ -1004,24 +885,22 @@ def get_tts_wav(
|
||||
break
|
||||
idx += chunk_len
|
||||
fea = torch.cat([fea_ref, fea_todo_chunk], 2).transpose(2, 1)
|
||||
# set_seed(123)
|
||||
cfm_res = vq_model.cfm.inference(
|
||||
fea, torch.LongTensor([fea.size(1)]).to(fea.device), mel2, sample_steps, inference_cfg_rate=0
|
||||
)
|
||||
cfm_res = cfm_res[:, :, mel2.shape[2] :]
|
||||
mel2 = cfm_res[:, :, -T_min:]
|
||||
# print("fea", fea)
|
||||
# print("mel2in", mel2)
|
||||
fea_ref = fea_todo_chunk[:, :, -T_min:]
|
||||
cfm_resss.append(cfm_res)
|
||||
cfm_res = torch.cat(cfm_resss, 2)
|
||||
cfm_res = denorm_spec(cfm_res)
|
||||
if version == "v3":
|
||||
if bigvgan_model == None:
|
||||
init_bigvgan()
|
||||
else: # v4
|
||||
if hifigan_model == None:
|
||||
init_hifigan()
|
||||
vocoder_model = bigvgan_model if version == "v3" else hifigan_model
|
||||
cmf_res = torch.cat(cfm_resss, 2)
|
||||
cmf_res = denorm_spec(cmf_res)
|
||||
if bigvgan_model == None:
|
||||
init_bigvgan()
|
||||
with torch.inference_mode():
|
||||
wav_gen = vocoder_model(cfm_res)
|
||||
wav_gen = bigvgan_model(cmf_res)
|
||||
audio = wav_gen[0][0].cpu().detach().numpy()
|
||||
|
||||
max_audio = np.abs(audio).max()
|
||||
@ -1032,13 +911,7 @@ def get_tts_wav(
|
||||
audio_opt = np.concatenate(audio_opt, 0)
|
||||
t4 = ttime()
|
||||
|
||||
if version in {"v1", "v2", "v2Pro", "v2ProPlus"}:
|
||||
sr = 32000
|
||||
elif version == "v3":
|
||||
sr = 24000
|
||||
else:
|
||||
sr = 48000 # v4
|
||||
|
||||
sr = hps.data.sampling_rate if version != "v3" else 24000
|
||||
if if_sr and sr == 24000:
|
||||
audio_opt = torch.from_numpy(audio_opt).float().to(device)
|
||||
audio_opt, sr = audio_sr(audio_opt.unsqueeze(0), sr)
|
||||
@ -1058,12 +931,8 @@ def get_tts_wav(
|
||||
|
||||
if not stream_mode == "normal":
|
||||
if media_type == "wav":
|
||||
if version in {"v1", "v2", "v2Pro", "v2ProPlus"}:
|
||||
sr = 32000
|
||||
elif version == "v3":
|
||||
sr = 48000 if if_sr else 24000
|
||||
else:
|
||||
sr = 48000 # v4
|
||||
sr = 48000 if if_sr else 24000
|
||||
sr = hps.data.sampling_rate if version != "v3" else sr
|
||||
audio_bytes = pack_wav(audio_bytes, sr)
|
||||
yield audio_bytes.getvalue()
|
||||
|
||||
@ -1128,6 +997,9 @@ def handle(
|
||||
if not default_refer.is_ready():
|
||||
return JSONResponse({"code": 400, "message": "未指定参考音频且接口无预设"}, status_code=400)
|
||||
|
||||
if sample_steps not in [4, 8, 16, 32]:
|
||||
sample_steps = 32
|
||||
|
||||
if cut_punc == None:
|
||||
text = cut_text(text, default_cut_punc)
|
||||
else:
|
||||
@ -1230,10 +1102,10 @@ default_refer = DefaultRefer(args.default_refer_path, args.default_refer_text, a
|
||||
# 模型路径检查
|
||||
if sovits_path == "":
|
||||
sovits_path = g_config.pretrained_sovits_path
|
||||
logger.warning(f"未指定SoVITS模型路径, fallback后当前值: {sovits_path}")
|
||||
logger.warn(f"未指定SoVITS模型路径, fallback后当前值: {sovits_path}")
|
||||
if gpt_path == "":
|
||||
gpt_path = g_config.pretrained_gpt_path
|
||||
logger.warning(f"未指定GPT模型路径, fallback后当前值: {gpt_path}")
|
||||
logger.warn(f"未指定GPT模型路径, fallback后当前值: {gpt_path}")
|
||||
|
||||
# 指定默认参考音频, 调用方 未提供/未给全 参考音频参数时使用
|
||||
if default_refer.path == "" or default_refer.text == "" or default_refer.language == "":
|
||||
@ -1341,20 +1213,22 @@ async def change_refer(refer_wav_path: str = None, prompt_text: str = None, prom
|
||||
@app.post("/")
|
||||
async def tts_endpoint(request: Request):
|
||||
json_post_raw = await request.json()
|
||||
# Normalize parameters to support both v1 and v2 naming conventions
|
||||
params = normalize_api_params(json_post_raw)
|
||||
return handle(
|
||||
json_post_raw.get("refer_wav_path"),
|
||||
json_post_raw.get("prompt_text"),
|
||||
json_post_raw.get("prompt_language"),
|
||||
json_post_raw.get("text"),
|
||||
json_post_raw.get("text_language"),
|
||||
json_post_raw.get("cut_punc"),
|
||||
json_post_raw.get("top_k", 15),
|
||||
json_post_raw.get("top_p", 1.0),
|
||||
json_post_raw.get("temperature", 1.0),
|
||||
json_post_raw.get("speed", 1.0),
|
||||
json_post_raw.get("inp_refs", []),
|
||||
json_post_raw.get("sample_steps", 32),
|
||||
json_post_raw.get("if_sr", False),
|
||||
params.get("refer_wav_path"),
|
||||
params.get("prompt_text"),
|
||||
params.get("prompt_language"),
|
||||
params.get("text"),
|
||||
params.get("text_language"),
|
||||
params.get("cut_punc"),
|
||||
params.get("top_k", 15),
|
||||
params.get("top_p", 1.0),
|
||||
params.get("temperature", 1.0),
|
||||
params.get("speed", 1.0),
|
||||
params.get("inp_refs", []),
|
||||
params.get("sample_steps", 32),
|
||||
params.get("if_sr", False),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ def create_model(language="zh"):
|
||||
local_dir="tools/asr/models/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch",
|
||||
)
|
||||
model_revision = "v2.0.4"
|
||||
vad_model_revision = punc_model_revision = "v2.0.4"
|
||||
elif language == "yue":
|
||||
path_asr = "tools/asr/models/speech_UniASR_asr_2pass-cantonese-CHS-16k-common-vocab1468-tensorflow1-online"
|
||||
snapshot_download(
|
||||
@ -51,8 +52,6 @@ def create_model(language="zh"):
|
||||
else:
|
||||
raise ValueError(f"{language} is not supported")
|
||||
|
||||
vad_model_revision = punc_model_revision = "v2.0.4"
|
||||
|
||||
if language in funasr_models:
|
||||
return funasr_models[language]
|
||||
else:
|
||||
|
||||
@ -18,7 +18,6 @@
|
||||
"ASR 模型": "ASR model",
|
||||
"ASR 模型尺寸": "ASR model size",
|
||||
"ASR 语言设置": "ASR language",
|
||||
"CPU训练,较慢": "Training on CPU (slower)",
|
||||
"GPT 训练: 模型权重文件在 GPT_weights/": "GPT Training: Model Weights saved in GPT_weights/",
|
||||
"GPT模型列表": "GPT weight list",
|
||||
"GPT训练": "GPT Training",
|
||||
@ -26,19 +25,18 @@
|
||||
"GPU卡号,只能填1个整数": "GPU number, can only input ONE integer",
|
||||
"GPU卡号以-分割,每个卡号一个进程": "GPU number is separated by -, each GPU will run one process ",
|
||||
"LoRA秩": "LoRA Rank",
|
||||
"SoVITS V3 底模缺失,无法加载相应 LoRA 权重": "Missing Pretrained SoVITS V3 Model, Cannot Load LoRA Weights",
|
||||
"SoVITS 训练: 模型权重文件在 SoVITS_weights/": "SoVITS Training: Model Weights saved in SoVITS_weights/",
|
||||
"SoVITS模型列表": "SoVITS weight list",
|
||||
"SoVITS训练": "SoVITS Training",
|
||||
"Submit Text: 将当前页所有文本框内容手工保存到内存和文件(翻页前后或者退出标注页面前如果没点这个按钮,你再翻回来就回滚了,白忙活。)": "Submit Text: Manually save all text box contents on the current page to memory and file (If you don't click this button before switching pages or exiting the labeling page, the data will be rolled back when you return, which would be a waste of work.)",
|
||||
"TTS推理WebUI": "TTS Inference WebUI",
|
||||
"UVR5人声伴奏分离&去混响去延迟工具": "UVR5 WebUI (Vocal Separation/Deecho/Dereverb)",
|
||||
"V3不支持无参考文本模式,请填写参考文本!": "V3 does not support the no-reference-text mode. Please provide reference text!",
|
||||
"alpha_mix:混多少比例归一化后音频进来": "alpha_mix: proportion of normalized audio merged into dataset",
|
||||
"batch_size": "Batch Size",
|
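||||
"hop_size:怎么算音量曲线,越小精度越大计算量越高(不是精度越大效果越好)": "hop_size: step used to compute the volume curve; the smaller the value, the higher the precision and the computational cost (higher precision does not necessarily give better results)",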
|
||||
"max:归一化后最大值多少": "Loudness multiplier after normalized",
|
||||
"max_sil_kept:切完后静音最多留多长": "Maximum length for silence to be kept",
|
||||
"min_interval:最短切割间隔": "Minimum interval for audio cutting",
|
||||
"min_interval:最短切割间隔": "Minimum interval for audio cutting",
|
||||
"min_length:每段最小多长,如果第一段太短一直和后面段连起来直到超过这个值": "min_length: the minimum length of each segment. If the first segment is too short, it will be concatenated with the next segment until it exceeds this value",
|
||||
"temperature": "temperature",
|
||||
"threshold:音量小于这个值视作静音的备选切割点": "Noise gate threshold (loudness below this value will be treated as noise)",
|
||||
@ -47,11 +45,8 @@
|
||||
"v3暂不支持该模式,使用了会报错。": "v3 does not support this mode currently, using it will cause an error.",
|
||||
"v3输出如果觉得闷可以试试开超分": "For V3 model, if generated audio sounds somewhat muffled, try enabling audio super-resolution.",
|
||||
"不切": "No slice",
|
||||
"不训练直接推v2ProPlus底模!": "Use v2ProPlus base model directly without training!",
|
||||
"不训练直接推v2Pro底模!": "Use v2Pro base model directly without training!",
|
||||
"不训练直接推v2底模!": "Use v2 base model directly without training!",
|
||||
"不训练直接推v3底模!": "Use v3 base model directly without training!",
|
||||
"中文": "Chinese",
|
||||
"中文教程文档": "Chinese Tutorial",
|
||||
"中英混合": "Chinese-English Mixed",
|
||||
"主参考音频(请上传3~10秒内参考音频,超过会报错!)": "Primary Reference Audio (Please upload reference audio within 3-10 seconds, exceeding this limit will cause an error!)",
|
||||
"主参考音频的文本": "Text of Primary Reference Audio",
|
||||
@ -92,7 +87,6 @@
|
||||
"句间停顿秒数": "Pause Duration between Sentences (Seconds)",
|
||||
"可选项:通过拖拽多个文件上传多个参考音频(建议同性),平均融合他们的音色。如不填写此项,音色由左侧单个参考音频控制。如是微调模型,建议参考音频全部在微调训练集音色内,底模不用管。": "Optional: Upload multiple reference audio files by dragging and dropping them (recommended to be of the same gender), and average their tone. If this option is left blank, the tone will be controlled by the single reference audio on the left. If fine-tuning the model, it is recommended that all reference audio files have tones within the fine-tuning training set; the pretrained model can be ignored.",
|
||||
"合成语音": "Start inference",
|
||||
"合成音频": "Synthesize Audio",
|
||||
"合格的文件夹路径格式举例: E:\\codes\\py39\\vits_vc_gpu\\白鹭霜华测试样例(去文件管理器地址栏拷就行了)。": "An example of a valid folder path format: E:\\codes\\py39\\vits_vc_gpu\\白鹭霜华测试样例 (simply copy the address from the file manager's address bar).",
|
||||
"后续将支持转音素、手工修改音素、语音合成分步执行。": "Support for Phoneme Conversion, Manual Phoneme Editing, and Step-by-Step Speech Synthesis will be added in the future.",
|
||||
"听不清参考音频说的啥(不晓得写啥)可以开。开启后无视填写的参考文本。": "If reference audio is not clear or unsure what to write, enable this option to ignore the reference text.",
|
||||
@ -110,15 +104,11 @@
|
||||
"已关闭": " is Closed",
|
||||
"已完成": " Finished",
|
||||
"已开启": " is Opened",
|
||||
"并行合成中": "Parallel Synthesis in Progress",
|
||||
"并行推理": "Parallel Inference",
|
||||
"并行推理模式已关闭": "Parallel Inference Mode Disabled",
|
||||
"并行推理模式已开启": "Parallel Inference Mode Enabled",
|
||||
"底模缺失,无法加载相应 LoRA 权重": "Missing Pretrained Model, Cannot Load LoRA Weights",
|
||||
"开启": "Open ",
|
||||
"开启无参考文本模式。不填参考文本亦相当于开启。": "Enable no reference mode. If you don't fill 'Text for reference audio', no reference mode will be enabled.",
|
||||
"当开启并行推理模式时,SoVits V3/4模型不支持分桶处理,已自动关闭分桶处理": "When parallel inference mode is enabled, SoVITS V3/4 models do not support bucket processing; bucket processing has been automatically disabled.",
|
||||
"微调模型信息": "Fine-tuned Model Information",
|
||||
"微调训练": "Fine-Tuning",
|
||||
"怎么切": "How to slice the sentence",
|
||||
"总训练轮数total_epoch": "Total training epochs (total_epoch):",
|
||||
@ -150,8 +140,8 @@
|
||||
"模型": "Model",
|
||||
"模型分为三类:": "Models are categorized into three types:",
|
||||
"模型切换": "Model switch",
|
||||
"模型加载中,请等待": "Model is loading, please wait...",
|
||||
"每张显卡的batch_size": "Batch size per GPU:",
|
||||
"版本": "Version",
|
||||
"粤英混合": "Yue-English Mixed",
|
||||
"粤语": "Yue",
|
||||
"终止合成": "Terminate Synthesis",
|
||||
@ -160,7 +150,6 @@
|
||||
"缺少音素数据集": "Missing Phoneme Dataset",
|
||||
"缺少音频数据集": "Missing Audio Dataset",
|
||||
"英文": "English",
|
||||
"训练模型的版本": "Version of the trained model",
|
||||
"训练集格式化一键三连": "Training Set One-Click Formatting",
|
||||
"训练集格式化工具": "Dataset Formatting Tool",
|
||||
"语义Token提取": "Semantics Token Extraction",
|
||||
@ -174,9 +163,10 @@
|
||||
"语音识别": "Speech Recognition",
|
||||
"语音识别工具": "Speech Recognition Tool",
|
||||
"语音降噪": "Speech Denoising",
|
||||
"语音降噪工具": "Speech Denoising Tool",
|
||||
"请上传3~10秒内参考音频,超过会报错!": "Please upload a reference audio within the 3-10 second range; if it exceeds this duration, it will raise errors.",
|
||||
"请上传参考音频": "Please Upload the Reference Audio",
|
||||
"请填入推理文本": "Please Fill in the Target Text",
|
||||
"请填入推理文本": "Please Fill in the Target Text",
|
||||
"请填入正确的List路径": "Please Fill in the Correct List Path",
|
||||
"请填入正确的音频文件夹路径": "Please Fill in the Correct Audio Folder Path",
|
||||
"请输入有效文本": "Please enter valid text.",
|
||||
@ -197,8 +187,7 @@
|
||||
"进度": "Progress",
|
||||
"进程已终止": " Process Terminated",
|
||||
"进程输出信息": " Process Output Information",
|
||||
"选择训练完存放在SoVITS_weights和GPT_weights下的模型。默认的几个是底模,体验5秒Zero Shot TTS不训练推理用。": "Select the model from SoVITS_weights and GPT_weights. The default models are pretrained models for experiencing 5-second Zero-Shot TTS without training.",
|
||||
"采样步数(仅对V3/4生效)": "Sampling Steps (V3/V4 Only)",
|
||||
"选择训练完存放在SoVITS_weights和GPT_weights下的模型。默认的一个是底模,体验5秒Zero Shot TTS用。": "Choose the models from SoVITS_weights and GPT_weights. The default one is a pretrain, so you can experience zero shot TTS.",
|
||||
"采样步数,如果觉得电,提高试试,如果觉得慢,降低试试": "Sampling Steps: if the output sounds noisy, try increasing; if it feels slow, try decreasing",
|
||||
"重复惩罚": "Repetition Penalty",
|
||||
"随机种子": "Random Seed",
|
||||
@ -214,13 +203,29 @@
|
||||
"音频标注WebUI": "Audio Labeling WebUI",
|
||||
"音频自动切分输入路径,可文件可文件夹": "Audio slicer input (file or folder)",
|
||||
"音频超分中": "Running Audio Super-Resolution",
|
||||
"音频超采样": "Audio Upsampling",
|
||||
"音频超采样(仅对V3生效))": "Audio Upsampling (V3 Only)",
|
||||
"预测语义Token": "Predict Semantic Token",
|
||||
"预训练GPT模型路径": "Pretrained GPT Model Path",
|
||||
"预训练SSL模型路径": "Pretrained SSL Model Path",
|
||||
"预训练SoVITS-D模型路径": "Pretrained SoVITS-D Model Path",
|
||||
"预训练SoVITS-G模型路径": "Pretrained SoVITS-G Model Path",
|
||||
"预训练中文BERT模型路径": "Pretrained Chinese BERT Model Path",
|
||||
"预训练模型路径": "Pretrained Model Path"
|
||||
}
|
||||
"参数预设": "Preset",
|
||||
"选择预设可快速配置推理参数": "Select a preset to quickly configure inference parameters",
|
||||
"快速合成": "Fast Synthesis",
|
||||
"高质量": "High Quality",
|
||||
"平衡": "Balanced",
|
||||
"自定义": "Custom",
|
||||
"请输入包含音频文件的文件夹路径": "Please enter the folder path containing audio files",
|
||||
"请输入 .list 标注文件的完整路径": "Please enter the full path to the .list annotation file",
|
||||
"音频超采样(仅对V3生效))": "Audio Super-Sampling (V3 Only)",
|
||||
"采样步数(仅对V3/4生效)": "Sampling Steps (V3/V4 Only)",
|
||||
"选择文件/文件夹": "Select File/Folder",
|
||||
"选择输出目录(选择其中任意文件)": "Select Output Directory (Select any file inside)",
|
||||
"选择输入目录": "Select Input Directory",
|
||||
"选择输出目录": "Select Output Directory",
|
||||
"选择标注文件": "Select Annotation File",
|
||||
"选择音频目录": "Select Audio Directory",
|
||||
"选择文件夹": "Select Folder",
|
||||
"选择文件": "Select File",
|
||||
"📁 选择文件夹": "📁 Select Folder",
|
||||
"📄 选择文件": "📄 Select File"
|
||||
}
|
||||
@ -485,6 +485,8 @@ def istft(spec, hl):
|
||||
wave_right = librosa.istft(spec_right, hop_length=hl)
|
||||
wave = np.asfortranarray([wave_left, wave_right])
|
||||
|
||||
return wave
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user