From af0a5435e9ae420c8dd01c0c58612baf053cba16 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 17 Dec 2024 11:08:29 +0800 Subject: [PATCH] shutdown revert --- .../service/deploy/MiniClusterFeature.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index b325c57af09..0410b7e59a4 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -39,7 +39,7 @@ trait MiniClusterFeature extends Logging { val workerInfos = new mutable.HashMap[Worker, Thread]() val indToWorkerInfos = new mutable.HashMap[Int, (Worker, Thread)]() var workerConfForAdding: Map[String, String] = _ - var testKillWorker: Int = -1 + var testKillWorker: (Int, Thread) = (-1, null) class RunnerWrap[T](code: => T) extends Thread { @@ -200,7 +200,7 @@ trait MiniClusterFeature extends Logging { worker.initialize() } catch { case ex: Exception => - if (testKillWorker != (i - 1)) { + if (testKillWorker._1 != (i - 1)) { if (workers(i - 1) != null) { workers(i - 1).shutdownGracefully() } @@ -269,17 +269,20 @@ trait MiniClusterFeature extends Logging { workerInfo = indToWorkerInfos.get(ind) } if (workerInfo.nonEmpty && ind < workerNum) { - testKillWorker = ind + testKillWorker = (ind, null) } val killerThread = new RunnerWrap({ Thread.sleep(sleepTime) - if (testKillWorker != -1) { + if (testKillWorker._1 != -1) { workerInfo.get._1.stop(CelebornExitKind.EXIT_IMMEDIATELY) workerInfo.get._1.rpcEnv.shutdown() workerInfo.get._2.interrupt() } }) killerThread.start() + if (testKillWorker._1 != -1) { + testKillWorker = (ind, killerThread) + } } def shutdownMiniCluster(): Unit = { @@ -301,6 +304,10 @@ trait MiniClusterFeature extends Logging { worker.stop(CelebornExitKind.EXIT_IMMEDIATELY) thread.interrupt() } + if (testKillWorker._1 != -1) { + testKillWorker._2.interrupt() + testKillWorker = (-1, null) + } workerInfos.clear() masterInfo._2.interrupt() MemoryManager.reset()