From ecfc4e781c17061bf854be4a3b4df88e794c0c63 Mon Sep 17 00:00:00 2001 From: nitish Date: Tue, 10 Dec 2024 21:01:14 +0530 Subject: [PATCH 1/2] added check before setting checkpoint directory --- .../java/zingg/spark/core/executor/ZinggSparkContext.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java b/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java index bf28e5fb3..cee20244d 100644 --- a/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java +++ b/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java @@ -75,7 +75,9 @@ public void init(IZinggLicense license) JavaSparkContext.jarOfClass(IZingg.class); LOG.debug("Context " + ctx.toString()); //initHashFns(); - ctx.setCheckpointDir("/tmp/checkpoint"); + if (!ctx.getCheckpointDir().isPresent()) { + ctx.setCheckpointDir("/tmp/checkpoint"); + } setUtils(); } } From 7d879d854eeb8f6f4be02d2dc35f385a760f6d0b Mon Sep 17 00:00:00 2001 From: nitish Date: Tue, 10 Dec 2024 22:55:59 +0530 Subject: [PATCH 2/2] added check before setting checkpoint directory --- .../zingg/spark/core/executor/ZinggSparkContext.java | 9 +++++++-- .../zingg/spark/core/executor/ZinggSparkTester.java | 12 +++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java b/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java index cee20244d..ee1da56dc 100644 --- a/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java +++ b/spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java @@ -2,6 +2,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; @@ -71,12 +72,16 @@ public void init(IZinggLicense license) zSession = new ZSparkSession(spark, license); } if (ctx==null) { - ctx = JavaSparkContext.fromSparkContext(zSession.getSession().sparkContext()); + SparkContext sparkContext = zSession.getSession().sparkContext(); + if (sparkContext.getCheckpointDir().isEmpty()) { + sparkContext.setCheckpointDir("/tmp/checkpoint"); + } + ctx = JavaSparkContext.fromSparkContext(sparkContext); JavaSparkContext.jarOfClass(IZingg.class); LOG.debug("Context " + ctx.toString()); //initHashFns(); if (!ctx.getCheckpointDir().isPresent()) { - ctx.setCheckpointDir("/tmp/checkpoint"); + ctx.setCheckpointDir(sparkContext.getCheckpointDir().get()); } setUtils(); } diff --git a/spark/core/src/test/java/zingg/spark/core/executor/ZinggSparkTester.java b/spark/core/src/test/java/zingg/spark/core/executor/ZinggSparkTester.java index adf26d3ac..ccc3e8db2 100644 --- a/spark/core/src/test/java/zingg/spark/core/executor/ZinggSparkTester.java +++ b/spark/core/src/test/java/zingg/spark/core/executor/ZinggSparkTester.java @@ -6,6 +6,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -49,15 +50,20 @@ public static void setup() { .master("local[*]") .appName("Zingg" + "Junit") .getOrCreate(); - ctx = new JavaSparkContext(spark.sparkContext()); + SparkContext sparkContext = spark.sparkContext(); + if (sparkContext.getCheckpointDir().isEmpty()) { + sparkContext.setCheckpointDir("/tmp/checkpoint"); + } + ctx = new JavaSparkContext(sparkContext); JavaSparkContext.jarOfClass(IZingg.class); args = new Arguments(); zsCTX = new ZinggSparkContext(); zsCTX.ctx = ctx; zSession = new ZSparkSession(spark, null); zsCTX.zSession = zSession; - - ctx.setCheckpointDir("/tmp/checkpoint"); + if (!ctx.getCheckpointDir().isPresent()) { + ctx.setCheckpointDir(sparkContext.getCheckpointDir().get()); + } zsCTX.setPipeUtil(new SparkPipeUtil(zSession)); zsCTX.setDSUtil(new SparkDSUtil(zSession)); zsCTX.setHashUtil(new SparkHashUtil(zSession));