Skip to content

Commit

Permalink
do not affect other workers
Browse files: browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 17, 2024
1 parent 2e1ad06 commit feba418
Showing 1 changed file with 19 additions and 8 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -37,8 +37,9 @@ trait MiniClusterFeature extends Logging {

// Master instance paired with a thread (presumably the thread that runs it; confirm in setUpMaster).
var masterInfo: (Master, Thread) = _
// Worker -> thread; an entry is added from workers(i) / threads(i) once worker i is initialized.
val workerInfos = new mutable.HashMap[Worker, Thread]()
// The same (worker, thread) pairs keyed by the worker's index i, so that a specific worker
// can be looked up by position (used by workerKiller to pick its target).
val indToWorkerInfos = new mutable.HashMap[Int, (Worker, Thread)]()
// NOTE(review): neither assigned nor read in the visible hunks; purpose not confirmed here.
var workerConfForAdding: Map[String, String] = _
// NOTE(review): diff markers were lost in this capture. The Boolean declaration below
// appears to be the pre-commit line that the Int declaration after it replaces.
var testKillWorker: Boolean = false
// Index of the worker selected by workerKiller, or -1 while no worker has been selected.
// The worker-initialization error handler compares this against (i - 1) so that only the
// selected worker skips the graceful-shutdown path.
var testKillWorker: Int = -1

class RunnerWrap[T](code: => T) extends Thread {

Expand Down Expand Up @@ -199,7 +200,7 @@ trait MiniClusterFeature extends Logging {
worker.initialize()
} catch {
case ex: Exception =>
if (!testKillWorker) {
if (testKillWorker != (i - 1)) {
if (workers(i - 1) != null) {
workers(i - 1).shutdownGracefully()
}
Expand Down Expand Up @@ -231,6 +232,7 @@ trait MiniClusterFeature extends Logging {
throw new IllegalStateException(s"worker $i hasn't been initialized")
} else if (!workerInfos.contains(workers(i))) {
workerInfos.put(workers(i), threads(i))
indToWorkerInfos.put(i, (workers(i), threads(i)))
}
if (!workers(i).registered.get()) {
throw new IllegalStateException(s"worker $i hasn't been registered")
Expand Down Expand Up @@ -259,14 +261,23 @@ trait MiniClusterFeature extends Logging {
(setUpMaster(masterConf), setUpWorkers(workerConf, workerNum))
}

// NOTE(review): this hunk was captured without +/- markers, so removed and added lines are
// interleaved. The one-parameter signature and `testKillWorker = true` directly below appear
// to be the pre-commit lines; the two-parameter definition after them is the new version.
def workerKiller(sleepTime: Int): Unit = {
testKillWorker = true
/**
 * Starts a background thread that sleeps for `sleepTime` and then stops one worker.
 *
 * The target is found by probing indToWorkerInfos at index 0 and then at increasing
 * indices while `ind + 1 < workerNum`, stopping at the first index that has an entry.
 * When an entry is found (and its index is below `workerNum`), that index is stored in
 * testKillWorker. The method returns right after starting the thread; it does not join it.
 *
 * @param sleepTime value handed to Thread.sleep (milliseconds) before the worker is stopped
 * @param workerNum exclusive upper bound on the indices considered; defaults to 3
 */
def workerKiller(sleepTime: Int, workerNum: Int = 3): Unit = {
var ind = 0
var workerInfo = indToWorkerInfos.get(ind)
// Advance until an entry is found or the last candidate index (workerNum - 1) was probed.
while (workerInfo.isEmpty && (ind + 1 < workerNum)) {
ind += 1
workerInfo = indToWorkerInfos.get(ind)
}
// Record the target index only when a worker was actually found; otherwise
// testKillWorker keeps whatever value it had before this call.
if (workerInfo.nonEmpty && ind < workerNum) {
testKillWorker = ind
}
// RunnerWrap extends Thread and takes the block by name; presumably it runs the block
// on its own thread once started (its body is outside this view).
val killerThread = new RunnerWrap({
Thread.sleep(sleepTime)
// NOTE(review): the next four lines appear to be the pre-commit body, which always
// took the first entry of workerInfos.toList rather than a worker chosen by index.
val workerInfo = workerInfos.toList(0)
workerInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
workerInfo._1.rpcEnv.shutdown()
workerInfo._2.interrupt()
// NOTE(review): this guard reads the shared testKillWorker, not the local workerInfo.
// If an earlier call left testKillWorker != -1 and this call found no entry,
// workerInfo.get would throw NoSuchElementException inside the killer thread.
if (testKillWorker != -1) {
// Stop the worker immediately, shut down its RPC environment, then interrupt its thread.
workerInfo.get._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
workerInfo.get._1.rpcEnv.shutdown()
workerInfo.get._2.interrupt()
}
})
killerThread.start()
}
Expand Down

0 comments on commit feba418

Please sign in to comment.