import pickle import os import re import wordsegment from g2p_en import G2p from string import punctuation from text import symbols current_file_path = os.path.dirname(__file__) CMU_DICT_PATH = os.path.join(current_file_path, "cmudict.rep") CMU_DICT_FAST_PATH = os.path.join(current_file_path, "cmudict-fast.rep") CMU_DICT_HOT_PATH = os.path.join(current_file_path, "engdict-hot.rep") CACHE_PATH = os.path.join(current_file_path, "engdict_cache.pickle") arpa = { "AH0", "S", "AH1", "EY2", "AE2", "EH0", "OW2", "UH0", "NG", "B", "G", "AY0", "M", "AA0", "F", "AO0", "ER2", "UH1", "IY1", "AH2", "DH", "IY0", "EY1", "IH0", "K", "N", "W", "IY2", "T", "AA1", "ER1", "EH2", "OY0", "UH2", "UW1", "Z", "AW2", "AW1", "V", "UW2", "AA2", "ER", "AW0", "UW0", "R", "OW1", "EH1", "ZH", "AE0", "IH2", "IH", "Y", "JH", "P", "AY1", "EY0", "OY2", "TH", "HH", "D", "ER0", "CH", "AO1", "AE1", "AO2", "OY1", "AY2", "IH1", "OW0", "L", "SH", } def replace_phs(phs): rep_map = {";": ",", ":": ",", "'": "-", '"': "-"} phs_new = [] for ph in phs: if ph in symbols: phs_new.append(ph) elif ph in rep_map.keys(): phs_new.append(rep_map[ph]) else: print("ph not in symbols: ", ph) return phs_new def read_dict(): g2p_dict = {} start_line = 49 with open(CMU_DICT_PATH) as f: line = f.readline() line_index = 1 while line: if line_index >= start_line: line = line.strip() word_split = line.split(" ") word = word_split[0].lower() syllable_split = word_split[1].split(" - ") g2p_dict[word] = [] for syllable in syllable_split: phone_split = syllable.split(" ") g2p_dict[word].append(phone_split) line_index = line_index + 1 line = f.readline() return g2p_dict def read_dict_new(): g2p_dict = {} with open(CMU_DICT_PATH) as f: line = f.readline() line_index = 1 while line: if line_index >= 57: line = line.strip() word_split = line.split(" ") word = word_split[0].lower() g2p_dict[word] = [word_split[1].split(" ")] line_index = line_index + 1 line = f.readline() with open(CMU_DICT_FAST_PATH) as f: line = f.readline() line_index = 1 while line: if line_index >= 0: line = line.strip() word_split = line.split(" ") word = word_split[0].lower() if word not in g2p_dict: g2p_dict[word] = [word_split[1:]] line_index = line_index + 1 line = f.readline() with open(CMU_DICT_HOT_PATH) as f: line = f.readline() line_index = 1 while line: if line_index >= 0: line = line.strip() word_split = line.split(" ") word = word_split[0].lower() # 自定义发音词直接覆盖字典 g2p_dict[word] = [word_split[1:]] line_index = line_index + 1 line = f.readline() return g2p_dict def cache_dict(g2p_dict, file_path): with open(file_path, "wb") as pickle_file: pickle.dump(g2p_dict, pickle_file) def get_dict(): if os.path.exists(CACHE_PATH): with open(CACHE_PATH, "rb") as pickle_file: g2p_dict = pickle.load(pickle_file) else: g2p_dict = read_dict_new() cache_dict(g2p_dict, CACHE_PATH) return g2p_dict eng_dict = get_dict() def text_normalize(text): # todo: eng text normalize # 适配 g2p_en 标点 return text.replace(";", ",").replace(":", ",").replace('"', "'") class en_G2p(G2p): def __init__(self): super().__init__() # 分词初始化 wordsegment.load() # 扩展过时字典 self.cmu = get_dict() # 剔除读音错误的几个缩写 for word in ["AE", "AI", "AR", "IOS", "HUD", "OS"]: del self.cmu[word.lower()] # "A" 落单不读 "AH0" 读 "EY1" self.cmu['a'] = [['EY1']] def predict(self, word): # 小写 oov 长度小于等于 3 直接读字母 if (len(word) <= 3): return [phone for w in word for phone in self(w)] # 尝试进行分词,应对复合词 comps = wordsegment.segment(word.lower()) # 无法分词的送回去预测 if len(comps)==1: return super().predict(word) # 可以分词的递归处理 return [phone for comp in comps for phone in self(comp)] _g2p = en_G2p() def g2p(text): text = text_normalize(text) # g2p_en 整段推理,剔除不存在的arpa返回 phone_list = _g2p(text) phones = [ph if ph != "" else "UNK" for ph in phone_list if ph not in [" ", "", "UW", "", ""]] return replace_phs(phones) if __name__ == "__main__": # print(get_dict()) print(g2p("hello")) print(g2p("In this; paper, we propose 1 DSPGAN, a GAN-based universal vocoder.")) # all_phones = set() # for k, syllables in eng_dict.items(): # for group in syllables: # for ph in group: # all_phones.add(ph) # print(all_phones)