HDDS-9384. Update OpenTelemetry traces in the write path #5468

Draft · wants to merge 3 commits into master
XceiverClientRatis.java

@@ -18,6 +18,13 @@

package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
@@ -48,8 +55,6 @@
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -65,6 +70,7 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -256,27 +262,21 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
return CompletableFuture.completedFuture(response);
}
}
return TracingUtil.executeInNewSpan(
"XceiverClientRatis." + request.getCmdType().name(),
() -> {
final ContainerCommandRequestMessage message
= ContainerCommandRequestMessage.toMessage(
request, TracingUtil.exportCurrentSpan());
if (HddsUtils.isReadOnly(request)) {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync ReadOnly {}", message);
}
return getClient().async().sendReadOnly(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync {}", message);
}
return getClient().async().send(message);
}
});
String fullSpanID = TracingUtil.exportCurrentSpan();
final ContainerCommandRequestMessage message
= ContainerCommandRequestMessage.toMessage(
request, fullSpanID);
if (HddsUtils.isReadOnly(request)) {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync ReadOnly {}", message);
}
return getClient().async().sendReadOnly(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync {}", message);
}
return getClient().async().send(message);
}
}

// gets the minimum log index replicated to all servers
@@ -368,11 +368,19 @@ public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request) {
XceiverClientReply asyncReply = new XceiverClientReply(null);
long requestTime = System.currentTimeMillis();

Span span = GlobalTracer.get()
.buildSpan("XceiverClientRatis.sendCommandAsync(" + request.getCmdType() +")").start();
Scope scope = GlobalTracer.get().activateSpan(span);

CompletableFuture<RaftClientReply> raftClientReply =
sendRequestAsync(request);
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
raftClientReply.whenComplete((reply, e) -> {
scope.close();
span.finish();

if (LOG.isDebugEnabled()) {
LOG.debug("received reply {} for request: cmdType={}, containerID={}, pipelineID={}, traceID={}", reply,
request.getCmdType(), request.getContainerID(),
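
Note: the hunk above trades `TracingUtil.executeInNewSpan` for a manually managed span, so the span can stay open until the Ratis future completes. A minimal sketch of that pattern (illustrative names; only the OpenTracing calls are real APIs), in a variant that closes the thread-local `Scope` on the submitting thread and finishes the `Span` in the completion callback:

```java
import java.util.concurrent.CompletableFuture;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;

// Sketch only: tracedCall/doAsyncWork are illustrative, not Ozone code.
public final class AsyncSpanSketch {

  static CompletableFuture<String> doAsyncWork() {
    return CompletableFuture.supplyAsync(() -> "reply");
  }

  public static CompletableFuture<String> tracedCall() {
    Tracer tracer = GlobalTracer.get();
    Span span = tracer.buildSpan("example.sendCommandAsync").start();

    CompletableFuture<String> future;
    // Activate while the request is being submitted so downstream code
    // on this thread can see and export the span context.
    try (Scope ignored = tracer.activateSpan(span)) {
      future = doAsyncWork();
    }
    // The callback may run on another thread; only finish the span there,
    // rather than closing the thread-local scope from that thread.
    return future.whenComplete((reply, error) -> span.finish());
  }
}
```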
BlockOutputStream.java

@@ -32,6 +32,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -47,6 +50,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -621,7 +625,8 @@ private CompletableFuture<PutBlockResult> writeChunkAndPutBlock(ChunkBuffer buff
*/
protected void handleFlush(boolean close) throws IOException {
try {
handleFlushInternal(close);
TracingUtil.executeInNewSpan("BlockOutputStream.handleFlush",
() -> handleFlushInternal(close));
if (close) {
waitForAllPendingFlushes();
}
@@ -633,16 +638,18 @@ protected void handleFlush(boolean close) throws IOException {
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
throw e;
if (e instanceof IOException) {
throw (IOException)e;
}
throw new IOException(e);
} finally {
if (close) {
cleanup(false);
}
}
}

private void handleFlushInternal(boolean close)
throws IOException, InterruptedException, ExecutionException {
private void handleFlushInternal(boolean close) throws Exception {
checkOpen();
LOG.debug("Start handleFlushInternal close={}", close);
CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);
@@ -724,9 +731,11 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
}

private CompletableFuture<Void> watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
Span span = GlobalTracer.get().activeSpan();
return putBlockResultFuture.thenAccept(x -> {
try {
watchForCommit(x.commitIndex);
try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
TracingUtil.executeInNewSpan("BlockOutputStream.watchForCommit",
() -> watchForCommit(x.commitIndex));
} catch (IOException e) {
throw new FlushRuntimeException(e);
}
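
Note: `watchForCommitAsync` above captures the caller's active span and re-activates it inside the `thenAccept` callback, which usually runs on a different thread. A stripped-down sketch of that hand-off (illustrative names; only the OpenTracing calls are real APIs):

```java
import java.util.concurrent.CompletableFuture;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;

// Sketch only: continueInParentSpan is illustrative, not Ozone code.
public final class SpanHandoffSketch {

  public static CompletableFuture<Void> continueInParentSpan(
      CompletableFuture<Long> pending) {
    Tracer tracer = GlobalTracer.get();
    // Capture on the submitting thread; may be null if nothing is active.
    Span parent = tracer.activeSpan();

    return pending.thenAccept(commitIndex -> {
      if (parent == null) {
        return; // nothing to continue
      }
      // Re-activate the captured span for the duration of the callback,
      // so child spans created here attach to the right trace.
      try (Scope ignored = tracer.activateSpan(parent)) {
        // traced follow-up work, e.g. a "watchForCommit" child span
      }
    });
  }
}
```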
ContainerStateMachine.java

@@ -44,6 +44,8 @@
import java.util.stream.Collectors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -59,6 +61,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.Cache;
import org.apache.hadoop.hdds.utils.ResourceCache;
import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -101,6 +104,8 @@
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;

import io.opentracing.Span;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -410,9 +415,20 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol
@Override
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
message2ContainerCommandRequestProto(request.getMessage());
Span span = TracingUtil.importAndCreateSpan("startTransaction", proto.getTraceID());
try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
return startTransactionTraced(request, proto);
} finally {
span.finish();
}
}

private TransactionContext startTransactionTraced(RaftClientRequest request,
ContainerCommandRequestProto proto)
throws IOException {
long startTime = Time.monotonicNowNanos();
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));

final TransactionContext.Builder builder = TransactionContext.newBuilder()
@@ -553,9 +569,10 @@ private CompletableFuture<Message> writeStateMachineData(
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
try {
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
Span span = TracingUtil.importAndCreateSpan("writeStateMachineData", requestProto.getTraceID());
try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
return dispatchCommand(requestProto, context);
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
Expand All @@ -567,6 +584,8 @@ private CompletableFuture<Message> writeStateMachineData(
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
} finally {
span.finish();
}
}, getChunkExecutor(requestProto.getWriteChunk()));

@@ -773,8 +792,13 @@ private ByteString readStateMachineData(
.setLogIndex(index)
.build();
// read the chunk
ContainerCommandResponseProto response =
dispatchCommand(dataContainerCommandProto, context);
Span span = TracingUtil.importAndCreateSpan("readStateMachineData", requestProto.getTraceID());
ContainerCommandResponseProto response;
try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
response = dispatchCommand(dataContainerCommandProto, context);
} finally {
span.finish();
}
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
StorageContainerException sce =
new StorageContainerException(response.getMessage(),
@@ -964,16 +988,18 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
final long containerId = request.getContainerID();
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
long timeNow = Time.monotonicNowNanos();
long queueingDelay = timeNow - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
// TODO: add a counter to track number of executing applyTransaction
// and queue size
long queueingDelay = Time.monotonicNowNanos() - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
// TODO: add a counter to track number of executing applyTransaction
// and queue size
Span span = TracingUtil.importAndCreateSpan("applyTransaction", request.getTraceID());
try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
return dispatchCommand(request, context);
} catch (Exception e) {
exceptionHandler.accept(e);
throw e;
} finally {
span.finish();
}
};
return containerTaskQueues.submit(containerId, task, executor);
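
Note: `startTransaction`, `writeStateMachineData`, `readStateMachineData`, and `applyTransaction` all repeat the same import/activate/finish sequence: the client serializes its span context into the request's `traceID` field, and each server-side stage rebuilds a child span from it. A minimal sketch using only the `TracingUtil` and `GlobalTracer` calls that appear in this PR (the handler itself is illustrative):

```java
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.tracing.TracingUtil;

// Sketch only: handle() is illustrative; the TracingUtil and
// GlobalTracer calls are the ones used in this PR.
public final class TraceIdHandoffSketch {

  // Client side: serialize the active span context for the request proto.
  public static String exportForRequest() {
    return TracingUtil.exportCurrentSpan();
  }

  // Server side: rebuild a child span from the received trace ID and
  // run the stage inside it, finishing the span even on failure.
  public static void handle(String traceId, Runnable stage) {
    Span span = TracingUtil.importAndCreateSpan("example.stage", traceId);
    try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
      stage.run();
    } finally {
      span.finish();
    }
  }
}
```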
BlockManagerImpl.java

@@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -89,9 +90,11 @@ public long putBlock(Container container, BlockData data) throws IOException {
@Override
public long putBlock(Container container, BlockData data,
boolean endOfBlock) throws IOException {
return persistPutBlock(
(KeyValueContainer) container,
data, endOfBlock);
return TracingUtil.executeInNewSpan("BlockManagerImpl.putBlock",
() -> persistPutBlock(
(KeyValueContainer) container,
data,
endOfBlock));
}

public long persistPutBlock(KeyValueContainer container,
FilePerBlockStrategy.java

@@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -114,6 +115,13 @@ public StateMachine.DataChannel getStreamDataChannel(
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ChunkBuffer data, DispatcherContext dispatcherContext)
throws StorageContainerException {
TracingUtil.executeInNewSpan("FilePerBlockStrategy.writeChunk",
() -> writeChunkInSpan(container, blockID, info, data, dispatcherContext));
}

private void writeChunkInSpan(Container container, BlockID blockID, ChunkInfo info,
ChunkBuffer data, DispatcherContext dispatcherContext)
throws StorageContainerException {

checkLayoutVersion(container);

@@ -173,6 +181,13 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
public ChunkBuffer readChunk(Container container, BlockID blockID,
ChunkInfo info, DispatcherContext dispatcherContext)
throws StorageContainerException {
return TracingUtil.executeInNewSpan("FilePerBlockStrategy.readChunk",
() -> readChunkInSpan(container, blockID, info, dispatcherContext));
}

private ChunkBuffer readChunkInSpan(Container container, BlockID blockID,
ChunkInfo info, DispatcherContext dispatcherContext)
throws StorageContainerException {

checkLayoutVersion(container);
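
Note: both chunk methods keep their public signatures and delegate to a private `*InSpan` method through `TracingUtil.executeInNewSpan`, so every write/read shows up as its own span. Roughly, such a helper has the following shape (a sketch assuming an unchecked supplier; Ozone's actual `TracingUtil` also deals with checked exceptions):

```java
import java.util.function.Supplier;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;

// Sketch of the shape of an executeInNewSpan helper; Ozone's real
// TracingUtil differs (e.g. checked-exception support).
public final class NewSpanSketch {

  public static <T> T executeInNewSpan(String name, Supplier<T> work) {
    Span span = GlobalTracer.get().buildSpan(name).start();
    try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
      return work.get(); // runs with the new span active
    } finally {
      span.finish(); // always finish, success or failure
    }
  }
}
```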

OzoneProtocolMessageDispatcher.java

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdds.server;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;

@@ -71,7 +73,7 @@ public RESPONSE processRequest(
TYPE type,
String traceId) throws ServiceException {
Span span = TracingUtil.importAndCreateSpan(type.toString(), traceId);
try {
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
if (logger.isTraceEnabled()) {
logger.trace(
"[service={}] [type={}] request is received: <json>{}</json>",
BasicRootedOzoneFileSystem.java

@@ -140,6 +140,13 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
@Override
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
TracingUtil.executeInNewSpan("ofs initialize",
() -> initializeWithTrace(name, conf));
}

private void initializeWithTrace(URI name, Configuration conf) throws IOException {
GlobalTracer.get().activeSpan()
.setTag("name", name.toString());
listingPageSize = conf.getInt(
OZONE_FS_LISTING_PAGE_SIZE,
OZONE_FS_LISTING_PAGE_SIZE_DEFAULT);
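
Note: `initializeWithTrace` runs inside the span opened by `executeInNewSpan` and annotates it with `setTag`; `deleteInSpan` further down does the same with the request path. `Tracer.activeSpan()` returns null when no span is active, so a defensive variant of this tagging looks like the following (sketch, illustrative names):

```java
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;

// Sketch only: tagActiveSpan is illustrative, not Ozone code.
public final class SpanTagSketch {

  public static void tagActiveSpan(String key, String value) {
    Span span = GlobalTracer.get().activeSpan();
    if (span != null) { // activeSpan() is null outside any span
      span.setTag(key, value);
    }
  }
}
```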
@@ -223,7 +230,10 @@ protected boolean isHsyncEnabled() {
@Override
public void close() throws IOException {
try {
adapter.close();
TracingUtil.executeInNewSpan("ofs close",
() -> {
adapter.close();
});
} finally {
super.close();
}
@@ -702,9 +712,11 @@ public boolean delete(Path f, boolean recursive) throws IOException {
private boolean deleteInSpan(Path f, boolean recursive) throws IOException {
incrementCounter(Statistic.INVOCATION_DELETE, 1);
statistics.incrementWriteOps(1);
Span span = GlobalTracer.get().activeSpan();
String key = pathToKey(f);
span.setTag("path", key);
LOG.debug("Delete path {} - recursive {}", f, recursive);

String key = pathToKey(f);
OFSPath ofsPath = new OFSPath(key,
ozoneConfiguration);
// Handle rm root
OzoneFSOutputStream.java

@@ -67,7 +67,8 @@ public synchronized void flush() throws IOException {

@Override
public synchronized void close() throws IOException {
outputStream.close();
TracingUtil.executeInNewSpan("OzoneFSOutputStream.close",
outputStream::close);
}

@Override
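
Note: passing `outputStream::close` to `executeInNewSpan` implies an overload that accepts a checked-exception-throwing functional interface. A sketch of how such an overload can be shaped (assumption: Ozone's `TracingUtil` defines its own equivalent; the names here are illustrative):

```java
import java.io.IOException;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;

// Sketch: a checked-exception-friendly runnable, which is what lets
// callers pass method references like outputStream::close above.
public final class CheckedSpanSketch {

  @FunctionalInterface
  public interface IORunnable {
    void run() throws IOException;
  }

  public static void executeInNewSpan(String name, IORunnable work)
      throws IOException {
    Span span = GlobalTracer.get().buildSpan(name).start();
    try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
      work.run(); // e.g. executeInNewSpan("example.close", stream::close)
    } finally {
      span.finish();
    }
  }
}
```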