Skip to content

Commit

Permalink
Add starting point traces for StateMachine
Browse files Browse the repository at this point in the history
Change-Id: I2cf55dcfa91fd2733880dc042a6dcec4c3dd6c81
(cherry picked from commit 4ca599671c843f0734974e03c0f972440e7df61c)
  • Loading branch information
jojochuang committed Oct 19, 2023
1 parent 27d8f3d commit 113d1e5
Showing 1 changed file with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.Cache;
import org.apache.hadoop.hdds.utils.ResourceLimitCache;
import org.apache.hadoop.ozone.OzoneConfigKeys;
Expand Down Expand Up @@ -94,6 +95,8 @@
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;

import io.opentracing.Span;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -348,9 +351,21 @@ public long takeSnapshot() throws IOException {
/**
 * Builds the {@link TransactionContext} for a client request, running the
 * work inside a tracing span named {@code "startTransaction"}.
 *
 * <p>The request message is first decoded into a
 * {@link ContainerCommandRequestProto}; the span is then created from the
 * trace ID carried by that proto, and the remaining work is delegated to
 * {@link #startTransactionTraced}. Decoding happens before the span exists,
 * so a failure while decoding is not covered by the span.
 *
 * @param request the Raft client request whose message holds the container
 *                command
 * @return the transaction context produced by
 *         {@code startTransactionTraced}
 * @throws IOException if decoding the message or the delegated call fails
 */
@Override
public TransactionContext startTransaction(RaftClientRequest request)
    throws IOException {
  final ContainerCommandRequestProto proto =
      message2ContainerCommandRequestProto(request.getMessage());
  // NOTE(review): presumably links this span to the caller's trace via the
  // encoded trace ID -- confirm against TracingUtil.
  final Span span =
      TracingUtil.importAndCreateSpan("startTransaction", proto.getTraceID());
  try {
    return startTransactionTraced(request, proto);
  } finally {
    // Finish the span on both the normal and the exceptional path so it is
    // never left open.
    span.finish();
  }
}

private TransactionContext startTransactionTraced(RaftClientRequest request,
ContainerCommandRequestProto proto)
throws IOException {
long startTime = Time.monotonicNowNanos();

Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
try {
dispatcher.validateContainerCommand(proto);
Expand Down Expand Up @@ -491,6 +506,7 @@ private CompletableFuture<Message> handleWriteChunk(
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
Span span = TracingUtil.importAndCreateSpan("writeStateMachineData", requestProto.getTraceID());
try {
return runCommand(requestProto, context);
} catch (Exception e) {
Expand All @@ -503,6 +519,8 @@ private CompletableFuture<Message> handleWriteChunk(
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
} finally {
span.finish();
}
}, getChunkExecutor(requestProto.getWriteChunk()));

Expand Down Expand Up @@ -709,8 +727,13 @@ private ByteString readStateMachineData(
new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
.setReadFromTmpFile(true).build();
// read the chunk
ContainerCommandResponseProto response =
dispatchCommand(dataContainerCommandProto, context);
Span span = TracingUtil.importAndCreateSpan("readStateMachineData", requestProto.getTraceID());
ContainerCommandResponseProto response;
try {
response = dispatchCommand(dataContainerCommandProto, context);
} finally {
span.finish();
}
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
StorageContainerException sce =
new StorageContainerException(response.getMessage(),
Expand Down Expand Up @@ -853,11 +876,14 @@ private CompletableFuture<ContainerCommandResponseProto> submitTask(
final long containerId = request.getContainerID();
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
Span span = TracingUtil.importAndCreateSpan("applyTransaction", request.getTraceID());
try {
return runCommand(request, context.build());
} catch (Exception e) {
exceptionHandler.accept(e);
throw e;
} finally {
span.finish();
}
};
return containerTaskQueues.submit(containerId, task, executor);
Expand Down

0 comments on commit 113d1e5

Please sign in to comment.