Skip to content

Commit

Permalink
shutdown revert
Browse files — browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 17, 2024
1 parent 2341d5a commit ad137c1
Showing 1 changed file with 21 additions and 12 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -38,7 +38,7 @@ trait MiniClusterFeature extends Logging {
val workerInfos = new mutable.HashMap[Worker, Thread]()
val indToWorkerInfos = new mutable.HashMap[Int, (Worker, Thread)]()
var workerConfForAdding: Map[String, String] = _
var testKillWorker: Int = -1
var testKillWorker: (Int, Thread) = (-1, null)

val maxRetries = 4
val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
Expand Down Expand Up @@ -224,15 +224,17 @@ trait MiniClusterFeature extends Logging {
worker.initialize()
} catch {
case ex: Exception =>
if (workers(i - 1) != null) {
workers(i - 1).shutdownGracefully()
}
workerStarted = false
workerStartRetry += 1
logError(s"cannot start worker $i, retrying: ", ex)
if (workerStartRetry == maxRetries) {
logError(s"cannot start worker $i, reached to max retrying", ex)
throw ex
if (testKillWorker != (i - 1)) {
if (workers(i - 1) != null) {
workers(i - 1).shutdownGracefully()
}
workerStarted = false
workerStartRetry += 1
logError(s"cannot start worker $i, retrying: ", ex)
if (workerStartRetry == maxRetries) {
logError(s"cannot start worker $i, reached to max retrying", ex)
throw ex
}
}
}
}
Expand Down Expand Up @@ -289,17 +291,20 @@ trait MiniClusterFeature extends Logging {
workerInfo = indToWorkerInfos.get(ind)
}
if (workerInfo.nonEmpty && ind < workerNum) {
testKillWorker = ind
testKillWorker = (ind, null)
}
val killerThread = new RunnerWrap({
Thread.sleep(sleepTime)
if (testKillWorker != -1) {
if (testKillWorker._1 != -1) {
workerInfo.get._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
workerInfo.get._1.rpcEnv.shutdown()
workerInfo.get._2.interrupt()
}
})
killerThread.start()
if (testKillWorker._1 != -1) {
testKillWorker = (ind, killerThread)
}
}

def shutdownMiniCluster(): Unit = {
Expand All @@ -321,6 +326,10 @@ trait MiniClusterFeature extends Logging {
worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
thread.interrupt()
}
if (testKillWorker._1 != -1) {
testKillWorker._2.interrupt()
testKillWorker = (-1, null)
}
workerInfos.clear()
masterInfo._2.interrupt()
MemoryManager.reset()
Expand Down

0 comments on commit ad137c1

Please sign in to comment.