class RefAudioListManager:
    """Index the ``.wav`` files under a directory tree, grouped by category.

    Files located directly in the root directory belong to the ``'default'``
    category. Files in a sub-directory belong to a category named after that
    directory's path relative to the root with the path separators removed
    (``sad/deep`` becomes ``'saddeep'``).

    Attributes:
        audio_dict: Maps category name to a list of absolute ``.wav`` paths.
            The ``'default'`` key is always present, even when it is empty.
    """

    def __init__(self, root_dir):
        """Walk ``root_dir`` and record every ``.wav`` file found below it.

        Args:
            root_dir: Directory to scan. It is made absolute before walking,
                so every recorded path is absolute as well.
        """
        self.audio_dict = {'default': []}
        absolute_root = os.path.abspath(root_dir)

        for subdir, _dirs, files in os.walk(absolute_root):
            relative_path = os.path.relpath(subdir, absolute_root)
            if relative_path == '.':
                category = 'default'
            else:
                category = relative_path.replace(os.sep, '')

            wav_paths = [
                os.path.join(subdir, name)
                for name in files
                if name.endswith('.wav')
            ]
            if wav_paths:
                # Only 'default' is pre-seeded, so a sub-directory category
                # has to be created on first use; indexing it directly
                # raised KeyError for every nested audio file.
                self.audio_dict.setdefault(category, []).extend(wav_paths)

    def get_audio_list(self):
        """Return the category -> list-of-paths mapping (not a copy)."""
        return self.audio_dict

    def get_flattened_audio_list(self):
        """Return all recorded paths as one flat list, category by category."""
        return [
            audio_path
            for category_audios in self.audio_dict.values()
            for audio_path in category_audios
        ]

    def get_ref_audio_list(self):
        """Describe every recorded file as a reference-audio entry.

        Returns:
            A list of dicts with the keys ``'emotion'``
            (``"<category>-<file name without extension>"``), ``'ref_path'``
            (the absolute path) and ``'ref_text'`` (the file name without
            its extension).
        """
        audio_info_list = []
        for category, audio_paths in self.audio_dict.items():
            for audio_path in audio_paths:
                stem = os.path.splitext(os.path.basename(audio_path))[0]
                audio_info_list.append({
                    'emotion': f"{category}-{stem}",
                    'ref_path': audio_path,
                    # NOTE(review): presumably the clip is named after its
                    # transcript, which is why the stem doubles as the
                    # reference text - confirm against how clips are named.
                    'ref_text': stem,
                })
        return audio_info_list


def batch_clean_paths(paths):
    """Apply ``my_utils.clean_path`` to every path in ``paths``.

    Args:
        paths (list[str]): Paths to clean.

    Returns:
        list[str]: The cleaned paths, in the same order.
    """
    return [my_utils.clean_path(path) for path in paths]


def read_text_file_to_list(file_path):
    """Read a UTF-8 text file and return its lines without line endings.

    Args:
        file_path: Path of the text file to read.

    Returns:
        list[str]: One entry per line; an empty file yields an empty list.
    """
    # UTF-8 is requested explicitly so non-ASCII (e.g. Chinese) text is read
    # correctly regardless of the platform default encoding.
    with open(file_path, mode='r', encoding='utf-8') as file:
        return file.read().splitlines()
check_base_info(text_work_space_dir, text_character) if text_sample_dir is None or text_sample_dir == '': @@ -46,7 +48,14 @@ def sample(text_work_space_dir, text_character, text_sample_dir, text_base_voice raise Exception(i18n("分段数不能为空")) if text_sample_num is None or text_sample_num == '': raise Exception(i18n("每段随机抽样个数不能为空")) - pass + + similarity_list = audio_similarity.start_similarity_analysis(text_work_space_dir, text_sample_dir, text_base_voice_path, checkbox_similarity_output) + + if similarity_list is None: + raise Exception(i18n("相似度分析失败")) + + audio_similarity.sample(ref_audio_dir, similarity_list, text_subsection_num, text_sample_num) + except Exception as e: text_sample_info = f"发生异常:{e}" ref_audio_dir = '' @@ -61,8 +70,9 @@ def sample(text_work_space_dir, text_character, text_sample_dir, text_base_voice def model_inference(text_work_space_dir, text_character, text_model_inference_voice_dir, text_url, text_text, text_ref_path, text_ref_text, text_emotion, text_test_content): - text_model_inference_info = "推理成功:生成目录XXX" - text_asr_audio_dir = "D://tt" + inference_dir = os.path.join(text_work_space_dir, 'inference_audio') + text_asr_audio_dir = os.path.join(inference_dir, 'text') + text_model_inference_info = f"推理成功:生成目录{inference_dir}" try: check_base_info(text_work_space_dir, text_character) if text_model_inference_voice_dir is None or text_model_inference_voice_dir == '': @@ -76,7 +86,15 @@ def model_inference(text_work_space_dir, text_character, text_model_inference_vo if (text_ref_path is None or text_ref_path == '') and (text_ref_text is None or text_ref_text == '') and ( text_emotion is None or text_emotion == ''): raise Exception(i18n("参考音频路径/文本和角色情绪二选一填写,不能全部为空")) - pass + url_composer = audio_inference.URLComposer(text_url, text_emotion, text_text, text_ref_path, text_ref_text) + url_composer.is_valid() + text_list = common.read_text_file_to_list(text_test_content) + if text_list is None or len(text_list) == 0: + raise 
Exception(i18n("待推理文本内容不能为空")) + ref_audio_manager = common.RefAudioListManager(text_model_inference_voice_dir) + if len(ref_audio_manager.get_audio_list()) == 0: + raise Exception(i18n("待推理的参考音频不能为空")) + audio_inference.generate_audio_files(url_composer, text_list, ref_audio_manager.get_ref_audio_list(), inference_dir) except Exception as e: text_model_inference_info = f"发生异常:{e}" text_asr_audio_dir = '' @@ -86,8 +104,9 @@ def model_inference(text_work_space_dir, text_character, text_model_inference_vo # 对推理生成音频执行asr def asr(text_work_space_dir, text_character, text_asr_audio_dir, dropdown_asr_model, dropdown_asr_size, dropdown_asr_lang): - text_asr_info = "asr成功:生成目录XXX" - text_text_similarity_analysis_path = "D://tt" + asr_file = os.path.join(text_work_space_dir, 'asr.list') + text_text_similarity_analysis_path = asr_file + text_asr_info = f"asr成功:生成文件asr.list" try: check_base_info(text_work_space_dir, text_character) if text_asr_audio_dir is None or text_asr_audio_dir == '': @@ -108,7 +127,8 @@ def asr(text_work_space_dir, text_character, text_asr_audio_dir, dropdown_asr_mo # 对asr生成的文件,与原本的文本内容,进行相似度分析 def text_similarity_analysis(text_work_space_dir, text_character, text_text_similarity_analysis_path): - text_text_similarity_analysis_info = "相似度分析成功:生成目录XXX" + similarity_file = os.path.join(text_work_space_dir, 'similarity.txt') + text_text_similarity_analysis_info = f"相似度分析成功:生成文件{similarity_file}" try: check_base_info(text_work_space_dir, text_character) if text_text_similarity_analysis_path is None or text_text_similarity_analysis_path == '': @@ -153,7 +173,8 @@ def sync_ref_audio(text_work_space_dir, text_character, text_sync_ref_audio_dir, # 根据模板和参考音频目录,生成参考音频配置内容 def create_config(text_work_space_dir, text_character, text_template, text_sync_ref_audio_dir2): - text_create_config_info = "配置生成成功:生成目录XXX" + config_file = os.path.join(text_work_space_dir, 'refer_audio.json') + text_create_config_info = f"配置生成成功:生成文件{config_file}" try: 
class URLComposer:
    """Build inference request URLs from a base URL and query parameter names.

    The composer works in one of two modes. If an emotion parameter name is
    configured, requests carry the text and an emotion value. Otherwise they
    carry the text plus a reference-audio path and a reference text.
    """

    def __init__(self, base_url, emotion_param_name, text_param_name, ref_path_param_name, ref_text_param_name):
        """Store the base URL and the names of the query parameters.

        Args:
            base_url: Endpoint the query string is appended to.
            emotion_param_name: Query parameter name for the emotion value.
            text_param_name: Query parameter name for the text to synthesize.
            ref_path_param_name: Query parameter name for the reference path.
            ref_text_param_name: Query parameter name for the reference text.
        """
        self.base_url = base_url
        self.emotion_param_name = emotion_param_name
        self.text_param_name = text_param_name
        self.ref_path_param_name = ref_path_param_name
        self.ref_text_param_name = ref_text_param_name

    def is_valid(self):
        """Validate the configuration.

        Raises:
            ValueError: If the base URL or the text parameter name is missing,
                or if neither an emotion nor any reference parameter name is
                configured.
        """
        if not self.base_url:
            raise ValueError("请输入url")

        if not self.text_param_name:
            raise ValueError("请输入text参数名")

        # Blank input arrives as '' rather than None, so the check must treat
        # empty strings as "not provided" too; testing only `is None` let a
        # completely empty configuration pass.
        if not (self.emotion_param_name or self.ref_path_param_name or self.ref_text_param_name):
            raise ValueError("请输入至少一个参考or情绪的参数")

    def is_emotion(self):
        """Return True when an emotion parameter name is configured."""
        return self.emotion_param_name is not None and self.emotion_param_name != ''

    def build_url_with_emotion(self, text_value, emotion_value):
        """Return the request URL carrying the text and an emotion value.

        Both values are percent-encoded with ``urllib.parse.quote``.

        Raises:
            ValueError: If no emotion parameter name is configured.
        """
        if not self.emotion_param_name:
            raise ValueError("Emotion parameter name is not set.")
        return self._append_params_to_url({
            self.text_param_name: urllib.parse.quote(text_value),
            self.emotion_param_name: urllib.parse.quote(emotion_value),
        })

    def build_url_with_ref(self, text_value, ref_path_value, ref_text_value):
        """Return the request URL carrying the text and a reference audio.

        All values are percent-encoded with ``urllib.parse.quote``.

        Raises:
            ValueError: If an emotion parameter name is configured, because
                the two modes are mutually exclusive.
        """
        if self.emotion_param_name:
            raise ValueError("Cannot use reference parameters when emotion parameter is set.")
        return self._append_params_to_url({
            self.text_param_name: urllib.parse.quote(text_value),
            self.ref_path_param_name: urllib.parse.quote(ref_path_value),
            self.ref_text_param_name: urllib.parse.quote(ref_text_value),
        })

    def _append_params_to_url(self, params: dict):
        """Append ``params`` (values already encoded) to the base URL."""
        if not params:
            return self.base_url
        query_params = '&'.join(f"{k}={v}" for k, v in params.items())
        # Continue an existing query string with '&', otherwise start one.
        separator = '&' if '?' in self.base_url else '?'
        return f"{self.base_url}{separator}{query_params}"


def generate_audio_files(url_composer, text_list, emotion_list, output_dir_path, fetch_audio=None):
    """Synthesize every text with every reference entry and save the audio.

    Each result is written twice so it can be browsed either way:
    ``<output>/text/<text>/<emotion>.wav`` and
    ``<output>/emotion/<emotion>/<text>.wav``.

    Args:
        url_composer: ``URLComposer`` used to build each request URL.
        text_list: Texts to synthesize.
        emotion_list: Dicts with the keys ``'emotion'``, ``'ref_path'`` and
            ``'ref_text'``.
        output_dir_path: Directory receiving the ``text`` and ``emotion``
            trees; it is created if missing.
        fetch_audio: Optional callable ``url -> bytes``. Defaults to
            ``inference_audio_from_api``.

    Raises:
        Exception: Whatever ``fetch_audio`` raises for a failed request.
    """
    if fetch_audio is None:
        fetch_audio = inference_audio_from_api

    # os.path.join returns plain strings, so directories are created with
    # os.makedirs (the previous pathlib-style .mkdir()/.parent calls on str
    # values failed, and `Path` was never imported).
    text_subdir = os.path.join(output_dir_path, 'text')
    emotion_subdir = os.path.join(output_dir_path, 'emotion')
    os.makedirs(text_subdir, exist_ok=True)
    os.makedirs(emotion_subdir, exist_ok=True)

    use_emotion = url_composer.is_emotion()

    # Every text is combined with every reference entry: the two-way
    # text/emotion directory layout only makes sense for the full
    # cross-product, whereas zip() silently paired text i with entry i and
    # dropped the rest.
    for text in text_list:
        for emotion in emotion_list:
            emotion_name = emotion['emotion']

            if use_emotion:
                real_url = url_composer.build_url_with_emotion(text, emotion_name)
            else:
                real_url = url_composer.build_url_with_ref(text, emotion['ref_path'], emotion['ref_text'])

            audio_bytes = fetch_audio(real_url)

            # The extension is part of the file name; joining '.wav' as its
            # own path component produced files literally named ".wav".
            # NOTE(review): `text` is used verbatim as a path component, so
            # texts containing path separators or characters the file system
            # rejects are not handled here.
            text_file_path = os.path.join(text_subdir, text, emotion_name + '.wav')
            emotion_file_path = os.path.join(emotion_subdir, emotion_name, text + '.wav')

            for file_path in (text_file_path, emotion_file_path):
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, 'wb') as f:
                    f.write(audio_bytes)


def inference_audio_from_api(url):
    """Fetch ``url`` with an HTTP GET and return the response body as bytes.

    Raises:
        Exception: If the server does not answer with status code 200.
    """
    response = requests.get(url, stream=True)

    if response.status_code != 200:
        raise Exception(f"Failed to fetch audio from API. Server responded with status code {response.status_code}.")

    return response.content
# Handle of the similarity subprocess while one is running, otherwise None.
# start_similarity_analysis() uses it to refuse overlapping runs.
p_similarity = None


def sample(output_audio_dir, similarity_list, subsection_num, sample_num):
    """Split the scored audio into score bands and copy a random sample of each.

    The list is sorted by score (highest first, in place) and cut into
    ``subsection_num`` consecutive bands of equal size (the last one may be
    shorter). From each non-empty band up to ``sample_num`` entries are drawn
    at random and copied to ``<output_audio_dir>/subsection_<k>``.

    Args:
        output_audio_dir: Directory receiving the ``subsection_<k>`` folders.
        similarity_list: List of dicts with the keys ``'score'`` and
            ``'wav_path'``.
        subsection_num: Number of bands; an int or a numeric string.
        sample_num: Maximum number of files per band; an int or a numeric
            string.

    Raises:
        ValueError: If ``subsection_num`` is not a positive integer or either
            count cannot be converted to ``int``.
    """
    import random  # needed for the sampling below; not imported at file level

    # The counts can arrive as strings from the UI text fields.
    subsection_num = int(subsection_num)
    sample_num = int(sample_num)
    if subsection_num <= 0:
        raise ValueError("subsection_num must be a positive integer")

    similarity_list.sort(key=lambda x: x['score'], reverse=True)

    # Band size, rounded up so that all entries are covered.
    step, remainder = divmod(len(similarity_list), subsection_num)
    if remainder:
        step += 1

    for i in range(subsection_num):
        segment = similarity_list[i * step:(i + 1) * step]
        if not segment:
            # Fewer entries than bands: nothing left to sample.
            break

        # random.sample draws from the band itself. Shuffling a slice only
        # shuffled a temporary copy, so the "random" pick was always the
        # top-scoring entries of the band.
        count = max(0, min(sample_num, len(segment)))
        sampled_subsection = random.sample(segment, count)

        subdir_path = os.path.join(output_audio_dir, f'subsection_{i+1}')
        os.makedirs(subdir_path, exist_ok=True)

        for item in sampled_subsection:
            src_path = item['wav_path']
            dst_path = os.path.join(subdir_path, os.path.basename(src_path))
            shutil.copyfile(src_path, dst_path)

    print("Sampling completed.")


def start_similarity_analysis(work_space_dir, sample_dir, base_voice_path, need_similarity_output):
    """Score every audio in ``sample_dir`` against a base voice.

    Runs ``tools/speaker_verification/audio_similarity.py`` as a subprocess,
    which writes ``<work_space_dir>/similarity/<base voice name>.txt``, then
    parses that report.

    Args:
        work_space_dir: Workspace directory; the ``similarity`` folder is
            created inside it.
        sample_dir: Directory with the candidate audio files.
        base_voice_path: Path of the reference voice file.
        need_similarity_output: When true, the scored audio files are also
            copied to ``<work_space_dir>/similarity/<base voice name>/`` with
            the score prefixed to each file name.

    Returns:
        The parsed list of ``{'score', 'wav_path'}`` dicts, or ``None`` when
        another analysis is still running.

    Raises:
        RuntimeError: If the subprocess exits with a non-zero status.
    """
    global p_similarity
    if p_similarity is not None:
        return None

    similarity_dir = os.path.join(work_space_dir, 'similarity')
    os.makedirs(similarity_dir, exist_ok=True)

    base_voice_file_name = get_filename_without_extension(base_voice_path)
    similarity_file = os.path.join(similarity_dir, f'{base_voice_file_name}.txt')

    # An argument list (no shell) keeps paths with spaces or quotes intact;
    # the shell string left the output path unquoted.
    cmd = [
        python_exec, 'tools/speaker_verification/audio_similarity.py',
        '-r', base_voice_path,
        '-c', sample_dir,
        '-o', similarity_file,
    ]
    print(cmd)

    try:
        p_similarity = Popen(cmd)
        return_code = p_similarity.wait()
    finally:
        # Always clear the guard, otherwise one run would block all later ones.
        p_similarity = None

    if return_code != 0:
        # Do not fall through to parsing: a report left by an earlier run
        # would be mistaken for the result of this one.
        raise RuntimeError(f"Similarity analysis subprocess exited with code {return_code}.")

    # The scores are always returned; the flag only controls the extra copy
    # of the scored files. Returning None without the flag made callers that
    # treat None as failure unusable in that mode.
    similarity_list = parse_similarity_file(similarity_file)
    if need_similarity_output:
        similarity_file_dir = os.path.join(similarity_dir, base_voice_file_name)
        copy_and_move(similarity_file_dir, similarity_list)

    return similarity_list


def parse_similarity_file(file_path):
    """Parse a similarity report with one ``<score>|<path>`` entry per line.

    Args:
        file_path (str): Path of the UTF-8 encoded report.

    Returns:
        list[dict]: One ``{'score': float, 'wav_path': str}`` dict per
        non-blank line, in file order.

    Raises:
        ValueError: If a line's score is not a valid float.
    """
    result_list = []

    # The report is written as UTF-8, so it must be read as UTF-8 too.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            entry = line.strip()
            if not entry:
                continue

            # Split on the first '|' only: the path itself may contain one.
            score, _, filepath = entry.partition('|')
            result_list.append({
                'score': float(score),
                'wav_path': filepath
            })

    return result_list


def copy_and_move(output_audio_directory, similarity_scores):
    """Copy scored audio files into one directory, prefixing each with its score.

    Each file is copied to ``<score>-<original name without extension>.wav``.

    Args:
        output_audio_directory: Target directory; created if missing.
        similarity_scores: List of dicts with the keys ``'score'`` and
            ``'wav_path'``.
    """
    os.makedirs(output_audio_directory, exist_ok=True)

    for item in similarity_scores:
        # splitext drops whatever extension is present instead of blindly
        # cutting the last four characters.
        base_name = os.path.splitext(os.path.basename(item['wav_path']))[0]
        new_name = f"{item['score']}-{base_name}.wav"
        new_path = os.path.join(output_audio_directory, new_name)
        shutil.copyfile(item['wav_path'], new_path)

    print("已完成复制和重命名操作。")


def get_filename_without_extension(file_path):
    """Return the file name of ``file_path`` without its last extension.

    Parameters:
        file_path (str): The full path to the file.

    Returns:
        str: The file name without its extension.
    """
    file_name, _extension = os.path.splitext(os.path.basename(file_path))
    return file_name
def compare_audio_and_generate_report(reference_audio_path, comparison_dir_path, output_file_path):
    """Score every ``.wav`` in a directory against a reference and write a report.

    Each candidate is compared with the module-level ``sv_pipeline``; the
    ``'score'`` entry of its result is used as the similarity. The report has
    one ``<score>|<path>`` line per candidate, highest score first, and is
    written as UTF-8. A directory without ``.wav`` files yields an empty
    report.

    Args:
        reference_audio_path: Path of the reference WAV file.
        comparison_dir_path: Directory whose ``.wav`` files are scored
            (sub-directories are not searched).
        output_file_path: Report file to write; it is overwritten if it
            exists and its parent directory is created if missing.
    """
    # Step 1: collect the candidate audio files.
    comparison_audio_paths = [
        os.path.join(comparison_dir_path, name)
        for name in os.listdir(comparison_dir_path)
        if name.endswith('.wav')
    ]

    # Step 2: score each candidate against the reference.
    similarity_scores = []
    for audio_path in comparison_audio_paths:
        score = sv_pipeline([reference_audio_path, audio_path])['score']
        similarity_scores.append({
            'score': score,
            'path': audio_path
        })

    # Step 3: best match first.
    similarity_scores.sort(key=lambda x: x['score'], reverse=True)

    # Step 4: write the report. Opening with mode 'w' creates the file, so
    # no separate "create if missing" step is needed; only the parent
    # directory has to exist.
    parent_dir = os.path.dirname(output_file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    formatted_scores = [f'{item["score"]}|{item["path"]}' for item in similarity_scores]
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(formatted_scores))


def parse_arguments(argv=None):
    """Parse the command-line arguments of the similarity script.

    Args:
        argv: Argument list to parse; ``None`` (the default) parses
            ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``reference_audio``, ``comparison_dir`` and
        ``output_file``.
    """
    parser = argparse.ArgumentParser(description="Audio processing script arguments")

    parser.add_argument("-r", "--reference_audio", type=str, required=True,
                        help="Path to the reference WAV file.")
    parser.add_argument("-c", "--comparison_dir", type=str, required=True,
                        help="Path to the directory containing comparison WAV files.")
    parser.add_argument("-o", "--output_file", type=str, required=True,
                        help="Path to the output file where results will be written.")

    return parser.parse_args(argv)


def main(argv=None):
    """Parse the arguments and generate the similarity report."""
    cmd = parse_arguments(argv)
    # The keyword names must match the function's parameters; passing
    # `comparison_dir=` / `output_file=` raised TypeError on every run.
    compare_audio_and_generate_report(
        reference_audio_path=cmd.reference_audio,
        comparison_dir_path=cmd.comparison_dir,
        output_file_path=cmd.output_file,
    )


if __name__ == '__main__':
    main()