Merge 50a88a596dea718c83e535136e9cb46b513cef6f into 9da7e17efe05041e31d3c3f42c8730ae890397f2

This commit is contained in:
FengQingYunDan 2025-04-01 19:05:56 +08:00 committed by GitHub
commit 2e0881e4c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 3050 additions and 0 deletions

View File

@ -398,4 +398,5 @@ arpa = {
symbols = [pad] + c + v + ja_symbols + pu_symbols + list(arpa)
symbols = sorted(set(symbols))
if __name__ == "__main__":
print(symbols)
print(len(symbols))

View File

View File

View File

@ -0,0 +1,156 @@
from tools import my_utils
from config import python_exec, is_half
import subprocess
import sys
import os
class RefAudioListManager:
def __init__(self, root_dir):
self.audio_dict = {'default': []}
absolute_root = os.path.abspath(root_dir)
for subdir, dirs, files in os.walk(absolute_root):
relative_path = os.path.relpath(subdir, absolute_root)
if relative_path == '.':
category = 'default'
else:
category = relative_path.replace(os.sep, '')
for file in files:
if file.endswith('.wav'):
# 将相对路径转换为绝对路径
audio_abs_path = os.path.join(subdir, file)
if category not in self.audio_dict:
self.audio_dict[category] = []
self.audio_dict[category].append(audio_abs_path)
def get_audio_list(self):
return self.audio_dict
def get_flattened_audio_list(self):
all_audio_files = []
for category_audios in self.audio_dict.values():
all_audio_files.extend(category_audios)
return all_audio_files
def get_ref_audio_list(self):
audio_info_list = []
for category, audio_paths in self.audio_dict.items():
for audio_path in audio_paths:
filename_without_extension = os.path.splitext(os.path.basename(audio_path))[0]
audio_info = {
'emotion': f"{category}-{filename_without_extension}",
'ref_path': audio_path,
'ref_text': filename_without_extension,
}
audio_info_list.append(audio_info)
return audio_info_list
def batch_clean_paths(paths):
"""
批量处理路径列表对每个路径调用 clean_path() 函数
参数:
paths (list[str]): 包含待处理路径的列表
返回:
list[str]: 经过 clean_path() 处理后的路径列表
"""
cleaned_paths = []
for path in paths:
cleaned_paths.append(my_utils.clean_path(path))
return cleaned_paths
def read_text_file_to_list(file_path):
# 按照UTF-8编码打开文件确保能够正确读取中文
with open(file_path, mode='r', encoding='utf-8') as file:
# 读取所有行并存储到一个列表中
lines = file.read().splitlines()
return lines
def get_filename_without_extension(file_path):
"""
Given a file path string, returns the file name without its extension.
Parameters:
file_path (str): The full path to the file.
Returns:
str: The file name without its extension.
"""
base_name = os.path.basename(file_path) # Get the base name (file name with extension)
file_name, file_extension = os.path.splitext(base_name) # Split the base name into file name and extension
return file_name # Return the file name without extension
def read_file(file_path):
# 使用with语句打开并读取文件
with open(file_path, 'r', encoding='utf-8') as file: # 'r' 表示以读取模式打开文件
# 一次性读取文件所有内容
file_content = file.read()
# 文件在with语句结束时会自动关闭
# 现在file_content变量中存储了文件的所有文本内容
return file_content
def write_text_to_file(text, output_file_path):
try:
with open(output_file_path, 'w', encoding='utf-8') as file:
file.write(text)
except IOError as e:
print(f"Error occurred while writing to the file: {e}")
else:
print(f"Text successfully written to file: {output_file_path}")
def check_path_existence_and_return(path):
"""
检查给定路径文件或目录是否存在如果存在返回该路径否则返回空字符串
:param path: 待检查的文件或目录路径字符串
:return: 如果路径存在返回原路径否则返回空字符串
"""
if os.path.exists(path):
return path
else:
return ""
def open_file(filepath):
if sys.platform.startswith('darwin'):
subprocess.run(['open', filepath]) # macOS
elif os.name == 'nt': # For Windows
os.startfile(filepath)
elif os.name == 'posix': # For Linux, Unix, etc.
subprocess.run(['xdg-open', filepath])
def start_new_service(script_path):
# 对于Windows系统
if sys.platform.startswith('win'):
cmd = f'start cmd /k {python_exec} {script_path}'
# 对于Mac或者Linux系统
else:
cmd = f'xterm -e {python_exec} {script_path}'
proc = subprocess.Popen(cmd, shell=True)
# 关闭之前启动的子进程
# proc.terminate()
# 或者如果需要强制关闭可以使用
# proc.kill()
return proc
if __name__ == '__main__':
dir = r'C:\Users\Administrator\Desktop/test'
dir2 = r'"C:\Users\Administrator\Desktop\test2"'
dir, dir2 = batch_clean_paths([dir, dir2])
print(dir, dir2)

View File

@ -0,0 +1,46 @@
import os
import re
pretrained_sovits_name = "GPT_SoVITS/pretrained_models/s2G488k.pth"
pretrained_gpt_name = "GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt"
SoVITS_weight_root = "SoVITS_weights"
GPT_weight_root = "GPT_weights"
os.makedirs(SoVITS_weight_root, exist_ok=True)
os.makedirs(GPT_weight_root, exist_ok=True)
speaker_verification_models = {
'speech_campplus_sv_zh-cn_16k-common': {
'task': 'speaker-verification',
'model': 'Ref_Audio_Selector/tool/speaker_verification/models/speech_campplus_sv_zh-cn_16k-common',
'model_revision': 'v1.0.0'
},
'speech_eres2net_sv_zh-cn_16k-common': {
'task': 'speaker-verification',
'model': 'Ref_Audio_Selector/tool/speaker_verification/models/speech_eres2net_sv_zh-cn_16k-common',
'model_revision': 'v1.0.5'
}
}
def custom_sort_key(s):
# 使用正则表达式提取字符串中的数字部分和非数字部分
parts = re.split('(\d+)', s)
# 将数字部分转换为整数,非数字部分保持不变
parts = [int(part) if part.isdigit() else part for part in parts]
return parts
def get_gpt_model_names():
gpt_names = [pretrained_gpt_name]
for name in os.listdir(GPT_weight_root):
if name.endswith(".ckpt"): gpt_names.append("%s/%s" % (GPT_weight_root, name))
sorted(gpt_names, key=custom_sort_key)
return gpt_names
def get_sovits_model_names():
sovits_names = [pretrained_sovits_name]
for name in os.listdir(SoVITS_weight_root):
if name.endswith(".pth"): sovits_names.append("%s/%s" % (SoVITS_weight_root, name))
sorted(sovits_names, key=custom_sort_key)
return sovits_names

View File

@ -0,0 +1,72 @@
import time
import os
from Ref_Audio_Selector.config_param.log_config import p_logger
import Ref_Audio_Selector.config_param.config_params as params
def timeit_decorator(func):
"""
装饰器用于计算被装饰函数的执行时间
参数:
func (function): 要计时的函数
返回:
function: 包含计时功能的新函数
"""
def wrapper(*args, **kwargs):
if params.time_log_print_type != 'file':
return func(*args, **kwargs)
start_time = time.perf_counter() # 使用 perf_counter 获取高精度计时起点
func_result = func(*args, **kwargs) # 执行原函数
end_time = time.perf_counter() # 获取计时终点
elapsed_time = end_time - start_time # 计算执行耗时
# 记录日志内容
log_message = f"进程ID: {os.getpid()}, {func.__name__} 执行耗时: {elapsed_time:.6f}"
p_logger.info(log_message)
return func_result
return wrapper
def time_monitor(func):
"""
返回结果追加时间
"""
def wrapper(*args, **kwargs):
start_time = time.perf_counter() # 使用 perf_counter 获取高精度计时起点
func_result = func(*args, **kwargs) # 执行原函数
end_time = time.perf_counter() # 获取计时终点
elapsed_time = end_time - start_time # 计算执行耗时
return elapsed_time, func_result
return wrapper
# 使用装饰器
@timeit_decorator
def example_function(n):
time.sleep(n) # 假设这是需要计时的函数,这里模拟耗时操作
return n * 2
def example_function2(n):
time.sleep(n) # 假设这是需要计时的函数,这里模拟耗时操作
return n * 2
if __name__ == "__main__":
# 调用经过装饰的函数
# result = example_function(2)
print(time_monitor(example_function2)(2))

View File

@ -0,0 +1,57 @@
# config.ini
[Base]
# 服务端口号
server_port = 9423
# 参考音频目录
reference_audio_dir = refer_audio
# 临时文件目录
temp_dir = Ref_Audio_Selector/temp
[Log]
# 日志保存目录路径
log_dir = Ref_Audio_Selector/log/general
# 日志级别 CRITICAL、FATAL、ERROR、WARNING、WARN、INFO、DEBUG、NOTSET、
log_level = INFO
# 函数时间消耗日志打印类型 file 打印到文件; close 关闭
time_log_print_type = file
# 函数时间消耗日志保存目录路径
time_log_print_dir = Ref_Audio_Selector/log/performance
[AudioSample]
# list转换待选参考音频目录
list_to_convert_reference_audio_dir = refer_audio_all
# 音频相似度目录
audio_similarity_dir = similarity
# 是否开启基准音频预采样 true false
enable_pre_sample = true
[Inference]
# 默认测试文本位置
default_test_text_path = Ref_Audio_Selector/file/test_content/test_content.txt
# 推理音频目录
inference_audio_dir = inference_audio
# 推理音频文本聚合目录
inference_audio_text_aggregation_dir = text
# 推理音频情绪聚合目录
inference_audio_emotion_aggregation_dir = emotion
[ResultCheck]
# asr输出文件
asr_filename = asr
# 文本相似度输出目录
text_similarity_output_dir = text_similarity
# 文本情绪平均相似度报告文件名
text_emotion_average_similarity_report_filename = average_similarity
# 文本相似度按情绪聚合明细文件名
text_similarity_by_emotion_detail_filename = emotion_group_detail
# 文本相似度按文本聚合明细文件名
text_similarity_by_text_detail_filename = text_group_detail
[AudioConfig]
# 默认模板文件位置
default_template_path = Ref_Audio_Selector/file/config_template/ref_audio_template.txt
# 参考音频配置文件名
reference_audio_config_filename = refer_audio
[Other]

View File

@ -0,0 +1,111 @@
import configparser
import os
import Ref_Audio_Selector.common.common as common
class ParamReadWriteManager:
def __init__(self):
self.base_dir = 'Ref_Audio_Selector/file/base_info'
os.makedirs(self.base_dir, exist_ok=True)
# 基础信息
self.work_dir = 'work_dir'
self.role = 'role'
# 第一步
self.subsection_num = 'subsection_num'
self.sample_num = 'sample_num'
# 第二步
self.api_set_model_base_url = 'api_set_model_base_url'
self.api_gpt_param = 'api_gpt_param'
self.api_sovits_param = 'api_sovits_param'
self.api_v2_set_gpt_model_base_url = 'api_v2_set_gpt_model_base_url'
self.api_v2_gpt_model_param = 'api_v2_gpt_model_param'
self.api_v2_set_sovits_model_base_url = 'api_v2_set_sovits_model_base_url'
self.api_v2_sovits_model_param = 'api_v2_sovits_model_param'
self.text_url = 'text_url'
self.text_param = 'text_param'
self.refer_type_param = 'refer_type_param'
self.ref_path_param = 'ref_path_param'
self.ref_text_param = 'ref_text_param'
self.emotion_param = 'emotion_param'
self.test_content_path = 'test_content_path'
self.request_concurrency_num = 'request_concurrency_num'
# 第三步
self.text_similarity_amplification_boundary = 'text_similarity_amplification_boundary'
# 第四步
# 第五步
self.text_template = 'text_template'
def read(self, key):
file_path = os.path.join(self.base_dir, key + '.txt')
if os.path.exists(file_path):
content = common.read_file(file_path)
return content.strip()
else:
return ''
def write(self, key, content):
file_path = os.path.join(self.base_dir, key + '.txt')
# 确保内容是字符串类型,如果不是,转换为字符串
if not isinstance(content, str):
clean_content = str(content).strip() # 转换为字符串并移除首尾空白
else:
clean_content = content.strip()
common.write_text_to_file(clean_content, file_path)
class ConfigManager:
def __init__(self):
self.config_path = 'Ref_Audio_Selector/config.ini'
self.config = configparser.ConfigParser()
self.config.read(self.config_path, encoding='utf-8')
def get_base(self, key):
return self.config.get('Base', key)
def get_log(self, key):
return self.config.get('Log', key)
def get_audio_sample(self, key):
return self.config.get('AudioSample', key)
def get_inference(self, key):
return self.config.get('Inference', key)
def get_result_check(self, key):
return self.config.get('ResultCheck', key)
def get_audio_config(self, key):
return self.config.get('AudioConfig', key)
def get_other(self, key):
return self.config.get('Other', key)
def print(self):
# 打印所有配置
for section in self.config.sections():
print('[{}]'.format(section))
for key in self.config[section]:
print('{} = {}'.format(key, self.config[section][key]))
print()
_config = ConfigManager()
_param_read_write_manager = ParamReadWriteManager()
def get_config():
return _config
def get_rw_param():
return _param_read_write_manager
if __name__ == '__main__':
print(_config.print())

View File

@ -0,0 +1,58 @@
import Ref_Audio_Selector.config_param.config_manager as config_manager
config = config_manager.get_config()
# [Base]
# 服务端口号
server_port = int(config.get_base('server_port'))
# 参考音频目录
reference_audio_dir = config.get_base('reference_audio_dir')
# 临时文件目录
temp_dir = config.get_base('temp_dir')
# [Log]
# 日志保存目录路径
log_dir = config.get_log('log_dir')
# 日志级别 CRITICAL、FATAL、ERROR、WARNING、WARN、INFO、DEBUG、NOTSET、
log_level = config.get_log('log_level')
# 函数时间消耗日志打印类型 file 打印到文件; close 关闭
time_log_print_type = config.get_log('time_log_print_type')
# 函数时间消耗日志保存目录路径
time_log_print_dir = config.get_log('time_log_print_dir')
# [AudioSample]
# list转换待选参考音频目录
list_to_convert_reference_audio_dir = config.get_audio_sample('list_to_convert_reference_audio_dir')
# 音频相似度目录
audio_similarity_dir = config.get_audio_sample('audio_similarity_dir')
# 是否开启基准音频预采样 true false
enable_pre_sample = config.get_audio_sample('enable_pre_sample')
# [Inference]
# 默认测试文本位置
default_test_text_path = config.get_inference('default_test_text_path')
# 推理音频目录
inference_audio_dir = config.get_inference('inference_audio_dir')
# 推理音频文本聚合目录
inference_audio_text_aggregation_dir = config.get_inference('inference_audio_text_aggregation_dir')
# 推理音频情绪聚合目录
inference_audio_emotion_aggregation_dir = config.get_inference('inference_audio_emotion_aggregation_dir')
# [ResultCheck]
# asr输出文件
asr_filename = config.get_result_check('asr_filename')
# 文本相似度输出目录
text_similarity_output_dir = config.get_result_check('text_similarity_output_dir')
# 文本情绪平均相似度报告文件名
text_emotion_average_similarity_report_filename = config.get_result_check('text_emotion_average_similarity_report_filename')
# 文本相似度按情绪聚合明细文件名
text_similarity_by_emotion_detail_filename = config.get_result_check('text_similarity_by_emotion_detail_filename')
# 文本相似度按文本聚合明细文件名
text_similarity_by_text_detail_filename = config.get_result_check('text_similarity_by_text_detail_filename')
# [AudioConfig]
# 默认模板文件位置
default_template_path = config.get_audio_config('default_template_path')
# 参考音频配置文件名
reference_audio_config_filename = config.get_audio_config('reference_audio_config_filename')

View File

@ -0,0 +1,65 @@
import logging
import os
import datetime
import Ref_Audio_Selector.config_param.config_params as params
def create_general_logger():
# 获取当前日期,用于文件名和日志内容
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
# 创建一个用于控制台输出的处理器,并设置日志级别
console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.INFO)
# 可以设置控制台输出的格式
console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)
console_handler.encoding = 'utf-8' # 设置字符编码为utf-8
os.makedirs(params.log_dir, exist_ok=True)
# 创建一个用于常规日志的处理器
general_handler = logging.FileHandler(f"{params.log_dir}/{current_date}.log", mode='a', encoding='utf-8')
# general_handler.setLevel(logging.INFO)
general_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
general_handler.setFormatter(general_formatter)
# 配置一个常规的logger
general_logger = logging.getLogger('general')
level = logging.getLevelName(params.log_level)
general_logger.setLevel(level)
general_logger.addHandler(console_handler)
general_logger.addHandler(general_handler)
# 配置根logger以防万一
logging.basicConfig(level=logging.WARNING, handlers=[general_handler])
return general_logger
def create_performance_logger():
# 获取当前日期,用于文件名和日志内容
current_date = datetime.datetime.now().strftime('%Y-%m-%d')
os.makedirs(params.time_log_print_dir, exist_ok=True)
# 创建一个专用于性能监控日志的处理器
performance_handler = logging.FileHandler(
f"{params.time_log_print_dir}/{current_date}.log", mode='a', encoding='utf-8')
# performance_handler.setLevel(logging.INFO)
performance_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
performance_handler.setFormatter(performance_formatter)
# 配置一个专门用于性能监控的logger
performance_logger = logging.getLogger('performance')
performance_logger.setLevel(logging.INFO)
performance_logger.addHandler(performance_handler)
return performance_logger
def setup_logging():
return create_general_logger(), create_performance_logger()
logger, p_logger = setup_logging()

View File

@ -0,0 +1,5 @@
"${emotion}": {
"ref_wav_path": "${ref_path}",
"prompt_text": "${ref_text}",
"prompt_language": "中文"
}

View File

@ -0,0 +1,4 @@
也是只有一次。”白蓉简单地回答,然后迅速转移话锋,搂住罗辑的脖子说,“算了,我不要那生日礼物了,你也回到正常的生活中来,好吗?”
云天明看到那是一条丑陋的虫子,软乎乎湿漉漉的,在她白皙的手指间蠕动着,旁边一个女生尖叫道:恶心死了,你碰它干吗?!程心把虫子轻轻放到旁边的草丛中,说,它在这里会给踩死的。
“那么多的星星,像雾似的。”云天明感叹道。程心把目光从银河收回,转头看着他,指着下面的校园和城市说:“你看下面也很漂亮啊,我们的生活是在这儿,可不是在那么远的银河里。”
“可我们的专业,不就是为了到地球之外去吗?”“那是为了这里的生活更好,可不是为了逃离地球啊。”云天明当然知道程心的话是委婉地指向他的孤僻和自闭,他也只有默然以对。

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,5 @@
CHCP 65001
@echo off
cd ../
runtime\python.exe ./Ref_Audio_Selector/ref_audio_selector_webui.py
pause

View File

View File

View File

@ -0,0 +1,120 @@
import argparse
import os
import traceback
import Ref_Audio_Selector.config_param.config_params as params
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
import torch
from faster_whisper import WhisperModel
from tqdm import tqdm
from tools.asr.config import check_fw_local_models
from Ref_Audio_Selector.config_param.log_config import logger
language_code_list = [
"af", "am", "ar", "as", "az",
"ba", "be", "bg", "bn", "bo",
"br", "bs", "ca", "cs", "cy",
"da", "de", "el", "en", "es",
"et", "eu", "fa", "fi", "fo",
"fr", "gl", "gu", "ha", "haw",
"he", "hi", "hr", "ht", "hu",
"hy", "id", "is", "it", "ja",
"jw", "ka", "kk", "km", "kn",
"ko", "la", "lb", "ln", "lo",
"lt", "lv", "mg", "mi", "mk",
"ml", "mn", "mr", "ms", "mt",
"my", "ne", "nl", "nn", "no",
"oc", "pa", "pl", "ps", "pt",
"ro", "ru", "sa", "sd", "si",
"sk", "sl", "sn", "so", "sq",
"sr", "su", "sv", "sw", "ta",
"te", "tg", "th", "tk", "tl",
"tr", "tt", "uk", "ur", "uz",
"vi", "yi", "yo", "zh", "yue",
"auto"]
def execute_asr_multi_level_dir(input_folder, output_folder, model_size, language, precision):
if '-local' in model_size:
model_size = model_size[:-6]
model_path = f'tools/asr/models/faster-whisper-{model_size}'
else:
model_path = model_size
if language == 'auto':
language = None # 不设置语种由模型自动输出概率最高的语种
logger.info("loading faster whisper model:", model_size, model_path)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
try:
model = WhisperModel(model_path, device=device, compute_type=precision)
except:
return logger.error(traceback.format_exc())
output = []
# 递归遍历输入目录及所有子目录
for root, dirs, files in os.walk(input_folder):
for file_name in sorted(files):
# 只处理wav文件假设是wav文件
if file_name.endswith(".wav"):
try:
file_path = os.path.join(root, file_name)
original_text = os.path.basename(root)
segments, info = model.transcribe(
audio=file_path,
beam_size=5,
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=700),
language=language)
text = ''
if info.language == "zh":
logger.info("检测为中文文本, 转 FunASR 处理")
if ("only_asr" not in globals()):
from Ref_Audio_Selector.tool.asr.funasr_asr_multi_level_dir import \
only_asr # #如果用英文就不需要导入下载模型
text = only_asr(file_path)
if text == '':
for segment in segments:
text += segment.text
output.append(f"{file_path}|{original_text}|{info.language.upper()}|{text}")
print(f"{file_path}|{original_text}|{info.language.upper()}|{text}")
except:
return logger.error(traceback.format_exc())
output_folder = output_folder
os.makedirs(output_folder, exist_ok=True)
output_file_path = os.path.abspath(f'{output_folder}/{params.asr_filename}.list')
with open(output_file_path, "w", encoding="utf-8") as f:
f.write("\n".join(output))
logger.info(f"ASR 任务完成->标注文件路径: {output_file_path}\n")
return output_file_path
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input_folder", type=str, required=True,
help="Path to the folder containing WAV files.")
parser.add_argument("-o", "--output_folder", type=str, required=True,
help="Output folder to store transcriptions.")
parser.add_argument("-s", "--model_size", type=str, default='large-v3',
choices=check_fw_local_models(),
help="Model Size of Faster Whisper")
parser.add_argument("-l", "--language", type=str, default='ja',
choices=language_code_list,
help="Language of the audio files.")
parser.add_argument("-p", "--precision", type=str, default='float16', choices=['float16', 'float32'],
help="fp16 or fp32")
cmd = parser.parse_args()
output_file_path = execute_asr_multi_level_dir(
input_folder=cmd.input_folder,
output_folder=cmd.output_folder,
model_size=cmd.model_size,
language=cmd.language,
precision=cmd.precision,
)

View File

@ -0,0 +1,94 @@
# -*- coding:utf-8 -*-
import argparse
import os
import traceback
import Ref_Audio_Selector.config_param.config_params as params
from Ref_Audio_Selector.config_param.log_config import logger
from Ref_Audio_Selector.common.time_util import timeit_decorator
from tqdm import tqdm
from funasr import AutoModel
path_asr = 'tools/asr/models/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'
path_vad = 'tools/asr/models/speech_fsmn_vad_zh-cn-16k-common-pytorch'
path_punc = 'tools/asr/models/punc_ct-transformer_zh-cn-common-vocab272727-pytorch'
path_asr = path_asr if os.path.exists(
path_asr) else "iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"
path_vad = path_vad if os.path.exists(path_vad) else "iic/speech_fsmn_vad_zh-cn-16k-common-pytorch"
path_punc = path_punc if os.path.exists(path_punc) else "iic/punc_ct-transformer_zh-cn-common-vocab272727-pytorch"
model = AutoModel(
model=path_asr,
model_revision="v2.0.4",
vad_model=path_vad,
vad_model_revision="v2.0.4",
punc_model=path_punc,
punc_model_revision="v2.0.4",
)
def only_asr(input_file):
try:
text = model.generate(input=input_file)[0]["text"]
except:
text = ''
logger.error(traceback.format_exc())
return text
@timeit_decorator
def execute_asr_multi_level_dir(input_folder, output_folder, model_size, language):
output = []
# 递归遍历输入目录及所有子目录
for root, dirs, files in os.walk(input_folder):
for name in sorted(files):
# 只处理wav文件假设是wav文件
if name.endswith(".wav"):
try:
original_text = os.path.basename(root)
# 构造完整的输入音频文件路径
input_file_path = os.path.join(root, name)
input_file_path = os.path.normpath(input_file_path) # 先标准化可能存在混合斜杠的情况
asr_text = model.generate(input=input_file_path)[0]["text"]
output.append(f"{input_file_path}|{original_text}|{language.upper()}|{asr_text}")
except:
logger.error(traceback.format_exc())
# 创建或打开指定的输出目录
output_folder = output_folder
output_dir_abs = os.path.abspath(output_folder)
os.makedirs(output_dir_abs, exist_ok=True)
# 构造输出文件路径
output_file_path = os.path.join(output_dir_abs, f'{params.asr_filename}.list')
# 将输出写入文件
with open(output_file_path, "w", encoding="utf-8") as f:
f.write("\n".join(output))
logger.info(f"ASR 任务完成->标注文件路径: {output_file_path}\n")
return output_file_path
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input_folder", type=str, required=True,
help="Path to the folder containing WAV files.")
parser.add_argument("-o", "--output_folder", type=str, required=True,
help="Output folder to store transcriptions.")
parser.add_argument("-s", "--model_size", type=str, default='large',
help="Model Size of FunASR is Large")
parser.add_argument("-l", "--language", type=str, default='zh', choices=['zh'],
help="Language of the audio files.")
parser.add_argument("-p", "--precision", type=str, default='float16', choices=['float16', 'float32'],
help="fp16 or fp32") # 还没接入
cmd = parser.parse_args()
execute_asr_multi_level_dir(
input_folder=cmd.input_folder,
output_folder=cmd.output_folder,
model_size=cmd.model_size,
language=cmd.language,
)

View File

@ -0,0 +1,54 @@
import os
import shutil
import Ref_Audio_Selector.common.common as common
import Ref_Audio_Selector.config_param.config_params as params
from Ref_Audio_Selector.config_param.log_config import logger
def remove_matching_audio_files_in_text_dir(text_dir, emotions_list):
count = 0
emotions = [item['emotion'] for item in emotions_list]
for root, dirs, files in os.walk(text_dir):
for file in files:
if file.endswith(".wav"):
emotion_tag = os.path.basename(file)[:-4]
if emotion_tag not in emotions:
file_path = os.path.join(root, file)
logger.info(f"Deleting file: {file_path}")
try:
os.remove(file_path)
count += 1
except Exception as e:
logger.error(f"Error deleting file {file_path}: {e}")
return count
def delete_emotion_subdirectories(emotion_dir, emotions_list):
count = 0
emotions = [item['emotion'] for item in emotions_list]
for entry in os.listdir(emotion_dir):
entry_path = os.path.join(emotion_dir, entry)
if os.path.isdir(entry_path):
if entry not in emotions:
logger.info(f"Deleting directory: {entry_path}")
try:
# 使用shutil.rmtree删除整个子目录及其内容
shutil.rmtree(entry_path)
count += 1
except Exception as e:
logger.error(f"Error deleting directory {entry_path}: {e}")
return count
def sync_ref_audio(ref_audio_dir, inference_audio_dir):
ref_audio_manager = common.RefAudioListManager(ref_audio_dir)
ref_list = ref_audio_manager.get_ref_audio_list()
text_dir = os.path.join(inference_audio_dir, params.inference_audio_text_aggregation_dir)
emotion_dir = os.path.join(inference_audio_dir, params.inference_audio_emotion_aggregation_dir)
delete_text_wav_num = remove_matching_audio_files_in_text_dir(text_dir, ref_list)
delete_emotion_dir_num = delete_emotion_subdirectories(emotion_dir, ref_list)
return delete_text_wav_num, delete_emotion_dir_num

View File

@ -0,0 +1,31 @@
import os
import platform
def generate_audio_config(work_space_dir, template_str, audio_list, output_file_path):
# 定义一个空字符串来存储最终要写入文件的内容
file_content = ""
# 遍历参考音频列表
for audio_info in audio_list:
emotion = audio_info['emotion']
ref_path = audio_info['ref_path']
ref_text = audio_info['ref_text']
relative_path = os.path.relpath(ref_path, work_space_dir)
if platform.system() == 'Windows':
relative_path = relative_path.replace('\\', '/')
# 使用字符串模板替换变量
formatted_line = template_str.replace('${emotion}', emotion).replace('${ref_path}', relative_path).replace(
'${ref_text}', ref_text)
# 将格式化后的行添加到内容中,使用逗号和换行符分隔
file_content += formatted_line + ",\n"
# 删除最后一个逗号和换行符,确保格式整洁
file_content = file_content[:-2]
# 将内容写入输出文件
with open(output_file_path, 'w', encoding='utf-8') as output_file:
output_file.write(file_content)

View File

@ -0,0 +1,238 @@
import time
import os
import requests
import itertools
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import Ref_Audio_Selector.config_param.config_params as params
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse, quote
from Ref_Audio_Selector.config_param.log_config import logger, p_logger
class SetModelURLComposer:
def __init__(self, type, base_url, gpt_param_name, sovits_param_name):
self.type = type
self.base_url = base_url
self.gpt_param_name = gpt_param_name
self.sovits_param_name = sovits_param_name
def is_valid(self):
if self.base_url is None or self.base_url == '':
raise Exception("请求地址不能为空")
if self.type in ['gpt', 'all']:
if self.gpt_param_name is None or self.gpt_param_name == '':
raise Exception("GPT参数名不能为空")
if self.type in ['sovits', 'all']:
if self.sovits_param_name is None or self.sovits_param_name == '':
raise Exception("Sovits参数名不能为空")
def build_get_url(self, value_array, need_url_encode=True):
params = {}
if self.type == 'gpt':
params[self.gpt_param_name] = value_array[0]
if self.type == 'sovits':
params[self.sovits_param_name] = value_array[0]
if self.type == 'all':
params[self.gpt_param_name] = value_array[0]
params[self.sovits_param_name] = value_array[1]
return append_params_to_url(self.base_url, params, need_url_encode)
def build_post_url(self, value_array, need_url_encode=True):
url = append_params_to_url(self.base_url, {}, need_url_encode)
params = {}
if self.type == 'gpt':
params[self.gpt_param_name] = value_array[0]
if self.type == 'sovits':
params[self.sovits_param_name] = value_array[0]
if self.type == 'all':
params[self.gpt_param_name] = value_array[0]
params[self.sovits_param_name] = value_array[1]
return url, params
class TTSURLComposer:
def __init__(self, base_url, refer_type_param, emotion_param_name, text_param_name, ref_path_param_name, ref_text_param_name):
self.base_url = base_url
# 角色情绪 or 参考音频
self.refer_type_param = refer_type_param
self.emotion_param_name = emotion_param_name
self.text_param_name = text_param_name
self.ref_path_param_name = ref_path_param_name
self.ref_text_param_name = ref_text_param_name
def is_valid(self):
if self.base_url is None or self.base_url == '':
raise ValueError("请输入url")
if self.text_param_name is None or self.text_param_name == '':
raise ValueError("请输入text参数名")
if self.emotion_param_name is None and self.ref_path_param_name is None and self.ref_text_param_name is None:
raise ValueError("请输入至少一个参考or情绪的参数")
def is_emotion(self):
return self.refer_type_param == '角色情绪'
def build_url_with_emotion(self, text_value, emotion_value, need_url_encode=True):
params = {
self.text_param_name: text_value,
self.emotion_param_name: emotion_value,
}
return append_params_to_url(self.base_url, params, need_url_encode)
def build_url_with_ref(self, text_value, ref_path_value, ref_text_value, need_url_encode=True):
params = {
self.text_param_name: text_value,
self.ref_path_param_name: ref_path_value,
self.ref_text_param_name: ref_text_value,
}
return append_params_to_url(self.base_url, params, need_url_encode)
def append_params_to_url(url_with_params, params, need_url_encode):
if params:
query_params = '&'.join([f"{k}={v}" for k, v in params.items()])
url_with_params += '?' + query_params if '?' not in url_with_params else '&' + query_params
return url_with_params if not need_url_encode else safe_encode_query_params(url_with_params)
def safe_encode_query_params(original_url):
# 分析URL以获取查询字符串部分
parsed_url = urlparse(original_url)
query_params = parse_qs(parsed_url.query)
# 将查询参数转换为编码过的字典(键值对会被转码)
encoded_params = {k: quote(v[0]) for k, v in query_params.items()}
# 重新编码查询字符串
new_query_string = urlencode(encoded_params, doseq=False)
# 重建完整的URL
new_parsed_url = parsed_url._replace(query=new_query_string)
encoded_url = urlunparse(new_parsed_url)
logger.info(encoded_url)
return encoded_url
def generate_audio_files_parallel(url_composer, text_list, emotion_list, output_dir_path, num_processes=1):
# 将emotion_list均匀分成num_processes个子集
emotion_groups = np.array_split(emotion_list, num_processes)
with ProcessPoolExecutor(max_workers=num_processes) as executor:
futures = [
executor.submit(generate_audio_files_for_emotion_group, url_composer, text_list, group, output_dir_path)
for group in emotion_groups]
for future in futures:
future.result() # 等待所有进程完成
def generate_audio_files_for_emotion_group(url_composer, text_list, emotion_list, output_dir_path):
start_time = time.perf_counter() # 使用 perf_counter 获取高精度计时起点
# Ensure the output directory exists
output_dir = os.path.abspath(output_dir_path)
os.makedirs(output_dir, exist_ok=True)
# Create subdirectories for text and emotion categories
text_subdir = os.path.join(output_dir, params.inference_audio_text_aggregation_dir)
os.makedirs(text_subdir, exist_ok=True)
emotion_subdir = os.path.join(output_dir, params.inference_audio_emotion_aggregation_dir)
os.makedirs(emotion_subdir, exist_ok=True)
all_count = len(text_list) * len(emotion_list)
has_generated_count = 0
all_text_count = sum(len(item) for item in text_list)
# 计算笛卡尔积
cartesian_product = list(itertools.product(text_list, emotion_list))
for text, emotion in cartesian_product:
# Generate audio byte stream using the create_audio function
emotion_name = emotion['emotion']
text_subdir_text = os.path.join(text_subdir, text)
os.makedirs(text_subdir_text, exist_ok=True)
text_subdir_text_file_path = os.path.join(text_subdir_text, emotion_name + '.wav')
emotion_subdir_emotion = os.path.join(emotion_subdir, emotion_name)
os.makedirs(emotion_subdir_emotion, exist_ok=True)
emotion_subdir_emotion_file_path = os.path.join(emotion_subdir_emotion, text + '.wav')
# 检查是否已经存在对应的音频文件,如果存在则跳过
if os.path.exists(text_subdir_text_file_path) and os.path.exists(emotion_subdir_emotion_file_path):
has_generated_count += 1
logger.info(f"进程ID: {os.getpid()}, 进度: {has_generated_count}/{all_count}")
continue
if url_composer.is_emotion():
real_url = url_composer.build_url_with_emotion(text, emotion['emotion'], False)
else:
real_url = url_composer.build_url_with_ref(text, emotion['ref_path'], emotion['ref_text'], False)
audio_bytes = inference_audio_from_api(real_url)
# Write audio bytes to the respective files
with open(text_subdir_text_file_path, 'wb') as f:
f.write(audio_bytes)
with open(emotion_subdir_emotion_file_path, 'wb') as f:
f.write(audio_bytes)
has_generated_count += 1
logger.info(f"进程ID: {os.getpid()}, 进度: {has_generated_count}/{all_count}")
end_time = time.perf_counter() # 获取计时终点
elapsed_time = end_time - start_time # 计算执行耗时
# 记录日志内容
log_message = f"进程ID: {os.getpid()}, generate_audio_files_for_emotion_group 执行耗时: {elapsed_time:.6f} 秒;推理数量: {has_generated_count} 字符总数:{all_text_count};每秒推理字符数:{all_text_count*len(emotion_list) / elapsed_time:.3f}"
p_logger.info(log_message)
logger.info(log_message)
def inference_audio_from_api(url):
logger.info(f'inference_audio_from_api url: {url}')
# 发起GET请求
response = requests.get(url, stream=True)
# 检查响应状态码是否正常例如200表示成功
if response.status_code == 200:
# 返回音频数据的字节流
return response.content
else:
raise Exception(f"Failed to fetch audio from API. Server responded with status code {response.status_code}.message: {response.json()}")
def start_api_set_model(set_model_url_composer, gpt_models, sovits_models):
url, post_body = set_model_url_composer.build_post_url([gpt_models, sovits_models], True)
logger.info(f'set_model_url_composer url: {set_model_url_composer}')
logger.info(f'start_api_set_model url: {url}')
logger.info(f'start_api_set_model post_body: {post_body}')
response = requests.post(url, json=post_body)
if response.status_code == 200:
result = response.text
return result
else:
return f'请求失败,状态码:{response.status_code}'
def start_api_v2_set_gpt_model(set_model_url_composer, gpt_models):
url = set_model_url_composer.build_get_url([gpt_models], False)
logger.info(f'start_api_v2_set_gpt_model url: {url}')
response = requests.get(url)
if response.status_code == 200:
result = response.text
return result
else:
return f'请求失败,状态码:{response.status_code}'
def start_api_v2_set_sovits_model(set_model_url_composer, sovits_models):
url = set_model_url_composer.build_get_url([sovits_models], False)
logger.info(f'start_api_v2_set_sovits_model url: {url}')
response = requests.get(url)
if response.status_code == 200:
result = response.text
return result
else:
return f'请求失败,状态码:{response.status_code}'

View File

@ -0,0 +1,162 @@
import os
import shutil
import random
import librosa
from Ref_Audio_Selector.config_param.log_config import logger
def check_audio_duration(path, min_duration=3, max_duration=10):
try:
# 直接计算音频文件的时长(单位:秒)
duration = librosa.get_duration(filename=path)
# 判断时长是否在3s至10s之间
if min_duration <= duration <= max_duration:
return True
else:
return False
except Exception as e:
logger.error(f"无法打开或处理音频文件:{e}")
return None
def convert_from_list(list_file, output_dir):
# 创建输出目录,如果它不存在的话
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 解析.list文件并操作文件
with open(list_file, 'r', encoding='utf-8') as file:
lines = file.readlines()
for line in lines:
parts = line.strip().split('|')
if len(parts) != 4:
logger.error(f"Line format incorrect: {line}")
continue
audio_path, _, _, transcription = parts
# 构建新的文件名和路径
new_filename = transcription.strip() + '.wav'
# new_filename = new_filename.replace(' ', '_') # 移除空格
# new_filename = ''.join(e for e in new_filename if e.isalnum() or e in ['_', '.']) # 移除非法字符
new_path = os.path.join(output_dir, new_filename)
# 如果目标文件已存在,不要覆盖
if os.path.exists(new_path):
logger.info(f"File already exists: {new_path}")
continue
try:
# 检查音频文件是否存在
if not os.path.exists(audio_path):
logger.info(f"Audio file does not exist: {audio_path}")
continue
if check_audio_duration(audio_path):
# 复制音频文件到output目录并重命名
shutil.copy2(audio_path, new_path)
logger.info(f"File copied and renamed to: {new_path}")
else:
logger.info(f"File skipped due to duration: {audio_path}")
except Exception as e:
logger.error(f"An error occurred while processing: {audio_path}")
logger.error(e)
logger.info("Processing complete.")
def sample(output_audio_dir, similarity_list, subsection_num, sample_num):
# 按照相似度分值降序排序相似度列表
similarity_list.sort(key=lambda x: x['score'], reverse=True)
# 计算每段的起始索引
step = len(similarity_list) // subsection_num
if len(similarity_list) % subsection_num != 0:
step += 1
# 分段并随机采样
for i in range(subsection_num):
start = i * step
end = (i + 1) * step
end = min(end, len(similarity_list)) # 防止最后一段越界
# 创建子列表
subsection = similarity_list[start:end]
# 在子列表上随机打乱
random.shuffle(subsection)
# 从打乱后的子列表中抽取相应数量的个体
num = min(sample_num, len(subsection))
sampled_subsection = subsection[:num]
# 创建并进入子目录
subdir_name = f'emotion_{i + 1}'
subdir_path = os.path.join(output_audio_dir, subdir_name)
os.makedirs(subdir_path, exist_ok=True)
# 复制采样结果的音频到子目录
for item in sampled_subsection:
src_path = item['wav_path']
dst_path = os.path.join(subdir_path, os.path.basename(src_path))
shutil.copyfile(src_path, dst_path)
logger.info("Sampling completed.")
def parse_similarity_file(file_path):
"""
解析指定文本文件将其中的内容以元组形式存入列表
参数:
file_path (str): 待解析的文本文件路径
返回:
list[tuple[float, str]]: 存储浮点数和路径的元组列表
"""
result_list = []
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
# 去除行尾换行符并按'|'分割
score, filepath = line.strip().split('|')
# 将浮点数字符串转换为浮点数类型
score = float(score)
# 将得分和路径作为元组添加到结果列表
result_list.append({
'score': score,
'wav_path': filepath
})
return result_list
def copy_and_move(output_audio_directory, similarity_scores):
# 确保新目录存在
if not os.path.exists(output_audio_directory):
os.makedirs(output_audio_directory)
# 遍历并复制文件
for item in similarity_scores:
# 构造新的文件名
base_name = os.path.basename(item['wav_path'])[:-4] # 去掉.wav扩展名
new_name = f"{item['score'] * 10000:04.0f}-{base_name}.wav"
# 新文件的完整路径
new_path = os.path.join(output_audio_directory, new_name)
# 复制文件到新目录
shutil.copyfile(item['wav_path'], new_path)
logger.info("已完成复制和重命名操作。")
if __name__ == '__main__':
similarity_list = parse_similarity_file("D:/tt/similarity/啊,除了伊甸和樱,竟然还有其他人会提起我?.txt")
sample('D:/tt/similarity/output', similarity_list, 10, 4)

View File

@ -0,0 +1,142 @@
import argparse
import os
import torchaudio
import torchaudio.transforms as T
import platform
import Ref_Audio_Selector.config_param.config_params as params
import Ref_Audio_Selector.config_param.log_config as log_config
from Ref_Audio_Selector.common.time_util import timeit_decorator
from Ref_Audio_Selector.common.model_manager import speaker_verification_models as models
from modelscope.pipelines import pipeline
def init_model(model_type='speech_campplus_sv_zh-cn_16k-common'):
log_config.logger.info(f'人声识别模型类型:{model_type}')
return pipeline(
task=models[model_type]['task'],
model=models[model_type]['model'],
model_revision=models[model_type]['model_revision']
)
@timeit_decorator
def compare_audio_and_generate_report(reference_audio_path, comparison_dir_path, output_file_path, model_type):
sv_pipeline = init_model(model_type)
# Step 1: 获取比较音频目录下所有音频文件的路径
comparison_audio_paths = [os.path.join(comparison_dir_path, f) for f in os.listdir(comparison_dir_path) if
f.endswith('.wav')]
if platform.system() == 'Windows':
# 因为这个模型是基于16k音频数据训练的为了避免后续比较时每次都对参考音频进行重采样所以提前进行了采样
# windows不支持torchaudio.sox_effects.apply_effects_tensor所以改写了依赖文件中的重采样方法
# 改用torchaudio.transforms.Resample进行重采样如果在非windows环境下没有更改依赖包的采样方法的话
# 使用这段代码进行预采样会出现因为采样方法不同,而导致的模型相似度计算不准确的问题
# 当然如果在windows下使用了其他的采样方法也会出现不准确的问题
if params.enable_pre_sample == 'true':
reference_audio_16k = ensure_16k_wav(reference_audio_path)
else:
reference_audio_16k = reference_audio_path
else:
reference_audio_16k = reference_audio_path
# Step 2: 用参考音频依次比较音频目录下的每个音频,获取相似度分数及对应路径
all_count = len(comparison_audio_paths)
has_processed_count = 0
similarity_scores = []
for audio_path in comparison_audio_paths:
score = sv_pipeline([reference_audio_16k, audio_path])['score']
similarity_scores.append({
'score': score,
'path': audio_path
})
has_processed_count += 1
log_config.logger.info(f'进度:{has_processed_count}/{all_count}')
# Step 3: 根据相似度分数降序排列
similarity_scores.sort(key=lambda x: x['score'], reverse=True)
# Step 4: 处理输出文件不存在的情况,创建新文件
if not os.path.exists(output_file_path):
open(output_file_path, 'w').close() # Create an empty file
# Step 5: 将排序后的结果写入输出结果文件(支持中文)
formatted_scores = [f'{item["score"]}|{item["path"]}' for item in similarity_scores]
with open(output_file_path, 'w', encoding='utf-8') as f:
# 使用'\n'将每个字符串分开,使其写入不同行
content = '\n'.join(formatted_scores)
f.write(content)
def ensure_16k_wav(audio_file_path, target_sample_rate=16000):
"""
输入一个音频文件地址判断其采样率并决定是否进行重采样然后将结果保存到指定的输出文件
参数:
audio_file_path (str): 音频文件路径
output_file_path (str): 保存重采样后音频数据的目标文件路径
target_sample_rate (int, optional): 目标采样率默认为16000Hz
"""
# 读取音频文件并获取其采样率
waveform, sample_rate = torchaudio.load(audio_file_path)
# 判断是否需要重采样
if sample_rate == target_sample_rate:
return audio_file_path
else:
# 创建Resample实例
resampler = T.Resample(orig_freq=sample_rate, new_freq=target_sample_rate)
# 应用重采样
resampled_waveform = resampler(waveform)
# 创建临时文件夹
os.makedirs(params.temp_dir, exist_ok=True)
# 设置临时文件名
temp_file_path = os.path.join(params.temp_dir, os.path.basename(audio_file_path))
# 保存重采样后的音频到指定文件
torchaudio.save(temp_file_path, resampled_waveform, target_sample_rate)
return temp_file_path
def parse_arguments():
parser = argparse.ArgumentParser(description="Audio processing script arguments")
# Reference audio path
parser.add_argument("-r", "--reference_audio", type=str, required=True,
help="Path to the reference WAV file.")
# Comparison directory path
parser.add_argument("-c", "--comparison_dir", type=str, required=True,
help="Path to the directory containing comparison WAV files.")
# Output file path
parser.add_argument("-o", "--output_file", type=str, required=True,
help="Path to the output file where results will be written.")
# Model Type
parser.add_argument("-m", "--model_type", type=str, required=True,
help="Path to the model type.")
return parser.parse_args()
if __name__ == '__main__':
cmd = parse_arguments()
compare_audio_and_generate_report(
reference_audio_path=cmd.reference_audio,
comparison_dir_path=cmd.comparison_dir,
output_file_path=cmd.output_file,
model_type=cmd.model_type,
)
# compare_audio_and_generate_report(
# reference_audio_path="D:/tt/渡鸦/refer_audio_all/也对,你的身份和我们不同吗?.wav",
# comparison_dir_path='D:/tt/渡鸦/refer_audio_all',
# output_file_path='D:/tt/渡鸦/test.txt',
# )

View File

@ -0,0 +1,77 @@
import os
import Ref_Audio_Selector.common.common as common
import Ref_Audio_Selector.tool.audio_check as audio_check
from Ref_Audio_Selector.config_param.log_config import logger
def parse_text_similarity_result_txt(file_path):
"""
解析指定格式的txt文件每行格式f"{item['average_similarity_score']}|{item['count']}|{item['emotion']}"
:param file_path: txt文件的路径
:return: 包含解析后数据的字典列表
"""
data_list = []
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
# 使用'|'作为分隔符分割每行数据
parts = line.strip().split('|')
if len(parts) == 3:
# 将分割后的字符串转换为浮点数、整数和字符串
try:
item = {
'average_similarity_score': float(parts[0]),
'count': int(parts[1]),
'emotion': parts[2]
}
data_list.append(item)
except ValueError as e:
# 如果转换失败,打印错误信息并跳过该行
logger.error(f"Error parsing line: {line.strip()} - {e}")
return data_list
def remove_low_similarity_files(ref_audio_list, report_list, audio_text_similarity_boundary):
"""
根据条件删除低相似度音频文件并返回删除数量
:param ref_audio_list: 包含音频路径和情感属性的列表
:param report_list: 包含相似度评分和情感属性的列表
:param audio_text_similarity_boundary: 相似度阈值
:return: 删除的文件数量
"""
deleted_count = 0
# 筛选出平均相似度低于阈值的报告
low_similarity_reports = [report for report in report_list if
report['average_similarity_score'] < audio_text_similarity_boundary]
# 遍历低相似度报告,查找并删除对应音频文件
for report in low_similarity_reports:
emotion = report['emotion']
# 查找ref_audio_list中相同情感的音频文件路径
matching_refs = [ref for ref in ref_audio_list if ref['emotion'] == emotion]
for match in matching_refs:
ref_path = match['ref_path']
# 检查文件是否存在,然后尝试删除
if os.path.exists(ref_path):
try:
os.remove(ref_path)
deleted_count += 1
logger.info(f"Deleted file: {ref_path}")
except Exception as e:
logger.error(f"Error deleting file {ref_path}: {e}")
else:
logger.error(f"File not found: {ref_path}")
return deleted_count
def delete_ref_audio_below_boundary(ref_audio_path, text_similarity_result_path, sync_inference_audio_dir,
audio_text_similarity_boundary):
ref_audio_list = common.RefAudioListManager(ref_audio_path).get_ref_audio_list()
report_list = parse_text_similarity_result_txt(text_similarity_result_path)
count = remove_low_similarity_files(ref_audio_list, report_list, audio_text_similarity_boundary)
audio_check.sync_ref_audio(ref_audio_path, sync_inference_audio_dir)
return count

View File

@ -0,0 +1,161 @@
import os
import argparse
from collections import defaultdict
from operator import itemgetter
from Ref_Audio_Selector.common.time_util import timeit_decorator
import Ref_Audio_Selector.tool.text_comparison.text_comparison as text_comparison
import Ref_Audio_Selector.config_param.config_params as params
import Ref_Audio_Selector.common.common as common
from Ref_Audio_Selector.config_param.log_config import logger
def parse_asr_file(file_path):
output = []
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
# 假设每行都是正确的格式,且"|"'是固定分隔符
input_file_path, original_text, language, asr_text = line.strip().split('|')
emotion = common.get_filename_without_extension(input_file_path)
# 将解析出的数据构造成新的字典或元组等结构
parsed_data = {
'emotion': emotion,
'input_file_path': input_file_path,
'original_text': original_text,
'language': language,
'asr_text': asr_text,
'similarity_score': 0
}
output.append(parsed_data)
return output
@timeit_decorator
def calculate_similarity_and_append_to_list(input_list, boundary):
all_count = len(input_list)
has_been_processed_count = 0
for item in input_list:
original_score, similarity_score = text_comparison.calculate_result(item['original_text'], item['asr_text'], boundary)
item['similarity_score'] = similarity_score
item['original_score'] = original_score
has_been_processed_count += 1
logger.info(f'进度:{has_been_processed_count}/{all_count}')
return input_list
def calculate_average_similarity_by_emotion(data_list):
result_dict = defaultdict(list)
for item in data_list:
emotion = item['emotion']
similarity_score = item['similarity_score']
result_dict[emotion].append(similarity_score)
average_scores = [{'emotion': emotion, 'average_similarity_score': sum(scores) / len(scores), 'count': len(scores)}
for emotion, scores in result_dict.items()]
average_scores.sort(key=lambda x: x['average_similarity_score'], reverse=True)
return average_scores
def group_and_sort_by_field(data, group_by_field):
# 创建一个空的结果字典键是group_by_field指定的字段值是一个列表
result_dict = defaultdict(list)
# 遍历输入列表
for item in data:
# 根据指定的group_by_field将当前元素添加到对应键的列表中
key_to_group = item[group_by_field]
result_dict[key_to_group].append(item)
# 对每个键对应的列表中的元素按similarity_score降序排序
for key in result_dict:
result_dict[key].sort(key=itemgetter('similarity_score'), reverse=True)
# 将结果字典转换为列表每个元素是一个包含键emotion或original_text和排序后数组的元组
result_list = [(k, v) for k, v in result_dict.items()]
return result_list
def format_list_to_text(data_list, output_filename):
with open(output_filename, 'w', encoding='utf-8') as output_file:
output_file.write('放大后的相似度分值|原始分值|ASR文本|原文文本\n')
for key, items in data_list:
# 写入情绪标题
output_file.write(key + '\n')
# 写入每条记录
for item in items:
formatted_line = f"{item['similarity_score']}|{item['original_score']}|{item['asr_text']}|{item['original_text']}\n"
output_file.write(formatted_line)
def format_list_to_emotion(data_list, output_filename):
with open(output_filename, 'w', encoding='utf-8') as output_file:
output_file.write('放大后的相似度分值|原始分值|ASR文本|情绪类型\n')
for key, items in data_list:
# 写入情绪标题
output_file.write(key + '\n')
# 写入每条记录
for item in items:
formatted_line = f"{item['similarity_score']}|{item['original_score']}|{item['asr_text']}|{item['emotion']}\n"
output_file.write(formatted_line)
@timeit_decorator
def process(asr_file_path, output_dir, similarity_enlarge_boundary):
# 检查输出目录是否存在,如果不存在则创建
if not os.path.exists(output_dir):
os.makedirs(output_dir)
records = parse_asr_file(asr_file_path)
calculate_similarity_and_append_to_list(records, similarity_enlarge_boundary)
average_similarity_list = calculate_average_similarity_by_emotion(records)
average_similarity_file = os.path.join(output_dir,
f'{params.text_emotion_average_similarity_report_filename}.txt')
average_similarity_content = \
'\n'.join([f"{item['average_similarity_score']}|{item['count']}|{item['emotion']}" for item in average_similarity_list])
common.write_text_to_file(average_similarity_content, average_similarity_file)
emotion_detail_list = group_and_sort_by_field(records, 'emotion')
emotion_detail_file = os.path.join(output_dir, f'{params.text_similarity_by_emotion_detail_filename}.txt')
format_list_to_text(emotion_detail_list, emotion_detail_file)
original_text_detail_list = group_and_sort_by_field(records, 'original_text')
original_text_detail_file = os.path.join(output_dir, f'{params.text_similarity_by_text_detail_filename}.txt')
format_list_to_emotion(original_text_detail_list, original_text_detail_file)
logger.info('文本相似度分析完成。')
def parse_arguments():
parser = argparse.ArgumentParser(description="Process ASR files and analyze similarity.")
parser.add_argument("-a", "--asr_file_path", type=str, required=True,
help="Path to the directory containing ASR files or path to a single ASR file.")
parser.add_argument("-o", "--output_dir", type=str, required=True,
help="Path to the directory where the analysis results should be saved.")
parser.add_argument("-b", "--similarity_enlarge_boundary", type=float, required=True,
help="Similarity score boundary value to be used in your calculations.")
args = parser.parse_args()
return args
if __name__ == '__main__':
cmd = parse_arguments()
# print(cmd)
process(cmd.asr_file_path, cmd.output_dir, cmd.similarity_enlarge_boundary)

View File

@ -0,0 +1,128 @@
import os
import torch
from transformers import AutoTokenizer, AutoModel
from scipy.spatial.distance import cosine
from Ref_Audio_Selector.config_param.log_config import logger
bert_path = os.environ.get(
"bert_path", "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large"
)
# Set device to GPU if available, else CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f'使用计算设备: {device}')
tokenizer = AutoTokenizer.from_pretrained(bert_path)
model = AutoModel.from_pretrained(bert_path).to(device)
def calculate_similarity(text1, text2, max_length=512):
# 预处理文本,设置最大长度
inputs1 = tokenizer(text1, padding=True, truncation=True, max_length=max_length, return_tensors='pt').to(device)
inputs2 = tokenizer(text2, padding=True, truncation=True, max_length=max_length, return_tensors='pt').to(device)
# 获取句子向量这里是取CLS token的向量并展平为一维
with torch.no_grad():
encoded_text1 = model(**inputs1)[0][:, 0, :].flatten()
encoded_text2 = model(**inputs2)[0][:, 0, :].flatten()
# 确保转换为numpy数组并且是一维的
similarity = 1 - cosine(encoded_text1.cpu().numpy().flatten(), encoded_text2.cpu().numpy().flatten())
return similarity
# 对boundary到1区间的值进行放大
def adjusted_similarity(similarity_score2, boundary=0.8):
if similarity_score2 < boundary:
return 0
# 倍数
multiple = 1 / (1 - boundary)
adjusted_score = (similarity_score2 - boundary) * multiple
return adjusted_score
def calculate_result(t1, t2, boundary):
# 计算并打印相似度
similarity_score2 = calculate_similarity(t1, t2)
# 调整相似度
adjusted_similarity_score2 = adjusted_similarity(similarity_score2, boundary)
return similarity_score2, adjusted_similarity_score2
def print_result(t1, t2, boundary):
print(f't2: {t2}')
# 计算并打印相似度
similarity_score2 = calculate_similarity(t1, t2)
print(f"两句话的相似度为: {similarity_score2:.4f}")
# 调整相似度
adjusted_similarity_score2 = adjusted_similarity(similarity_score2, boundary)
print(f"调整后的相似度为: {adjusted_similarity_score2:.4f}")
def test(boundary):
# 原始文本
text1 = "这是第一个句子"
list = """
这是第一个句子
这是第二个句子
那么这是第三个表达
当前呈现的是第四个句子
接下来我们有第五句话
在此展示第六条陈述
继续下去这是第七个短句
不容忽视的是第八个表述
顺延着序列这是第九句
此处列举的是第十个说法
进入新的篇章这是第十一个句子
下一段内容即为第十二个句子
显而易见这是第十三个叙述
渐进地我们来到第十四句话
向下滚动您会看到第十五个表达
此刻呈现在眼前的是第十六个句子
它们中的一个第十七个句子在此
如同链条般连接这是第十八个断言
按照顺序排列接下来是第十九个话语
逐一列举这是第二十个陈述句
结构相似本例给出第二十一个实例句
这是最初的陈述句
首先表达的是这一个句子
第一句内容即为此处所示
这是起始的叙述段落
开篇所展示的第一句话就是这个
明媚的阳光洒满大地
窗外飘落粉色樱花瓣
笔尖轻触纸面思绪万千
深夜的月光如水般静谧
穿越丛林的小径蜿蜒曲折
浅酌清茶品味人生百态
破晓时分雄鸡一唱天下白
草原上奔驰的骏马无拘无束
秋叶纷飞描绘季节更替画卷
寒冬雪夜炉火旁围坐共话家常
kszdRjYXw
pfsMgTlVHnB
uQaGxIbWz
ZtqNhPmKcOe
jfyrXsStVUo
wDiEgLkZbn
yhNvAfUmqC
TpKjxMrWgs
eBzHUaFJtYd
oQnXcVSiPkL
00000
"""
list2 = list.strip().split('\n')
for item in list2:
print_result(text1, item, boundary)
if __name__ == '__main__':
test(0.9)

View File

View File

@ -0,0 +1,197 @@
import os
import multiprocessing
import Ref_Audio_Selector.config_param.config_params as params
import Ref_Audio_Selector.tool.audio_inference as audio_inference
import Ref_Audio_Selector.common.common as common
rw_param = params.config_manager.get_rw_param()
# -------------------基本信息---------------------------
# 角色所在工作目录
base_dir_default = None
# 工作目录
text_work_space_dir_default = None
# 角色名称
text_role_default = None
# 参考音频所在目录
text_refer_audio_file_dir_default = None
# 推理音频所在目录
text_inference_audio_file_dir_default = None
# -------------------第一步------------------------------
# 参考音频抽样目录
text_sample_dir_default = None
# 分段数
slider_subsection_num_default = None
# 每段随机抽样个数
slider_sample_num_default = None
# -------------------第二步------------------------------
# api服务模型切换接口地址
text_api_set_model_base_url_default = None
# GPT模型参数名
text_api_gpt_param_default = None
# SoVITS模型参数名
text_api_sovits_param_default = None
# api服务GPT模型切换接口地址
text_api_v2_set_gpt_model_base_url_default = None
# GPT模型参数名
text_api_v2_gpt_model_param_default = None
# api服务SoVITS模型切换接口地址
text_api_v2_set_sovits_model_base_url_default = None
# SoVITS模型参数名
text_api_v2_sovits_model_param_default = None
# 推理服务请求地址与参数
text_url_default = None
# 推理服务请求完整地址
text_whole_url_default = None
# 文本参数名
text_text_default = None
# 参考参数类型
dropdown_refer_type_param_default = None
# 参考音频路径参数名
text_ref_path_default = None
# 参考音频文本参数名
text_ref_text_default = None
# 角色情绪参数名
text_emotion_default = None
# 待推理文本路径
text_test_content_default = None
# 请求并发数
slider_request_concurrency_num_default = 3
# 最大并发数
slider_request_concurrency_max_num = None
# -------------------第三步------------------------------
# 待asr的音频所在目录
text_asr_audio_dir_default = None
# 待分析的文件路径
text_text_similarity_analysis_path_default = None
# 文本相似度放大边界
slider_text_similarity_amplification_boundary_default = 0.90
# 文本相似度分析结果文件所在路径
text_text_similarity_result_path_default = None
# -------------------第四步------------------------------
# -------------------第五步------------------------------
# 模板内容
text_template_default = None
def empty_default(vale, default_value):
if vale is None or vale == "":
return default_value
else:
return vale
def init_base():
global text_work_space_dir_default, text_role_default, base_dir_default, text_refer_audio_file_dir_default, text_inference_audio_file_dir_default
text_work_space_dir_default = rw_param.read(rw_param.work_dir)
text_role_default = rw_param.read(rw_param.role)
base_dir_default = os.path.join(text_work_space_dir_default, text_role_default)
text_refer_audio_file_dir_default = common.check_path_existence_and_return(
os.path.join(base_dir_default, params.reference_audio_dir))
text_inference_audio_file_dir_default = common.check_path_existence_and_return(
os.path.join(base_dir_default, params.inference_audio_dir))
def init_first():
global text_sample_dir_default, slider_subsection_num_default, slider_sample_num_default
text_sample_dir_default = common.check_path_existence_and_return(
os.path.join(base_dir_default, params.list_to_convert_reference_audio_dir))
slider_subsection_num_default = int(empty_default(rw_param.read(rw_param.subsection_num), 10))
slider_sample_num_default = (empty_default(rw_param.read(rw_param.sample_num), 4))
def init_second():
global text_api_set_model_base_url_default, text_api_gpt_param_default, text_api_sovits_param_default, text_api_v2_set_gpt_model_base_url_default, text_api_v2_gpt_model_param_default
global text_api_v2_set_sovits_model_base_url_default, text_api_v2_sovits_model_param_default, text_url_default, text_whole_url_default, text_text_default, dropdown_refer_type_param_default, text_ref_path_default
global text_ref_text_default, text_emotion_default, text_test_content_default, slider_request_concurrency_num_default, slider_request_concurrency_max_num
text_api_set_model_base_url_default = empty_default(rw_param.read(rw_param.api_set_model_base_url),
'http://localhost:9880/set_model')
text_api_gpt_param_default = empty_default(rw_param.read(rw_param.api_gpt_param), 'gpt_model_path')
text_api_sovits_param_default = empty_default(rw_param.read(rw_param.api_sovits_param), 'sovits_model_path')
text_api_v2_set_gpt_model_base_url_default = empty_default(rw_param.read(rw_param.api_v2_set_gpt_model_base_url),
'http://localhost:9880/set_gpt_weights')
text_api_v2_gpt_model_param_default = empty_default(rw_param.read(rw_param.api_v2_gpt_model_param), 'weights_path')
text_api_v2_set_sovits_model_base_url_default = empty_default(
rw_param.read(rw_param.api_v2_set_sovits_model_base_url), 'http://localhost:9880/set_sovits_weights')
text_api_v2_sovits_model_param_default = empty_default(rw_param.read(rw_param.api_v2_sovits_model_param), 'weights_path')
text_url_default = empty_default(rw_param.read(rw_param.text_url),
'http://localhost:9880?prompt_language=中文&text_language=中文&cut_punc=,.;?!、,。?!;:…')
text_text_default = empty_default(rw_param.read(rw_param.text_param), 'text')
dropdown_refer_type_param_default = empty_default(rw_param.read(rw_param.refer_type_param), '参考音频')
text_ref_path_default = empty_default(rw_param.read(rw_param.ref_path_param), 'refer_wav_path')
text_ref_text_default = empty_default(rw_param.read(rw_param.ref_text_param), 'prompt_text')
text_emotion_default = empty_default(rw_param.read(rw_param.emotion_param), 'emotion')
text_whole_url_default = whole_url(text_url_default, dropdown_refer_type_param_default, text_text_default,
text_ref_path_default, text_ref_text_default, text_emotion_default)
text_test_content_default = empty_default(rw_param.read(rw_param.test_content_path), params.default_test_text_path)
slider_request_concurrency_max_num = multiprocessing.cpu_count()
slider_request_concurrency_num_default = empty_default(rw_param.read(rw_param.request_concurrency_num), 3)
slider_request_concurrency_num_default = min(int(slider_request_concurrency_num_default), slider_request_concurrency_max_num)
# 基于请求路径和参数,合成完整的请求路径
def whole_url(text_url, dropdown_refer_type_param, text_text, text_ref_path, text_ref_text, text_emotion):
url_composer = audio_inference.TTSURLComposer(text_url, dropdown_refer_type_param, text_emotion, text_text,
text_ref_path, text_ref_text)
if url_composer.is_emotion():
text_whole_url = url_composer.build_url_with_emotion('测试内容', '情绪类型', False)
else:
text_whole_url = url_composer.build_url_with_ref('测试内容', '参考路径', '参考文本', False)
return text_whole_url
def init_third():
global text_asr_audio_dir_default, text_text_similarity_analysis_path_default, slider_text_similarity_amplification_boundary_default, text_text_similarity_result_path_default
text_asr_audio_dir_default = common.check_path_existence_and_return(
os.path.join(base_dir_default, params.inference_audio_dir, params.inference_audio_text_aggregation_dir))
text_text_similarity_analysis_path_default = common.check_path_existence_and_return(
os.path.join(base_dir_default, params.asr_filename + '.list'))
slider_text_similarity_amplification_boundary_default = empty_default(
rw_param.read(rw_param.text_similarity_amplification_boundary), 0.90)
text_text_similarity_result_path_default = common.check_path_existence_and_return(
os.path.join(base_dir_default, params.text_emotion_average_similarity_report_filename + '.txt'))
def init_fourth():
pass
def init_fifth():
global text_template_default
default_template_path = params.default_template_path
text_template_default = empty_default(rw_param.read(rw_param.text_template),
common.read_file(default_template_path))
def init_all():
init_base()
init_first()
init_second()
init_third()
init_fourth()
init_fifth()

Binary file not shown.

After

Width:  |  Height:  |  Size: 95 KiB