diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 8533f7384d4..bc703ac6a55 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -18,7 +18,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,11 +57,11 @@ public class CloseContainerCommandHandler implements CommandHandler { private final AtomicLong invocationCount = new AtomicLong(0); private final AtomicInteger queuedCount = new AtomicInteger(0); - private final ExecutorService executor; + private final ThreadPoolExecutor executor; private long totalTime; /** - * Constructs a ContainerReport handler. + * Constructs a close container command handler. */ public CloseContainerCommandHandler( int threadPoolSize, int queueSize, String threadNamePrefix) { @@ -220,4 +219,14 @@ public long getTotalRunTime() { public int getQueuedCount() { return queuedCount.get(); } + + @Override + public int getThreadPoolMaxPoolSize() { + return executor.getMaximumPoolSize(); + } + + @Override + public int getThreadPoolActivePoolSize() { + return executor.getActiveCount(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 9035b79c670..c3f8da74c7a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -56,11 +56,6 @@ public final class CommandDispatcher { private CommandDispatcher(OzoneContainer container, SCMConnectionManager connectionManager, StateContext context, CommandHandler... handlers) { - Preconditions.checkNotNull(context); - Preconditions.checkNotNull(handlers); - Preconditions.checkArgument(handlers.length > 0); - Preconditions.checkNotNull(container); - Preconditions.checkNotNull(connectionManager); this.context = context; this.container = container; this.connectionManager = connectionManager; @@ -77,6 +72,7 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap); } + @VisibleForTesting public CommandHandler getCloseContainerHandler() { return handlerMap.get(Type.closeContainerCommand); } @@ -201,11 +197,12 @@ public Builder setContext(StateContext stateContext) { * @return Command Dispatcher. */ public CommandDispatcher build() { - Preconditions.checkNotNull(this.connectionManager, "Missing connection" + - " manager."); - Preconditions.checkNotNull(this.container, "Missing container."); - Preconditions.checkNotNull(this.context, "Missing context."); - Preconditions.checkArgument(this.handlerList.size() > 0); + Preconditions.checkNotNull(this.connectionManager, + "Missing scm connection manager."); + Preconditions.checkNotNull(this.container, "Missing ozone container."); + Preconditions.checkNotNull(this.context, "Missing state context."); + Preconditions.checkArgument(this.handlerList.size() > 0, + "The number of command handlers must be greater than 0."); return new CommandDispatcher(this.container, this.connectionManager, this.context, handlerList.toArray( new CommandHandler[handlerList.size()])); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 747749066e3..bd7431c6145 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -168,12 +168,12 @@ public int getQueuedCount() { @Override public int getThreadPoolMaxPoolSize() { - return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + return executor.getMaximumPoolSize(); } @Override public int getThreadPoolActivePoolSize() { - return ((ThreadPoolExecutor)executor).getActiveCount(); + return executor.getActiveCount(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index ead81c32e5b..b76e306e1c0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.time.Clock; import java.util.OptionalLong; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -53,7 +52,7 @@ public class DeleteContainerCommandHandler implements CommandHandler { private final AtomicInteger invocationCount = new AtomicInteger(0); private final AtomicInteger timeoutCount = new AtomicInteger(0); private final AtomicLong totalTime = new AtomicLong(0); - private final ExecutorService executor; + private final ThreadPoolExecutor executor; private final Clock clock; private int maxQueueSize; @@ -70,7 +69,7 @@ public DeleteContainerCommandHandler( } protected DeleteContainerCommandHandler(Clock clock, - ExecutorService executor, int queueSize) { + ThreadPoolExecutor executor, int queueSize) { this.executor = executor; this.clock = clock; maxQueueSize = queueSize; @@ -131,7 +130,7 @@ private void handleInternal(SCMCommand command, StateContext context, @Override public int getQueuedCount() { - return ((ThreadPoolExecutor)executor).getQueue().size(); + return executor.getQueue().size(); } @Override @@ -160,6 +159,16 @@ public long getTotalRunTime() { return totalTime.get(); } + @Override + public int getThreadPoolMaxPoolSize() { + return executor.getMaximumPoolSize(); + } + + @Override + public int getThreadPoolActivePoolSize() { + return executor.getActiveCount(); + } + @Override public void stop() { try { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index 219645c8edc..a3b60aa36da 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.UUID; @@ -43,6 +44,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.GB; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -292,4 +295,28 @@ private void waitTillFinishExecution( GenericTestUtils.waitFor(() -> closeHandler.getQueuedCount() <= 0, 10, 3000); } + + @Test + public void testThreadPoolPoolSize() { + assertEquals(1, subject.getThreadPoolMaxPoolSize()); + assertEquals(0, subject.getThreadPoolActivePoolSize()); + + CloseContainerCommandHandler closeContainerCommandHandler = + new CloseContainerCommandHandler(10, 10, ""); + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 1, PipelineID.randomId()), + ozoneContainer, context, null); + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 2, PipelineID.randomId()), + ozoneContainer, context, null); + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 3, PipelineID.randomId()), + ozoneContainer, context, null); + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 4, PipelineID.randomId()), + ozoneContainer, context, null); + assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize()); + assertTrue(closeContainerCommandHandler.getThreadPoolActivePoolSize() > 0); + } + } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java index 49c34828fbd..5ee31b97fd6 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -19,6 +19,14 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -32,7 +40,6 @@ import java.time.ZoneId; import java.util.OptionalLong; -import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -63,8 +70,14 @@ public void setup() { } @Test - public void testExpiredCommandsAreNotProcessed() throws IOException { - DeleteContainerCommandHandler handler = createSubject(clock, 1000); + public void testExpiredCommandsAreNotProcessed() + throws IOException, InterruptedException { + CountDownLatch latch1 = new CountDownLatch(1); + ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); + ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor( + threadFactory, latch1); + DeleteContainerCommandHandler handler = new DeleteContainerCommandHandler( + clock, executor, 100); DeleteContainerCommand command1 = new DeleteContainerCommand(1L); command1.setDeadline(clock.millis() + 10000); @@ -75,9 +88,14 @@ public void testExpiredCommandsAreNotProcessed() throws IOException { clock.fastForward(15000); handler.handle(command1, ozoneContainer, null, null); + latch1.await(); assertEquals(1, handler.getTimeoutCount()); + CountDownLatch latch2 = new CountDownLatch(2); + executor.setLatch(latch2); handler.handle(command2, ozoneContainer, null, null); handler.handle(command3, ozoneContainer, null, null); + latch2.await(); + assertEquals(1, handler.getTimeoutCount()); assertEquals(3, handler.getInvocationCount()); verify(controller, times(0)) @@ -89,7 +107,8 @@ public void testExpiredCommandsAreNotProcessed() throws IOException { } @Test - public void testCommandForCurrentTermIsExecuted() throws IOException { + public void testCommandForCurrentTermIsExecuted() + throws IOException, InterruptedException { // GIVEN DeleteContainerCommand command = new DeleteContainerCommand(1L); command.setTerm(1); @@ -97,10 +116,17 @@ public void testCommandForCurrentTermIsExecuted() throws IOException { when(context.getTermOfLeaderSCM()) .thenReturn(OptionalLong.of(command.getTerm())); - DeleteContainerCommandHandler subject = createSubject(); + TestClock testClock = new TestClock(Instant.now(), ZoneId.systemDefault()); + CountDownLatch latch = new CountDownLatch(1); + ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); + ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor( + threadFactory, latch); + DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler( + testClock, executor, 100); // WHEN subject.handle(command, ozoneContainer, context, null); + latch.await(); // THEN verify(controller, times(1)) @@ -163,8 +189,10 @@ private static DeleteContainerCommandHandler createSubject() { private static DeleteContainerCommandHandler createSubject( TestClock clock, int queueSize) { - return new DeleteContainerCommandHandler(clock, - newDirectExecutorService(), queueSize); + ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors. + newFixedThreadPool(1, threadFactory); + return new DeleteContainerCommandHandler(clock, executor, queueSize); } private static DeleteContainerCommandHandler createSubjectWithPoolSize( @@ -172,4 +200,21 @@ private static DeleteContainerCommandHandler createSubjectWithPoolSize( return new DeleteContainerCommandHandler(1, clock, queueSize, ""); } + static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor { + private CountDownLatch countDownLatch; + ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch latch) { + super(1, 1, 0, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), threadFactory); + this.countDownLatch = latch; + } + + void setLatch(CountDownLatch latch) { + this.countDownLatch = latch; + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + countDownLatch.countDown(); + } + } }