Enhance G2P processing by implementing batch input handling in _g2p function, improving efficiency. Update prepare_onnx_input to utilize caching for tokenization and add optional parameters for character ID mapping and phoneme masks. Refactor G2PWOnnxConverter to streamline model loading and configuration management.

This commit is contained in:
baicai-1145 2026-03-07 05:47:22 +08:00
parent 2d9193b0d3
commit 800acd45ff
3 changed files with 233 additions and 72 deletions

View File

@ -180,10 +180,15 @@ def _merge_erhua(initials: list[str], finals: list[str], word: str, pos: str) ->
def _g2p(segments):
phones_list = []
word2ph = []
for seg in segments:
g2pw_batch_results = []
g2pw_batch_cursor = 0
processed_segments = [re.sub("[a-zA-Z]+", "", seg) for seg in segments]
if is_g2pw:
batch_inputs = [seg for seg in processed_segments if seg]
g2pw_batch_results = g2pw._g2pw(batch_inputs) if batch_inputs else []
for seg in processed_segments:
pinyins = []
# Replace all English words in the sentence
seg = re.sub("[a-zA-Z]+", "", seg)
seg_cut = psg.lcut(seg)
seg_cut = tone_modifier.pre_merge_for_modify(seg_cut)
initials = []
@ -204,8 +209,10 @@ def _g2p(segments):
finals = sum(finals, [])
print("pypinyin结果", initials, finals)
else:
# g2pw采用整句推理
pinyins = g2pw.lazy_pinyin(seg, neutral_tone_with_five=True, style=Style.TONE3)
# g2pw采用整句推理批量推理逐句取结果
if seg:
pinyins = g2pw_batch_results[g2pw_batch_cursor]
g2pw_batch_cursor += 1
pre_word_length = 0
for word, pos in seg_cut:

View File

@ -18,6 +18,7 @@ Credits
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
import numpy as np
@ -37,6 +38,8 @@ def prepare_onnx_input(
use_mask: bool = False,
window_size: int = None,
max_len: int = 512,
char2id: Optional[Dict[str, int]] = None,
char_phoneme_masks: Optional[Dict[str, List[int]]] = None,
) -> Dict[str, np.array]:
if window_size is not None:
truncated_texts, truncated_query_ids = _truncate_texts(
@ -48,33 +51,88 @@ def prepare_onnx_input(
phoneme_masks = []
char_ids = []
position_ids = []
tokenized_cache = {}
if char2id is None:
char2id = {char: idx for idx, char in enumerate(chars)}
if use_mask:
if char_phoneme_masks is None:
char_phoneme_masks = {
char: [1 if i in char2phonemes[char] else 0 for i in range(len(labels))]
for char in char2phonemes
}
else:
full_phoneme_mask = [1] * len(labels)
for idx in range(len(texts)):
text = (truncated_texts if window_size else texts)[idx].lower()
query_id = (truncated_query_ids if window_size else query_ids)[idx]
try:
tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text)
except Exception:
print(f'warning: text "{text}" is invalid')
return {}
cached = tokenized_cache.get(text)
if cached is None:
try:
tokens, text2token, token2text = tokenize_and_map(tokenizer=tokenizer, text=text)
except Exception:
print(f'warning: text "{text}" is invalid')
return {}
text, query_id, tokens, text2token, token2text = _truncate(
max_len=max_len, text=text, query_id=query_id, tokens=tokens, text2token=text2token, token2text=token2text
)
if len(tokens) <= max_len - 2:
processed_tokens = ["[CLS]"] + tokens + ["[SEP]"]
shared_input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
shared_token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
shared_attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
cached = {
"is_short": True,
"tokens": tokens,
"text2token": text2token,
"token2text": token2text,
"input_id": shared_input_id,
"token_type_id": shared_token_type_id,
"attention_mask": shared_attention_mask,
}
else:
cached = {
"is_short": False,
"tokens": tokens,
"text2token": text2token,
"token2text": token2text,
}
tokenized_cache[text] = cached
processed_tokens = ["[CLS]"] + tokens + ["[SEP]"]
if cached["is_short"]:
text_for_query = text
query_id_for_query = query_id
text2token_for_query = cached["text2token"]
input_id = cached["input_id"]
token_type_id = cached["token_type_id"]
attention_mask = cached["attention_mask"]
else:
(
text_for_query,
query_id_for_query,
tokens_for_query,
text2token_for_query,
_token2text_for_query,
) = _truncate(
max_len=max_len,
text=text,
query_id=query_id,
tokens=cached["tokens"],
text2token=cached["text2token"],
token2text=cached["token2text"],
)
processed_tokens = ["[CLS]"] + tokens_for_query + ["[SEP]"]
input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
input_id = list(np.array(tokenizer.convert_tokens_to_ids(processed_tokens)))
token_type_id = list(np.zeros((len(processed_tokens),), dtype=int))
attention_mask = list(np.ones((len(processed_tokens),), dtype=int))
query_char = text[query_id]
phoneme_mask = (
[1 if i in char2phonemes[query_char] else 0 for i in range(len(labels))] if use_mask else [1] * len(labels)
)
char_id = chars.index(query_char)
position_id = text2token[query_id] + 1 # [CLS] token locate at first place
query_char = text_for_query[query_id_for_query]
if use_mask:
phoneme_mask = char_phoneme_masks[query_char]
else:
phoneme_mask = full_phoneme_mask
char_id = char2id[query_char]
position_id = text2token_for_query[query_id_for_query] + 1 # [CLS] token locate at first place
input_ids.append(input_id)
token_type_ids.append(token_type_id)
@ -83,10 +141,15 @@ def prepare_onnx_input(
char_ids.append(char_id)
position_ids.append(position_id)
max_token_length = max(len(seq) for seq in input_ids)
def _pad_sequences(sequences, pad_value=0):
return [seq + [pad_value] * (max_token_length - len(seq)) for seq in sequences]
outputs = {
"input_ids": np.array(input_ids).astype(np.int64),
"token_type_ids": np.array(token_type_ids).astype(np.int64),
"attention_masks": np.array(attention_masks).astype(np.int64),
"input_ids": np.array(_pad_sequences(input_ids, pad_value=0)).astype(np.int64),
"token_type_ids": np.array(_pad_sequences(token_type_ids, pad_value=0)).astype(np.int64),
"attention_masks": np.array(_pad_sequences(attention_masks, pad_value=0)).astype(np.int64),
"phoneme_masks": np.array(phoneme_masks).astype(np.float32),
"char_ids": np.array(char_ids).astype(np.int64),
"position_ids": np.array(position_ids).astype(np.int64),

View File

@ -10,7 +10,6 @@ from typing import Any, Dict, List, Tuple
import numpy as np
import onnxruntime
import requests
import torch
from opencc import OpenCC
from pypinyin import Style, pinyin
from transformers.models.auto.tokenization_auto import AutoTokenizer
@ -22,9 +21,8 @@ from .utils import load_config
onnxruntime.set_default_logger_severity(3)
try:
onnxruntime.preload_dlls()
except:
except Exception:
pass
# traceback.print_exc()
warnings.filterwarnings("ignore")
model_version = "1.1"
@ -55,6 +53,24 @@ def predict(session, onnx_input: Dict[str, Any], labels: List[str]) -> Tuple[Lis
return all_preds, all_confidences
def _load_json_from_candidates(filename: str, candidate_dirs: List[str]) -> Dict[str, Any]:
for candidate_dir in candidate_dirs:
if not candidate_dir:
continue
json_path = os.path.join(candidate_dir, filename)
if os.path.exists(json_path):
with open(json_path, "r", encoding="utf-8") as fr:
return json.load(fr)
raise FileNotFoundError(f"Cannot locate {filename} in candidate dirs: {candidate_dirs}")
def _find_first_existing_file(*paths: str) -> str:
for path in paths:
if path and os.path.exists(path):
return path
raise FileNotFoundError(f"Files not found: {paths}")
def download_and_decompress(model_dir: str = "G2PWModel/"):
if not os.path.exists(model_dir):
parent_directory = os.path.dirname(model_dir)
@ -62,7 +78,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"):
extract_dir = os.path.join(parent_directory, "G2PWModel_1.1")
extract_dir_new = os.path.join(parent_directory, "G2PWModel")
print("Downloading g2pw model...")
modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip" # "https://paddlespeech.cdn.bcebos.com/Parakeet/released_models/g2p/G2PWModel_1.1.zip"
modelscope_url = "https://www.modelscope.cn/models/kamiorinn/g2pw/resolve/master/G2PWModel_1.1.zip"
with requests.get(modelscope_url, stream=True) as r:
r.raise_for_status()
with open(zip_dir, "wb") as f:
@ -79,7 +95,7 @@ def download_and_decompress(model_dir: str = "G2PWModel/"):
return model_dir
class G2PWOnnxConverter:
class _G2PWBaseOnnxConverter:
def __init__(
self,
model_dir: str = "G2PWModel/",
@ -87,33 +103,16 @@ class G2PWOnnxConverter:
model_source: str = None,
enable_non_tradional_chinese: bool = False,
):
uncompress_path = download_and_decompress(model_dir)
sess_options = onnxruntime.SessionOptions()
sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
sess_options.intra_op_num_threads = 2 if torch.cuda.is_available() else 0
if "CUDAExecutionProvider" in onnxruntime.get_available_providers():
self.session_g2pW = onnxruntime.InferenceSession(
os.path.join(uncompress_path, "g2pW.onnx"),
sess_options=sess_options,
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
else:
self.session_g2pW = onnxruntime.InferenceSession(
os.path.join(uncompress_path, "g2pW.onnx"),
sess_options=sess_options,
providers=["CPUExecutionProvider"],
)
self.config = load_config(config_path=os.path.join(uncompress_path, "config.py"), use_default=True)
self.model_dir = download_and_decompress(model_dir)
self.config = load_config(config_path=os.path.join(self.model_dir, "config.py"), use_default=True)
self.model_source = model_source if model_source else self.config.model_source
self.enable_opencc = enable_non_tradional_chinese
self.tokenizer = AutoTokenizer.from_pretrained(self.model_source)
polyphonic_chars_path = os.path.join(uncompress_path, "POLYPHONIC_CHARS.txt")
monophonic_chars_path = os.path.join(uncompress_path, "MONOPHONIC_CHARS.txt")
polyphonic_chars_path = os.path.join(self.model_dir, "POLYPHONIC_CHARS.txt")
monophonic_chars_path = os.path.join(self.model_dir, "MONOPHONIC_CHARS.txt")
self.polyphonic_chars = [
line.split("\t") for line in open(polyphonic_chars_path, encoding="utf-8").read().strip().split("\n")
]
@ -149,31 +148,45 @@ class G2PWOnnxConverter:
)
self.chars = sorted(list(self.char2phonemes.keys()))
self.char2id = {char: idx for idx, char in enumerate(self.chars)}
self.char_phoneme_masks = (
{
char: [1 if i in self.char2phonemes[char] else 0 for i in range(len(self.labels))]
for char in self.char2phonemes
}
if self.config.use_mask
else None
)
self.polyphonic_chars_new = set(self.chars)
for char in self.non_polyphonic:
if char in self.polyphonic_chars_new:
self.polyphonic_chars_new.remove(char)
self.polyphonic_chars_new.discard(char)
self.monophonic_chars_dict = {char: phoneme for char, phoneme in self.monophonic_chars}
for char in self.non_monophonic:
if char in self.monophonic_chars_dict:
self.monophonic_chars_dict.pop(char)
self.monophonic_chars_dict.pop(char, None)
self.pos_tags = ["UNK", "A", "C", "D", "I", "N", "P", "T", "V", "DE", "SHI"]
default_asset_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "G2PWModel"))
candidate_asset_dirs = [self.model_dir, default_asset_dir]
self.bopomofo_convert_dict = _load_json_from_candidates(
"bopomofo_to_pinyin_wo_tune_dict.json", candidate_asset_dirs
)
self.char_bopomofo_dict = _load_json_from_candidates("char_bopomofo_dict.json", candidate_asset_dirs)
with open(os.path.join(uncompress_path, "bopomofo_to_pinyin_wo_tune_dict.json"), "r", encoding="utf-8") as fr:
self.bopomofo_convert_dict = json.load(fr)
self.style_convert_func = {
"bopomofo": lambda x: x,
"pinyin": self._convert_bopomofo_to_pinyin,
}[style]
with open(os.path.join(uncompress_path, "char_bopomofo_dict.json"), "r", encoding="utf-8") as fr:
self.char_bopomofo_dict = json.load(fr)
if self.enable_opencc:
self.cc = OpenCC("s2tw")
self.enable_sentence_dedup = os.getenv("g2pw_sentence_dedup", "true").strip().lower() in {
"1",
"true",
"yes",
"y",
"on",
}
def _convert_bopomofo_to_pinyin(self, bopomofo: str) -> str:
tone = bopomofo[-1]
@ -181,9 +194,8 @@ class G2PWOnnxConverter:
component = self.bopomofo_convert_dict.get(bopomofo[:-1])
if component:
return component + tone
else:
print(f'Warning: "{bopomofo}" cannot convert to pinyin')
return None
print(f'Warning: "{bopomofo}" cannot convert to pinyin')
return None
def __call__(self, sentences: List[str]) -> List[List[str]]:
if isinstance(sentences, str):
@ -199,10 +211,9 @@ class G2PWOnnxConverter:
texts, query_ids, sent_ids, partial_results = self._prepare_data(sentences=sentences)
if len(texts) == 0:
# sentences no polyphonic words
return partial_results
onnx_input = prepare_onnx_input(
model_input = prepare_onnx_input(
tokenizer=self.tokenizer,
labels=self.labels,
char2phonemes=self.char2phonemes,
@ -211,9 +222,18 @@ class G2PWOnnxConverter:
query_ids=query_ids,
use_mask=self.config.use_mask,
window_size=None,
char2id=self.char2id,
char_phoneme_masks=self.char_phoneme_masks,
)
preds, confidences = predict(session=self.session_g2pW, onnx_input=onnx_input, labels=self.labels)
if not model_input:
return partial_results
if self.enable_sentence_dedup:
preds, _confidences = self._predict_with_sentence_dedup(model_input=model_input, texts=texts)
else:
preds, _confidences = self._predict(model_input=model_input)
if self.config.use_char_phoneme:
preds = [pred.split(" ")[1] for pred in preds]
@ -226,7 +246,6 @@ class G2PWOnnxConverter:
def _prepare_data(self, sentences: List[str]) -> Tuple[List[str], List[int], List[int], List[List[str]]]:
texts, query_ids, sent_ids, partial_results = [], [], [], []
for sent_id, sent in enumerate(sentences):
# pypinyin works well for Simplified Chinese than Traditional Chinese
sent_s = tranditional_to_simplified(sent)
pypinyin_result = pinyin(sent_s, neutral_tone_with_five=True, style=Style.TONE3)
partial_result = [None] * len(sent)
@ -239,9 +258,81 @@ class G2PWOnnxConverter:
partial_result[i] = self.style_convert_func(self.monophonic_chars_dict[char])
elif char in self.char_bopomofo_dict:
partial_result[i] = pypinyin_result[i][0]
# partial_result[i] = self.style_convert_func(self.char_bopomofo_dict[char][0])
else:
partial_result[i] = pypinyin_result[i][0]
partial_results.append(partial_result)
return texts, query_ids, sent_ids, partial_results
def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]:
raise NotImplementedError
def _predict_with_sentence_dedup(
self, model_input: Dict[str, Any], texts: List[str]
) -> Tuple[List[str], List[float]]:
if len(texts) <= 1:
return self._predict(model_input=model_input)
grouped_indices: Dict[str, List[int]] = {}
for idx, text in enumerate(texts):
grouped_indices.setdefault(text, []).append(idx)
if all(len(indices) == 1 for indices in grouped_indices.values()):
return self._predict(model_input=model_input)
preds: List[str] = [""] * len(texts)
confidences: List[float] = [0.0] * len(texts)
for indices in grouped_indices.values():
group_input = {name: value[indices] for name, value in model_input.items()}
if len(indices) > 1:
for name in ("input_ids", "token_type_ids", "attention_masks"):
group_input[name] = group_input[name][:1]
group_preds, group_confidences = self._predict(model_input=group_input)
for output_idx, pred, confidence in zip(indices, group_preds, group_confidences):
preds[output_idx] = pred
confidences[output_idx] = confidence
return preds, confidences
class G2PWOnnxConverter(_G2PWBaseOnnxConverter):
def __init__(
self,
model_dir: str = "G2PWModel/",
style: str = "bopomofo",
model_source: str = None,
enable_non_tradional_chinese: bool = False,
):
super().__init__(
model_dir=model_dir,
style=style,
model_source=model_source,
enable_non_tradional_chinese=enable_non_tradional_chinese,
)
sess_options = onnxruntime.SessionOptions()
sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
sess_options.intra_op_num_threads = 2
onnx_path = _find_first_existing_file(
os.path.join(self.model_dir, "g2pW.onnx"),
os.path.join(self.model_dir, "g2pw.onnx"),
)
if "CUDAExecutionProvider" in onnxruntime.get_available_providers():
self.session_g2pw = onnxruntime.InferenceSession(
onnx_path,
sess_options=sess_options,
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
else:
self.session_g2pw = onnxruntime.InferenceSession(
onnx_path,
sess_options=sess_options,
providers=["CPUExecutionProvider"],
)
def _predict(self, model_input: Dict[str, Any]) -> Tuple[List[str], List[float]]:
return predict(session=self.session_g2pw, onnx_input=model_input, labels=self.labels)