From de8360c037bb135cd8674c1f3a76f153b0400233 Mon Sep 17 00:00:00 2001
From: Rodion Chernomordin
Date: Fri, 10 May 2024 00:02:53 +0300
Subject: [PATCH] add first version of VAE on PyTorch

---
 pyod/models/vae.py | 445 ++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 443 insertions(+), 2 deletions(-)

diff --git a/pyod/models/vae.py b/pyod/models/vae.py
index 4938cbc8c..49213b215 100644
--- a/pyod/models/vae.py
+++ b/pyod/models/vae.py
@@ -7,7 +7,7 @@
 :cite:`kingma2013auto` Kingma, Diederik, Welling
 'Auto-Encodeing Variational Bayes'
 https://arxiv.org/abs/1312.6114
-    
+
 :cite:`burgess2018understanding` Burges et al
 'Understanding disentangling in beta-VAE'
 https://arxiv.org/pdf/1804.03599.pdf
@@ -24,12 +24,18 @@
 from sklearn.preprocessing import StandardScaler
 from sklearn.utils import check_array
 from sklearn.utils.validation import check_is_fitted
+from sklearn.model_selection import train_test_split
 
 from .base import BaseDetector
 from .base_dl import _get_tensorflow_version
 from ..utils.stat_models import pairwise_distances_no_broadcast
 from ..utils.utility import check_parameter
 
+import torch
+import torch.nn as nn
+from torch.utils.data import DataLoader, TensorDataset
+from torchsummary import summary
+
 # if tensorflow 2, import from tf directly
 if _get_tensorflow_version() == 1:
     from keras.models import Model
@@ -221,7 +227,7 @@ def sampling(self, args):
         ----------
         args : tensor
             Mean and log of variance of Q(z|X).
-    
+
         Returns
         -------
         z : tensor
@@ -384,3 +390,438 @@ def decision_function(self, X):
         # Predict on X and return the reconstruction errors
         pred_scores = self.model_.predict(X_norm)
         return pairwise_distances_no_broadcast(X_norm, pred_scores)
+
+
+def return_activation_func(activation_name):
+    """Map an activation name to the corresponding torch.nn module."""
+    if activation_name == 'relu':
+        return nn.ReLU()
+    elif activation_name == 'sigmoid':
+        return nn.Sigmoid()
+    else:
+        # TODO: support more activation functions
+        raise ValueError(
+            f'Activation function {activation_name} is not supported '
+            f'by the torch backend')
+
+
+class TorchVAE(VAE):
+    """Variational auto encoder (PyTorch backend).
+    The encoder maps X onto a latent space Z,
+    the decoder reconstructs X from Z sampled via N(0, I):
+    VAE_loss = Reconstruction_loss + KL_loss
+
+    Reference
+    See :cite:`kingma2013auto` Kingma, Diederik, Welling
+    'Auto-Encoding Variational Bayes'
+    https://arxiv.org/abs/1312.6114 for details.
+
+    beta VAE
+    In the loss, the emphasis is on the KL term
+    and the capacity of a bottleneck:
+    VAE_loss = Reconstruction_loss + gamma * KL_loss
+
+    Reference
+    See :cite:`burgess2018understanding` Burgess et al.
+    'Understanding disentangling in beta-VAE'
+    https://arxiv.org/pdf/1804.03599.pdf for details.
+
+
+    Parameters
+    ----------
+    encoder_neurons : list, optional (default=[128, 64, 32])
+        The number of neurons per hidden layer in the encoder.
+
+    decoder_neurons : list, optional (default=[32, 64, 128])
+        The number of neurons per hidden layer in the decoder.
+
+    hidden_activation : str, optional (default='relu')
+        Activation function to use for hidden layers.
+        All hidden layers are forced to use the same type of activation.
+        The torch backend currently supports 'relu' and 'sigmoid'.
+
+    output_activation : str, optional (default='sigmoid')
+        Activation function to use for the output layer.
+        The torch backend currently supports 'relu' and 'sigmoid'.
+
+    loss : str or obj, optional (default=keras.losses.mean_squared_error)
+        Kept for API compatibility with the Keras-based ``VAE``;
+        the torch backend computes the reconstruction term with a
+        mean squared error inside ``vae_loss``.
+
+    gamma : float, optional (default=1.0)
+        Coefficient of the beta VAE regime.
+        Default is regular VAE.
+
+    capacity : float, optional (default=0.0)
+        Maximum capacity of the loss bottleneck.
+
+    optimizer : str, optional (default='adam')
+        String (name of optimizer) or optimizer instance.
+        The torch backend currently supports only 'adam'.
+
+    epochs : int, optional (default=100)
+        Number of epochs to train the model.
+
+    batch_size : int, optional (default=32)
+        Number of samples per gradient update.
+
+    dropout_rate : float in (0., 1), optional (default=0.2)
+        The dropout to be used across all layers.
+
+    l2_regularizer : float in (0., 1), optional (default=0.1)
+        The regularization strength of activity_regularizer
+        applied on each layer. Kept for API compatibility;
+        not used by the torch backend yet.
+
+    validation_size : float in (0., 1), optional (default=0.1)
+        The percentage of data to be used for validation.
+
+    preprocessing : bool, optional (default=True)
+        If True, apply standardization on the data.
+
+    verbose : int, optional (default=1)
+        Verbosity mode.
+
+        - 0 = silent
+        - 1 or 2 = one line per epoch.
+
+        For verbose >= 1, model summary may be printed.
+
+    random_state : int, RandomState instance or None, optional
+        (default=None)
+        If int, random_state is the seed used by the random
+        number generator; If RandomState instance, random_state is the
+        random number generator; If None, the random number generator
+        is the RandomState instance used by `np.random`.
+
+    contamination : float in (0., 0.5), optional (default=0.1)
+        The amount of contamination of the data set, i.e.
+        the proportion of outliers in the data set. When fitting this is
+        used to define the threshold on the decision function.
+
+    Attributes
+    ----------
+    encoding_dim_ : int
+        The number of neurons in the encoding layer.
+
+    compression_rate_ : float
+        The ratio between the original feature and
+        the number of neurons in the encoding layer.
+
+    model_ : torch.nn.Module
+        The underlying variational auto encoder in PyTorch.
+
+    history_ : dict
+        The training history (loss and validation loss per epoch).
+
+    decision_scores_ : numpy array of shape (n_samples,)
+        The outlier scores of the training data.
+        The higher, the more abnormal. Outliers tend to have higher
+        scores. This value is available once the detector is
+        fitted.
+
+    threshold_ : float
+        The threshold is based on ``contamination``. It is the
+        ``n_samples * contamination`` most abnormal samples in
+        ``decision_scores_``. The threshold is calculated for generating
+        binary outlier labels.
+
+    labels_ : int, either 0 or 1
+        The binary labels of the training data. 0 stands for inliers
+        and 1 for outliers/anomalies. It is generated by applying
+        ``threshold_`` on ``decision_scores_``.
+ """ + + class Encoder(nn.Module): + def __init__(self, n_features_, hidden_activation, encoder_neurons, + dropout_rate, latent_dim): + super().__init__() + # Build Encoder + self.log = None + self.mean = None + self.encoder = nn.Sequential() + + # Input layer + input_layer = nn.Sequential( + nn.Linear(n_features_, n_features_), + return_activation_func(hidden_activation) + ) + self.encoder.append(input_layer) + + # Hidden layers + prev_neur = n_features_ + for neurons in encoder_neurons: + layer = nn.Sequential( + nn.Linear(prev_neur, neurons), + return_activation_func(hidden_activation), + nn.Dropout(dropout_rate), + ) + prev_neur = neurons + self.encoder.append(layer) + + # Create mu and sigma of latent variables + self.z_mean = nn.Linear(prev_neur, latent_dim) + self.z_log = nn.Linear(prev_neur, latent_dim) + + def sampling(self, args): + """Reparametrisation by sampling from Gaussian, N(0,I) + To sample from epsilon = Norm(0,I) instead of from likelihood Q(z|X) + with latent variables z: z = z_mean + sqrt(var) * epsilon + + Parameters + ---------- + args : tensor + Mean and log of variance of Q(z|X). + + Returns + ------- + z : tensor + Sampled latent variable. + """ + + z_mean, z_log = args + batch = z_mean.shape[0] # batch size + dim = z_mean.shape[1] # latent dimension + epsilon = torch.randn(batch, dim) # mean=0, std=1.0 + + return z_mean + torch.exp(0.5 * z_log) * epsilon + + def get_z_mean(self, X): + return self.z_mean(self.encoder(X)) + + def get_z_log(self, X): + return self.z_log(self.encoder(X)) + + def run(self, x): + out = self.encoder(x) + self.mean = self.z_mean(out) + self.log = self.z_log(out) + # Use parametrisation sampling + return self.sampling([self.mean, self.log]) + + def __call__(self, X): + return self.run(X) + + class Decoder(nn.Module): + def __init__(self, latent_dim, decoder_neurons, hidden_activation, + n_features, output_activation, dropout_rate): + super().__init__() + # Build Decoder + latent_inputs = torch.nn.Linear(latent_dim, latent_dim) + # Latent input layer + self.decoder = nn.Sequential( + latent_inputs, + nn.Linear(latent_dim, latent_dim), + return_activation_func(hidden_activation) + ) + # Hidden layers + prev_neurons = latent_dim + for neurons in decoder_neurons: + layer = torch.nn.Sequential( + torch.nn.Linear(prev_neurons, neurons), + return_activation_func(hidden_activation), + nn.Dropout(dropout_rate) + ) + prev_neurons = neurons + self.decoder.append(layer) + # Output layer + self.decoder.append(nn.Linear(prev_neurons, n_features)) + self.decoder.append(return_activation_func(output_activation)) + + def run(self, X): + return self.decoder(X) + + def __call__(self, X): + return self.run(X) + + def __init__(self, encoder_neurons=None, decoder_neurons=None, + latent_dim=2, hidden_activation='relu', + output_activation='sigmoid', loss=mse, optimizer='adam', + epochs=100, batch_size=32, dropout_rate=0.2, + l2_regularizer=0.1, validation_size=0.1, preprocessing=True, + verbose=1, random_state=None, contamination=0.1, + gamma=1.0, capacity=0.0): + super(TorchVAE, self).__init__(encoder_neurons, decoder_neurons, + latent_dim, hidden_activation, + output_activation, loss, optimizer, + epochs, batch_size, dropout_rate, + l2_regularizer, validation_size, + preprocessing, + verbose, random_state, contamination, + gamma, capacity) + + def vae_loss(self, inputs, outputs, z_mean, z_log): + """ Loss = Recreation loss + Kullback-Leibler loss + for probability function divergence (ELBO). 
+        Use gamma > 1 and capacity != 0 for the beta-VAE regime.
+        """
+
+        # Keras loss objects cannot consume torch tensors, so the
+        # reconstruction term is computed with torch ops directly
+        # (per-sample MSE scaled by the number of features).
+        reconstruction_loss = torch.mean(torch.square(inputs - outputs),
+                                         dim=-1)
+        reconstruction_loss *= self.n_features_
+        kl_loss = 1 + z_log - torch.square(z_mean) - torch.exp(z_log)
+        kl_loss = -0.5 * torch.sum(kl_loss, dim=-1)
+        kl_loss = self.gamma * torch.abs(kl_loss - self.capacity)
+
+        return torch.mean(reconstruction_loss + kl_loss)
+
+    def _build_model(self):
+        """Build VAE = encoder + decoder + vae_loss."""
+        # Build Encoder
+        self.encoder = self.Encoder(self.n_features_, self.hidden_activation,
+                                    self.encoder_neurons,
+                                    self.dropout_rate, self.latent_dim)
+
+        # Build Decoder
+        self.decoder = self.Decoder(self.latent_dim, self.decoder_neurons,
+                                    self.hidden_activation, self.n_features_,
+                                    self.output_activation, self.dropout_rate)
+
+        # Instantiate VAE
+        vae = nn.Sequential(self.encoder, self.decoder)
+        if self.verbose >= 1:
+            # torchsummary needs the input size; run the summary on CPU
+            summary(self.encoder, (self.n_features_,), device='cpu')
+            summary(self.decoder, (self.latent_dim,), device='cpu')
+            summary(vae, (self.n_features_,), device='cpu')
+        return vae
+
+    def _fit_model(self, X, epochs, batch_size, shuffle, validation_split,
+                   verbose):
+
+        history = {'loss': [], 'epochs': epochs, 'batch_size': batch_size,
+                   'val_loss': []}
+
+        X_train, X_val = train_test_split(X, random_state=self.random_state,
+                                          test_size=validation_split,
+                                          shuffle=shuffle)
+        # DataLoader expects tensors, so cast the numpy splits explicitly
+        x_train_dataset = TensorDataset(
+            torch.tensor(X_train, dtype=torch.float32))
+        x_val_dataset = TensorDataset(
+            torch.tensor(X_val, dtype=torch.float32))
+        train_loader = DataLoader(x_train_dataset, batch_size=batch_size,
+                                  shuffle=shuffle)
+        val_loader = DataLoader(x_val_dataset, batch_size=batch_size,
+                                shuffle=shuffle)
+
+        for epoch in range(epochs):
+            self.model_.train()
+            curr_loss = []
+            for (batch,) in train_loader:
+                self.optimizer_.zero_grad()
+                out = self.model_(batch)
+                # Reuse the mean/log-variance cached by the encoder forward
+                # pass so the KL term matches the sampled reconstruction
+                loss = self.vae_loss(batch, out,
+                                     self.encoder.mean,
+                                     self.encoder.log)
+                loss.backward()
+                self.optimizer_.step()
+
+                curr_loss.append(loss.item())
+
+            history['loss'].append(np.mean(curr_loss))
+
+            curr_loss = []
+            self.model_.eval()
+            with torch.no_grad():
+                for (batch,) in val_loader:
+                    out = self.model_(batch)
+                    loss = self.vae_loss(batch, out,
+                                         self.encoder.mean,
+                                         self.encoder.log)
+                    curr_loss.append(loss.item())
+            history['val_loss'].append(np.mean(curr_loss))
+
+            if verbose:
+                print(f'Epoch {epoch + 1}: '
+                      f'loss = {history["loss"][-1]:.6f}, '
+                      f'val_loss = {history["val_loss"][-1]:.6f}')
+
+        return history
+
+    def fit(self, X, y=None):
+        """Fit detector. y is optional for unsupervised methods.
+
+        Parameters
+        ----------
+        X : numpy array of shape (n_samples, n_features)
+            The input samples.
+
+        y : numpy array of shape (n_samples,), optional (default=None)
+            The ground truth of the input samples (labels).
+ """ + # validate inputs X and y (optional) + X = check_array(X) + self._set_n_classes(y) + + # Verify and construct the hidden units + self.n_samples_, self.n_features_ = X.shape[0], X.shape[1] + + # Standardize data for better performance + if self.preprocessing: + self.scaler_ = StandardScaler() + X_norm = self.scaler_.fit_transform(X) + else: + X_norm = np.copy(X) + + # Shuffle the data for validation as Keras do not shuffling for + # Validation Split + np.random.shuffle(X_norm) + + # Validate and complete the number of hidden neurons + if np.min(self.encoder_neurons) > self.n_features_: + raise ValueError("The number of neurons should not exceed " + "the number of features") + + # Build VAE model & fit with X + self.model_ = self._build_model() + + # optimizer + if self.optimizer == 'adam': + self.optimizer = torch.optim.Adam(self.model_.parameters()) + else: + raise ValueError( + f'Don\'t have any information about {self.optimizer}') + + self.history_ = self._fit_model(X_norm, + epochs=self.epochs, + batch_size=self.batch_size, + shuffle=True, + validation_split=self.validation_size, + verbose=self.verbose) + + # Predict on X itself and calculate the reconstruction error as + # the outlier scores. Noted X_norm was shuffled has to recreate + if self.preprocessing: + X_norm = self.scaler_.transform(X) + else: + X_norm = np.copy(X) + + # pred + pred_scores = self.model_.predict(X_norm) + self.decision_scores_ = pairwise_distances_no_broadcast(X_norm, + pred_scores) + self._process_decision_scores() + return self + + def decision_function(self, X): + """Predict raw anomaly score of X using the fitted detector. + + The anomaly score of an input sample is computed based on different + detector algorithms. For consistency, outliers are assigned with + larger anomaly scores. + + Parameters + ---------- + X : numpy array of shape (n_samples, n_features) + The training input samples. Sparse matrices are accepted only + if they are supported by the base estimator. + + Returns + ------- + anomaly_scores : numpy array of shape (n_samples,) + The anomaly score of the input samples. + """ + check_is_fitted(self, ['model_', 'history_']) + X = check_array(X) + + if self.preprocessing: + X_norm = self.scaler_.transform(X) + else: + X_norm = np.copy(X) + + # Predict on X and return the reconstruction errors + pred_scores = self.model_.predict(X_norm) + return pairwise_distances_no_broadcast(X_norm, pred_scores)