diff --git a/CHANGES.txt b/CHANGES.txt
index cecfc6bf8..b2ba2b591 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -176,3 +176,4 @@ v<1.0.7>, <12/14/2022> -- Enable automatic thresholding by pythresh (#454).
 v<1.0.8>, <03/08/2023> -- Improve clone compatibility (#471).
 v<1.0.8>, <03/08/2023> -- Add QMCD detector (#452).
 v<1.0.8>, <03/08/2023> -- Optimized ECDF and drop Statsmodels dependency (#467).
+v<1.0.9>, <03/19/2023> -- Hot fix for errors in ECOD and COPOD due to an issue in scipy.
diff --git a/pyod/models/copod.py b/pyod/models/copod.py
index 3286e6d83..392ac082a 100644
--- a/pyod/models/copod.py
+++ b/pyod/models/copod.py
@@ -12,7 +12,7 @@
 import matplotlib.pyplot as plt
 import numpy as np
 from joblib import Parallel, delayed
-from scipy.stats import skew
+from scipy.stats import skew as skew_sp
 from sklearn.utils import check_array
 
 from .base import BaseDetector
@@ -20,6 +20,10 @@
 from ..utils.stat_models import column_ecdf
 
 
+def skew(X, axis=0):
+    return np.nan_to_num(skew_sp(X, axis=axis))
+
+
 def _parallel_ecdf(n_dims, X):
     """Private method to calculate ecdf in parallel.
     Parameters
@@ -120,82 +124,68 @@ def decision_function(self, X):
         anomaly_scores : numpy array of shape (n_samples,)
             The anomaly score of the input samples.
         """
-        # use multi-thread execution
-        if self.n_jobs != 1:
-            return self._decision_function_parallel(X)
+
+        # merge training and test data
         if hasattr(self, 'X_train'):
             original_size = X.shape[0]
             X = np.concatenate((self.X_train, X), axis=0)
-        self.U_l = -1 * np.log(column_ecdf(X))
-        self.U_r = -1 * np.log(column_ecdf(-X))
-        skewness = np.sign(skew(X, axis=0))
-        self.U_skew = self.U_l * -1 * np.sign(
-            skewness - 1) + self.U_r * np.sign(skewness + 1)
-        self.O = np.maximum(self.U_skew, np.add(self.U_l, self.U_r) / 2)
-        if hasattr(self, 'X_train'):
-            decision_scores_ = self.O.sum(axis=1)[-original_size:]
-        else:
-            decision_scores_ = self.O.sum(axis=1)
-        return decision_scores_.ravel()
+        # use multi-thread execution
+        if self.n_jobs != 1:
+            n_samples, n_features = X.shape[0], X.shape[1]
 
-    def _decision_function_parallel(self, X):
-        """Predict raw anomaly score of X using the fitted detector.
-        For consistency, outliers are assigned with larger anomaly scores.
-        Parameters
-        ----------
-        X : numpy array of shape (n_samples, n_features)
-            The training input samples. Sparse matrices are accepted only
-            if they are supported by the base estimator.
-        Returns
-        -------
-        anomaly_scores : numpy array of shape (n_samples,)
-            The anomaly score of the input samples.
- """ - if hasattr(self, 'X_train'): - original_size = X.shape[0] - X = np.concatenate((self.X_train, X), axis=0) + if n_features < 2: + raise ValueError( + 'n_jobs should not be used on one dimensional dataset') + + if n_features <= self.n_jobs: + self.n_jobs = n_features + warnings.warn("n_features <= n_jobs; setting them equal instead.") + + n_jobs, n_dims_list, starts = _partition_estimators(n_features, + self.n_jobs) - n_samples, n_features = X.shape[0], X.shape[1] + all_results = Parallel(n_jobs=n_jobs, max_nbytes=None, + verbose=True)( + delayed(_parallel_ecdf)( + n_dims_list[i], + X[:, starts[i]:starts[i + 1]], + ) + for i in range(n_jobs)) - if n_features < 2: - raise ValueError( - 'n_jobs should not be used on one dimensional dataset') + # recover the results + self.U_l = np.zeros([n_samples, n_features]) + self.U_r = np.zeros([n_samples, n_features]) - if n_features <= self.n_jobs: - self.n_jobs = n_features - warnings.warn("n_features <= n_jobs; setting them equal instead.") + for i in range(n_jobs): + self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0] + self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1] - n_jobs, n_dims_list, starts = _partition_estimators(n_features, - self.n_jobs) + self.U_l = -1 * np.log(self.U_l) + self.U_r = -1 * np.log(self.U_r) - all_results = Parallel(n_jobs=n_jobs, max_nbytes=None, - verbose=True)( - delayed(_parallel_ecdf)( - n_dims_list[i], - X[:, starts[i]:starts[i + 1]], - ) - for i in range(n_jobs)) + else: # no parallel jobs + self.U_l = -1 * np.log(column_ecdf(X)) + self.U_r = -1 * np.log(column_ecdf(-X)) - # recover the results - self.U_l = np.zeros([n_samples, n_features]) - self.U_r = np.zeros([n_samples, n_features]) + # compute the skewness corrected tail probability + skewness_weight = np.sign(skew(X, axis=0)) < 0 + self.U_skew = self.U_l * skewness_weight + self.U_r * (~skewness_weight) - for i in range(n_jobs): - self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0] - self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1] + # compute left, right and skewness log tail probabilitity + O_left = self.U_l.sum(axis=1) + O_right = self.U_r.sum(axis=1) + O_skew = self.U_skew.sum(axis=1) - self.U_l = -1 * np.log(self.U_l) - self.U_r = -1 * np.log(self.U_r) + # get the maximum for each of the three probabilities as outlier probability for each sample + self.O = np.max(np.concatenate((O_left.reshape(-1, 1), O_right.reshape(-1, 1), O_skew.reshape(-1, 1)), axis=1), + axis=1) - skewness = np.sign(skew(X, axis=0)) - self.U_skew = self.U_l * -1 * np.sign( - skewness - 1) + self.U_r * np.sign(skewness + 1) - self.O = np.maximum(self.U_skew, np.add(self.U_l, self.U_r) / 2) + # trim the decision scores if hasattr(self, 'X_train'): - decision_scores_ = self.O.sum(axis=1)[-original_size:] + decision_scores_ = self.O[-original_size:] else: - decision_scores_ = self.O.sum(axis=1) + decision_scores_ = self.O return decision_scores_.ravel() def explain_outlier(self, ind, columns=None, cutoffs=None, diff --git a/pyod/models/ecod.py b/pyod/models/ecod.py index df00a7c5a..f7e4496fc 100644 --- a/pyod/models/ecod.py +++ b/pyod/models/ecod.py @@ -13,7 +13,7 @@ import matplotlib.pyplot as plt import numpy as np from joblib import Parallel, delayed -from scipy.stats import skew +from scipy.stats import skew as skew_sp from sklearn.utils import check_array from .base import BaseDetector @@ -21,6 +21,10 @@ from ..utils.stat_models import column_ecdf +def skew(X, axis=0): + return np.nan_to_num(skew_sp(X, axis=axis)) + + def _parallel_ecdf(n_dims, X): 
"""Private method to calculate ecdf in parallel. Parameters @@ -122,88 +126,67 @@ def decision_function(self, X): anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ - # use multi-thread execution - if self.n_jobs != 1: - return self._decision_function_parallel(X) + if hasattr(self, 'X_train'): original_size = X.shape[0] X = np.concatenate((self.X_train, X), axis=0) - self.U_l = -1 * np.log(column_ecdf(X)) - self.U_r = -1 * np.log(column_ecdf(-X)) - skewness = np.sign(skew(X, axis=0)) - self.U_skew = self.U_l * -1 * np.sign( - skewness - 1) + self.U_r * np.sign(skewness + 1) - - self.O = np.maximum(self.U_l, self.U_r) - self.O = np.maximum(self.U_skew, self.O) - - if hasattr(self, 'X_train'): - decision_scores_ = self.O.sum(axis=1)[-original_size:] - else: - decision_scores_ = self.O.sum(axis=1) - return decision_scores_.ravel() + # use multi-thread execution for ecdf estimation + if self.n_jobs != 1: - def _decision_function_parallel(self, X): - """Predict raw anomaly score of X using the fitted detector. - For consistency, outliers are assigned with larger anomaly scores. - Parameters - ---------- - X : numpy array of shape (n_samples, n_features) - The training input samples. Sparse matrices are accepted only - if they are supported by the base estimator. - Returns - ------- - anomaly_scores : numpy array of shape (n_samples,) - The anomaly score of the input samples. - """ - if hasattr(self, 'X_train'): - original_size = X.shape[0] - X = np.concatenate((self.X_train, X), axis=0) + # get the number of samples and features + n_samples, n_features = X.shape[0], X.shape[1] - n_samples, n_features = X.shape[0], X.shape[1] + if n_features < 2: + raise ValueError( + 'n_jobs should not be used on one dimensional dataset') - if n_features < 2: - raise ValueError( - 'n_jobs should not be used on one dimensional dataset') + if n_features <= self.n_jobs: + self.n_jobs = n_features + warnings.warn("n_features <= n_jobs; setting them equal instead.") - if n_features <= self.n_jobs: - self.n_jobs = n_features - warnings.warn("n_features <= n_jobs; setting them equal instead.") + n_jobs, n_dims_list, starts = _partition_estimators(n_features, self.n_jobs) - n_jobs, n_dims_list, starts = _partition_estimators(n_features, - self.n_jobs) + all_results = Parallel(n_jobs=n_jobs, max_nbytes=None, verbose=True)(delayed(_parallel_ecdf)( + n_dims_list[i], X[:, starts[i]:starts[i + 1]],) for i in range(n_jobs)) - all_results = Parallel(n_jobs=n_jobs, max_nbytes=None, - verbose=True)( - delayed(_parallel_ecdf)( - n_dims_list[i], - X[:, starts[i]:starts[i + 1]], - ) - for i in range(n_jobs)) + # recover the results + self.U_l = np.zeros([n_samples, n_features]) + self.U_r = np.zeros([n_samples, n_features]) + for i in range(n_jobs): + self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0] + self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1] - # recover the results - self.U_l = np.zeros([n_samples, n_features]) - self.U_r = np.zeros([n_samples, n_features]) + # single threaded execution of ecdf estimation + else: - for i in range(n_jobs): - self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0] - self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1] + # estimate the ecdfs + self.U_l = column_ecdf(X) + self.U_r = column_ecdf(-X) + # calculate log probability per sample and feature self.U_l = -1 * np.log(self.U_l) + + # calculate log probability per sample and feature self.U_r = -1 * np.log(self.U_r) - skewness = np.sign(skew(X, axis=0)) - self.U_skew = 
-            skewness - 1) + self.U_r * np.sign(skewness + 1)
+        # calculate the skewed tail probability
+        skewness_weight = skew(X, axis=0) < 0
+        self.U_skew = self.U_l * skewness_weight + self.U_r * (~skewness_weight)
+
+        # compute the left, right and skewness log tail probabilities
+        O_left = self.U_l.sum(axis=1)
+        O_right = self.U_r.sum(axis=1)
+        O_skew = self.U_skew.sum(axis=1)
 
-        self.O = np.maximum(self.U_l, self.U_r)
-        self.O = np.maximum(self.U_skew, self.O)
+        # take the maximum of the three tail probabilities as the outlier score of each sample
+        self.O = np.max(np.concatenate((O_left.reshape(-1, 1), O_right.reshape(-1, 1), O_skew.reshape(-1, 1)), axis=1),
+                        axis=1)
 
         if hasattr(self, 'X_train'):
-            decision_scores_ = self.O.sum(axis=1)[-original_size:]
+            decision_scores_ = self.O[-original_size:]
         else:
-            decision_scores_ = self.O.sum(axis=1)
+            decision_scores_ = self.O
         return decision_scores_.ravel()
 
     def explain_outlier(self, ind, columns=None, cutoffs=None,
diff --git a/pyod/version.py b/pyod/version.py
index b8b77cde2..51aef5c17 100644
--- a/pyod/version.py
+++ b/pyod/version.py
@@ -20,4 +20,4 @@
 # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer.
 # 'X.Y.dev0' is the canonical version of 'X.Y.dev'
 #
-__version__ = '1.0.8'  # pragma: no cover
+__version__ = '1.0.9'  # pragma: no cover
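
Background on the skew wrapper introduced above: recent scipy releases (1.9 and later) return NaN from scipy.stats.skew for a constant feature column, where older releases returned 0, and that NaN then propagated through the skewness weighting into the ECOD/COPOD decision scores. The np.nan_to_num wrapper maps the NaN back to 0. Below is a minimal sketch of the patched logic on made-up toy data; the toy matrix and the U_l/U_r placeholders are illustrative only, not part of the patch.

import numpy as np
from scipy.stats import skew as skew_sp


def skew(X, axis=0):
    # the hotfix wrapper: a constant column yields NaN on scipy >= 1.9,
    # which np.nan_to_num maps back to 0.0
    return np.nan_to_num(skew_sp(X, axis=axis))


# toy data: feature 0 is right-skewed, feature 1 is constant
# (the constant column is the case that broke ECOD/COPOD)
X = np.array([[1.0, 5.0],
              [2.0, 5.0],
              [9.0, 5.0]])

print(skew_sp(X, axis=0))  # -> approx [0.67, nan] on scipy >= 1.9
print(skew(X, axis=0))     # -> approx [0.67, 0.0], so downstream math stays finite

# the patched tail selection: use the left-tail log probability U_l for
# negatively skewed features and the right-tail U_r otherwise; with the
# wrapper, a constant column cleanly falls through to the right tail
U_l = np.ones((3, 2))      # placeholder for -log(column_ecdf(X))
U_r = 2 * np.ones((3, 2))  # placeholder for -log(column_ecdf(-X))
skewness_weight = skew(X, axis=0) < 0
U_skew = U_l * skewness_weight + U_r * (~skewness_weight)
print(U_skew)              # all 2.0 here: neither feature is negatively skewed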