diff --git a/pyod/models/deep_svdd.py b/pyod/models/deep_svdd.py index 1df26955..640e0a06 100644 --- a/pyod/models/deep_svdd.py +++ b/pyod/models/deep_svdd.py @@ -73,35 +73,50 @@ class InnerDeepSVDD(nn.Module): def __init__(self, n_features, use_ae, hidden_neurons, hidden_activation, output_activation, - dropout_rate, l2_regularizer, input_shape=None): + dropout_rate, l2_regularizer, feature_type, input_shape=None): super(InnerDeepSVDD, self).__init__() - self.n_features = n_features self.use_ae = use_ae self.hidden_neurons = hidden_neurons or [64, 32] self.hidden_activation = hidden_activation self.output_activation = output_activation self.dropout_rate = dropout_rate self.l2_regularizer = l2_regularizer + self.feature_type = feature_type self.input_shape = input_shape - self.model = self._build_model() + if self.feature_type == "obs": + self.embedder_features = n_features + self.linear_features = n_features + self.embedder = self._build_embedder() + elif self.feature_type in ["hidden", "dist"]: + self.linear_features = self.input_shape[1] + elif self.feature_type == "hidden_obs": + self.embedder_features = n_features + self.linear_features = n_features + self.input_shape[-1] + self.embedder = self._build_embedder() + self.fc_part = self._build_fc() self.c = None # Center of the hypersphere for DeepSVDD def _init_c(self, X_norm, eps=0.1): intermediate_output = {} - hook_handle = self.model._modules.get('net_output').register_forward_hook( + hook_handle = self.fc_part._modules.get('net_output').register_forward_hook( lambda module, input, output: intermediate_output.update({'net_output': output}) ) - output = self.model(X_norm) + if self.feature_type in ["obs", "hidden", "dist"]: + output = self.forward(X_norm) + elif self.feature_type == "hidden_obs": + output = self.forward([X_norm[0], X_norm[1]]) out = intermediate_output['net_output'] hook_handle.remove() self.c = torch.mean(out, dim=0) self.c[(torch.abs(self.c) < eps) & (self.c < 0)] = -eps self.c[(torch.abs(self.c) < eps) & (self.c > 0)] = eps - def _build_model(self): + def _build_embedder(self): + if len(self.input_shape) == 3: + channels = self.input_shape[0] + else: + channels = self.input_shape[1] layers = nn.Sequential() - - channels = self.input_shape[0] layers.add_module('cnn_layer1', nn.Conv2d(channels, 16, kernel_size=3, stride=1, padding=1)) layers.add_module('cnn_activation1', nn.ReLU()) layers.add_module('cnn_pool', nn.MaxPool2d(kernel_size=2, stride=2)) @@ -109,10 +124,13 @@ def _build_model(self): layers.add_module('cnn_activation2', nn.ReLU()) layers.add_module('cnn_adaptive_pool', nn.AdaptiveMaxPool2d((32, 32))) layers.add_module('flatten', nn.Flatten()) - layers.add_module('cnn_fc', nn.Linear(32 * 32 * 32, self.n_features, bias=False)) + layers.add_module('cnn_fc', nn.Linear(32 * 32 * 32, self.embedder_features, bias=False)) layers.add_module('cnn_fc_activation', nn.ReLU()) + return layers - layers.add_module('input_layer', nn.Linear(self.n_features, self.hidden_neurons[0], bias=False)) + def _build_fc(self): + layers = nn.Sequential() + layers.add_module('input_layer', nn.Linear(self.linear_features, self.hidden_neurons[0], bias=False)) layers.add_module('hidden_activation_e0', get_activation_by_name(self.hidden_activation)) for i in range(1, len(self.hidden_neurons) - 1): layers.add_module(f'hidden_layer_e{i}', nn.Linear(self.hidden_neurons[i - 1], self.hidden_neurons[i], bias=False)) @@ -133,8 +151,13 @@ def _build_model(self): return layers def forward(self, x): - return self.model(x) - + if self.feature_type == "obs": + x = self.embedder(x) + elif self.feature_type == "hidden_obs": + features = self.embedder(x[0]) + x = torch.cat([features, x[1]], dim=-1) + x = self.fc_part(x) + return x class DeepSVDD(BaseDetector): """Deep One-Class Classifier with AutoEncoder (AE) is a type of neural @@ -233,7 +256,7 @@ def __init__(self, n_features, c=None, use_ae=False, hidden_neurons=None, hidden_activation='relu', output_activation='sigmoid', optimizer='adam', epochs=100, batch_size=32, - dropout_rate=0.2, l2_regularizer=0.1, validation_size=0.1, + dropout_rate=0.2, l2_regularizer=0.1, feature_type="obs", validation_size=0.1, preprocessing=True, verbose=1, random_state=None, contamination=0.1, input_shape=None): super(DeepSVDD, self).__init__(contamination=contamination) @@ -249,6 +272,7 @@ def __init__(self, n_features, c=None, use_ae=False, hidden_neurons=None, self.batch_size = batch_size self.dropout_rate = dropout_rate self.l2_regularizer = l2_regularizer + self.feature_type = feature_type self.validation_size = validation_size self.preprocessing = preprocessing self.verbose = verbose @@ -259,8 +283,20 @@ def __init__(self, n_features, c=None, use_ae=False, hidden_neurons=None, if self.random_state is not None: torch.manual_seed(self.random_state) - check_parameter(dropout_rate, 0, 1, param_name='dropout_rate', - include_left=True) + check_parameter(dropout_rate, 0, 1, param_name='dropout_rate', include_left=True) + + # Initialize the DeepSVDD model with updated input shape + self.model_ = InnerDeepSVDD( + n_features=self.n_features, # Now determined by CNN output + use_ae=self.use_ae, + hidden_neurons=self.hidden_neurons, + hidden_activation=self.hidden_activation, + output_activation=self.output_activation, + dropout_rate=self.dropout_rate, + l2_regularizer=self.l2_regularizer, + feature_type=self.feature_type, + input_shape=self.input_shape, + ) def fit(self, X, y=None): """Fit detector. y is ignored in unsupervised methods. @@ -278,39 +314,17 @@ def fit(self, X, y=None): self : object Fitted estimator. """ - # Convert to tensor directly for 4D data and normalize if needed - if isinstance(X, np.ndarray): - X = torch.tensor(X, dtype=torch.float32) - - # Normalize the data (e.g., rescale if pixel values are in the range [0, 255]) - if X.max() > 1: - X = X / 255.0 + X_norm = self.normalization(X) - # Set CNN input shape directly - self.input_shape = X.shape[1:] # (channels, height, width) - - # Initialize the DeepSVDD model with updated input shape - self.model_ = InnerDeepSVDD( - n_features=self.n_features, # Now determined by CNN output - use_ae=self.use_ae, - hidden_neurons=self.hidden_neurons, - hidden_activation=self.hidden_activation, - output_activation=self.output_activation, - dropout_rate=self.dropout_rate, - l2_regularizer=self.l2_regularizer, - input_shape=self.input_shape, - ) - - # No need to standardize further if CNN is extracting features directly - X_norm = X - - # Initialize center c for DeepSVDD if self.c is None: self.c = 0.0 self.model_._init_c(X_norm) # Prepare DataLoader for batch processing - dataset = TensorDataset(X_norm, X_norm) + if self.feature_type == "hidden_obs": + dataset = TensorDataset(*X_norm, *X_norm) + else: + dataset = TensorDataset(X_norm, X_norm) dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True) best_loss = float('inf') @@ -321,7 +335,11 @@ def fit(self, X, y=None): for epoch in range(self.epochs): self.model_.train() epoch_loss = 0 - for batch_x, _ in dataloader: + for batch in dataloader: + if self.feature_type == "hidden_obs": + batch_x = batch[0], batch[1] + else: + batch_x = batch[0] outputs = self.model_(batch_x) dist = torch.sum((outputs - self.c) ** 2, dim=-1) @@ -363,16 +381,24 @@ def decision_function(self, X): anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ - # Convert X to tensor if it isn't already, and normalize if needed - if isinstance(X, np.ndarray): - X = torch.tensor(X, dtype=torch.float32) - # Normalize data if pixel values are in [0, 255] range - if X.max() > 1: - X = X / 255.0 + X = self.normalization(X) self.model_.eval() with torch.no_grad(): outputs = self.model_(X) dist = torch.sum((outputs - self.c) ** 2, dim=-1) anomaly_scores = dist.cpu().numpy() return anomaly_scores + + def normalization(self, X): + if self.feature_type in ["obs", "hidden_obs"]: + X_img = X if self.feature_type == "obs" else X[0] + # Normalize the image data if pixel values are in the range [0, 255] + if X_img.max() > 1: + X_img = X_img / 255.0 + X_norm = X_img if self.feature_type == "obs" else [X_img, X[1]] + elif self.feature_type in ["hidden", "dist"]: + X_norm = X + else: + raise ValueError(f"Unknown feature type: {self.feature_type}") + return X_norm \ No newline at end of file