From 15901e2018f75583c1a7640c1f1484338ac4cb38 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Thu, 16 Jan 2025 11:04:52 -0800 Subject: [PATCH 1/2] Fix a concurrency bug --- .../scala/org/apache/hudi/functional/TestCOWDataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 03eda68ac6c6..068e809ae91e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -690,7 +690,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession, commonOpts: Map[String, String], basePath: String, instantTime: String, countDownLatch: CountDownLatch, numRetries: Integer = 0) extends Runnable { override def run() { - val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 500)).asScala.toList + val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 550)).asScala.toList val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).asScala.toList val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2)) val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2)) From 4e1995209ae2d7e00919304f67121b134d204074 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Thu, 16 Jan 2025 19:35:53 -0800 Subject: [PATCH 2/2] Address comments --- .../hudi/functional/TestCOWDataSource.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 068e809ae91e..966592cf0abb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -669,8 +669,11 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(1000, snapshotDF1.count()) val countDownLatch = new CountDownLatch(2) + val sharedUpdates = recordsToStrings(dataGen.generateUpdatesForAllRecords("300")).asScala.toList + for (x <- 1 to 2) { - val thread = new Thread(new UpdateThread(dataGen, spark, CommonOptionUtils.commonOpts, basePath, x + "00", countDownLatch, numRetries)) + val thread = new Thread(new UpdateThread( + dataGen, spark, CommonOptionUtils.commonOpts, basePath, x + "00", countDownLatch, numRetries, sharedUpdates)) thread.setName(x + "00_THREAD") thread.start() } @@ -687,12 +690,17 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } } - class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession, commonOpts: Map[String, String], basePath: String, - instantTime: String, countDownLatch: CountDownLatch, numRetries: Integer = 0) extends Runnable { + class UpdateThread(dataGen: HoodieTestDataGenerator, + spark: SparkSession, + commonOpts: Map[String, String], + basePath: String, + instantTime: String, + countDownLatch: CountDownLatch, + numRetries: Integer = 0, + sharedUpdates: List[String]) extends Runnable { override def run() { - val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 550)).asScala.toList val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).asScala.toList - val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2)) + val updateDf = spark.read.json(spark.sparkContext.parallelize(sharedUpdates, 2)) val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2)) try { updateDf.union(insertDf).write.format("org.apache.hudi")