Add module managing RVC extension to convert audio sent by ST using voice model

This commit is contained in:
Tony Ribeiro
2023-08-09 03:38:13 +02:00
parent 9a650c4cec
commit 0d00b5fdd0
13 changed files with 3778 additions and 0 deletions

View File

0
data/tmp/.placeholder Normal file
View File

View File

@@ -0,0 +1,186 @@
import gc
import os.path
import numpy as np
import parselmouth
import torch
import pyworld
import torchcrepe
from scipy import signal
from torch import Tensor
def get_f0_crepe_computation(
x,
f0_min,
f0_max,
p_len,
sr,
hop_length=128,
# 512 before. Hop length changes the speed that the voice jumps to a different dramatic pitch. Lower hop lengths means more pitch accuracy but longer inference time.
model="full", # Either use crepe-tiny "tiny" or crepe "full". Default is full
):
x = x.astype(np.float32) # fixes the F.conv2D exception. We needed to convert double to float.
x /= np.quantile(np.abs(x), 0.999)
torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
audio = torch.from_numpy(x).to(torch_device, copy=True)
audio = torch.unsqueeze(audio, dim=0)
if audio.ndim == 2 and audio.shape[0] > 1:
audio = torch.mean(audio, dim=0, keepdim=True).detach()
audio = audio.detach()
# print("Initiating prediction with a crepe_hop_length of: " + str(hop_length))
pitch: torch.Tensor = torchcrepe.predict(
audio,
sr,
hop_length,
f0_min,
f0_max,
model,
batch_size=hop_length * 2,
device=torch_device,
pad=True
)
p_len = p_len or x.shape[0] // hop_length
# Resize the pitch for final f0
source = np.array(pitch.squeeze(0).cpu().float().numpy())
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * p_len, len(source)) / p_len,
np.arange(0, len(source)),
source
)
f0 = np.nan_to_num(target)
return f0 # Resized f0
def get_mangio_crepe_f0(x, f0_min, f0_max, p_len, sr, crepe_hop_length, model='full'):
# print("Performing crepe pitch extraction. (EXPERIMENTAL)")
# print("CREPE PITCH EXTRACTION HOP LENGTH: " + str(crepe_hop_length))
x = x.astype(np.float32)
x /= np.quantile(np.abs(x), 0.999)
torch_device_index = 0
torch_device = None
if torch.cuda.is_available():
torch_device = torch.device(f"cuda:{torch_device_index % torch.cuda.device_count()}")
elif torch.backends.mps.is_available():
torch_device = torch.device("mps")
else:
torch_device = torch.device("cpu")
audio = torch.from_numpy(x).to(torch_device, copy=True)
audio = torch.unsqueeze(audio, dim=0)
if audio.ndim == 2 and audio.shape[0] > 1:
audio = torch.mean(audio, dim=0, keepdim=True).detach()
audio = audio.detach()
# print(
# "Initiating f0 Crepe Feature Extraction with an extraction_crepe_hop_length of: " +
# str(crepe_hop_length)
# )
# Pitch prediction for pitch extraction
pitch: Tensor = torchcrepe.predict(
audio,
sr,
crepe_hop_length,
f0_min,
f0_max,
model,
batch_size=crepe_hop_length * 2,
device=torch_device,
pad=True
)
p_len = p_len or x.shape[0] // crepe_hop_length
# Resize the pitch
source = np.array(pitch.squeeze(0).cpu().float().numpy())
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * p_len, len(source)) / p_len,
np.arange(0, len(source)),
source
)
return np.nan_to_num(target)
def pitch_extract(f0_method, x, f0_min, f0_max, p_len, time_step, sr, window, crepe_hop_length, filter_radius=3):
f0s = []
f0 = np.zeros(p_len)
for method in f0_method if isinstance(f0_method, list) else [f0_method]:
if method == "pm":
f0 = (
parselmouth.Sound(x, sr)
.to_pitch_ac(
time_step=time_step / 1000,
voicing_threshold=0.6,
pitch_floor=f0_min,
pitch_ceiling=f0_max,
)
.selected_array["frequency"]
)
pad_size = (p_len - len(f0) + 1) // 2
if pad_size > 0 or p_len - len(f0) - pad_size > 0:
f0 = np.pad(
f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant"
)
elif method in ['harvest', 'dio']:
if method == 'harvest':
f0, t = pyworld.harvest(
x.astype(np.double),
fs=sr,
f0_ceil=f0_max,
f0_floor=f0_min,
frame_period=10,
)
elif method == "dio":
f0, t = pyworld.dio(
x.astype(np.double),
fs=sr,
f0_ceil=f0_max,
f0_floor=f0_min,
frame_period=10,
)
f0 = pyworld.stonemask(x.astype(np.double), f0, t, sr)
elif method == "torchcrepe":
f0 = get_f0_crepe_computation(x, f0_min, f0_max, p_len, sr, crepe_hop_length)
elif method == "torchcrepe tiny":
f0 = get_f0_crepe_computation(x, f0_min, f0_max, p_len, sr, crepe_hop_length, "tiny")
elif method == "mangio-crepe":
f0 = get_mangio_crepe_f0(x, f0_min, f0_max, p_len, sr, crepe_hop_length)
elif method == "mangio-crepe tiny":
f0 = get_mangio_crepe_f0(x, f0_min, f0_max, p_len, sr, crepe_hop_length, 'tiny')
elif method == "rmvpe":
rmvpe_model_path = os.path.join('data', 'models', 'rmvpe')
rmvpe_model_file = os.path.join(rmvpe_model_path, 'rmvpe.pt')
if not os.path.isfile(rmvpe_model_file):
import huggingface_hub
rmvpe_model_file = huggingface_hub.hf_hub_download('lj1995/VoiceConversionWebUI', 'rmvpe.pt', local_dir=rmvpe_model_path, local_dir_use_symlinks=False)
from modules.voice_conversion.rvc.rmvpe import RMVPE
print("loading rmvpe model")
model_rmvpe = RMVPE(rmvpe_model_file, is_half=True, device=None)
f0 = model_rmvpe.infer_from_audio(x, thred=0.03)
del model_rmvpe
torch.cuda.empty_cache()
gc.collect()
f0s.append(f0)
if not f0s:
f0s = [f0]
f0s_new = []
for f0_val in f0s:
_len = f0_val.shape[0]
if _len == p_len:
f0s_new.append(f0)
continue
if _len > p_len:
f0 = f0[:p_len]
f0s_new.append(f0)
continue
if _len < p_len:
print('WARNING: len < p_len, skipping this f0')
f0 = np.nanmedian(np.stack(f0s_new, axis=0), axis=0)
if filter_radius >= 2:
f0 = signal.medfilt(f0, filter_radius)
return f0

View File

@@ -0,0 +1,46 @@
import os.path
import shutil
import urllib.request
import huggingface_hub
class HuBERTManager:
@staticmethod
def make_sure_hubert_installed(download_url: str = 'https://dl.fbaipublicfiles.com/hubert/hubert_base_ls960.pt', file_name: str = 'hubert.pt'):
install_dir = os.path.join('data', 'models', 'hubert')
if not os.path.isdir(install_dir):
os.makedirs(install_dir, exist_ok=True)
install_file = os.path.join(install_dir, file_name)
if not os.path.isfile(install_file):
print('Downloading HuBERT base model')
urllib.request.urlretrieve(download_url, install_file)
print('Downloaded HuBERT')
return install_file
@staticmethod
def make_sure_tokenizer_installed(model: str = 'quantifier_hubert_base_ls960_14.pth', repo: str = 'GitMylo/bark-voice-cloning', local_file: str = 'tokenizer.pth'):
install_dir = os.path.join('data', 'models', 'hubert')
if not os.path.isdir(install_dir):
os.makedirs(install_dir, exist_ok=True)
install_file = os.path.join(install_dir, local_file)
if not os.path.isfile(install_file):
print('Downloading HuBERT custom tokenizer')
huggingface_hub.hf_hub_download(repo, model, local_dir=install_dir, local_dir_use_symlinks=False)
shutil.move(os.path.join(install_dir, model), install_file)
print('Downloaded tokenizer')
return install_file
@staticmethod
def make_sure_hubert_rvc_installed(model: str = 'hubert_base.pt', repo: str = 'lj1995/VoiceConversionWebUI', local_file: str = 'hubert_rvc.pt'):
install_dir = os.path.join('data', 'models', 'hubert')
if not os.path.isdir(install_dir):
os.makedirs(install_dir, exist_ok=True)
install_file = os.path.join(install_dir, local_file)
if not os.path.isfile(install_file):
print('Downloading HuBERT for RVC')
huggingface_hub.hf_hub_download(repo, model, local_dir=install_dir, local_dir_use_symlinks=False)
shutil.move(os.path.join(install_dir, model), install_file)
print('Downloaded HuBERT for RVC')
return install_file

View File

@@ -0,0 +1,417 @@
import copy
import math
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from modules.voice_conversion.rvc.infer_pack import commons
from modules.voice_conversion.rvc.infer_pack import modules
from modules.voice_conversion.rvc.infer_pack.modules import LayerNorm
class Encoder(nn.Module):
def __init__(
self,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size=1,
p_dropout=0.0,
window_size=10,
**kwargs
):
super().__init__()
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.window_size = window_size
self.drop = nn.Dropout(p_dropout)
self.attn_layers = nn.ModuleList()
self.norm_layers_1 = nn.ModuleList()
self.ffn_layers = nn.ModuleList()
self.norm_layers_2 = nn.ModuleList()
for i in range(self.n_layers):
self.attn_layers.append(
MultiHeadAttention(
hidden_channels,
hidden_channels,
n_heads,
p_dropout=p_dropout,
window_size=window_size,
)
)
self.norm_layers_1.append(LayerNorm(hidden_channels))
self.ffn_layers.append(
FFN(
hidden_channels,
hidden_channels,
filter_channels,
kernel_size,
p_dropout=p_dropout,
)
)
self.norm_layers_2.append(LayerNorm(hidden_channels))
def forward(self, x, x_mask):
attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
x = x * x_mask
for i in range(self.n_layers):
y = self.attn_layers[i](x, x, attn_mask)
y = self.drop(y)
x = self.norm_layers_1[i](x + y)
y = self.ffn_layers[i](x, x_mask)
y = self.drop(y)
x = self.norm_layers_2[i](x + y)
x = x * x_mask
return x
class Decoder(nn.Module):
def __init__(
self,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size=1,
p_dropout=0.0,
proximal_bias=False,
proximal_init=True,
**kwargs
):
super().__init__()
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.proximal_bias = proximal_bias
self.proximal_init = proximal_init
self.drop = nn.Dropout(p_dropout)
self.self_attn_layers = nn.ModuleList()
self.norm_layers_0 = nn.ModuleList()
self.encdec_attn_layers = nn.ModuleList()
self.norm_layers_1 = nn.ModuleList()
self.ffn_layers = nn.ModuleList()
self.norm_layers_2 = nn.ModuleList()
for i in range(self.n_layers):
self.self_attn_layers.append(
MultiHeadAttention(
hidden_channels,
hidden_channels,
n_heads,
p_dropout=p_dropout,
proximal_bias=proximal_bias,
proximal_init=proximal_init,
)
)
self.norm_layers_0.append(LayerNorm(hidden_channels))
self.encdec_attn_layers.append(
MultiHeadAttention(
hidden_channels, hidden_channels, n_heads, p_dropout=p_dropout
)
)
self.norm_layers_1.append(LayerNorm(hidden_channels))
self.ffn_layers.append(
FFN(
hidden_channels,
hidden_channels,
filter_channels,
kernel_size,
p_dropout=p_dropout,
causal=True,
)
)
self.norm_layers_2.append(LayerNorm(hidden_channels))
def forward(self, x, x_mask, h, h_mask):
"""
x: decoder input
h: encoder output
"""
self_attn_mask = commons.subsequent_mask(x_mask.size(2)).to(
device=x.device, dtype=x.dtype
)
encdec_attn_mask = h_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
x = x * x_mask
for i in range(self.n_layers):
y = self.self_attn_layers[i](x, x, self_attn_mask)
y = self.drop(y)
x = self.norm_layers_0[i](x + y)
y = self.encdec_attn_layers[i](x, h, encdec_attn_mask)
y = self.drop(y)
x = self.norm_layers_1[i](x + y)
y = self.ffn_layers[i](x, x_mask)
y = self.drop(y)
x = self.norm_layers_2[i](x + y)
x = x * x_mask
return x
class MultiHeadAttention(nn.Module):
def __init__(
self,
channels,
out_channels,
n_heads,
p_dropout=0.0,
window_size=None,
heads_share=True,
block_length=None,
proximal_bias=False,
proximal_init=False,
):
super().__init__()
assert channels % n_heads == 0
self.channels = channels
self.out_channels = out_channels
self.n_heads = n_heads
self.p_dropout = p_dropout
self.window_size = window_size
self.heads_share = heads_share
self.block_length = block_length
self.proximal_bias = proximal_bias
self.proximal_init = proximal_init
self.attn = None
self.k_channels = channels // n_heads
self.conv_q = nn.Conv1d(channels, channels, 1)
self.conv_k = nn.Conv1d(channels, channels, 1)
self.conv_v = nn.Conv1d(channels, channels, 1)
self.conv_o = nn.Conv1d(channels, out_channels, 1)
self.drop = nn.Dropout(p_dropout)
if window_size is not None:
n_heads_rel = 1 if heads_share else n_heads
rel_stddev = self.k_channels**-0.5
self.emb_rel_k = nn.Parameter(
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
* rel_stddev
)
self.emb_rel_v = nn.Parameter(
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
* rel_stddev
)
nn.init.xavier_uniform_(self.conv_q.weight)
nn.init.xavier_uniform_(self.conv_k.weight)
nn.init.xavier_uniform_(self.conv_v.weight)
if proximal_init:
with torch.no_grad():
self.conv_k.weight.copy_(self.conv_q.weight)
self.conv_k.bias.copy_(self.conv_q.bias)
def forward(self, x, c, attn_mask=None):
q = self.conv_q(x)
k = self.conv_k(c)
v = self.conv_v(c)
x, self.attn = self.attention(q, k, v, mask=attn_mask)
x = self.conv_o(x)
return x
def attention(self, query, key, value, mask=None):
# reshape [b, d, t] -> [b, n_h, t, d_k]
b, d, t_s, t_t = (*key.size(), query.size(2))
query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)
key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1))
if self.window_size is not None:
assert (
t_s == t_t
), "Relative attention is only available for self-attention."
key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s)
rel_logits = self._matmul_with_relative_keys(
query / math.sqrt(self.k_channels), key_relative_embeddings
)
scores_local = self._relative_position_to_absolute_position(rel_logits)
scores = scores + scores_local
if self.proximal_bias:
assert t_s == t_t, "Proximal bias is only available for self-attention."
scores = scores + self._attention_bias_proximal(t_s).to(
device=scores.device, dtype=scores.dtype
)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e4)
if self.block_length is not None:
assert (
t_s == t_t
), "Local attention is only available for self-attention."
block_mask = (
torch.ones_like(scores)
.triu(-self.block_length)
.tril(self.block_length)
)
scores = scores.masked_fill(block_mask == 0, -1e4)
p_attn = F.softmax(scores, dim=-1) # [b, n_h, t_t, t_s]
p_attn = self.drop(p_attn)
output = torch.matmul(p_attn, value)
if self.window_size is not None:
relative_weights = self._absolute_position_to_relative_position(p_attn)
value_relative_embeddings = self._get_relative_embeddings(
self.emb_rel_v, t_s
)
output = output + self._matmul_with_relative_values(
relative_weights, value_relative_embeddings
)
output = (
output.transpose(2, 3).contiguous().view(b, d, t_t)
) # [b, n_h, t_t, d_k] -> [b, d, t_t]
return output, p_attn
def _matmul_with_relative_values(self, x, y):
"""
x: [b, h, l, m]
y: [h or 1, m, d]
ret: [b, h, l, d]
"""
ret = torch.matmul(x, y.unsqueeze(0))
return ret
def _matmul_with_relative_keys(self, x, y):
"""
x: [b, h, l, d]
y: [h or 1, m, d]
ret: [b, h, l, m]
"""
ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))
return ret
def _get_relative_embeddings(self, relative_embeddings, length):
max_relative_position = 2 * self.window_size + 1
# Pad first before slice to avoid using cond ops.
pad_length = max(length - (self.window_size + 1), 0)
slice_start_position = max((self.window_size + 1) - length, 0)
slice_end_position = slice_start_position + 2 * length - 1
if pad_length > 0:
padded_relative_embeddings = F.pad(
relative_embeddings,
commons.convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),
)
else:
padded_relative_embeddings = relative_embeddings
used_relative_embeddings = padded_relative_embeddings[
:, slice_start_position:slice_end_position
]
return used_relative_embeddings
def _relative_position_to_absolute_position(self, x):
"""
x: [b, h, l, 2*l-1]
ret: [b, h, l, l]
"""
batch, heads, length, _ = x.size()
# Concat columns of pad to shift from relative to absolute indexing.
x = F.pad(x, commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]))
# Concat extra elements so to add up to shape (len+1, 2*len-1).
x_flat = x.view([batch, heads, length * 2 * length])
x_flat = F.pad(
x_flat, commons.convert_pad_shape([[0, 0], [0, 0], [0, length - 1]])
)
# Reshape and slice out the padded elements.
x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[
:, :, :length, length - 1 :
]
return x_final
def _absolute_position_to_relative_position(self, x):
"""
x: [b, h, l, l]
ret: [b, h, l, 2*l-1]
"""
batch, heads, length, _ = x.size()
# padd along column
x = F.pad(
x, commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]])
)
x_flat = x.view([batch, heads, length**2 + length * (length - 1)])
# add 0's in the beginning that will skew the elements after reshape
x_flat = F.pad(x_flat, commons.convert_pad_shape([[0, 0], [0, 0], [length, 0]]))
x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:]
return x_final
def _attention_bias_proximal(self, length):
"""Bias for self-attention to encourage attention to close positions.
Args:
length: an integer scalar.
Returns:
a Tensor with shape [1, 1, length, length]
"""
r = torch.arange(length, dtype=torch.float32)
diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1)
return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0)
class FFN(nn.Module):
def __init__(
self,
in_channels,
out_channels,
filter_channels,
kernel_size,
p_dropout=0.0,
activation=None,
causal=False,
):
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.filter_channels = filter_channels
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.activation = activation
self.causal = causal
if causal:
self.padding = self._causal_padding
else:
self.padding = self._same_padding
self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)
self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)
self.drop = nn.Dropout(p_dropout)
def forward(self, x, x_mask):
x = self.conv_1(self.padding(x * x_mask))
if self.activation == "gelu":
x = x * torch.sigmoid(1.702 * x)
else:
x = torch.relu(x)
x = self.drop(x)
x = self.conv_2(self.padding(x * x_mask))
return x * x_mask
def _causal_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = self.kernel_size - 1
pad_r = 0
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, commons.convert_pad_shape(padding))
return x
def _same_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = (self.kernel_size - 1) // 2
pad_r = self.kernel_size // 2
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, commons.convert_pad_shape(padding))
return x

View File

@@ -0,0 +1,166 @@
import math
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
def init_weights(m, mean=0.0, std=0.01):
classname = m.__class__.__name__
if classname.find("Conv") != -1:
m.weight.data.normal_(mean, std)
def get_padding(kernel_size, dilation=1):
return int((kernel_size * dilation - dilation) / 2)
def convert_pad_shape(pad_shape):
l = pad_shape[::-1]
pad_shape = [item for sublist in l for item in sublist]
return pad_shape
def kl_divergence(m_p, logs_p, m_q, logs_q):
"""KL(P||Q)"""
kl = (logs_q - logs_p) - 0.5
kl += (
0.5 * (torch.exp(2.0 * logs_p) + ((m_p - m_q) ** 2)) * torch.exp(-2.0 * logs_q)
)
return kl
def rand_gumbel(shape):
"""Sample from the Gumbel distribution, protect from overflows."""
uniform_samples = torch.rand(shape) * 0.99998 + 0.00001
return -torch.log(-torch.log(uniform_samples))
def rand_gumbel_like(x):
g = rand_gumbel(x.size()).to(dtype=x.dtype, device=x.device)
return g
def slice_segments(x, ids_str, segment_size=4):
ret = torch.zeros_like(x[:, :, :segment_size])
for i in range(x.size(0)):
idx_str = ids_str[i]
idx_end = idx_str + segment_size
ret[i] = x[i, :, idx_str:idx_end]
return ret
def slice_segments2(x, ids_str, segment_size=4):
ret = torch.zeros_like(x[:, :segment_size])
for i in range(x.size(0)):
idx_str = ids_str[i]
idx_end = idx_str + segment_size
ret[i] = x[i, idx_str:idx_end]
return ret
def rand_slice_segments(x, x_lengths=None, segment_size=4):
b, d, t = x.size()
if x_lengths is None:
x_lengths = t
ids_str_max = x_lengths - segment_size + 1
ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long)
ret = slice_segments(x, ids_str, segment_size)
return ret, ids_str
def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4):
position = torch.arange(length, dtype=torch.float)
num_timescales = channels // 2
log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / (
num_timescales - 1
)
inv_timescales = min_timescale * torch.exp(
torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment
)
scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1)
signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0)
signal = F.pad(signal, [0, 0, 0, channels % 2])
signal = signal.view(1, channels, length)
return signal
def add_timing_signal_1d(x, min_timescale=1.0, max_timescale=1.0e4):
b, channels, length = x.size()
signal = get_timing_signal_1d(length, channels, min_timescale, max_timescale)
return x + signal.to(dtype=x.dtype, device=x.device)
def cat_timing_signal_1d(x, min_timescale=1.0, max_timescale=1.0e4, axis=1):
b, channels, length = x.size()
signal = get_timing_signal_1d(length, channels, min_timescale, max_timescale)
return torch.cat([x, signal.to(dtype=x.dtype, device=x.device)], axis)
def subsequent_mask(length):
mask = torch.tril(torch.ones(length, length)).unsqueeze(0).unsqueeze(0)
return mask
@torch.jit.script
def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
n_channels_int = n_channels[0]
in_act = input_a + input_b
t_act = torch.tanh(in_act[:, :n_channels_int, :])
s_act = torch.sigmoid(in_act[:, n_channels_int:, :])
acts = t_act * s_act
return acts
def convert_pad_shape(pad_shape):
l = pad_shape[::-1]
pad_shape = [item for sublist in l for item in sublist]
return pad_shape
def shift_1d(x):
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [1, 0]]))[:, :, :-1]
return x
def sequence_mask(length, max_length=None):
if max_length is None:
max_length = length.max()
x = torch.arange(max_length, dtype=length.dtype, device=length.device)
return x.unsqueeze(0) < length.unsqueeze(1)
def generate_path(duration, mask):
"""
duration: [b, 1, t_x]
mask: [b, 1, t_y, t_x]
"""
device = duration.device
b, _, t_y, t_x = mask.shape
cum_duration = torch.cumsum(duration, -1)
cum_duration_flat = cum_duration.view(b * t_x)
path = sequence_mask(cum_duration_flat, t_y).to(mask.dtype)
path = path.view(b, t_x, t_y)
path = path - F.pad(path, convert_pad_shape([[0, 0], [1, 0], [0, 0]]))[:, :-1]
path = path.unsqueeze(1).transpose(2, 3) * mask
return path
def clip_grad_value_(parameters, clip_value, norm_type=2):
if isinstance(parameters, torch.Tensor):
parameters = [parameters]
parameters = list(filter(lambda p: p.grad is not None, parameters))
norm_type = float(norm_type)
if clip_value is not None:
clip_value = float(clip_value)
total_norm = 0
for p in parameters:
param_norm = p.grad.data.norm(norm_type)
total_norm += param_norm.item() ** norm_type
if clip_value is not None:
p.grad.data.clamp_(min=-clip_value, max=clip_value)
total_norm = total_norm ** (1.0 / norm_type)
return total_norm

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,522 @@
import copy
import math
import numpy as np
import scipy
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn import Conv1d, ConvTranspose1d, AvgPool1d, Conv2d
from torch.nn.utils import weight_norm, remove_weight_norm
from modules.voice_conversion.rvc.infer_pack import commons
from modules.voice_conversion.rvc.infer_pack.commons import init_weights, get_padding
from modules.voice_conversion.rvc.infer_pack.transforms import piecewise_rational_quadratic_transform
LRELU_SLOPE = 0.1
class LayerNorm(nn.Module):
def __init__(self, channels, eps=1e-5):
super().__init__()
self.channels = channels
self.eps = eps
self.gamma = nn.Parameter(torch.ones(channels))
self.beta = nn.Parameter(torch.zeros(channels))
def forward(self, x):
x = x.transpose(1, -1)
x = F.layer_norm(x, (self.channels,), self.gamma, self.beta, self.eps)
return x.transpose(1, -1)
class ConvReluNorm(nn.Module):
def __init__(
self,
in_channels,
hidden_channels,
out_channels,
kernel_size,
n_layers,
p_dropout,
):
super().__init__()
self.in_channels = in_channels
self.hidden_channels = hidden_channels
self.out_channels = out_channels
self.kernel_size = kernel_size
self.n_layers = n_layers
self.p_dropout = p_dropout
assert n_layers > 1, "Number of layers should be larger than 0."
self.conv_layers = nn.ModuleList()
self.norm_layers = nn.ModuleList()
self.conv_layers.append(
nn.Conv1d(
in_channels, hidden_channels, kernel_size, padding=kernel_size // 2
)
)
self.norm_layers.append(LayerNorm(hidden_channels))
self.relu_drop = nn.Sequential(nn.ReLU(), nn.Dropout(p_dropout))
for _ in range(n_layers - 1):
self.conv_layers.append(
nn.Conv1d(
hidden_channels,
hidden_channels,
kernel_size,
padding=kernel_size // 2,
)
)
self.norm_layers.append(LayerNorm(hidden_channels))
self.proj = nn.Conv1d(hidden_channels, out_channels, 1)
self.proj.weight.data.zero_()
self.proj.bias.data.zero_()
def forward(self, x, x_mask):
x_org = x
for i in range(self.n_layers):
x = self.conv_layers[i](x * x_mask)
x = self.norm_layers[i](x)
x = self.relu_drop(x)
x = x_org + self.proj(x)
return x * x_mask
class DDSConv(nn.Module):
"""
Dialted and Depth-Separable Convolution
"""
def __init__(self, channels, kernel_size, n_layers, p_dropout=0.0):
super().__init__()
self.channels = channels
self.kernel_size = kernel_size
self.n_layers = n_layers
self.p_dropout = p_dropout
self.drop = nn.Dropout(p_dropout)
self.convs_sep = nn.ModuleList()
self.convs_1x1 = nn.ModuleList()
self.norms_1 = nn.ModuleList()
self.norms_2 = nn.ModuleList()
for i in range(n_layers):
dilation = kernel_size**i
padding = (kernel_size * dilation - dilation) // 2
self.convs_sep.append(
nn.Conv1d(
channels,
channels,
kernel_size,
groups=channels,
dilation=dilation,
padding=padding,
)
)
self.convs_1x1.append(nn.Conv1d(channels, channels, 1))
self.norms_1.append(LayerNorm(channels))
self.norms_2.append(LayerNorm(channels))
def forward(self, x, x_mask, g=None):
if g is not None:
x = x + g
for i in range(self.n_layers):
y = self.convs_sep[i](x * x_mask)
y = self.norms_1[i](y)
y = F.gelu(y)
y = self.convs_1x1[i](y)
y = self.norms_2[i](y)
y = F.gelu(y)
y = self.drop(y)
x = x + y
return x * x_mask
class WN(torch.nn.Module):
def __init__(
self,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=0,
p_dropout=0,
):
super(WN, self).__init__()
assert kernel_size % 2 == 1
self.hidden_channels = hidden_channels
self.kernel_size = (kernel_size,)
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.gin_channels = gin_channels
self.p_dropout = p_dropout
self.in_layers = torch.nn.ModuleList()
self.res_skip_layers = torch.nn.ModuleList()
self.drop = nn.Dropout(p_dropout)
if gin_channels != 0:
cond_layer = torch.nn.Conv1d(
gin_channels, 2 * hidden_channels * n_layers, 1
)
self.cond_layer = torch.nn.utils.weight_norm(cond_layer, name="weight")
for i in range(n_layers):
dilation = dilation_rate**i
padding = int((kernel_size * dilation - dilation) / 2)
in_layer = torch.nn.Conv1d(
hidden_channels,
2 * hidden_channels,
kernel_size,
dilation=dilation,
padding=padding,
)
in_layer = torch.nn.utils.weight_norm(in_layer, name="weight")
self.in_layers.append(in_layer)
# last one is not necessary
if i < n_layers - 1:
res_skip_channels = 2 * hidden_channels
else:
res_skip_channels = hidden_channels
res_skip_layer = torch.nn.Conv1d(hidden_channels, res_skip_channels, 1)
res_skip_layer = torch.nn.utils.weight_norm(res_skip_layer, name="weight")
self.res_skip_layers.append(res_skip_layer)
def forward(self, x, x_mask, g=None, **kwargs):
output = torch.zeros_like(x)
n_channels_tensor = torch.IntTensor([self.hidden_channels])
if g is not None:
g = self.cond_layer(g)
for i in range(self.n_layers):
x_in = self.in_layers[i](x)
if g is not None:
cond_offset = i * 2 * self.hidden_channels
g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :]
else:
g_l = torch.zeros_like(x_in)
acts = commons.fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)
acts = self.drop(acts)
res_skip_acts = self.res_skip_layers[i](acts)
if i < self.n_layers - 1:
res_acts = res_skip_acts[:, : self.hidden_channels, :]
x = (x + res_acts) * x_mask
output = output + res_skip_acts[:, self.hidden_channels :, :]
else:
output = output + res_skip_acts
return output * x_mask
def remove_weight_norm(self):
if self.gin_channels != 0:
torch.nn.utils.remove_weight_norm(self.cond_layer)
for l in self.in_layers:
torch.nn.utils.remove_weight_norm(l)
for l in self.res_skip_layers:
torch.nn.utils.remove_weight_norm(l)
class ResBlock1(torch.nn.Module):
def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
super(ResBlock1, self).__init__()
self.convs1 = nn.ModuleList(
[
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation[0],
padding=get_padding(kernel_size, dilation[0]),
)
),
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation[1],
padding=get_padding(kernel_size, dilation[1]),
)
),
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation[2],
padding=get_padding(kernel_size, dilation[2]),
)
),
]
)
self.convs1.apply(init_weights)
self.convs2 = nn.ModuleList(
[
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=1,
padding=get_padding(kernel_size, 1),
)
),
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=1,
padding=get_padding(kernel_size, 1),
)
),
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=1,
padding=get_padding(kernel_size, 1),
)
),
]
)
self.convs2.apply(init_weights)
def forward(self, x, x_mask=None):
for c1, c2 in zip(self.convs1, self.convs2):
xt = F.leaky_relu(x, LRELU_SLOPE)
if x_mask is not None:
xt = xt * x_mask
xt = c1(xt)
xt = F.leaky_relu(xt, LRELU_SLOPE)
if x_mask is not None:
xt = xt * x_mask
xt = c2(xt)
x = xt + x
if x_mask is not None:
x = x * x_mask
return x
def remove_weight_norm(self):
for l in self.convs1:
remove_weight_norm(l)
for l in self.convs2:
remove_weight_norm(l)
class ResBlock2(torch.nn.Module):
def __init__(self, channels, kernel_size=3, dilation=(1, 3)):
super(ResBlock2, self).__init__()
self.convs = nn.ModuleList(
[
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation[0],
padding=get_padding(kernel_size, dilation[0]),
)
),
weight_norm(
Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation[1],
padding=get_padding(kernel_size, dilation[1]),
)
),
]
)
self.convs.apply(init_weights)
def forward(self, x, x_mask=None):
for c in self.convs:
xt = F.leaky_relu(x, LRELU_SLOPE)
if x_mask is not None:
xt = xt * x_mask
xt = c(xt)
x = xt + x
if x_mask is not None:
x = x * x_mask
return x
def remove_weight_norm(self):
for l in self.convs:
remove_weight_norm(l)
class Log(nn.Module):
def forward(self, x, x_mask, reverse=False, **kwargs):
if not reverse:
y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask
logdet = torch.sum(-y, [1, 2])
return y, logdet
else:
x = torch.exp(x) * x_mask
return x
class Flip(nn.Module):
def forward(self, x, *args, reverse=False, **kwargs):
x = torch.flip(x, [1])
if not reverse:
logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device)
return x, logdet
else:
return x
class ElementwiseAffine(nn.Module):
def __init__(self, channels):
super().__init__()
self.channels = channels
self.m = nn.Parameter(torch.zeros(channels, 1))
self.logs = nn.Parameter(torch.zeros(channels, 1))
def forward(self, x, x_mask, reverse=False, **kwargs):
if not reverse:
y = self.m + torch.exp(self.logs) * x
y = y * x_mask
logdet = torch.sum(self.logs * x_mask, [1, 2])
return y, logdet
else:
x = (x - self.m) * torch.exp(-self.logs) * x_mask
return x
class ResidualCouplingLayer(nn.Module):
def __init__(
self,
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
p_dropout=0,
gin_channels=0,
mean_only=False,
):
assert channels % 2 == 0, "channels should be divisible by 2"
super().__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.half_channels = channels // 2
self.mean_only = mean_only
self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
self.enc = WN(
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
p_dropout=p_dropout,
gin_channels=gin_channels,
)
self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)
self.post.weight.data.zero_()
self.post.bias.data.zero_()
def forward(self, x, x_mask, g=None, reverse=False):
x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
h = self.pre(x0) * x_mask
h = self.enc(h, x_mask, g=g)
stats = self.post(h) * x_mask
if not self.mean_only:
m, logs = torch.split(stats, [self.half_channels] * 2, 1)
else:
m = stats
logs = torch.zeros_like(m)
if not reverse:
x1 = m + x1 * torch.exp(logs) * x_mask
x = torch.cat([x0, x1], 1)
logdet = torch.sum(logs, [1, 2])
return x, logdet
else:
x1 = (x1 - m) * torch.exp(-logs) * x_mask
x = torch.cat([x0, x1], 1)
return x
def remove_weight_norm(self):
self.enc.remove_weight_norm()
class ConvFlow(nn.Module):
def __init__(
self,
in_channels,
filter_channels,
kernel_size,
n_layers,
num_bins=10,
tail_bound=5.0,
):
super().__init__()
self.in_channels = in_channels
self.filter_channels = filter_channels
self.kernel_size = kernel_size
self.n_layers = n_layers
self.num_bins = num_bins
self.tail_bound = tail_bound
self.half_channels = in_channels // 2
self.pre = nn.Conv1d(self.half_channels, filter_channels, 1)
self.convs = DDSConv(filter_channels, kernel_size, n_layers, p_dropout=0.0)
self.proj = nn.Conv1d(
filter_channels, self.half_channels * (num_bins * 3 - 1), 1
)
self.proj.weight.data.zero_()
self.proj.bias.data.zero_()
def forward(self, x, x_mask, g=None, reverse=False):
x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
h = self.pre(x0)
h = self.convs(h, x_mask, g=g)
h = self.proj(h) * x_mask
b, c, t = x0.shape
h = h.reshape(b, c, -1, t).permute(0, 1, 3, 2) # [b, cx?, t] -> [b, c, t, ?]
unnormalized_widths = h[..., : self.num_bins] / math.sqrt(self.filter_channels)
unnormalized_heights = h[..., self.num_bins : 2 * self.num_bins] / math.sqrt(
self.filter_channels
)
unnormalized_derivatives = h[..., 2 * self.num_bins :]
x1, logabsdet = piecewise_rational_quadratic_transform(
x1,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=reverse,
tails="linear",
tail_bound=self.tail_bound,
)
x = torch.cat([x0, x1], 1) * x_mask
logdet = torch.sum(logabsdet * x_mask, [1, 2])
if not reverse:
return x, logdet
else:
return x

View File

@@ -0,0 +1,209 @@
import torch
from torch.nn import functional as F
import numpy as np
DEFAULT_MIN_BIN_WIDTH = 1e-3
DEFAULT_MIN_BIN_HEIGHT = 1e-3
DEFAULT_MIN_DERIVATIVE = 1e-3
def piecewise_rational_quadratic_transform(
inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
tails=None,
tail_bound=1.0,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE,
):
if tails is None:
spline_fn = rational_quadratic_spline
spline_kwargs = {}
else:
spline_fn = unconstrained_rational_quadratic_spline
spline_kwargs = {"tails": tails, "tail_bound": tail_bound}
outputs, logabsdet = spline_fn(
inputs=inputs,
unnormalized_widths=unnormalized_widths,
unnormalized_heights=unnormalized_heights,
unnormalized_derivatives=unnormalized_derivatives,
inverse=inverse,
min_bin_width=min_bin_width,
min_bin_height=min_bin_height,
min_derivative=min_derivative,
**spline_kwargs
)
return outputs, logabsdet
def searchsorted(bin_locations, inputs, eps=1e-6):
bin_locations[..., -1] += eps
return torch.sum(inputs[..., None] >= bin_locations, dim=-1) - 1
def unconstrained_rational_quadratic_spline(
inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
tails="linear",
tail_bound=1.0,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE,
):
inside_interval_mask = (inputs >= -tail_bound) & (inputs <= tail_bound)
outside_interval_mask = ~inside_interval_mask
outputs = torch.zeros_like(inputs)
logabsdet = torch.zeros_like(inputs)
if tails == "linear":
unnormalized_derivatives = F.pad(unnormalized_derivatives, pad=(1, 1))
constant = np.log(np.exp(1 - min_derivative) - 1)
unnormalized_derivatives[..., 0] = constant
unnormalized_derivatives[..., -1] = constant
outputs[outside_interval_mask] = inputs[outside_interval_mask]
logabsdet[outside_interval_mask] = 0
else:
raise RuntimeError("{} tails are not implemented.".format(tails))
(
outputs[inside_interval_mask],
logabsdet[inside_interval_mask],
) = rational_quadratic_spline(
inputs=inputs[inside_interval_mask],
unnormalized_widths=unnormalized_widths[inside_interval_mask, :],
unnormalized_heights=unnormalized_heights[inside_interval_mask, :],
unnormalized_derivatives=unnormalized_derivatives[inside_interval_mask, :],
inverse=inverse,
left=-tail_bound,
right=tail_bound,
bottom=-tail_bound,
top=tail_bound,
min_bin_width=min_bin_width,
min_bin_height=min_bin_height,
min_derivative=min_derivative,
)
return outputs, logabsdet
def rational_quadratic_spline(
inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
left=0.0,
right=1.0,
bottom=0.0,
top=1.0,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE,
):
if torch.min(inputs) < left or torch.max(inputs) > right:
raise ValueError("Input to a transform is not within its domain")
num_bins = unnormalized_widths.shape[-1]
if min_bin_width * num_bins > 1.0:
raise ValueError("Minimal bin width too large for the number of bins")
if min_bin_height * num_bins > 1.0:
raise ValueError("Minimal bin height too large for the number of bins")
widths = F.softmax(unnormalized_widths, dim=-1)
widths = min_bin_width + (1 - min_bin_width * num_bins) * widths
cumwidths = torch.cumsum(widths, dim=-1)
cumwidths = F.pad(cumwidths, pad=(1, 0), mode="constant", value=0.0)
cumwidths = (right - left) * cumwidths + left
cumwidths[..., 0] = left
cumwidths[..., -1] = right
widths = cumwidths[..., 1:] - cumwidths[..., :-1]
derivatives = min_derivative + F.softplus(unnormalized_derivatives)
heights = F.softmax(unnormalized_heights, dim=-1)
heights = min_bin_height + (1 - min_bin_height * num_bins) * heights
cumheights = torch.cumsum(heights, dim=-1)
cumheights = F.pad(cumheights, pad=(1, 0), mode="constant", value=0.0)
cumheights = (top - bottom) * cumheights + bottom
cumheights[..., 0] = bottom
cumheights[..., -1] = top
heights = cumheights[..., 1:] - cumheights[..., :-1]
if inverse:
bin_idx = searchsorted(cumheights, inputs)[..., None]
else:
bin_idx = searchsorted(cumwidths, inputs)[..., None]
input_cumwidths = cumwidths.gather(-1, bin_idx)[..., 0]
input_bin_widths = widths.gather(-1, bin_idx)[..., 0]
input_cumheights = cumheights.gather(-1, bin_idx)[..., 0]
delta = heights / widths
input_delta = delta.gather(-1, bin_idx)[..., 0]
input_derivatives = derivatives.gather(-1, bin_idx)[..., 0]
input_derivatives_plus_one = derivatives[..., 1:].gather(-1, bin_idx)[..., 0]
input_heights = heights.gather(-1, bin_idx)[..., 0]
if inverse:
a = (inputs - input_cumheights) * (
input_derivatives + input_derivatives_plus_one - 2 * input_delta
) + input_heights * (input_delta - input_derivatives)
b = input_heights * input_derivatives - (inputs - input_cumheights) * (
input_derivatives + input_derivatives_plus_one - 2 * input_delta
)
c = -input_delta * (inputs - input_cumheights)
discriminant = b.pow(2) - 4 * a * c
assert (discriminant >= 0).all()
root = (2 * c) / (-b - torch.sqrt(discriminant))
outputs = root * input_bin_widths + input_cumwidths
theta_one_minus_theta = root * (1 - root)
denominator = input_delta + (
(input_derivatives + input_derivatives_plus_one - 2 * input_delta)
* theta_one_minus_theta
)
derivative_numerator = input_delta.pow(2) * (
input_derivatives_plus_one * root.pow(2)
+ 2 * input_delta * theta_one_minus_theta
+ input_derivatives * (1 - root).pow(2)
)
logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator)
return outputs, -logabsdet
else:
theta = (inputs - input_cumwidths) / input_bin_widths
theta_one_minus_theta = theta * (1 - theta)
numerator = input_heights * (
input_delta * theta.pow(2) + input_derivatives * theta_one_minus_theta
)
denominator = input_delta + (
(input_derivatives + input_derivatives_plus_one - 2 * input_delta)
* theta_one_minus_theta
)
outputs = input_cumheights + numerator / denominator
derivative_numerator = input_delta.pow(2) * (
input_derivatives_plus_one * theta.pow(2)
+ 2 * input_delta * theta_one_minus_theta
+ input_derivatives * (1 - theta).pow(2)
)
logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator)
return outputs, logabsdet

View File

@@ -0,0 +1,344 @@
import sys,torch,numpy as np,traceback,pdb
import torch.nn as nn
from time import time as ttime
import torch.nn.functional as F
class BiGRU(nn.Module):
def __init__(self, input_features, hidden_features, num_layers):
super(BiGRU, self).__init__()
self.gru = nn.GRU(input_features, hidden_features, num_layers=num_layers, batch_first=True, bidirectional=True)
def forward(self, x):
return self.gru(x)[0]
class ConvBlockRes(nn.Module):
def __init__(self, in_channels, out_channels, momentum=0.01):
super(ConvBlockRes, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(in_channels=in_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(1, 1),
padding=(1, 1),
bias=False),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
nn.Conv2d(in_channels=out_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(1, 1),
padding=(1, 1),
bias=False),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
if in_channels != out_channels:
self.shortcut = nn.Conv2d(in_channels, out_channels, (1, 1))
self.is_shortcut = True
else:
self.is_shortcut = False
def forward(self, x):
if self.is_shortcut:
return self.conv(x) + self.shortcut(x)
else:
return self.conv(x) + x
class Encoder(nn.Module):
def __init__(self, in_channels, in_size, n_encoders, kernel_size, n_blocks, out_channels=16, momentum=0.01):
super(Encoder, self).__init__()
self.n_encoders = n_encoders
self.bn = nn.BatchNorm2d(in_channels, momentum=momentum)
self.layers = nn.ModuleList()
self.latent_channels = []
for i in range(self.n_encoders):
self.layers.append(ResEncoderBlock(in_channels, out_channels, kernel_size, n_blocks, momentum=momentum))
self.latent_channels.append([out_channels, in_size])
in_channels = out_channels
out_channels *= 2
in_size //= 2
self.out_size = in_size
self.out_channel = out_channels
def forward(self, x):
concat_tensors = []
x = self.bn(x)
for i in range(self.n_encoders):
_, x = self.layers[i](x)
concat_tensors.append(_)
return x, concat_tensors
class ResEncoderBlock(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01):
super(ResEncoderBlock, self).__init__()
self.n_blocks = n_blocks
self.conv = nn.ModuleList()
self.conv.append(ConvBlockRes(in_channels, out_channels, momentum))
for i in range(n_blocks - 1):
self.conv.append(ConvBlockRes(out_channels, out_channels, momentum))
self.kernel_size = kernel_size
if self.kernel_size is not None:
self.pool = nn.AvgPool2d(kernel_size=kernel_size)
def forward(self, x):
for i in range(self.n_blocks):
x = self.conv[i](x)
if self.kernel_size is not None:
return x, self.pool(x)
else:
return x
class Intermediate(nn.Module):#
def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01):
super(Intermediate, self).__init__()
self.n_inters = n_inters
self.layers = nn.ModuleList()
self.layers.append(ResEncoderBlock(in_channels, out_channels, None, n_blocks, momentum))
for i in range(self.n_inters-1):
self.layers.append(ResEncoderBlock(out_channels, out_channels, None, n_blocks, momentum))
def forward(self, x):
for i in range(self.n_inters):
x = self.layers[i](x)
return x
class ResDecoderBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01):
super(ResDecoderBlock, self).__init__()
out_padding = (0, 1) if stride == (1, 2) else (1, 1)
self.n_blocks = n_blocks
self.conv1 = nn.Sequential(
nn.ConvTranspose2d(in_channels=in_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=stride,
padding=(1, 1),
output_padding=out_padding,
bias=False),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
self.conv2 = nn.ModuleList()
self.conv2.append(ConvBlockRes(out_channels * 2, out_channels, momentum))
for i in range(n_blocks-1):
self.conv2.append(ConvBlockRes(out_channels, out_channels, momentum))
def forward(self, x, concat_tensor):
x = self.conv1(x)
x = torch.cat((x, concat_tensor), dim=1)
for i in range(self.n_blocks):
x = self.conv2[i](x)
return x
class Decoder(nn.Module):
def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01):
super(Decoder, self).__init__()
self.layers = nn.ModuleList()
self.n_decoders = n_decoders
for i in range(self.n_decoders):
out_channels = in_channels // 2
self.layers.append(ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum))
in_channels = out_channels
def forward(self, x, concat_tensors):
for i in range(self.n_decoders):
x = self.layers[i](x, concat_tensors[-1-i])
return x
class DeepUnet(nn.Module):
def __init__(self, kernel_size, n_blocks, en_de_layers=5, inter_layers=4, in_channels=1, en_out_channels=16):
super(DeepUnet, self).__init__()
self.encoder = Encoder(in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels)
self.intermediate = Intermediate(self.encoder.out_channel // 2, self.encoder.out_channel, inter_layers, n_blocks)
self.decoder = Decoder(self.encoder.out_channel, en_de_layers, kernel_size, n_blocks)
def forward(self, x):
x, concat_tensors = self.encoder(x)
x = self.intermediate(x)
x = self.decoder(x, concat_tensors)
return x
class E2E(nn.Module):
def __init__(self, n_blocks, n_gru, kernel_size, en_de_layers=5, inter_layers=4, in_channels=1,
en_out_channels=16):
super(E2E, self).__init__()
self.unet = DeepUnet(kernel_size, n_blocks, en_de_layers, inter_layers, in_channels, en_out_channels)
self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1))
if n_gru:
self.fc = nn.Sequential(
BiGRU(3 * 128, 256, n_gru),
nn.Linear(512, 360),
nn.Dropout(0.25),
nn.Sigmoid()
)
else:
self.fc = nn.Sequential(
nn.Linear(3 * N_MELS, N_CLASS),
nn.Dropout(0.25),
nn.Sigmoid()
)
def forward(self, mel):
mel = mel.transpose(-1, -2).unsqueeze(1)
x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2)
x = self.fc(x)
return x
from librosa.filters import mel
class MelSpectrogram(torch.nn.Module):
def __init__(
self,
is_half,
n_mel_channels,
sampling_rate,
win_length,
hop_length,
n_fft=None,
mel_fmin=0,
mel_fmax=None,
clamp=1e-5
):
super().__init__()
n_fft = win_length if n_fft is None else n_fft
self.hann_window = {}
mel_basis = mel(
sr=sampling_rate,
n_fft=n_fft,
n_mels=n_mel_channels,
fmin=mel_fmin,
fmax=mel_fmax,
htk=True)
mel_basis = torch.from_numpy(mel_basis).float()
self.register_buffer("mel_basis", mel_basis)
self.n_fft = win_length if n_fft is None else n_fft
self.hop_length = hop_length
self.win_length = win_length
self.sampling_rate = sampling_rate
self.n_mel_channels = n_mel_channels
self.clamp = clamp
self.is_half=is_half
def forward(self, audio, keyshift=0, speed=1, center=True):
factor = 2 ** (keyshift / 12)
n_fft_new = int(np.round(self.n_fft * factor))
win_length_new = int(np.round(self.win_length * factor))
hop_length_new = int(np.round(self.hop_length * speed))
keyshift_key = str(keyshift) + '_' + str(audio.device)
if keyshift_key not in self.hann_window:
self.hann_window[keyshift_key] = torch.hann_window(win_length_new).to(audio.device)
fft = torch.stft(
audio,
n_fft=n_fft_new,
hop_length=hop_length_new,
win_length=win_length_new,
window=self.hann_window[keyshift_key],
center=center,
return_complex=True)
magnitude = torch.sqrt(fft.real.pow(2) + fft.imag.pow(2))
if keyshift != 0:
size = self.n_fft // 2 + 1
resize = magnitude.size(1)
if resize < size:
magnitude = F.pad(magnitude, (0, 0, 0, size - resize))
magnitude = magnitude[:, :size, :]* self.win_length / win_length_new
mel_output = torch.matmul(self.mel_basis, magnitude)
if(self.is_half==True):mel_output=mel_output.half()
log_mel_spec = torch.log(torch.clamp(mel_output, min=self.clamp))
return log_mel_spec
class RMVPE:
def __init__(self, model_path,is_half, device=None):
self.resample_kernel = {}
model = E2E(4, 1, (2, 2))
ckpt = torch.load(model_path,map_location="cpu")
model.load_state_dict(ckpt)
model.eval()
if(is_half==True):model=model.half()
self.model = model
self.resample_kernel = {}
self.is_half=is_half
if device is None:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.device=device
self.mel_extractor = MelSpectrogram(is_half,128, 16000, 1024, 160, None, 30, 8000).to(device)
self.model = self.model.to(device)
cents_mapping = (20 * np.arange(360) + 1997.3794084376191)
self.cents_mapping = np.pad(cents_mapping, (4, 4)) # 368
def mel2hidden(self, mel):
with torch.no_grad():
n_frames = mel.shape[-1]
mel = F.pad(mel, (0, 32 * ((n_frames - 1) // 32 + 1) - n_frames), mode='reflect')
hidden = self.model(mel)
return hidden[:, :n_frames]
def decode(self, hidden, thred=0.03):
cents_pred = self.to_local_average_cents(hidden, thred=thred)
f0 = 10 * (2 ** (cents_pred / 1200))
f0[f0==10]=0
# f0 = np.array([10 * (2 ** (cent_pred / 1200)) if cent_pred else 0 for cent_pred in cents_pred])
return f0
def infer_from_audio(self, audio, thred=0.03):
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)
# torch.cuda.synchronize()
# t0=ttime()
mel = self.mel_extractor(audio, center=True)
# torch.cuda.synchronize()
# t1=ttime()
hidden = self.mel2hidden(mel)
# torch.cuda.synchronize()
# t2=ttime()
hidden=hidden.squeeze(0).cpu().numpy()
if(self.is_half==True):hidden=hidden.astype("float32")
f0 = self.decode(hidden, thred=thred)
# torch.cuda.synchronize()
# t3=ttime()
# print("hmvpe:%s\t%s\t%s\t%s"%(t1-t0,t2-t1,t3-t2,t3-t0))
return f0
def to_local_average_cents(self,salience, thred=0.05):
# t0 = ttime()
center = np.argmax(salience, axis=1) # 帧长#index
salience = np.pad(salience, ((0, 0), (4, 4))) # 帧长,368
# t1 = ttime()
center += 4
todo_salience = []
todo_cents_mapping = []
starts = center - 4
ends = center + 5
for idx in range(salience.shape[0]):
todo_salience.append(salience[:, starts[idx]:ends[idx]][idx])
todo_cents_mapping.append(self.cents_mapping[starts[idx]:ends[idx]])
# t2 = ttime()
todo_salience = np.array(todo_salience) # 帧长9
todo_cents_mapping = np.array(todo_cents_mapping) # 帧长9
product_sum = np.sum(todo_salience * todo_cents_mapping, 1)
weight_sum = np.sum(todo_salience, 1) # 帧长
devided = product_sum / weight_sum # 帧长
# t3 = ttime()
maxx = np.max(salience, axis=1) # 帧长
devided[maxx <= thred] = 0
# t4 = ttime()
# print("decode:%s\t%s\t%s\t%s" % (t1 - t0, t2 - t1, t3 - t2, t4 - t3))
return devided
# if __name__ == '__main__':
# audio, sampling_rate = sf.read("卢本伟语录~1.wav")
# if len(audio.shape) > 1:
# audio = librosa.to_mono(audio.transpose(1, 0))
# audio_bak = audio.copy()
# if sampling_rate != 16000:
# audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000)
# model_path = "/bili-coeus/jupyter/jupyterhub-liujing04/vits_ch/test-RMVPE/weights/rmvpe_llc_half.pt"
# thred = 0.03 # 0.01
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
# rmvpe = RMVPE(model_path,is_half=False, device=device)
# t0=ttime()
# f0 = rmvpe.infer_from_audio(audio, thred=thred)
# f0 = rmvpe.infer_from_audio(audio, thred=thred)
# f0 = rmvpe.infer_from_audio(audio, thred=thred)
# f0 = rmvpe.infer_from_audio(audio, thred=thred)
# f0 = rmvpe.infer_from_audio(audio, thred=thred)
# t1=ttime()
# print(f0.shape,t1-t0)

View File

@@ -0,0 +1,355 @@
# From https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI
"""
Copyright: RVC-Project
License: MIT
"""
import gc
import os
import traceback
import ffmpeg
import numpy as np
import torch.cuda
import argparse
import torch
from multiprocessing import cpu_count
from fairseq import checkpoint_utils
from modules.voice_conversion.rvc.hubert.hubert_manager import HuBERTManager
from modules.voice_conversion.rvc.vc_infer_pipeline import VC
from modules.voice_conversion.rvc.infer_pack.models import (
SynthesizerTrnMs256NSFsid,
SynthesizerTrnMs256NSFsid_nono,
SynthesizerTrnMs768NSFsid,
SynthesizerTrnMs768NSFsid_nono,
)
hubert_model = None
weight_root = os.path.join('') # ST HACK
def config_file_change_fp32():
try:
for config_file in ["32k.json", "40k.json", "48k.json"]:
with open(f"configs/{config_file}", "r") as f:
strr = f.read().replace("true", "false")
with open(f"configs/{config_file}", "w") as f:
f.write(strr)
with open("trainset_preprocess_pipeline_print.py", "r") as f:
strr = f.read().replace("3.7", "3.0")
with open("trainset_preprocess_pipeline_print.py", "w") as f:
f.write(strr)
except Exception as e:
print(f'exception in config_file_change_fp32: {e}')
class Config:
def __init__(self):
self.device = "cuda:0"
self.is_half = True
self.n_cpu = 0
self.gpu_name = None
self.gpu_mem = None
self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()
def device_config(self) -> tuple:
if torch.cuda.is_available():
i_device = int(self.device.split(":")[-1])
self.gpu_name = torch.cuda.get_device_name(i_device)
if (
("16" in self.gpu_name and "V100" not in self.gpu_name.upper())
or "P40" in self.gpu_name.upper()
or "1060" in self.gpu_name
or "1070" in self.gpu_name
or "1080" in self.gpu_name
):
print("16系/10系显卡和P40强制单精度")
self.is_half = False
config_file_change_fp32()
else:
self.gpu_name = None
self.gpu_mem = int(
torch.cuda.get_device_properties(i_device).total_memory
/ 1024
/ 1024
/ 1024
+ 0.4
)
# if self.gpu_mem <= 4:
# with open("trainset_preprocess_pipeline_print.py", "r") as f:
# strr = f.read().replace("3.7", "3.0")
# with open("trainset_preprocess_pipeline_print.py", "w") as f:
# f.write(strr)
elif torch.backends.mps.is_available():
print("没有发现支持的N卡, 使用MPS进行推理")
self.device = "mps"
self.is_half = False
config_file_change_fp32()
else:
print("没有发现支持的N卡, 使用CPU进行推理")
self.device = "cpu"
self.is_half = False
config_file_change_fp32()
if self.n_cpu == 0:
self.n_cpu = cpu_count()
if self.is_half:
# 6G显存配置
x_pad = 3
x_query = 10
x_center = 60
x_max = 65
else:
# 5G显存配置
x_pad = 1
x_query = 6
x_center = 38
x_max = 41
if self.gpu_mem != None and self.gpu_mem <= 4:
x_pad = 1
x_query = 5
x_center = 30
x_max = 32
return x_pad, x_query, x_center, x_max
config = Config()
def load_hubert():
global hubert_model
if not hubert_model:
models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
[HuBERTManager.make_sure_hubert_rvc_installed()],
suffix="",
)
hubert_model = models[0]
hubert_model = hubert_model.to(config.device)
if config.is_half:
hubert_model = hubert_model.half()
else:
hubert_model = hubert_model.float()
hubert_model.eval()
def load_audio(file, sr):
try:
# https://github.com/openai/whisper/blob/main/whisper/audio.py#L26
# This launches a subprocess to decode audio while down-mixing and resampling as necessary.
# Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
file = (
file.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
) # 防止小白拷路径头尾带了空格和"和回车
out, _ = (
ffmpeg.input(file, threads=0)
.output("-", format="f32le", acodec="pcm_f32le", ac=1, ar=sr)
.run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
)
except Exception as e:
raise RuntimeError(f"Failed to load audio: {e}")
return np.frombuffer(out, np.float32).flatten()
vc = None
rvc_model_name = None
maximum = 0
def unload_rvc():
global vc, rvc_model_name
rvc_model_name = None
vc = None
gc.collect()
torch.cuda.empty_cache()
def load_rvc(model):
global vc, rvc_model_name, maximum
if model != rvc_model_name:
unload_rvc()
rvc_model_name = model # correct for ST
# Load rvc
maximum = get_vc(model)['maximum']
return maximum
def vc_single(
sid,
input_audio_path,
f0_up_key,
f0_file,
f0_method,
file_index,
file_index2,
# file_big_npy,
index_rate,
filter_radius,
resample_sr,
rms_mix_rate,
protect,
crepe_hop_length=128
): # spk_item, input_audio0, vc_transform0,f0_file,f0method0
global tgt_sr, net_g, vc, hubert_model, version
if input_audio_path is None:
return "You need to upload an audio", None
f0_up_key = int(f0_up_key)
try:
audio = load_audio(input_audio_path, 16000)
audio_max = np.abs(audio).max() / 0.95
if audio_max > 1:
audio /= audio_max
times = [0, 0, 0]
if hubert_model is None:
load_hubert()
if_f0 = cpt.get("f0", 1)
file_index = (
(
file_index.strip(" ")
.strip('"')
.strip("\n")
.strip('"')
.strip(" ")
.replace("trained", "added")
)
if file_index != ""
else file_index2
) # 防止小白写错,自动帮他替换掉
# file_big_npy = (
# file_big_npy.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
# )
audio_opt = vc.pipeline(
hubert_model,
net_g,
sid,
audio,
input_audio_path,
times,
f0_up_key,
f0_method,
file_index,
# file_big_npy,
index_rate,
if_f0,
filter_radius,
tgt_sr,
resample_sr,
rms_mix_rate,
version,
protect,
f0_file=f0_file,
crepe_hop_length=crepe_hop_length
)
if resample_sr >= 16000 and tgt_sr != resample_sr:
tgt_sr = resample_sr
index_info = (
"Using index:%s." % file_index
if os.path.exists(file_index)
else "Index not used."
)
return "Success.\n %s\nTime:\n npy:%ss, f0:%ss, infer:%ss" % (
index_info,
times[0],
times[1],
times[2],
), (tgt_sr, audio_opt)
except:
info = traceback.format_exc()
print(info)
return info, (None, None)
# 一个选项卡全局只能有一个音色
def get_vc(sid):
global n_spk, tgt_sr, net_g, vc, cpt, version
if sid == "" or sid == []:
global hubert_model
if hubert_model is not None: # 考虑到轮询, 需要加个判断看是否 sid 是由有模型切换到无模型的
print("clean_empty_cache")
del net_g, n_spk, vc, hubert_model, tgt_sr # ,cpt
hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None
if torch.cuda.is_available():
torch.cuda.empty_cache()
###楼下不这么折腾清理不干净
if_f0 = cpt.get("f0", 1)
version = cpt.get("version", "v1")
if version == "v1":
if if_f0 == 1:
net_g = SynthesizerTrnMs256NSFsid(
*cpt["config"], is_half=config.is_half
)
else:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif version == "v2":
if if_f0 == 1:
net_g = SynthesizerTrnMs768NSFsid(
*cpt["config"], is_half=config.is_half
)
else:
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
del net_g, cpt
if torch.cuda.is_available():
torch.cuda.empty_cache()
cpt = None
return {"visible": False, "__type__": "update"}
#person = "%s/%s" % (weight_root, sid) # ST HACK
person = sid
print("loading %s" % person)
cpt = torch.load(person, map_location="cpu")
tgt_sr = cpt["config"][-1]
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] # n_spk
if_f0 = cpt.get("f0", 1)
version = cpt.get("version", "v1")
if version == "v1":
if if_f0 == 1:
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
else:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif version == "v2":
if if_f0 == 1:
net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
else:
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
del net_g.enc_q
print(net_g.load_state_dict(cpt["weight"], strict=False))
net_g.eval().to(config.device)
if config.is_half:
net_g = net_g.half()
else:
net_g = net_g.float()
vc = VC(tgt_sr, config)
n_spk = cpt["config"][-3]
return {"visible": True, "maximum": n_spk, "__type__": "update"}
def change_info(path, info, name):
try:
ckpt = torch.load(path, map_location="cpu")
ckpt["info"] = info
if name == "":
name = os.path.basename(path)
torch.save(ckpt, "weights/%s" % name)
return "Success."
except:
return traceback.format_exc()
def change_info_(ckpt_path):
if not os.path.exists(ckpt_path.replace(os.path.basename(ckpt_path), "train.log")):
return
try:
with open(
ckpt_path.replace(os.path.basename(ckpt_path), "train.log"), "r"
) as f:
info = eval(f.read().strip("\n").split("\n")[0].split("\t")[-1])
sr, f0 = info["sample_rate"], info["if_f0"]
version = "v2" if ("version" in info and info["version"] == "v2") else "v1"
return sr, str(f0), version
except:
traceback.print_exc()

View File

@@ -0,0 +1,402 @@
# https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI/blob/main/vc_infer_pipeline.py
"""
copyright: RVC-Project
license: MIT
"""
import numpy as np, parselmouth, torch, pdb
from time import time as ttime
import torch.nn.functional as F
import scipy.signal as signal
import pyworld, os, traceback, faiss, librosa, torchcrepe
from scipy import signal
from functools import lru_cache
bh, ah = signal.butter(N=5, Wn=48, btype="high", fs=16000)
input_audio_path2wav = {}
@lru_cache
def cache_harvest_f0(input_audio_path, fs, f0max, f0min, frame_period):
audio = input_audio_path2wav[input_audio_path]
f0, t = pyworld.harvest(
audio,
fs=fs,
f0_ceil=f0max,
f0_floor=f0min,
frame_period=frame_period,
)
f0 = pyworld.stonemask(audio, f0, t, fs)
return f0
def change_rms(data1, sr1, data2, sr2, rate): # 1是输入音频2是输出音频,rate是2的占比
# print(data1.max(),data2.max())
rms1 = librosa.feature.rms(
y=data1, frame_length=sr1 // 2 * 2, hop_length=sr1 // 2
) # 每半秒一个点
rms2 = librosa.feature.rms(y=data2, frame_length=sr2 // 2 * 2, hop_length=sr2 // 2)
rms1 = torch.from_numpy(rms1)
rms1 = F.interpolate(
rms1.unsqueeze(0), size=data2.shape[0], mode="linear"
).squeeze()
rms2 = torch.from_numpy(rms2)
rms2 = F.interpolate(
rms2.unsqueeze(0), size=data2.shape[0], mode="linear"
).squeeze()
rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-6)
data2 *= (
torch.pow(rms1, torch.tensor(1 - rate))
* torch.pow(rms2, torch.tensor(rate - 1))
).numpy()
return data2
class VC(object):
def __init__(self, tgt_sr, config):
self.x_pad, self.x_query, self.x_center, self.x_max, self.is_half = (
config.x_pad,
config.x_query,
config.x_center,
config.x_max,
config.is_half,
)
self.sr = 16000 # hubert输入采样率 (hubert model sample rate)
self.window = 160 # 每帧点数 (points per frame)
self.t_pad = self.sr * self.x_pad # 每条前后pad时间 (Pad time before and after each bar)
self.t_pad_tgt = tgt_sr * self.x_pad
self.t_pad2 = self.t_pad * 2
self.t_query = self.sr * self.x_query # 查询切点前后查询时间
self.t_center = self.sr * self.x_center # 查询切点位置
self.t_max = self.sr * self.x_max # 免查询时长阈值
self.device = config.device
def get_f0(
self,
input_audio_path,
x,
p_len,
f0_up_key,
f0_method,
filter_radius,
inp_f0=None,
crepe_hop_length=128
):
global input_audio_path2wav
time_step = self.window / self.sr * 1000
f0_min = 50
f0_max = 1100
f0_mel_min = 1127 * np.log(1 + f0_min / 700)
f0_mel_max = 1127 * np.log(1 + f0_max / 700)
import modules.voice_conversion.rvc.custom_pitch_extraction as cpe
f0 = cpe.pitch_extract(f0_method, x, f0_min, f0_max, p_len, time_step, self.sr, self.window, crepe_hop_length, filter_radius)
f0 *= pow(2, f0_up_key / 12)
# with open("test.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()]))
tf0 = self.sr // self.window # 每秒f0点数
if inp_f0 is not None:
delta_t = np.round(
(inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
).astype("int16")
replace_f0 = np.interp(
list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]
)
shape = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0]
f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[
:shape
]
# with open("test_opt.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()]))
f0bak = f0.copy()
f0_mel = 1127 * np.log(1 + f0 / 700)
f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (
f0_mel_max - f0_mel_min
) + 1
f0_mel[f0_mel <= 1] = 1
f0_mel[f0_mel > 255] = 255
f0_coarse = np.rint(f0_mel).astype(np.int)
return f0_coarse, f0bak # 1-0
def vc(
self,
model,
net_g,
sid,
audio0,
pitch,
pitchf,
times,
index,
big_npy,
index_rate,
version,
protect,
): # ,file_index,file_big_npy
feats = torch.from_numpy(audio0)
if self.is_half:
feats = feats.half()
else:
feats = feats.float()
if feats.dim() == 2: # double channels
feats = feats.mean(-1)
assert feats.dim() == 1, feats.dim()
feats = feats.view(1, -1)
padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)
inputs = {
"source": feats.to(self.device),
"padding_mask": padding_mask,
"output_layer": 9 if version == "v1" else 12,
}
t0 = ttime()
with torch.no_grad():
logits = model.extract_features(**inputs)
feats = model.final_proj(logits[0]) if version == "v1" else logits[0]
if protect < 0.5:
feats0 = feats.clone()
if (
isinstance(index, type(None)) == False
and isinstance(big_npy, type(None)) == False
and index_rate != 0
):
npy = feats[0].cpu().numpy()
if self.is_half:
npy = npy.astype("float32")
# _, I = index.search(npy, 1)
# npy = big_npy[I.squeeze()]
score, ix = index.search(npy, k=8)
weight = np.square(1 / score)
weight /= weight.sum(axis=1, keepdims=True)
npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
if self.is_half:
npy = npy.astype("float16")
feats = (
torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
+ (1 - index_rate) * feats
)
feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
if protect < 0.5:
feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
0, 2, 1
)
t1 = ttime()
p_len = audio0.shape[0] // self.window
if feats.shape[1] < p_len:
p_len = feats.shape[1]
if pitch != None and pitchf != None:
pitch = pitch[:, :p_len]
pitchf = pitchf[:, :p_len]
if protect < 0.5:
pitchff = pitchf.clone()
pitchff[pitchf > 0] = 1
pitchff[pitchf < 1] = protect
pitchff = pitchff.unsqueeze(-1)
feats = feats * pitchff + feats0 * (1 - pitchff)
feats = feats.to(feats0.dtype)
p_len = torch.tensor([p_len], device=self.device).long()
with torch.no_grad():
if pitch != None and pitchf != None:
audio1 = (
(net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0])
.data.cpu()
.float()
.numpy()
)
else:
audio1 = (
(net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy()
)
del feats, p_len, padding_mask
if torch.cuda.is_available():
torch.cuda.empty_cache()
t2 = ttime()
times[0] += t1 - t0
times[2] += t2 - t1
return audio1
def pipeline(
self,
model,
net_g,
sid,
audio,
input_audio_path,
times,
f0_up_key,
f0_method,
file_index,
# file_big_npy,
index_rate,
if_f0,
filter_radius,
tgt_sr,
resample_sr,
rms_mix_rate,
version,
protect,
f0_file=None,
crepe_hop_length=128
):
if (
file_index != ""
# and file_big_npy != ""
# and os.path.exists(file_big_npy) == True
and os.path.exists(file_index)
and index_rate != 0
):
try:
index = faiss.read_index(file_index)
# big_npy = np.load(file_big_npy)
big_npy = index.reconstruct_n(0, index.ntotal)
except:
traceback.print_exc()
index = big_npy = None
else:
index = big_npy = None
audio = signal.filtfilt(bh, ah, audio)
audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
opt_ts = []
if audio_pad.shape[0] > self.t_max:
audio_sum = np.zeros_like(audio)
for i in range(self.window):
audio_sum += audio_pad[i : i - self.window]
for t in range(self.t_center, audio.shape[0], self.t_center):
opt_ts.append(
t
- self.t_query
+ np.where(
np.abs(audio_sum[t - self.t_query : t + self.t_query])
== np.abs(audio_sum[t - self.t_query : t + self.t_query]).min()
)[0][0]
)
s = 0
audio_opt = []
t = None
t1 = ttime()
audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
p_len = audio_pad.shape[0] // self.window
inp_f0 = None
if hasattr(f0_file, "name") == True:
try:
with open(f0_file.name, "r") as f:
lines = f.read().strip("\n").split("\n")
inp_f0 = []
for line in lines:
inp_f0.append([float(i) for i in line.split(",")])
inp_f0 = np.array(inp_f0, dtype="float32")
except:
traceback.print_exc()
sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()
pitch, pitchf = None, None
if if_f0 == 1:
pitch, pitchf = self.get_f0(
input_audio_path,
audio_pad,
p_len,
f0_up_key,
f0_method,
filter_radius,
inp_f0,
crepe_hop_length=crepe_hop_length
)
pitch = pitch[:p_len]
pitchf = pitchf[:p_len]
if self.device == "mps":
pitchf = pitchf.astype(np.float32)
pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()
pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
t2 = ttime()
times[1] += t2 - t1
for t in opt_ts:
t = t // self.window * self.window
if if_f0 == 1:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[s : t + self.t_pad2 + self.window],
pitch[:, s // self.window : (t + self.t_pad2) // self.window],
pitchf[:, s // self.window : (t + self.t_pad2) // self.window],
times,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
else:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[s : t + self.t_pad2 + self.window],
None,
None,
times,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
s = t
if if_f0 == 1:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[t:],
pitch[:, t // self.window :] if t is not None else pitch,
pitchf[:, t // self.window :] if t is not None else pitchf,
times,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
else:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[t:],
None,
None,
times,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
audio_opt = np.concatenate(audio_opt)
if rms_mix_rate != 1:
audio_opt = change_rms(audio, 16000, audio_opt, tgt_sr, rms_mix_rate)
if resample_sr >= 16000 and tgt_sr != resample_sr:
audio_opt = librosa.resample(
audio_opt, orig_sr=tgt_sr, target_sr=resample_sr
)
audio_max = np.abs(audio_opt).max() / 0.99
max_int16 = 32768
if audio_max > 1:
max_int16 /= audio_max
audio_opt = (audio_opt * max_int16).astype(np.int16)
del pitch, pitchf, sid
if torch.cuda.is_available():
torch.cuda.empty_cache()
return audio_opt

View File

@@ -369,6 +369,13 @@ if "streaming-stt" in modules:
streaming_module.whisper_model, streaming_module.vosk_model = streaming_module.load_model(file_path=whisper_model_path)
app.add_url_rule("/api/speech-recognition/streaming/record-and-transcript", view_func=streaming_module.record_and_transcript, methods=["POST"])
if "rvc" in modules:
print("Initializing RVC voice conversion (from ST request file)")
import modules.voice_conversion.rvc_module as rvc_module
#app.add_url_rule("/api/voice-conversion/rvc/load-model", view_func=rvc_module.rvc_load_model, methods=["POST"])
app.add_url_rule("/api/voice-conversion/rvc/process-audio", view_func=rvc_module.rvc_process_audio, methods=["POST"])
def require_module(name):
def wrapper(fn):
@wraps(fn)