功能补全

This commit is contained in:
Downupanddownup 2024-04-24 16:54:51 +08:00
parent 29b8370c45
commit e69e449599
8 changed files with 458 additions and 62 deletions

View File

View File

@ -0,0 +1,67 @@
from tools import my_utils
import glob
import os
class RefAudioListManager:
    """Index .wav files under a root directory, grouped by sub-directory.

    Files directly under ``root_dir`` fall into the ``'default'`` category;
    files inside a sub-directory fall into a category named after the
    sub-directory's relative path with path separators removed.
    """

    def __init__(self, root_dir):
        # Maps category name -> list of absolute .wav paths.
        self.audio_dict = {'default': []}
        absolute_root = os.path.abspath(root_dir)
        for subdir, dirs, files in os.walk(absolute_root):
            relative_path = os.path.relpath(subdir, absolute_root)
            if relative_path == '.':
                category = 'default'
            else:
                category = relative_path.replace(os.sep, '')
            for file in files:
                if file.endswith('.wav'):
                    # Store the absolute path of the audio file.
                    audio_abs_path = os.path.join(subdir, file)
                    # setdefault fixes a KeyError: only 'default' was ever
                    # seeded, so the first .wav in any sub-directory crashed.
                    self.audio_dict.setdefault(category, []).append(audio_abs_path)

    def get_audio_list(self):
        """Return the raw ``{category: [abs_path, ...]}`` mapping."""
        return self.audio_dict

    def get_flattened_audio_list(self):
        """Return every indexed audio path across all categories as one list."""
        all_audio_files = []
        for category_audios in self.audio_dict.values():
            all_audio_files.extend(category_audios)
        return all_audio_files

    def get_ref_audio_list(self):
        """Build reference-audio descriptors for every indexed file.

        Returns:
            list[dict]: entries with keys 'emotion' (``"<category>-<stem>"``),
            'ref_path' (absolute path) and 'ref_text' (file name without
            extension).
        """
        audio_info_list = []
        for category, audio_paths in self.audio_dict.items():
            for audio_path in audio_paths:
                filename_without_extension = os.path.splitext(os.path.basename(audio_path))[0]
                audio_info = {
                    'emotion': f"{category}-{filename_without_extension}",
                    'ref_path': audio_path,
                    'ref_text': filename_without_extension,
                }
                audio_info_list.append(audio_info)
        return audio_info_list
def batch_clean_paths(paths):
    """Run ``my_utils.clean_path()`` over every entry of *paths*.

    Args:
        paths (list[str]): paths to normalize.

    Returns:
        list[str]: the cleaned paths, in the original order.
    """
    return [my_utils.clean_path(single_path) for single_path in paths]
def read_text_file_to_list(file_path):
    """Read a UTF-8 text file and return its lines without line terminators.

    Args:
        file_path (str): path of the file to read.

    Returns:
        list[str]: one entry per line of the file.
    """
    # UTF-8 so Chinese text in the file decodes correctly.
    with open(file_path, mode='r', encoding='utf-8') as handle:
        content = handle.read()
    return content.splitlines()

View File

@ -1,7 +1,9 @@
import os.path import os.path
import gradio as gr import gradio as gr
import Ref_Audio_Selector.tool.ref_audio_opt as ref_audio_opt import Ref_Audio_Selector.tool.audio_similarity as audio_similarity
import Ref_Audio_Selector.tool.audio_inference as audio_inference
import Ref_Audio_Selector.common.common as common
from tools.i18n.i18n import I18nAuto from tools.i18n.i18n import I18nAuto
i18n = I18nAuto() i18n = I18nAuto()
@ -17,14 +19,14 @@ def check_base_info(text_work_space_dir, text_character):
# 从list文件提取参考音频 # 从list文件提取参考音频
def convert_from_list(text_work_space_dir, text_character, text_list_input): def convert_from_list(text_work_space_dir, text_character, text_list_input):
ref_audio_all = os.path.join(text_work_space_dir, 'ref_audio_all') ref_audio_all = os.path.join(text_work_space_dir, 'refer_audio_all')
text_convert_from_list_info = f"转换成功:生成目录${ref_audio_all}" text_convert_from_list_info = f"转换成功:生成目录{ref_audio_all}"
text_sample_dir = ref_audio_all text_sample_dir = ref_audio_all
try: try:
check_base_info(text_work_space_dir, text_character) check_base_info(text_work_space_dir, text_character)
if text_list_input is None or text_list_input == '': if text_list_input is None or text_list_input == '':
raise Exception(i18n("list文件路径不能为空")) raise Exception(i18n("list文件路径不能为空"))
ref_audio_opt.convert_from_list(text_list_input, ref_audio_all) audio_similarity.convert_from_list(text_list_input, ref_audio_all)
except Exception as e: except Exception as e:
text_convert_from_list_info = f"发生异常:{e}" text_convert_from_list_info = f"发生异常:{e}"
text_sample_dir = '' text_sample_dir = ''
@ -34,8 +36,8 @@ def convert_from_list(text_work_space_dir, text_character, text_list_input):
# 基于一个基准音频,从参考音频目录中进行分段抽样 # 基于一个基准音频,从参考音频目录中进行分段抽样
def sample(text_work_space_dir, text_character, text_sample_dir, text_base_voice_path, def sample(text_work_space_dir, text_character, text_sample_dir, text_base_voice_path,
text_subsection_num, text_sample_num, checkbox_similarity_output): text_subsection_num, text_sample_num, checkbox_similarity_output):
text_sample_info = "抽样成功生成目录XXX" ref_audio_dir = os.path.join(text_work_space_dir, 'refer_audio')
ref_audio_dir = "D//tt" text_sample_info = f"抽样成功:生成目录{ref_audio_dir}"
try: try:
check_base_info(text_work_space_dir, text_character) check_base_info(text_work_space_dir, text_character)
if text_sample_dir is None or text_sample_dir == '': if text_sample_dir is None or text_sample_dir == '':
@ -46,7 +48,14 @@ def sample(text_work_space_dir, text_character, text_sample_dir, text_base_voice
raise Exception(i18n("分段数不能为空")) raise Exception(i18n("分段数不能为空"))
if text_sample_num is None or text_sample_num == '': if text_sample_num is None or text_sample_num == '':
raise Exception(i18n("每段随机抽样个数不能为空")) raise Exception(i18n("每段随机抽样个数不能为空"))
pass
similarity_list = audio_similarity.start_similarity_analysis(text_work_space_dir, text_sample_dir, text_base_voice_path, checkbox_similarity_output)
if similarity_list is None:
raise Exception(i18n("相似度分析失败"))
audio_similarity.sample(ref_audio_dir, similarity_list, text_subsection_num, text_sample_num)
except Exception as e: except Exception as e:
text_sample_info = f"发生异常:{e}" text_sample_info = f"发生异常:{e}"
ref_audio_dir = '' ref_audio_dir = ''
@ -61,8 +70,9 @@ def sample(text_work_space_dir, text_character, text_sample_dir, text_base_voice
def model_inference(text_work_space_dir, text_character, text_model_inference_voice_dir, text_url, def model_inference(text_work_space_dir, text_character, text_model_inference_voice_dir, text_url,
text_text, text_ref_path, text_ref_text, text_emotion, text_text, text_ref_path, text_ref_text, text_emotion,
text_test_content): text_test_content):
text_model_inference_info = "推理成功生成目录XXX" inference_dir = os.path.join(text_work_space_dir, 'inference_audio')
text_asr_audio_dir = "D//tt" text_asr_audio_dir = os.path.join(inference_dir, 'text')
text_model_inference_info = f"推理成功:生成目录{inference_dir}"
try: try:
check_base_info(text_work_space_dir, text_character) check_base_info(text_work_space_dir, text_character)
if text_model_inference_voice_dir is None or text_model_inference_voice_dir == '': if text_model_inference_voice_dir is None or text_model_inference_voice_dir == '':
@ -76,7 +86,15 @@ def model_inference(text_work_space_dir, text_character, text_model_inference_vo
if (text_ref_path is None or text_ref_path == '') and (text_ref_text is None or text_ref_text == '') and ( if (text_ref_path is None or text_ref_path == '') and (text_ref_text is None or text_ref_text == '') and (
text_emotion is None or text_emotion == ''): text_emotion is None or text_emotion == ''):
raise Exception(i18n("参考音频路径/文本和角色情绪二选一填写,不能全部为空")) raise Exception(i18n("参考音频路径/文本和角色情绪二选一填写,不能全部为空"))
pass url_composer = audio_inference.URLComposer(text_url, text_emotion, text_text, text_ref_path, text_ref_text)
url_composer.is_valid()
text_list = common.read_text_file_to_list(text_test_content)
if text_list is None or len(text_list) == 0:
raise Exception(i18n("待推理文本内容不能为空"))
ref_audio_manager = common.RefAudioListManager(text_model_inference_voice_dir)
if len(ref_audio_manager.get_audio_list()) == 0:
raise Exception(i18n("待推理的参考音频不能为空"))
audio_inference.generate_audio_files(url_composer, text_list, ref_audio_manager.get_ref_audio_list(), inference_dir)
except Exception as e: except Exception as e:
text_model_inference_info = f"发生异常:{e}" text_model_inference_info = f"发生异常:{e}"
text_asr_audio_dir = '' text_asr_audio_dir = ''
@ -86,8 +104,9 @@ def model_inference(text_work_space_dir, text_character, text_model_inference_vo
# 对推理生成音频执行asr # 对推理生成音频执行asr
def asr(text_work_space_dir, text_character, text_asr_audio_dir, dropdown_asr_model, def asr(text_work_space_dir, text_character, text_asr_audio_dir, dropdown_asr_model,
dropdown_asr_size, dropdown_asr_lang): dropdown_asr_size, dropdown_asr_lang):
text_asr_info = "asr成功生成目录XXX" asr_file = os.path.join(text_work_space_dir, 'asr.list')
text_text_similarity_analysis_path = "D//tt" text_text_similarity_analysis_path = asr_file
text_asr_info = f"asr成功生成文件asr.list"
try: try:
check_base_info(text_work_space_dir, text_character) check_base_info(text_work_space_dir, text_character)
if text_asr_audio_dir is None or text_asr_audio_dir == '': if text_asr_audio_dir is None or text_asr_audio_dir == '':
@ -108,7 +127,8 @@ def asr(text_work_space_dir, text_character, text_asr_audio_dir, dropdown_asr_mo
# 对asr生成的文件与原本的文本内容进行相似度分析 # 对asr生成的文件与原本的文本内容进行相似度分析
def text_similarity_analysis(text_work_space_dir, text_character, def text_similarity_analysis(text_work_space_dir, text_character,
text_text_similarity_analysis_path): text_text_similarity_analysis_path):
text_text_similarity_analysis_info = "相似度分析成功生成目录XXX" similarity_file = os.path.join(text_work_space_dir, 'similarity.txt')
text_text_similarity_analysis_info = f"相似度分析成功:生成文件{similarity_file}"
try: try:
check_base_info(text_work_space_dir, text_character) check_base_info(text_work_space_dir, text_character)
if text_text_similarity_analysis_path is None or text_text_similarity_analysis_path == '': if text_text_similarity_analysis_path is None or text_text_similarity_analysis_path == '':
@ -153,7 +173,8 @@ def sync_ref_audio(text_work_space_dir, text_character, text_sync_ref_audio_dir,
# 根据模板和参考音频目录,生成参考音频配置内容 # 根据模板和参考音频目录,生成参考音频配置内容
def create_config(text_work_space_dir, text_character, text_template, text_sync_ref_audio_dir2): def create_config(text_work_space_dir, text_character, text_template, text_sync_ref_audio_dir2):
text_create_config_info = "配置生成成功生成目录XXX" config_file = os.path.join(text_work_space_dir, 'refer_audio.json')
text_create_config_info = f"配置生成成功:生成文件{config_file}"
try: try:
check_base_info(text_work_space_dir, text_character) check_base_info(text_work_space_dir, text_character)
if text_template is None or text_template == '': if text_template is None or text_template == '':
@ -168,8 +189,12 @@ def create_config(text_work_space_dir, text_character, text_template, text_sync_
# 基于请求路径和参数,合成完整的请求路径 # 基于请求路径和参数,合成完整的请求路径
def whole_url(text_url, text_text, text_ref_path, text_ref_text, text_emotion): def whole_url(text_url, text_text, text_ref_path, text_ref_text, text_emotion):
text_whole_url = f'{text_url}?{text_text}=文本内容&{text_ref_path}=参考音频路径&{text_ref_text}=参考文本&{text_emotion}=情绪类型' url_composer = audio_inference.URLComposer(text_url, text_emotion, text_text, text_ref_path, text_ref_text)
return [text_whole_url] if url_composer.is_emotion():
text_whole_url = url_composer.build_url_with_emotion('测试内容','情绪类型')
else:
text_whole_url = url_composer.build_url_with_ref('测试内容','参考路径','参考文本')
return text_whole_url
with gr.Blocks() as app: with gr.Blocks() as app:

View File

@ -0,0 +1,104 @@
import os
import requests
import urllib.parse
class URLComposer:
    """Assembles inference-request URLs from a base URL and query parameter names.

    Two request shapes are supported: emotion-based (text + emotion params)
    and reference-based (text + reference audio path + reference text params).
    """

    def __init__(self, base_url, emotion_param_name, text_param_name, ref_path_param_name, ref_text_param_name):
        self.base_url = base_url
        self.emotion_param_name = emotion_param_name
        self.text_param_name = text_param_name
        self.ref_path_param_name = ref_path_param_name
        self.ref_text_param_name = ref_text_param_name

    def is_valid(self):
        """Raise ValueError when a required parameter name is missing."""
        if not self.base_url:
            raise ValueError("请输入url")
        if not self.text_param_name:
            raise ValueError("请输入text参数名")
        if all(name is None for name in (self.emotion_param_name,
                                         self.ref_path_param_name,
                                         self.ref_text_param_name)):
            raise ValueError("请输入至少一个参考or情绪的参数")

    def is_emotion(self):
        """Return True when an emotion parameter name is configured."""
        return bool(self.emotion_param_name)

    def build_url_with_emotion(self, text_value, emotion_value):
        """Compose a URL carrying the text and emotion query parameters."""
        if not self.emotion_param_name:
            raise ValueError("Emotion parameter name is not set.")
        query = {
            self.text_param_name: urllib.parse.quote(text_value),
            self.emotion_param_name: urllib.parse.quote(emotion_value),
        }
        return self._append_params_to_url(query)

    def build_url_with_ref(self, text_value, ref_path_value, ref_text_value):
        """Compose a URL carrying the text and reference-audio query parameters."""
        if self.emotion_param_name:
            raise ValueError("Cannot use reference parameters when emotion parameter is set.")
        query = {
            self.text_param_name: urllib.parse.quote(text_value),
            self.ref_path_param_name: urllib.parse.quote(ref_path_value),
            self.ref_text_param_name: urllib.parse.quote(ref_text_value),
        }
        return self._append_params_to_url(query)

    def _append_params_to_url(self, params: dict):
        """Append *params* as a query string, using '?' or '&' as appropriate."""
        if not params:
            return self.base_url
        query_string = '&'.join(f"{key}={value}" for key, value in params.items())
        separator = '&' if '?' in self.base_url else '?'
        return self.base_url + separator + query_string
def generate_audio_files(url_composer, text_list, emotion_list, output_dir_path):
    """Synthesize audio for each (text, reference-audio) combination.

    Parameters:
        url_composer (URLComposer): builds the inference request URL.
        text_list (list[str]): texts to synthesize.
        emotion_list (list[dict]): entries with keys 'emotion', 'ref_path',
            'ref_text' (as produced by RefAudioListManager.get_ref_audio_list).
        output_dir_path (str): root output directory. Results are mirrored under
            'text/<text>/<emotion>.wav' and 'emotion/<emotion>/<text>.wav'.
    """
    # Use os.path throughout: the previous version called pathlib methods
    # (.mkdir, .parent) on os.path.join() strings and referenced an
    # unimported Path, so it crashed on first use.
    output_dir = os.path.abspath(output_dir_path)
    os.makedirs(output_dir, exist_ok=True)
    text_subdir = os.path.join(output_dir, 'text')
    os.makedirs(text_subdir, exist_ok=True)
    emotion_subdir = os.path.join(output_dir, 'emotion')
    os.makedirs(emotion_subdir, exist_ok=True)
    # NOTE(review): the mirrored directory layout implies every text is
    # rendered with every reference audio, so iterate the full cross product
    # (the old zip() silently truncated to the shorter list) — confirm intent.
    for text in text_list:
        for emotion in emotion_list:
            if url_composer.is_emotion():
                real_url = url_composer.build_url_with_emotion(text, emotion['emotion'])
            else:
                real_url = url_composer.build_url_with_ref(text, emotion['ref_path'], emotion['ref_text'])
            audio_bytes = inference_audio_from_api(real_url)
            emotion_name = emotion['emotion']
            # '<name>.wav' is a file name; previously '.wav' was joined as an
            # extra path component, producing a directory named '.wav'.
            text_file_path = os.path.join(text_subdir, text, f'{emotion_name}.wav')
            emotion_file_path = os.path.join(emotion_subdir, emotion_name, f'{text}.wav')
            os.makedirs(os.path.dirname(text_file_path), exist_ok=True)
            os.makedirs(os.path.dirname(emotion_file_path), exist_ok=True)
            with open(text_file_path, 'wb') as f:
                f.write(audio_bytes)
            with open(emotion_file_path, 'wb') as f:
                f.write(audio_bytes)
def inference_audio_from_api(url):
    """Issue a GET request to *url* and return the audio payload as bytes.

    Raises:
        Exception: when the server replies with a non-200 status code.
    """
    response = requests.get(url, stream=True)
    # Guard clause: anything other than HTTP 200 is treated as a failure.
    if response.status_code != 200:
        raise Exception(f"Failed to fetch audio from API. Server responded with status code {response.status_code}.")
    return response.content

View File

@ -0,0 +1,182 @@
import os
import random
import shutil
from subprocess import Popen

from config import python_exec
def convert_from_list(list_file, output_dir):
    """Copy audios referenced by a GPT-SoVITS .list file into *output_dir*.

    Each line of the .list file has the form
    ``audio_path|speaker|language|transcription``; the audio is copied to
    ``<output_dir>/<transcription>.wav``. Existing targets are never
    overwritten and missing sources are skipped with a message.

    Args:
        list_file (str): path to the .list file (UTF-8).
        output_dir (str): destination directory, created if absent.
    """
    os.makedirs(output_dir, exist_ok=True)
    with open(list_file, 'r', encoding='utf-8') as file:
        # Stream the file instead of readlines(); skip blank lines so a
        # trailing newline no longer triggers a spurious format warning.
        for line in file:
            stripped = line.strip()
            if not stripped:
                continue
            parts = stripped.split('|')
            if len(parts) != 4:
                print(f"Line format incorrect: {line}")
                continue
            audio_path, _, _, transcription = parts
            # Target name is the transcription text plus .wav.
            new_filename = transcription + '.wav'
            new_path = os.path.join(output_dir, new_filename)
            # Never overwrite an existing target file.
            if os.path.exists(new_path):
                print(f"File already exists: {new_path}")
                continue
            try:
                if not os.path.exists(audio_path):
                    print(f"Audio file does not exist: {audio_path}")
                    continue
                # copy2 preserves timestamps/metadata of the source audio.
                shutil.copy2(audio_path, new_path)
                print(f"File copied and renamed to: {new_path}")
            except Exception as e:
                print(f"An error occurred while processing: {audio_path}")
                print(e)
    print("Processing complete.")
def sample(output_audio_dir, similarity_list, subsection_num, sample_num):
    """Split scored audios into subsections and randomly sample each one.

    The list is sorted by descending similarity score, partitioned into
    *subsection_num* contiguous segments, and *sample_num* entries are drawn
    at random from each segment into ``subsection_<i>`` sub-directories.

    Args:
        output_audio_dir (str): directory receiving the subsection folders.
        similarity_list (list[dict]): entries with 'score' and 'wav_path';
            sorted in place as a side effect.
        subsection_num (int | str): number of segments (UI may pass a string).
        sample_num (int | str): samples drawn per segment.
    """
    # Accept numeric strings coming from the UI text boxes.
    subsection_num = int(subsection_num)
    sample_num = int(sample_num)
    similarity_list.sort(key=lambda x: x['score'], reverse=True)
    # Ceiling division: every entry lands in exactly one segment.
    step = len(similarity_list) // subsection_num
    if len(similarity_list) % subsection_num != 0:
        step += 1
    for i in range(subsection_num):
        start = i * step
        end = min((i + 1) * step, len(similarity_list))  # clamp the last segment
        segment = similarity_list[start:end]
        # random.sample replaces the old shuffle-of-a-slice, which shuffled a
        # throwaway copy and therefore always picked the top-scored entries.
        sampled_subsection = random.sample(segment, min(sample_num, len(segment)))
        subdir_path = os.path.join(output_audio_dir, f'subsection_{i + 1}')
        os.makedirs(subdir_path, exist_ok=True)
        # Copy the chosen audios into the subsection directory.
        for item in sampled_subsection:
            src_path = item['wav_path']
            dst_path = os.path.join(subdir_path, os.path.basename(src_path))
            shutil.copyfile(src_path, dst_path)
    print("Sampling completed.")
def start_similarity_analysis(work_space_dir, sample_dir, base_voice_path, need_similarity_output):
    """Run the speaker-verification similarity script as a subprocess.

    Scores every audio in *sample_dir* against *base_voice_path*, writing
    ``<work_space_dir>/similarity/<base_name>.txt``. When
    *need_similarity_output* is true, the result file is parsed and the scored
    audios are copied into a per-reference directory.

    Returns:
        list[dict] | None: parsed similarity entries, or None when no output
        was requested.
    """
    similarity_list = None
    similarity_dir = os.path.join(work_space_dir, 'similarity')
    os.makedirs(similarity_dir, exist_ok=True)
    # Call the helpers in this module; the old ref_audio_opt module was
    # renamed away and the previous qualified calls raised NameError.
    base_voice_file_name = get_filename_without_extension(base_voice_path)
    similarity_file = os.path.join(similarity_dir, f'{base_voice_file_name}.txt')
    cmd = f'"{python_exec}" tools/speaker_verification/audio_similarity.py '
    cmd += f' -r "{base_voice_path}"'
    cmd += f' -c "{sample_dir}"'
    cmd += f' -o "{similarity_file}"'  # quoted: the path may contain spaces
    print(cmd)
    # Run synchronously. The previous global p_similarity guard read a
    # module-level name that was never defined, crashing on first use, and
    # wait() already serializes runs.
    p_similarity = Popen(cmd, shell=True)
    p_similarity.wait()
    if need_similarity_output:
        similarity_list = parse_similarity_file(similarity_file)
        # os.path.join here: os.path.dirname() takes a single argument and the
        # old two-argument call raised TypeError.
        similarity_file_dir = os.path.join(similarity_dir, base_voice_file_name)
        copy_and_move(similarity_file_dir, similarity_list)
    return similarity_list
def parse_similarity_file(file_path):
    """Parse a similarity result file into a list of score/path dicts.

    Each non-empty line has the form ``<score>|<wav_path>``.

    Args:
        file_path (str): path of the file to parse.

    Returns:
        list[dict]: entries with keys 'score' (float) and 'wav_path' (str).
    """
    result_list = []
    # utf-8: wav paths may contain non-ASCII characters (e.g. Chinese
    # transcriptions used as file names elsewhere in this pipeline).
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            stripped = line.strip()
            if not stripped:
                continue  # tolerate blank/trailing lines
            # maxsplit=1: only the first '|' separates score from path, so a
            # '|' inside the path no longer breaks unpacking.
            score, filepath = stripped.split('|', 1)
            result_list.append({
                'score': float(score),
                'wav_path': filepath
            })
    return result_list
def copy_and_move(output_audio_directory, similarity_scores):
    """Copy scored audios into *output_audio_directory* as '<score>-<name>.wav'.

    Args:
        output_audio_directory (str): destination directory, created if absent.
        similarity_scores (list[dict]): entries with 'score' and 'wav_path'.
    """
    os.makedirs(output_audio_directory, exist_ok=True)
    for item in similarity_scores:
        # splitext instead of [:-4]: the old slice silently mangled any file
        # whose extension was not exactly four characters long.
        base_name = os.path.splitext(os.path.basename(item['wav_path']))[0]
        new_name = f"{item['score']}-{base_name}.wav"
        new_path = os.path.join(output_audio_directory, new_name)
        shutil.copyfile(item['wav_path'], new_path)
    print("已完成复制和重命名操作。")
def get_filename_without_extension(file_path):
    """Return the final component of *file_path* with its extension removed.

    Parameters:
        file_path (str): the full path to the file.

    Returns:
        str: the file name without its extension.
    """
    stem, _extension = os.path.splitext(os.path.basename(file_path))
    return stem

View File

@ -1,46 +0,0 @@
import os
import shutil
def convert_from_list(list_file, output_dir):
    """Copy each audio referenced in a .list file into *output_dir*.

    Lines have the form ``audio_path|speaker|language|transcription``; the
    audio is copied to ``<output_dir>/<transcription>.wav``. Existing targets
    are left untouched and missing sources are reported and skipped.
    """
    # Create the destination directory on first use.
    os.makedirs(output_dir, exist_ok=True)
    with open(list_file, 'r', encoding='utf-8') as fh:
        entries = fh.readlines()
    for entry in entries:
        fields = entry.strip().split('|')
        if len(fields) != 4:
            print(f"Line format incorrect: {entry}")
            continue
        source_path = fields[0]
        transcription = fields[3]
        # The transcription text becomes the target file name.
        target_path = os.path.join(output_dir, transcription + '.wav')
        if os.path.exists(target_path):
            # Never overwrite an existing target.
            print(f"File already exists: {target_path}")
            continue
        try:
            if not os.path.exists(source_path):
                print(f"Audio file does not exist: {source_path}")
                continue
            # copy2 keeps the source file's metadata.
            shutil.copy2(source_path, target_path)
            print(f"File copied and renamed to: {target_path}")
        except Exception as e:
            print(f"An error occurred while processing: {source_path}")
            print(e)
    print("Processing complete.")

View File

View File

@ -0,0 +1,64 @@
import argparse
import os
from modelscope.pipelines import pipeline
# Module-level speaker-verification pipeline, loaded once at import time and
# shared by compare_audio_and_generate_report below.
# NOTE(review): the model path is absolute ('/tools/...'); presumably it should
# be resolved relative to the repository root — confirm deployment layout.
sv_pipeline = pipeline(
    task='speaker-verification',
    model='/tools/speaker_verification/models/speech_campplus_sv_zh-cn_16k-common',
    model_revision='v1.0.0'
)
def compare_audio_and_generate_report(reference_audio_path, comparison_dir_path, output_file_path):
    """Score every .wav in a directory against a reference audio and write a report.

    Each comparison file is scored with the module-level ``sv_pipeline``; the
    report lists ``<score>|<path>`` per line, highest score first.

    Args:
        reference_audio_path (str): the reference .wav file.
        comparison_dir_path (str): directory holding candidate .wav files.
        output_file_path (str): destination of the report (UTF-8).
    """
    # Step 1: collect every .wav directly inside the comparison directory.
    comparison_audio_paths = [
        os.path.join(comparison_dir_path, f)
        for f in os.listdir(comparison_dir_path)
        if f.endswith('.wav')
    ]
    # Step 2: score each candidate against the reference audio.
    similarity_scores = []
    for audio_path in comparison_audio_paths:
        score = sv_pipeline([reference_audio_path, audio_path])['score']
        similarity_scores.append({
            'score': score,
            'path': audio_path
        })
    # Step 3: highest similarity first.
    similarity_scores.sort(key=lambda x: x['score'], reverse=True)
    # Step 4: make sure the parent directory exists; open(..., 'w') below
    # creates/truncates the file itself, so the old empty-file pre-creation
    # was redundant (and did nothing for a missing directory).
    parent_dir = os.path.dirname(output_file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    # Step 5: write the sorted results, UTF-8 so Chinese paths survive.
    formatted_scores = [f'{item["score"]}|{item["path"]}' for item in similarity_scores]
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(formatted_scores))
def parse_arguments():
    """Parse the -r/-c/-o command-line arguments of the similarity script.

    Returns:
        argparse.Namespace: with attributes ``reference_audio``,
        ``comparison_dir`` and ``output_file``.
    """
    arg_parser = argparse.ArgumentParser(description="Audio processing script arguments")
    # (short flag, long flag, help text) — all three are required string paths.
    argument_specs = (
        ("-r", "--reference_audio", "Path to the reference WAV file."),
        ("-c", "--comparison_dir", "Path to the directory containing comparison WAV files."),
        ("-o", "--output_file", "Path to the output file where results will be written."),
    )
    for short_flag, long_flag, help_text in argument_specs:
        arg_parser.add_argument(short_flag, long_flag, type=str, required=True, help=help_text)
    return arg_parser.parse_args()
if __name__ == '__main__':
    cmd = parse_arguments()
    # Keyword names must match the function signature; the previous call used
    # comparison_dir=/output_file=, which raised TypeError on every run.
    compare_audio_and_generate_report(
        reference_audio_path=cmd.reference_audio,
        comparison_dir_path=cmd.comparison_dir,
        output_file_path=cmd.output_file,
    )