Skip to content

Commit

Permalink
[HUDI-7656] Fix a flaky test for concurrent writes (#12655)
Browse files: browse the repository at this point in the history
  • Loading branch information
linliu-code authored Jan 22, 2025
1 parent 283921c commit e59764f
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -669,8 +669,11 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(1000, snapshotDF1.count())

val countDownLatch = new CountDownLatch(2)
val sharedUpdates = recordsToStrings(dataGen.generateUpdatesForAllRecords("300")).asScala.toList

for (x <- 1 to 2) {
val thread = new Thread(new UpdateThread(dataGen, spark, CommonOptionUtils.commonOpts, basePath, x + "00", countDownLatch, numRetries))
val thread = new Thread(new UpdateThread(
dataGen, spark, CommonOptionUtils.commonOpts, basePath, x + "00", countDownLatch, numRetries, sharedUpdates))
thread.setName(x + "00_THREAD")
thread.start()
}
Expand All @@ -687,12 +690,17 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
}
}

class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession, commonOpts: Map[String, String], basePath: String,
instantTime: String, countDownLatch: CountDownLatch, numRetries: Integer = 0) extends Runnable {
class UpdateThread(dataGen: HoodieTestDataGenerator,
spark: SparkSession,
commonOpts: Map[String, String],
basePath: String,
instantTime: String,
countDownLatch: CountDownLatch,
numRetries: Integer = 0,
sharedUpdates: List[String]) extends Runnable {
override def run() {
val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 500)).asScala.toList
val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).asScala.toList
val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2))
val updateDf = spark.read.json(spark.sparkContext.parallelize(sharedUpdates, 2))
val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2))
try {
updateDf.union(insertDf).write.format("org.apache.hudi")
Expand Down

0 comments on commit e59764f

Please sign in to comment.