From 9cb78d69bfe0beab18fa776465f1fc3e9cd72ea9 Mon Sep 17 00:00:00 2001
From: Joshua Wasserman
Date: Fri, 21 May 2021 15:23:44 -0500
Subject: [PATCH 1/2] ENH changes

---
 muffnn/autoencoder/autoencoder.py |   2 +-
 muffnn/fm/fm_classifier.py        |   2 +-
 muffnn/fm/fm_regressor.py         |   2 +-
 muffnn/mlp/mlp_classifier.py      |   2 +-
 muffnn/mlp/mlp_regressor.py       |   2 +-
 muffnn/mlp/tf2_mlp_classifier.py  | 237 ++++++++++++++++++++++++++++++
 requirements.txt                  |   2 +-
 setup.py                          |   2 +-
 8 files changed, 244 insertions(+), 7 deletions(-)
 create mode 100644 muffnn/mlp/tf2_mlp_classifier.py

diff --git a/muffnn/autoencoder/autoencoder.py b/muffnn/autoencoder/autoencoder.py
index 17499a8..41196ae 100644
--- a/muffnn/autoencoder/autoencoder.py
+++ b/muffnn/autoencoder/autoencoder.py
@@ -215,7 +215,7 @@ def _set_up_graph(self):
             message=("Converting sparse IndexedSlices to a dense Tensor "
                      "of unknown shape"),
             module='tensorflow')
-        self._train_step = tf.train.AdamOptimizer(
+        self._train_step = tf.keras.optimizers.Adam(
             learning_rate=self.learning_rate).minimize(self._obj_func)
 
     def _build_output_layer_and_scores(self, t):
diff --git a/muffnn/fm/fm_classifier.py b/muffnn/fm/fm_classifier.py
index 45e3531..5eff56e 100644
--- a/muffnn/fm/fm_classifier.py
+++ b/muffnn/fm/fm_classifier.py
@@ -63,7 +63,7 @@ class FMClassifier(TFPicklingBase, ClassifierMixin, BaseEstimator):
     """
     def __init__(self, rank=8, batch_size=64,
                  n_epochs=5, random_state=None, lambda_v=0.0,
-                 lambda_beta=0.0, solver=tf.train.AdadeltaOptimizer,
+                 lambda_beta=0.0, solver=tf.keras.optimizers.Adadelta,
                  init_scale=0.1, solver_kwargs=None):
         self.rank = rank
         self.batch_size = batch_size
diff --git a/muffnn/fm/fm_regressor.py b/muffnn/fm/fm_regressor.py
index 8475d0b..758ec79 100644
--- a/muffnn/fm/fm_regressor.py
+++ b/muffnn/fm/fm_regressor.py
@@ -58,7 +58,7 @@ class FMRegressor(TFPicklingBase, RegressorMixin, BaseEstimator):
     """
     def __init__(self, rank=8, batch_size=64,
                  n_epochs=5, random_state=None, lambda_v=0.0,
-                 lambda_beta=0.0, solver=tf.train.AdadeltaOptimizer,
+                 lambda_beta=0.0, solver=tf.keras.optimizers.Adadelta,
                  init_scale=0.1, solver_kwargs=None):
         self.rank = rank
         self.batch_size = batch_size
diff --git a/muffnn/mlp/mlp_classifier.py b/muffnn/mlp/mlp_classifier.py
index 8f3012f..fda8f96 100644
--- a/muffnn/mlp/mlp_classifier.py
+++ b/muffnn/mlp/mlp_classifier.py
@@ -82,7 +82,7 @@ class MLPClassifier(MLPBaseEstimator, ClassifierMixin):
 
     def __init__(self, hidden_units=(256,), batch_size=64,
                  n_epochs=5, keep_prob=1.0, activation=nn.relu,
-                 random_state=None, solver=tf.train.AdamOptimizer,
+                 random_state=None, solver=tf.keras.optimizers.Adam,
                  solver_kwargs=None, transform_layer_index=None):
         self.hidden_units = hidden_units
         self.batch_size = batch_size
diff --git a/muffnn/mlp/mlp_regressor.py b/muffnn/mlp/mlp_regressor.py
index 798408b..fbe8478 100644
--- a/muffnn/mlp/mlp_regressor.py
+++ b/muffnn/mlp/mlp_regressor.py
@@ -83,7 +83,7 @@ class MLPRegressor(MLPBaseEstimator, RegressorMixin):
 
     def __init__(self, hidden_units=(256,), batch_size=64,
                  n_epochs=5, keep_prob=1.0, activation=nn.relu,
-                 random_state=None, solver=tf.train.AdamOptimizer,
+                 random_state=None, solver=tf.keras.optimizers.Adam,
                  solver_kwargs=None, transform_layer_index=None):
         self.hidden_units = hidden_units
         self.batch_size = batch_size
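Note: one caveat with these one-line optimizer swaps. TF1 optimizers discover the trainable variables behind a loss tensor on their own, while the TF2 Keras optimizers require an explicit variable list, plus either a GradientTape or a callable loss, when calling minimize. A minimal sketch of the two idioms, using a toy variable and loss rather than muffnn's graph code:

import tensorflow as tf

# TF1 graph mode: the optimizer finds the trainable variables itself.
#   train_step = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(obj_func)

# TF2 eager mode: the variables (and a tape) must be passed explicitly.
w = tf.Variable(1.0)
opt = tf.keras.optimizers.Adam(learning_rate=1e-3)

with tf.GradientTape() as tape:
    loss = tf.square(w - 3.0)  # toy quadratic loss; `w` is the only variable
opt.minimize(loss, var_list=[w], tape=tape)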
diff --git a/muffnn/mlp/tf2_mlp_classifier.py b/muffnn/mlp/tf2_mlp_classifier.py
new file mode 100644
index 0000000..8de92a3
--- /dev/null
+++ b/muffnn/mlp/tf2_mlp_classifier.py
@@ -0,0 +1,237 @@
+import logging
+import time
+from itertools import product
+
+import civis
+import numpy as np
+import pandas as pd
+from patsy import dmatrix
+import scipy
+import scipy.sparse as sp
+from sklearn.exceptions import NotFittedError
+from sklearn.linear_model import LogisticRegression
+from sklearn.metrics import roc_auc_score
+from sklearn.model_selection import cross_val_predict
+from sklearn.preprocessing import LabelEncoder
+from sklearn.utils import check_random_state
+import tensorflow as tf
+
+from muffnn import FMClassifier
+
+logger = logging.getLogger('FMClassifier')
+logger.setLevel('DEBUG')
+
+
+class TF2MockFMClassifier(FMClassifier):
+    def __init__(self, kwargs=None):
+        # Avoid a mutable default argument; build the default config here.
+        if kwargs is None:
+            kwargs = {'solver': tf.keras.optimizers.Adam,
+                      'solver_kwargs': {'learning_rate': 0.1}}
+        super().__init__(**kwargs)
+
+    def _init_vars(self, x, y, classes=None):
+        """Initialize TF objects (needed before fitting or restoring)."""
+        if not self._is_fitted:
+            self._random_state = check_random_state(self.random_state)
+            assert self.batch_size > 0, "batch_size <= 0"
+
+            self.n_dims_ = x.shape[1]
+
+            if classes is not None:
+                self._enc = LabelEncoder().fit(classes)
+            else:
+                self._enc = LabelEncoder().fit(y)
+
+            self.classes_ = self._enc.classes_
+            self.n_classes_ = len(self.classes_)
+
+            if self.n_classes_ <= 2:
+                self._output_size = 1
+            else:
+                self._output_size = self.n_classes_
+
+            self.is_sparse_ = sp.issparse(x)
+
+            tf.random.set_seed(self._random_state.randint(0, 10000000))
+            self._v = tf.Variable(
+                tf.ones(shape=(self.rank, self.n_dims_, self._output_size)),
+                name="v")
+            self._beta = tf.Variable(
+                tf.ones(shape=(self.n_dims_, self._output_size)),
+                name="beta")
+            self._beta0 = tf.Variable(
+                tf.zeros(shape=(self._output_size,)), name="beta0")
+
+            self._solver = self.solver(
+                **(self.solver_kwargs if self.solver_kwargs else {}))
+
+    def __call__(self, x, v, beta, beta0):
+        x2 = x * x
+        vx = tf.stack([tf.linalg.matmul(x, v[i, :, :])
+                       for i in range(self.rank)], axis=-1)
+        v2 = v * v
+        v2x2 = tf.stack([tf.linalg.matmul(x2, v2[i, :, :])
+                         for i in range(self.rank)], axis=-1)
+        int_term = 0.5 * tf.math.reduce_sum(tf.square(vx) - v2x2, axis=-1)
+        return beta0 + tf.linalg.matmul(x, beta) + int_term
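Note: __call__ evaluates the factorization-machine score with the usual O(rank * n) reformulation of the pairwise-interaction term, sum_{i<j} <v_i, v_j> x_i x_j = 0.5 * sum_f [(sum_i v_{f,i} x_i)^2 - sum_i v_{f,i}^2 x_i^2]. A small NumPy sketch of that identity, reduced to a single sample and a single output column (all names and shapes here are toy, not the class's), checked against the brute-force double sum:

import numpy as np

rng = np.random.default_rng(0)
n, rank = 6, 3                      # toy sizes
x = rng.normal(size=n)
v = rng.normal(size=(rank, n))      # one factor vector per rank

# Brute force: sum over pairs i < j of <v[:, i], v[:, j]> * x_i * x_j.
brute = sum(v[:, i] @ v[:, j] * x[i] * x[j]
            for i in range(n) for j in range(i + 1, n))

# O(rank * n) reformulation, the same algebra __call__ implements.
fast = 0.5 * np.sum((v @ x) ** 2 - (v ** 2) @ (x ** 2))

assert np.isclose(brute, fast)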
+    def _fit(self, x, y, sample_weight=None, classes=None):
+        def loss_fn(y, logits, sample_weights):
+            def reduce_weighted_mean(loss, weights):
+                weighted = tf.math.multiply(loss, weights)
+                return tf.math.divide(tf.math.reduce_sum(weighted),
+                                      tf.math.reduce_sum(weights))
+            cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
+                logits=logits,
+                labels=y)
+            val = reduce_weighted_mean(cross_entropy, sample_weights)
+            if self.lambda_v > 0:
+                val += tf.keras.regularizers.L2(self.lambda_v)(self._v)
+
+            if self.lambda_beta > 0:
+                val += tf.keras.regularizers.L2(self.lambda_beta)(self._beta)
+
+            return val
+
+        self._is_fitted = False
+        self._init_vars(x, y, classes)
+
+        self._x = x.astype(np.float32)
+        self._y = y.astype(np.float32)
+        n_examples = self._x.shape[0]
+
+        if sample_weight is not None:
+            self._sample_weight = sample_weight.astype(np.float32)
+        else:
+            self._sample_weight = np.ones(self._x.shape[0]).astype(np.float32)
+
+        @tf.function(input_signature=[
+            tf.TensorSpec(shape=(None, self.n_dims_), dtype=np.float32),
+            tf.TensorSpec(shape=(None,), dtype=np.float32),
+            tf.TensorSpec(shape=(None,), dtype=np.float32)
+        ])
+        def train_step(x, y, sample_weights):
+            with tf.GradientTape() as tape:
+                logits = tf.squeeze(self(x, self._v, self._beta, self._beta0))
+                obj_val = loss_fn(y, logits, sample_weights)
+
+            # gradients = tape.gradient(obj_val, [self._v, self._beta, self._beta0])
+            #
+            # self._solver.apply_gradients(zip(gradients, [self._v, self._beta, self._beta0]))
+            self._solver.minimize(obj_val, [self._v, self._beta, self._beta0], tape=tape)
+            return logits, obj_val
+
+        (self._logit_y_proba,
+         self._obj_val) = train_step(self._x, self._y, self._sample_weight)
+        # self._train_step = self.solver.apply_gradients(
+        #     zip(self._gradients, [self._v, self._beta, self._beta0]))
+        # self._train_step = self.solver.minimize(self._obj_val,
+        #                                         [self._v, self._beta, self._beta0],
+        #                                         tape=self._tape)
+        self._is_fitted = True
+
+        self._train_set = tf.data.Dataset.from_tensor_slices(
+            (self._x, self._y, self._sample_weight))
+
+        start_time = time.time()
+        for epoch in range(self.n_epochs):
+            train_set = (self._train_set
+                         .shuffle(buffer_size=n_examples,
+                                  seed=self._random_state.randint(0, 10000000))
+                         .batch(self.batch_size)
+                         .prefetch(2))
+            for step, (_x, _y, _wt) in enumerate(train_set):
+                (self._logit_y_proba,
+                 self._obj_val) = train_step(_x, _y, _wt)
+                # self._train_step = self.solver.apply_gradients(
+                #     zip(self._gradients, [self._v, self._beta, self._beta0]))
+                # self._train_step = self.solver.minimize(self._obj_val,
+                #                                         [self._v, self._beta, self._beta0],
+                #                                         tape=self._tape)
+
+                logger.debug("objective: %.4f, epoch: %d, step: %d",
+                             float(self._obj_val), epoch, step)
+            logger.debug("objective: %.4f, epoch: %d, step: %d",
+                         float(self._obj_val), epoch, step)
+        duration = time.time() - start_time
+        logger.debug("Training in batches took %.4f s", duration)
+
+        return self
+
+    def _predict_proba(self, x):
+        if not self._is_fitted:
+            raise NotFittedError('Must fit the new FM classifier first!')
+
+        @tf.function(input_signature=[
+            tf.TensorSpec(shape=(None, self.n_dims_), dtype=np.float32)
+        ])
+        def _predict(x):
+            return tf.squeeze(tf.math.sigmoid(
+                self(x, self._v, self._beta, self._beta0)))
+
+        self._x = x.astype(np.float32)
+        self.test_set = tf.data.Dataset.from_tensor_slices(self._x)
+        test_set = self.test_set.batch(self.batch_size).prefetch(2)
+
+        probs = []
+        start_time = time.time()
+        for batch in test_set:
+            probs.append(np.atleast_1d(_predict(batch)))
+        duration = time.time() - start_time
+        logger.debug("Predicting in batches took %.4f s", duration)
+
+        probs = np.concatenate(probs, axis=0)
+        if probs.ndim == 1:
+            return np.column_stack([1. - probs, probs])
+        else:
+            return probs
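Note: the commented-out lines in train_step are the lower-level spelling of the same update; in TF2, optimizer.minimize(loss, var_list, tape=tape) is essentially tape.gradient followed by apply_gradients. A self-contained sketch of the equivalence on a toy variable (names hypothetical, not muffnn's):

import tensorflow as tf

w = tf.Variable(2.0)
opt = tf.keras.optimizers.SGD(learning_rate=0.1)

# Explicit two-step form: compute gradients, then apply them.
with tf.GradientTape() as tape:
    loss = tf.square(w)
grads = tape.gradient(loss, [w])
opt.apply_gradients(zip(grads, [w]))

# One-step form used in _fit; does the same thing under the hood.
with tf.GradientTape() as tape:
    loss = tf.square(w)
opt.minimize(loss, var_list=[w], tape=tape)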
+
+
+ncol = 10
+form = ' + '.join([f'x{str(i)}' for i in range(ncol)])
+interaction_iter = product(range(ncol), range(ncol))
+form += ' - 1 + '
+form += ' + '.join(
+    [f'x{str(i)}:x{str(j)}' for (i, j) in interaction_iter if i < j])
+
+np.random.seed(1)
+nonsparse_x = np.random.binomial(1, .5, 20000).reshape((2000, ncol))
+dmat = dmatrix(form,
+               data=pd.DataFrame(nonsparse_x).rename(columns={
+                   i: f'x{str(i)}' for i in range(ncol)}))
+betas = np.random.standard_normal(dmat.shape[1])
+
+lin_fx_sd = 1
+interaction_fx_sd = 0.25
+betas[0:ncol] /= betas[0:ncol].std() / lin_fx_sd
+betas[ncol:] /= betas[ncol:].std() / interaction_fx_sd
+probs = scipy.special.expit(dmat @ betas)
+binary_y = np.random.binomial(1, probs)
+no_sample_weight = np.ones(nonsparse_x.shape[0])
+fm1 = TF2MockFMClassifier()
+fm1._fit(nonsparse_x, binary_y)
+preds = fm1._predict_proba(nonsparse_x)
+np.mean(preds[:, 1])
+preds.argmax(axis=1)
+binary_y.mean()
+int_term = 0.5 * tf.math.reduce_sum(
+    tf.math.square(tf.stack([tf.linalg.matmul(fm1._x, fm1._v[i, :, :])
+                             for i in range(fm1.rank)], axis=-1)) -
+    tf.stack([tf.linalg.matmul(fm1._x * fm1._x, (fm1._v * fm1._v)[i, :, :])
+              for i in range(fm1.rank)], axis=-1), axis=-1)
+tf.sigmoid(int_term + fm1._beta0 + tf.linalg.matmul(fm1._x, fm1._beta))
+
+fm2 = FMClassifier(solver=tf.train.AdamOptimizer,
+                   solver_kwargs={'learning_rate': 0.01},
+                   random_state=2045)
+fm2.fit(nonsparse_x, binary_y)
+data = tf.data.Dataset.from_tensor_slices(nonsparse_x)
+fm1(nonsparse_x.astype(np.float32), fm1._v, fm1._beta, fm1._beta0)
+preds = pd.DataFrame(fm2.predict_proba(nonsparse_x))
+civis.io.dataframe_to_civis(preds, "redshift-general",
+                            "survey.old_muffnn_fm_preds",
+                            api_key='<REDACTED>')
+civis.io.dataframe_to_civis(pd.DataFrame(binary_y), "redshift-general",
+                            "survey.fm_y_values",
+                            api_key='<REDACTED>')
+help(civis.io.dataframe_to_civis)
+lm = LogisticRegression(C=0.1)
+lm_preds = cross_val_predict(lm, nonsparse_x, binary_y, cv=10,
+                             method='predict_proba')
+roc_auc_score(binary_y, lm_preds[:, 1])
diff --git a/requirements.txt b/requirements.txt
index 33e67a5..80c504f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 numpy~=1.14
 scipy~=1.0
 scikit-learn>=0.20.0,<0.23.0
-tensorflow>=1.15.4,<2
+tensorflow==2.5.*
diff --git a/setup.py b/setup.py
index b9113db..c9caae6 100644
--- a/setup.py
+++ b/setup.py
@@ -28,6 +28,6 @@
     install_requires=['numpy',
                       'scipy',
                       'scikit-learn>=0.20.0,<0.23.0',
-                      'tensorflow>=1.15.4,<2'],
+                      'tensorflow==2.5.*'],
     classifiers=CLASSIFIERS
 )
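Note: the follow-up commit below mixes tf.Module into the class so the optimizer can be handed self.trainable_variables instead of a hand-maintained variable list. tf.Module tracks any tf.Variable assigned to an attribute, which is what makes that property work. A minimal sketch of the tracking behavior (toy class, hypothetical names):

import tensorflow as tf

class TinyModule(tf.Module):
    def __init__(self):
        super().__init__()
        # Attributes that are tf.Variables are tracked automatically.
        self._w = tf.Variable(tf.ones((3, 2)), name="w")
        self._b = tf.Variable(tf.zeros((2,)), name="b")

m = TinyModule()
# trainable_variables walks the module's attributes and collects variables.
print([v.name for v in m.trainable_variables])  # e.g. ['b:0', 'w:0']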
From 79c6ac5eb7335899a3e0b10819f5be13a949fe6b Mon Sep 17 00:00:00 2001
From: Joshua Wasserman
Date: Fri, 21 May 2021 15:43:35 -0500
Subject: [PATCH 2/2] MAINT correct file location

---
 .../tf2_fm_classifier.py} | 124 ++++++++++++------
 1 file changed, 83 insertions(+), 41 deletions(-)
 rename muffnn/{mlp/tf2_mlp_classifier.py => fm/tf2_fm_classifier.py} (75%)

diff --git a/muffnn/mlp/tf2_mlp_classifier.py b/muffnn/fm/tf2_fm_classifier.py
similarity index 75%
rename from muffnn/mlp/tf2_mlp_classifier.py
rename to muffnn/fm/tf2_fm_classifier.py
index 8de92a3..da71ad4 100644
--- a/muffnn/mlp/tf2_mlp_classifier.py
+++ b/muffnn/fm/tf2_fm_classifier.py
@@ -1,5 +1,6 @@
 import logging
+import pickle
 import time
 from itertools import product
 
 import civis
@@ -22,7 +23,7 @@
 logger.setLevel('DEBUG')
 
 
-class TF2MockFMClassifier(FMClassifier):
+class TF2MockFMClassifier(FMClassifier, tf.Module):
     def __init__(self, kwargs=None):
         # Avoid a mutable default argument; build the default config here.
         if kwargs is None:
             kwargs = {'solver': tf.keras.optimizers.Adam,
@@ -116,21 +117,15 @@ def train_step(x, y, sample_weights):
             with tf.GradientTape() as tape:
                 logits = tf.squeeze(self(x, self._v, self._beta, self._beta0))
                 obj_val = loss_fn(y, logits, sample_weights)
 
             # gradients = tape.gradient(obj_val, [self._v, self._beta, self._beta0])
             #
             # self._solver.apply_gradients(zip(gradients, [self._v, self._beta, self._beta0]))
-            self._solver.minimize(obj_val, [self._v, self._beta, self._beta0], tape=tape)
+            self._solver.minimize(obj_val, self.trainable_variables, tape=tape)
             return logits, obj_val
 
         (self._logit_y_proba,
          self._obj_val) = train_step(self._x, self._y, self._sample_weight)
-        # self._train_step = self.solver.apply_gradients(
-        #     zip(self._gradients, [self._v, self._beta, self._beta0]))
-        # self._train_step = self.solver.minimize(self._obj_val,
-        #                                         [self._v, self._beta, self._beta0],
-        #                                         tape=self._tape)
         self._is_fitted = True
 
         self._train_set = tf.data.Dataset.from_tensor_slices(
             (self._x, self._y, self._sample_weight))
-
         start_time = time.time()
@@ -141,14 +136,9 @@ def train_step(x, y, sample_weights):
             for step, (_x, _y, _wt) in enumerate(train_set):
                 (self._logit_y_proba,
                  self._obj_val) = train_step(_x, _y, _wt)
-                # self._train_step = self.solver.apply_gradients(
-                #     zip(self._gradients, [self._v, self._beta, self._beta0]))
-                # self._train_step = self.solver.minimize(self._obj_val,
-                #                                         [self._v, self._beta, self._beta0],
-                #                                         tape=self._tape)
 
                 logger.debug("objective: %.4f, epoch: %d, step: %d",
                              float(self._obj_val), epoch, step)
             logger.debug("objective: %.4f, epoch: %d, step: %d",
                          float(self._obj_val), epoch, step)
@@ -184,6 +174,79 @@ def _predict(x):
         else:
             return probs
 
+    @property
+    def _is_fitted(self):
+        """Return True if the model has been at least partially fitted.
+
+        Returns
+        -------
+        bool
+
+        Notes
+        -----
+        This indicates whether, e.g., the TensorFlow variables for the
+        model have been created.
+        """
+        return getattr(self, '_fitted', False)
+
+    @_is_fitted.setter
+    def _is_fitted(self, b):
+        """Set whether the model has been at least partially fitted.
+
+        Parameters
+        ----------
+        b : bool
+            True if the model has been fitted.
+        """
+        self._fitted = b
+
+    def __getstate__(self):
+        # Override __getstate__ so that TF model parameters are pickled
+        # properly.
+        state = dict(
+            rank=self.rank,
+            batch_size=self.batch_size,
+            n_epochs=self.n_epochs,
+            random_state=self.random_state,
+            lambda_v=self.lambda_v,
+            lambda_beta=self.lambda_beta,
+            solver=self.solver,
+            init_scale=self.init_scale,
+            solver_kwargs=self.solver_kwargs,
+            n_dims_=self.n_dims_,
+            is_sparse_=self.is_sparse_,
+            _fitted=self._fitted,
+        )
+
+        if self._fitted:
+            weights = {}
+            for var in self.trainable_variables:
+                name = '_' + var.name.split(':')[0]
+                # Serialize to raw bytes so the state dict is picklable.
+                weights[name] = tf.io.serialize_tensor(var).numpy()
+            state['variables'] = weights
+
+        return state
+
+    def __setstate__(self, state):
+        # Override __setstate__ so that TF model parameters are unpickled
+        # properly.
+        for k, v in state.items():
+            if k != 'variables':
+                self.__dict__[k] = v
+        if self.__dict__['_fitted']:
+            for name, weight in state['variables'].items():
+                replace_name = name.replace('_', '')
+                new_var = tf.io.parse_tensor(weight, out_type=np.float32)
+                self.__dict__[name] = tf.Variable(
+                    new_var,
+                    dtype=np.float32,
+                    name=replace_name)
+
+        return self
+
 
 ncol = 10
 form = ' + '.join([f'x{str(i)}' for i in range(ncol)])
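Note: __getstate__/__setstate__ above lean on TensorFlow's tensor (de)serialization. tf.io.serialize_tensor turns a tensor into a TensorProto byte string, and tf.io.parse_tensor inverts it given the dtype; the .numpy() call is what makes the payload a plain picklable bytes object. A standalone sketch of that round trip on a toy tensor:

import pickle

import numpy as np
import tensorflow as tf

v = tf.Variable(tf.ones((2, 3)), name="v")

# Serialize to raw bytes (a TensorProto), then round-trip through pickle.
payload = pickle.dumps(tf.io.serialize_tensor(v).numpy())
restored = tf.io.parse_tensor(pickle.loads(payload), out_type=tf.float32)

assert np.allclose(v.numpy(), restored.numpy())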
@@ -208,30 +271,9 @@ def _predict(x):
 no_sample_weight = np.ones(nonsparse_x.shape[0])
 fm1 = TF2MockFMClassifier()
 fm1._fit(nonsparse_x, binary_y)
-preds = fm1._predict_proba(nonsparse_x)
-np.mean(preds[:, 1])
-preds.argmax(axis=1)
-binary_y.mean()
-int_term = 0.5 * tf.math.reduce_sum(
-    tf.math.square(tf.stack([tf.linalg.matmul(fm1._x, fm1._v[i, :, :])
-                             for i in range(fm1.rank)], axis=-1)) -
-    tf.stack([tf.linalg.matmul(fm1._x * fm1._x, (fm1._v * fm1._v)[i, :, :])
-              for i in range(fm1.rank)], axis=-1), axis=-1)
-tf.sigmoid(int_term + fm1._beta0 + tf.linalg.matmul(fm1._x, fm1._beta))
-
-fm2 = FMClassifier(solver=tf.train.AdamOptimizer,
-                   solver_kwargs={'learning_rate': 0.01},
-                   random_state=2045)
-fm2.fit(nonsparse_x, binary_y)
-data = tf.data.Dataset.from_tensor_slices(nonsparse_x)
-fm1(nonsparse_x.astype(np.float32), fm1._v, fm1._beta, fm1._beta0)
-preds = pd.DataFrame(fm2.predict_proba(nonsparse_x))
-civis.io.dataframe_to_civis(preds, "redshift-general",
-                            "survey.old_muffnn_fm_preds",
-                            api_key='<REDACTED>')
-civis.io.dataframe_to_civis(pd.DataFrame(binary_y), "redshift-general",
-                            "survey.fm_y_values",
-                            api_key='<REDACTED>')
-help(civis.io.dataframe_to_civis)
-lm = LogisticRegression(C=0.1)
-lm_preds = cross_val_predict(lm, nonsparse_x, binary_y, cv=10,
-                             method='predict_proba')
-roc_auc_score(binary_y, lm_preds[:, 1])
+fm1_preds = fm1._predict_proba(nonsparse_x)
+
+pickled_fm = pickle.dumps(fm1)
+loaded_fm = pickle.loads(pickled_fm)
+
+loaded_preds = loaded_fm._predict_proba(nonsparse_x)
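Note: the new script tail stops short of asserting anything about the pickle round trip. A natural follow-up check, continuing the script above with the fm1_preds and loaded_preds names it defines, is that the restored model reproduces the original predictions exactly, since the exact variable values were saved:

import numpy as np

# Hypothetical sanity check for the round trip: the unpickled model should
# score identically to the one it was serialized from.
assert np.allclose(fm1_preds, loaded_preds)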