Skip to content

Commit

Permalink
Merge pull request #291 from ydb-platform/grpc_logging
Browse files Browse the repository at this point in the history
Improve grpc calls logging
  • Loading branch information
alex268 authored Jul 4, 2024
2 parents 30f8df0 + 7a11b45 commit cda2677
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 33 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* @author Nikolay Perfilov
*/
public interface GrpcTransport extends AutoCloseable {
// TODO: <ReqT extends Message, RespT extends Message>
<ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
MethodDescriptor<ReqT, RespT> method,
GrpcRequestSettings settings,
Expand Down
49 changes: 23 additions & 26 deletions core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
return CompletableFuture.completedFuture(SHUTDOWN_RESULT.map(null));
}

String traceId = settings.getTraceId();
CallOptions options = getAuthCallOptions().getGrpcCallOptions();
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
Expand All @@ -84,19 +85,17 @@ public <ReqT, RespT> CompletableFuture<Result<RespT>> unaryCall(
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("Sending request with traceId {} to {}, method `{}', request: `{}'",
settings.getTraceId(),
channel.getEndpoint(),
method,
request);
logger.trace("UnaryCall[{}] with method {} and endpoint {} created",
traceId, method.getFullMethodName(), channel.getEndpoint().getHostAndPort()
);
}

return new UnaryCall<>(call, handler).startCall(request, makeMetadataFromSettings(settings));
return new UnaryCall<>(traceId, call, handler).startCall(request, makeMetadataFromSettings(settings));
} catch (UnexpectedResultException ex) {
logger.error("unary call with traceId {} unexprected status {}", settings.getTraceId(), ex.getStatus());
logger.error("UnaryCall[{}] got unexprected status {}", traceId, ex.getStatus());
return CompletableFuture.completedFuture(Result.fail(ex));
} catch (RuntimeException ex) {
logger.error("unary call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
logger.error("UnaryCall[{}] got problem {}", traceId, ex.getMessage());
return CompletableFuture.completedFuture(Result.error(ex.getMessage(), ex));
}
}
Expand All @@ -111,6 +110,7 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
return new EmptyStream<>(SHUTDOWN_RESULT.getStatus());
}

String traceId = settings.getTraceId();
CallOptions options = getAuthCallOptions().getGrpcCallOptions();
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
Expand All @@ -126,20 +126,17 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("Creating stream call with traceId {} to {}, method `{}', request: `{}'",
settings.getTraceId(),
channel.getEndpoint(),
method,
request);
logger.trace("ReadStreamCall[{}] with method {} and endpoint {} created",
traceId, method.getFullMethodName(), channel.getEndpoint().getHostAndPort()
);
}

return new ReadStreamCall<>(call, request, makeMetadataFromSettings(settings), handler);
return new ReadStreamCall<>(traceId, call, request, makeMetadataFromSettings(settings), handler);
} catch (UnexpectedResultException ex) {
logger.error("server stream call with traceId {} unexpected status {}",
settings.getTraceId(), ex.getStatus());
logger.error("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
return new EmptyStream<>(ex.getStatus());
} catch (RuntimeException ex) {
logger.error("server stream call with traceId {} problem {}", settings.getTraceId(), ex.getMessage());
logger.error("ReadStreamCall[{}] got problem {}", traceId, ex.getMessage());
Issue issue = Issue.of(ex.getMessage(), Issue.Severity.ERROR);
return new EmptyStream<>(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, issue));
}
Expand All @@ -155,6 +152,7 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
return new EmptyStream<>(SHUTDOWN_RESULT.getStatus());
}

String traceId = settings.getTraceId();
CallOptions options = getAuthCallOptions().getGrpcCallOptions();
if (settings.getDeadlineAfter() != 0) {
final long now = System.nanoTime();
Expand All @@ -170,20 +168,19 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings);

if (logger.isTraceEnabled()) {
logger.trace("Creating bidirectional stream call with traceId {} to {}, method `{}'",
settings.getTraceId(),
channel.getEndpoint(),
method);
logger.trace("ReadWriteStreamCall[{}] with method {} and endpoint {} created",
traceId, method.getFullMethodName(), channel.getEndpoint().getHostAndPort()
);
}

return new ReadWriteStreamCall<>(call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler);
return new ReadWriteStreamCall<>(
traceId, call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler
);
} catch (UnexpectedResultException ex) {
logger.error("server bidirectional stream call with traceId {} unexpected status {}",
settings.getTraceId(), ex.getStatus());
logger.error("ReadWriteStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
return new EmptyStream<>(ex.getStatus());
} catch (RuntimeException ex) {
logger.error("server bidirectional stream call with traceId {} problem {}", settings.getTraceId(),
ex.getMessage());
logger.error("ReadWriteStreamCall[{}] got problem {}", traceId, ex.getMessage());
Issue issue = Issue.of(ex.getMessage(), Issue.Severity.ERROR);
return new EmptyStream<>(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, issue));
}
Expand Down
22 changes: 19 additions & 3 deletions core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import javax.annotation.Nullable;

import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import org.slf4j.Logger;
Expand All @@ -14,6 +16,7 @@
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;

/**
*
Expand All @@ -22,8 +25,9 @@
* @param <RespT> type of read stream messages
*/
public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> implements GrpcReadStream<RespT> {
private static final Logger logger = LoggerFactory.getLogger(ReadStreamCall.class);
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);

private final String traceId;
private final ClientCall<ReqT, RespT> call;
private final GrpcStatusHandler statusConsumer;
private final ReqT request;
Expand All @@ -33,11 +37,13 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
private final AtomicReference<Observer<RespT>> observerReference = new AtomicReference<>();

public ReadStreamCall(
String traceId,
ClientCall<ReqT, RespT> call,
ReqT request,
Metadata headers,
GrpcStatusHandler statusConsumer
) {
this.traceId = traceId;
this.call = call;
this.request = request;
this.headers = headers;
Expand All @@ -54,14 +60,17 @@ public CompletableFuture<Status> start(Observer<RespT> observer) {
try {
call.start(this, headers);
call.request(1);
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
// close stream by client side
call.halfClose();
} catch (Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable ex) {
logger.error("Exception encountered while closing the unary call", ex);
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
}

statusFuture.completeExceptionally(t);
Expand All @@ -81,6 +90,9 @@ public void cancel() {
@Override
public void onMessage(RespT message) {
try {
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
}
observerReference.get().onNext(message);
// request delivery of the next inbound message.
synchronized (call) {
Expand All @@ -94,13 +106,17 @@ public void onMessage(RespT message) {
call.cancel("Canceled by exception from observer", ex);
}
} catch (Throwable th) {
logger.error("Exception encountered while canceling the read stream call", th);
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th);
}
}
}

@Override
public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] closed with status {}", status);
}

statusConsumer.accept(status, trailers);

if (status.isOk()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import javax.annotation.Nullable;

import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import org.slf4j.Logger;
Expand All @@ -16,6 +18,7 @@
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.impl.auth.AuthCallOptions;

/**
Expand All @@ -25,8 +28,9 @@
* @param <W> type of message to be sent to the server
*/
public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements GrpcReadWriteStream<R, W> {
private static final Logger logger = LoggerFactory.getLogger(ReadStreamCall.class);
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);

private final String traceId;
private final ClientCall<W, R> call;
private final GrpcStatusHandler statusConsumer;
private final Metadata headers;
Expand All @@ -37,11 +41,13 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
private final Queue<W> messagesQueue = new ArrayDeque<>();

public ReadWriteStreamCall(
String traceId,
ClientCall<W, R> call,
Metadata headers,
AuthCallOptions callOptions,
GrpcStatusHandler statusConsumer
) {
this.traceId = traceId;
this.call = call;
this.headers = headers;
this.statusConsumer = statusConsumer;
Expand Down Expand Up @@ -95,6 +101,9 @@ private boolean flush() {
return true;
}

if (logger.isTraceEnabled()) {
logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) next));
}
call.sendMessage(next);
}
// call is not ready
Expand All @@ -111,6 +120,10 @@ public void cancel() {
@Override
public void onMessage(R message) {
try {
if (logger.isTraceEnabled()) {
logger.trace("ReadWriteStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
}

observerReference.get().onNext(message);
// request delivery of the next inbound message.
synchronized (call) {
Expand Down Expand Up @@ -145,6 +158,9 @@ public void close() {

@Override
public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
if (logger.isTraceEnabled()) {
logger.trace("ReadWriteStreamCall[{}] closed with status {}", status);
}
statusConsumer.accept(status, trailers);

if (status.isOk()) {
Expand Down
20 changes: 17 additions & 3 deletions core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import javax.annotation.Nullable;

import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import org.slf4j.Logger;
Expand All @@ -15,6 +17,7 @@
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;

/**
*
Expand All @@ -23,21 +26,23 @@
* @param <RespT> type of call return
*/
public class UnaryCall<ReqT, RespT> extends ClientCall.Listener<RespT> {
private static final Logger logger = LoggerFactory.getLogger(UnaryCall.class);
private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class);

private static final Status NO_VALUE = Status.of(StatusCode.CLIENT_INTERNAL_ERROR)
.withIssues(Issue.of("No value received for gRPC unary call", Issue.Severity.ERROR));

private static final Status MULTIPLY_VALUES = Status.of(StatusCode.CLIENT_INTERNAL_ERROR)
.withIssues(Issue.of("More than one value received for gRPC unary call", Issue.Severity.ERROR));

private final String traceId;
private final ClientCall<ReqT, RespT> call;
private final GrpcStatusHandler statusConsumer;

private final CompletableFuture<Result<RespT>> future = new CompletableFuture<>();
private final AtomicReference<RespT> value = new AtomicReference<>();

public UnaryCall(ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer) {
public UnaryCall(String traceId, ClientCall<ReqT, RespT> call, GrpcStatusHandler statusConsumer) {
this.traceId = traceId;
this.call = call;
this.statusConsumer = statusConsumer;
}
Expand All @@ -46,14 +51,17 @@ public CompletableFuture<Result<RespT>> startCall(ReqT request, Metadata headers
try {
call.start(this, headers);
call.request(1);
if (logger.isTraceEnabled()) {
logger.trace("UnaryCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
call.halfClose();
} catch (Exception ex) {
future.completeExceptionally(ex);
try {
call.cancel(ex.getMessage(), ex);
} catch (Exception ex2) {
logger.error("Exception encountered while closing the unary call", ex2);
logger.error("UnaryCall[{}] got exception while canceling", traceId, ex2);
}
}

Expand All @@ -62,6 +70,9 @@ public CompletableFuture<Result<RespT>> startCall(ReqT request, Metadata headers

@Override
public void onMessage(RespT value) {
if (logger.isTraceEnabled()) {
logger.trace("UnaryCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) value));
}
if (!this.value.compareAndSet(null, value)) {
future.complete(Result.fail(MULTIPLY_VALUES));
}
Expand All @@ -70,6 +81,9 @@ public void onMessage(RespT value) {
@Override
public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
statusConsumer.accept(status, trailers);
if (logger.isTraceEnabled()) {
logger.trace("UnaryCall[{}] closed with status {}", traceId, status);
}

if (status.isOk()) {
RespT snapshotValue = value.get();
Expand Down

0 comments on commit cda2677

Please sign in to comment.