[LogisticRegressionMG] Support sparse vectors (#5632)
Authors:
  - Jinfeng Li (https://github.com/lijinf2)
  - Dante Gama Dessavre (https://github.com/dantegd)

Approvers:
  - Corey J. Nolet (https://github.com/cjnolet)

URL: #5632
lijinf2 authored Nov 28, 2023
1 parent 21fbf04 commit 197d4f3
Showing 7 changed files with 365 additions and 34 deletions.
31 changes: 31 additions & 0 deletions cpp/include/cuml/linear_model/qn_mg.hpp
@@ -63,6 +63,37 @@ void qnFit(raft::handle_t& handle,
float* f,
int* num_iters);

/**
* @brief Support sparse vectors (Compressed Sparse Row format) for MNMG logistic regression fit
* using quasi-Newton methods
* @param[in] handle: the internal cuml handle object
* @param[in] input_values: vector holding non-zero values of all partitions for that rank
* @param[in] input_cols: vector holding column indices of non-zero values of all partitions for
* that rank
* @param[in] input_row_ids: vector holding row pointers of non-zero values of all partitions for
* that rank
* @param[in] X_nnz: the number of non-zero values of that rank
* @param[in] input_desc: PartDescriptor object for the input
* @param[in] labels: labels data
* @param[out] coef: learned coefficients
* @param[in] pams: model parameters
* @param[in] n_classes: number of outputs (number of classes or `1` for regression)
* @param[out] f: host pointer holding the final objective value
* @param[out] num_iters: host pointer holding the actual number of iterations taken
*/
void qnFitSparse(raft::handle_t& handle,
std::vector<Matrix::Data<float>*>& input_values,
int* input_cols,
int* input_row_ids,
int X_nnz,
Matrix::PartDescriptor& input_desc,
std::vector<Matrix::Data<float>*>& labels,
float* coef,
const qn_params& pams,
int n_classes,
float* f,
int* num_iters);

}; // namespace opg
}; // namespace GLM
}; // namespace ML
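
Note: for readers less familiar with the CSR layout the new qnFitSparse declaration documents above, the following small illustration (not part of the diff, SciPy used only for readability) shows how input_values, input_cols, input_row_ids, and X_nnz relate to a dense matrix.

# Illustration only: the per-rank CSR triplet qnFitSparse expects.
import numpy as np
import scipy.sparse as sp

X_local = np.array([[1.0, 0.0, 2.0],
                    [0.0, 0.0, 3.0],
                    [4.0, 5.0, 0.0]])
X_csr = sp.csr_matrix(X_local)

print(X_csr.data)     # [1. 2. 3. 4. 5.] -> input_values
print(X_csr.indices)  # [0 2 2 0 1]      -> input_cols
print(X_csr.indptr)   # [0 2 3 5]        -> input_row_ids (length n_rows + 1)
print(X_csr.nnz)      # 5                -> X_nnz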
21 changes: 14 additions & 7 deletions cpp/src/glm/qn/mg/glm_base_mg.cuh
@@ -16,6 +16,7 @@

#include <raft/core/comms.hpp>
#include <raft/core/handle.hpp>
#include <raft/linalg/add.cuh>
#include <raft/linalg/multiply.cuh>
#include <raft/util/cudart_utils.hpp>

@@ -112,34 +113,42 @@ struct GLMWithDataMG : ML::GLM::detail::GLMWithData<T, GLMObjective> {
T* dev_scalar,
cudaStream_t stream)
{
raft::comms::comms_t const& communicator = raft::resource::get_comms(*(this->handle_p));
SimpleDenseMat<T> W(wFlat.data, this->C, this->dims);
SimpleDenseMat<T> G(gradFlat.data, this->C, this->dims);
SimpleVec<T> lossVal(dev_scalar, 1);

// Ensure the same coefficients on all GPUs
communicator.bcast(wFlat.data, this->C * this->dims, 0, stream);
communicator.sync_stream(stream);

// apply regularization
auto regularizer_obj = this->objective;
auto lossFunc = regularizer_obj->loss;
auto reg = regularizer_obj->reg;
G.fill(0, stream);
float reg_host = 0;
T reg_host = 0;
if (reg->l2_penalty != 0) {
reg->reg_grad(dev_scalar, G, W, lossFunc->fit_intercept, stream);
raft::update_host(&reg_host, dev_scalar, 1, stream);
// note: avoid syncing here because there's a sync before reg_host is used.
raft::resource::sync_stream(*(this->handle_p));
}

// apply linearFwd, getLossAndDz, linearBwd
ML::GLM::detail::linearFwd(
lossFunc->handle, *(this->Z), *(this->X), W); // linear part: forward pass

raft::comms::comms_t const& communicator = raft::resource::get_comms(*(this->handle_p));

lossFunc->getLossAndDZ(dev_scalar, *(this->Z), *(this->y), stream); // loss specific part

// normalize local loss before allreduce sum
T factor = 1.0 * (*this->y).len / this->n_samples;
raft::linalg::multiplyScalar(dev_scalar, dev_scalar, factor, 1, stream);

// Each GPU calculates reg_host independently, and the values may show tiny divergence.
// Average reg_host across ranks to avoid that divergence.
T reg_factor = reg_host / this->n_ranks;
raft::linalg::addScalar(dev_scalar, dev_scalar, reg_factor, 1, stream);

communicator.allreduce(dev_scalar, dev_scalar, 1, raft::comms::op_t::SUM, stream);
communicator.sync_stream(stream);

@@ -154,11 +163,9 @@ struct GLMWithDataMG : ML::GLM::detail::GLMWithData<T, GLMObjective> {
communicator.allreduce(G.data, G.data, this->C * this->dims, raft::comms::op_t::SUM, stream);
communicator.sync_stream(stream);

float loss_host;
T loss_host;
raft::update_host(&loss_host, dev_scalar, 1, stream);
raft::resource::sync_stream(*(this->handle_p));
loss_host += reg_host;
lossVal.fill(loss_host + reg_host, stream);

return loss_host;
}
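
Note: a toy NumPy simulation (an illustration, not code from this commit, and assuming each rank's local loss is the mean over its own samples) of the aggregation the rewritten loss path performs: each rank scales its local loss by its share of the global sample count and adds an equal share of the per-rank regularization value, so a single SUM allreduce yields the global objective.

import numpy as np

n_ranks = 3
local_counts = np.array([40, 35, 25])            # samples held by each rank
local_mean_loss = np.array([0.70, 0.64, 0.81])   # loss each rank computes locally
local_reg = np.array([0.1000, 0.1000, 0.1001])   # reg_host, with tiny per-rank divergence

n_samples = local_counts.sum()

# Per-rank scalar right before the allreduce: weighted local loss + averaged reg term.
per_rank_scalar = local_mean_loss * local_counts / n_samples + local_reg / n_ranks
global_objective = per_rank_scalar.sum()         # what op_t::SUM produces

# Equivalent closed form: sample-weighted mean loss plus the mean regularization value.
expected = np.average(local_mean_loss, weights=local_counts) + local_reg.mean()
assert np.isclose(global_objective, expected)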
73 changes: 73 additions & 0 deletions cpp/src/glm/qn_mg.cu
Expand Up @@ -29,6 +29,8 @@
#include <vector>
using namespace MLCommon;

#include <iostream>

namespace ML {
namespace GLM {
namespace opg {
@@ -172,6 +174,77 @@ void qnFit(raft::handle_t& handle,
handle, input_data, input_desc, labels, coef, pams, X_col_major, n_classes, f, num_iters);
}

template <typename T, typename I>
void qnFitSparse_impl(const raft::handle_t& handle,
const qn_params& pams,
T* X_values,
I* X_cols,
I* X_row_ids,
I X_nnz,
T* y,
size_t N,
size_t D,
size_t C,
T* w0,
T* f,
int* num_iters,
size_t n_samples,
int rank,
int n_ranks)
{
auto X_simple = SimpleSparseMat<T>(X_values, X_cols, X_row_ids, X_nnz, N, D);

ML::GLM::opg::qn_fit_x_mg(handle,
pams,
X_simple,
y,
C,
w0,
f,
num_iters,
n_samples,
rank,
n_ranks); // ignore sample_weight, svr_eps
return;
}

void qnFitSparse(raft::handle_t& handle,
std::vector<Matrix::Data<float>*>& input_values,
int* input_cols,
int* input_row_ids,
int X_nnz,
Matrix::PartDescriptor& input_desc,
std::vector<Matrix::Data<float>*>& labels,
float* coef,
const qn_params& pams,
int n_classes,
float* f,
int* num_iters)
{
RAFT_EXPECTS(input_values.size() == 1,
"qn_mg.cu currently does not accept more than one input matrix");

auto data_input_values = input_values[0];
auto data_y = labels[0];

qnFitSparse_impl<float, int>(handle,
pams,
data_input_values->ptr,
input_cols,
input_row_ids,
X_nnz,
data_y->ptr,
input_desc.totalElementsOwnedBy(input_desc.rank),
input_desc.N,
n_classes,
coef,
f,
num_iters,
input_desc.M,
input_desc.rank,
input_desc.uniqueRanks().size());
}

}; // namespace opg
}; // namespace GLM
}; // namespace ML
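
Note: a rough sketch (not from this commit) of the per-rank view the qnFitSparse wrapper assembles. The design matrix is row-partitioned across ranks, each rank holds a local CSR piece, and the PartDescriptor supplies the local row count, feature count, and global sample count passed to qnFitSparse_impl; the dictionary keys below are illustrative names only.

import scipy.sparse as sp

X_global = sp.random(6, 4, density=0.5, format="csr", random_state=0)
row_splits = [(0, 3), (3, 6)]                  # rows owned by rank 0 and rank 1

for rank, (lo, hi) in enumerate(row_splits):
    X_local = X_global[lo:hi]                  # this rank's CSR partition
    print({
        "rank": rank,
        "local_rows_N": X_local.shape[0],      # input_desc.totalElementsOwnedBy(rank)
        "features_D": X_local.shape[1],        # input_desc.N
        "X_nnz": X_local.nnz,
        "global_samples_M": X_global.shape[0], # input_desc.M, used for loss normalization
    })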
18 changes: 17 additions & 1 deletion python/cuml/dask/linear_model/logistic_regression.py
Expand Up @@ -21,6 +21,7 @@
from raft_dask.common.comms import get_raft_comm_state
from dask.distributed import get_worker

from cuml.common.sparse_utils import is_sparse, has_scipy
from cuml.dask.common import parts_to_ranks
from cuml.dask.common.input_utils import DistributedDataHandler, concatenate
from raft_dask.common.comms import Comms
@@ -29,7 +30,9 @@
from cuml.internals.safe_imports import gpu_only_import

cp = gpu_only_import("cupy")
cupyx = gpu_only_import("cupyx")
np = cpu_only_import("numpy")
scipy = cpu_only_import("scipy")


class LogisticRegression(LinearRegression):
@@ -172,7 +175,20 @@ def _create_model(sessionId, datatype, **kwargs):

@staticmethod
def _func_fit(f, data, n_rows, n_cols, partsToSizes, rank):
inp_X = concatenate([X for X, _ in data])
if is_sparse(data[0][0]) is False:
inp_X = concatenate([X for X, _ in data])

elif has_scipy() and scipy.sparse.isspmatrix(data[0][0]):
inp_X = scipy.sparse.vstack([X for X, _ in data])

elif cupyx.scipy.sparse.isspmatrix(data[0][0]):
inp_X = cupyx.scipy.sparse.vstack([X for X, _ in data])

else:
raise ValueError(
"input matrix must be dense, scipy sparse, or cupy sparse"
)

inp_y = concatenate([y for _, y in data])
n_ranks = max([p[0] for p in partsToSizes]) + 1
aggregated_partsToSizes = [[i, 0] for i in range(n_ranks)]
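
Note: a rough usage sketch (not part of this commit) of how the sparse path above might be exercised end to end. It assumes a dask_cuda.LocalCUDACluster is available and that the training data can be expressed as a Dask array whose chunks are cupyx CSR matrices; the from_array construction shown is one possible way to build such an array and chunking details may differ in practice.

import cupy as cp
import cupyx.scipy.sparse as cusparse
import dask.array as da
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cuml.dask.linear_model import LogisticRegression

client = Client(LocalCUDACluster())

# A CuPy CSR matrix split into row chunks; each chunk stays sparse, which is
# the case the cupyx.scipy.sparse.isspmatrix branch above handles.
X_gpu = cusparse.random(1000, 20, density=0.1, format="csr", dtype=cp.float32)
y_gpu = cp.random.randint(0, 2, size=1000).astype(cp.float32)

X = da.from_array(X_gpu, chunks=(250, 20), asarray=False, fancy=False)
y = da.from_array(y_gpu, chunks=(250,))

model = LogisticRegression()
model.fit(X, y)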
55 changes: 45 additions & 10 deletions python/cuml/linear_model/base_mg.pyx
Expand Up @@ -30,6 +30,9 @@ from cuml.common.opg_data_utils_mg cimport *
from cuml.internals.input_utils import input_to_cuml_array
from cuml.decomposition.utils cimport *

from cuml.common.sparse_utils import is_sparse
from cuml.internals.array_sparse import SparseCumlArray


class MGFitMixin(object):

@@ -45,8 +48,10 @@ class MGFitMixin(object):
:param partsToSizes: array of tuples in the format: [(rank,size)]
:return: self
"""

self._set_output_type(input_data[0][0])
self._set_n_features_in(n_cols)
sparse_input = is_sparse(input_data[0][0])

X_arys = []
y_arys = []
@@ -57,8 +62,14 @@
else:
check_dtype = self.dtype

X_m, _, self.n_cols, _ = \
input_to_cuml_array(input_data[i][0], check_dtype=check_dtype, order=order)
if sparse_input:

X_m = SparseCumlArray(input_data[i][0], convert_index=np.int32)
_, self.n_cols = X_m.shape
else:
X_m, _, self.n_cols, _ = \
input_to_cuml_array(input_data[i][0], check_dtype=check_dtype, order=order)

X_arys.append(X_m)

if i == 0:
@@ -81,18 +92,42 @@
rank_to_sizes,
rank)

cdef uintptr_t X_arg = opg.build_data_t(X_arys)
cdef uintptr_t X_arg
cdef uintptr_t y_arg = opg.build_data_t(y_arys)

# call inheriting class _fit that does all cython pointers and calls
self._fit(X=X_arg,
y=y_arg,
coef_ptr=coef_ptr_arg,
input_desc=part_desc)
cdef uintptr_t X_cols
cdef uintptr_t X_row_ids

if sparse_input is False:

X_arg = opg.build_data_t(X_arys)

# call inheriting class _fit that does all cython pointers and calls
self._fit(X=X_arg,
y=y_arg,
coef_ptr=coef_ptr_arg,
input_desc=part_desc)

opg.free_data_t(X_arg, self.dtype)

else:

assert len(X_arys) == 1, "does not support more than one sparse input matrix"
X_arg = opg.build_data_t([x.data for x in X_arys])
X_cols = X_arys[0].indices.ptr
X_row_ids = X_arys[0].indptr.ptr
X_nnz = sum([x.nnz for x in X_arys])

# call inheriting class _fit that does all cython pointers and calls
self._fit(X=[X_arg, X_cols, X_row_ids, X_nnz],
y=y_arg,
coef_ptr=coef_ptr_arg,
input_desc=part_desc)

for ary in X_arys:
del ary

opg.free_rank_size_pair(rank_to_sizes)
opg.free_part_descriptor(part_desc)
opg.free_data_t(X_arg, self.dtype)
opg.free_data_t(y_arg, self.dtype)

return self
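
Note: a hypothetical sketch (not part of this commit) of what the sparse branch above extracts from each partition: a SparseCumlArray wrapping a device CSR matrix, whose data, indices, and indptr device buffers are what get handed down to the C++ layer.

import cupy as cp
import cupyx.scipy.sparse as cusparse
import numpy as np
from cuml.internals.array_sparse import SparseCumlArray

X_part = cusparse.csr_matrix(cp.asarray([[1.0, 0.0], [0.0, 2.0]]))
X_m = SparseCumlArray(X_part, convert_index=np.int32)

values_ptr = X_m.data.ptr     # non-zero values, wrapped via opg.build_data_t
cols_ptr = X_m.indices.ptr    # column indices (X_cols)
rows_ptr = X_m.indptr.ptr     # CSR row pointers (X_row_ids)
nnz = X_m.nnz                 # X_nnz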