mirror of
https://github.com/RVC-Boss/GPT-SoVITS.git
synced 2026-06-07 07:08:16 +08:00
Compare commits
5 Commits
29fb89337c
...
3666fd2bbf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3666fd2bbf | ||
|
|
08d627c333 | ||
|
|
6d95b559e8 | ||
|
|
7f6787121b | ||
|
|
6e027ec111 |
5
.gitignore
vendored
5
.gitignore
vendored
@ -193,3 +193,8 @@ cython_debug/
|
||||
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
/.vs
|
||||
/GPT_SoVITS/configs/tts_infer.yaml
|
||||
/GPT_SoVITS/configs/infer_settings.json
|
||||
/last_selected_preset.json
|
||||
/last_selected_models.json
|
||||
|
||||
76
GPT_SoVITS/AR/models/embedding_cudagraph.py
Normal file
76
GPT_SoVITS/AR/models/embedding_cudagraph.py
Normal file
@ -0,0 +1,76 @@
|
||||
import math
|
||||
|
||||
import torch
|
||||
from torch import nn
|
||||
|
||||
|
||||
class TokenEmbedding(nn.Module):
|
||||
def __init__(self, embedding_dim: int, vocab_size: int, dropout: float = 0.0):
|
||||
super().__init__()
|
||||
self.vocab_size = vocab_size
|
||||
self.embedding_dim = embedding_dim
|
||||
self.dropout = nn.Dropout(p=dropout)
|
||||
self.word_embeddings = nn.Embedding(self.vocab_size, self.embedding_dim)
|
||||
|
||||
@property
|
||||
def weight(self) -> torch.Tensor:
|
||||
return self.word_embeddings.weight
|
||||
|
||||
def embedding(self, index: int) -> torch.Tensor:
|
||||
return self.word_embeddings.weight[index : index + 1]
|
||||
|
||||
def forward(self, x: torch.Tensor):
|
||||
x = self.word_embeddings(x)
|
||||
x = self.dropout(x)
|
||||
return x
|
||||
|
||||
|
||||
class SinePositionalEmbeddingNested(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
embedding_dim: int,
|
||||
dropout: float = 0.0,
|
||||
scale: bool = False,
|
||||
alpha: bool = False,
|
||||
max_batch_size: int = 20,
|
||||
max_seq_len: int = 2500,
|
||||
):
|
||||
super().__init__()
|
||||
self.embedding_dim = embedding_dim
|
||||
self.x_scale = math.sqrt(embedding_dim) if scale else 1.0
|
||||
self.alpha = nn.Parameter(torch.ones(1), requires_grad=alpha)
|
||||
self.dropout = nn.Dropout(p=dropout)
|
||||
self.max_batch_size = max_batch_size
|
||||
self.max_seq_len = max_seq_len
|
||||
|
||||
self.reverse = False
|
||||
self.register_buffer(
|
||||
"pe", torch.zeros(max_batch_size, max_seq_len, embedding_dim), persistent=False
|
||||
)
|
||||
self.pe: torch.Tensor
|
||||
self.compute_pe()
|
||||
|
||||
def compute_pe(self):
|
||||
if self.reverse:
|
||||
position = torch.arange(self.max_seq_len - 1, -1, -1.0, dtype=torch.float32).unsqueeze(1)
|
||||
else:
|
||||
position = torch.arange(self.max_seq_len, dtype=torch.float32).unsqueeze(1)
|
||||
div_term = torch.exp(
|
||||
torch.arange(0, self.embedding_dim, 2, dtype=torch.float32)
|
||||
* -(math.log(10000.0) / self.embedding_dim)
|
||||
)
|
||||
pe = self.pe
|
||||
pe[:, :, 0::2] = torch.sin(position * div_term)
|
||||
pe[:, :, 1::2] = torch.cos(position * div_term)
|
||||
|
||||
def forward(self, input_pos: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
|
||||
batch_size = x.shape[0]
|
||||
pe_values = self.pe[torch.arange(batch_size), input_pos - 1]
|
||||
return x * self.x_scale + self.alpha * pe_values.unsqueeze(1)
|
||||
|
||||
def prefill(self, x: torch.Tensor) -> torch.Tensor:
|
||||
input_pos = torch.tensor([i.shape[0] for i in x.unbind()])
|
||||
pe_values = torch.nested.nested_tensor(
|
||||
[self.pe[i, : input_pos[i], :] for i in range(input_pos.size(0))]
|
||||
)
|
||||
return x * self.x_scale + self.alpha.item() * pe_values
|
||||
78
GPT_SoVITS/AR/models/structs_cudagraph.py
Normal file
78
GPT_SoVITS/AR/models/structs_cudagraph.py
Normal file
@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
import torch
|
||||
|
||||
Tensor = torch.Tensor
|
||||
|
||||
|
||||
@dataclass
|
||||
class T2SResult:
|
||||
result: List[Tensor] | None = None
|
||||
infer_speed: float = 0.0
|
||||
status: Literal["Success", "Error"] = "Success"
|
||||
exception: Optional[Exception] = None
|
||||
traceback: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class T2SRequest:
|
||||
x: List[torch.Tensor]
|
||||
x_lens: Tensor
|
||||
prompts: torch.Tensor
|
||||
bert_feature: List[Tensor]
|
||||
valid_length: int
|
||||
top_k: int = 5
|
||||
top_p: float = 1
|
||||
early_stop_num: int = -1
|
||||
temperature: float = 1.0
|
||||
repetition_penalty: float = 1.35
|
||||
use_cuda_graph: bool = False
|
||||
debug: bool = False
|
||||
|
||||
|
||||
class T2SSession:
|
||||
def __init__(self, decoder, request: T2SRequest, device: torch.device, dtype: torch.dtype):
|
||||
with device:
|
||||
self.decoder = decoder
|
||||
self.request = request
|
||||
self.device = device
|
||||
self.dtype = dtype
|
||||
|
||||
bsz = len(request.x)
|
||||
y_len = request.prompts.size(-1)
|
||||
self.bsz = bsz
|
||||
self.y_len = y_len
|
||||
|
||||
from AR.models.t2s_model_cudagraph import Sampler
|
||||
|
||||
self.sampler = Sampler(bsz, decoder.vocab_size)
|
||||
|
||||
self.x = request.x
|
||||
self.x_lens = request.x_lens.to(torch.int32)
|
||||
self.y = request.prompts
|
||||
self.bert_feature = request.bert_feature
|
||||
|
||||
self.prefill_len = self.x_lens + self.y.size(1)
|
||||
|
||||
self.input_pos = torch.zeros_like(self.prefill_len)
|
||||
self.input_pos.add_(self.prefill_len)
|
||||
|
||||
self.completed = torch.Tensor([False] * len(self.x)).bool().to(device)
|
||||
self.y_results: List[Tensor] = [None] * len(self.x) # type: ignore
|
||||
|
||||
self.xy_pos = decoder.embed(self.x, self.y, self.bert_feature)
|
||||
|
||||
attn_mask = []
|
||||
for bs in range(bsz):
|
||||
pos = int(self.x_lens[bs].item())
|
||||
mask = torch.zeros(pos + y_len, pos + y_len).bool()
|
||||
mask[:, :pos].fill_(True)
|
||||
if y_len > 0:
|
||||
mask[-y_len:, -y_len:] = ~torch.triu(
|
||||
torch.ones(y_len, y_len, dtype=torch.bool), diagonal=1
|
||||
)
|
||||
attn_mask.append(mask)
|
||||
self.attn_mask_nested = torch.nested.nested_tensor(attn_mask)
|
||||
602
GPT_SoVITS/AR/models/t2s_model_cudagraph.py
Normal file
602
GPT_SoVITS/AR/models/t2s_model_cudagraph.py
Normal file
@ -0,0 +1,602 @@
|
||||
"""
|
||||
CUDA Graph accelerated T2S decoder.
|
||||
Uses PyTorch native scaled_dot_product_attention (no flash_attn dependency).
|
||||
Adapted from gsvpp/AR/models/t2s_model_abc.py and t2s_model_flash_attn.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from typing import Dict, List, MutableSequence, Optional, Tuple
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
from torch.cuda.graphs import CUDAGraph
|
||||
from tqdm import tqdm
|
||||
|
||||
from AR.models.embedding_cudagraph import (
|
||||
SinePositionalEmbeddingNested as SinePositionalEmbedding,
|
||||
)
|
||||
from AR.models.embedding_cudagraph import TokenEmbedding
|
||||
from AR.models.structs_cudagraph import T2SRequest, T2SResult, T2SSession
|
||||
|
||||
Tensor = torch.Tensor
|
||||
|
||||
|
||||
class Sampler(nn.Module):
|
||||
def __init__(self, batch_size: int, vocab_size: int) -> None:
|
||||
super().__init__()
|
||||
self.batch_size = batch_size
|
||||
|
||||
def sample(
|
||||
self,
|
||||
logits: Tensor,
|
||||
previous_tokens: Tensor,
|
||||
temperature: float,
|
||||
top_k: int,
|
||||
top_p: float,
|
||||
repetition_penalty: float,
|
||||
) -> Tensor:
|
||||
previous_tokens = previous_tokens.long()
|
||||
score = torch.gather(logits, dim=1, index=previous_tokens)
|
||||
score = torch.where(
|
||||
score < 0, score * repetition_penalty, score / repetition_penalty
|
||||
)
|
||||
logits.scatter_(dim=1, index=previous_tokens, src=score)
|
||||
|
||||
sorted_logits, sorted_indices = torch.sort(logits, descending=True)
|
||||
cum_probs = torch.cumsum(
|
||||
torch.nn.functional.softmax(sorted_logits, dim=-1), dim=-1
|
||||
)
|
||||
sorted_indices_to_remove = cum_probs > top_p
|
||||
sorted_indices_to_remove[:, 0] = False
|
||||
indices_to_remove = sorted_indices_to_remove.scatter(
|
||||
dim=1, index=sorted_indices, src=sorted_indices_to_remove
|
||||
)
|
||||
logits = logits.masked_fill(indices_to_remove, -float("Inf"))
|
||||
|
||||
logits = logits / max(temperature, 1e-5)
|
||||
|
||||
v, _ = torch.topk(logits, top_k)
|
||||
pivot = v[:, -1].unsqueeze(-1)
|
||||
logits = torch.where(logits < pivot, -float("Inf"), logits)
|
||||
|
||||
probs = torch.nn.functional.softmax(logits, dim=-1)
|
||||
q = torch.empty_like(probs).exponential_(1.0)
|
||||
idx_next = torch.argmax(probs / q, dim=-1, keepdim=True).to(dtype=torch.int32)
|
||||
|
||||
return idx_next
|
||||
|
||||
|
||||
# ─── KV Cache ────────────────────<E29480><E29480><EFBFBD>───────────────────────────────────────────
|
||||
|
||||
|
||||
class KVCacheNHD(nn.Module):
|
||||
def __init__(self, batch_size, max_seq_length, n_heads, head_dim):
|
||||
super().__init__()
|
||||
assert batch_size > 0
|
||||
cache_shape = (batch_size, max_seq_length, n_heads, head_dim)
|
||||
self.n_head = n_heads
|
||||
self.head_dim = head_dim
|
||||
self.batch_size = batch_size
|
||||
self.max_seq_length = max_seq_length
|
||||
self.register_buffer(
|
||||
"k_cache", torch.zeros(size=cache_shape), persistent=False
|
||||
)
|
||||
self.register_buffer(
|
||||
"v_cache", torch.zeros(size=cache_shape), persistent=False
|
||||
)
|
||||
|
||||
def update(self, input_pos: Tensor, k_val: Tensor, v_val: Tensor):
|
||||
index = (
|
||||
(input_pos - 1)
|
||||
.unsqueeze(-1)
|
||||
.unsqueeze(-1)
|
||||
.unsqueeze(-1)
|
||||
.expand(-1, -1, self.n_head, self.head_dim)
|
||||
.to(torch.int64)
|
||||
)
|
||||
k_out = self.k_cache
|
||||
v_out = self.v_cache
|
||||
k_out.scatter_(1, index, k_val)
|
||||
v_out.scatter_(1, index, v_val)
|
||||
return k_out, v_out
|
||||
|
||||
def empty(self):
|
||||
self.k_cache.zero_()
|
||||
self.v_cache.zero_()
|
||||
|
||||
def prefill_kv(self, k_val: Tensor, v_val: Tensor, bs: int):
|
||||
self.k_cache[[bs], : k_val.shape[1]] = k_val
|
||||
self.v_cache[[bs], : v_val.shape[1]] = v_val
|
||||
|
||||
|
||||
# ─── Attention (PyTorch native SDPA, no flash_attn) ─────────────────────────
|
||||
|
||||
|
||||
class Attention(nn.Module):
|
||||
def __init__(self, n_head: int, hidden_dim: int):
|
||||
super().__init__()
|
||||
self.n_head = n_head
|
||||
self.hidden_dim = hidden_dim
|
||||
assert hidden_dim % n_head == 0
|
||||
self.head_dim = hidden_dim // n_head
|
||||
self.in_proj = nn.Linear(hidden_dim, hidden_dim * 3, bias=True)
|
||||
self.out_proj = nn.Linear(hidden_dim, hidden_dim, bias=True)
|
||||
self.dropout = nn.Dropout(0.1)
|
||||
|
||||
self._register_load_state_dict_pre_hook(self.load_hook)
|
||||
|
||||
def load_hook(self, state_dict: dict, prefix, *args):
|
||||
keys_to_modify = [key for key in state_dict if "in_proj_" in key]
|
||||
for key in keys_to_modify:
|
||||
new_key = key.replace("in_proj_", "in_proj.")
|
||||
state_dict[new_key] = state_dict.pop(key)
|
||||
|
||||
def forward(
|
||||
self, x: Tensor, input_pos: Tensor, kv_cache: KVCacheNHD
|
||||
) -> Tensor:
|
||||
bsz, seqlen, _ = x.shape
|
||||
|
||||
q, k, v = self.in_proj.forward(x).chunk(3, dim=-1)
|
||||
|
||||
q = q.view(bsz, seqlen, self.n_head, self.head_dim)
|
||||
k = k.view(bsz, seqlen, self.n_head, self.head_dim)
|
||||
v = v.view(bsz, seqlen, self.n_head, self.head_dim)
|
||||
|
||||
k_cache, v_cache = kv_cache.update(input_pos, k, v)
|
||||
|
||||
q = q.transpose(1, 2) # [B, H, 1, D]
|
||||
k_out = k_cache.transpose(1, 2) # [B, H, max_seq, D]
|
||||
v_out = v_cache.transpose(1, 2) # [B, H, max_seq, D]
|
||||
|
||||
attn = F.scaled_dot_product_attention(q, k_out, v_out)
|
||||
|
||||
attn = self.dropout.forward(attn)
|
||||
attn = attn.transpose(1, 2).reshape(bsz, seqlen, self.hidden_dim)
|
||||
attn = self.out_proj.forward(attn)
|
||||
return attn
|
||||
|
||||
def prefill(self, x: Tensor, mask: Tensor, kv_cache: KVCacheNHD) -> Tensor:
|
||||
bsz = x.size(0)
|
||||
outputs = []
|
||||
for bs in range(bsz):
|
||||
x_b = x[bs].unsqueeze(0)
|
||||
q, k, v = self.in_proj.forward(x_b.unsqueeze(0)).chunk(3, dim=-1)
|
||||
q = q.contiguous().view(1, -1, self.n_head, self.head_dim)
|
||||
k = k.contiguous().view(1, -1, self.n_head, self.head_dim)
|
||||
v = v.contiguous().view(1, -1, self.n_head, self.head_dim)
|
||||
kv_cache.prefill_kv(k, v, bs)
|
||||
q, k, v = map(lambda t: t.transpose(1, 2), (q, k, v))
|
||||
attn_mask = (
|
||||
mask[bs].unsqueeze(0).unsqueeze(0).expand(1, self.n_head, -1, -1)
|
||||
)
|
||||
attn = F.scaled_dot_product_attention(q, k, v, attn_mask=attn_mask)
|
||||
attn = self.dropout.forward(attn)
|
||||
attn = attn.transpose(1, 2).contiguous().view(1, -1, self.hidden_dim)
|
||||
output = self.out_proj.forward(attn)
|
||||
outputs.append(output.squeeze(0))
|
||||
return torch.nested.nested_tensor(outputs)
|
||||
|
||||
|
||||
# ─── Feed Forward ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class FeedForward(nn.Module):
|
||||
def __init__(self, dim: int, hidden_dim: int) -> None:
|
||||
super().__init__()
|
||||
self.linear1 = nn.Linear(dim, hidden_dim, bias=True)
|
||||
self.linear2 = nn.Linear(hidden_dim, dim, bias=True)
|
||||
self.dropout = nn.Dropout(0.1)
|
||||
|
||||
def forward(self, x: Tensor) -> Tensor:
|
||||
return self.dropout.forward(
|
||||
self.linear2(self.dropout.forward(F.relu(self.linear1(x))))
|
||||
)
|
||||
|
||||
|
||||
# ─── Transformer Block ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TransformerBlock(nn.Module):
|
||||
def __init__(self, n_head, ffn_dim, hidden_dim) -> None:
|
||||
super().__init__()
|
||||
self.hidden_dim = hidden_dim
|
||||
self.attention = Attention(n_head, hidden_dim)
|
||||
self.feed_forward = FeedForward(hidden_dim, ffn_dim)
|
||||
self.attention_norm = nn.LayerNorm([hidden_dim])
|
||||
self.ffn_norm = nn.LayerNorm([hidden_dim])
|
||||
self.dropout = nn.Dropout(0.1)
|
||||
|
||||
self._register_load_state_dict_pre_hook(self.load_hook)
|
||||
|
||||
def load_hook(self, state_dict: dict[str, Tensor], prefix, *args):
|
||||
for key in list(state_dict.keys()):
|
||||
new_key = (
|
||||
key.replace("self_attn", "attention")
|
||||
.replace("linear", "feed_forward.linear")
|
||||
.replace("norm1", "attention_norm")
|
||||
.replace("norm2", "ffn_norm")
|
||||
)
|
||||
state_dict[new_key] = state_dict.pop(key)
|
||||
|
||||
def forward(
|
||||
self, x: Tensor, input_pos: Tensor, kv_cache: KVCacheNHD
|
||||
) -> Tensor:
|
||||
h = self.attention_norm.forward(
|
||||
x + self.dropout.forward(self.attention.forward(x, input_pos, kv_cache))
|
||||
)
|
||||
out = self.ffn_norm.forward(h + self.feed_forward.forward(h))
|
||||
return out
|
||||
|
||||
def prefill(self, x: Tensor, mask: Tensor, kv_cache: KVCacheNHD) -> Tensor:
|
||||
h = self.attention_norm.forward(
|
||||
x + self.dropout.forward(self.attention.prefill(x, mask, kv_cache))
|
||||
)
|
||||
out = self.ffn_norm.forward(h + self.feed_forward.forward(h))
|
||||
return out
|
||||
|
||||
|
||||
# ─── Transformer Decoder ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TransformerDecoder(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
hidden_dim,
|
||||
n_layer,
|
||||
n_head,
|
||||
ffn_dim,
|
||||
vocab_size,
|
||||
max_seq_length,
|
||||
max_batch_size,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.hidden_dim = hidden_dim
|
||||
self.n_head = n_head
|
||||
assert hidden_dim % n_head == 0
|
||||
self.head_dim = hidden_dim // n_head
|
||||
self.vocab_size = vocab_size
|
||||
self.n_layer = n_layer
|
||||
self.layers = nn.ModuleList(
|
||||
TransformerBlock(n_head, ffn_dim, hidden_dim) for _ in range(n_layer)
|
||||
)
|
||||
self.max_seq_length: int = max_seq_length
|
||||
self.max_batch_size: int = max_batch_size
|
||||
|
||||
def forward(
|
||||
self,
|
||||
input_pos: Tensor,
|
||||
x: Tensor,
|
||||
kv_caches: MutableSequence[KVCacheNHD],
|
||||
):
|
||||
for layer, kv_cache in zip(self.layers, kv_caches):
|
||||
x = layer.forward(x, input_pos, kv_cache)
|
||||
return x
|
||||
|
||||
def prefill(
|
||||
self,
|
||||
x: Tensor,
|
||||
mask: Tensor,
|
||||
kv_caches: MutableSequence[KVCacheNHD],
|
||||
):
|
||||
for layer, kv_cache in zip(self.layers, kv_caches):
|
||||
x = layer.prefill(x, mask, kv_cache)
|
||||
return x
|
||||
|
||||
|
||||
# ─── T2S Decoder ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class T2SDecoder(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
config,
|
||||
*args,
|
||||
norm_first=False,
|
||||
max_seq_length=2500,
|
||||
max_batch_size=10,
|
||||
**kwds,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
hidden_dim = config["model"]["hidden_dim"]
|
||||
embedding_dim = config["model"]["embedding_dim"]
|
||||
n_head = config["model"]["head"]
|
||||
n_layer = config["model"]["n_layer"]
|
||||
vocab_size = config["model"]["vocab_size"]
|
||||
phoneme_vocab_size = config["model"]["phoneme_vocab_size"]
|
||||
p_dropout = config["model"]["dropout"]
|
||||
EOS = config["model"]["EOS"]
|
||||
ffn_dim = hidden_dim * 4
|
||||
|
||||
self.n_layer = n_layer
|
||||
self.hidden_dim = hidden_dim
|
||||
self.n_head = n_head
|
||||
assert hidden_dim % n_head == 0
|
||||
self.head_dim = hidden_dim // n_head
|
||||
self.embedding_dim = embedding_dim
|
||||
self.vocab_size = vocab_size
|
||||
self.phoneme_vocab_size = phoneme_vocab_size
|
||||
self.p_dropout = p_dropout
|
||||
self.max_seq_length = max_seq_length
|
||||
self.max_batch_size = max_batch_size
|
||||
self.EOS = EOS
|
||||
assert self.EOS == self.vocab_size - 1
|
||||
|
||||
self.bert_proj = nn.Linear(1024, self.embedding_dim)
|
||||
self.ar_text_embedding = TokenEmbedding(
|
||||
self.embedding_dim, self.phoneme_vocab_size, self.p_dropout
|
||||
)
|
||||
self.ar_text_position = SinePositionalEmbedding(
|
||||
self.embedding_dim,
|
||||
dropout=0.1,
|
||||
scale=False,
|
||||
alpha=True,
|
||||
max_batch_size=max_batch_size,
|
||||
max_seq_len=max_seq_length,
|
||||
)
|
||||
self.ar_audio_embedding = TokenEmbedding(
|
||||
self.embedding_dim, self.vocab_size, self.p_dropout
|
||||
)
|
||||
self.ar_audio_position = SinePositionalEmbedding(
|
||||
self.embedding_dim,
|
||||
dropout=0.1,
|
||||
scale=False,
|
||||
alpha=True,
|
||||
max_batch_size=max_batch_size,
|
||||
max_seq_len=max_seq_length,
|
||||
)
|
||||
self.ar_predict_layer = nn.Linear(self.hidden_dim, self.vocab_size, bias=False)
|
||||
self.h = TransformerDecoder(
|
||||
hidden_dim,
|
||||
n_layer,
|
||||
n_head,
|
||||
ffn_dim,
|
||||
vocab_size,
|
||||
max_seq_length,
|
||||
max_batch_size,
|
||||
)
|
||||
|
||||
self._register_load_state_dict_pre_hook(self.load_hook)
|
||||
|
||||
def load_hook(self, state_dict, prefix, *args):
|
||||
model_keys = [key for key in state_dict if key.startswith("model.")]
|
||||
for key in model_keys:
|
||||
new_key = key[len("model.") :]
|
||||
state_dict[new_key] = state_dict.pop(key)
|
||||
|
||||
def init_cache(self, bsz: int = 0) -> nn.ModuleList:
|
||||
bsz = bsz or self.h.max_batch_size
|
||||
assert bsz <= self.h.max_batch_size
|
||||
seq_lens = self.h.max_seq_length
|
||||
device = self.bert_proj.bias.device
|
||||
dtype = self.bert_proj.bias.dtype
|
||||
return nn.ModuleList(
|
||||
[
|
||||
KVCacheNHD(bsz, seq_lens, self.n_head, self.head_dim)
|
||||
for _ in range(self.n_layer)
|
||||
],
|
||||
).to(device, dtype)
|
||||
|
||||
def embed(
|
||||
self,
|
||||
x: List[torch.Tensor],
|
||||
y: torch.Tensor,
|
||||
bert_features: List[torch.Tensor],
|
||||
):
|
||||
x_nested = torch.nested.nested_tensor(x)
|
||||
assert x_nested.size(0) <= self.max_batch_size
|
||||
bert_features_nested = torch.nested.nested_tensor(
|
||||
list(map(lambda t: t.transpose(0, 1), bert_features))
|
||||
)
|
||||
x_emb = self.ar_text_embedding.forward(x_nested)
|
||||
bert = self.bert_proj.forward(bert_features_nested)
|
||||
x_emb = x_emb + bert
|
||||
x_pos = self.ar_text_position.prefill(x_emb)
|
||||
|
||||
y_nested = torch.nested.nested_tensor(list(y.unbind(0)))
|
||||
y_emb = self.ar_audio_embedding.forward(y_nested)
|
||||
y_pos = self.ar_audio_position.prefill(y_emb)
|
||||
|
||||
xy_pos = torch.nested.nested_tensor(
|
||||
[torch.cat([x_pos[i], y_pos[i]]) for i in range(len(x))]
|
||||
)
|
||||
return xy_pos
|
||||
|
||||
def capture(
|
||||
self,
|
||||
input_pos: Tensor,
|
||||
x: Tensor,
|
||||
x_dec: Tensor,
|
||||
kv_caches,
|
||||
) -> CUDAGraph:
|
||||
s = torch.cuda.Stream()
|
||||
s.wait_stream(torch.cuda.current_stream())
|
||||
|
||||
graph = torch.cuda.CUDAGraph()
|
||||
|
||||
with torch.cuda.stream(s):
|
||||
for _ in range(5):
|
||||
self.h.forward(input_pos, x, kv_caches)
|
||||
torch.cuda.current_stream().wait_stream(s)
|
||||
|
||||
with torch.cuda.graph(graph):
|
||||
x_dec.copy_(self.h.forward(input_pos, x, kv_caches))
|
||||
torch.cuda.synchronize()
|
||||
|
||||
return graph
|
||||
|
||||
|
||||
# ─── CUDA Graph Runner ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class CUDAGraphRunner:
|
||||
def __init__(
|
||||
self,
|
||||
decoder_model: T2SDecoder,
|
||||
device: torch.device = torch.device("cpu"),
|
||||
dtype: torch.dtype = torch.float32,
|
||||
) -> None:
|
||||
assert device.type in {"cpu", "cuda", "mps", "xpu", "mtia"}
|
||||
assert dtype in {torch.float16, torch.bfloat16, torch.float32}
|
||||
self.device = device
|
||||
self.dtype = dtype
|
||||
self.decoder_model: T2SDecoder = decoder_model.to(self.device, self.dtype)
|
||||
self.graph: Optional[CUDAGraph] = None
|
||||
self.xy_pos_ = torch.rand(
|
||||
(1, 1, decoder_model.embedding_dim), device=device
|
||||
).to(dtype)
|
||||
self.xy_dec_ = torch.rand(
|
||||
(1, 1, decoder_model.embedding_dim), device=device
|
||||
).to(dtype)
|
||||
self.kv_cache = decoder_model.init_cache(1)
|
||||
self.input_pos = torch.tensor([10]).int().cuda()
|
||||
|
||||
def _handle_request(self, request: T2SRequest):
|
||||
with self.device:
|
||||
for i in self.kv_cache:
|
||||
i.empty()
|
||||
|
||||
decoder = self.decoder_model
|
||||
session = T2SSession(decoder, request, device=self.device, dtype=self.dtype)
|
||||
self.input_pos.copy_(session.input_pos)
|
||||
|
||||
t1 = 0.0
|
||||
infer_speed = 0.0
|
||||
y = session.y
|
||||
bsz = y.size(0)
|
||||
|
||||
for idx in tqdm(range(1500)):
|
||||
if idx == 0:
|
||||
xy_dec = decoder.h.prefill(
|
||||
session.xy_pos, session.attn_mask_nested, self.kv_cache
|
||||
)
|
||||
xy_dec = torch.stack([t[[-1]] for t in xy_dec.unbind()])
|
||||
else:
|
||||
if (
|
||||
request.use_cuda_graph
|
||||
and self.graph is None
|
||||
and torch.cuda.is_available()
|
||||
):
|
||||
self.xy_pos_.copy_(session.xy_pos)
|
||||
self.graph = decoder.capture(
|
||||
self.input_pos,
|
||||
self.xy_pos_,
|
||||
self.xy_dec_,
|
||||
kv_caches=self.kv_cache,
|
||||
)
|
||||
|
||||
if self.graph:
|
||||
self.xy_pos_.copy_(session.xy_pos)
|
||||
self.graph.replay()
|
||||
xy_dec = self.xy_dec_.clone()
|
||||
else:
|
||||
xy_dec = decoder.h.forward(
|
||||
self.input_pos,
|
||||
session.xy_pos,
|
||||
self.kv_cache,
|
||||
)
|
||||
|
||||
logits = decoder.ar_predict_layer(xy_dec[:, -1])
|
||||
self.input_pos.add_(1)
|
||||
|
||||
if idx == 0:
|
||||
logits[:, -1] = float("-inf")
|
||||
|
||||
samples = session.sampler.sample(
|
||||
logits=logits,
|
||||
previous_tokens=session.y,
|
||||
top_k=request.top_k,
|
||||
top_p=request.top_p,
|
||||
repetition_penalty=request.repetition_penalty,
|
||||
temperature=request.temperature,
|
||||
)
|
||||
|
||||
session.y = torch.cat([session.y, samples], dim=1)
|
||||
|
||||
argmax_token = torch.argmax(logits, dim=-1)
|
||||
sample_token = samples.squeeze(1)
|
||||
EOS_mask = (argmax_token == decoder.EOS) | (
|
||||
sample_token == decoder.EOS
|
||||
)
|
||||
|
||||
newly_done_mask = EOS_mask & (~session.completed)
|
||||
newly_done_indices = newly_done_mask.nonzero()
|
||||
|
||||
if newly_done_indices.numel() > 0:
|
||||
session.y_results[newly_done_indices[0]] = session.y[
|
||||
newly_done_indices[0], session.y_len : -1
|
||||
].squeeze(0)
|
||||
session.completed[newly_done_indices] = True
|
||||
|
||||
if torch.all(session.completed).item():
|
||||
if session.y.size(1) == 0:
|
||||
session.y = torch.cat(
|
||||
[session.y, torch.zeros_like(samples)], dim=1
|
||||
)
|
||||
tqdm.write("Bad Zero Prediction")
|
||||
else:
|
||||
tqdm.write(
|
||||
f"T2S Decoding EOS {session.prefill_len.tolist().__str__().strip('[]')} -> \n"
|
||||
f"{[i.size(0) for i in session.y_results].__str__().strip('[]')}"
|
||||
)
|
||||
tqdm.write(
|
||||
f"Infer Speed: {(idx - 1) / (time.perf_counter() - t1):.2f} token/s"
|
||||
)
|
||||
infer_speed = (idx - 1) / (time.perf_counter() - t1)
|
||||
break
|
||||
|
||||
if (
|
||||
request.early_stop_num != -1
|
||||
and (session.y.size(1) - session.y_len) > request.early_stop_num
|
||||
) or idx == 1499:
|
||||
for i in range(bsz):
|
||||
if not session.completed[i].item():
|
||||
session.y_results[i] = session.y[i, session.y_len :]
|
||||
session.completed[i] = True
|
||||
break
|
||||
|
||||
y_emb = decoder.ar_audio_embedding(session.y[:, -1:])
|
||||
session.xy_pos = decoder.ar_audio_position.forward(
|
||||
self.input_pos - session.x_lens, y_emb
|
||||
)
|
||||
|
||||
if idx == 2:
|
||||
t1 = time.perf_counter()
|
||||
|
||||
if idx % 100 == 0 and self.device.type == "cuda":
|
||||
torch.cuda.empty_cache()
|
||||
|
||||
if self.device.type == "cuda":
|
||||
torch.cuda.empty_cache()
|
||||
|
||||
return session.y_results[: request.valid_length], infer_speed
|
||||
|
||||
def generate(self, request: T2SRequest) -> T2SResult:
|
||||
try:
|
||||
result, infer_speed = self._handle_request(request)
|
||||
t2s_result = T2SResult(
|
||||
result=result, infer_speed=infer_speed, status="Success"
|
||||
)
|
||||
except Exception as e:
|
||||
t2s_result = T2SResult(
|
||||
status="Error", exception=e, traceback=traceback.format_exc()
|
||||
)
|
||||
return t2s_result
|
||||
|
||||
@staticmethod
|
||||
def load_decoder(weights_path, max_batch_size=1) -> T2SDecoder:
|
||||
print(
|
||||
f"Loading Text2Semantic Weights from {weights_path} with CUDA Graph (SDPA) Implement"
|
||||
)
|
||||
dict_s1 = torch.load(
|
||||
weights_path, map_location="cpu", weights_only=False#, mmap=True
|
||||
)
|
||||
config = dict_s1["config"]
|
||||
decoder = T2SDecoder(config, max_batch_size=max_batch_size)
|
||||
state_dict = dict_s1["weight"]
|
||||
decoder.load_state_dict(state_dict)
|
||||
return decoder.eval()
|
||||
@ -1,56 +0,0 @@
|
||||
custom:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cuda
|
||||
is_half: true
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/gsv-v2final-pretrained/s1bert25hz-5kh-longer-epoch=12-step=369668.ckpt
|
||||
version: v2
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/gsv-v2final-pretrained/s2G2333k.pth
|
||||
v1:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cpu
|
||||
is_half: false
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt
|
||||
version: v1
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/s2G488k.pth
|
||||
v2:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cpu
|
||||
is_half: false
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/gsv-v2final-pretrained/s1bert25hz-5kh-longer-epoch=12-step=369668.ckpt
|
||||
version: v2
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/gsv-v2final-pretrained/s2G2333k.pth
|
||||
v2Pro:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cpu
|
||||
is_half: false
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/s1v3.ckpt
|
||||
version: v2Pro
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/v2Pro/s2Gv2Pro.pth
|
||||
v2ProPlus:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cpu
|
||||
is_half: false
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/s1v3.ckpt
|
||||
version: v2ProPlus
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/v2Pro/s2Gv2ProPlus.pth
|
||||
v3:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cpu
|
||||
is_half: false
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/s1v3.ckpt
|
||||
version: v3
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/s2Gv3.pth
|
||||
v4:
|
||||
bert_base_path: GPT_SoVITS/pretrained_models/chinese-roberta-wwm-ext-large
|
||||
cnhuhbert_base_path: GPT_SoVITS/pretrained_models/chinese-hubert-base
|
||||
device: cpu
|
||||
is_half: false
|
||||
t2s_weights_path: GPT_SoVITS/pretrained_models/s1v3.ckpt
|
||||
version: v4
|
||||
vits_weights_path: GPT_SoVITS/pretrained_models/gsv-v4-pretrained/s2Gv4.pth
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
425
GPT_SoVITS/persistence_tools.py
Normal file
425
GPT_SoVITS/persistence_tools.py
Normal file
@ -0,0 +1,425 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
GPT-SoVITS 持久化工具类
|
||||
包含:模型配置、参考音频、推理参数 的持久化读写与管理
|
||||
抽离自主文件,减少主文件臃肿,方便后续维护
|
||||
"""
|
||||
import json
|
||||
import yaml
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
# ===================== 全局配置(统一管理所有持久化文件路径) =====================
|
||||
# 模型持久化配置文件
|
||||
LAST_SELECTED_MODELS_JSON = Path("./last_selected_models.json")
|
||||
# 参考预设最后选中配置文件
|
||||
LAST_SELECTED_PRESET_JSON = Path("./last_selected_preset.json")
|
||||
# 参考音频持久化目录
|
||||
REF_AUDIO_DIR = Path("GPT_SoVITS/ref_audios")
|
||||
# 参考预设配置文件
|
||||
REF_PRESETS_YAML = Path("GPT_SoVITS/configs/ref_audios_presets.yaml")
|
||||
# 推理参数配置文件
|
||||
INFER_SETTINGS_JSON = Path("GPT_SoVITS/configs/infer_settings.json")
|
||||
|
||||
# 参考音频配置常量
|
||||
MAX_FILENAME_LENGTH = 40
|
||||
INVALID_FILE_CHARS = set(r'\/:*?"<>|')
|
||||
|
||||
# 默认推理参数
|
||||
DEFAULT_INFER_SETTINGS = {
|
||||
"batch_size": 20,
|
||||
"sample_steps": 32,
|
||||
"fragment_interval": 0.2,
|
||||
"speed_factor": 1.0,
|
||||
"top_k": 5,
|
||||
"top_p": 1.0,
|
||||
"temperature": 1.0,
|
||||
"repetition_penalty": 1.35,
|
||||
"how_to_cut": "凑四句一切",
|
||||
"super_sampling": False,
|
||||
"parallel_infer": True,
|
||||
"split_bucket": True,
|
||||
"seed": -1,
|
||||
"keep_random": True
|
||||
}
|
||||
|
||||
# ===================== 通用工具函数(抽离重复逻辑) =====================
|
||||
def sanitize_filename(name):
|
||||
"""清理文件名中的非法字符,替换为下划线"""
|
||||
if not name:
|
||||
return "unnamed_preset"
|
||||
return ''.join(c if c not in INVALID_FILE_CHARS else '_' for c in name)
|
||||
|
||||
def get_audio_md5(file_path, chunk_size=4096):
|
||||
"""计算音频文件的MD5值(取前8位),用于区分不同音频内容"""
|
||||
if not os.path.exists(file_path):
|
||||
return "invalid_file"
|
||||
try:
|
||||
md5 = hashlib.md5()
|
||||
with open(file_path, 'rb') as f:
|
||||
while chunk := f.read(chunk_size):
|
||||
md5.update(chunk)
|
||||
return md5.hexdigest()[:8]
|
||||
except Exception as e:
|
||||
print(f"计算音频MD5失败:{e}")
|
||||
return f"err_{random.randint(10000000, 99999999)}"
|
||||
|
||||
def ensure_dir_exists(dir_path):
|
||||
"""确保目录存在,不存在则创建"""
|
||||
if dir_path and not dir_path.exists():
|
||||
dir_path.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
# ===================== 1. 模型配置持久化(last_selected_models.json) =====================
|
||||
def init_last_selected_models(gpt_default, sovits_default, current_version):
|
||||
"""初始化模型配置文件,写入默认模型路径"""
|
||||
ensure_dir_exists(LAST_SELECTED_MODELS_JSON.parent)
|
||||
init_data = {
|
||||
"gpt_model_path": gpt_default,
|
||||
"sovits_model_path": sovits_default,
|
||||
"version": current_version
|
||||
}
|
||||
with open(LAST_SELECTED_MODELS_JSON, "w", encoding="utf-8") as f:
|
||||
json.dump(init_data, f, ensure_ascii=False, indent=4)
|
||||
print(f"首次生成模型配置文件:{LAST_SELECTED_MODELS_JSON}")
|
||||
return init_data
|
||||
|
||||
def read_last_selected_models():
|
||||
"""读取模型配置文件中的路径"""
|
||||
if not LAST_SELECTED_MODELS_JSON.exists():
|
||||
return None
|
||||
try:
|
||||
with open(LAST_SELECTED_MODELS_JSON, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
# 校验必要字段
|
||||
required_fields = ["gpt_model_path", "sovits_model_path", "version"]
|
||||
for field in required_fields:
|
||||
if field not in data:
|
||||
return None
|
||||
return data
|
||||
except Exception as e:
|
||||
print(f"读取模型配置失败:{e}")
|
||||
return None
|
||||
|
||||
def write_last_selected_models(gpt_path_new, sovits_path_new, current_version):
|
||||
"""写入新的模型路径到配置文件"""
|
||||
ensure_dir_exists(LAST_SELECTED_MODELS_JSON.parent)
|
||||
try:
|
||||
data = read_last_selected_models() or {}
|
||||
data["gpt_model_path"] = gpt_path_new
|
||||
data["sovits_model_path"] = sovits_path_new
|
||||
data["version"] = current_version
|
||||
with open(LAST_SELECTED_MODELS_JSON, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
except Exception as e:
|
||||
print(f"写入模型配置失败:{e}")
|
||||
|
||||
# ===================== 2. 参考音频预设持久化(last_selected_preset.json + ref_audios_presets.yaml) =====================
|
||||
# 2.1 最后选中预设的读写清
|
||||
def read_last_selected_preset():
|
||||
"""读取最后一次选中的预设名称"""
|
||||
if not LAST_SELECTED_PRESET_JSON.exists():
|
||||
return None
|
||||
try:
|
||||
with open(LAST_SELECTED_PRESET_JSON, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
return data.get("last_selected_preset")
|
||||
except Exception as e:
|
||||
print(f"读取最后选中预设失败:{e}")
|
||||
return None
|
||||
|
||||
def write_last_selected_preset(preset_name):
|
||||
"""写入最后一次选中的预设名称"""
|
||||
ensure_dir_exists(LAST_SELECTED_PRESET_JSON.parent)
|
||||
try:
|
||||
data = {"last_selected_preset": preset_name.strip()}
|
||||
with open(LAST_SELECTED_PRESET_JSON, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
print(f"已记录最后选中的预设:{preset_name.strip()}")
|
||||
except Exception as e:
|
||||
print(f"写入最后选中预设失败:{e}")
|
||||
|
||||
def clear_last_selected_preset():
|
||||
"""清空最后选中的预设记录"""
|
||||
if not LAST_SELECTED_PRESET_JSON.exists():
|
||||
return
|
||||
try:
|
||||
with open(LAST_SELECTED_PRESET_JSON, "w", encoding="utf-8") as f:
|
||||
json.dump({"last_selected_preset": ""}, f, ensure_ascii=False, indent=4)
|
||||
except Exception as e:
|
||||
print(f"清空最后选中预设失败:{e}")
|
||||
|
||||
# 2.2 参考预设配置的加载/保存/删除
|
||||
def load_ref_presets():
|
||||
"""加载多组参考预设配置"""
|
||||
ensure_dir_exists(REF_PRESETS_YAML.parent)
|
||||
|
||||
# 新增:配置文件不存在时,自动创建空文件
|
||||
if not REF_PRESETS_YAML.exists():
|
||||
with open(REF_PRESETS_YAML, "w", encoding="utf-8") as f:
|
||||
yaml.dump([], f, indent=4, allow_unicode=True)
|
||||
print(f"暂未检测到参考预设配置文件,已自动创建空文件:{REF_PRESETS_YAML}")
|
||||
return []
|
||||
|
||||
try:
|
||||
with open(REF_PRESETS_YAML, "r", encoding="utf-8") as f:
|
||||
presets = yaml.load(f, Loader=yaml.FullLoader) or []
|
||||
|
||||
# 兼容旧格式转换
|
||||
if isinstance(presets, dict):
|
||||
presets = [{"name": "旧配置转换", "ref_audio_path": presets.get("ref_audio_path"),
|
||||
"prompt_text": presets.get("prompt_text", ""), "prompt_language": presets.get("prompt_language", "中文")}]
|
||||
|
||||
# 补充缺失字段 + 校验音频路径
|
||||
default_template = {"name": "", "ref_audio_path": None, "prompt_text": "", "prompt_language": "中文"}
|
||||
for preset in presets:
|
||||
for key, value in default_template.items():
|
||||
preset.setdefault(key, value)
|
||||
# 校验音频路径有效性
|
||||
audio_path = preset["ref_audio_path"]
|
||||
if audio_path and not os.path.exists(str(audio_path)):
|
||||
preset["ref_audio_path"] = None
|
||||
|
||||
# 清理冗余音频
|
||||
clean_unreferenced_audios(presets)
|
||||
print(f"参考预设加载成功,共 {len(presets)} 组")
|
||||
return presets
|
||||
except Exception as e:
|
||||
print(f"参考预设加载失败:{e}")
|
||||
return []
|
||||
|
||||
def get_preset_by_name(preset_name, presets=None):
|
||||
"""根据配置名称查询对应的配置详情"""
|
||||
# 核心修复:先判断 preset_name 是否为 None,避免 AttributeError
|
||||
if preset_name is None:
|
||||
return {"name": "", "ref_audio_path": None, "prompt_text": "", "prompt_language": "中文"}
|
||||
|
||||
if not presets:
|
||||
presets = load_ref_presets()
|
||||
|
||||
# 现在再调用 strip(),确保 preset_name 不是 None
|
||||
preset_name_str = preset_name.strip()
|
||||
for preset in presets:
|
||||
if preset["name"].strip() == preset_name_str:
|
||||
return preset
|
||||
|
||||
# 无匹配预设时,返回空的合法预设字典
|
||||
return {"name": "", "ref_audio_path": None, "prompt_text": "", "prompt_language": "中文"}
|
||||
|
||||
def save_ref_preset_core(preset_name, ref_audio_path, prompt_text, prompt_language, confirm_override=False):
|
||||
"""保存/覆盖参考预设核心逻辑(返回:提示信息、是否成功、预设列表)"""
|
||||
ensure_dir_exists(REF_AUDIO_DIR)
|
||||
presets = load_ref_presets()
|
||||
preset_name = preset_name.strip()
|
||||
|
||||
# 前置校验
|
||||
if not ref_audio_path or not os.path.exists(str(ref_audio_path)):
|
||||
return "保存失败!请先上传有效的主参考音频文件。", False, [p["name"] for p in presets]
|
||||
if not preset_name:
|
||||
return "保存失败!配置名称不能为空。", False, [p["name"] for p in presets]
|
||||
|
||||
# 音频持久化处理
|
||||
persistent_audio_path = get_persistent_audio_path(ref_audio_path, preset_name)
|
||||
if not persistent_audio_path:
|
||||
return "保存失败!音频文件持久化存储失败。", False, [p["name"] for p in presets]
|
||||
|
||||
# 同名检测
|
||||
preset_index = -1
|
||||
for idx, p in enumerate(presets):
|
||||
if p["name"].strip() == preset_name:
|
||||
preset_index = idx
|
||||
break
|
||||
|
||||
if preset_index >= 0 and not confirm_override:
|
||||
return f"配置「{preset_name}」已存在,如需替换请确认覆盖!", False, [p["name"] for p in presets]
|
||||
|
||||
# 构造新配置
|
||||
new_preset = {
|
||||
"name": preset_name,
|
||||
"ref_audio_path": persistent_audio_path,
|
||||
"prompt_text": prompt_text,
|
||||
"prompt_language": prompt_language
|
||||
}
|
||||
|
||||
# 更新配置列表
|
||||
is_new_preset = preset_index < 0
|
||||
if preset_index >= 0:
|
||||
presets[preset_index] = new_preset
|
||||
tip = "同名配置已覆盖!"
|
||||
else:
|
||||
presets.append(new_preset)
|
||||
tip = "新配置已新增!"
|
||||
|
||||
# 写入配置文件
|
||||
try:
|
||||
with open(REF_PRESETS_YAML, "w", encoding="utf-8") as f:
|
||||
yaml.dump(presets, f, indent=4, allow_unicode=True)
|
||||
|
||||
# 新增预设自动记录为最后选中
|
||||
if is_new_preset:
|
||||
write_last_selected_preset(preset_name)
|
||||
|
||||
preset_names = [p["name"] for p in presets]
|
||||
return f"配置保存成功!{tip}", True, preset_names
|
||||
except Exception as e:
|
||||
return f"保存失败:{str(e)}", False, [p["name"] for p in presets]
|
||||
|
||||
def delete_ref_preset_core(preset_name):
|
||||
"""删除参考预设核心逻辑(返回:提示信息、预设列表、默认选中预设)"""
|
||||
presets = load_ref_presets()
|
||||
preset_name = preset_name.strip()
|
||||
|
||||
if not presets:
|
||||
return "暂无配置可删除!", [], None
|
||||
|
||||
# 获取待删除音频路径
|
||||
target_audio_path = None
|
||||
for p in presets:
|
||||
if p["name"].strip() == preset_name:
|
||||
target_audio_path = p.get("ref_audio_path")
|
||||
break
|
||||
|
||||
# 过滤删除
|
||||
presets = [p for p in presets if p["name"].strip() != preset_name]
|
||||
|
||||
# 写入配置文件
|
||||
try:
|
||||
with open(REF_PRESETS_YAML, "w", encoding="utf-8") as f:
|
||||
yaml.dump(presets, f, indent=4, allow_unicode=True)
|
||||
|
||||
# 删除对应音频
|
||||
if target_audio_path and os.path.exists(target_audio_path):
|
||||
os.unlink(target_audio_path)
|
||||
print(f"同步删除配置对应音频:{target_audio_path}")
|
||||
|
||||
# 清空最后选中记录(若删除的是最后选中的预设)
|
||||
last_selected = read_last_selected_preset()
|
||||
if last_selected and last_selected == preset_name:
|
||||
clear_last_selected_preset()
|
||||
|
||||
preset_names = [p["name"] for p in presets]
|
||||
new_selected = preset_names[0] if preset_names else None
|
||||
tip = "配置删除成功!已同步清理对应音频文件" if preset_names else "配置删除成功!已同步清理对应音频文件,当前无剩余配置"
|
||||
return tip, preset_names, new_selected
|
||||
except Exception as e:
|
||||
return f"删除失败:{str(e)}", [p["name"] for p in presets], preset_name
|
||||
|
||||
# 2.3 参考音频文件管理
|
||||
def get_persistent_audio_path(src_audio_path, preset_name):
|
||||
"""获取音频持久化路径,清理同配置名旧音频"""
|
||||
if not src_audio_path or not os.path.exists(src_audio_path):
|
||||
return None
|
||||
|
||||
# 清理文件名
|
||||
safe_preset_name = sanitize_filename(preset_name)
|
||||
safe_preset_name = safe_preset_name[:MAX_FILENAME_LENGTH]
|
||||
|
||||
# 提取后缀
|
||||
src_suffix = Path(src_audio_path).suffix.lower()
|
||||
if not src_suffix or src_suffix not in [".wav", ".mp3", ".flac", ".ogg", ".m4a"]:
|
||||
src_suffix = ".wav"
|
||||
|
||||
# 计算MD5
|
||||
audio_md5 = get_audio_md5(src_audio_path)
|
||||
dst_filename = f"{safe_preset_name}_{audio_md5}{src_suffix}"
|
||||
dst_path = REF_AUDIO_DIR / dst_filename
|
||||
|
||||
# 清理同配置名旧音频
|
||||
for old_audio in REF_AUDIO_DIR.glob(f"{safe_preset_name}_*"):
|
||||
if old_audio.suffix.lower() in [".wav", ".mp3", ".flac", ".ogg", ".m4a"]:
|
||||
try:
|
||||
old_audio.unlink()
|
||||
except Exception as e:
|
||||
print(f"清理旧音频失败:{e}")
|
||||
|
||||
# 复制新音频
|
||||
try:
|
||||
shutil.copy2(src_audio_path, dst_path)
|
||||
return str(dst_path)
|
||||
except Exception as e:
|
||||
print(f"音频持久化复制失败:{e}")
|
||||
return None
|
||||
|
||||
def clean_unreferenced_audios(presets):
|
||||
"""清理未被任何预设引用的冗余音频"""
|
||||
if not REF_AUDIO_DIR.exists():
|
||||
return
|
||||
|
||||
# 收集已引用音频
|
||||
referenced = set()
|
||||
for preset in presets:
|
||||
audio_path = preset.get("ref_audio_path")
|
||||
if audio_path and os.path.exists(audio_path):
|
||||
referenced.add(Path(audio_path).absolute())
|
||||
|
||||
# 删除未引用音频
|
||||
deleted_count = 0
|
||||
for audio_file in REF_AUDIO_DIR.glob("*"):
|
||||
if audio_file.is_file() and audio_file.suffix.lower() in [".wav", ".mp3", ".flac", ".ogg", ".m4a"]:
|
||||
if audio_file.absolute() not in referenced:
|
||||
try:
|
||||
audio_file.unlink()
|
||||
deleted_count += 1
|
||||
except Exception as e:
|
||||
print(f"清理冗余音频失败:{e}")
|
||||
|
||||
if deleted_count > 0:
|
||||
print(f"清理冗余未引用音频 {deleted_count} 个")
|
||||
|
||||
# ===================== 3. 推理参数持久化(infer_settings.json) =====================
|
||||
def load_infer_settings():
|
||||
"""加载推理参数配置"""
|
||||
ensure_dir_exists(INFER_SETTINGS_JSON.parent)
|
||||
if not INFER_SETTINGS_JSON.exists():
|
||||
return DEFAULT_INFER_SETTINGS
|
||||
try:
|
||||
with open(INFER_SETTINGS_JSON, "r", encoding="utf-8") as f:
|
||||
saved = json.load(f)
|
||||
return {**DEFAULT_INFER_SETTINGS, **saved}
|
||||
except Exception as e:
|
||||
print(f"加载推理参数失败,使用默认值:{e}")
|
||||
return DEFAULT_INFER_SETTINGS
|
||||
|
||||
def save_infer_settings_core(settings):
|
||||
"""保存推理参数核心逻辑(返回:提示信息)"""
|
||||
ensure_dir_exists(INFER_SETTINGS_JSON.parent)
|
||||
try:
|
||||
with open(INFER_SETTINGS_JSON, "w", encoding="utf-8") as f:
|
||||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||||
|
||||
# 精简日志输出
|
||||
print(f"✅ 推理配置保存成功:{INFER_SETTINGS_JSON.absolute()}")
|
||||
return "推理设置保存成功!已覆盖原有配置文件。"
|
||||
except Exception as e:
|
||||
print(f"❌ 推理配置保存失败:{e}")
|
||||
return f"推理设置保存失败:{str(e)}"
|
||||
|
||||
def restore_default_infer_settings_core():
|
||||
"""恢复推理参数默认值核心逻辑(返回:默认参数列表)"""
|
||||
ensure_dir_exists(INFER_SETTINGS_JSON.parent)
|
||||
try:
|
||||
with open(INFER_SETTINGS_JSON, "w", encoding="utf-8") as f:
|
||||
json.dump(DEFAULT_INFER_SETTINGS, f, indent=4, ensure_ascii=False)
|
||||
print(f"✅ 推理配置已恢复默认值:{INFER_SETTINGS_JSON.absolute()}")
|
||||
except Exception as e:
|
||||
print(f"❌ 推理配置恢复默认失败:{e}")
|
||||
|
||||
# 返回默认参数(按顺序对应UI组件)
|
||||
return [
|
||||
DEFAULT_INFER_SETTINGS["batch_size"],
|
||||
DEFAULT_INFER_SETTINGS["sample_steps"],
|
||||
DEFAULT_INFER_SETTINGS["fragment_interval"],
|
||||
DEFAULT_INFER_SETTINGS["speed_factor"],
|
||||
DEFAULT_INFER_SETTINGS["top_k"],
|
||||
DEFAULT_INFER_SETTINGS["top_p"],
|
||||
DEFAULT_INFER_SETTINGS["temperature"],
|
||||
DEFAULT_INFER_SETTINGS["repetition_penalty"],
|
||||
DEFAULT_INFER_SETTINGS["how_to_cut"],
|
||||
DEFAULT_INFER_SETTINGS["super_sampling"],
|
||||
DEFAULT_INFER_SETTINGS["parallel_infer"],
|
||||
DEFAULT_INFER_SETTINGS["split_bucket"],
|
||||
DEFAULT_INFER_SETTINGS["seed"],
|
||||
DEFAULT_INFER_SETTINGS["keep_random"]
|
||||
]
|
||||
33
gowebui_batched_infer.bat
Normal file
33
gowebui_batched_infer.bat
Normal file
@ -0,0 +1,33 @@
|
||||
@echo off
|
||||
:: 1. 切换命令行编码为UTF-8,解决中文显示乱码(必须放在最前面)
|
||||
chcp 65001 > nul
|
||||
|
||||
:: 2. 获取当前bat文件所在目录并格式化
|
||||
set "SCRIPT_DIR=%~dp0"
|
||||
set "SCRIPT_DIR=%SCRIPT_DIR:~0,-1%"
|
||||
|
||||
:: 3. 切换到脚本根目录
|
||||
cd /d "%SCRIPT_DIR%"
|
||||
|
||||
:: 4. 创建专属TEMP目录(补充主页面的核心步骤)
|
||||
if not exist "TEMP" md "TEMP"
|
||||
set "TEMP=%SCRIPT_DIR%\TEMP"
|
||||
|
||||
:: 5. 设置核心环境变量(补充推理脚本依赖的配置)
|
||||
set "version=v2Pro"
|
||||
:: 语言配置
|
||||
set "language=zh_CN"
|
||||
:: 启用半精度推理(GPU用户推荐,CPU用户改为False)
|
||||
set "is_half=True"
|
||||
:: 指定GPU卡号(多卡可修改,无GPU则删除此行)
|
||||
set "_CUDA_VISIBLE_DEVICES=0"
|
||||
|
||||
:: 6. 将runtime目录加入环境变量,确保能调用内置python
|
||||
set "PATH=%SCRIPT_DIR%\runtime;%PATH%"
|
||||
|
||||
:: 7. 直接启动并行推理脚本,传入中文语言参数
|
||||
echo 正在启动GPT-SoVITS并行推理页面...
|
||||
runtime\python.exe -I GPT_SoVITS/inference_webui_fast.py zh_CN
|
||||
|
||||
:: 8. 执行完成后暂停,便于查看报错信息
|
||||
pause
|
||||
Loading…
x
Reference in New Issue
Block a user