Fix ECOD and COPOD implementation according to #453 #493

Open · wants to merge 4 commits into base: development
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -176,3 +176,4 @@ v<1.0.7>, <12/14/2022> -- Enable automatic thresholding by pythresh (#454).
v<1.0.8>, <03/08/2023> -- Improve clone compatibility (#471).
v<1.0.8>, <03/08/2023> -- Add QMCD detector (#452).
v<1.0.8>, <03/08/2023> -- Optimized ECDF and drop Statsmodels dependency (#467).
v<1.0.9>, <03/19/2023> -- Hot fix for errors in ECOD and COPOD due to an issue in scipy.
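
For context on this entry: recent scipy releases can return NaN from `scipy.stats.skew` for degenerate inputs such as constant columns, and that NaN then poisons the sign-based tail selection in ECOD and COPOD. A minimal sketch of the symptom and of the `nan_to_num` wrapper this PR introduces (toy data; the exact scipy behaviour depends on the installed version):

```python
import numpy as np
from scipy.stats import skew as skew_sp

X = np.ones((10, 3))                       # constant columns -- degenerate but legal input
print(skew_sp(X, axis=0))                  # recent scipy: [nan nan nan] (older releases returned 0)
print(np.nan_to_num(skew_sp(X, axis=0)))   # [0. 0. 0.] -- the NaN-safe behaviour patched in below
```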
112 changes: 51 additions & 61 deletions pyod/models/copod.py
@@ -12,14 +12,18 @@
import matplotlib.pyplot as plt
import numpy as np
from joblib import Parallel, delayed
from scipy.stats import skew
from scipy.stats import skew as skew_sp
from sklearn.utils import check_array

from .base import BaseDetector
from .sklearn_base import _partition_estimators
from ..utils.stat_models import column_ecdf


def skew(X, axis=0):
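# map NaNs from scipy.stats.skew (e.g. on constant columns in newer scipy) to 0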
return np.nan_to_num(skew_sp(X, axis=axis))


def _parallel_ecdf(n_dims, X):
"""Private method to calculate ecdf in parallel.
Parameters
@@ -120,82 +124,68 @@ def decision_function(self, X):
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
# use multi-thread execution
if self.n_jobs != 1:
return self._decision_function_parallel(X)

# merge training and test data
if hasattr(self, 'X_train'):
original_size = X.shape[0]
X = np.concatenate((self.X_train, X), axis=0)
self.U_l = -1 * np.log(column_ecdf(X))
self.U_r = -1 * np.log(column_ecdf(-X))

skewness = np.sign(skew(X, axis=0))
self.U_skew = self.U_l * -1 * np.sign(
skewness - 1) + self.U_r * np.sign(skewness + 1)
self.O = np.maximum(self.U_skew, np.add(self.U_l, self.U_r) / 2)
if hasattr(self, 'X_train'):
decision_scores_ = self.O.sum(axis=1)[-original_size:]
else:
decision_scores_ = self.O.sum(axis=1)
return decision_scores_.ravel()
# use multi-thread execution
if self.n_jobs != 1:
n_samples, n_features = X.shape[0], X.shape[1]

def _decision_function_parallel(self, X):
"""Predict raw anomaly score of X using the fitted detector.
For consistency, outliers are assigned with larger anomaly scores.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The training input samples. Sparse matrices are accepted only
if they are supported by the base estimator.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
if hasattr(self, 'X_train'):
original_size = X.shape[0]
X = np.concatenate((self.X_train, X), axis=0)
if n_features < 2:
raise ValueError(
'n_jobs should not be used on one dimensional dataset')

if n_features <= self.n_jobs:
self.n_jobs = n_features
warnings.warn("n_features <= n_jobs; setting them equal instead.")

n_jobs, n_dims_list, starts = _partition_estimators(n_features,
self.n_jobs)

n_samples, n_features = X.shape[0], X.shape[1]
all_results = Parallel(n_jobs=n_jobs, max_nbytes=None,
verbose=True)(
delayed(_parallel_ecdf)(
n_dims_list[i],
X[:, starts[i]:starts[i + 1]],
)
for i in range(n_jobs))

if n_features < 2:
raise ValueError(
'n_jobs should not be used on one dimensional dataset')
# recover the results
self.U_l = np.zeros([n_samples, n_features])
self.U_r = np.zeros([n_samples, n_features])

if n_features <= self.n_jobs:
self.n_jobs = n_features
warnings.warn("n_features <= n_jobs; setting them equal instead.")
for i in range(n_jobs):
self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0]
self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1]

n_jobs, n_dims_list, starts = _partition_estimators(n_features,
self.n_jobs)
self.U_l = -1 * np.log(self.U_l)
self.U_r = -1 * np.log(self.U_r)

all_results = Parallel(n_jobs=n_jobs, max_nbytes=None,
verbose=True)(
delayed(_parallel_ecdf)(
n_dims_list[i],
X[:, starts[i]:starts[i + 1]],
)
for i in range(n_jobs))
else: # no parallel jobs
self.U_l = -1 * np.log(column_ecdf(X))
self.U_r = -1 * np.log(column_ecdf(-X))

# recover the results
self.U_l = np.zeros([n_samples, n_features])
self.U_r = np.zeros([n_samples, n_features])
# compute the skewness-corrected tail probability
skewness_weight = np.sign(skew(X, axis=0)) < 0
self.U_skew = self.U_l * skewness_weight + self.U_r * (~skewness_weight)

for i in range(n_jobs):
self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0]
self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1]
# compute left, right and skewness log tail probabilities
O_left = self.U_l.sum(axis=1)
O_right = self.U_r.sum(axis=1)
O_skew = self.U_skew.sum(axis=1)

self.U_l = -1 * np.log(self.U_l)
self.U_r = -1 * np.log(self.U_r)
# take the maximum of the three probabilities as the outlier probability of each sample
self.O = np.max(np.concatenate((O_left.reshape(-1, 1), O_right.reshape(-1, 1), O_skew.reshape(-1, 1)), axis=1),
axis=1)

skewness = np.sign(skew(X, axis=0))
self.U_skew = self.U_l * -1 * np.sign(
skewness - 1) + self.U_r * np.sign(skewness + 1)
self.O = np.maximum(self.U_skew, np.add(self.U_l, self.U_r) / 2)
# trim the decision scores
if hasattr(self, 'X_train'):
decision_scores_ = self.O.sum(axis=1)[-original_size:]
decision_scores_ = self.O[-original_size:]
else:
decision_scores_ = self.O.sum(axis=1)
decision_scores_ = self.O
return decision_scores_.ravel()

def explain_outlier(self, ind, columns=None, cutoffs=None,
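
To summarize the behavioural change above: the old code took an element-wise maximum of per-feature tail probabilities and summed afterwards, whereas the patched code sums each tail's log probabilities per sample first and then takes the maximum of the three totals. A condensed, self-contained sketch of the corrected aggregation; the `ecdf` helper is a hypothetical tie-ignoring stand-in for pyod's `column_ecdf`:

```python
import numpy as np
from scipy.stats import skew as skew_sp

def ecdf(A):
    # toy per-column empirical CDF (stand-in for pyod's column_ecdf; ignores ties)
    return (np.argsort(np.argsort(A, axis=0), axis=0) + 1) / A.shape[0]

def copod_scores(X):
    U_l = -np.log(ecdf(X))    # left-tail log probabilities
    U_r = -np.log(ecdf(-X))   # right-tail log probabilities

    # pick the tail matching each feature's skew direction (NaN-safe, as in this PR)
    left_skewed = np.nan_to_num(skew_sp(X, axis=0)) < 0
    U_skew = U_l * left_skewed + U_r * ~left_skewed

    # sum per sample first, then take the maximum of the three totals
    return np.maximum.reduce([U_l.sum(axis=1), U_r.sum(axis=1), U_skew.sum(axis=1)])

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 5))
X[0] += 8.0                        # plant one obvious outlier
print(copod_scores(X).argmax())    # expected: 0
```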
109 changes: 46 additions & 63 deletions pyod/models/ecod.py
@@ -13,14 +13,18 @@
import matplotlib.pyplot as plt
import numpy as np
from joblib import Parallel, delayed
from scipy.stats import skew
from scipy.stats import skew as skew_sp
from sklearn.utils import check_array

from .base import BaseDetector
from .sklearn_base import _partition_estimators
from ..utils.stat_models import column_ecdf


def skew(X, axis=0):
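# map NaNs from scipy.stats.skew (e.g. on constant columns in newer scipy) to 0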
return np.nan_to_num(skew_sp(X, axis=axis))


def _parallel_ecdf(n_dims, X):
"""Private method to calculate ecdf in parallel.
Parameters
@@ -122,88 +126,67 @@ def decision_function(self, X):
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
# use multi-thread execution
if self.n_jobs != 1:
return self._decision_function_parallel(X)

if hasattr(self, 'X_train'):
original_size = X.shape[0]
X = np.concatenate((self.X_train, X), axis=0)
self.U_l = -1 * np.log(column_ecdf(X))
self.U_r = -1 * np.log(column_ecdf(-X))

skewness = np.sign(skew(X, axis=0))
self.U_skew = self.U_l * -1 * np.sign(
skewness - 1) + self.U_r * np.sign(skewness + 1)

self.O = np.maximum(self.U_l, self.U_r)
self.O = np.maximum(self.U_skew, self.O)

if hasattr(self, 'X_train'):
decision_scores_ = self.O.sum(axis=1)[-original_size:]
else:
decision_scores_ = self.O.sum(axis=1)
return decision_scores_.ravel()
# use multi-thread execution for ecdf estimation
if self.n_jobs != 1:

def _decision_function_parallel(self, X):
"""Predict raw anomaly score of X using the fitted detector.
For consistency, outliers are assigned with larger anomaly scores.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The training input samples. Sparse matrices are accepted only
if they are supported by the base estimator.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
if hasattr(self, 'X_train'):
original_size = X.shape[0]
X = np.concatenate((self.X_train, X), axis=0)
# get the number of samples and features
n_samples, n_features = X.shape[0], X.shape[1]

n_samples, n_features = X.shape[0], X.shape[1]
if n_features < 2:
raise ValueError(
'n_jobs should not be used on one dimensional dataset')

if n_features < 2:
raise ValueError(
'n_jobs should not be used on one dimensional dataset')
if n_features <= self.n_jobs:
self.n_jobs = n_features
warnings.warn("n_features <= n_jobs; setting them equal instead.")

if n_features <= self.n_jobs:
self.n_jobs = n_features
warnings.warn("n_features <= n_jobs; setting them equal instead.")
n_jobs, n_dims_list, starts = _partition_estimators(n_features, self.n_jobs)

n_jobs, n_dims_list, starts = _partition_estimators(n_features,
self.n_jobs)
all_results = Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=True)(delayed(_parallel_ecdf)(
n_dims_list[i], X[:, starts[i]:starts[i + 1]],) for i in range(n_jobs))

all_results = Parallel(n_jobs=n_jobs, max_nbytes=None,
verbose=True)(
delayed(_parallel_ecdf)(
n_dims_list[i],
X[:, starts[i]:starts[i + 1]],
)
for i in range(n_jobs))
# recover the results
self.U_l = np.zeros([n_samples, n_features])
self.U_r = np.zeros([n_samples, n_features])
for i in range(n_jobs):
self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0]
self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1]

# recover the results
self.U_l = np.zeros([n_samples, n_features])
self.U_r = np.zeros([n_samples, n_features])
# single-threaded execution of ecdf estimation
else:

for i in range(n_jobs):
self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0]
self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1]
# estimate the ecdfs
self.U_l = column_ecdf(X)
self.U_r = column_ecdf(-X)

# calculate log probability per sample and feature
self.U_l = -1 * np.log(self.U_l)

# calculate log probability per sample and feature
self.U_r = -1 * np.log(self.U_r)

skewness = np.sign(skew(X, axis=0))
self.U_skew = self.U_l * -1 * np.sign(
skewness - 1) + self.U_r * np.sign(skewness + 1)
# calculate the skewed tail probability
skewness_weight = skew(X, axis=0) < 0
self.U_skew = self.U_l * skewness_weight + self.U_r * (~skewness_weight)

# compute left, right and skewness log tail probabilities
O_left = self.U_l.sum(axis=1)
O_right = self.U_r.sum(axis=1)
O_skew = self.U_skew.sum(axis=1)

self.O = np.maximum(self.U_l, self.U_r)
self.O = np.maximum(self.U_skew, self.O)
# take the maximum of the three probabilities as the outlier probability of each sample
self.O = np.max(np.concatenate((O_left.reshape(-1, 1), O_right.reshape(-1, 1), O_skew.reshape(-1, 1)), axis=1),
axis=1)

if hasattr(self, 'X_train'):
decision_scores_ = self.O.sum(axis=1)[-original_size:]
decision_scores_ = self.O[-original_size:]
else:
decision_scores_ = self.O.sum(axis=1)
decision_scores_ = self.O
return decision_scores_.ravel()

def explain_outlier(self, ind, columns=None, cutoffs=None,
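
Finally, a hedged end-to-end usage sketch of the patched detector, exercising the parallel ECDF branch of the rewritten `decision_function` (class and method names as in pyod; the data is made up):

```python
import numpy as np
from pyod.models.ecod import ECOD

rng = np.random.default_rng(42)
X_train = rng.normal(size=(200, 10))
X_test = np.vstack([rng.normal(size=(20, 10)),
                    rng.normal(loc=6.0, size=(5, 10))])  # 5 planted outliers

clf = ECOD(n_jobs=2)                    # n_jobs != 1 takes the parallel branch above
clf.fit(X_train)
scores = clf.decision_function(X_test)  # larger score => more anomalous
labels = clf.predict(X_test)            # 0 = inlier, 1 = outlier
print(labels[-5:])                      # the planted outliers should be flagged
```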
2 changes: 1 addition & 1 deletion pyod/version.py
@@ -20,4 +20,4 @@
# Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer.
# 'X.Y.dev0' is the canonical version of 'X.Y.dev'
#
__version__ = '1.0.8' # pragma: no cover
__version__ = '1.0.9' # pragma: no cover