Merge 7478a8a85622d2a4ff953b168cdd3748ccc49ce3 into 35e755427da174037da246642cab6987876c74fa

This commit is contained in:
Kevin Zhang 2024-05-16 16:20:40 +08:00 committed by GitHub
commit a93d60cb1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 797 additions and 288 deletions

View File

@ -34,9 +34,6 @@ RUN if [ "$IMAGE_TYPE" != "elite" ]; then \
fi
# Copy the rest of the application
COPY . /workspace
# Copy the rest of the application
COPY . /workspace

View File

@ -5,6 +5,7 @@ import random
import traceback
from tqdm import tqdm
now_dir = os.getcwd()
sys.path.append(now_dir)
import ffmpeg
@ -26,6 +27,7 @@ from my_utils import load_audio
from module.mel_processing import spectrogram_torch
from TTS_infer_pack.text_segmentation_method import splits
from TTS_infer_pack.TextPreprocessor import TextPreprocessor
i18n = I18nAuto()
# configs/tts_infer.yaml
@ -49,7 +51,8 @@ custom:
"""
def set_seed(seed:int):
def set_seed(seed: int):
seed = int(seed)
seed = seed if seed != -1 else random.randrange(1 << 32)
print(f"Set seed to {seed}")
@ -71,40 +74,42 @@ def set_seed(seed:int):
pass
return seed
class TTS_Config:
default_configs={
default_configs = {
"device": "cpu",
"is_half": False,
"t2s_weights_path": "GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt",
"vits_weights_path": "GPT_SoVITS/pretrained_models/s2G488k.pth",
"cnhuhbert_base_path": "GPT_SoVITS/pretrained_models/chinese-hubert-base",
"bert_base_path": "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large",
"load_base": True,
}
configs:dict = None
def __init__(self, configs: Union[dict, str]=None):
configs: dict = None
def __init__(self, configs: Union[dict, str] = None):
# 设置默认配置文件路径
configs_base_path:str = "GPT_SoVITS/configs/"
configs_base_path: str = "GPT_SoVITS/configs/"
os.makedirs(configs_base_path, exist_ok=True)
self.configs_path:str = os.path.join(configs_base_path, "tts_infer.yaml")
self.configs_path: str = os.path.join(configs_base_path, "tts_infer.yaml")
if configs in ["", None]:
if not os.path.exists(self.configs_path):
self.save_configs()
print(f"Create default config file at {self.configs_path}")
configs:dict = {"default": deepcopy(self.default_configs)}
configs: dict = {"default": deepcopy(self.default_configs)}
if isinstance(configs, str):
self.configs_path = configs
configs:dict = self._load_configs(self.configs_path)
configs: dict = self._load_configs(self.configs_path)
assert isinstance(configs, dict)
default_configs:dict = configs.get("default", None)
default_configs: dict = configs.get("default", None)
if default_configs is not None:
self.default_configs = default_configs
self.configs:dict = configs.get("custom", deepcopy(self.default_configs))
self.configs: dict = configs.get("custom", deepcopy(self.default_configs))
self.device = self.configs.get("device", torch.device("cpu"))
self.is_half = self.configs.get("is_half", False)
@ -112,7 +117,7 @@ class TTS_Config:
self.vits_weights_path = self.configs.get("vits_weights_path", None)
self.bert_base_path = self.configs.get("bert_base_path", None)
self.cnhuhbert_base_path = self.configs.get("cnhuhbert_base_path", None)
self.load_base = self.configs.get("load_base", True)
if (self.t2s_weights_path in [None, ""]) or (not os.path.exists(self.t2s_weights_path)):
self.t2s_weights_path = self.default_configs['t2s_weights_path']
@ -128,29 +133,27 @@ class TTS_Config:
print(f"fall back to default cnhuhbert_base_path: {self.cnhuhbert_base_path}")
self.update_configs()
self.max_sec = None
self.hz:int = 50
self.semantic_frame_rate:str = "25hz"
self.segment_size:int = 20480
self.filter_length:int = 2048
self.sampling_rate:int = 32000
self.hop_length:int = 640
self.win_length:int = 2048
self.n_speakers:int = 300
self.hz: int = 50
self.semantic_frame_rate: str = "25hz"
self.segment_size: int = 20480
self.filter_length: int = 2048
self.sampling_rate: int = 32000
self.hop_length: int = 640
self.win_length: int = 2048
self.n_speakers: int = 300
self.languages:list = ["auto", "en", "zh", "ja", "all_zh", "all_ja"]
self.languages: list = ["auto", "en", "zh", "ja", "all_zh", "all_ja"]
def _load_configs(self, configs_path: str)->dict:
def _load_configs(self, configs_path: str) -> dict:
with open(configs_path, 'r') as f:
configs = yaml.load(f, Loader=yaml.FullLoader)
return configs
def save_configs(self, configs_path:str=None)->None:
configs={
"default":self.default_configs,
def save_configs(self, configs_path: str = None) -> None:
configs = {
"default": self.default_configs,
}
if self.configs is not None:
configs["custom"] = self.update_configs()
@ -162,12 +165,13 @@ class TTS_Config:
def update_configs(self):
self.config = {
"device" : str(self.device),
"is_half" : self.is_half,
"t2s_weights_path" : self.t2s_weights_path,
"vits_weights_path" : self.vits_weights_path,
"bert_base_path" : self.bert_base_path,
"device": str(self.device),
"is_half": self.is_half,
"t2s_weights_path": self.t2s_weights_path,
"vits_weights_path": self.vits_weights_path,
"bert_base_path": self.bert_base_path,
"cnhuhbert_base_path": self.cnhuhbert_base_path,
"load_base": self.load_base,
}
return self.config
@ -182,70 +186,82 @@ class TTS_Config:
def __repr__(self):
return self.__str__()
def __hash__(self):
return hash(self.configs_path)
def __eq__(self, other):
return isinstance(other, TTS_Config) and self.configs_path == other.configs_path
class TTS:
bert_tokenizer: AutoTokenizer = None
bert_model: AutoModelForMaskedLM = None
cnhuhbert_model: CNHubert = None
def __init__(self, configs: Union[dict, str, TTS_Config]):
if isinstance(configs, TTS_Config):
self.configs = configs
else:
self.configs:TTS_Config = TTS_Config(configs)
self.configs: TTS_Config = TTS_Config(configs)
self.t2s_model:Text2SemanticLightningModule = None
self.vits_model:SynthesizerTrn = None
self.bert_tokenizer:AutoTokenizer = None
self.bert_model:AutoModelForMaskedLM = None
self.cnhuhbert_model:CNHubert = None
self.t2s_model: Text2SemanticLightningModule = None
self.vits_model: SynthesizerTrn = None
# self.bert_tokenizer:AutoTokenizer = None
# self.bert_model:AutoModelForMaskedLM = None
# self.cnhuhbert_model:CNHubert = None
self._init_models()
self.text_preprocessor:TextPreprocessor = \
TextPreprocessor(self.bert_model,
self.bert_tokenizer,
self.text_preprocessor: TextPreprocessor = \
TextPreprocessor(TTS.bert_model,
TTS.bert_tokenizer,
self.configs.device)
self.prompt_cache:dict = {
"ref_audio_path" : None,
self.prompt_cache: dict = {
"ref_audio_path": None,
"prompt_semantic": None,
"refer_spec" : None,
"prompt_text" : None,
"prompt_lang" : None,
"phones" : None,
"bert_features" : None,
"norm_text" : None,
"refer_spec": None,
"prompt_text": None,
"prompt_lang": None,
"phones": None,
"bert_features": None,
"norm_text": None,
}
self.stop_flag: bool = False
self.precision: torch.dtype = torch.float16 if self.configs.is_half else torch.float32
self.stop_flag:bool = False
self.precision:torch.dtype = torch.float16 if self.configs.is_half else torch.float32
def _init_models(self,):
def _init_models(self):
self.init_t2s_weights(self.configs.t2s_weights_path)
self.init_vits_weights(self.configs.vits_weights_path)
self.init_bert_weights(self.configs.bert_base_path)
self.init_cnhuhbert_weights(self.configs.cnhuhbert_base_path)
if self.configs.load_base:
TTS.init_bert_weights(self.configs)
TTS.init_cnhuhbert_weights(self.configs)
# self.enable_half_precision(self.configs.is_half)
@staticmethod
def init_base_models(configs: TTS_Config):
TTS.init_bert_weights(configs)
TTS.init_cnhuhbert_weights(configs)
@staticmethod
def init_cnhuhbert_weights(configs: TTS_Config):
print(f"Loading CNHuBERT weights from {configs.cnhuhbert_base_path}")
TTS.cnhuhbert_model = CNHubert(configs.cnhuhbert_base_path)
TTS.cnhuhbert_model = TTS.cnhuhbert_model.eval()
TTS.cnhuhbert_model = TTS.cnhuhbert_model.to(configs.device)
if configs.is_half and str(configs.device) != "cpu":
TTS.cnhuhbert_model = TTS.cnhuhbert_model.half()
def init_cnhuhbert_weights(self, base_path: str):
print(f"Loading CNHuBERT weights from {base_path}")
self.cnhuhbert_model = CNHubert(base_path)
self.cnhuhbert_model=self.cnhuhbert_model.eval()
self.cnhuhbert_model = self.cnhuhbert_model.to(self.configs.device)
if self.configs.is_half and str(self.configs.device)!="cpu":
self.cnhuhbert_model = self.cnhuhbert_model.half()
def init_bert_weights(self, base_path: str):
print(f"Loading BERT weights from {base_path}")
self.bert_tokenizer = AutoTokenizer.from_pretrained(base_path)
self.bert_model = AutoModelForMaskedLM.from_pretrained(base_path)
self.bert_model=self.bert_model.eval()
self.bert_model = self.bert_model.to(self.configs.device)
if self.configs.is_half and str(self.configs.device)!="cpu":
self.bert_model = self.bert_model.half()
@staticmethod
def init_bert_weights(configs: TTS_Config):
print(f"Loading BERT weights from {configs.bert_base_path}")
TTS.bert_tokenizer = AutoTokenizer.from_pretrained(configs.bert_base_path)
TTS.bert_model = AutoModelForMaskedLM.from_pretrained(configs.bert_base_path)
TTS.bert_model = TTS.bert_model.eval()
TTS.bert_model = TTS.bert_model.to(configs.device)
if configs.is_half and str(configs.device) != "cpu":
TTS.bert_model = TTS.bert_model.half()
def init_vits_weights(self, weights_path: str):
print(f"Loading VITS weights from {weights_path}")
@ -275,10 +291,9 @@ class TTS:
vits_model = vits_model.eval()
vits_model.load_state_dict(dict_s2["weight"], strict=False)
self.vits_model = vits_model
if self.configs.is_half and str(self.configs.device)!="cpu":
if self.configs.is_half and str(self.configs.device) != "cpu":
self.vits_model = self.vits_model.half()
def init_t2s_weights(self, weights_path: str):
print(f"Loading Text2Semantic weights from {weights_path}")
self.configs.t2s_weights_path = weights_path
@ -292,7 +307,7 @@ class TTS:
t2s_model = t2s_model.to(self.configs.device)
t2s_model = t2s_model.eval()
self.t2s_model = t2s_model
if self.configs.is_half and str(self.configs.device)!="cpu":
if self.configs.is_half and str(self.configs.device) != "cpu":
self.t2s_model = self.t2s_model.half()
def enable_half_precision(self, enable: bool = True):
@ -311,22 +326,22 @@ class TTS:
self.configs.save_configs()
if enable:
if self.t2s_model is not None:
self.t2s_model =self.t2s_model.half()
self.t2s_model = self.t2s_model.half()
if self.vits_model is not None:
self.vits_model = self.vits_model.half()
if self.bert_model is not None:
self.bert_model =self.bert_model.half()
if self.cnhuhbert_model is not None:
self.cnhuhbert_model = self.cnhuhbert_model.half()
if TTS.bert_model is not None:
TTS.bert_model = TTS.bert_model.half()
if TTS.cnhuhbert_model is not None:
TTS.cnhuhbert_model = TTS.cnhuhbert_model.half()
else:
if self.t2s_model is not None:
self.t2s_model = self.t2s_model.float()
if self.vits_model is not None:
self.vits_model = self.vits_model.float()
if self.bert_model is not None:
self.bert_model = self.bert_model.float()
if self.cnhuhbert_model is not None:
self.cnhuhbert_model = self.cnhuhbert_model.float()
if TTS.bert_model is not None:
TTS.bert_model = TTS.bert_model.float()
if TTS.cnhuhbert_model is not None:
TTS.cnhuhbert_model = TTS.cnhuhbert_model.float()
def set_device(self, device: torch.device):
'''
@ -340,12 +355,12 @@ class TTS:
self.t2s_model = self.t2s_model.to(device)
if self.vits_model is not None:
self.vits_model = self.vits_model.to(device)
if self.bert_model is not None:
self.bert_model = self.bert_model.to(device)
if self.cnhuhbert_model is not None:
self.cnhuhbert_model = self.cnhuhbert_model.to(device)
if TTS.bert_model is not None:
TTS.bert_model = TTS.bert_model.to(device)
if TTS.cnhuhbert_model is not None:
TTS.cnhuhbert_model = TTS.cnhuhbert_model.to(device)
def set_ref_audio(self, ref_audio_path:str):
def set_ref_audio(self, ref_audio_path: str):
'''
To set the reference audio for the TTS model,
including the prompt_semantic and refer_spepc.
@ -374,8 +389,7 @@ class TTS:
# self.refer_spec = spec
self.prompt_cache["refer_spec"] = spec
def _set_prompt_semantic(self, ref_wav_path:str):
def _set_prompt_semantic(self, ref_wav_path: str):
zero_wav = np.zeros(
int(self.configs.sampling_rate * 0.3),
dtype=np.float16 if self.configs.is_half else np.float32,
@ -403,12 +417,12 @@ class TTS:
prompt_semantic = codes[0, 0].to(self.configs.device)
self.prompt_cache["prompt_semantic"] = prompt_semantic
def batch_sequences(self, sequences: List[torch.Tensor], axis: int = 0, pad_value: int = 0, max_length:int=None):
def batch_sequences(self, sequences: List[torch.Tensor], axis: int = 0, pad_value: int = 0, max_length: int = None):
seq = sequences[0]
ndim = seq.dim()
if axis < 0:
axis += ndim
dtype:torch.dtype = seq.dtype
dtype: torch.dtype = seq.dtype
pad_value = torch.tensor(pad_value, dtype=dtype)
seq_lengths = [seq.shape[axis] for seq in sequences]
if max_length is None:
@ -424,15 +438,15 @@ class TTS:
batch = torch.stack(padded_sequences)
return batch
def to_batch(self, data:list,
prompt_data:dict=None,
batch_size:int=5,
threshold:float=0.75,
split_bucket:bool=True,
device:torch.device=torch.device("cpu"),
precision:torch.dtype=torch.float32,
def to_batch(self, data: list,
prompt_data: dict = None,
batch_size: int = 5,
threshold: float = 0.75,
split_bucket: bool = True,
device: torch.device = torch.device("cpu"),
precision: torch.dtype = torch.float32,
):
_data:list = []
_data: list = []
index_and_len_list = []
for idx, item in enumerate(data):
norm_text_len = len(item["norm_text"])
@ -445,29 +459,28 @@ class TTS:
batch_index_list_len = 0
pos = 0
while pos <index_and_len_list.shape[0]:
while pos < index_and_len_list.shape[0]:
# batch_index_list.append(index_and_len_list[pos:min(pos+batch_size,len(index_and_len_list))])
pos_end = min(pos+batch_size,index_and_len_list.shape[0])
pos_end = min(pos + batch_size, index_and_len_list.shape[0])
while pos < pos_end:
batch=index_and_len_list[pos:pos_end, 1].astype(np.float32)
score=batch[(pos_end-pos)//2]/(batch.mean()+1e-8)
if (score>=threshold) or (pos_end-pos==1):
batch_index=index_and_len_list[pos:pos_end, 0].tolist()
batch = index_and_len_list[pos:pos_end, 1].astype(np.float32)
score = batch[(pos_end - pos) // 2] / (batch.mean() + 1e-8)
if (score >= threshold) or (pos_end - pos == 1):
batch_index = index_and_len_list[pos:pos_end, 0].tolist()
batch_index_list_len += len(batch_index)
batch_index_list.append(batch_index)
pos = pos_end
break
pos_end=pos_end-1
pos_end = pos_end - 1
assert batch_index_list_len == len(data)
else:
for i in range(len(data)):
if i%batch_size == 0:
if i % batch_size == 0:
batch_index_list.append([])
batch_index_list[-1].append(i)
for batch_idx, index_list in enumerate(batch_index_list):
item_list = [data[idx] for idx in index_list]
phones_list = []
@ -481,13 +494,13 @@ class TTS:
phones_max_len = 0
for item in item_list:
if prompt_data is not None:
all_bert_features = torch.cat([prompt_data["bert_features"], item["bert_features"]], 1)\
all_bert_features = torch.cat([prompt_data["bert_features"], item["bert_features"]], 1) \
.to(dtype=precision, device=device)
all_phones = torch.LongTensor(prompt_data["phones"]+item["phones"]).to(device)
all_phones = torch.LongTensor(prompt_data["phones"] + item["phones"]).to(device)
phones = torch.LongTensor(item["phones"]).to(device)
# norm_text = prompt_data["norm_text"]+item["norm_text"]
else:
all_bert_features = item["bert_features"]\
all_bert_features = item["bert_features"] \
.to(dtype=precision, device=device)
phones = torch.LongTensor(item["phones"]).to(device)
all_phones = phones
@ -507,7 +520,6 @@ class TTS:
all_phones_batch = all_phones_list
all_bert_features_batch = all_bert_features_list
max_len = max(bert_max_len, phones_max_len)
# phones_batch = self.batch_sequences(phones_list, axis=0, pad_value=0, max_length=max_len)
#### 直接对phones和bert_features进行pad。padding策略会影响T2S模型生成的结果但不直接影响复读概率。影响复读概率的主要因素是mask的策略
@ -539,7 +551,7 @@ class TTS:
return _data, batch_index_list
def recovery_order(self, data:list, batch_index_list:list)->list:
def recovery_order(self, data: list, batch_index_list: list) -> list:
'''
Recovery the order of the audio according to the batch_index_list.
@ -551,20 +563,20 @@ class TTS:
list (List[np.ndarray]): the data in the original order.
'''
length = len(sum(batch_index_list, []))
_data = [None]*length
_data = [None] * length
for i, index_list in enumerate(batch_index_list):
for j, index in enumerate(index_list):
_data[index] = data[i][j]
return _data
def stop(self,):
def stop(self, ):
'''
Stop the inference process.
'''
self.stop_flag = True
@torch.no_grad()
def run(self, inputs:dict):
def run(self, inputs: dict):
"""
Text to speech inference.
@ -594,16 +606,16 @@ class TTS:
tuple[int, np.ndarray]: sampling rate and audio data.
"""
########## variables initialization ###########
self.stop_flag:bool = False
text:str = inputs.get("text", "")
text_lang:str = inputs.get("text_lang", "")
ref_audio_path:str = inputs.get("ref_audio_path", "")
prompt_text:str = inputs.get("prompt_text", "")
prompt_lang:str = inputs.get("prompt_lang", "")
top_k:int = inputs.get("top_k", 5)
top_p:float = inputs.get("top_p", 1)
temperature:float = inputs.get("temperature", 1)
text_split_method:str = inputs.get("text_split_method", "cut0")
self.stop_flag: bool = False
text: str = inputs.get("text", "")
text_lang: str = inputs.get("text_lang", "")
ref_audio_path: str = inputs.get("ref_audio_path", "")
prompt_text: str = inputs.get("prompt_text", "")
prompt_lang: str = inputs.get("prompt_lang", "")
top_k: int = inputs.get("top_k", 5)
top_p: float = inputs.get("top_p", 1)
temperature: float = inputs.get("temperature", 1)
text_split_method: str = inputs.get("text_split_method", "cut0")
batch_size = inputs.get("batch_size", 1)
batch_threshold = inputs.get("batch_threshold", 0.75)
speed_factor = inputs.get("speed_factor", 1.0)
@ -632,7 +644,7 @@ class TTS:
if split_bucket:
print(i18n("分桶处理模式已开启"))
if fragment_interval<0.01:
if fragment_interval < 0.01:
fragment_interval = 0.01
print(i18n("分段间隔过小已自动设置为0.01"))
@ -646,7 +658,8 @@ class TTS:
if ref_audio_path in [None, ""] and \
((self.prompt_cache["prompt_semantic"] is None) or (self.prompt_cache["refer_spec"] is None)):
raise ValueError("ref_audio_path cannot be empty, when the reference audio is not set using set_ref_audio()")
raise ValueError(
"ref_audio_path cannot be empty, when the reference audio is not set using set_ref_audio()")
###### setting reference audio and prompt text preprocessing ########
t0 = ttime()
@ -670,7 +683,7 @@ class TTS:
###### text preprocessing ########
t1 = ttime()
data:list = None
data: list = None
if not return_fragment:
data = self.text_preprocessor.preprocess(text, text_lang, text_split_method)
if len(data) == 0:
@ -678,7 +691,7 @@ class TTS:
dtype=np.int16)
return
batch_index_list:list = None
batch_index_list: list = None
data, batch_index_list = self.to_batch(data,
prompt_data=self.prompt_cache if not no_prompt_text else None,
batch_size=batch_size,
@ -692,7 +705,7 @@ class TTS:
texts = self.text_preprocessor.pre_seg_text(text, text_lang, text_split_method)
data = []
for i in range(len(texts)):
if i%batch_size == 0:
if i % batch_size == 0:
data.append([])
data[-1].append(texts[i])
@ -700,10 +713,11 @@ class TTS:
batch_data = []
print(i18n("############ 提取文本Bert特征 ############"))
for text in tqdm(batch_texts):
phones, bert_features, norm_text = self.text_preprocessor.segment_and_extract_feature_for_text(text, text_lang)
phones, bert_features, norm_text = self.text_preprocessor.segment_and_extract_feature_for_text(text,
text_lang)
if phones is None:
continue
res={
res = {
"phones": phones,
"bert_features": bert_features,
"norm_text": norm_text,
@ -721,7 +735,6 @@ class TTS:
)
return batch[0]
t2 = ttime()
try:
print("############ 推理 ############")
@ -736,21 +749,21 @@ class TTS:
if item is None:
continue
batch_phones:List[torch.LongTensor] = item["phones"]
batch_phones: List[torch.LongTensor] = item["phones"]
# batch_phones:torch.LongTensor = item["phones"]
batch_phones_len:torch.LongTensor = item["phones_len"]
all_phoneme_ids:torch.LongTensor = item["all_phones"]
all_phoneme_lens:torch.LongTensor = item["all_phones_len"]
all_bert_features:torch.LongTensor = item["all_bert_features"]
norm_text:str = item["norm_text"]
batch_phones_len: torch.LongTensor = item["phones_len"]
all_phoneme_ids: torch.LongTensor = item["all_phones"]
all_phoneme_lens: torch.LongTensor = item["all_phones_len"]
all_bert_features: torch.LongTensor = item["all_bert_features"]
norm_text: str = item["norm_text"]
max_len = item["max_len"]
print(i18n("前端处理后的文本(每句):"), norm_text)
if no_prompt_text :
if no_prompt_text:
prompt = None
else:
prompt = self.prompt_cache["prompt_semantic"].expand(len(all_phoneme_ids), -1).to(self.configs.device)
prompt = self.prompt_cache["prompt_semantic"].expand(len(all_phoneme_ids), -1).to(
self.configs.device)
pred_semantic_list, idx_list = self.t2s_model.model.infer_panel(
all_phoneme_ids,
@ -768,7 +781,7 @@ class TTS:
t4 = ttime()
t_34 += t4 - t3
refer_audio_spec:torch.Tensor = self.prompt_cache["refer_spec"]\
refer_audio_spec: torch.Tensor = self.prompt_cache["refer_spec"] \
.to(dtype=self.precision, device=self.configs.device)
batch_audio_fragment = []
@ -792,15 +805,17 @@ class TTS:
# ## vits并行推理 method 2
pred_semantic_list = [item[-idx:] for item, idx in zip(pred_semantic_list, idx_list)]
upsample_rate = math.prod(self.vits_model.upsample_rates)
audio_frag_idx = [pred_semantic_list[i].shape[0]*2*upsample_rate for i in range(0, len(pred_semantic_list))]
audio_frag_end_idx = [ sum(audio_frag_idx[:i+1]) for i in range(0, len(audio_frag_idx))]
audio_frag_idx = [pred_semantic_list[i].shape[0] * 2 * upsample_rate for i in
range(0, len(pred_semantic_list))]
audio_frag_end_idx = [sum(audio_frag_idx[:i + 1]) for i in range(0, len(audio_frag_idx))]
all_pred_semantic = torch.cat(pred_semantic_list).unsqueeze(0).unsqueeze(0).to(self.configs.device)
_batch_phones = torch.cat(batch_phones).unsqueeze(0).to(self.configs.device)
_batch_audio_fragment = (self.vits_model.decode(
all_pred_semantic, _batch_phones, refer_audio_spec
).detach()[0, 0, :])
audio_frag_end_idx.insert(0, 0)
batch_audio_fragment= [_batch_audio_fragment[audio_frag_end_idx[i-1]:audio_frag_end_idx[i]] for i in range(1, len(audio_frag_end_idx))]
batch_audio_fragment = [_batch_audio_fragment[audio_frag_end_idx[i - 1]:audio_frag_end_idx[i]] for i in
range(1, len(audio_frag_end_idx))]
# ## vits串行推理
# for i, idx in enumerate(idx_list):
@ -868,13 +883,13 @@ class TTS:
pass
def audio_postprocess(self,
audio:List[torch.Tensor],
sr:int,
batch_index_list:list=None,
speed_factor:float=1.0,
split_bucket:bool=True,
fragment_interval:float=0.3
)->tuple[int, np.ndarray]:
audio: List[torch.Tensor],
sr: int,
batch_index_list: list = None,
speed_factor: float = 1.0,
split_bucket: bool = True,
fragment_interval: float = 0.3
) -> tuple[int, np.ndarray]:
zero_wav = torch.zeros(
int(self.configs.sampling_rate * fragment_interval),
dtype=self.precision,
@ -883,19 +898,17 @@ class TTS:
for i, batch in enumerate(audio):
for j, audio_fragment in enumerate(batch):
max_audio=torch.abs(audio_fragment).max()#简单防止16bit爆音
if max_audio>1: audio_fragment/=max_audio
audio_fragment:torch.Tensor = torch.cat([audio_fragment, zero_wav], dim=0)
max_audio = torch.abs(audio_fragment).max() # 简单防止16bit爆音
if max_audio > 1: audio_fragment /= max_audio
audio_fragment: torch.Tensor = torch.cat([audio_fragment, zero_wav], dim=0)
audio[i][j] = audio_fragment.cpu().numpy()
if split_bucket:
audio = self.recovery_order(audio, batch_index_list)
else:
# audio = [item for batch in audio for item in batch]
audio = sum(audio, [])
audio = np.concatenate(audio, 0)
audio = (audio * 32768).astype(np.int16)
@ -908,9 +921,7 @@ class TTS:
return sr, audio
def speed_change(input_audio:np.ndarray, speed:float, sr:int):
def speed_change(input_audio: np.ndarray, speed: float, sr: int):
# 将 NumPy 数组转换为原始 PCM 流
raw_audio = input_audio.astype(np.int16).tobytes()

View File

@ -1,6 +1,7 @@
custom:
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
load_base: true
device: cuda
is_half: true
t2s_weights_path: GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt

View File

@ -0,0 +1,15 @@
custom:
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
device: cpu
is_half: false
load_base: false
t2s_weights_path: GPT_weights/voice1-e10.ckpt
vits_weights_path: SoVITS_weights/voice1_e8_s192.pth
default:
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
device: cpu
is_half: false
t2s_weights_path: GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt
vits_weights_path: GPT_SoVITS/pretrained_models/s2G488k.pth

View File

@ -0,0 +1,15 @@
custom:
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
device: cpu
is_half: false
load_base: false
t2s_weights_path: GPT_weights/voice1-e10.ckpt
vits_weights_path: SoVITS_weights/voice1_e8_s192.pth
default:
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
device: cpu
is_half: false
t2s_weights_path: GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt
vits_weights_path: GPT_SoVITS/pretrained_models/s2G488k.pth

470
api_v3.py Normal file
View File

@ -0,0 +1,470 @@
"""
# WebAPI文档 (3.0) - 使用了缓存技术初始化时使用LRU Cache TTS 实例,缓存加载模型的世界,达到减少切换不同语音时的推理时间
` python api_v2.py -a 127.0.0.1 -p 9880 -c GPT_SoVITS/configs/tts_infer.yaml `
## 执行参数:
`-a` - `绑定地址, 默认"127.0.0.1"`
`-p` - `绑定端口, 默认9880`
`-c` - `TTS配置文件路径, 默认"GPT_SoVITS/configs/tts_infer.yaml"`
## 调用:
### 推理
endpoint: `/tts`
GET:
```
http://127.0.0.1:9880/tts?text=先帝创业未半而中道崩殂今天下三分益州疲弊此诚危急存亡之秋也&text_lang=zh&ref_audio_path=archive_jingyuan_1.wav&prompt_lang=zh&prompt_text=我是罗浮云骑将军景元不必拘谨将军只是一时的身份你称呼我景元便可&text_split_method=cut5&batch_size=1&media_type=wav&streaming_mode=true
```
POST:
```json
{
"text": "", # str.(required) text to be synthesized
"text_lang": "", # str.(required) language of the text to be synthesized
"ref_audio_path": "", # str.(required) reference audio path.
"prompt_text": "", # str.(optional) prompt text for the reference audio
"prompt_lang": "", # str.(required) language of the prompt text for the reference audio
"top_k": 5, # int.(optional) top k sampling
"top_p": 1, # float.(optional) top p sampling
"temperature": 1, # float.(optional) temperature for sampling
"text_split_method": "cut5", # str.(optional) text split method, see text_segmentation_method.py for details.
"batch_size": 1, # int.(optional) batch size for inference
"batch_threshold": 0.75, # float.(optional) threshold for batch splitting.
"split_bucket": true, # bool.(optional) whether to split the batch into multiple buckets.
"speed_factor":1.0, # float.(optional) control the speed of the synthesized audio.
"fragment_interval":0.3, # float.(optional) to control the interval of the audio fragment.
"seed": -1, # int.(optional) random seed for reproducibility.
"media_type": "wav", # str.(optional) media type of the output audio, support "wav", "raw", "ogg", "aac".
"streaming_mode": false, # bool.(optional) whether to return a streaming response.
"parallel_infer": True, # bool.(optional) whether to use parallel inference.
"repetition_penalty": 1.35, # float.(optional) repetition penalty for T2S model.
"tts_infer_yaml_path": GPT_SoVITS/configs/tts_infer.yaml # str.(optional) tts infer yaml path
}
```
RESP:
成功: 直接返回 wav 音频流 http code 200
失败: 返回包含错误信息的 json, http code 400
### 命令控制
endpoint: `/control`
command:
"restart": 重新运行
"exit": 结束运行
GET:
```
http://127.0.0.1:9880/control?command=restart
```
POST:
```json
{
"command": "restart"
}
```
RESP:
### 切换GPT模型
endpoint: `/set_gpt_weights`
GET:
```
http://127.0.0.1:9880/set_gpt_weights?weights_path=GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt
```
RESP:
成功: 返回"success", http code 200
失败: 返回包含错误信息的 json, http code 400
### 切换Sovits模型
endpoint: `/set_sovits_weights`
GET:
```
http://127.0.0.1:9880/set_sovits_weights?weights_path=GPT_SoVITS/pretrained_models/s2G488k.pth
```
RESP:
成功: 返回"success", http code 200
失败: 返回包含错误信息的 json, http code 400
"""
import os
import sys
import traceback
from typing import Generator
import torch
now_dir = os.getcwd()
sys.path.append(now_dir)
sys.path.append("%s/GPT_SoVITS" % (now_dir))
import argparse
import subprocess
import wave
import signal
import numpy as np
import soundfile as sf
from fastapi import Response
from fastapi.responses import JSONResponse
from fastapi import FastAPI
import uvicorn
from io import BytesIO
from GPT_SoVITS.TTS_infer_pack.TTS import TTS, TTS_Config
from GPT_SoVITS.TTS_infer_pack.text_segmentation_method import get_method_names as get_cut_method_names
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from functools import lru_cache
cut_method_names = get_cut_method_names()
parser = argparse.ArgumentParser(description="GPT-SoVITS api")
parser.add_argument("-a", "--bind_addr", type=str, default="0.0.0.0", help="default: 0.0.0.0")
parser.add_argument("-p", "--port", type=int, default="9880", help="default: 9880")
args = parser.parse_args()
port = args.port
host = args.bind_addr
argv = sys.argv
default_tts_config = TTS_Config()
TTS.init_base_models(default_tts_config)
APP = FastAPI()
class TTS_Request(BaseModel):
text: str = None
text_lang: str = None
ref_audio_path: str = None
prompt_lang: str = None
prompt_text: str = ""
top_k: int = 5
top_p: float = 1
temperature: float = 1
text_split_method: str = "cut5"
batch_size: int = 1
batch_threshold: float = 0.75
split_bucket: bool = True
speed_factor: float = 1.0
fragment_interval: float = 0.3
seed: int = -1
media_type: str = "wav"
streaming_mode: bool = False
parallel_infer: bool = True
repetition_penalty: float = 1.35
tts_infer_yaml_path: str = None
"""推理时需要加载的声音模型的yaml配置文件路径GPT_SoVITS/configs/tts_infer.yaml"""
@lru_cache(maxsize=10)
def get_tts_instance(tts_config: TTS_Config) -> TTS:
print(f"load tts config from {tts_config.configs_path}")
return TTS(tts_config)
def pack_ogg(io_buffer: BytesIO, data: np.ndarray, rate: int):
"""modify from https://github.com/RVC-Boss/GPT-SoVITS/pull/894/files"""
with sf.SoundFile(io_buffer, mode='w', samplerate=rate, channels=1, format='ogg') as audio_file:
audio_file.write(data)
return io_buffer
def pack_raw(io_buffer: BytesIO, data: np.ndarray, rate: int):
io_buffer.write(data.tobytes())
return io_buffer
def pack_wav(io_buffer: BytesIO, data: np.ndarray, rate: int):
io_buffer = BytesIO()
sf.write(io_buffer, data, rate, format='wav')
return io_buffer
def pack_aac(io_buffer: BytesIO, data: np.ndarray, rate: int):
process = subprocess.Popen([
'ffmpeg',
'-f', 's16le', # 输入16位有符号小端整数PCM
'-ar', str(rate), # 设置采样率
'-ac', '1', # 单声道
'-i', 'pipe:0', # 从管道读取输入
'-c:a', 'aac', # 音频编码器为AAC
'-b:a', '192k', # 比特率
'-vn', # 不包含视频
'-f', 'adts', # 输出AAC数据流格式
'pipe:1' # 将输出写入管道
], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, _ = process.communicate(input=data.tobytes())
io_buffer.write(out)
return io_buffer
def pack_audio(io_buffer: BytesIO, data: np.ndarray, rate: int, media_type: str):
if media_type == "ogg":
io_buffer = pack_ogg(io_buffer, data, rate)
elif media_type == "aac":
io_buffer = pack_aac(io_buffer, data, rate)
elif media_type == "wav":
io_buffer = pack_wav(io_buffer, data, rate)
else:
io_buffer = pack_raw(io_buffer, data, rate)
io_buffer.seek(0)
return io_buffer
# from https://huggingface.co/spaces/coqui/voice-chat-with-mistral/blob/main/app.py
def wave_header_chunk(frame_input=b"", channels=1, sample_width=2, sample_rate=32000):
# This will create a wave header then append the frame input
# It should be first on a streaming wav file
# Other frames better should not have it (else you will hear some artifacts each chunk start)
wav_buf = BytesIO()
with wave.open(wav_buf, "wb") as vfout:
vfout.setnchannels(channels)
vfout.setsampwidth(sample_width)
vfout.setframerate(sample_rate)
vfout.writeframes(frame_input)
wav_buf.seek(0)
return wav_buf.read()
def handle_control(command: str):
if command == "restart":
os.execl(sys.executable, sys.executable, *argv)
elif command == "exit":
os.kill(os.getpid(), signal.SIGTERM)
exit(0)
def check_params(req: dict, tts_config: TTS_Config):
text: str = req.get("text", "")
text_lang: str = req.get("text_lang", "")
ref_audio_path: str = req.get("ref_audio_path", "")
streaming_mode: bool = req.get("streaming_mode", False)
media_type: str = req.get("media_type", "wav")
prompt_lang: str = req.get("prompt_lang", "")
text_split_method: str = req.get("text_split_method", "cut5")
if ref_audio_path in [None, ""]:
return JSONResponse(status_code=400, content={"message": "ref_audio_path is required"})
if text in [None, ""]:
return JSONResponse(status_code=400, content={"message": "text is required"})
if (text_lang in [None, ""]):
return JSONResponse(status_code=400, content={"message": "text_lang is required"})
elif text_lang.lower() not in tts_config.languages:
return JSONResponse(status_code=400, content={"message": "text_lang is not supported"})
if (prompt_lang in [None, ""]):
return JSONResponse(status_code=400, content={"message": "prompt_lang is required"})
elif prompt_lang.lower() not in tts_config.languages:
return JSONResponse(status_code=400, content={"message": "prompt_lang is not supported"})
if media_type not in ["wav", "raw", "ogg", "aac"]:
return JSONResponse(status_code=400, content={"message": "media_type is not supported"})
elif media_type == "ogg" and not streaming_mode:
return JSONResponse(status_code=400, content={"message": "ogg format is not supported in non-streaming mode"})
if text_split_method not in cut_method_names:
return JSONResponse(status_code=400,
content={"message": f"text_split_method:{text_split_method} is not supported"})
return None
async def tts_handle(req: dict):
"""
Text to speech handler.
Args:
req (dict):
{
"text": "", # str.(required) text to be synthesized
"text_lang: "", # str.(required) language of the text to be synthesized
"ref_audio_path": "", # str.(required) reference audio path
"prompt_text": "", # str.(optional) prompt text for the reference audio
"prompt_lang": "", # str.(required) language of the prompt text for the reference audio
"top_k": 5, # int. top k sampling
"top_p": 1, # float. top p sampling
"temperature": 1, # float. temperature for sampling
"text_split_method": "cut5", # str. text split method, see text_segmentation_method.py for details.
"batch_size": 1, # int. batch size for inference
"batch_threshold": 0.75, # float. threshold for batch splitting.
"split_bucket: True, # bool. whether to split the batch into multiple buckets.
"speed_factor":1.0, # float. control the speed of the synthesized audio.
"fragment_interval":0.3, # float. to control the interval of the audio fragment.
"seed": -1, # int. random seed for reproducibility.
"media_type": "wav", # str. media type of the output audio, support "wav", "raw", "ogg", "aac".
"streaming_mode": False, # bool. whether to return a streaming response.
"parallel_infer": True, # bool.(optional) whether to use parallel inference.
"repetition_penalty": 1.35 # float.(optional) repetition penalty for T2S model.
}
returns:
StreamingResponse: audio stream response.
"""
streaming_mode = req.get("streaming_mode", False)
media_type = req.get("media_type", "wav")
tts_infer_yaml_path = req.get("tts_infer_yaml_path", "GPT_SoVITS/configs/tts_infer.yaml")
tts_config = TTS_Config(tts_infer_yaml_path)
check_res = check_params(req, tts_config)
if check_res is not None:
return check_res
if streaming_mode:
req["return_fragment"] = True
try:
tts_instance = get_tts_instance(tts_config)
move_to_gpu(tts_instance, tts_config)
tts_generator = tts_instance.run(req)
if streaming_mode:
def streaming_generator(tts_generator: Generator, media_type: str):
if media_type == "wav":
yield wave_header_chunk()
media_type = "raw"
for sr, chunk in tts_generator:
yield pack_audio(BytesIO(), chunk, sr, media_type).getvalue()
move_to_cpu(tts_instance)
# _media_type = f"audio/{media_type}" if not (streaming_mode and media_type in ["wav", "raw"]) else f"audio/x-{media_type}"
return StreamingResponse(streaming_generator(tts_generator, media_type, ), media_type=f"audio/{media_type}")
else:
sr, audio_data = next(tts_generator)
audio_data = pack_audio(BytesIO(), audio_data, sr, media_type).getvalue()
move_to_cpu(tts_instance)
return Response(audio_data, media_type=f"audio/{media_type}")
except Exception as e:
return JSONResponse(status_code=400, content={"message": f"tts failed", "Exception": str(e)})
def move_to_cpu(tts):
cpu_device = torch.device('cpu')
tts.set_device(cpu_device)
print("Moved TTS models to CPU to save GPU memory.")
def move_to_gpu(tts: TTS, tts_config: TTS_Config):
tts.set_device(tts_config.device)
print("Moved TTS models back to GPU for performance.")
@APP.get("/control")
async def control(command: str = None):
if command is None:
return JSONResponse(status_code=400, content={"message": "command is required"})
handle_control(command)
@APP.get("/tts")
async def tts_get_endpoint(
text: str = None,
text_lang: str = None,
ref_audio_path: str = None,
prompt_lang: str = None,
prompt_text: str = "",
top_k: int = 5,
top_p: float = 1,
temperature: float = 1,
text_split_method: str = "cut0",
batch_size: int = 1,
batch_threshold: float = 0.75,
split_bucket: bool = True,
speed_factor: float = 1.0,
fragment_interval: float = 0.3,
seed: int = -1,
media_type: str = "wav",
streaming_mode: bool = False,
parallel_infer: bool = True,
repetition_penalty: float = 1.35,
tts_infer_yaml_path: str = "GPT_SoVITS/configs/tts_infer.yaml"
):
req = {
"text": text,
"text_lang": text_lang.lower(),
"ref_audio_path": ref_audio_path,
"prompt_text": prompt_text,
"prompt_lang": prompt_lang.lower(),
"top_k": top_k,
"top_p": top_p,
"temperature": temperature,
"text_split_method": text_split_method,
"batch_size": int(batch_size),
"batch_threshold": float(batch_threshold),
"speed_factor": float(speed_factor),
"split_bucket": split_bucket,
"fragment_interval": fragment_interval,
"seed": seed,
"media_type": media_type,
"streaming_mode": streaming_mode,
"parallel_infer": parallel_infer,
"repetition_penalty": float(repetition_penalty),
"tts_infer_yaml_path": tts_infer_yaml_path
}
return await tts_handle(req)
@APP.post("/tts")
async def tts_post_endpoint(request: TTS_Request):
req = request.dict()
return await tts_handle(req)
@APP.get("/set_refer_audio")
async def set_refer_audio(refer_audio_path: str = None, tts_infer_yaml_path: str = "GPT_SoVITS/configs/tts_infer.yaml"):
try:
tts_config = TTS_Config(tts_infer_yaml_path)
tts_instance = get_tts_instance(tts_config)
tts_instance.set_ref_audio(refer_audio_path)
except Exception as e:
return JSONResponse(status_code=400, content={"message": f"set refer audio failed", "Exception": str(e)})
return JSONResponse(status_code=200, content={"message": "success"})
@APP.get("/set_gpt_weights")
async def set_gpt_weights(weights_path: str = None, tts_infer_yaml_path: str = "GPT_SoVITS/configs/tts_infer.yaml"):
try:
if weights_path in ["", None]:
return JSONResponse(status_code=400, content={"message": "gpt weight path is required"})
tts_config = TTS_Config(tts_infer_yaml_path)
tts_instance = get_tts_instance(tts_config)
tts_instance.init_t2s_weights(weights_path)
except Exception as e:
return JSONResponse(status_code=400, content={"message": f"change gpt weight failed", "Exception": str(e)})
return JSONResponse(status_code=200, content={"message": "success"})
@APP.get("/set_sovits_weights")
async def set_sovits_weights(weights_path: str = None, tts_infer_yaml_path: str = "GPT_SoVITS/configs/tts_infer.yaml"):
try:
if weights_path in ["", None]:
return JSONResponse(status_code=400, content={"message": "sovits weight path is required"})
tts_config = TTS_Config(tts_infer_yaml_path)
tts_instance = get_tts_instance(tts_config)
tts_instance.init_vits_weights(weights_path)
except Exception as e:
return JSONResponse(status_code=400, content={"message": f"change sovits weight failed", "Exception": str(e)})
return JSONResponse(status_code=200, content={"message": "success"})
if __name__ == "__main__":
try:
uvicorn.run(APP, host=host, port=port, workers=1)
except Exception as e:
traceback.print_exc()
os.kill(os.getpid(), signal.SIGTERM)
exit(0)