Merge pull request #535 from NVIDIA/branch-23.12
[auto-merge] branch-23.12 to branch-24.02 [skip ci] [bot]
nvauto authored Dec 28, 2023
2 parents 37d4389 + 215e623 commit c3f095c
Showing 3 changed files with 220 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ci/Dockerfile
@@ -38,5 +38,5 @@ RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86

# install cuML
ARG CUML_VER=23.12
-RUN conda install -y -c rapidsai-nightly -c conda-forge -c nvidia cuml=$CUML_VER python=3.9 cuda-version=11.8 \
+RUN conda install -y -c rapidsai -c conda-forge -c nvidia cuml=$CUML_VER python=3.9 cuda-version=11.8 \
&& conda clean --all -f -y
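
This hunk switches the conda channel from rapidsai-nightly to rapidsai, pinning the CI image to the released cuML 23.12 build instead of a nightly snapshot. A minimal sanity check, assuming it runs inside the built image's conda environment (the check itself is not part of this diff):

# Sketch: confirm the pinned cuML release resolved from the stable channel.
import cuml

assert cuml.__version__.startswith("23.12"), cuml.__version__
print("cuML version:", cuml.__version__)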
38 changes: 38 additions & 0 deletions python/src/spark_rapids_ml/classification.py
@@ -976,6 +976,33 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]:
"dtype": logistic_regression.dtype.name,
"num_iters": logistic_regression.solver_model.num_iters,
}

# check if invalid label exists
for class_val in model["classes_"]:
if class_val < 0:
raise RuntimeError(
f"Labels MUST be in [0, 2147483647), but got {class_val}"
)
elif not class_val.is_integer():
raise RuntimeError(
f"Labels MUST be Integers, but got {class_val}"
)

if len(logistic_regression.classes_) == 1:
class_val = logistic_regression.classes_[0]
# TODO: match Spark to use max(class_list) to calculate the number of classes
# Cuml currently uses unique(class_list)
if class_val != 1.0 and class_val != 0.0:
raise RuntimeError(
"class value must be either 1. or 0. when dataset has one label"
)

if init_parameters["fit_intercept"] is True:
model["coef_"] = [[0.0] * logistic_regression.n_cols]
model["intercept_"] = [
float("inf") if class_val == 1.0 else float("-inf")
]

del logistic_regression
return model
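
In the degenerate single-class branch above, zero coefficients and a ±infinite intercept make the model predict the lone class with probability exactly 1. Because coef_ is all zeros, the class-1 probability reduces to sigmoid(intercept); a standalone sketch in plain Python (not spark-rapids-ml API) of how sigmoid saturates at those constants:

# Sketch: sigmoid saturates at the infinities used for the intercept above.
import math

def sigmoid(z: float) -> float:
    # numerically safe piecewise form; avoids overflow for large negative z
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)

print(sigmoid(float("inf")))   # 1.0 -> always predicts class 1.0
print(sigmoid(float("-inf")))  # 0.0 -> always predicts class 0.0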

@@ -1027,6 +1054,17 @@ def _out_schema(self) -> Union[StructType, str]:
)

def _create_pyspark_model(self, result: Row) -> "LogisticRegressionModel":
logger = get_logger(self.__class__)
if len(result["classes_"]) == 1:
if self.getFitIntercept() is False:
logger.warning(
"All labels belong to a single class and fitIntercept=false. It's a dangerous ground, so the algorithm may not converge."
)
else:
logger.warning(
"All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed."
)

return LogisticRegressionModel._from_row(result)

def _set_cuml_reg_params(self) -> "LogisticRegression":
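
The warnings added in _create_pyspark_model fire on the driver whenever the fitted model has a single class. A hypothetical driver-side repro, assuming an active SparkSession bound to `spark` (the DataFrame and values below are illustrative, not from this diff):

# Hypothetical repro of the single-class warning path.
from pyspark.ml.linalg import Vectors
from spark_rapids_ml.classification import LogisticRegression

df = spark.createDataFrame(  # `spark` is an assumed, pre-existing SparkSession
    [(Vectors.dense([1.0, 2.0]), 1.0), (Vectors.dense([2.0, 1.0]), 1.0)],
    ["features", "label"],
)
# All labels are 1.0 and fitIntercept defaults to True, so fit() logs the
# "coefficients will be zeros" warning and skips real training.
model = LogisticRegression(regParam=0.1).fit(df)
print(model.coefficients, model.intercept)  # zeros and +inf per the code above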
181 changes: 181 additions & 0 deletions python/tests/test_logistic_regression.py
@@ -6,6 +6,7 @@
import pytest
from _pytest.logging import LogCaptureFixture
from packaging import version
from py4j.protocol import Py4JJavaError

if version.parse(pyspark.__version__) < version.parse("3.4.0"):
from pyspark.sql.utils import IllegalArgumentException # type: ignore
@@ -1076,3 +1077,183 @@ def test_parameters_validation() -> None:
# charge of validating it.
with pytest.raises(ValueError, match="C or regParam given invalid value -1.0"):
LogisticRegression().setRegParam(-1.0).fit(df)


@pytest.mark.compat
@pytest.mark.parametrize("fit_intercept", [True, False])
@pytest.mark.parametrize("label", [1.0, 0.0, -3.0, 4.0])
@pytest.mark.parametrize(
"lr_types",
[
(SparkLogisticRegression, SparkLogisticRegressionModel),
(LogisticRegression, LogisticRegressionModel),
],
)
def test_compat_one_label(
fit_intercept: bool,
label: float,
lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType],
caplog: LogCaptureFixture,
) -> None:
assert label % 1 == 0.0, "label value must be an integer"

tolerance = 0.001
_LogisticRegression, _LogisticRegressionModel = lr_types

X = np.array(
[
[1.0, 2.0],
[1.0, 3.0],
[2.0, 1.0],
[3.0, 1.0],
]
)
y = np.array([label] * 4)

num_rows = len(X)

feature_cols = ["c0", "c1"]
schema = ["c0 float, c1 float, label float"]

with CleanSparkSession() as spark:
np_array = np.concatenate((X, y.reshape(num_rows, 1)), axis=1)

bdf = spark.createDataFrame(
np_array.tolist(),
",".join(schema),
)

bdf = bdf.withColumn("features", array_to_vector(array(*feature_cols))).drop(
*feature_cols
)

blor = _LogisticRegression(
regParam=0.1, fitIntercept=fit_intercept, standardization=False
)

if label < 0:
msg = f"Labels MUST be in [0, 2147483647), but got {label}"

try:
blor_model = blor.fit(bdf)
assert False, "There should be a java exception"
except Py4JJavaError as e:
assert msg in e.java_exception.getMessage()

return

if label > 1: # Spark and Cuml do not match
if _LogisticRegression is SparkLogisticRegression:
blor_model = blor.fit(bdf)
assert blor_model.numClasses == label + 1
else:
msg = "class value must be either 1. or 0. when dataset has one label"
try:
                    blor_model = blor.fit(bdf)
                    assert False, "There should be a java exception"
except Py4JJavaError as e:
assert msg in e.java_exception.getMessage()

return

assert label == 1.0 or label == 0.0

blor_model = blor.fit(bdf)

if fit_intercept is False:
if _LogisticRegression is SparkLogisticRegression:
                # caplog.text is empty here: Spark emits this warning from the
                # JVM via log4j, which pytest's caplog does not capture.
assert caplog.text == ""
else:
assert (
"All labels belong to a single class and fitIntercept=false. It's a dangerous ground, so the algorithm may not converge."
in caplog.text
)

if label == 1.0:
assert array_equal(
blor_model.coefficients.toArray(),
[0.85431526, 0.85431526],
tolerance,
)
else:
assert array_equal(
blor_model.coefficients.toArray(),
[-0.85431526, -0.85431526],
tolerance,
)
assert blor_model.intercept == 0.0
else:
if _LogisticRegression is SparkLogisticRegression:
                # caplog.text is empty here: Spark emits this warning from the
                # JVM via log4j, which pytest's caplog does not capture.
assert caplog.text == ""
else:
assert (
"All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed."
in caplog.text
)

assert array_equal(blor_model.coefficients.toArray(), [0, 0], 0.0)
assert blor_model.intercept == (
float("inf") if label == 1.0 else float("-inf")
)
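
The asymmetry in the caplog assertions above comes down to where each implementation logs: spark-rapids-ml warns through Python's logging module, which pytest's caplog fixture captures, whereas Spark's warning is emitted by the JVM via log4j and never enters Python logging. A minimal sketch of the capturable path (hypothetical logger name and message):

# Sketch: caplog only sees warnings routed through Python's logging module.
import logging

def warn_from_python() -> None:
    logging.getLogger("spark_rapids_ml").warning("single-class dataset")  # hypothetical

def test_caplog_sees_python_warnings(caplog) -> None:
    warn_from_python()
    assert "single-class dataset" in caplog.text
    # A JVM-side log4j warning would leave caplog.text empty here.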


@pytest.mark.compat
@pytest.mark.parametrize(
"lr_types",
[
(SparkLogisticRegression, SparkLogisticRegressionModel),
(LogisticRegression, LogisticRegressionModel),
],
)
def test_compat_wrong_label(
lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType],
caplog: LogCaptureFixture,
) -> None:
_LogisticRegression, _LogisticRegressionModel = lr_types

X = np.array(
[
[1.0, 2.0],
[1.0, 3.0],
[2.0, 1.0],
[3.0, 1.0],
]
)

num_rows = len(X)
feature_cols = ["c0", "c1"]
schema = ["c0 float, c1 float, label float"]

def test_functor(y: np.ndarray, err_msg: str) -> None:
with CleanSparkSession() as spark:
np_array = np.concatenate((X, y.reshape(num_rows, 1)), axis=1)

df = spark.createDataFrame(
np_array.tolist(),
",".join(schema),
)

df = df.withColumn("features", array_to_vector(array(*feature_cols))).drop(
*feature_cols
)

lr = _LogisticRegression(standardization=False)

try:
lr.fit(df)
assert False, "There should be a java exception"
except Py4JJavaError as e:
assert err_msg in e.java_exception.getMessage()

# negative label
wrong_label = -1.1
y = np.array([1.0, 0.0, wrong_label, 2.0])
msg = f"Labels MUST be in [0, 2147483647), but got {wrong_label}"
test_functor(y, msg)

# non-integer label
wrong_label = 0.4
y = np.array([1.0, 0.0, wrong_label, 2.0])
msg = f"Labels MUST be Integers, but got {wrong_label}"
test_functor(y, msg)
