Skip to content

Commit

Permalink
do not affect other workers
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 17, 2024
1 parent f43b519 commit 2341d5a
Showing 1 changed file with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ trait MiniClusterFeature extends Logging {

var masterInfo: (Master, Thread) = _
val workerInfos = new mutable.HashMap[Worker, Thread]()
val indToWorkerInfos = new mutable.HashMap[Int, (Worker, Thread)]()
var workerConfForAdding: Map[String, String] = _
var testKillWorker: Boolean = false
var testKillWorker: Int = -1

val maxRetries = 4
val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
Expand Down Expand Up @@ -251,6 +252,7 @@ trait MiniClusterFeature extends Logging {
throw new IllegalStateException(s"worker $i hasn't been initialized")
} else if (!workerInfos.contains(workers(i))) {
workerInfos.put(workers(i), threads(i))
indToWorkerInfos.put(i, (workers(i), threads(i)))
}
if (!workers(i).registered.get()) {
throw new IllegalStateException(s"worker $i hasn't been registered")
Expand Down Expand Up @@ -279,14 +281,23 @@ trait MiniClusterFeature extends Logging {
(setUpMaster(masterConf), setUpWorkers(workerConf, workerNum))
}

def workerKiller(sleepTime: Int): Unit = {
testKillWorker = true
def workerKiller(sleepTime: Int, workerNum: Int = 3): Unit = {
var ind = 0
var workerInfo = indToWorkerInfos.get(ind)
while (workerInfo.isEmpty && (ind + 1 < workerNum)) {
ind += 1
workerInfo = indToWorkerInfos.get(ind)
}
if (workerInfo.nonEmpty && ind < workerNum) {
testKillWorker = ind
}
val killerThread = new RunnerWrap({
Thread.sleep(sleepTime)
val workerInfo = workerInfos.toList(0)
workerInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
workerInfo._1.rpcEnv.shutdown()
workerInfo._2.interrupt()
if (testKillWorker != -1) {
workerInfo.get._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
workerInfo.get._1.rpcEnv.shutdown()
workerInfo.get._2.interrupt()
}
})
killerThread.start()
}
Expand Down

0 comments on commit 2341d5a

Please sign in to comment.