mirror of
https://github.com/RVC-Boss/GPT-SoVITS.git
synced 2025-04-05 04:22:46 +08:00
Merge 50a88a596dea718c83e535136e9cb46b513cef6f into 03b662a769946b7a6a8569a354860e8eeeb743aa
This commit is contained in:
commit
cc88d33348
@ -398,4 +398,5 @@ arpa = {
|
||||
symbols = [pad] + c + v + ja_symbols + pu_symbols + list(arpa)
|
||||
symbols = sorted(set(symbols))
|
||||
if __name__ == "__main__":
|
||||
print(symbols)
|
||||
print(len(symbols))
|
||||
|
0
Ref_Audio_Selector/__init__.py
Normal file
0
Ref_Audio_Selector/__init__.py
Normal file
0
Ref_Audio_Selector/common/__init__.py
Normal file
0
Ref_Audio_Selector/common/__init__.py
Normal file
156
Ref_Audio_Selector/common/common.py
Normal file
156
Ref_Audio_Selector/common/common.py
Normal file
@ -0,0 +1,156 @@
|
||||
from tools import my_utils
|
||||
from config import python_exec, is_half
|
||||
import subprocess
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
class RefAudioListManager:
    """Index .wav files under a root directory, grouped by sub-directory.

    Files directly under the root fall into the 'default' category; files in
    a sub-directory are grouped under that sub-directory's relative path with
    path separators stripped out.
    """

    def __init__(self, root_dir):
        self.audio_dict = {'default': []}
        root_abs = os.path.abspath(root_dir)

        for dirpath, _dirnames, filenames in os.walk(root_abs):
            rel = os.path.relpath(dirpath, root_abs)
            # NOTE(review): separators are removed, so 'a/b' and 'ab' would
            # collide into one category — confirm this is intended.
            category = 'default' if rel == '.' else rel.replace(os.sep, '')

            # Convert each matching relative name into an absolute path.
            wavs = [os.path.join(dirpath, name)
                    for name in filenames if name.endswith('.wav')]
            if not wavs:
                continue
            self.audio_dict.setdefault(category, []).extend(wavs)

    def get_audio_list(self):
        """Return the category -> absolute-path-list mapping."""
        return self.audio_dict

    def get_flattened_audio_list(self):
        """Return every indexed audio path as one flat list."""
        return [path
                for paths in self.audio_dict.values()
                for path in paths]

    def get_ref_audio_list(self):
        """Return one info dict per audio: emotion tag, path, default text."""
        infos = []
        for category, paths in self.audio_dict.items():
            for path in paths:
                stem = os.path.splitext(os.path.basename(path))[0]
                infos.append({
                    'emotion': f"{category}-{stem}",
                    'ref_path': path,
                    'ref_text': stem,
                })
        return infos
|
||||
|
||||
|
||||
def batch_clean_paths(paths):
    """Apply my_utils.clean_path() to every entry of *paths*.

    Parameters:
        paths (list[str]): paths to normalise.

    Returns:
        list[str]: the cleaned paths, in the same order.
    """
    return [my_utils.clean_path(path) for path in paths]
|
||||
|
||||
|
||||
def read_text_file_to_list(file_path):
    """Read a UTF-8 text file and return its lines without line endings."""
    # UTF-8 so Chinese text is decoded correctly.
    with open(file_path, mode='r', encoding='utf-8') as handle:
        content = handle.read()
    return content.splitlines()
|
||||
|
||||
|
||||
def get_filename_without_extension(file_path):
    """Given a file path string, return the file name without its extension.

    Parameters:
        file_path (str): The full path to the file.

    Returns:
        str: The file name without its extension.
    """
    stem, _ext = os.path.splitext(os.path.basename(file_path))
    return stem
|
||||
|
||||
|
||||
def read_file(file_path):
    """Return the entire UTF-8 text content of *file_path*."""
    # The file is closed automatically when the with-block exits.
    with open(file_path, 'r', encoding='utf-8') as handle:
        return handle.read()
|
||||
|
||||
|
||||
def write_text_to_file(text, output_file_path):
    """Write *text* to *output_file_path* as UTF-8 and report the outcome."""
    try:
        with open(output_file_path, 'w', encoding='utf-8') as out:
            out.write(text)
    except IOError as err:
        print(f"Error occurred while writing to the file: {err}")
        return
    print(f"Text successfully written to file: {output_file_path}")
|
||||
|
||||
|
||||
def check_path_existence_and_return(path):
    """Check whether *path* (file or directory) exists.

    :param path: file or directory path to check (string)
    :return: the original path if it exists, otherwise an empty string
    """
    return path if os.path.exists(path) else ""
|
||||
|
||||
|
||||
def open_file(filepath):
    """Open *filepath* with the platform's default application."""
    if sys.platform.startswith('darwin'):
        # macOS
        subprocess.run(['open', filepath])
        return
    if os.name == 'nt':
        # Windows
        os.startfile(filepath)
    elif os.name == 'posix':
        # Linux, Unix, etc.
        subprocess.run(['xdg-open', filepath])
|
||||
|
||||
|
||||
def start_new_service(script_path):
    """Launch *script_path* with the configured Python interpreter in a new
    terminal window and return the subprocess.Popen handle.

    NOTE(review): the command line is built by string interpolation and run
    with shell=True — unsafe if script_path can contain shell metacharacters.
    """
    # For Windows systems
    if sys.platform.startswith('win'):
        cmd = f'start cmd /k {python_exec} {script_path}'
    # For Mac or Linux systems
    else:
        cmd = f'xterm -e {python_exec} {script_path}'

    proc = subprocess.Popen(cmd, shell=True)

    # To stop the previously started child process:
    # proc.terminate()

    # Or, if it must be force-killed:
    # proc.kill()

    return proc
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Quick manual check of batch_clean_paths on messy Windows-style paths.
    dir = r'C:\Users\Administrator\Desktop/test'
    dir2 = r'"C:\Users\Administrator\Desktop\test2"'
    dir, dir2 = batch_clean_paths([dir, dir2])
    print(dir, dir2)
|
46
Ref_Audio_Selector/common/model_manager.py
Normal file
46
Ref_Audio_Selector/common/model_manager.py
Normal file
@ -0,0 +1,46 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
# Bundled pretrained checkpoints used as defaults.
pretrained_sovits_name = "GPT_SoVITS/pretrained_models/s2G488k.pth"
pretrained_gpt_name = "GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt"
# Directories holding user-trained weights; created on import if missing.
SoVITS_weight_root = "SoVITS_weights"
GPT_weight_root = "GPT_weights"
os.makedirs(SoVITS_weight_root, exist_ok=True)
os.makedirs(GPT_weight_root, exist_ok=True)

# Speaker-verification model descriptors keyed by model name
# (presumably consumed by a ModelScope-style pipeline loader — confirm).
speaker_verification_models = {
    'speech_campplus_sv_zh-cn_16k-common': {
        'task': 'speaker-verification',
        'model': 'Ref_Audio_Selector/tool/speaker_verification/models/speech_campplus_sv_zh-cn_16k-common',
        'model_revision': 'v1.0.0'
    },
    'speech_eres2net_sv_zh-cn_16k-common': {
        'task': 'speaker-verification',
        'model': 'Ref_Audio_Selector/tool/speaker_verification/models/speech_eres2net_sv_zh-cn_16k-common',
        'model_revision': 'v1.0.5'
    }
}
|
||||
|
||||
def custom_sort_key(s):
    """Natural-sort key: split *s* into alternating non-digit and digit runs.

    Digit runs are compared numerically, so e.g. 'v2' sorts before 'v10'.
    """
    # Bug fix: use a raw string — '(\d+)' is an invalid escape sequence and
    # raises SyntaxWarning on Python 3.12+.
    parts = re.split(r'(\d+)', s)
    # Convert digit runs to int; leave everything else as-is.
    return [int(part) if part.isdigit() else part for part in parts]
|
||||
|
||||
|
||||
def get_gpt_model_names():
    """Return the pretrained GPT checkpoint plus every user-trained .ckpt
    under GPT_weight_root, in natural sort order.
    """
    gpt_names = [pretrained_gpt_name]
    for name in os.listdir(GPT_weight_root):
        if name.endswith(".ckpt"):
            gpt_names.append("%s/%s" % (GPT_weight_root, name))
    # Bug fix: the original called sorted() and discarded the result, so the
    # list was never actually sorted; sort in place instead.
    gpt_names.sort(key=custom_sort_key)
    return gpt_names
|
||||
|
||||
|
||||
def get_sovits_model_names():
    """Return the pretrained SoVITS checkpoint plus every user-trained .pth
    under SoVITS_weight_root, in natural sort order.
    """
    sovits_names = [pretrained_sovits_name]
    for name in os.listdir(SoVITS_weight_root):
        if name.endswith(".pth"):
            sovits_names.append("%s/%s" % (SoVITS_weight_root, name))
    # Bug fix: the original called sorted() and discarded the result, so the
    # list was never actually sorted; sort in place instead.
    sovits_names.sort(key=custom_sort_key)
    return sovits_names
|
||||
|
72
Ref_Audio_Selector/common/time_util.py
Normal file
72
Ref_Audio_Selector/common/time_util.py
Normal file
@ -0,0 +1,72 @@
|
||||
import time
|
||||
import os
|
||||
from Ref_Audio_Selector.config_param.log_config import p_logger
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
|
||||
|
||||
def timeit_decorator(func):
    """Decorator that logs the wall-clock execution time of *func*.

    Timing is active only when params.time_log_print_type == 'file';
    otherwise the wrapped function is called straight through.

    Parameters:
        func (function): the function to time.

    Returns:
        function: the wrapping function.
    """
    import functools

    # Fix: preserve func.__name__/__doc__ so stacked decorators and
    # introspection see the real function, not 'wrapper'.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if params.time_log_print_type != 'file':
            return func(*args, **kwargs)

        start_time = time.perf_counter()  # high-resolution start point
        func_result = func(*args, **kwargs)  # run the original function
        end_time = time.perf_counter()  # high-resolution end point
        elapsed_time = end_time - start_time

        # Log the timing record to the performance logger.
        log_message = f"进程ID: {os.getpid()}, {func.__name__} 执行耗时: {elapsed_time:.6f} 秒"
        p_logger.info(log_message)

        return func_result

    return wrapper
|
||||
|
||||
|
||||
def time_monitor(func):
    """Wrap *func* so a call returns (elapsed_seconds, original_result)."""

    def wrapper(*args, **kwargs):
        started = time.perf_counter()  # high-resolution start point
        result = func(*args, **kwargs)  # run the original function
        elapsed = time.perf_counter() - started
        return elapsed, result

    return wrapper
|
||||
|
||||
|
||||
# Demonstration of the timing decorator
@timeit_decorator
def example_function(n):
    """Sleep *n* seconds to simulate work, then return n * 2."""
    time.sleep(n)  # stand-in for an expensive operation being timed
    return n * 2
|
||||
|
||||
|
||||
def example_function2(n):
    """Simulate *n* seconds of work, then return the doubled input."""
    time.sleep(n)  # stand-in for an expensive operation
    doubled = n * 2
    return doubled
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual check: call the decorated function
    # result = example_function(2)
    # time_monitor returns (elapsed_seconds, result) for the wrapped call.
    print(time_monitor(example_function2)(2))
|
57
Ref_Audio_Selector/config.ini
Normal file
57
Ref_Audio_Selector/config.ini
Normal file
@ -0,0 +1,57 @@
|
||||
# config.ini
|
||||
|
||||
[Base]
|
||||
# 服务端口号
|
||||
server_port = 9423
|
||||
# 参考音频目录
|
||||
reference_audio_dir = refer_audio
|
||||
# 临时文件目录
|
||||
temp_dir = Ref_Audio_Selector/temp
|
||||
|
||||
[Log]
|
||||
# 日志保存目录路径
|
||||
log_dir = Ref_Audio_Selector/log/general
|
||||
# 日志级别 CRITICAL、FATAL、ERROR、WARNING、WARN、INFO、DEBUG、NOTSET、
|
||||
log_level = INFO
|
||||
# 函数时间消耗日志打印类型 file 打印到文件; close 关闭
|
||||
time_log_print_type = file
|
||||
# 函数时间消耗日志保存目录路径
|
||||
time_log_print_dir = Ref_Audio_Selector/log/performance
|
||||
|
||||
[AudioSample]
|
||||
# list转换待选参考音频目录
|
||||
list_to_convert_reference_audio_dir = refer_audio_all
|
||||
# 音频相似度目录
|
||||
audio_similarity_dir = similarity
|
||||
# 是否开启基准音频预采样 true false
|
||||
enable_pre_sample = true
|
||||
|
||||
[Inference]
|
||||
# 默认测试文本位置
|
||||
default_test_text_path = Ref_Audio_Selector/file/test_content/test_content.txt
|
||||
# 推理音频目录
|
||||
inference_audio_dir = inference_audio
|
||||
# 推理音频文本聚合目录
|
||||
inference_audio_text_aggregation_dir = text
|
||||
# 推理音频情绪聚合目录
|
||||
inference_audio_emotion_aggregation_dir = emotion
|
||||
|
||||
[ResultCheck]
|
||||
# asr输出文件
|
||||
asr_filename = asr
|
||||
# 文本相似度输出目录
|
||||
text_similarity_output_dir = text_similarity
|
||||
# 文本情绪平均相似度报告文件名
|
||||
text_emotion_average_similarity_report_filename = average_similarity
|
||||
# 文本相似度按情绪聚合明细文件名
|
||||
text_similarity_by_emotion_detail_filename = emotion_group_detail
|
||||
# 文本相似度按文本聚合明细文件名
|
||||
text_similarity_by_text_detail_filename = text_group_detail
|
||||
|
||||
[AudioConfig]
|
||||
# 默认模板文件位置
|
||||
default_template_path = Ref_Audio_Selector/file/config_template/ref_audio_template.txt
|
||||
# 参考音频配置文件名
|
||||
reference_audio_config_filename = refer_audio
|
||||
|
||||
[Other]
|
0
Ref_Audio_Selector/config_param/__init__.py
Normal file
0
Ref_Audio_Selector/config_param/__init__.py
Normal file
111
Ref_Audio_Selector/config_param/config_manager.py
Normal file
111
Ref_Audio_Selector/config_param/config_manager.py
Normal file
@ -0,0 +1,111 @@
|
||||
import configparser
|
||||
import os
|
||||
import Ref_Audio_Selector.common.common as common
|
||||
|
||||
|
||||
class ParamReadWriteManager:
    """Persists simple UI parameters, one small text file per key."""

    def __init__(self):
        # Directory holding one <key>.txt file per stored parameter.
        self.base_dir = 'Ref_Audio_Selector/file/base_info'
        os.makedirs(self.base_dir, exist_ok=True)
        # Basic info
        self.work_dir = 'work_dir'
        self.role = 'role'
        # Step 1
        self.subsection_num = 'subsection_num'
        self.sample_num = 'sample_num'
        # Step 2
        self.api_set_model_base_url = 'api_set_model_base_url'
        self.api_gpt_param = 'api_gpt_param'
        self.api_sovits_param = 'api_sovits_param'

        self.api_v2_set_gpt_model_base_url = 'api_v2_set_gpt_model_base_url'
        self.api_v2_gpt_model_param = 'api_v2_gpt_model_param'
        self.api_v2_set_sovits_model_base_url = 'api_v2_set_sovits_model_base_url'
        self.api_v2_sovits_model_param = 'api_v2_sovits_model_param'

        self.text_url = 'text_url'
        self.text_param = 'text_param'
        self.refer_type_param = 'refer_type_param'
        self.ref_path_param = 'ref_path_param'
        self.ref_text_param = 'ref_text_param'
        self.emotion_param = 'emotion_param'

        self.test_content_path = 'test_content_path'
        self.request_concurrency_num = 'request_concurrency_num'

        # Step 3
        self.text_similarity_amplification_boundary = 'text_similarity_amplification_boundary'
        # Step 4
        # Step 5
        self.text_template = 'text_template'

    def read(self, key):
        """Return the stored value for *key*, or '' if never written."""
        file_path = os.path.join(self.base_dir, key + '.txt')
        if os.path.exists(file_path):
            content = common.read_file(file_path)
            return content.strip()
        else:
            return ''

    def write(self, key, content):
        """Store *content* (coerced to a trimmed string) under *key*."""
        file_path = os.path.join(self.base_dir, key + '.txt')

        # Make sure the payload is a string; convert if it is not.
        if not isinstance(content, str):
            clean_content = str(content).strip()  # convert, then trim whitespace
        else:
            clean_content = content.strip()

        common.write_text_to_file(clean_content, file_path)
|
||||
|
||||
|
||||
class ConfigManager:
    """Per-section accessor over Ref_Audio_Selector/config.ini."""

    def __init__(self):
        self.config_path = 'Ref_Audio_Selector/config.ini'
        self.config = configparser.ConfigParser()
        self.config.read(self.config_path, encoding='utf-8')

    def get_base(self, key):
        """Value from the [Base] section."""
        return self.config.get('Base', key)

    def get_log(self, key):
        """Value from the [Log] section."""
        return self.config.get('Log', key)

    def get_audio_sample(self, key):
        """Value from the [AudioSample] section."""
        return self.config.get('AudioSample', key)

    def get_inference(self, key):
        """Value from the [Inference] section."""
        return self.config.get('Inference', key)

    def get_result_check(self, key):
        """Value from the [ResultCheck] section."""
        return self.config.get('ResultCheck', key)

    def get_audio_config(self, key):
        """Value from the [AudioConfig] section."""
        return self.config.get('AudioConfig', key)

    def get_other(self, key):
        """Value from the [Other] section."""
        return self.config.get('Other', key)

    def print(self):
        """Dump every section and its key/value pairs to stdout."""
        for section in self.config.sections():
            print('[{}]'.format(section))
            for key, value in self.config.items(section):
                print('{} = {}'.format(key, value))
            print()
|
||||
|
||||
|
||||
# Module-level singletons shared by every importer of this module.
_config = ConfigManager()
_param_read_write_manager = ParamReadWriteManager()
|
||||
|
||||
|
||||
def get_config():
    """Return the shared ConfigManager singleton."""
    return _config
|
||||
|
||||
|
||||
def get_rw_param():
    """Return the shared ParamReadWriteManager singleton."""
    return _param_read_write_manager
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Bug fix: ConfigManager.print() writes the dump itself and returns None,
    # so wrapping it in print() appended a spurious "None" line.
    _config.print()
|
58
Ref_Audio_Selector/config_param/config_params.py
Normal file
58
Ref_Audio_Selector/config_param/config_params.py
Normal file
@ -0,0 +1,58 @@
|
||||
import Ref_Audio_Selector.config_param.config_manager as config_manager
|
||||
|
||||
config = config_manager.get_config()

# [Base]
# Service port number
server_port = int(config.get_base('server_port'))
# Reference audio directory
reference_audio_dir = config.get_base('reference_audio_dir')
# Temporary file directory
temp_dir = config.get_base('temp_dir')

# [Log]
# Log output directory
log_dir = config.get_log('log_dir')
# Log level: CRITICAL, FATAL, ERROR, WARNING, WARN, INFO, DEBUG, NOTSET
log_level = config.get_log('log_level')
# Function-timing log output type: 'file' writes to a file; 'close' disables it
time_log_print_type = config.get_log('time_log_print_type')
# Function-timing log output directory
time_log_print_dir = config.get_log('time_log_print_dir')

# [AudioSample]
# Directory for candidate reference audio converted from .list files
list_to_convert_reference_audio_dir = config.get_audio_sample('list_to_convert_reference_audio_dir')
# Audio similarity directory
audio_similarity_dir = config.get_audio_sample('audio_similarity_dir')
# Whether baseline-audio pre-sampling is enabled: true / false
enable_pre_sample = config.get_audio_sample('enable_pre_sample')

# [Inference]
# Default test-text file location
default_test_text_path = config.get_inference('default_test_text_path')
# Inference audio directory
inference_audio_dir = config.get_inference('inference_audio_dir')
# Directory grouping inference audio by text
inference_audio_text_aggregation_dir = config.get_inference('inference_audio_text_aggregation_dir')
# Directory grouping inference audio by emotion
inference_audio_emotion_aggregation_dir = config.get_inference('inference_audio_emotion_aggregation_dir')

# [ResultCheck]
# ASR output file name
asr_filename = config.get_result_check('asr_filename')
# Text-similarity output directory
text_similarity_output_dir = config.get_result_check('text_similarity_output_dir')
# Report file name for average text-emotion similarity
text_emotion_average_similarity_report_filename = config.get_result_check('text_emotion_average_similarity_report_filename')
# Detail file name for text similarity grouped by emotion
text_similarity_by_emotion_detail_filename = config.get_result_check('text_similarity_by_emotion_detail_filename')
# Detail file name for text similarity grouped by text
text_similarity_by_text_detail_filename = config.get_result_check('text_similarity_by_text_detail_filename')

# [AudioConfig]
# Default template file location
default_template_path = config.get_audio_config('default_template_path')
# Reference-audio config file name
reference_audio_config_filename = config.get_audio_config('reference_audio_config_filename')
|
||||
|
65
Ref_Audio_Selector/config_param/log_config.py
Normal file
65
Ref_Audio_Selector/config_param/log_config.py
Normal file
@ -0,0 +1,65 @@
|
||||
import logging
|
||||
import os
|
||||
import datetime
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
|
||||
|
||||
def create_general_logger():
    """Build the application-wide 'general' logger: console + daily log file.

    NOTE(review): calling this more than once adds duplicate handlers to the
    same named logger — confirm it is only invoked via setup_logging().
    """
    # Current date, used for the log file name
    current_date = datetime.datetime.now().strftime('%Y-%m-%d')

    # Console handler (level commented out: inherits the logger's level)
    console_handler = logging.StreamHandler()
    # console_handler.setLevel(logging.INFO)
    # Console output format
    console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(console_formatter)
    console_handler.encoding = 'utf-8'  # use UTF-8 character encoding

    os.makedirs(params.log_dir, exist_ok=True)

    # File handler for the general daily log (append mode)
    general_handler = logging.FileHandler(f"{params.log_dir}/{current_date}.log", mode='a', encoding='utf-8')
    # general_handler.setLevel(logging.INFO)
    general_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    general_handler.setFormatter(general_formatter)

    # Configure the named 'general' logger
    general_logger = logging.getLogger('general')
    level = logging.getLevelName(params.log_level)
    general_logger.setLevel(level)
    general_logger.addHandler(console_handler)
    general_logger.addHandler(general_handler)

    # Configure the root logger too, just in case
    logging.basicConfig(level=logging.WARNING, handlers=[general_handler])

    return general_logger
|
||||
|
||||
|
||||
def create_performance_logger():
    """Build the file-only 'performance' logger used for timing records.

    NOTE(review): calling this more than once adds duplicate handlers to the
    same named logger — confirm it is only invoked via setup_logging().
    """
    # Current date, used for the log file name
    current_date = datetime.datetime.now().strftime('%Y-%m-%d')

    os.makedirs(params.time_log_print_dir, exist_ok=True)

    # File handler dedicated to performance-monitoring logs (append mode)
    performance_handler = logging.FileHandler(
        f"{params.time_log_print_dir}/{current_date}.log", mode='a', encoding='utf-8')
    # performance_handler.setLevel(logging.INFO)
    performance_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    performance_handler.setFormatter(performance_formatter)

    # Configure the named 'performance' logger
    performance_logger = logging.getLogger('performance')
    performance_logger.setLevel(logging.INFO)
    performance_logger.addHandler(performance_handler)

    return performance_logger
|
||||
|
||||
|
||||
def setup_logging():
    """Create and return the (general, performance) logger pair."""
    return create_general_logger(), create_performance_logger()


# Shared module-level loggers, created once on import.
logger, p_logger = setup_logging()
|
0
Ref_Audio_Selector/file/base_info/role.txt
Normal file
0
Ref_Audio_Selector/file/base_info/role.txt
Normal file
0
Ref_Audio_Selector/file/base_info/work_dir.txt
Normal file
0
Ref_Audio_Selector/file/base_info/work_dir.txt
Normal file
@ -0,0 +1,5 @@
|
||||
"${emotion}": {
|
||||
"ref_wav_path": "${ref_path}",
|
||||
"prompt_text": "${ref_text}",
|
||||
"prompt_language": "中文"
|
||||
}
|
4
Ref_Audio_Selector/file/test_content/test_content.txt
Normal file
4
Ref_Audio_Selector/file/test_content/test_content.txt
Normal file
@ -0,0 +1,4 @@
|
||||
也是只有一次。”白蓉简单地回答,然后迅速转移话锋,搂住罗辑的脖子说,“算了,我不要那生日礼物了,你也回到正常的生活中来,好吗?”
|
||||
云天明看到那是一条丑陋的虫子,软乎乎湿漉漉的,在她白皙的手指间蠕动着,旁边一个女生尖叫道:恶心死了,你碰它干吗?!程心把虫子轻轻放到旁边的草丛中,说,它在这里会给踩死的。
|
||||
“那么多的星星,像雾似的。”云天明感叹道。程心把目光从银河收回,转头看着他,指着下面的校园和城市说:“你看下面也很漂亮啊,我们的生活是在这儿,可不是在那么远的银河里。”
|
||||
“可我们的专业,不就是为了到地球之外去吗?”“那是为了这里的生活更好,可不是为了逃离地球啊。”云天明当然知道程心的话是委婉地指向他的孤僻和自闭,他也只有默然以对。
|
1066
Ref_Audio_Selector/ref_audio_selector_webui.py
Normal file
1066
Ref_Audio_Selector/ref_audio_selector_webui.py
Normal file
File diff suppressed because it is too large
Load Diff
5
Ref_Audio_Selector/start_ref_audio_selector_webui.bat
Normal file
5
Ref_Audio_Selector/start_ref_audio_selector_webui.bat
Normal file
@ -0,0 +1,5 @@
|
||||
:: Switch the console code page to UTF-8 so non-ASCII output renders correctly.
CHCP 65001
@echo off
:: Run from the repository root so relative paths resolve.
cd ../
runtime\python.exe ./Ref_Audio_Selector/ref_audio_selector_webui.py
pause
|
0
Ref_Audio_Selector/tool/__init__.py
Normal file
0
Ref_Audio_Selector/tool/__init__.py
Normal file
0
Ref_Audio_Selector/tool/asr/__init__.py
Normal file
0
Ref_Audio_Selector/tool/asr/__init__.py
Normal file
120
Ref_Audio_Selector/tool/asr/fasterwhisper_asr_multi_level_dir.py
Normal file
120
Ref_Audio_Selector/tool/asr/fasterwhisper_asr_multi_level_dir.py
Normal file
@ -0,0 +1,120 @@
|
||||
import argparse
|
||||
import os
|
||||
import traceback
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
|
||||
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
|
||||
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
|
||||
|
||||
import torch
|
||||
from faster_whisper import WhisperModel
|
||||
from tqdm import tqdm
|
||||
|
||||
from tools.asr.config import check_fw_local_models
|
||||
from Ref_Audio_Selector.config_param.log_config import logger
|
||||
|
||||
# Language codes accepted by the --language CLI option; 'auto' lets the
# model detect the language itself.
language_code_list = [
    "af", "am", "ar", "as", "az",
    "ba", "be", "bg", "bn", "bo",
    "br", "bs", "ca", "cs", "cy",
    "da", "de", "el", "en", "es",
    "et", "eu", "fa", "fi", "fo",
    "fr", "gl", "gu", "ha", "haw",
    "he", "hi", "hr", "ht", "hu",
    "hy", "id", "is", "it", "ja",
    "jw", "ka", "kk", "km", "kn",
    "ko", "la", "lb", "ln", "lo",
    "lt", "lv", "mg", "mi", "mk",
    "ml", "mn", "mr", "ms", "mt",
    "my", "ne", "nl", "nn", "no",
    "oc", "pa", "pl", "ps", "pt",
    "ro", "ru", "sa", "sd", "si",
    "sk", "sl", "sn", "so", "sq",
    "sr", "su", "sv", "sw", "ta",
    "te", "tg", "th", "tk", "tl",
    "tr", "tt", "uk", "ur", "uz",
    "vi", "yi", "yo", "zh", "yue",
    "auto"]
|
||||
|
||||
|
||||
def execute_asr_multi_level_dir(input_folder, output_folder, model_size, language, precision):
    """Transcribe every .wav under *input_folder* (recursively) with Faster-Whisper.

    Each output line is "path|parent_dir_name|LANG|text"; audio detected as
    Chinese is re-run through FunASR. Writes the combined .list file into
    *output_folder* and returns its absolute path (or None on failure).
    """
    if '-local' in model_size:
        # Strip the '-local' suffix and resolve the bundled model directory.
        model_size = model_size[:-6]
        model_path = f'tools/asr/models/faster-whisper-{model_size}'
    else:
        model_path = model_size
    if language == 'auto':
        language = None  # let the model output the most probable language
    # Bug fix: logging does not accept print-style positional args without
    # %-placeholders — the extra args previously caused a formatting error.
    logger.info("loading faster whisper model: %s %s", model_size, model_path)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    try:
        model = WhisperModel(model_path, device=device, compute_type=precision)
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit propagate.
        return logger.error(traceback.format_exc())

    output = []

    # Walk the input directory and every sub-directory.
    for root, dirs, files in os.walk(input_folder):
        for file_name in sorted(files):
            # Only process .wav files
            if file_name.endswith(".wav"):
                try:
                    file_path = os.path.join(root, file_name)
                    # The parent directory name doubles as the original text label.
                    original_text = os.path.basename(root)
                    segments, info = model.transcribe(
                        audio=file_path,
                        beam_size=5,
                        vad_filter=True,
                        vad_parameters=dict(min_silence_duration_ms=700),
                        language=language)
                    text = ''

                    if info.language == "zh":
                        logger.info("检测为中文文本, 转 FunASR 处理")
                        if ("only_asr" not in globals()):
                            # Imported lazily so non-Chinese runs never download the model.
                            from Ref_Audio_Selector.tool.asr.funasr_asr_multi_level_dir import \
                                only_asr
                        text = only_asr(file_path)

                    if text == '':
                        for segment in segments:
                            text += segment.text
                    output.append(f"{file_path}|{original_text}|{info.language.upper()}|{text}")
                    print(f"{file_path}|{original_text}|{info.language.upper()}|{text}")
                except Exception:
                    return logger.error(traceback.format_exc())

    os.makedirs(output_folder, exist_ok=True)
    output_file_path = os.path.abspath(f'{output_folder}/{params.asr_filename}.list')

    with open(output_file_path, "w", encoding="utf-8") as f:
        f.write("\n".join(output))
        logger.info(f"ASR 任务完成->标注文件路径: {output_file_path}\n")
    return output_file_path
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # CLI entry point: transcribe a directory tree of WAV files.
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input_folder", type=str, required=True,
                        help="Path to the folder containing WAV files.")
    parser.add_argument("-o", "--output_folder", type=str, required=True,
                        help="Output folder to store transcriptions.")
    parser.add_argument("-s", "--model_size", type=str, default='large-v3',
                        choices=check_fw_local_models(),
                        help="Model Size of Faster Whisper")
    parser.add_argument("-l", "--language", type=str, default='ja',
                        choices=language_code_list,
                        help="Language of the audio files.")
    parser.add_argument("-p", "--precision", type=str, default='float16', choices=['float16', 'float32'],
                        help="fp16 or fp32")

    cmd = parser.parse_args()
    output_file_path = execute_asr_multi_level_dir(
        input_folder=cmd.input_folder,
        output_folder=cmd.output_folder,
        model_size=cmd.model_size,
        language=cmd.language,
        precision=cmd.precision,
    )
|
94
Ref_Audio_Selector/tool/asr/funasr_asr_multi_level_dir.py
Normal file
94
Ref_Audio_Selector/tool/asr/funasr_asr_multi_level_dir.py
Normal file
@ -0,0 +1,94 @@
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import traceback
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
from Ref_Audio_Selector.config_param.log_config import logger
|
||||
from Ref_Audio_Selector.common.time_util import timeit_decorator
|
||||
from tqdm import tqdm
|
||||
from funasr import AutoModel
|
||||
|
||||
# Local model directories; fall back to the hub model IDs when absent.
path_asr = 'tools/asr/models/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'
path_vad = 'tools/asr/models/speech_fsmn_vad_zh-cn-16k-common-pytorch'
path_punc = 'tools/asr/models/punc_ct-transformer_zh-cn-common-vocab272727-pytorch'
path_asr = path_asr if os.path.exists(
    path_asr) else "iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"
path_vad = path_vad if os.path.exists(path_vad) else "iic/speech_fsmn_vad_zh-cn-16k-common-pytorch"
path_punc = path_punc if os.path.exists(path_punc) else "iic/punc_ct-transformer_zh-cn-common-vocab272727-pytorch"

# FunASR pipeline: ASR + voice activity detection + punctuation restoration.
model = AutoModel(
    model=path_asr,
    model_revision="v2.0.4",
    vad_model=path_vad,
    vad_model_revision="v2.0.4",
    punc_model=path_punc,
    punc_model_revision="v2.0.4",
)
|
||||
|
||||
|
||||
def only_asr(input_file):
    """Run plain FunASR recognition on *input_file*; return '' on failure."""
    try:
        text = model.generate(input=input_file)[0]["text"]
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit propagate.
        text = ''
        logger.error(traceback.format_exc())
    return text
|
||||
|
||||
|
||||
@timeit_decorator
def execute_asr_multi_level_dir(input_folder, output_folder, model_size, language):
    """Transcribe every .wav under *input_folder* (recursively) with FunASR.

    Each output line is "path|parent_dir_name|LANG|text". Writes the combined
    .list file into *output_folder* and returns its path. *model_size* is
    accepted for CLI symmetry but unused here.
    """
    output = []
    # Walk the input directory and every sub-directory.
    for root, dirs, files in os.walk(input_folder):
        for name in sorted(files):
            # Only process .wav files
            if name.endswith(".wav"):
                try:
                    # The parent directory name doubles as the original text label.
                    original_text = os.path.basename(root)
                    # Build the full input audio file path.
                    input_file_path = os.path.join(root, name)
                    input_file_path = os.path.normpath(input_file_path)  # normalise mixed slashes
                    asr_text = model.generate(input=input_file_path)[0]["text"]

                    output.append(f"{input_file_path}|{original_text}|{language.upper()}|{asr_text}")

                except Exception:
                    # Narrowed from a bare except; keep going with the next file.
                    logger.error(traceback.format_exc())

    # Create or open the output directory.
    output_dir_abs = os.path.abspath(output_folder)
    os.makedirs(output_dir_abs, exist_ok=True)

    # Build the output file path.
    output_file_path = os.path.join(output_dir_abs, f'{params.asr_filename}.list')

    # Write all transcriptions.
    with open(output_file_path, "w", encoding="utf-8") as f:
        f.write("\n".join(output))
        logger.info(f"ASR 任务完成->标注文件路径: {output_file_path}\n")

    return output_file_path
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # CLI entry point: transcribe a directory tree of WAV files with FunASR.
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input_folder", type=str, required=True,
                        help="Path to the folder containing WAV files.")
    parser.add_argument("-o", "--output_folder", type=str, required=True,
                        help="Output folder to store transcriptions.")
    parser.add_argument("-s", "--model_size", type=str, default='large',
                        help="Model Size of FunASR is Large")
    parser.add_argument("-l", "--language", type=str, default='zh', choices=['zh'],
                        help="Language of the audio files.")
    parser.add_argument("-p", "--precision", type=str, default='float16', choices=['float16', 'float32'],
                        help="fp16 or fp32")  # not wired up yet

    cmd = parser.parse_args()
    execute_asr_multi_level_dir(
        input_folder=cmd.input_folder,
        output_folder=cmd.output_folder,
        model_size=cmd.model_size,
        language=cmd.language,
    )
|
54
Ref_Audio_Selector/tool/audio_check.py
Normal file
54
Ref_Audio_Selector/tool/audio_check.py
Normal file
@ -0,0 +1,54 @@
|
||||
import os
|
||||
import shutil
|
||||
import Ref_Audio_Selector.common.common as common
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
from Ref_Audio_Selector.config_param.log_config import logger
|
||||
|
||||
|
||||
def remove_matching_audio_files_in_text_dir(text_dir, emotions_list):
    """Delete every .wav under *text_dir* whose stem is not a known emotion tag.

    Returns the number of files deleted.
    """
    # Set membership gives O(1) lookups instead of scanning the list each time.
    wanted = {item['emotion'] for item in emotions_list}
    deleted = 0
    for root, dirs, files in os.walk(text_dir):
        for file in files:
            if not file.endswith(".wav"):
                continue
            emotion_tag = os.path.basename(file)[:-4]  # strip the '.wav' suffix
            if emotion_tag in wanted:
                continue
            file_path = os.path.join(root, file)
            logger.info(f"Deleting file: {file_path}")
            try:
                os.remove(file_path)
                deleted += 1
            except Exception as e:
                logger.error(f"Error deleting file {file_path}: {e}")

    return deleted
|
||||
|
||||
|
||||
def delete_emotion_subdirectories(emotion_dir, emotions_list):
    """Remove each sub-directory of *emotion_dir* whose name is not a known
    emotion tag.

    Returns the number of directories removed.
    """
    wanted = {item['emotion'] for item in emotions_list}
    removed = 0

    for entry in os.listdir(emotion_dir):
        entry_path = os.path.join(emotion_dir, entry)
        if not os.path.isdir(entry_path) or entry in wanted:
            continue
        logger.info(f"Deleting directory: {entry_path}")
        try:
            # shutil.rmtree removes the whole sub-tree and its contents.
            shutil.rmtree(entry_path)
            removed += 1
        except Exception as e:
            logger.error(f"Error deleting directory {entry_path}: {e}")

    return removed
|
||||
|
||||
|
||||
def sync_ref_audio(ref_audio_dir, inference_audio_dir):
    """Remove inference outputs whose reference audio no longer exists.

    Cleans both aggregation views under *inference_audio_dir*: stray .wav
    files in the text tree and stale subdirectories in the emotion tree.

    :return: tuple (deleted wav count, deleted emotion-directory count)
    """
    ref_list = common.RefAudioListManager(ref_audio_dir).get_ref_audio_list()
    text_dir = os.path.join(inference_audio_dir, params.inference_audio_text_aggregation_dir)
    emotion_dir = os.path.join(inference_audio_dir, params.inference_audio_emotion_aggregation_dir)
    delete_text_wav_num = remove_matching_audio_files_in_text_dir(text_dir, ref_list)
    delete_emotion_dir_num = delete_emotion_subdirectories(emotion_dir, ref_list)
    return delete_text_wav_num, delete_emotion_dir_num
|
31
Ref_Audio_Selector/tool/audio_config.py
Normal file
31
Ref_Audio_Selector/tool/audio_config.py
Normal file
@ -0,0 +1,31 @@
|
||||
import os
|
||||
import platform
|
||||
|
||||
|
||||
def generate_audio_config(work_space_dir, template_str, audio_list, output_file_path):
    """Render a reference-audio configuration file from a string template.

    Each entry in *audio_list* (a dict with 'emotion', 'ref_path' and
    'ref_text') fills the ``${emotion}``/``${ref_path}``/``${ref_text}``
    placeholders of *template_str*.  Rendered entries are joined with
    ",\\n" and written UTF-8 encoded to *output_file_path*.  Paths are
    stored relative to *work_space_dir* and normalized to forward slashes
    on Windows.
    """
    rendered_lines = []
    for info in audio_list:
        rel = os.path.relpath(info['ref_path'], work_space_dir)
        if platform.system() == 'Windows':
            # Keep the config portable: always forward slashes.
            rel = rel.replace('\\', '/')
        line = (template_str
                .replace('${emotion}', info['emotion'])
                .replace('${ref_path}', rel)
                .replace('${ref_text}', info['ref_text']))
        rendered_lines.append(line)

    # join() produces the same result as appending ",\n" per entry and
    # trimming the trailing separator afterwards.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write(",\n".join(rendered_lines))
|
238
Ref_Audio_Selector/tool/audio_inference.py
Normal file
238
Ref_Audio_Selector/tool/audio_inference.py
Normal file
@ -0,0 +1,238 @@
|
||||
import time
|
||||
import os
|
||||
import requests
|
||||
import itertools
|
||||
import multiprocessing
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
import numpy as np
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse, quote
|
||||
from Ref_Audio_Selector.config_param.log_config import logger, p_logger
|
||||
|
||||
|
||||
class SetModelURLComposer:
    """Builds the GET/POST requests used to switch GPT / SoVITS models.

    ``type`` selects which parameter names get filled in: 'gpt', 'sovits'
    or 'all' (both at once).
    """

    def __init__(self, type, base_url, gpt_param_name, sovits_param_name):
        self.type = type
        self.base_url = base_url
        self.gpt_param_name = gpt_param_name
        self.sovits_param_name = sovits_param_name

    def is_valid(self):
        # Every check raises; returning silently means the config is usable.
        if self.base_url is None or self.base_url == '':
            raise Exception("请求地址不能为空")
        if self.type in ('gpt', 'all'):
            if self.gpt_param_name is None or self.gpt_param_name == '':
                raise Exception("GPT参数名不能为空")
        if self.type in ('sovits', 'all'):
            if self.sovits_param_name is None or self.sovits_param_name == '':
                raise Exception("Sovits参数名不能为空")

    def _fill_params(self, value_array):
        # Map the supplied model paths onto the parameter names for this type.
        query = {}
        if self.type == 'gpt':
            query[self.gpt_param_name] = value_array[0]
        if self.type == 'sovits':
            query[self.sovits_param_name] = value_array[0]
        if self.type == 'all':
            query[self.gpt_param_name] = value_array[0]
            query[self.sovits_param_name] = value_array[1]
        return query

    def build_get_url(self, value_array, need_url_encode=True):
        return append_params_to_url(self.base_url, self._fill_params(value_array), need_url_encode)

    def build_post_url(self, value_array, need_url_encode=True):
        url = append_params_to_url(self.base_url, {}, need_url_encode)
        return url, self._fill_params(value_array)
|
||||
|
||||
|
||||
class TTSURLComposer:
    """Builds TTS inference URLs in either emotion or reference-audio mode."""

    def __init__(self, base_url, refer_type_param, emotion_param_name, text_param_name, ref_path_param_name, ref_text_param_name):
        self.base_url = base_url
        # Mode selector: '角色情绪' (role emotion) vs. reference audio.
        self.refer_type_param = refer_type_param
        self.emotion_param_name = emotion_param_name
        self.text_param_name = text_param_name
        self.ref_path_param_name = ref_path_param_name
        self.ref_text_param_name = ref_text_param_name

    def is_valid(self):
        # Raises on an unusable configuration; silent return means OK.
        if self.base_url is None or self.base_url == '':
            raise ValueError("请输入url")
        if self.text_param_name is None or self.text_param_name == '':
            raise ValueError("请输入text参数名")
        if self.emotion_param_name is None and self.ref_path_param_name is None and self.ref_text_param_name is None:
            raise ValueError("请输入至少一个参考or情绪的参数")

    def is_emotion(self):
        return self.refer_type_param == '角色情绪'

    def build_url_with_emotion(self, text_value, emotion_value, need_url_encode=True):
        query = {
            self.text_param_name: text_value,
            self.emotion_param_name: emotion_value,
        }
        return append_params_to_url(self.base_url, query, need_url_encode)

    def build_url_with_ref(self, text_value, ref_path_value, ref_text_value, need_url_encode=True):
        query = {
            self.text_param_name: text_value,
            self.ref_path_param_name: ref_path_value,
            self.ref_text_param_name: ref_text_value,
        }
        return append_params_to_url(self.base_url, query, need_url_encode)
|
||||
|
||||
|
||||
def append_params_to_url(url_with_params, params, need_url_encode):
    """Append *params* as query-string pairs to *url_with_params*.

    Chooses '?' or '&' depending on whether the URL already carries a query
    part; optionally runs the result through safe_encode_query_params.
    """
    if params:
        query = '&'.join(f"{k}={v}" for k, v in params.items())
        sep = '&' if '?' in url_with_params else '?'
        url_with_params = url_with_params + sep + query
    return safe_encode_query_params(url_with_params) if need_url_encode else url_with_params
|
||||
|
||||
|
||||
def safe_encode_query_params(original_url):
    """Return *original_url* with its query-string values percent-encoded.

    Bug fix: the previous implementation first quoted each value with
    ``quote`` and then ran ``urlencode`` over the result.  ``urlencode``
    percent-encodes on its own, so the already-quoted values were encoded
    twice (e.g. a space became ``%2520`` instead of ``%20``).  The values
    are now handed to ``urlencode`` unmodified.

    Note: like the original, only the first value of a repeated key
    survives (``v[0]`` from ``parse_qs``).
    """
    parsed_url = urlparse(original_url)
    query_params = parse_qs(parsed_url.query)

    # One value per key, matching the original behaviour of taking v[0].
    flat_params = {k: v[0] for k, v in query_params.items()}

    # urlencode performs the percent-encoding exactly once.
    new_query_string = urlencode(flat_params, doseq=False)

    encoded_url = urlunparse(parsed_url._replace(query=new_query_string))

    logger.info(encoded_url)
    return encoded_url
|
||||
|
||||
|
||||
def generate_audio_files_parallel(url_composer, text_list, emotion_list, output_dir_path, num_processes=1):
    """Fan audio generation out over *num_processes* worker processes.

    The emotion list is split into roughly equal groups and each group is
    handled by generate_audio_files_for_emotion_group in its own process.
    """
    emotion_groups = np.array_split(emotion_list, num_processes)

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        pending = [
            executor.submit(generate_audio_files_for_emotion_group,
                            url_composer, text_list, group, output_dir_path)
            for group in emotion_groups
        ]
        # Block until every group finishes and surface any worker exception.
        for task in pending:
            task.result()
|
||||
|
||||
|
||||
def generate_audio_files_for_emotion_group(url_composer, text_list, emotion_list, output_dir_path):
    """Synthesize every (text, emotion) combination and save each clip twice.

    Each generated clip is written under both aggregation views:
      - <output>/<text aggregation dir>/<text>/<emotion>.wav
      - <output>/<emotion aggregation dir>/<emotion>/<text>.wav
    Pairs whose files already exist in both places are skipped, so an
    interrupted run can be resumed.  Timing statistics are logged at the end.
    """
    start_time = time.perf_counter()  # high-resolution timer for the report below

    output_dir = os.path.abspath(output_dir_path)
    os.makedirs(output_dir, exist_ok=True)

    # One sub-tree grouped by text, one grouped by emotion.
    text_subdir = os.path.join(output_dir, params.inference_audio_text_aggregation_dir)
    os.makedirs(text_subdir, exist_ok=True)
    emotion_subdir = os.path.join(output_dir, params.inference_audio_emotion_aggregation_dir)
    os.makedirs(emotion_subdir, exist_ok=True)

    all_count = len(text_list) * len(emotion_list)
    has_generated_count = 0
    all_text_count = sum(len(item) for item in text_list)

    # Cartesian product: every text rendered in every emotion.
    for text, emotion in itertools.product(text_list, emotion_list):
        emotion_name = emotion['emotion']

        text_dir_for_text = os.path.join(text_subdir, text)
        os.makedirs(text_dir_for_text, exist_ok=True)
        text_view_path = os.path.join(text_dir_for_text, emotion_name + '.wav')

        emotion_dir_for_tag = os.path.join(emotion_subdir, emotion_name)
        os.makedirs(emotion_dir_for_tag, exist_ok=True)
        emotion_view_path = os.path.join(emotion_dir_for_tag, text + '.wav')

        # Resume support: skip pairs that were fully generated already.
        if os.path.exists(text_view_path) and os.path.exists(emotion_view_path):
            has_generated_count += 1
            logger.info(f"进程ID: {os.getpid()}, 进度: {has_generated_count}/{all_count}")
            continue

        if url_composer.is_emotion():
            real_url = url_composer.build_url_with_emotion(text, emotion['emotion'], False)
        else:
            real_url = url_composer.build_url_with_ref(text, emotion['ref_path'], emotion['ref_text'], False)

        audio_bytes = inference_audio_from_api(real_url)

        # The same byte stream goes into both aggregation views.
        with open(text_view_path, 'wb') as f:
            f.write(audio_bytes)
        with open(emotion_view_path, 'wb') as f:
            f.write(audio_bytes)

        has_generated_count += 1
        logger.info(f"进程ID: {os.getpid()}, 进度: {has_generated_count}/{all_count}")

    elapsed_time = time.perf_counter() - start_time
    log_message = f"进程ID: {os.getpid()}, generate_audio_files_for_emotion_group 执行耗时: {elapsed_time:.6f} 秒;推理数量: {has_generated_count}; 字符总数:{all_text_count};每秒推理字符数:{all_text_count*len(emotion_list) / elapsed_time:.3f};"
    p_logger.info(log_message)
    logger.info(log_message)
|
||||
|
||||
|
||||
def inference_audio_from_api(url):
    """GET *url* and return the response body as raw audio bytes.

    Raises when the server answers with a non-200 status; the error message
    includes the status code and the JSON body for diagnosis.
    """
    logger.info(f'inference_audio_from_api url: {url}')
    response = requests.get(url, stream=True)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch audio from API. Server responded with status code {response.status_code}.message: {response.json()}")
    # 200 OK: the body is the synthesized audio stream.
    return response.content
|
||||
|
||||
|
||||
def start_api_set_model(set_model_url_composer, gpt_models, sovits_models):
    """POST both model paths to the API's set-model endpoint.

    Returns the response body on success, otherwise a failure message
    carrying the HTTP status code.
    """
    url, post_body = set_model_url_composer.build_post_url([gpt_models, sovits_models], True)
    logger.info(f'set_model_url_composer url: {set_model_url_composer}')
    logger.info(f'start_api_set_model url: {url}')
    logger.info(f'start_api_set_model post_body: {post_body}')
    response = requests.post(url, json=post_body)
    if response.status_code != 200:
        return f'请求失败,状态码:{response.status_code}'
    return response.text
|
||||
|
||||
|
||||
def start_api_v2_set_gpt_model(set_model_url_composer, gpt_models):
    """Switch the GPT model via the v2 GET endpoint.

    Returns the response body on success, otherwise a failure message
    carrying the HTTP status code.
    """
    url = set_model_url_composer.build_get_url([gpt_models], False)
    logger.info(f'start_api_v2_set_gpt_model url: {url}')
    response = requests.get(url)
    if response.status_code != 200:
        return f'请求失败,状态码:{response.status_code}'
    return response.text
|
||||
|
||||
|
||||
def start_api_v2_set_sovits_model(set_model_url_composer, sovits_models):
    """Switch the SoVITS model via the v2 GET endpoint.

    Returns the response body on success, otherwise a failure message
    carrying the HTTP status code.
    """
    url = set_model_url_composer.build_get_url([sovits_models], False)
    logger.info(f'start_api_v2_set_sovits_model url: {url}')
    response = requests.get(url)
    if response.status_code != 200:
        return f'请求失败,状态码:{response.status_code}'
    return response.text
|
162
Ref_Audio_Selector/tool/audio_sample.py
Normal file
162
Ref_Audio_Selector/tool/audio_sample.py
Normal file
@ -0,0 +1,162 @@
|
||||
import os
|
||||
import shutil
|
||||
import random
|
||||
import librosa
|
||||
from Ref_Audio_Selector.config_param.log_config import logger
|
||||
|
||||
|
||||
def check_audio_duration(path, min_duration=3, max_duration=10):
    """Return True when the audio at *path* lasts between *min_duration*
    and *max_duration* seconds (inclusive).

    Fix: the original returned ``None`` when the file could not be read,
    breaking the boolean contract; ``False`` is returned instead, which is
    equally falsy for every existing caller but keeps the return type
    consistent.
    """
    try:
        # Duration in seconds straight from the file header/stream.
        # NOTE(review): librosa >= 0.10 renamed this kwarg to ``path`` —
        # confirm the pinned librosa version before changing it.
        duration = librosa.get_duration(filename=path)
        return min_duration <= duration <= max_duration
    except Exception as e:
        logger.error(f"无法打开或处理音频文件:{e}")
        return False
|
||||
|
||||
|
||||
def convert_from_list(list_file, output_dir):
    """Copy usable clips from a GPT-SoVITS ``.list`` file into *output_dir*.

    Each line has the form ``path|speaker|language|transcription``.  A clip
    is copied (renamed to "<transcription>.wav") only when the source
    exists, the target does not, and the duration passes
    check_audio_duration.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with open(list_file, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    for line in lines:
        parts = line.strip().split('|')
        if len(parts) != 4:
            logger.error(f"Line format incorrect: {line}")
            continue

        audio_path, _, _, transcription = parts

        # The transcription itself becomes the new filename.
        new_filename = transcription.strip() + '.wav'
        new_path = os.path.join(output_dir, new_filename)

        # Never overwrite an existing target.
        if os.path.exists(new_path):
            logger.info(f"File already exists: {new_path}")
            continue

        try:
            if not os.path.exists(audio_path):
                logger.info(f"Audio file does not exist: {audio_path}")
                continue

            if check_audio_duration(audio_path):
                # copy2 preserves metadata alongside the rename.
                shutil.copy2(audio_path, new_path)
                logger.info(f"File copied and renamed to: {new_path}")
            else:
                logger.info(f"File skipped due to duration: {audio_path}")
        except Exception as e:
            logger.error(f"An error occurred while processing: {audio_path}")
            logger.error(e)

    logger.info("Processing complete.")
|
||||
|
||||
|
||||
def sample(output_audio_dir, similarity_list, subsection_num, sample_num):
    """Stratified random sampling of reference audio by similarity rank.

    The list is sorted by score (descending), split into *subsection_num*
    contiguous segments, and up to *sample_num* clips are drawn at random
    from each segment into an ``emotion_<i>`` subdirectory of
    *output_audio_dir*.
    """
    similarity_list.sort(key=lambda x: x['score'], reverse=True)

    # Ceiling division: a segment length that covers the whole list.
    step = len(similarity_list) // subsection_num
    if len(similarity_list) % subsection_num != 0:
        step += 1

    for i in range(subsection_num):
        start = i * step
        end = min((i + 1) * step, len(similarity_list))  # clamp the last segment

        segment = similarity_list[start:end]
        random.shuffle(segment)

        # Draw at most sample_num clips from the shuffled segment.
        chosen = segment[:min(sample_num, len(segment))]

        subdir_path = os.path.join(output_audio_dir, f'emotion_{i + 1}')
        os.makedirs(subdir_path, exist_ok=True)

        for item in chosen:
            src_path = item['wav_path']
            dst_path = os.path.join(subdir_path, os.path.basename(src_path))
            shutil.copyfile(src_path, dst_path)

    logger.info("Sampling completed.")
|
||||
|
||||
|
||||
def parse_similarity_file(file_path):
    """Load a "<score>|<wav path>" report into a list of dicts.

    :param file_path: path of the report produced by the similarity step
    :return: list of {'score': float, 'wav_path': str} records, in file order
    """
    result_list = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Lines are "<float score>|<path>"; strip the trailing newline.
            score_text, filepath = line.strip().split('|')
            result_list.append({
                'score': float(score_text),
                'wav_path': filepath,
            })
    return result_list
|
||||
|
||||
|
||||
def copy_and_move(output_audio_directory, similarity_scores):
    """Copy scored wavs into *output_audio_directory* with a rank prefix.

    Each file is renamed to "<score*10000, zero-padded>-<original name>.wav"
    so that an alphabetical listing follows the similarity ordering.
    """
    if not os.path.exists(output_audio_directory):
        os.makedirs(output_audio_directory)

    for item in similarity_scores:
        stem = os.path.basename(item['wav_path'])[:-4]  # drop the ".wav" suffix
        new_name = f"{item['score'] * 10000:04.0f}-{stem}.wav"
        destination = os.path.join(output_audio_directory, new_name)
        shutil.copyfile(item['wav_path'], destination)

    logger.info("已完成复制和重命名操作。")
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Ad-hoc manual test entry point for the sampling pipeline.
    similarity_list = parse_similarity_file("D:/tt/similarity/啊,除了伊甸和樱,竟然还有其他人会提起我?.txt")
    sample('D:/tt/similarity/output', similarity_list, 10, 4)
|
142
Ref_Audio_Selector/tool/speaker_verification/voice_similarity.py
Normal file
142
Ref_Audio_Selector/tool/speaker_verification/voice_similarity.py
Normal file
@ -0,0 +1,142 @@
|
||||
import argparse
|
||||
import os
|
||||
import torchaudio
|
||||
import torchaudio.transforms as T
|
||||
import platform
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
import Ref_Audio_Selector.config_param.log_config as log_config
|
||||
from Ref_Audio_Selector.common.time_util import timeit_decorator
|
||||
from Ref_Audio_Selector.common.model_manager import speaker_verification_models as models
|
||||
|
||||
from modelscope.pipelines import pipeline
|
||||
|
||||
|
||||
def init_model(model_type='speech_campplus_sv_zh-cn_16k-common'):
    """Build a modelscope speaker-verification pipeline for *model_type*.

    The task/model/revision triple is looked up in the shared model registry.
    """
    log_config.logger.info(f'人声识别模型类型:{model_type}')
    entry = models[model_type]
    return pipeline(
        task=entry['task'],
        model=entry['model'],
        model_revision=entry['model_revision'],
    )
|
||||
|
||||
|
||||
@timeit_decorator
def compare_audio_and_generate_report(reference_audio_path, comparison_dir_path, output_file_path, model_type):
    """Score every .wav in *comparison_dir_path* against the reference clip.

    Results are sorted by similarity (descending) and written one per line
    as "<score>|<path>" to *output_file_path*.
    """
    sv_pipeline = init_model(model_type)

    # Step 1: candidate clips — top level of the comparison directory only.
    comparison_audio_paths = [
        os.path.join(comparison_dir_path, name)
        for name in os.listdir(comparison_dir_path)
        if name.endswith('.wav')
    ]

    # The verification model was trained on 16 kHz audio.  On Windows the
    # dependency's sox-based resampler is unavailable, so the reference may
    # be pre-resampled with torchaudio instead; mixing different resampling
    # methods skews the similarity scores, hence the opt-in flag.  On other
    # platforms the reference is used as-is.
    if platform.system() == 'Windows' and params.enable_pre_sample == 'true':
        reference_audio_16k = ensure_16k_wav(reference_audio_path)
    else:
        reference_audio_16k = reference_audio_path

    # Step 2: score each candidate against the reference.
    all_count = len(comparison_audio_paths)
    has_processed_count = 0
    similarity_scores = []
    for audio_path in comparison_audio_paths:
        score = sv_pipeline([reference_audio_16k, audio_path])['score']
        similarity_scores.append({'score': score, 'path': audio_path})
        has_processed_count += 1
        log_config.logger.info(f'进度:{has_processed_count}/{all_count}')

    # Step 3: highest similarity first.
    similarity_scores.sort(key=lambda x: x['score'], reverse=True)

    # Step 4: make sure the output file exists before writing.
    if not os.path.exists(output_file_path):
        open(output_file_path, 'w').close()

    # Step 5: one "<score>|<path>" record per line, UTF-8 for CJK paths.
    formatted_scores = [f'{item["score"]}|{item["path"]}' for item in similarity_scores]
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(formatted_scores))
|
||||
|
||||
|
||||
def ensure_16k_wav(audio_file_path, target_sample_rate=16000):
    """Return a path to a copy of *audio_file_path* at *target_sample_rate*.

    If the file already has the target rate its own path is returned
    unchanged; otherwise the audio is resampled with
    torchaudio.transforms.Resample and written under the temp directory,
    whose path is returned.

    :param audio_file_path: source audio file
    :param target_sample_rate: desired rate in Hz (default 16000)
    """
    waveform, sample_rate = torchaudio.load(audio_file_path)

    # Fast path: nothing to do.
    if sample_rate == target_sample_rate:
        return audio_file_path

    resampler = T.Resample(orig_freq=sample_rate, new_freq=target_sample_rate)
    resampled_waveform = resampler(waveform)

    # Stage the converted copy in the shared temp directory, keeping the
    # original basename so callers can correlate files.
    os.makedirs(params.temp_dir, exist_ok=True)
    temp_file_path = os.path.join(params.temp_dir, os.path.basename(audio_file_path))
    torchaudio.save(temp_file_path, resampled_waveform, target_sample_rate)
    return temp_file_path
|
||||
|
||||
|
||||
def parse_arguments():
    """Parse the CLI flags for the voice-similarity report script."""
    parser = argparse.ArgumentParser(description="Audio processing script arguments")
    parser.add_argument("-r", "--reference_audio", type=str, required=True,
                        help="Path to the reference WAV file.")
    parser.add_argument("-c", "--comparison_dir", type=str, required=True,
                        help="Path to the directory containing comparison WAV files.")
    parser.add_argument("-o", "--output_file", type=str, required=True,
                        help="Path to the output file where results will be written.")
    parser.add_argument("-m", "--model_type", type=str, required=True,
                        help="Path to the model type.")
    return parser.parse_args()
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Command-line entry point: score a directory against a reference clip.
    cmd = parse_arguments()
    compare_audio_and_generate_report(
        reference_audio_path=cmd.reference_audio,
        comparison_dir_path=cmd.comparison_dir,
        output_file_path=cmd.output_file,
        model_type=cmd.model_type,
    )
|
77
Ref_Audio_Selector/tool/text_check.py
Normal file
77
Ref_Audio_Selector/tool/text_check.py
Normal file
@ -0,0 +1,77 @@
|
||||
import os
|
||||
import Ref_Audio_Selector.common.common as common
|
||||
import Ref_Audio_Selector.tool.audio_check as audio_check
|
||||
from Ref_Audio_Selector.config_param.log_config import logger
|
||||
|
||||
|
||||
def parse_text_similarity_result_txt(file_path):
    """Parse a similarity report file into a list of dicts.

    Each line has the form
    ``<average_similarity_score>|<count>|<emotion>``.  Lines with the wrong
    field count are ignored; lines whose numeric fields fail to convert are
    logged and skipped.

    :param file_path: path of the txt report
    :return: list of parsed record dicts
    """
    data_list = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split('|')
            if len(parts) != 3:
                continue
            try:
                data_list.append({
                    'average_similarity_score': float(parts[0]),
                    'count': int(parts[1]),
                    'emotion': parts[2],
                })
            except ValueError as e:
                # Malformed numeric fields: log and move on.
                logger.error(f"Error parsing line: {line.strip()} - {e}")
    return data_list
|
||||
|
||||
|
||||
def remove_low_similarity_files(ref_audio_list, report_list, audio_text_similarity_boundary):
    """Delete reference audio whose text similarity fell below the boundary.

    :param ref_audio_list: dicts with 'emotion' and 'ref_path' keys
    :param report_list: dicts with 'average_similarity_score' and 'emotion'
    :param audio_text_similarity_boundary: minimum acceptable average score
    :return: number of files actually deleted
    """
    deleted_count = 0

    # Only emotions whose average score dropped below the boundary.
    low_reports = [r for r in report_list
                   if r['average_similarity_score'] < audio_text_similarity_boundary]

    for report in low_reports:
        target_emotion = report['emotion']
        # Delete every reference file carrying the failing emotion tag.
        for ref in ref_audio_list:
            if ref['emotion'] != target_emotion:
                continue
            ref_path = ref['ref_path']
            if not os.path.exists(ref_path):
                logger.error(f"File not found: {ref_path}")
                continue
            try:
                os.remove(ref_path)
                deleted_count += 1
                logger.info(f"Deleted file: {ref_path}")
            except Exception as e:
                logger.error(f"Error deleting file {ref_path}: {e}")

    return deleted_count
|
||||
|
||||
|
||||
def delete_ref_audio_below_boundary(ref_audio_path, text_similarity_result_path, sync_inference_audio_dir,
                                    audio_text_similarity_boundary):
    """Drop low-similarity reference audio and re-sync the inference tree.

    :return: number of reference files deleted
    """
    manager = common.RefAudioListManager(ref_audio_path)
    report_list = parse_text_similarity_result_txt(text_similarity_result_path)
    count = remove_low_similarity_files(manager.get_ref_audio_list(), report_list,
                                        audio_text_similarity_boundary)
    # Remove inference outputs that now point at deleted references.
    audio_check.sync_ref_audio(ref_audio_path, sync_inference_audio_dir)
    return count
|
0
Ref_Audio_Selector/tool/text_comparison/__init__.py
Normal file
0
Ref_Audio_Selector/tool/text_comparison/__init__.py
Normal file
161
Ref_Audio_Selector/tool/text_comparison/asr_text_process.py
Normal file
161
Ref_Audio_Selector/tool/text_comparison/asr_text_process.py
Normal file
@ -0,0 +1,161 @@
|
||||
import os
|
||||
import argparse
|
||||
from collections import defaultdict
|
||||
from operator import itemgetter
|
||||
from Ref_Audio_Selector.common.time_util import timeit_decorator
|
||||
import Ref_Audio_Selector.tool.text_comparison.text_comparison as text_comparison
|
||||
import Ref_Audio_Selector.config_param.config_params as params
|
||||
import Ref_Audio_Selector.common.common as common
|
||||
from Ref_Audio_Selector.config_param.log_config import logger
|
||||
|
||||
|
||||
def parse_asr_file(file_path):
    """Read an ASR list file into a list of record dicts.

    Each line is ``<wav path>|<original text>|<language>|<asr text>``; the
    emotion tag is derived from the wav filename.  ``similarity_score``
    starts at 0 and is filled in by a later pass.
    """
    output = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # '|' is the fixed field separator; lines are assumed well-formed.
            input_file_path, original_text, language, asr_text = line.strip().split('|')
            output.append({
                'emotion': common.get_filename_without_extension(input_file_path),
                'input_file_path': input_file_path,
                'original_text': original_text,
                'language': language,
                'asr_text': asr_text,
                'similarity_score': 0,
            })
    return output
|
||||
|
||||
|
||||
@timeit_decorator
def calculate_similarity_and_append_to_list(input_list, boundary):
    """Fill each record's similarity fields in place and return the list.

    ``original_score`` is the raw BERT similarity; ``similarity_score`` is
    the boundary-amplified value, both from text_comparison.calculate_result.
    """
    all_count = len(input_list)
    for done, item in enumerate(input_list, start=1):
        original_score, similarity_score = text_comparison.calculate_result(
            item['original_text'], item['asr_text'], boundary)
        item['similarity_score'] = similarity_score
        item['original_score'] = original_score
        logger.info(f'进度:{done}/{all_count}')
    return input_list
|
||||
|
||||
|
||||
def calculate_average_similarity_by_emotion(data_list):
    """Group records by emotion and average their similarity scores.

    :param data_list: dicts with 'emotion' and 'similarity_score' keys
    :return: list of {'emotion', 'average_similarity_score', 'count'}
             dicts, sorted by average score, highest first
    """
    scores_by_emotion = defaultdict(list)
    for record in data_list:
        scores_by_emotion[record['emotion']].append(record['similarity_score'])

    averages = [
        {'emotion': emotion,
         'average_similarity_score': sum(scores) / len(scores),
         'count': len(scores)}
        for emotion, scores in scores_by_emotion.items()
    ]
    averages.sort(key=lambda x: x['average_similarity_score'], reverse=True)
    return averages
|
||||
|
||||
|
||||
def group_and_sort_by_field(data, group_by_field):
    """Group *data* by *group_by_field* and sort each group by score.

    :param data: record dicts carrying at least *group_by_field* and
                 'similarity_score'
    :return: list of (key, records) tuples; each group's records are ordered
             by 'similarity_score' descending, keys keep first-seen order
    """
    grouped = defaultdict(list)
    for record in data:
        grouped[record[group_by_field]].append(record)

    for records in grouped.values():
        records.sort(key=itemgetter('similarity_score'), reverse=True)

    return list(grouped.items())
|
||||
|
||||
|
||||
def format_list_to_text(data_list, output_filename):
    """Write the grouped detail report with original-text columns.

    *data_list* is a list of (group key, records) tuples; each group key is
    written on its own line, followed by one
    "score|original score|asr text|original text" row per record.
    """
    with open(output_filename, 'w', encoding='utf-8') as output_file:
        output_file.write('放大后的相似度分值|原始分值|ASR文本|原文文本\n')
        for key, items in data_list:
            # Group header line.
            output_file.write(key + '\n')
            for item in items:
                output_file.write(
                    f"{item['similarity_score']}|{item['original_score']}|"
                    f"{item['asr_text']}|{item['original_text']}\n")
|
||||
|
||||
|
||||
def format_list_to_emotion(data_list, output_filename):
    """Write the grouped detail report with emotion columns.

    *data_list* is a list of (group key, records) tuples; each group key is
    written on its own line, followed by one
    "score|original score|asr text|emotion" row per record.
    """
    with open(output_filename, 'w', encoding='utf-8') as output_file:
        output_file.write('放大后的相似度分值|原始分值|ASR文本|情绪类型\n')
        for key, items in data_list:
            # Group header line.
            output_file.write(key + '\n')
            for item in items:
                output_file.write(
                    f"{item['similarity_score']}|{item['original_score']}|"
                    f"{item['asr_text']}|{item['emotion']}\n")
|
||||
|
||||
|
||||
@timeit_decorator
def process(asr_file_path, output_dir, similarity_enlarge_boundary):
    """Run the full text-similarity analysis pipeline.

    Parses the ASR file, scores every record, then writes three reports to
    *output_dir*: per-emotion averages, per-emotion details and per-text
    details.
    """
    # makedirs with exist_ok covers the "create if missing" case.
    os.makedirs(output_dir, exist_ok=True)

    records = parse_asr_file(asr_file_path)
    calculate_similarity_and_append_to_list(records, similarity_enlarge_boundary)

    # Report 1: average similarity per emotion.
    average_similarity_list = calculate_average_similarity_by_emotion(records)
    average_similarity_file = os.path.join(
        output_dir, f'{params.text_emotion_average_similarity_report_filename}.txt')
    average_similarity_content = '\n'.join(
        f"{item['average_similarity_score']}|{item['count']}|{item['emotion']}"
        for item in average_similarity_list)
    common.write_text_to_file(average_similarity_content, average_similarity_file)

    # Report 2: detail rows grouped by emotion.
    emotion_detail_list = group_and_sort_by_field(records, 'emotion')
    emotion_detail_file = os.path.join(
        output_dir, f'{params.text_similarity_by_emotion_detail_filename}.txt')
    format_list_to_text(emotion_detail_list, emotion_detail_file)

    # Report 3: detail rows grouped by original text.
    original_text_detail_list = group_and_sort_by_field(records, 'original_text')
    original_text_detail_file = os.path.join(
        output_dir, f'{params.text_similarity_by_text_detail_filename}.txt')
    format_list_to_emotion(original_text_detail_list, original_text_detail_file)

    logger.info('文本相似度分析完成。')
|
||||
|
||||
|
||||
def parse_arguments():
    """Parse the CLI flags for the ASR similarity analysis script."""
    parser = argparse.ArgumentParser(description="Process ASR files and analyze similarity.")
    parser.add_argument("-a", "--asr_file_path", type=str, required=True,
                        help="Path to the directory containing ASR files or path to a single ASR file.")
    parser.add_argument("-o", "--output_dir", type=str, required=True,
                        help="Path to the directory where the analysis results should be saved.")
    parser.add_argument("-b", "--similarity_enlarge_boundary", type=float, required=True,
                        help="Similarity score boundary value to be used in your calculations.")
    return parser.parse_args()
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Command-line entry point for the similarity analysis pipeline.
    cmd = parse_arguments()
    process(cmd.asr_file_path, cmd.output_dir, cmd.similarity_enlarge_boundary)
|
128
Ref_Audio_Selector/tool/text_comparison/text_comparison.py
Normal file
128
Ref_Audio_Selector/tool/text_comparison/text_comparison.py
Normal file
@ -0,0 +1,128 @@
|
||||
import os
import torch
from transformers import AutoTokenizer, AutoModel
from scipy.spatial.distance import cosine
from Ref_Audio_Selector.config_param.log_config import logger

# Path to the pretrained Chinese RoBERTa model used for sentence embeddings;
# can be overridden via the `bert_path` environment variable.
bert_path = os.environ.get(
    "bert_path", "GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large"
)

# Set device to GPU if available, else CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

logger.info(f'使用计算设备: {device}')

# Tokenizer and encoder are loaded once at import time and shared as
# module-level singletons by the functions below.
tokenizer = AutoTokenizer.from_pretrained(bert_path)
model = AutoModel.from_pretrained(bert_path).to(device)
||||
|
||||
def calculate_similarity(text1, text2, max_length=512):
    """Return the cosine similarity between the [CLS] embeddings of two texts.

    Args:
        text1, text2: input strings to compare.
        max_length: truncation length for tokenization.

    Returns:
        float similarity in roughly [-1, 1] (1 - cosine distance).
    """
    def _cls_embedding(text):
        # Tokenize with truncation so long inputs fit the model's window.
        encoded = tokenizer(text, padding=True, truncation=True,
                            max_length=max_length, return_tensors='pt').to(device)
        with torch.no_grad():
            # Take the [CLS] token vector and flatten it to 1-D.
            return model(**encoded)[0][:, 0, :].flatten()

    vec1 = _cls_embedding(text1)
    vec2 = _cls_embedding(text2)

    # scipy's cosine() is a distance, so similarity = 1 - distance.
    return 1 - cosine(vec1.cpu().numpy().flatten(), vec2.cpu().numpy().flatten())
|
||||
|
||||
# Amplify scores in the [boundary, 1] interval onto [0, 1].
def adjusted_similarity(similarity_score2, boundary=0.8):
    """Rescale a similarity score: values below `boundary` map to 0,
    values in [boundary, 1] are stretched linearly onto [0, 1]."""
    if similarity_score2 >= boundary:
        # Stretch factor for the [boundary, 1] interval.
        multiple = 1 / (1 - boundary)
        return (similarity_score2 - boundary) * multiple
    return 0
|
||||
|
||||
def calculate_result(t1, t2, boundary):
    """Compute both the raw and the boundary-adjusted similarity for a text pair.

    Returns:
        (raw_similarity, adjusted_similarity) tuple of floats.
    """
    raw_score = calculate_similarity(t1, t2)
    amplified_score = adjusted_similarity(raw_score, boundary)
    return raw_score, amplified_score
|
||||
|
||||
def print_result(t1, t2, boundary):
    """Print the raw and boundary-adjusted similarity between t1 and t2."""
    print(f't2: {t2}')
    # Raw embedding similarity.
    score = calculate_similarity(t1, t2)
    print(f"两句话的相似度为: {score:.4f}")

    # Amplified score after applying the boundary cutoff.
    amplified = adjusted_similarity(score, boundary)
    print(f"调整后的相似度为: {amplified:.4f}")
|
||||
|
||||
def test(boundary):
    """Smoke test: compare a fixed reference sentence against a batch of
    candidate lines (paraphrases, unrelated sentences, random noise) and
    print the raw/adjusted similarity for each.

    Args:
        boundary: amplification boundary forwarded to print_result.
    """
    # Reference text every candidate is compared against.
    text1 = "这是第一个句子"
    # Renamed from `list`, which shadowed the builtin of the same name.
    sample_text = """
这是第一个句子
这是第二个句子。
那么,这是第三个表达。
当前呈现的是第四个句子。
接下来,我们有第五句话。
在此,展示第六条陈述。
继续下去,这是第七个短句。
不容忽视的是第八个表述。
顺延着序列,这是第九句。
此处列举的是第十个说法。
进入新的篇章,这是第十一个句子。
下一段内容即为第十二个句子。
显而易见,这是第十三个叙述。
渐进地,我们来到第十四句话。
向下滚动,您会看到第十五个表达。
此刻,呈现在眼前的是第十六个句子。
它们中的一个——第十七个句子在此。
如同链条般连接,这是第十八个断言。
按照顺序排列,接下来是第十九个话语。
逐一列举,这是第二十个陈述句。
结构相似,本例给出第二十一个实例句。
这是最初的陈述句。
首先表达的是这一个句子。
第一句内容即为此处所示。
这是起始的叙述段落。
开篇所展示的第一句话就是这个。
明媚的阳光洒满大地
窗外飘落粉色樱花瓣
笔尖轻触纸面思绪万千
深夜的月光如水般静谧
穿越丛林的小径蜿蜒曲折
浅酌清茶品味人生百态
破晓时分雄鸡一唱天下白
草原上奔驰的骏马无拘无束
秋叶纷飞描绘季节更替画卷
寒冬雪夜炉火旁围坐共话家常
kszdRjYXw
pfsMgTlVHnB
uQaGxIbWz
ZtqNhPmKcOe
jfyrXsStVUo
wDiEgLkZbn
yhNvAfUmqC
TpKjxMrWgs
eBzHUaFJtYd
oQnXcVSiPkL
00000
"""
    candidates = sample_text.strip().split('\n')
    for item in candidates:
        print_result(text1, item, boundary)
|
||||
|
||||
if __name__ == '__main__':
    # Run the smoke test with a 0.9 amplification boundary.
    test(0.9)
|
0
Ref_Audio_Selector/ui_init/__init__.py
Normal file
0
Ref_Audio_Selector/ui_init/__init__.py
Normal file
197
Ref_Audio_Selector/ui_init/init_ui_param.py
Normal file
197
Ref_Audio_Selector/ui_init/init_ui_param.py
Normal file
@ -0,0 +1,197 @@
|
||||
import os
import multiprocessing
import Ref_Audio_Selector.config_param.config_params as params
import Ref_Audio_Selector.tool.audio_inference as audio_inference
import Ref_Audio_Selector.common.common as common

# Shared read/write accessor for persisted UI parameters.
rw_param = params.config_manager.get_rw_param()
# ------------------- Basic info ---------------------------

# Working directory of the current role (work dir joined with role name)
base_dir_default = None
# Work directory
text_work_space_dir_default = None
# Role name
text_role_default = None
# Directory containing the reference audio
text_refer_audio_file_dir_default = None
# Directory containing the inference audio
text_inference_audio_file_dir_default = None

# ------------------- Step 1 ------------------------------

# Reference-audio sampling directory
text_sample_dir_default = None
# Number of subsections
slider_subsection_num_default = None
# Number of random samples drawn per subsection
slider_sample_num_default = None

# ------------------- Step 2 ------------------------------

# api service: model-switch endpoint URL
text_api_set_model_base_url_default = None
# GPT model parameter name
text_api_gpt_param_default = None
# SoVITS model parameter name
text_api_sovits_param_default = None
# api_v2 service: GPT model-switch endpoint URL
text_api_v2_set_gpt_model_base_url_default = None
# GPT model parameter name
text_api_v2_gpt_model_param_default = None
# api_v2 service: SoVITS model-switch endpoint URL
text_api_v2_set_sovits_model_base_url_default = None
# SoVITS model parameter name
text_api_v2_sovits_model_param_default = None
# Inference service request URL and parameters
text_url_default = None
# Fully composed inference service request URL
text_whole_url_default = None
# Text parameter name
text_text_default = None
# Reference parameter type
dropdown_refer_type_param_default = None
# Reference-audio path parameter name
text_ref_path_default = None
# Reference-audio text parameter name
text_ref_text_default = None
# Role emotion parameter name
text_emotion_default = None
# Path of the text to run inference on
text_test_content_default = None
# Request concurrency
slider_request_concurrency_num_default = 3
# Maximum request concurrency
slider_request_concurrency_max_num = None

# ------------------- Step 3 ------------------------------

# Directory of audio awaiting ASR
text_asr_audio_dir_default = None
# Path of the file to analyze
text_text_similarity_analysis_path_default = None
# Text-similarity amplification boundary
slider_text_similarity_amplification_boundary_default = 0.90
# Path of the text-similarity analysis result file
text_text_similarity_result_path_default = None

# ------------------- Step 4 ------------------------------
# ------------------- Step 5 ------------------------------
# Template content
text_template_default = None
||||
def empty_default(vale, default_value):
    """Return `default_value` when `vale` is None or the empty string,
    otherwise return `vale` unchanged."""
    return default_value if vale is None or vale == "" else vale
|
||||
|
||||
def init_base():
    """Populate the basic-info defaults (work dir, role, audio dirs) from
    the persisted parameters."""
    global text_work_space_dir_default, text_role_default, base_dir_default, \
        text_refer_audio_file_dir_default, text_inference_audio_file_dir_default

    text_work_space_dir_default = rw_param.read(rw_param.work_dir)
    text_role_default = rw_param.read(rw_param.role)
    # Role workspace lives under <work dir>/<role>.
    base_dir_default = os.path.join(text_work_space_dir_default, text_role_default)

    refer_dir = os.path.join(base_dir_default, params.reference_audio_dir)
    text_refer_audio_file_dir_default = common.check_path_existence_and_return(refer_dir)

    inference_dir = os.path.join(base_dir_default, params.inference_audio_dir)
    text_inference_audio_file_dir_default = common.check_path_existence_and_return(inference_dir)
||||
|
||||
def init_first():
    """Populate the step-1 defaults (sampling dir, subsection count, sample count)."""
    global text_sample_dir_default, slider_subsection_num_default, slider_sample_num_default

    text_sample_dir_default = common.check_path_existence_and_return(
        os.path.join(base_dir_default, params.list_to_convert_reference_audio_dir))

    slider_subsection_num_default = int(empty_default(rw_param.read(rw_param.subsection_num), 10))

    # Fix: cast to int for consistency with slider_subsection_num_default above —
    # persisted values come back as strings, and the slider expects a number.
    slider_sample_num_default = int(empty_default(rw_param.read(rw_param.sample_num), 4))
||||
def init_second():
    """Populate the step-2 defaults: inference-service endpoint URLs, request
    parameter names, the pre-composed full request URL, the test-text path
    and the request concurrency, falling back to the api/api_v2 defaults
    when nothing has been persisted yet."""
    global text_api_set_model_base_url_default, text_api_gpt_param_default, text_api_sovits_param_default, text_api_v2_set_gpt_model_base_url_default, text_api_v2_gpt_model_param_default
    global text_api_v2_set_sovits_model_base_url_default, text_api_v2_sovits_model_param_default, text_url_default, text_whole_url_default, text_text_default, dropdown_refer_type_param_default, text_ref_path_default
    global text_ref_text_default, text_emotion_default, text_test_content_default, slider_request_concurrency_num_default, slider_request_concurrency_max_num

    # api (v1): one endpoint switches both models; two parameter names.
    text_api_set_model_base_url_default = empty_default(rw_param.read(rw_param.api_set_model_base_url),
                                                        'http://localhost:9880/set_model')
    text_api_gpt_param_default = empty_default(rw_param.read(rw_param.api_gpt_param), 'gpt_model_path')
    text_api_sovits_param_default = empty_default(rw_param.read(rw_param.api_sovits_param), 'sovits_model_path')

    # api_v2: separate GPT / SoVITS weight-switch endpoints.
    text_api_v2_set_gpt_model_base_url_default = empty_default(rw_param.read(rw_param.api_v2_set_gpt_model_base_url),
                                                               'http://localhost:9880/set_gpt_weights')
    text_api_v2_gpt_model_param_default = empty_default(rw_param.read(rw_param.api_v2_gpt_model_param), 'weights_path')

    text_api_v2_set_sovits_model_base_url_default = empty_default(
        rw_param.read(rw_param.api_v2_set_sovits_model_base_url), 'http://localhost:9880/set_sovits_weights')
    text_api_v2_sovits_model_param_default = empty_default(rw_param.read(rw_param.api_v2_sovits_model_param), 'weights_path')

    # Base inference URL and the request-parameter names used to compose it.
    text_url_default = empty_default(rw_param.read(rw_param.text_url),
                                     'http://localhost:9880?prompt_language=中文&text_language=中文&cut_punc=,.;?!、,。?!;:…')
    text_text_default = empty_default(rw_param.read(rw_param.text_param), 'text')
    dropdown_refer_type_param_default = empty_default(rw_param.read(rw_param.refer_type_param), '参考音频')

    text_ref_path_default = empty_default(rw_param.read(rw_param.ref_path_param), 'refer_wav_path')
    text_ref_text_default = empty_default(rw_param.read(rw_param.ref_text_param), 'prompt_text')
    text_emotion_default = empty_default(rw_param.read(rw_param.emotion_param), 'emotion')

    # Pre-compose the full request URL shown in the UI (depends on the
    # defaults resolved above, so order matters here).
    text_whole_url_default = whole_url(text_url_default, dropdown_refer_type_param_default, text_text_default,
                                       text_ref_path_default, text_ref_text_default, text_emotion_default)

    text_test_content_default = empty_default(rw_param.read(rw_param.test_content_path), params.default_test_text_path)

    slider_request_concurrency_max_num = multiprocessing.cpu_count()

    slider_request_concurrency_num_default = empty_default(rw_param.read(rw_param.request_concurrency_num), 3)

    # Cap the persisted concurrency at the machine's CPU count.
    slider_request_concurrency_num_default = min(int(slider_request_concurrency_num_default), slider_request_concurrency_max_num)
||||
|
||||
|
||||
# Compose the complete request URL from the base path and parameter names.
def whole_url(text_url, dropdown_refer_type_param, text_text, text_ref_path, text_ref_text, text_emotion):
    """Build the full inference request URL (with placeholder values) for
    display in the UI, using either emotion-style or reference-style params."""
    composer = audio_inference.TTSURLComposer(text_url, dropdown_refer_type_param, text_emotion, text_text,
                                              text_ref_path, text_ref_text)
    if composer.is_emotion():
        return composer.build_url_with_emotion('测试内容', '情绪类型', False)
    return composer.build_url_with_ref('测试内容', '参考路径', '参考文本', False)
||||
|
||||
|
||||
def init_third():
    """Populate the step-3 defaults (ASR audio dir, similarity-analysis
    paths and the amplification boundary)."""
    global text_asr_audio_dir_default, text_text_similarity_analysis_path_default, \
        slider_text_similarity_amplification_boundary_default, text_text_similarity_result_path_default

    asr_audio_dir = os.path.join(base_dir_default, params.inference_audio_dir,
                                 params.inference_audio_text_aggregation_dir)
    text_asr_audio_dir_default = common.check_path_existence_and_return(asr_audio_dir)

    analysis_path = os.path.join(base_dir_default, params.asr_filename + '.list')
    text_text_similarity_analysis_path_default = common.check_path_existence_and_return(analysis_path)

    slider_text_similarity_amplification_boundary_default = empty_default(
        rw_param.read(rw_param.text_similarity_amplification_boundary), 0.90)

    result_path = os.path.join(base_dir_default, params.text_emotion_average_similarity_report_filename + '.txt')
    text_text_similarity_result_path_default = common.check_path_existence_and_return(result_path)
||||
|
||||
def init_fourth():
    """Step 4 has no persisted defaults to initialize; placeholder kept for
    symmetry with the other per-step initializers."""
    pass
||||
|
||||
|
||||
def init_fifth():
    """Populate the step-5 template default, falling back to the bundled
    template file when no template has been persisted."""
    global text_template_default

    persisted_template = rw_param.read(rw_param.text_template)
    fallback_template = common.read_file(params.default_template_path)
    text_template_default = empty_default(persisted_template, fallback_template)
|
||||
|
||||
def init_all():
    """Run every per-step initializer in order (base info first, since the
    later steps read base_dir_default)."""
    for initializer in (init_base, init_first, init_second,
                        init_third, init_fourth, init_fifth):
        initializer()
|
BIN
Ref_Audio_Selector/参考音频筛选流程.png
Normal file
BIN
Ref_Audio_Selector/参考音频筛选流程.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 95 KiB |
Loading…
x
Reference in New Issue
Block a user