diff --git a/GPT_SoVITS/text/chinese2.py b/GPT_SoVITS/text/chinese2.py index dcce0d96..acfebfe2 100644 --- a/GPT_SoVITS/text/chinese2.py +++ b/GPT_SoVITS/text/chinese2.py @@ -180,10 +180,15 @@ def _merge_erhua(initials: list[str], finals: list[str], word: str, pos: str) -> def _g2p(segments): phones_list = [] word2ph = [] - for seg in segments: + g2pw_batch_results = [] + g2pw_batch_cursor = 0 + processed_segments = [re.sub("[a-zA-Z]+", "", seg) for seg in segments] + if is_g2pw: + batch_inputs = [seg for seg in processed_segments if seg] + g2pw_batch_results = g2pw._g2pw(batch_inputs) if batch_inputs else [] + + for seg in processed_segments: pinyins = [] - # Replace all English words in the sentence - seg = re.sub("[a-zA-Z]+", "", seg) seg_cut = psg.lcut(seg) seg_cut = tone_modifier.pre_merge_for_modify(seg_cut) initials = [] @@ -204,8 +209,10 @@ def _g2p(segments): finals = sum(finals, []) print("pypinyin结果", initials, finals) else: - # g2pw采用整句推理 - pinyins = g2pw.lazy_pinyin(seg, neutral_tone_with_five=True, style=Style.TONE3) + # g2pw采用整句推理(批量推理,逐句取结果) + if seg: + pinyins = g2pw_batch_results[g2pw_batch_cursor] + g2pw_batch_cursor += 1 pre_word_length = 0 for word, pos in seg_cut: diff --git a/GPT_SoVITS/text/g2pw/dataset.py b/GPT_SoVITS/text/g2pw/dataset.py index ff09cbc2..e464c29a 100644 --- a/GPT_SoVITS/text/g2pw/dataset.py +++ b/GPT_SoVITS/text/g2pw/dataset.py @@ -18,6 +18,7 @@ Credits from typing import Dict from typing import List +from typing import Optional from typing import Tuple import numpy as np @@ -37,6 +38,8 @@ def prepare_onnx_input( use_mask: bool = False, window_size: int = None, max_len: int = 512, + char2id: Optional[Dict[str, int]] = None, + char_phoneme_masks: Optional[Dict[str, List[int]]] = None, ) -> Dict[str, np.array]: if window_size is not None: truncated_texts, truncated_query_ids = _truncate_texts( @@ -48,33 +51,88 @@ def prepare_onnx_input( phoneme_masks = [] char_ids = [] position_ids = [] + tokenized_cache = {} + + if char2id is None: + char2id = {char: idx for idx, char in enumerate(chars)} + if use_mask: + if char_phoneme_masks is None: + char_phoneme_masks = { + char: [1 if i in char2phonemes[char] else 0 for i in range(len(labels))] + for char in char2phonemes + } + else: + full_phoneme_mask = [1] * len(labels) for idx in range(len(texts)): text = (truncated_texts if window_size else texts)[idx].lower() query_id = (truncated_query_ids if window_size else query_ids)[idx] - try: - tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text) - except Exception: - print(f'warning: text "{text}" is invalid') - return {} + cached = tokenized_cache.get(text) + if cached is None: + try: + tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text) + except Exception: + print(f'warning: text "{text}" is invalid') + return {} - text, query_id, tokens, text2token, token2text = _truncate( - max_len=max_len, text=text, query_id=query_id, tokens=tokens, text2token=text2token, token2text=token2text - ) + if len(tokens) <= max_len - 2: + processed_tokens = ["[CLS]"] + tokens + ["[SEP]"] + shared_input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens))) + shared_token_type_id = list(np.zeros((len(processed_tokens),), dtype=int)) + shared_attention_mask = list(np.ones((len(processed_tokens),), dtype=int)) + cached = { + "is_short": True, + "tokens": tokens, + "text2token": text2token, + "token2text": token2text, + "input_id": shared_input_id, + "token_type_id": shared_token_type_id, + "attention_mask": shared_attention_mask, + } + else: + cached = { + "is_short": False, + "tokens": tokens, + "text2token": text2token, + "token2text": token2text, + } + tokenized_cache[text] = cached - processed_tokens = ["[CLS]"] + tokens + ["[SEP]"] + if cached["is_short"]: + text_for_query = text + query_id_for_query = query_id + text2token_for_query = cached["text2token"] + input_id = cached["input_id"] + token_type_id = cached["token_type_id"] + attention_mask = cached["attention_mask"] + else: + ( + text_for_query, + query_id_for_query, + tokens_for_query, + text2token_for_query, + _token2text_for_query, + ) = _truncate( + max_len=max_len, + text=text, + query_id=query_id, + tokens=cached["tokens"], + text2token=cached["text2token"], + token2text=cached["token2text"], + ) + processed_tokens = ["[CLS]"] + tokens_for_query + ["[SEP]"] + input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens))) + token_type_id = list(np.zeros((len(processed_tokens),), dtype=int)) + attention_mask = list(np.ones((len(processed_tokens),), dtype=int)) - input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens))) - token_type_id = list(np.zeros((len(processed_tokens),), dtype=int)) - attention_mask = list(np.ones((len(processed_tokens),), dtype=int)) - - query_char = text[query_id] - phoneme_mask = ( - [1 if i in char2phonemes[query_char] else 0 for i in range(len(labels))] if use_mask else [1] * len(labels) - ) - char_id = chars.index(query_char) - position_id = text2token[query_id] + 1 # [CLS] token locate at first place + query_char = text_for_query[query_id_for_query] + if use_mask: + phoneme_mask = char_phoneme_masks[query_char] + else: + phoneme_mask = full_phoneme_mask + char_id = char2id[query_char] + position_id = text2token_for_query[query_id_for_query] + 1 # [CLS] token locate at first place input_ids.append(input_id) token_type_ids.append(token_type_id) @@ -83,10 +141,15 @@ def prepare_onnx_input( char_ids.append(char_id) position_ids.append(position_id) + max_token_length = max(len(seq) for seq in input_ids) + + def _pad_sequences(sequences, pad_value=0): + return [seq + [pad_value] * (max_token_length - len(seq)) for seq in sequences] + outputs = { - "input_ids": np.array(input_ids).astype(np.int64), - "token_type_ids": np.array(token_type_ids).astype(np.int64), - "attention_masks": np.array(attention_masks).astype(np.int64), + "input_ids": np.array(_pad_sequences(input_ids, pad_value=0)).astype(np.int64), + "token_type_ids": np.array(_pad_sequences(token_type_ids, pad_value=0)).astype(np.int64), + "attention_masks": np.array(_pad_sequences(attention_masks, pad_value=0)).astype(np.int64), "phoneme_masks": np.array(phoneme_masks).astype(np.float32), "char_ids": np.array(char_ids).astype(np.int64), "position_ids": np.array(position_ids).astype(np.int64), diff --git a/GPT_SoVITS/text/g2pw/onnx_api.py b/GPT_SoVITS/text/g2pw/onnx_api.py index 1d5e4231..3c2b0169 100644 --- a/GPT_SoVITS/text/g2pw/onnx_api.py +++ b/GPT_SoVITS/text/g2pw/onnx_api.py @@ -10,7 +10,6 @@ from typing import Any, Dict, List, Tuple import numpy as np import onnxruntime import requests -import torch from opencc import OpenCC from pypinyin import Style, pinyin from transformers.models.auto.tokenization_auto import AutoTokenizer @@ -22,9 +21,8 @@ from .utils import load_config onnxruntime.set_default_logger_severity(3) try: onnxruntime.preload_dlls() -except: +except Exception: pass - # traceback.print_exc() warnings.filterwarnings("ignore") model_version = "1.1" @@ -55,6 +53,24 @@ def predict(session, onnx_input: Dict[str, Any], labels: List[str]) -> Tuple[Lis return all_preds, all_confidences +def _load_json_from_candidates(filename: str, candidate_dirs: List[str]) -> Dict[str, Any]: + for candidate_dir in candidate_dirs: + if not candidate_dir: + continue + json_path = os.path.join(candidate_dir, filename) + if os.path.exists(json_path): + with open(json_path, "r", encoding="utf-8") as fr: + return json.load(fr) + raise FileNotFoundError(f"Cannot locate {filename} in candidate dirs: {candidate_dirs}") + + +def _find_first_existing_file(*paths: str) -> str: + for path in paths: + if path and os.path.exists(path): + return path + raise FileNotFoundError(f"Files not found: {paths}") + + def download_and_decompress(model_dir: str = "G2PWModel/"): if not os.path.exists(model_dir): parent_directory = os.path.dirname(model_dir) @@ -62,7 +78,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"): extract_dir = os.path.join(parent_directory, "G2PWModel_1.1") extract_dir_new = os.path.join(parent_directory, "G2PWModel") print("Downloading g2pw model...") - modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip" # "https://paddlespeech.cdn.bcebos.com/Parakeet/released_models/g2p/G2PWModel_1.1.zip" + modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip" with requests.get(modelscope_url, stream=True) as r: r.raise_for_status() with open(zip_dir, "wb") as f: @@ -79,7 +95,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"): return model_dir -class G2PWOnnxConverter: +class _G2PWBaseOnnxConverter: def __init__( self, model_dir: str = "G2PWModel/", @@ -87,33 +103,16 @@ class G2PWOnnxConverter: model_source: str = None, enable_non_tradional_chinese: bool = False, ): - uncompress_path = download_and_decompress(model_dir) - - sess_options = onnxruntime.SessionOptions() - sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL - sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL - sess_options.intra_op_num_threads = 2 if torch.cuda.is_available() else 0 - if "CUDAExecutionProvider" in onnxruntime.get_available_providers(): - self.session_g2pW = onnxruntime.InferenceSession( - os.path.join(uncompress_path, "g2pW.onnx"), - sess_options=sess_options, - providers=["CUDAExecutionProvider", "CPUExecutionProvider"], - ) - else: - self.session_g2pW = onnxruntime.InferenceSession( - os.path.join(uncompress_path, "g2pW.onnx"), - sess_options=sess_options, - providers=["CPUExecutionProvider"], - ) - self.config = load_config(config_path=os.path.join(uncompress_path, "config.py"), use_default=True) + self.model_dir = download_and_decompress(model_dir) + self.config = load_config(config_path=os.path.join(self.model_dir, "config.py"), use_default=True) self.model_source = model_source if model_source else self.config.model_source self.enable_opencc = enable_non_tradional_chinese - self.tokenizer = AutoTokenizer.from_pretrained(self.model_source) - polyphonic_chars_path = os.path.join(uncompress_path, "POLYPHONIC_CHARS.txt") - monophonic_chars_path = os.path.join(uncompress_path, "MONOPHONIC_CHARS.txt") + polyphonic_chars_path = os.path.join(self.model_dir, "POLYPHONIC_CHARS.txt") + monophonic_chars_path = os.path.join(self.model_dir, "MONOPHONIC_CHARS.txt") + self.polyphonic_chars = [ line.split("\t") for line in open(polyphonic_chars_path, encoding="utf-8").read().strip().split("\n") ] @@ -149,31 +148,47 @@ class G2PWOnnxConverter: ) self.chars = sorted(list(self.char2phonemes.keys())) + self.char2id = {char: idx for idx, char in enumerate(self.chars)} + self.char_phoneme_masks = ( + { + char: [1 if i in self.char2phonemes[char] else 0 for i in range(len(self.labels))] + for char in self.char2phonemes + } + if self.config.use_mask + else None + ) self.polyphonic_chars_new = set(self.chars) for char in self.non_polyphonic: - if char in self.polyphonic_chars_new: - self.polyphonic_chars_new.remove(char) + self.polyphonic_chars_new.discard(char) self.monophonic_chars_dict = {char: phoneme for char, phoneme in self.monophonic_chars} for char in self.non_monophonic: - if char in self.monophonic_chars_dict: - self.monophonic_chars_dict.pop(char) + self.monophonic_chars_dict.pop(char, None) - self.pos_tags = ["UNK", "A", "C", "D", "I", "N", "P", "T", "V", "DE", "SHI"] + default_asset_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "G2PWModel")) + candidate_asset_dirs = [self.model_dir, default_asset_dir] + self.bopomofo_convert_dict = _load_json_from_candidates( + "bopomofo_to_pinyin_wo_tune_dict.json", candidate_asset_dirs + ) + self.char_bopomofo_dict = _load_json_from_candidates("char_bopomofo_dict.json", candidate_asset_dirs) - with open(os.path.join(uncompress_path, "bopomofo_to_pinyin_wo_tune_dict.json"), "r", encoding="utf-8") as fr: - self.bopomofo_convert_dict = json.load(fr) self.style_convert_func = { "bopomofo": lambda x: x, "pinyin": self._convert_bopomofo_to_pinyin, }[style] - with open(os.path.join(uncompress_path, "char_bopomofo_dict.json"), "r", encoding="utf-8") as fr: - self.char_bopomofo_dict = json.load(fr) - if self.enable_opencc: self.cc = OpenCC("s2tw") + self.enable_sentence_dedup = os.getenv("g2pw_sentence_dedup", "true").strip().lower() in { + "1", + "true", + "yes", + "y", + "on", + } + # 聚焦到多音字附近上下文,默认左右各16字;设为0表示关闭裁剪(整句)。 + self.polyphonic_context_chars = max(0, int(os.getenv("g2pw_polyphonic_context_chars", "16"))) def _convert_bopomofo_to_pinyin(self, bopomofo: str) -> str: tone = bopomofo[-1] @@ -181,9 +196,8 @@ class G2PWOnnxConverter: component = self.bopomofo_convert_dict.get(bopomofo[:-1]) if component: return component + tone - else: - print(f'Warning: "{bopomofo}" cannot convert to pinyin') - return None + print(f'Warning: "{bopomofo}" cannot convert to pinyin') + return None def __call__(self, sentences: List[str]) -> List[List[str]]: if isinstance(sentences, str): @@ -197,51 +211,147 @@ class G2PWOnnxConverter: translated_sentences.append(translated_sent) sentences = translated_sentences - texts, query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences) + texts, model_query_ids, result_query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences) if len(texts) == 0: - # sentences no polyphonic words return partial_results - onnx_input = prepare_onnx_input( + model_input = prepare_onnx_input( tokenizer=self.tokenizer, labels=self.labels, char2phonemes=self.char2phonemes, chars=self.chars, texts=texts, - query_ids=query_ids, + query_ids=model_query_ids, use_mask=self.config.use_mask, window_size=None, + char2id=self.char2id, + char_phoneme_masks=self.char_phoneme_masks, ) - preds, confidences = predict(session=self.session_g2pW, onnx_input=onnx_input, labels=self.labels) + if not model_input: + return partial_results + + if self.enable_sentence_dedup: + preds, _confidences = self._predict_with_sentence_dedup(model_input=model_input, texts=texts) + else: + preds, _confidences = self._predict(model_input=model_input) + if self.config.use_char_phoneme: preds = [pred.split(" ")[1] for pred in preds] results = partial_results - for sent_id, query_id, pred in zip(sent_ids, query_ids, preds): + for sent_id, query_id, pred in zip(sent_ids, result_query_ids, preds): results[sent_id][query_id] = self.style_convert_func(pred) return results - def _prepare_data(self, sentences: List[str]) -> Tuple[List[str], List[int], List[int], List[List[str]]]: - texts, query_ids, sent_ids, partial_results = [], [], [], [] + def _prepare_data( + self, sentences: List[str] + ) -> Tuple[List[str], List[int], List[int], List[int], List[List[str]]]: + texts, model_query_ids, result_query_ids, sent_ids, partial_results = [], [], [], [], [] for sent_id, sent in enumerate(sentences): - # pypinyin works well for Simplified Chinese than Traditional Chinese sent_s = tranditional_to_simplified(sent) pypinyin_result = pinyin(sent_s, neutral_tone_with_five=True, style=Style.TONE3) partial_result = [None] * len(sent) + polyphonic_indices: List[int] = [] for i, char in enumerate(sent): if char in self.polyphonic_chars_new: - texts.append(sent) - query_ids.append(i) - sent_ids.append(sent_id) + polyphonic_indices.append(i) elif char in self.monophonic_chars_dict: partial_result[i] = self.style_convert_func(self.monophonic_chars_dict[char]) elif char in self.char_bopomofo_dict: partial_result[i] = pypinyin_result[i][0] - # partial_result[i] = self.style_convert_func(self.char_bopomofo_dict[char][0]) else: partial_result[i] = pypinyin_result[i][0] + if polyphonic_indices: + if self.polyphonic_context_chars > 0: + left = max(0, polyphonic_indices[0] - self.polyphonic_context_chars) + right = min(len(sent), polyphonic_indices[-1] + self.polyphonic_context_chars + 1) + sent_for_predict = sent[left:right] + query_offset = left + else: + sent_for_predict = sent + query_offset = 0 + + for index in polyphonic_indices: + texts.append(sent_for_predict) + model_query_ids.append(index - query_offset) + result_query_ids.append(index) + sent_ids.append(sent_id) + partial_results.append(partial_result) - return texts, query_ids, sent_ids, partial_results + return texts, model_query_ids, result_query_ids, sent_ids, partial_results + + def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]: + raise NotImplementedError + + def _predict_with_sentence_dedup( + self, model_input: Dict[str, Any], texts: List[str] + ) -> Tuple[List[str], List[float]]: + if len(texts) <= 1: + return self._predict(model_input=model_input) + + grouped_indices: Dict[str, List[int]] = {} + for idx, text in enumerate(texts): + grouped_indices.setdefault(text, []).append(idx) + + if all(len(indices) == 1 for indices in grouped_indices.values()): + return self._predict(model_input=model_input) + + preds: List[str] = [""] * len(texts) + confidences: List[float] = [0.0] * len(texts) + for indices in grouped_indices.values(): + group_input = {name: value[indices] for name, value in model_input.items()} + if len(indices) > 1: + for name in ("input_ids", "token_type_ids", "attention_masks"): + group_input[name] = group_input[name][:1] + + group_preds, group_confidences = self._predict(model_input=group_input) + for output_idx, pred, confidence in zip(indices, group_preds, group_confidences): + preds[output_idx] = pred + confidences[output_idx] = confidence + + return preds, confidences + + +class G2PWOnnxConverter(_G2PWBaseOnnxConverter): + def __init__( + self, + model_dir: str = "G2PWModel/", + style: str = "bopomofo", + model_source: str = None, + enable_non_tradional_chinese: bool = False, + ): + super().__init__( + model_dir=model_dir, + style=style, + model_source=model_source, + enable_non_tradional_chinese=enable_non_tradional_chinese, + ) + + sess_options = onnxruntime.SessionOptions() + sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL + sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL + sess_options.intra_op_num_threads = 2 + + onnx_path = _find_first_existing_file( + os.path.join(self.model_dir, "g2pW.onnx"), + os.path.join(self.model_dir, "g2pw.onnx"), + ) + + if "CUDAExecutionProvider" in onnxruntime.get_available_providers(): + self.session_g2pw = onnxruntime.InferenceSession( + onnx_path, + sess_options=sess_options, + providers=["CUDAExecutionProvider", "CPUExecutionProvider"], + ) + else: + self.session_g2pw = onnxruntime.InferenceSession( + onnx_path, + sess_options=sess_options, + providers=["CPUExecutionProvider"], + ) + + def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]: + return predict(session=self.session_g2pw, onnx_input=model_input, labels=self.labels)