Skip to content

Commit

Permalink
delete duplicated code
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 17, 2024
1 parent ae259da commit f43b519
Showing 1 changed file with 0 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,4 @@ class RetryReviveTest extends AnyFunSuite
assert(result1 == result)
ss.stop()
}

test("celeborn spark integration test - revive test replicate enabled when workers are randomly killed in write time") {
setupMiniClusterWithRandomPorts()
ShuffleClient.reset()
val sparkConf = new SparkConf()
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
.set(s"spark.${CelebornConf.CLIENT_STAGE_RERUN_ENABLED.key}", "false")
.setAppName("celeborn-demo").setMaster("local[2]")
val ss = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.getOrCreate()

val startTime = System.currentTimeMillis()
val result = ss.sparkContext.parallelize(1 to 1000, 100)
.flatMap(_ => (1 to 15000).iterator.map(num => num)).repartition(100).count()
val taskTime = System.currentTimeMillis() - startTime
val writeTime = taskTime * 0.35
workerKiller(writeTime.toInt)
val result1 = ss.sparkContext.parallelize(1 to 1000, 100)
.flatMap(_ => (1 to 15000).iterator.map(num => num)).repartition(100).count()
assert(result1 == result)
ss.stop()
}
}

0 comments on commit f43b519

Please sign in to comment.