Skip to content

Commit

Permalink
fixing for various feature types
Browse files Browse the repository at this point in the history
  • Loading branch information
modanesh committed Nov 28, 2024
1 parent 3378e46 commit a22eb52
Showing 1 changed file with 75 additions and 49 deletions.
124 changes: 75 additions & 49 deletions pyod/models/deep_svdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,46 +73,64 @@ class InnerDeepSVDD(nn.Module):
def __init__(self, n_features, use_ae,
             hidden_neurons, hidden_activation,
             output_activation,
             dropout_rate, l2_regularizer, feature_type, input_shape=None):
    """Inner torch module for DeepSVDD.

    Parameters
    ----------
    n_features : int
        Size of the embedded feature vector produced by the CNN embedder
        and consumed by the fully connected part.
    use_ae : bool
        Whether the autoencoder variant of the network is built.
    hidden_neurons : list of int or None
        Hidden layer sizes; defaults to [64, 32] when None.
    hidden_activation : str
        Activation name used between hidden layers.
    output_activation : str
        Activation name used on the output layer.
    dropout_rate : float
        Dropout probability used in the fully connected part.
    l2_regularizer : float
        Weight-decay coefficient (applied by the caller's optimizer).
    feature_type : str
        One of "obs", "hidden", "dist" or "hidden_obs"; selects which
        sub-networks are built and how inputs are routed in forward().
    input_shape : tuple, optional
        Shape of a single input sample. Required for the CNN embedder
        ("obs", "hidden_obs") and for sizing the linear input for
        "hidden"/"dist".
    """
    super(InnerDeepSVDD, self).__init__()
    self.n_features = n_features
    self.use_ae = use_ae
    self.hidden_neurons = hidden_neurons or [64, 32]
    self.hidden_activation = hidden_activation
    self.output_activation = output_activation
    self.dropout_rate = dropout_rate
    self.l2_regularizer = l2_regularizer
    self.feature_type = feature_type
    self.input_shape = input_shape
    if self.feature_type == "obs":
        # Raw observations: CNN embedder maps images to n_features.
        self.embedder_features = n_features
        self.linear_features = n_features
        self.embedder = self._build_embedder()
    elif self.feature_type in ["hidden", "dist"]:
        # Pre-extracted feature vectors go straight into the FC part.
        self.linear_features = self.input_shape[1]
    elif self.feature_type == "hidden_obs":
        # CNN-embedded observation concatenated with a hidden vector.
        self.embedder_features = n_features
        self.linear_features = n_features + self.input_shape[-1]
        self.embedder = self._build_embedder()
    self.fc_part = self._build_fc()
    self.c = None  # Center of the hypersphere for DeepSVDD

def _init_c(self, X_norm, eps=0.1):
    """Initialize the hypersphere center ``c`` as the mean of the
    'net_output' layer activations over ``X_norm``.

    Components of ``c`` too close to zero are pushed to +/-eps so no
    output dimension can be trivially matched by an all-zero output.

    Parameters
    ----------
    X_norm : tensor or pair of tensors
        Normalized input; an [obs, hidden] pair for "hidden_obs".
    eps : float, optional (default=0.1)
        Minimum absolute value allowed for each component of ``c``.
    """
    intermediate_output = {}
    # Capture the 'net_output' activations of the FC part via a hook.
    hook_handle = self.fc_part._modules.get('net_output').register_forward_hook(
        lambda module, input, output: intermediate_output.update({'net_output': output})
    )
    if self.feature_type in ["obs", "hidden", "dist"]:
        output = self.forward(X_norm)
    elif self.feature_type == "hidden_obs":
        output = self.forward([X_norm[0], X_norm[1]])
    out = intermediate_output['net_output']
    hook_handle.remove()
    self.c = torch.mean(out, dim=0)
    # Nudge near-zero components away from zero (degenerate center).
    self.c[(torch.abs(self.c) < eps) & (self.c < 0)] = -eps
    self.c[(torch.abs(self.c) < eps) & (self.c > 0)] = eps

def _build_model(self):
def _build_embedder(self):
if len(self.input_shape) == 3:
channels = self.input_shape[0]
else:
channels = self.input_shape[1]
layers = nn.Sequential()

channels = self.input_shape[0]
layers.add_module('cnn_layer1', nn.Conv2d(channels, 16, kernel_size=3, stride=1, padding=1))
layers.add_module('cnn_activation1', nn.ReLU())
layers.add_module('cnn_pool', nn.MaxPool2d(kernel_size=2, stride=2))
layers.add_module('cnn_layer2', nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1))
layers.add_module('cnn_activation2', nn.ReLU())
layers.add_module('cnn_adaptive_pool', nn.AdaptiveMaxPool2d((32, 32)))
layers.add_module('flatten', nn.Flatten())
layers.add_module('cnn_fc', nn.Linear(32 * 32 * 32, self.n_features, bias=False))
layers.add_module('cnn_fc', nn.Linear(32 * 32 * 32, self.embedder_features, bias=False))
layers.add_module('cnn_fc_activation', nn.ReLU())
return layers

layers.add_module('input_layer', nn.Linear(self.n_features, self.hidden_neurons[0], bias=False))
def _build_fc(self):
layers = nn.Sequential()
layers.add_module('input_layer', nn.Linear(self.linear_features, self.hidden_neurons[0], bias=False))
layers.add_module('hidden_activation_e0', get_activation_by_name(self.hidden_activation))
for i in range(1, len(self.hidden_neurons) - 1):
layers.add_module(f'hidden_layer_e{i}', nn.Linear(self.hidden_neurons[i - 1], self.hidden_neurons[i], bias=False))
Expand All @@ -133,8 +151,13 @@ def _build_model(self):
return layers

def forward(self, x):
return self.model(x)

if self.feature_type == "obs":
x = self.embedder(x)
elif self.feature_type == "hidden_obs":
features = self.embedder(x[0])
x = torch.cat([features, x[1]], dim=-1)
x = self.fc_part(x)
return x

class DeepSVDD(BaseDetector):
"""Deep One-Class Classifier with AutoEncoder (AE) is a type of neural
Expand Down Expand Up @@ -233,7 +256,7 @@ def __init__(self, n_features, c=None, use_ae=False, hidden_neurons=None,
hidden_activation='relu',
output_activation='sigmoid', optimizer='adam', epochs=100,
batch_size=32,
dropout_rate=0.2, l2_regularizer=0.1, validation_size=0.1,
dropout_rate=0.2, l2_regularizer=0.1, feature_type="obs", validation_size=0.1,
preprocessing=True,
verbose=1, random_state=None, contamination=0.1, input_shape=None):
super(DeepSVDD, self).__init__(contamination=contamination)
Expand All @@ -249,6 +272,7 @@ def __init__(self, n_features, c=None, use_ae=False, hidden_neurons=None,
self.batch_size = batch_size
self.dropout_rate = dropout_rate
self.l2_regularizer = l2_regularizer
self.feature_type = feature_type
self.validation_size = validation_size
self.preprocessing = preprocessing
self.verbose = verbose
Expand All @@ -259,8 +283,20 @@ def __init__(self, n_features, c=None, use_ae=False, hidden_neurons=None,

if self.random_state is not None:
torch.manual_seed(self.random_state)
check_parameter(dropout_rate, 0, 1, param_name='dropout_rate',
include_left=True)
check_parameter(dropout_rate, 0, 1, param_name='dropout_rate', include_left=True)

# Initialize the DeepSVDD model with updated input shape
self.model_ = InnerDeepSVDD(
n_features=self.n_features, # Now determined by CNN output
use_ae=self.use_ae,
hidden_neurons=self.hidden_neurons,
hidden_activation=self.hidden_activation,
output_activation=self.output_activation,
dropout_rate=self.dropout_rate,
l2_regularizer=self.l2_regularizer,
feature_type=self.feature_type,
input_shape=self.input_shape,
)

def fit(self, X, y=None):
"""Fit detector. y is ignored in unsupervised methods.
Expand All @@ -278,39 +314,17 @@ def fit(self, X, y=None):
self : object
Fitted estimator.
"""
# Convert to tensor directly for 4D data and normalize if needed
if isinstance(X, np.ndarray):
X = torch.tensor(X, dtype=torch.float32)

# Normalize the data (e.g., rescale if pixel values are in the range [0, 255])
if X.max() > 1:
X = X / 255.0
X_norm = self.normalization(X)

# Set CNN input shape directly
self.input_shape = X.shape[1:] # (channels, height, width)

# Initialize the DeepSVDD model with updated input shape
self.model_ = InnerDeepSVDD(
n_features=self.n_features, # Now determined by CNN output
use_ae=self.use_ae,
hidden_neurons=self.hidden_neurons,
hidden_activation=self.hidden_activation,
output_activation=self.output_activation,
dropout_rate=self.dropout_rate,
l2_regularizer=self.l2_regularizer,
input_shape=self.input_shape,
)

# No need to standardize further if CNN is extracting features directly
X_norm = X

# Initialize center c for DeepSVDD
if self.c is None:
self.c = 0.0
self.model_._init_c(X_norm)

# Prepare DataLoader for batch processing
dataset = TensorDataset(X_norm, X_norm)
if self.feature_type == "hidden_obs":
dataset = TensorDataset(*X_norm, *X_norm)
else:
dataset = TensorDataset(X_norm, X_norm)
dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

best_loss = float('inf')
Expand All @@ -321,7 +335,11 @@ def fit(self, X, y=None):
for epoch in range(self.epochs):
self.model_.train()
epoch_loss = 0
for batch_x, _ in dataloader:
for batch in dataloader:
if self.feature_type == "hidden_obs":
batch_x = batch[0], batch[1]
else:
batch_x = batch[0]
outputs = self.model_(batch_x)
dist = torch.sum((outputs - self.c) ** 2, dim=-1)

Expand Down Expand Up @@ -363,16 +381,24 @@ def decision_function(self, X):
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
# Convert X to tensor if it isn't already, and normalize if needed
if isinstance(X, np.ndarray):
X = torch.tensor(X, dtype=torch.float32)

# Normalize data if pixel values are in [0, 255] range
if X.max() > 1:
X = X / 255.0
X = self.normalization(X)
self.model_.eval()
with torch.no_grad():
outputs = self.model_(X)
dist = torch.sum((outputs - self.c) ** 2, dim=-1)
anomaly_scores = dist.cpu().numpy()
return anomaly_scores

def normalization(self, X):
if self.feature_type in ["obs", "hidden_obs"]:
X_img = X if self.feature_type == "obs" else X[0]
# Normalize the image data if pixel values are in the range [0, 255]
if X_img.max() > 1:
X_img = X_img / 255.0
X_norm = X_img if self.feature_type == "obs" else [X_img, X[1]]
elif self.feature_type in ["hidden", "dist"]:
X_norm = X
else:
raise ValueError(f"Unknown feature type: {self.feature_type}")
return X_norm

0 comments on commit a22eb52

Please sign in to comment.