From 22ae4cbcfbb310b9fc055986080533fe67afbb1e Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 13 Jan 2022 09:52:27 -0800 Subject: [PATCH] Add more logging for `GrpcRouter` and other transport classes (#2055) Motivation: Better visibility of the request-response processing. Modifications: - Add debug level logging when route fails with an error in `GrpcRouter`; - Add debug level logging when `GrpcStatusUpdater` converts an exception into a `grpc-status` code; - Add more debug logging in `NettyChannelPublisher` and `WriteStreamSubscriber`; Result: Easier debugging. --- .../io/servicetalk/grpc/api/GrpcRouter.java | 17 +++++++++++++++-- .../java/io/servicetalk/grpc/api/GrpcUtils.java | 10 +++++++++- .../netty/internal/NettyChannelPublisher.java | 5 +++++ .../netty/internal/WriteStreamSubscriber.java | 7 +++++++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 5718cfea32..f93561c792 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -55,6 +55,9 @@ import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ServerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -90,6 +93,8 @@ * implementation of a gRPC method. */ final class GrpcRouter { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcRouter.class); + private final Map routes; private final Map streamingRoutes; private final Map blockingRoutes; @@ -264,9 +269,14 @@ public Single handle(final HttpServiceContext ctx, final HttpReque .payloadBody(rawResp, serializationProvider.serializerFor(responseEncoding, responseClass))) - .onErrorReturn(cause -> newErrorResponse(responseFactory, finalServiceContext, - null, cause, ctx.executionContext().bufferAllocator())); + .onErrorReturn(cause -> { + LOGGER.debug("Unexpected exception from aggregated response for path : {}", + path, cause); + return newErrorResponse(responseFactory, finalServiceContext, + null, cause, ctx.executionContext().bufferAllocator()); + }); } catch (Throwable t) { + LOGGER.debug("Unexpected exception from aggregated endpoint for path: {}", path, t); return succeeded(newErrorResponse(responseFactory, serviceContext, null, t, ctx.executionContext().bufferAllocator())); } @@ -320,6 +330,7 @@ public Single handle(final HttpServiceContext ctx, serializationProvider.serializerFor(responseEncoding, responseClass), ctx.executionContext().bufferAllocator())); } catch (Throwable t) { + LOGGER.debug("Unexpected exception from streaming endpoint for path: {}", path, t); return succeeded(newErrorResponse(responseFactory, serviceContext, null, t, ctx.executionContext().bufferAllocator())); } @@ -465,6 +476,8 @@ public HttpResponse handle(final HttpServiceContext ctx, final HttpRequest reque ctx.executionContext().bufferAllocator()).payloadBody(response, serializationProvider.serializerFor(responseEncoding, responseClass)); } catch (Throwable t) { + LOGGER.debug("Unexpected exception from blocking aggregated endpoint for path: {}", + path, t); return newErrorResponse(responseFactory, serviceContext, null, t, ctx.executionContext().bufferAllocator()); } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java index 582ca6752c..7f5d9974b1 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java @@ -38,6 +38,8 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Base64; @@ -487,7 +489,9 @@ private static final class StatusHolder { } } - static final class GrpcStatusUpdater extends StatelessTrailersTransformer { + private static final class GrpcStatusUpdater extends StatelessTrailersTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcStatusUpdater.class); + private final BufferAllocator allocator; private final GrpcStatus successStatus; @@ -506,6 +510,10 @@ protected HttpHeaders payloadComplete(final HttpHeaders trailers) { protected HttpHeaders payloadFailed(final Throwable cause, final HttpHeaders trailers) { setStatus(trailers, cause, allocator); // Swallow exception as we are converting it to the trailers. + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Converted an exception into grpc-status: {}", + trailers.get(GRPC_STATUS_CODE_TRAILER), cause); + } return trailers; } } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java index 6b0e94f899..0b76a0cfd2 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java @@ -22,6 +22,8 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCounted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.Queue; @@ -37,6 +39,8 @@ import static java.util.Objects.requireNonNull; final class NettyChannelPublisher extends SubscribablePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelPublisher.class); + // All state is only touched from eventloop. private long requestCount; private boolean requested; @@ -252,6 +256,7 @@ private void cancel(SubscriptionImpl forSubscription) { // Subscriptions shares common state hence a requestN after termination/cancellation must be ignored return; } + LOGGER.debug("{} Cancelling subscription", channel); resetSubscription(); // If a cancel occurs with a valid subscription we need to clear any pending data and set a fatalError so that diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index e58197a4c0..978311439c 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -493,6 +493,9 @@ private boolean nettySharedPromiseTryStatus() { private void terminateSubscriber(@Nullable Throwable cause) { if (cause == null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} Terminate subscriber, state: {}", channel, Integer.toString(state, 2)); + } try { observer.writeComplete(); subscriber.onComplete(); @@ -506,6 +509,10 @@ private void terminateSubscriber(@Nullable Throwable cause) { Throwable enrichedCause = enrichProtocolError.apply(cause); assignConnectionError(channel, enrichedCause); enrichedCause = !written ? new AbortedFirstWriteException(enrichedCause) : enrichedCause; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} Terminate subscriber with an error, state: {}", + channel, Integer.toString(state, 2), cause); + } try { observer.writeFailed(enrichedCause); subscriber.onError(enrichedCause);