From ad137c144b3bfcc4562ea1eb334cc023dfa175b5 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 17 Dec 2024 11:08:29 +0800 Subject: [PATCH] shutdown revert --- .../service/deploy/MiniClusterFeature.scala | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index 718160287f9..f4815702054 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -38,7 +38,7 @@ trait MiniClusterFeature extends Logging { val workerInfos = new mutable.HashMap[Worker, Thread]() val indToWorkerInfos = new mutable.HashMap[Int, (Worker, Thread)]() var workerConfForAdding: Map[String, String] = _ - var testKillWorker: Int = -1 + var testKillWorker: (Int, Thread) = (-1, null) val maxRetries = 4 val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60) @@ -224,15 +224,17 @@ trait MiniClusterFeature extends Logging { worker.initialize() } catch { case ex: Exception => - if (workers(i - 1) != null) { - workers(i - 1).shutdownGracefully() - } - workerStarted = false - workerStartRetry += 1 - logError(s"cannot start worker $i, retrying: ", ex) - if (workerStartRetry == maxRetries) { - logError(s"cannot start worker $i, reached to max retrying", ex) - throw ex + if (testKillWorker != (i - 1)) { + if (workers(i - 1) != null) { + workers(i - 1).shutdownGracefully() + } + workerStarted = false + workerStartRetry += 1 + logError(s"cannot start worker $i, retrying: ", ex) + if (workerStartRetry == maxRetries) { + logError(s"cannot start worker $i, reached to max retrying", ex) + throw ex + } } } } @@ -289,17 +291,20 @@ trait MiniClusterFeature extends Logging { workerInfo = indToWorkerInfos.get(ind) } if (workerInfo.nonEmpty && ind < workerNum) { - testKillWorker = ind + testKillWorker = (ind, null) } val killerThread = new RunnerWrap({ Thread.sleep(sleepTime) - if (testKillWorker != -1) { + if (testKillWorker._1 != -1) { workerInfo.get._1.stop(CelebornExitKind.EXIT_IMMEDIATELY) workerInfo.get._1.rpcEnv.shutdown() workerInfo.get._2.interrupt() } }) killerThread.start() + if (testKillWorker._1 != -1) { + testKillWorker = (ind, killerThread) + } } def shutdownMiniCluster(): Unit = { @@ -321,6 +326,10 @@ trait MiniClusterFeature extends Logging { worker.stop(CelebornExitKind.EXIT_IMMEDIATELY) thread.interrupt() } + if (testKillWorker._1 != -1) { + testKillWorker._2.interrupt() + testKillWorker = (-1, null) + } workerInfos.clear() masterInfo._2.interrupt() MemoryManager.reset()