pysparkling fails to score unseen levels #5737

Open
hutch3232 opened this issue Jul 18, 2024 · 1 comment

Comments

@hutch3232
Contributor

  • Sparkling Water/PySparkling/RSparkling version
    • Spark - 3.5.0
    • pyspark - 3.5.0
    • h2o_pysparkling_3.5 - 3.46.0.4.post1
  • Hadoop Version & Distribution
    • 3.3.6
  • Execution mode YARN-client, YARN-cluster, standalone, local ..
    • Standalone
  • Are you using Windows/Linux/MAC?
    • Linux (Ubuntu 20.04)

Apologies that the reproduction spans two languages (R and Python).

First, I built a very basic GBM in R and saved the MOJO to S3:

library(data.table)
library(h2o)

h2o.init()

mt <- as.h2o(mtcars)

mt$engine_size <- h2o.ifelse(mt$cyl == 4, "small",
                             h2o.ifelse(mt$cyl == 6, "medium", "large"))

mod <- h2o.gbm(
  x = "engine_size",
  y = "mpg",
  training_frame = mt,
  ntrees = 1,
  max_depth = 3,
  learn_rate = 0.25,
  distribution = "gaussian",
  seed = 123
)

test <- as.h2o(
  data.frame(engine_size = c("small", "medium", "large", "extra-large"))
)

test$predict <- h2o.predict(object = mod,
                            newdata = test)
# Warning message:
# In doTryCatch(return(expr, name, parentenv, handler) : 
#     Test/Validation dataset column 'engine_size' has levels not trained on: ["extra-large"]
test
#   engine_size  predict
# 1       small 21.73388
# 2      medium 19.22987
# 3       large 19.22987
# 4 extra-large 19.22987
#
# [4 rows x 2 columns]

h2o.save_mojo(object = mod, path = "s3://<my bucket>/<my prefix>")

Then I spun up a Spark cluster, loaded the MOJO with pysparkling, and created a Spark DataFrame containing a level unseen during training. Attempting to score it throws errors about a failed UDF and an unknown categorical level.

import os
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pysparkling.ml import H2OMOJOModel

conf = SparkConf()
conf.set('spark.hadoop.fs.s3a.access.key', "<my access key>")
conf.set('spark.hadoop.fs.s3a.secret.key', "<my secret key>")

conf.set("spark.sql.view.maxNestedViewDepth", "1000")
conf.set("spark.sql.legacy.storeAnalyzedPlanForView", "True")
conf.set("spark.dynamicAllocation.enabled", True)

h2o_jvm_args = " ".join(
    [
        "-Dsys.ai.h2o.persist.s3.enable.path.style=true",
        "-Dsys.ai.h2o.persist.s3.maxErrorRetry=10",
        "-Dsys.ai.h2o.persist.s3.socketTimeout=100000",
        "-Dsys.ai.h2o.persist.s3.connectionTimeout=20000",
        "-Dsys.ai.h2o.persist.s3.maxHttpConnections=50",
    ]
)
conf.set("spark.executor.extraJavaOptions", h2o_jvm_args)
conf.set("spark.driver.extraJavaOptions", h2o_jvm_args)
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

model = H2OMOJOModel.createFromMojo("s3a://<my bucket>/<my prefix>/GBM_model_R_1721319539433_5.zip")

import pandas as pd
engine_size = ["small", "medium", "large", "extra-large"]

# Build a pandas DataFrame from the Python list
df_pandas = pd.DataFrame({"engine_size": engine_size})

# Convert pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(df_pandas)

preds = model.transform(df)
preds.select("prediction").show()

Note especially:

Caused by: hex.genmodel.easy.exception.PredictUnknownCategoricalLevelException: Unknown categorical level (engine_size,extra-large)

Traceback from Spark:

Py4JJavaError: An error occurred while calling o229.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6) (10.42.244.248 executor 1): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`H2OMOJOPredictionRegression$$Lambda$3188/0x0000000841557840`: (struct<engine_size:string>, double) => struct<value:double>).
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
        at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: hex.genmodel.easy.exception.PredictUnknownCategoricalLevelException: Unknown categorical level (engine_size,extra-large)
        at hex.genmodel.easy.RowToRawDataConverter.convertValue(RowToRawDataConverter.java:97)
        at hex.genmodel.easy.RowToRawDataConverter.convert(RowToRawDataConverter.java:55)
        at hex.genmodel.easy.EasyPredictModelWrapper.fillRawData(EasyPredictModelWrapper.java:1045)
        at hex.genmodel.easy.EasyPredictModelWrapper.predict(EasyPredictModelWrapper.java:1050)
        at hex.genmodel.easy.EasyPredictModelWrapper.preamble(EasyPredictModelWrapper.java:1041)
        at hex.genmodel.easy.EasyPredictModelWrapper.predictRegression(EasyPredictModelWrapper.java:884)
        at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1(H2OMOJOPredictionRegression.scala:44)
        at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1$adapted(H2OMOJOPredictionRegression.scala:42)
        ... 20 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`H2OMOJOPredictionRegression$$Lambda$3188/0x0000000841557840`: (struct<engine_size:string>, double) => struct<value:double>).
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
        at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: hex.genmodel.easy.exception.PredictUnknownCategoricalLevelException: Unknown categorical level (engine_size,extra-large)
        at hex.genmodel.easy.RowToRawDataConverter.convertValue(RowToRawDataConverter.java:97)
        at hex.genmodel.easy.RowToRawDataConverter.convert(RowToRawDataConverter.java:55)
        at hex.genmodel.easy.EasyPredictModelWrapper.fillRawData(EasyPredictModelWrapper.java:1045)
        at hex.genmodel.easy.EasyPredictModelWrapper.predict(EasyPredictModelWrapper.java:1050)
        at hex.genmodel.easy.EasyPredictModelWrapper.preamble(EasyPredictModelWrapper.java:1041)
        at hex.genmodel.easy.EasyPredictModelWrapper.predictRegression(EasyPredictModelWrapper.java:884)
        at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1(H2OMOJOPredictionRegression.scala:44)
        at ai.h2o.sparkling.ml.models.H2OMOJOPredictionRegression.$anonfun$getRegressionPredictionUDF$1$adapted(H2OMOJOPredictionRegression.scala:42)

I think "extra-large" should be scored by following the most populous path, as it is in the R output above and as described in: https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/gbm-faq/missing_values.html
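As a rough illustration of that expectation, here is a toy Python sketch. It is not H2O's implementation. The single-split tree shape is an assumption inferred from the R predictions above, the two leaf values are copied from that output, and the per-level row counts are the cylinder counts in mtcars. The names `predict`, `LEFT_LEVELS`, and `TRAIN_COUNTS` are made up for this example.

```python
from typing import Optional

# Assumed tree shape: one split sending "small" left and everything else right.
LEFT_LEVELS = {"small"}

# Predictions copied from the R output above.
LEFT_VALUE = 21.73388
RIGHT_VALUE = 19.22987

# Training rows per level (mtcars has 11 four-, 7 six-, and 14 eight-cylinder cars).
TRAIN_COUNTS = {"small": 11, "medium": 7, "large": 14}


def predict(level: Optional[str]) -> float:
    """Score one row; unseen or missing levels follow the side with more training rows."""
    if level is None or level not in TRAIN_COUNTS:
        left_n = sum(n for lvl, n in TRAIN_COUNTS.items() if lvl in LEFT_LEVELS)
        right_n = sum(TRAIN_COUNTS.values()) - left_n
        return LEFT_VALUE if left_n > right_n else RIGHT_VALUE
    return LEFT_VALUE if level in LEFT_LEVELS else RIGHT_VALUE


for level in ["small", "medium", "large", "extra-large"]:
    print(level, predict(level))
```

Under these assumptions the unseen level lands on the majority side (21 rows versus 11), which matches the value R returned for "extra-large".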

@hutch3232
Contributor Author

There is a setting for this, convertUnknownCategoricalLevelsToNa, described in: https://docs.h2o.ai/sparkling-water/3.5/latest-stable/doc/deployment/load_mojo.html#customizing-the-mojo-settings
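A minimal sketch of how that setting can be applied when loading the MOJO, following the pattern shown on the linked page. The helper name `load_mojo_tolerating_unseen_levels` is made up for this example, and the pysparkling import sits inside the function only so the helper can be defined on a machine without Sparkling Water installed.

```python
def load_mojo_tolerating_unseen_levels(mojo_path):
    """Load a MOJO so that unseen categorical levels are scored as NA instead of raising."""
    # Imported lazily; requires h2o_pysparkling and an active Spark session when called.
    from pysparkling.ml import H2OMOJOModel, H2OMOJOSettings

    settings = H2OMOJOSettings(convertUnknownCategoricalLevelsToNa=True)
    return H2OMOJOModel.createFromMojo(mojo_path, settings)
```

With the reproduction above, this would be used as `model = load_mojo_tolerating_unseen_levels("s3a://<my bucket>/<my prefix>/GBM_model_R_1721319539433_5.zip")`, followed by the same `model.transform(df)` call.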

It would be nice if the error message guided users to this option. It took me a while to find that this was a possible solution.
