Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11304. Make up for the missing functionality in CommandDispatcher #7062

Merged
merged 12 commits into from
Aug 29, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -58,11 +57,11 @@ public class CloseContainerCommandHandler implements CommandHandler {

private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
private final ExecutorService executor;
private final ThreadPoolExecutor executor;
private long totalTime;

/**
* Constructs a ContainerReport handler.
* Constructs a close container command handler.
*/
public CloseContainerCommandHandler(
int threadPoolSize, int queueSize, String threadNamePrefix) {
Expand Down Expand Up @@ -220,4 +219,14 @@ public long getTotalRunTime() {
public int getQueuedCount() {
return queuedCount.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return executor.getActiveCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public final class CommandDispatcher {
private CommandDispatcher(OzoneContainer container, SCMConnectionManager
connectionManager, StateContext context,
CommandHandler... handlers) {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(handlers);
Preconditions.checkArgument(handlers.length > 0);
Preconditions.checkNotNull(container);
Preconditions.checkNotNull(connectionManager);
this.context = context;
this.container = container;
this.connectionManager = connectionManager;
Expand All @@ -77,6 +72,7 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager
commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap);
}

@VisibleForTesting
public CommandHandler getCloseContainerHandler() {
return handlerMap.get(Type.closeContainerCommand);
}
Expand Down Expand Up @@ -201,11 +197,12 @@ public Builder setContext(StateContext stateContext) {
* @return Command Dispatcher.
*/
public CommandDispatcher build() {
Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
" manager.");
Preconditions.checkNotNull(this.container, "Missing container.");
Preconditions.checkNotNull(this.context, "Missing context.");
Preconditions.checkArgument(this.handlerList.size() > 0);
Preconditions.checkNotNull(this.connectionManager,
"Missing scm connection manager.");
Preconditions.checkNotNull(this.container, "Missing ozone container.");
Preconditions.checkNotNull(this.context, "Missing state context.");
Preconditions.checkArgument(this.handlerList.size() > 0,
"The number of command handlers must be greater than 0.");
return new CommandDispatcher(this.container, this.connectionManager,
this.context, handlerList.toArray(
new CommandHandler[handlerList.size()]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ public int getQueuedCount() {

@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
return executor.getActiveCount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.IOException;
import java.time.Clock;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -53,7 +52,7 @@ public class DeleteContainerCommandHandler implements CommandHandler {
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
private final ExecutorService executor;
private final ThreadPoolExecutor executor;
private final Clock clock;
private int maxQueueSize;

Expand All @@ -70,7 +69,7 @@ public DeleteContainerCommandHandler(
}

protected DeleteContainerCommandHandler(Clock clock,
ExecutorService executor, int queueSize) {
ThreadPoolExecutor executor, int queueSize) {
this.executor = executor;
this.clock = clock;
maxQueueSize = queueSize;
Expand Down Expand Up @@ -131,7 +130,7 @@ private void handleInternal(SCMCommand command, StateContext context,

@Override
public int getQueuedCount() {
return ((ThreadPoolExecutor)executor).getQueue().size();
return executor.getQueue().size();
}

@Override
Expand Down Expand Up @@ -160,6 +159,16 @@ public long getTotalRunTime() {
return totalTime.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return executor.getActiveCount();
}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.UUID;
Expand All @@ -43,6 +44,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -292,4 +295,28 @@ private void waitTillFinishExecution(
GenericTestUtils.waitFor(()
-> closeHandler.getQueuedCount() <= 0, 10, 3000);
}

@Test
public void testThreadPoolPoolSize() {
assertEquals(1, subject.getThreadPoolMaxPoolSize());
assertEquals(0, subject.getThreadPoolActivePoolSize());

CloseContainerCommandHandler closeContainerCommandHandler =
new CloseContainerCommandHandler(10, 10, "");
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 1, PipelineID.randomId()),
ozoneContainer, context, null);
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 2, PipelineID.randomId()),
ozoneContainer, context, null);
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 3, PipelineID.randomId()),
ozoneContainer, context, null);
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 4, PipelineID.randomId()),
ozoneContainer, context, null);
assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize());
assertTrue(closeContainerCommandHandler.getThreadPoolActivePoolSize() > 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
Expand All @@ -32,7 +40,6 @@
import java.time.ZoneId;
import java.util.OptionalLong;

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -63,8 +70,14 @@ public void setup() {
}

@Test
public void testExpiredCommandsAreNotProcessed() throws IOException {
DeleteContainerCommandHandler handler = createSubject(clock, 1000);
public void testExpiredCommandsAreNotProcessed()
throws IOException, InterruptedException {
CountDownLatch latch1 = new CountDownLatch(1);
ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
threadFactory, latch1);
DeleteContainerCommandHandler handler = new DeleteContainerCommandHandler(
clock, executor, 100);

DeleteContainerCommand command1 = new DeleteContainerCommand(1L);
command1.setDeadline(clock.millis() + 10000);
Expand All @@ -75,9 +88,14 @@ public void testExpiredCommandsAreNotProcessed() throws IOException {

clock.fastForward(15000);
handler.handle(command1, ozoneContainer, null, null);
latch1.await();
assertEquals(1, handler.getTimeoutCount());
CountDownLatch latch2 = new CountDownLatch(2);
executor.setLatch(latch2);
handler.handle(command2, ozoneContainer, null, null);
handler.handle(command3, ozoneContainer, null, null);
latch2.await();

assertEquals(1, handler.getTimeoutCount());
assertEquals(3, handler.getInvocationCount());
verify(controller, times(0))
Expand All @@ -89,18 +107,26 @@ public void testExpiredCommandsAreNotProcessed() throws IOException {
}

@Test
public void testCommandForCurrentTermIsExecuted() throws IOException {
public void testCommandForCurrentTermIsExecuted()
throws IOException, InterruptedException {
// GIVEN
DeleteContainerCommand command = new DeleteContainerCommand(1L);
command.setTerm(1);

when(context.getTermOfLeaderSCM())
.thenReturn(OptionalLong.of(command.getTerm()));

DeleteContainerCommandHandler subject = createSubject();
TestClock testClock = new TestClock(Instant.now(), ZoneId.systemDefault());
CountDownLatch latch = new CountDownLatch(1);
ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
threadFactory, latch);
DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler(
testClock, executor, 100);

// WHEN
subject.handle(command, ozoneContainer, context, null);
latch.await();

// THEN
verify(controller, times(1))
Expand Down Expand Up @@ -163,13 +189,32 @@ private static DeleteContainerCommandHandler createSubject() {

private static DeleteContainerCommandHandler createSubject(
TestClock clock, int queueSize) {
return new DeleteContainerCommandHandler(clock,
newDirectExecutorService(), queueSize);
ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.
newFixedThreadPool(1, threadFactory);
return new DeleteContainerCommandHandler(clock, executor, queueSize);
}

private static DeleteContainerCommandHandler createSubjectWithPoolSize(
TestClock clock, int queueSize) {
return new DeleteContainerCommandHandler(1, clock, queueSize, "");
}

static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor {
private CountDownLatch countDownLatch;
ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch latch) {
super(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
this.countDownLatch = latch;
}

void setLatch(CountDownLatch latch) {
this.countDownLatch = latch;
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
countDownLatch.countDown();
}
}
}