添加文本相似度比较功能

This commit is contained in:
Downupanddownup 2024-04-25 11:54:13 +08:00
parent 2c8f6bd4c9
commit 4daa9ad53c
10 changed files with 248 additions and 71 deletions

View File

@ -92,4 +92,14 @@ def read_file(file_path):
# 文件在with语句结束时会自动关闭
# 现在file_content变量中存储了文件的所有文本内容
return file_content
return file_content
def write_text_to_file(text, output_file_path):
    """Persist *text* to *output_file_path* as UTF-8, printing success or failure."""
    try:
        with open(output_file_path, 'w', encoding='utf-8') as out_stream:
            out_stream.write(text)
    except IOError as write_err:
        # Report the failure instead of propagating it to the caller.
        print(f"Error occurred while writing to the file: {write_err}")
    else:
        print(f"Text successfully written to file: {output_file_path}")

View File

@ -1,19 +1,23 @@
import os.path
import os
import traceback
import gradio as gr
import Ref_Audio_Selector.tool.audio_similarity as audio_similarity
import Ref_Audio_Selector.tool.audio_inference as audio_inference
import Ref_Audio_Selector.tool.audio_asr as audio_asr
import Ref_Audio_Selector.tool.audio_config as audio_config
import Ref_Audio_Selector.common.common as common
from tools.i18n.i18n import I18nAuto
from config import python_exec
from config import python_exec, is_half
from tools import my_utils
from tools.asr.config import asr_dict
from subprocess import Popen
i18n = I18nAuto()
p_similarity = None
p_asr = None
p_text_similarity = None
# 校验基础信息
@ -159,7 +163,7 @@ def asr(text_work_space_dir, text_asr_audio_dir, dropdown_asr_model,
raise Exception(i18n("asr模型大小不能为空"))
if dropdown_asr_lang is None or dropdown_asr_lang == '':
raise Exception(i18n("asr语言不能为空"))
asr_file = audio_asr.open_asr(text_asr_audio_dir, text_work_space_dir, dropdown_asr_model, dropdown_asr_size,
asr_file = open_asr(text_asr_audio_dir, text_work_space_dir, dropdown_asr_model, dropdown_asr_size,
dropdown_asr_lang)
text_text_similarity_analysis_path = asr_file
text_asr_info = f"asr成功生成文件{asr_file}"
@ -170,22 +174,72 @@ def asr(text_work_space_dir, text_asr_audio_dir, dropdown_asr_model,
return [text_asr_info, text_text_similarity_analysis_path]
# Run the multi-level-directory ASR script as a subprocess over a directory of
# audio files and return the expected path of the generated transcript list.
# The module-global p_asr acts as a crude single-run lock: a second call while
# a run is in flight returns None.
def open_asr(asr_inp_dir, asr_opt_dir, asr_model, asr_model_size, asr_lang):
    global p_asr
    if p_asr is None:
        asr_inp_dir = my_utils.clean_path(asr_inp_dir)
        asr_py_path = asr_dict[asr_model]["path"]
        # Swap the stock ASR scripts for their multi-level-directory variants.
        if asr_py_path == 'funasr_asr.py':
            asr_py_path = 'funasr_asr_multi_level_dir.py'
        if asr_py_path == 'fasterwhisper.py':
            asr_py_path = 'fasterwhisper_asr_multi_level_dir.py'
        cmd = f'"{python_exec}" Ref_Audio_Selector/tool/asr/{asr_py_path} '
        cmd += f' -i "{asr_inp_dir}"'
        cmd += f' -o "{asr_opt_dir}"'
        cmd += f' -s {asr_model_size}'
        cmd += f' -l {asr_lang}'
        cmd += " -p %s" % ("float16" if is_half == True else "float32")
        print(cmd)
        p_asr = Popen(cmd, shell=True)
        p_asr.wait()
        p_asr = None
        output_dir_abs = os.path.abspath(asr_opt_dir)
        output_file_name = os.path.basename(asr_inp_dir)
        # Build the expected output path; must match the '{name}_asr.list'
        # naming used by the multi-level-dir ASR script.
        output_file_path = os.path.join(output_dir_abs, f'{output_file_name}_asr.list')
        return output_file_path
    else:
        # An ASR subprocess is already running; signal by returning None.
        return None
# Compare the ASR-generated transcript against the original text content and
# write the similarity analysis into a sub-directory of the workspace.
# Fix: removed stale duplicate assignments (the old 'similarity.txt' path and
# message) and a dead `pass` left over from an overlapping old/new diff.
def text_similarity_analysis(text_work_space_dir,
                             text_text_similarity_analysis_path):
    similarity_dir = os.path.join(text_work_space_dir, 'text_similarity')
    # Optimistic success message; replaced with the error text on failure.
    text_text_similarity_analysis_info = f"相似度分析成功:生成目录{similarity_dir}"
    try:
        check_base_info(text_work_space_dir)
        if text_text_similarity_analysis_path is None or text_text_similarity_analysis_path == '':
            raise Exception(i18n("asr生成的文件路径不能为空请先完成上一步操作"))
        open_text_similarity_analysis(text_text_similarity_analysis_path, similarity_dir)
    except Exception as e:
        traceback.print_exc()
        text_text_similarity_analysis_info = f"发生异常:{e}"
    return text_text_similarity_analysis_info
def open_text_similarity_analysis(asr_file_path, output_dir, similarity_enlarge_boundary=0.8):
    """Launch the ASR-text similarity script as a subprocess and wait for it.

    Args:
        asr_file_path: path to the ASR .list transcript file (-a).
        output_dir: directory to receive the analysis output (-o).
        similarity_enlarge_boundary: lower bound used to rescale scores (-b).

    Returns:
        output_dir on completion, or None when a run is already in progress
        (the module-global p_text_similarity acts as a single-run lock).
    """
    global p_text_similarity
    if p_text_similarity is None:
        cmd = f'"{python_exec}" Ref_Audio_Selector/tool/text_comparison/asr_text_process.py '
        cmd += f' -a "{asr_file_path}"'
        cmd += f' -o "{output_dir}"'
        cmd += f' -b {similarity_enlarge_boundary}'
        print(cmd)
        p_text_similarity = Popen(cmd, shell=True)
        p_text_similarity.wait()
        # Clear the lock so the next invocation can run.
        p_text_similarity = None
        return output_dir
    else:
        return None
# 根据一个参考音频,对指定目录下的音频进行相似度分析,并输出到另一个目录
def similarity_audio_output(text_work_space_dir, text_base_audio_path,
text_compare_audio_dir):

View File

@ -7,22 +7,24 @@ from tqdm import tqdm
from funasr import AutoModel
# Model checkpoint locations. Prefer the local checkout under tools/asr/models;
# fall back to the ModelScope hub ids ("iic/...") when the local path is absent.
# Fix: removed duplicated assignments left over from interleaved old/new diff lines.
path_asr = 'tools/asr/models/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'
path_vad = 'tools/asr/models/speech_fsmn_vad_zh-cn-16k-common-pytorch'
path_punc = 'tools/asr/models/punc_ct-transformer_zh-cn-common-vocab272727-pytorch'
path_asr = path_asr if os.path.exists(
    path_asr) else "iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"
path_vad = path_vad if os.path.exists(path_vad) else "iic/speech_fsmn_vad_zh-cn-16k-common-pytorch"
path_punc = path_punc if os.path.exists(path_punc) else "iic/punc_ct-transformer_zh-cn-common-vocab272727-pytorch"
# FunASR pipeline: paraformer ASR + FSMN VAD + CT-Transformer punctuation.
# Fix: the diff left both old ('model = path_asr,') and new ('model=path_asr,')
# keyword lines in place, which is a duplicate-keyword SyntaxError; keep the
# new, PEP8-formatted side only.
model = AutoModel(
    model=path_asr,
    model_revision="v2.0.4",
    vad_model=path_vad,
    vad_model_revision="v2.0.4",
    punc_model=path_punc,
    punc_model_revision="v2.0.4",
)
def only_asr(input_file):
try:
text = model.generate(input=input_file)[0]["text"]
@ -31,6 +33,7 @@ def only_asr(input_file):
print(traceback.format_exc())
return text
def execute_asr(input_folder, output_folder, model_size, language):
input_file_names = os.listdir(input_folder)
input_file_names.sort()
@ -40,7 +43,7 @@ def execute_asr(input_folder, output_folder, model_size, language):
for name in tqdm(input_file_names):
try:
text = model.generate(input="%s/%s"%(input_folder, name))[0]["text"]
text = model.generate(input="%s/%s" % (input_folder, name))[0]["text"]
output.append(f"{input_folder}/{name}|{output_file_name}|{language.upper()}|{text}")
except:
print(traceback.format_exc())
@ -64,12 +67,13 @@ def execute_asr_multi_level_dir(input_folder, output_folder, model_size, languag
# 只处理wav文件假设是wav文件
if name.endswith(".wav"):
try:
original_text = os.path.basename(root)
# 构造完整的输入音频文件路径
input_file_path = os.path.join(root, name)
input_file_path = os.path.normpath(input_file_path) # 先标准化可能存在混合斜杠的情况
text = model.generate(input=input_file_path)[0]["text"]
asr_text = model.generate(input=input_file_path)[0]["text"]
output.append(f"{input_file_path}|{output_file_name}|{language.upper()}|{text}")
output.append(f"{input_file_path}|{original_text}|{language.upper()}|{asr_text}")
except:
print(traceback.format_exc())
@ -80,7 +84,7 @@ def execute_asr_multi_level_dir(input_folder, output_folder, model_size, languag
os.makedirs(output_dir_abs, exist_ok=True)
# 构造输出文件路径
output_file_path = os.path.join(output_dir_abs, f'{output_file_name}.list')
output_file_path = os.path.join(output_dir_abs, f'{output_file_name}_asr.list')
# 将输出写入文件
with open(output_file_path, "w", encoding="utf-8") as f:
@ -89,6 +93,7 @@ def execute_asr_multi_level_dir(input_folder, output_folder, model_size, languag
return output_file_path
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input_folder", type=str, required=True,
@ -99,13 +104,13 @@ if __name__ == '__main__':
help="Model Size of FunASR is Large")
parser.add_argument("-l", "--language", type=str, default='zh', choices=['zh'],
help="Language of the audio files.")
parser.add_argument("-p", "--precision", type=str, default='float16', choices=['float16','float32'],
help="fp16 or fp32")#还没接入
parser.add_argument("-p", "--precision", type=str, default='float16', choices=['float16', 'float32'],
help="fp16 or fp32") # 还没接入
cmd = parser.parse_args()
execute_asr_multi_level_dir(
input_folder = cmd.input_folder,
output_folder = cmd.output_folder,
model_size = cmd.model_size,
language = cmd.language,
input_folder=cmd.input_folder,
output_folder=cmd.output_folder,
model_size=cmd.model_size,
language=cmd.language,
)

View File

@ -1,36 +1,4 @@
import os
from config import python_exec, is_half
from tools import my_utils
from tools.asr.config import asr_dict
from subprocess import Popen
# Run the selected ASR tool over an input directory as a subprocess and return
# the path of the generated .list transcript. The module-global p_asr acts as a
# single-run lock: a concurrent call returns None.
def open_asr(asr_inp_dir, asr_opt_dir, asr_model, asr_model_size, asr_lang):
    global p_asr
    if (p_asr == None):
        asr_inp_dir = my_utils.clean_path(asr_inp_dir)
        asr_py_path = asr_dict[asr_model]["path"]
        # Swap the stock ASR scripts for their multi-level-directory variants.
        if asr_py_path == 'funasr_asr.py':
            asr_py_path = 'funasr_asr_multi_level_dir.py'
        if asr_py_path == 'fasterwhisper.py':
            asr_py_path = 'fasterwhisper_asr_multi_level_dir.py'
        cmd = f'"{python_exec}" tools/asr/{asr_py_path}'
        cmd += f' -i "{asr_inp_dir}"'
        cmd += f' -o "{asr_opt_dir}"'
        cmd += f' -s {asr_model_size}'
        cmd += f' -l {asr_lang}'
        cmd += " -p %s" % ("float16" if is_half == True else "float32")
        print(cmd)
        p_asr = Popen(cmd, shell=True)
        p_asr.wait()
        p_asr = None
        output_dir_abs = os.path.abspath(asr_opt_dir)
        output_file_name = os.path.basename(asr_inp_dir)
        # Build the expected output file path ('{input dir name}.list').
        output_file_path = os.path.join(output_dir_abs, f'{output_file_name}.list')
        return output_file_path
    else:
        return None

View File

@ -1,7 +1,9 @@
import os
import requests
from pathlib import Path
import itertools
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse, quote
from tools.i18n.i18n import I18nAuto
i18n = I18nAuto()
class URLComposer:
@ -14,13 +16,13 @@ class URLComposer:
def is_valid(self):
if self.base_url is None or self.base_url == '':
raise ValueError("请输入url")
raise ValueError(i18n("请输入url"))
if self.text_param_name is None or self.text_param_name == '':
raise ValueError("请输入text参数名")
raise ValueError(i18n("请输入text参数名"))
if self.emotion_param_name is None and self.ref_path_param_name is None and self.ref_text_param_name is None:
raise ValueError("请输入至少一个参考or情绪的参数")
raise ValueError(i18n("请输入至少一个参考or情绪的参数"))
def is_emotion(self):
return self.emotion_param_name is not None and self.emotion_param_name != ''
@ -83,7 +85,10 @@ def generate_audio_files(url_composer, text_list, emotion_list, output_dir_path)
emotion_subdir = os.path.join(output_dir, 'emotion')
os.makedirs(emotion_subdir, exist_ok=True)
for text, emotion in zip(text_list, emotion_list):
# 计算笛卡尔积
cartesian_product = list(itertools.product(text_list, emotion_list))
for text, emotion in cartesian_product:
# Generate audio byte stream using the create_audio function
if url_composer.is_emotion():

View File

@ -0,0 +1,136 @@
import os
import argparse
from collections import defaultdict
from operator import itemgetter
import Ref_Audio_Selector.tool.text_comparison.text_comparison as text_comparison
import Ref_Audio_Selector.common.common as common
def parse_asr_file(file_path):
    """Parse an ASR .list transcript file into a list of record dicts.

    Each line must have the fixed form:
        input_file_path|original_text|language|asr_text
    The 'emotion' field is derived from the audio file's base name, and
    'similarity_score' is initialised to 0 for later scoring.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as asr_file:
        for raw_line in asr_file:
            # '|' is the fixed field separator; lines are assumed well-formed.
            audio_path, original_text, language, asr_text = raw_line.strip().split('|')
            records.append({
                'emotion': common.get_filename_without_extension(audio_path),
                'input_file_path': audio_path,
                'original_text': original_text,
                'language': language,
                'asr_text': asr_text,
                'similarity_score': 0,
            })
    return records
def calculate_similarity_and_append_to_list(input_list, boundary):
    """Score each record's original vs ASR text and store it in-place.

    Mutates every record's 'similarity_score' field and returns the same list.
    """
    for record in input_list:
        record['similarity_score'] = text_comparison.calculate_result(
            record['original_text'], record['asr_text'], boundary)
    return input_list
def calculate_average_similarity_by_emotion(data_list):
    """Average the similarity_score per emotion, sorted best-first.

    Returns a list of {'emotion': ..., 'average_similarity_score': ...} dicts
    ordered by descending average score.
    """
    scores_by_emotion = defaultdict(list)
    for record in data_list:
        scores_by_emotion[record['emotion']].append(record['similarity_score'])
    averages = []
    for emotion_key, scores in scores_by_emotion.items():
        averages.append({'emotion': emotion_key,
                         'average_similarity_score': sum(scores) / len(scores)})
    averages.sort(key=lambda entry: entry['average_similarity_score'], reverse=True)
    return averages
def group_and_sort_by_field(data, group_by_field):
    """Group records by *group_by_field* and sort each group by score.

    Returns a list of (key, records) tuples where each group's records are
    ordered by descending 'similarity_score'. Group order follows first
    appearance in *data*.
    """
    grouped = defaultdict(list)
    for record in data:
        grouped[record[group_by_field]].append(record)
    score_of = itemgetter('similarity_score')
    return [(group_key, sorted(members, key=score_of, reverse=True))
            for group_key, members in grouped.items()]
def format_list_to_text(data_list, output_filename):
    """Write grouped records to *output_filename* as UTF-8 text.

    For each (key, records) tuple the group key is written on its own line,
    followed by one 'score|original|asr' line per record.
    """
    with open(output_filename, 'w', encoding='utf-8') as report:
        for group_key, group_items in data_list:
            # Group heading (e.g. the emotion or original text).
            report.write(group_key + '\n')
            report.writelines(
                f"{item['similarity_score']}|{item['original_text']}|{item['asr_text']}\n"
                for item in group_items)
def process(asr_file_path, output_dir, similarity_enlarge_boundary):
    """Run the full similarity-analysis pipeline over an ASR transcript.

    Parses the ASR .list file, scores every record, then writes three reports
    into *output_dir*: an average-per-emotion summary and detail files grouped
    by emotion and by original text.
    """
    # Make sure the output directory exists before writing any report.
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    records = parse_asr_file(asr_file_path)
    calculate_similarity_and_append_to_list(records, similarity_enlarge_boundary)
    summary = calculate_average_similarity_by_emotion(records)
    summary_lines = [f"{entry['average_similarity_score']}|{entry['emotion']}"
                     for entry in summary]
    common.write_text_to_file('\n'.join(summary_lines),
                              os.path.join(output_dir, 'average_similarity.txt'))
    # Detail reports: one grouped by emotion, one by the original text.
    for field_name, report_name in (('emotion', 'emotion_group_detail.txt'),
                                    ('original_text', 'text_group_detail.txt')):
        grouped = group_and_sort_by_field(records, field_name)
        format_list_to_text(grouped, os.path.join(output_dir, report_name))
    print('文本相似度分析完成。')
def parse_arguments():
    """Parse the CLI flags: -a ASR file, -o output directory, -b score boundary."""
    arg_parser = argparse.ArgumentParser(description="Process ASR files and analyze similarity.")
    arg_parser.add_argument("-a", "--asr_file_path", type=str, required=True,
                            help="Path to the directory containing ASR files or path to a single ASR file.")
    arg_parser.add_argument("-o", "--output_dir", type=str, required=True,
                            help="Path to the directory where the analysis results should be saved.")
    arg_parser.add_argument("-b", "--similarity_enlarge_boundary", type=float, required=True,
                            help="Similarity score boundary value to be used in your calculations.")
    return arg_parser.parse_args()
# Script entry point: parse CLI flags, echo them, and run the analysis pipeline.
if __name__ == '__main__':
    cmd = parse_arguments()
    print(cmd)
    process(cmd.asr_file_path, cmd.output_dir, cmd.similarity_enlarge_boundary)

View File

@ -8,7 +8,6 @@ bert_path = os.environ.get(
"bert_path", "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large"
)
tokenizer = AutoTokenizer.from_pretrained(bert_path)
model = AutoModel.from_pretrained(bert_path)
@ -28,26 +27,26 @@ def calculate_similarity(text1, text2, max_length=512):
return similarity
# Amplify similarity scores in the [boundary, 1] interval.
# Fix: the diff left both the old and new `def` lines and duplicated statement
# pairs in place (redefinition / dead code); keep the new, parameterised side.
def adjusted_similarity(similarity_score2, boundary=0.8):
    """Linearly rescale scores in [boundary, 1] onto [0, 1].

    Scores below *boundary* are clamped to 0.
    """
    if similarity_score2 < boundary:
        return 0
    # Scale factor that maps the [boundary, 1] interval onto [0, 1].
    multiple = 1 / (1 - boundary)
    adjusted_score = (similarity_score2 - boundary) * multiple
    return adjusted_score
# Fix: the diff left the old two-argument `def` line and the old
# `adjusted_similarity(similarity_score2)` call interleaved with the new
# three-argument side; keep the new side that threads *boundary* through.
def calculate_result(t1, t2, boundary):
    """Return (raw_similarity, adjusted_similarity) for texts *t1* and *t2*.

    The raw score comes from calculate_similarity; the adjusted score rescales
    values above *boundary* onto [0, 1] via adjusted_similarity.
    """
    # Raw model-based similarity between the two texts.
    similarity_score2 = calculate_similarity(t1, t2)
    # Amplify the above-boundary portion of the score.
    adjusted_similarity_score2 = adjusted_similarity(similarity_score2, boundary)
    return similarity_score2, adjusted_similarity_score2