mirror of
https://github.com/RVC-Boss/GPT-SoVITS.git
synced 2026-04-29 21:00:42 +08:00
优化 G2PW 的推理输入构造与多音字处理流程,减少重复计算,降低长句场景下的推理开销 (#2763)
* Enhance G2P processing by implementing batch input handling in _g2p function, improving efficiency. Update prepare_onnx_input to utilize caching for tokenization and add optional parameters for character ID mapping and phoneme masks. Refactor G2PWOnnxConverter to streamline model loading and configuration management. * Enhance G2PW model input handling by introducing polyphonic context character support and updating the data preparation method to return additional query IDs. This improves the processing of polyphonic characters in sentences.
This commit is contained in:
parent
2d9193b0d3
commit
ba8de9b760
@ -180,10 +180,15 @@ def _merge_erhua(initials: list[str], finals: list[str], word: str, pos: str) ->
|
||||
def _g2p(segments):
|
||||
phones_list = []
|
||||
word2ph = []
|
||||
for seg in segments:
|
||||
g2pw_batch_results = []
|
||||
g2pw_batch_cursor = 0
|
||||
processed_segments = [re.sub("[a-zA-Z]+", "", seg) for seg in segments]
|
||||
if is_g2pw:
|
||||
batch_inputs = [seg for seg in processed_segments if seg]
|
||||
g2pw_batch_results = g2pw._g2pw(batch_inputs) if batch_inputs else []
|
||||
|
||||
for seg in processed_segments:
|
||||
pinyins = []
|
||||
# Replace all English words in the sentence
|
||||
seg = re.sub("[a-zA-Z]+", "", seg)
|
||||
seg_cut = psg.lcut(seg)
|
||||
seg_cut = tone_modifier.pre_merge_for_modify(seg_cut)
|
||||
initials = []
|
||||
@ -204,8 +209,10 @@ def _g2p(segments):
|
||||
finals = sum(finals, [])
|
||||
print("pypinyin结果", initials, finals)
|
||||
else:
|
||||
# g2pw采用整句推理
|
||||
pinyins = g2pw.lazy_pinyin(seg, neutral_tone_with_five=True, style=Style.TONE3)
|
||||
# g2pw采用整句推理(批量推理,逐句取结果)
|
||||
if seg:
|
||||
pinyins = g2pw_batch_results[g2pw_batch_cursor]
|
||||
g2pw_batch_cursor += 1
|
||||
|
||||
pre_word_length = 0
|
||||
for word, pos in seg_cut:
|
||||
|
||||
@ -18,6 +18,7 @@ Credits
|
||||
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import numpy as np
|
||||
@ -37,6 +38,8 @@ def prepare_onnx_input(
|
||||
use_mask: bool = False,
|
||||
window_size: int = None,
|
||||
max_len: int = 512,
|
||||
char2id: Optional[Dict[str, int]] = None,
|
||||
char_phoneme_masks: Optional[Dict[str, List[int]]] = None,
|
||||
) -> Dict[str, np.array]:
|
||||
if window_size is not None:
|
||||
truncated_texts, truncated_query_ids = _truncate_texts(
|
||||
@ -48,33 +51,88 @@ def prepare_onnx_input(
|
||||
phoneme_masks = []
|
||||
char_ids = []
|
||||
position_ids = []
|
||||
tokenized_cache = {}
|
||||
|
||||
if char2id is None:
|
||||
char2id = {char: idx for idx, char in enumerate(chars)}
|
||||
if use_mask:
|
||||
if char_phoneme_masks is None:
|
||||
char_phoneme_masks = {
|
||||
char: [1 if i in char2phonemes[char] else 0 for i in range(len(labels))]
|
||||
for char in char2phonemes
|
||||
}
|
||||
else:
|
||||
full_phoneme_mask = [1] * len(labels)
|
||||
|
||||
for idx in range(len(texts)):
|
||||
text = (truncated_texts if window_size else texts)[idx].lower()
|
||||
query_id = (truncated_query_ids if window_size else query_ids)[idx]
|
||||
|
||||
try:
|
||||
tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text)
|
||||
except Exception:
|
||||
print(f'warning: text "{text}" is invalid')
|
||||
return {}
|
||||
cached = tokenized_cache.get(text)
|
||||
if cached is None:
|
||||
try:
|
||||
tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text)
|
||||
except Exception:
|
||||
print(f'warning: text "{text}" is invalid')
|
||||
return {}
|
||||
|
||||
text, query_id, tokens, text2token, token2text = _truncate(
|
||||
max_len=max_len, text=text, query_id=query_id, tokens=tokens, text2token=text2token, token2text=token2text
|
||||
)
|
||||
if len(tokens) <= max_len - 2:
|
||||
processed_tokens = ["[CLS]"] + tokens + ["[SEP]"]
|
||||
shared_input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
|
||||
shared_token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
|
||||
shared_attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
|
||||
cached = {
|
||||
"is_short": True,
|
||||
"tokens": tokens,
|
||||
"text2token": text2token,
|
||||
"token2text": token2text,
|
||||
"input_id": shared_input_id,
|
||||
"token_type_id": shared_token_type_id,
|
||||
"attention_mask": shared_attention_mask,
|
||||
}
|
||||
else:
|
||||
cached = {
|
||||
"is_short": False,
|
||||
"tokens": tokens,
|
||||
"text2token": text2token,
|
||||
"token2text": token2text,
|
||||
}
|
||||
tokenized_cache[text] = cached
|
||||
|
||||
processed_tokens = ["[CLS]"] + tokens + ["[SEP]"]
|
||||
if cached["is_short"]:
|
||||
text_for_query = text
|
||||
query_id_for_query = query_id
|
||||
text2token_for_query = cached["text2token"]
|
||||
input_id = cached["input_id"]
|
||||
token_type_id = cached["token_type_id"]
|
||||
attention_mask = cached["attention_mask"]
|
||||
else:
|
||||
(
|
||||
text_for_query,
|
||||
query_id_for_query,
|
||||
tokens_for_query,
|
||||
text2token_for_query,
|
||||
_token2text_for_query,
|
||||
) = _truncate(
|
||||
max_len=max_len,
|
||||
text=text,
|
||||
query_id=query_id,
|
||||
tokens=cached["tokens"],
|
||||
text2token=cached["text2token"],
|
||||
token2text=cached["token2text"],
|
||||
)
|
||||
processed_tokens = ["[CLS]"] + tokens_for_query + ["[SEP]"]
|
||||
input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
|
||||
token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
|
||||
attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
|
||||
|
||||
input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
|
||||
token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
|
||||
attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
|
||||
|
||||
query_char = text[query_id]
|
||||
phoneme_mask = (
|
||||
[1 if i in char2phonemes[query_char] else 0 for i in range(len(labels))] if use_mask else [1] * len(labels)
|
||||
)
|
||||
char_id = chars.index(query_char)
|
||||
position_id = text2token[query_id] + 1 # [CLS] token locate at first place
|
||||
query_char = text_for_query[query_id_for_query]
|
||||
if use_mask:
|
||||
phoneme_mask = char_phoneme_masks[query_char]
|
||||
else:
|
||||
phoneme_mask = full_phoneme_mask
|
||||
char_id = char2id[query_char]
|
||||
position_id = text2token_for_query[query_id_for_query] + 1 # [CLS] token locate at first place
|
||||
|
||||
input_ids.append(input_id)
|
||||
token_type_ids.append(token_type_id)
|
||||
@ -83,10 +141,15 @@ def prepare_onnx_input(
|
||||
char_ids.append(char_id)
|
||||
position_ids.append(position_id)
|
||||
|
||||
max_token_length = max(len(seq) for seq in input_ids)
|
||||
|
||||
def _pad_sequences(sequences, pad_value=0):
|
||||
return [seq + [pad_value] * (max_token_length - len(seq)) for seq in sequences]
|
||||
|
||||
outputs = {
|
||||
"input_ids": np.array(input_ids).astype(np.int64),
|
||||
"token_type_ids": np.array(token_type_ids).astype(np.int64),
|
||||
"attention_masks": np.array(attention_masks).astype(np.int64),
|
||||
"input_ids": np.array(_pad_sequences(input_ids, pad_value=0)).astype(np.int64),
|
||||
"token_type_ids": np.array(_pad_sequences(token_type_ids, pad_value=0)).astype(np.int64),
|
||||
"attention_masks": np.array(_pad_sequences(attention_masks, pad_value=0)).astype(np.int64),
|
||||
"phoneme_masks": np.array(phoneme_masks).astype(np.float32),
|
||||
"char_ids": np.array(char_ids).astype(np.int64),
|
||||
"position_ids": np.array(position_ids).astype(np.int64),
|
||||
|
||||
@ -10,7 +10,6 @@ from typing import Any, Dict, List, Tuple
|
||||
import numpy as np
|
||||
import onnxruntime
|
||||
import requests
|
||||
import torch
|
||||
from opencc import OpenCC
|
||||
from pypinyin import Style, pinyin
|
||||
from transformers.models.auto.tokenization_auto import AutoTokenizer
|
||||
@ -22,9 +21,8 @@ from .utils import load_config
|
||||
onnxruntime.set_default_logger_severity(3)
|
||||
try:
|
||||
onnxruntime.preload_dlls()
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
# traceback.print_exc()
|
||||
warnings.filterwarnings("ignore")
|
||||
|
||||
model_version = "1.1"
|
||||
@ -55,6 +53,24 @@ def predict(session, onnx_input: Dict[str, Any], labels: List[str]) -> Tuple[Lis
|
||||
return all_preds, all_confidences
|
||||
|
||||
|
||||
def _load_json_from_candidates(filename: str, candidate_dirs: List[str]) -> Dict[str, Any]:
|
||||
for candidate_dir in candidate_dirs:
|
||||
if not candidate_dir:
|
||||
continue
|
||||
json_path = os.path.join(candidate_dir, filename)
|
||||
if os.path.exists(json_path):
|
||||
with open(json_path, "r", encoding="utf-8") as fr:
|
||||
return json.load(fr)
|
||||
raise FileNotFoundError(f"Cannot locate {filename} in candidate dirs: {candidate_dirs}")
|
||||
|
||||
|
||||
def _find_first_existing_file(*paths: str) -> str:
|
||||
for path in paths:
|
||||
if path and os.path.exists(path):
|
||||
return path
|
||||
raise FileNotFoundError(f"Files not found: {paths}")
|
||||
|
||||
|
||||
def download_and_decompress(model_dir: str = "G2PWModel/"):
|
||||
if not os.path.exists(model_dir):
|
||||
parent_directory = os.path.dirname(model_dir)
|
||||
@ -62,7 +78,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"):
|
||||
extract_dir = os.path.join(parent_directory, "G2PWModel_1.1")
|
||||
extract_dir_new = os.path.join(parent_directory, "G2PWModel")
|
||||
print("Downloading g2pw model...")
|
||||
modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip" # "https://paddlespeech.cdn.bcebos.com/Parakeet/released_models/g2p/G2PWModel_1.1.zip"
|
||||
modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip"
|
||||
with requests.get(modelscope_url, stream=True) as r:
|
||||
r.raise_for_status()
|
||||
with open(zip_dir, "wb") as f:
|
||||
@ -79,7 +95,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"):
|
||||
return model_dir
|
||||
|
||||
|
||||
class G2PWOnnxConverter:
|
||||
class _G2PWBaseOnnxConverter:
|
||||
def __init__(
|
||||
self,
|
||||
model_dir: str = "G2PWModel/",
|
||||
@ -87,33 +103,16 @@ class G2PWOnnxConverter:
|
||||
model_source: str = None,
|
||||
enable_non_tradional_chinese: bool = False,
|
||||
):
|
||||
uncompress_path = download_and_decompress(model_dir)
|
||||
|
||||
sess_options = onnxruntime.SessionOptions()
|
||||
sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
|
||||
sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
|
||||
sess_options.intra_op_num_threads = 2 if torch.cuda.is_available() else 0
|
||||
if "CUDAExecutionProvider" in onnxruntime.get_available_providers():
|
||||
self.session_g2pW = onnxruntime.InferenceSession(
|
||||
os.path.join(uncompress_path, "g2pW.onnx"),
|
||||
sess_options=sess_options,
|
||||
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
|
||||
)
|
||||
else:
|
||||
self.session_g2pW = onnxruntime.InferenceSession(
|
||||
os.path.join(uncompress_path, "g2pW.onnx"),
|
||||
sess_options=sess_options,
|
||||
providers=["CPUExecutionProvider"],
|
||||
)
|
||||
self.config = load_config(config_path=os.path.join(uncompress_path, "config.py"), use_default=True)
|
||||
self.model_dir = download_and_decompress(model_dir)
|
||||
self.config = load_config(config_path=os.path.join(self.model_dir, "config.py"), use_default=True)
|
||||
|
||||
self.model_source = model_source if model_source else self.config.model_source
|
||||
self.enable_opencc = enable_non_tradional_chinese
|
||||
|
||||
self.tokenizer = AutoTokenizer.from_pretrained(self.model_source)
|
||||
|
||||
polyphonic_chars_path = os.path.join(uncompress_path, "POLYPHONIC_CHARS.txt")
|
||||
monophonic_chars_path = os.path.join(uncompress_path, "MONOPHONIC_CHARS.txt")
|
||||
polyphonic_chars_path = os.path.join(self.model_dir, "POLYPHONIC_CHARS.txt")
|
||||
monophonic_chars_path = os.path.join(self.model_dir, "MONOPHONIC_CHARS.txt")
|
||||
|
||||
self.polyphonic_chars = [
|
||||
line.split("\t") for line in open(polyphonic_chars_path, encoding="utf-8").read().strip().split("\n")
|
||||
]
|
||||
@ -149,31 +148,47 @@ class G2PWOnnxConverter:
|
||||
)
|
||||
|
||||
self.chars = sorted(list(self.char2phonemes.keys()))
|
||||
self.char2id = {char: idx for idx, char in enumerate(self.chars)}
|
||||
self.char_phoneme_masks = (
|
||||
{
|
||||
char: [1 if i in self.char2phonemes[char] else 0 for i in range(len(self.labels))]
|
||||
for char in self.char2phonemes
|
||||
}
|
||||
if self.config.use_mask
|
||||
else None
|
||||
)
|
||||
|
||||
self.polyphonic_chars_new = set(self.chars)
|
||||
for char in self.non_polyphonic:
|
||||
if char in self.polyphonic_chars_new:
|
||||
self.polyphonic_chars_new.remove(char)
|
||||
self.polyphonic_chars_new.discard(char)
|
||||
|
||||
self.monophonic_chars_dict = {char: phoneme for char, phoneme in self.monophonic_chars}
|
||||
for char in self.non_monophonic:
|
||||
if char in self.monophonic_chars_dict:
|
||||
self.monophonic_chars_dict.pop(char)
|
||||
self.monophonic_chars_dict.pop(char, None)
|
||||
|
||||
self.pos_tags = ["UNK", "A", "C", "D", "I", "N", "P", "T", "V", "DE", "SHI"]
|
||||
default_asset_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "G2PWModel"))
|
||||
candidate_asset_dirs = [self.model_dir, default_asset_dir]
|
||||
self.bopomofo_convert_dict = _load_json_from_candidates(
|
||||
"bopomofo_to_pinyin_wo_tune_dict.json", candidate_asset_dirs
|
||||
)
|
||||
self.char_bopomofo_dict = _load_json_from_candidates("char_bopomofo_dict.json", candidate_asset_dirs)
|
||||
|
||||
with open(os.path.join(uncompress_path, "bopomofo_to_pinyin_wo_tune_dict.json"), "r", encoding="utf-8") as fr:
|
||||
self.bopomofo_convert_dict = json.load(fr)
|
||||
self.style_convert_func = {
|
||||
"bopomofo": lambda x: x,
|
||||
"pinyin": self._convert_bopomofo_to_pinyin,
|
||||
}[style]
|
||||
|
||||
with open(os.path.join(uncompress_path, "char_bopomofo_dict.json"), "r", encoding="utf-8") as fr:
|
||||
self.char_bopomofo_dict = json.load(fr)
|
||||
|
||||
if self.enable_opencc:
|
||||
self.cc = OpenCC("s2tw")
|
||||
self.enable_sentence_dedup = os.getenv("g2pw_sentence_dedup", "true").strip().lower() in {
|
||||
"1",
|
||||
"true",
|
||||
"yes",
|
||||
"y",
|
||||
"on",
|
||||
}
|
||||
# 聚焦到多音字附近上下文,默认左右各16字;设为0表示关闭裁剪(整句)。
|
||||
self.polyphonic_context_chars = max(0, int(os.getenv("g2pw_polyphonic_context_chars", "16")))
|
||||
|
||||
def _convert_bopomofo_to_pinyin(self, bopomofo: str) -> str:
|
||||
tone = bopomofo[-1]
|
||||
@ -181,9 +196,8 @@ class G2PWOnnxConverter:
|
||||
component = self.bopomofo_convert_dict.get(bopomofo[:-1])
|
||||
if component:
|
||||
return component + tone
|
||||
else:
|
||||
print(f'Warning: "{bopomofo}" cannot convert to pinyin')
|
||||
return None
|
||||
print(f'Warning: "{bopomofo}" cannot convert to pinyin')
|
||||
return None
|
||||
|
||||
def __call__(self, sentences: List[str]) -> List[List[str]]:
|
||||
if isinstance(sentences, str):
|
||||
@ -197,51 +211,147 @@ class G2PWOnnxConverter:
|
||||
translated_sentences.append(translated_sent)
|
||||
sentences = translated_sentences
|
||||
|
||||
texts, query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences)
|
||||
texts, model_query_ids, result_query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences)
|
||||
if len(texts) == 0:
|
||||
# sentences no polyphonic words
|
||||
return partial_results
|
||||
|
||||
onnx_input = prepare_onnx_input(
|
||||
model_input = prepare_onnx_input(
|
||||
tokenizer=self.tokenizer,
|
||||
labels=self.labels,
|
||||
char2phonemes=self.char2phonemes,
|
||||
chars=self.chars,
|
||||
texts=texts,
|
||||
query_ids=query_ids,
|
||||
query_ids=model_query_ids,
|
||||
use_mask=self.config.use_mask,
|
||||
window_size=None,
|
||||
char2id=self.char2id,
|
||||
char_phoneme_masks=self.char_phoneme_masks,
|
||||
)
|
||||
|
||||
preds, confidences = predict(session=self.session_g2pW, onnx_input=onnx_input, labels=self.labels)
|
||||
if not model_input:
|
||||
return partial_results
|
||||
|
||||
if self.enable_sentence_dedup:
|
||||
preds, _confidences = self._predict_with_sentence_dedup(model_input=model_input, texts=texts)
|
||||
else:
|
||||
preds, _confidences = self._predict(model_input=model_input)
|
||||
|
||||
if self.config.use_char_phoneme:
|
||||
preds = [pred.split(" ")[1] for pred in preds]
|
||||
|
||||
results = partial_results
|
||||
for sent_id, query_id, pred in zip(sent_ids, query_ids, preds):
|
||||
for sent_id, query_id, pred in zip(sent_ids, result_query_ids, preds):
|
||||
results[sent_id][query_id] = self.style_convert_func(pred)
|
||||
|
||||
return results
|
||||
|
||||
def _prepare_data(self, sentences: List[str]) -> Tuple[List[str], List[int], List[int], List[List[str]]]:
|
||||
texts, query_ids, sent_ids, partial_results = [], [], [], []
|
||||
def _prepare_data(
|
||||
self, sentences: List[str]
|
||||
) -> Tuple[List[str], List[int], List[int], List[int], List[List[str]]]:
|
||||
texts, model_query_ids, result_query_ids, sent_ids, partial_results = [], [], [], [], []
|
||||
for sent_id, sent in enumerate(sentences):
|
||||
# pypinyin works well for Simplified Chinese than Traditional Chinese
|
||||
sent_s = tranditional_to_simplified(sent)
|
||||
pypinyin_result = pinyin(sent_s, neutral_tone_with_five=True, style=Style.TONE3)
|
||||
partial_result = [None] * len(sent)
|
||||
polyphonic_indices: List[int] = []
|
||||
for i, char in enumerate(sent):
|
||||
if char in self.polyphonic_chars_new:
|
||||
texts.append(sent)
|
||||
query_ids.append(i)
|
||||
sent_ids.append(sent_id)
|
||||
polyphonic_indices.append(i)
|
||||
elif char in self.monophonic_chars_dict:
|
||||
partial_result[i] = self.style_convert_func(self.monophonic_chars_dict[char])
|
||||
elif char in self.char_bopomofo_dict:
|
||||
partial_result[i] = pypinyin_result[i][0]
|
||||
# partial_result[i] = self.style_convert_func(self.char_bopomofo_dict[char][0])
|
||||
else:
|
||||
partial_result[i] = pypinyin_result[i][0]
|
||||
|
||||
if polyphonic_indices:
|
||||
if self.polyphonic_context_chars > 0:
|
||||
left = max(0, polyphonic_indices[0] - self.polyphonic_context_chars)
|
||||
right = min(len(sent), polyphonic_indices[-1] + self.polyphonic_context_chars + 1)
|
||||
sent_for_predict = sent[left:right]
|
||||
query_offset = left
|
||||
else:
|
||||
sent_for_predict = sent
|
||||
query_offset = 0
|
||||
|
||||
for index in polyphonic_indices:
|
||||
texts.append(sent_for_predict)
|
||||
model_query_ids.append(index - query_offset)
|
||||
result_query_ids.append(index)
|
||||
sent_ids.append(sent_id)
|
||||
|
||||
partial_results.append(partial_result)
|
||||
return texts, query_ids, sent_ids, partial_results
|
||||
return texts, model_query_ids, result_query_ids, sent_ids, partial_results
|
||||
|
||||
def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]:
|
||||
raise NotImplementedError
|
||||
|
||||
def _predict_with_sentence_dedup(
|
||||
self, model_input: Dict[str, Any], texts: List[str]
|
||||
) -> Tuple[List[str], List[float]]:
|
||||
if len(texts) <= 1:
|
||||
return self._predict(model_input=model_input)
|
||||
|
||||
grouped_indices: Dict[str, List[int]] = {}
|
||||
for idx, text in enumerate(texts):
|
||||
grouped_indices.setdefault(text, []).append(idx)
|
||||
|
||||
if all(len(indices) == 1 for indices in grouped_indices.values()):
|
||||
return self._predict(model_input=model_input)
|
||||
|
||||
preds: List[str] = [""] * len(texts)
|
||||
confidences: List[float] = [0.0] * len(texts)
|
||||
for indices in grouped_indices.values():
|
||||
group_input = {name: value[indices] for name, value in model_input.items()}
|
||||
if len(indices) > 1:
|
||||
for name in ("input_ids", "token_type_ids", "attention_masks"):
|
||||
group_input[name] = group_input[name][:1]
|
||||
|
||||
group_preds, group_confidences = self._predict(model_input=group_input)
|
||||
for output_idx, pred, confidence in zip(indices, group_preds, group_confidences):
|
||||
preds[output_idx] = pred
|
||||
confidences[output_idx] = confidence
|
||||
|
||||
return preds, confidences
|
||||
|
||||
|
||||
class G2PWOnnxConverter(_G2PWBaseOnnxConverter):
|
||||
def __init__(
|
||||
self,
|
||||
model_dir: str = "G2PWModel/",
|
||||
style: str = "bopomofo",
|
||||
model_source: str = None,
|
||||
enable_non_tradional_chinese: bool = False,
|
||||
):
|
||||
super().__init__(
|
||||
model_dir=model_dir,
|
||||
style=style,
|
||||
model_source=model_source,
|
||||
enable_non_tradional_chinese=enable_non_tradional_chinese,
|
||||
)
|
||||
|
||||
sess_options = onnxruntime.SessionOptions()
|
||||
sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
|
||||
sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
|
||||
sess_options.intra_op_num_threads = 2
|
||||
|
||||
onnx_path = _find_first_existing_file(
|
||||
os.path.join(self.model_dir, "g2pW.onnx"),
|
||||
os.path.join(self.model_dir, "g2pw.onnx"),
|
||||
)
|
||||
|
||||
if "CUDAExecutionProvider" in onnxruntime.get_available_providers():
|
||||
self.session_g2pw = onnxruntime.InferenceSession(
|
||||
onnx_path,
|
||||
sess_options=sess_options,
|
||||
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
|
||||
)
|
||||
else:
|
||||
self.session_g2pw = onnxruntime.InferenceSession(
|
||||
onnx_path,
|
||||
sess_options=sess_options,
|
||||
providers=["CPUExecutionProvider"],
|
||||
)
|
||||
|
||||
def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]:
|
||||
return predict(session=self.session_g2pw, onnx_input=model_input, labels=self.labels)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user