Skip to content

Commit

Permalink
Merge pull request #250 from ydb-platform/trace-id
Browse files Browse the repository at this point in the history
Send trace-id header with every request
  • Loading branch information
pnv1 authored Mar 27, 2024
2 parents 42323cf + 3c9af20 commit 1981ce8
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tech.ydb.coordination.impl;

import java.time.Clock;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import tech.ydb.coordination.CoordinationClient;
Expand Down Expand Up @@ -40,9 +41,14 @@ private String validatePath(String path) {
return path.startsWith("/") ? path : rpc.getDatabase() + "/" + path;
}

private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {
private static String getTraceIdOrGenerateNew(String traceId) {
return traceId == null ? UUID.randomUUID().toString() : traceId;
}

private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings, String traceId) {
return GrpcRequestSettings.newBuilder()
.withDeadline(settings.getRequestTimeout())
.withTraceId(traceId)
.build();
}

Expand All @@ -59,7 +65,8 @@ public CompletableFuture<Status> createNode(String path, CoordinationNodeSetting
.setConfig(settings.getConfig().toProto())
.build();

GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings, traceId);
return rpc.createNode(request, grpcSettings);
}

Expand All @@ -71,7 +78,8 @@ public CompletableFuture<Status> alterNode(String path, CoordinationNodeSettings
.setConfig(settings.getConfig().toProto())
.build();

GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings, traceId);
return rpc.alterNode(request, grpcSettings);
}

Expand All @@ -82,7 +90,8 @@ public CompletableFuture<Status> dropNode(String path, DropCoordinationNodeSetti
.setOperationParams(Operation.buildParams(settings))
.build();

GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings, traceId);
return rpc.dropNode(request, grpcSettings);
}

Expand All @@ -94,7 +103,8 @@ public CompletableFuture<Result<NodeConfig>> describeNode(String path,
.setOperationParams(Operation.buildParams(settings))
.build();

GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings, traceId);
return rpc.describeNode(request, grpcSettings);
}

Expand Down
54 changes: 30 additions & 24 deletions core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package tech.ydb.core.grpc;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import io.grpc.Metadata;
Expand All @@ -11,14 +13,16 @@
public class GrpcRequestSettings {
private final long deadlineAfter;
private final Integer preferredNodeID;
private final Metadata extraHeaders;
private final String traceId;
private final List<String> clientCapabilities;
private final Consumer<Metadata> trailersHandler;

private GrpcRequestSettings(Builder builder) {
this.deadlineAfter = builder.getDeadlineAfter();
this.preferredNodeID = builder.getPreferredNodeID();
this.extraHeaders = builder.getExtraHeaders();
this.trailersHandler = builder.getTrailersHandler();
this.deadlineAfter = builder.deadlineAfter;
this.preferredNodeID = builder.preferredNodeID;
this.traceId = builder.traceId;
this.clientCapabilities = builder.clientCapabilities;
this.trailersHandler = builder.trailersHandler;
}

public static Builder newBuilder() {
Expand All @@ -33,8 +37,12 @@ public Integer getPreferredNodeID() {
return preferredNodeID;
}

public Metadata getExtraHeaders() {
return extraHeaders;
public String getTraceId() {
return traceId;
}

public List<String> getClientCapabilities() {
return clientCapabilities;
}

public Consumer<Metadata> getTrailersHandler() {
Expand All @@ -44,7 +52,8 @@ public Consumer<Metadata> getTrailersHandler() {
public static final class Builder {
private long deadlineAfter = 0L;
private Integer preferredNodeID = null;
private Metadata extraHeaders = null;
private String traceId = null;
private List<String> clientCapabilities = null;
private Consumer<Metadata> trailersHandler = null;

/**
Expand Down Expand Up @@ -81,30 +90,27 @@ public Builder withPreferredNodeID(Integer preferredNodeID) {
return this;
}

public Builder withExtraHeaders(Metadata headers) {
this.extraHeaders = headers;
public Builder withTraceId(String traceId) {
this.traceId = traceId;
return this;
}

public Builder withTrailersHandler(Consumer<Metadata> handler) {
this.trailersHandler = handler;
public Builder withClientCapabilities(List<String> clientCapabilities) {
this.clientCapabilities = clientCapabilities;
return this;
}

public long getDeadlineAfter() {
return deadlineAfter;
}

public Integer getPreferredNodeID() {
return preferredNodeID;
}

public Metadata getExtraHeaders() {
return extraHeaders;
public Builder addClientCapability(String clientCapability) {
if (this.clientCapabilities == null) {
this.clientCapabilities = new ArrayList<>();
}
this.clientCapabilities.add(clientCapability);
return this;
}

public Consumer<Metadata> getTrailersHandler() {
return trailersHandler;
public Builder withTrailersHandler(Consumer<Metadata> handler) {
this.trailersHandler = handler;
return this;
}

public GrpcRequestSettings build() {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/tech/ydb/core/grpc/YdbHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class YdbHeaders {
public static final Metadata.Key<String> DATABASE =
Metadata.Key.of("x-ydb-database", Metadata.ASCII_STRING_MARSHALLER);

static final Metadata.Key<String> TRACE_ID =
public static final Metadata.Key<String> TRACE_ID =
Metadata.Key.of("x-ydb-trace-id", Metadata.ASCII_STRING_MARSHALLER);

public static final Metadata.Key<String> BUILD_INFO =
Expand Down
50 changes: 34 additions & 16 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.grpc.YdbHeaders;
import tech.ydb.core.impl.auth.AuthCallOptions;
import tech.ydb.core.impl.call.EmptyStream;
import tech.ydb.core.impl.call.GrpcStatusHandler;
Expand Down Expand Up @@ -65,7 +66,7 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
if (now >= settings.getDeadlineAfter()) {
return CompletableFuture.completedFuture(deadlineExpiredResult(method));
return CompletableFuture.completedFuture(deadlineExpiredResult(method, settings));
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}
Expand All @@ -76,15 +77,16 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("Sending request to {}, method `{}', request: `{}'",
logger.trace("Sending request with traceId {} to {}, method `{}', request: `{}'",
settings.getTraceId(),
channel.getEndpoint(),
method,
request);
}

return new UnaryCall<>(call, handler).startCall(request, settings.getExtraHeaders());
return new UnaryCall<>(call, handler).startCall(request, makeMetadataFromSettings(settings));
} catch (RuntimeException ex) {
logger.error("unary call problem {}", ex.getMessage());
logger.error("unary call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
return Async.failedFuture(ex);
}
}
Expand All @@ -103,7 +105,7 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
if (now >= settings.getDeadlineAfter()) {
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method)));
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings)));
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}
Expand All @@ -114,15 +116,16 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("Creating stream call to {}, method `{}', request: `{}'",
logger.trace("Creating stream call with traceId {} to {}, method `{}', request: `{}'",
settings.getTraceId(),
channel.getEndpoint(),
method,
request);
}

return new ReadStreamCall<>(call, request, settings.getExtraHeaders(), handler);
return new ReadStreamCall<>(call, request, makeMetadataFromSettings(settings), handler);
} catch (RuntimeException ex) {
logger.error("server stream call problem {}", ex.getMessage());
logger.error("server stream call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
Issue issue = Issue.of(ex.getMessage(), Issue.Severity.ERROR);
return new EmptyStream<>(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, null, issue));
}
Expand All @@ -142,7 +145,7 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
if (now >= settings.getDeadlineAfter()) {
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method)));
return new EmptyStream<>(GrpcStatuses.toStatus(deadlineExpiredStatus(method, settings)));
}
options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS);
}
Expand All @@ -153,31 +156,46 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("Creating bidirectional stream call to {}, method `{}'",
logger.trace("Creating bidirectional stream call with traceId {} to {}, method `{}'",
settings.getTraceId(),
channel.getEndpoint(),
method);
}

return new ReadWriteStreamCall<>(call, settings.getExtraHeaders(), getAuthCallOptions(), handler);
return new ReadWriteStreamCall<>(call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler);
} catch (RuntimeException ex) {
logger.error("server bidirectional stream call problem {}", ex.getMessage());
logger.error("server bidirectional stream call with traceId {} problem {}", settings.getTraceId(),
ex.getMessage());
Issue issue = Issue.of(ex.getMessage(), Issue.Severity.ERROR);
return new EmptyStream<>(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, null, issue));
}
}

private static <T> Result<T> deadlineExpiredResult(MethodDescriptor<?, T> method) {
String message = "deadline expired before calling method " + method.getFullMethodName();
private static <T> Result<T> deadlineExpiredResult(MethodDescriptor<?, T> method, GrpcRequestSettings settings) {
String message = "deadline expired before calling method " + method.getFullMethodName() + " with traceId " +
settings.getTraceId();
return Result.fail(Status.of(
StatusCode.CLIENT_DEADLINE_EXPIRED, null, Issue.of(message, Issue.Severity.ERROR)
));
}

private static io.grpc.Status deadlineExpiredStatus(MethodDescriptor<?, ?> method) {
String message = "deadline expired before calling method " + method.getFullMethodName();
private static io.grpc.Status deadlineExpiredStatus(MethodDescriptor<?, ?> method, GrpcRequestSettings settings) {
String message = "deadline expired before calling method " + method.getFullMethodName() + " with traceId " +
settings.getTraceId();
return io.grpc.Status.DEADLINE_EXCEEDED.withDescription(message);
}

private Metadata makeMetadataFromSettings(GrpcRequestSettings settings) {
Metadata metadata = new Metadata();
if (settings.getTraceId() != null) {
metadata.put(YdbHeaders.TRACE_ID, settings.getTraceId());
}
if (settings.getClientCapabilities() != null) {
settings.getClientCapabilities().forEach(name -> metadata.put(YdbHeaders.YDB_CLIENT_CAPABILITIES, name));
}
return metadata;
}

private class ChannelStatusHandler implements GrpcStatusHandler {
private final GrpcChannel channel;
private final GrpcRequestSettings settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ReadStreamCall(
) {
this.call = call;
this.request = request;
this.headers = headers != null ? headers : new Metadata();
this.headers = headers;
this.statusConsumer = statusConsumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ReadWriteStreamCall(
GrpcStatusHandler statusConsumer
) {
this.call = call;
this.headers = headers != null ? headers : new Metadata();
this.headers = headers;
this.statusConsumer = statusConsumer;
this.callOptions = callOptions;
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public UnaryCall(ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer)

public CompletableFuture<Result<RespT>> startCall(ReqT request, Metadata headers) {
try {
call.start(this, headers != null ? headers : new Metadata());
call.start(this, headers);
call.request(1);
call.sendMessage(request);
call.halfClose();
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/tech/ydb/core/settings/BaseRequestSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ public Self withRequestTimeout(long duration, TimeUnit unit) {
return self();
}

/**
* Set request trace id. Used for debug purposes.
* If not set explicitly, random UUID will be generated
*
* @param traceId request trace id
* @return Builder
*/
public Self withTraceId(String traceId) {
this.traceId = traceId;
return self();
}

public BaseRequestSettings build() {
return new BaseRequestSettings(this);
}
Expand Down
15 changes: 12 additions & 3 deletions export/src/main/java/tech/ydb/export/impl/ExportClientImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.export.impl;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import tech.ydb.core.Result;
Expand All @@ -24,9 +25,15 @@ public class ExportClientImpl implements ExportClient {
public ExportClientImpl(ExportRpc exportRpc) {
this.exportRpc = exportRpc;
}
private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {

private String getTraceIdOrGenerateNew(String traceId) {
return traceId == null ? UUID.randomUUID().toString() : traceId;
}

private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings, String traceId) {
return GrpcRequestSettings.newBuilder()
.withDeadline(settings.getRequestTimeout())
.withTraceId(traceId)
.build();
}

Expand Down Expand Up @@ -78,7 +85,8 @@ public CompletableFuture<Operation<Result<ExportToS3Result>>> startExportToS3(
.setOperationParams(Operation.buildParams(settings))
.build();

return exportRpc.exportS3(request, makeGrpcRequestSettings(settings))
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
return exportRpc.exportS3(request, makeGrpcRequestSettings(settings, traceId))
.thenApply(op -> op.transform(r -> r.map(ExportToS3Result::new)));
}

Expand Down Expand Up @@ -120,7 +128,8 @@ public CompletableFuture<Operation<Result<ExportToYtResult>>> startExportToYt(
.setOperationParams(Operation.buildParams(settings))
.build();

return exportRpc.exportYt(request, makeGrpcRequestSettings(settings))
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
return exportRpc.exportYt(request, makeGrpcRequestSettings(settings, traceId))
.thenApply(op -> op.transform(r -> r.map(ExportToYtResult::new)));
}
}
Loading

0 comments on commit 1981ce8

Please sign in to comment.