Skip to content

Commit

Permalink
Add more logging for GrpcRouter and other transport classes (#2055)
Browse files Browse the repository at this point in the history
Motivation:

Better visibility of the request-response processing.

Modifications:

- Add debug level logging when route fails with an error in `GrpcRouter`;
- Add debug level logging when `GrpcStatusUpdater` converts an exception
into a `grpc-status` code;
- Add more debug logging in `NettyChannelPublisher` and
`WriteStreamSubscriber`;

Result:

Easier debugging.
  • Loading branch information
idelpivnitskiy committed Jan 13, 2022
1 parent 65fe5cb commit 22ae4cb
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ServerContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -90,6 +93,8 @@
* implementation of a <a href="https://www.grpc.io">gRPC</a> method.
*/
final class GrpcRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcRouter.class);

private final Map<String, RouteProvider> routes;
private final Map<String, RouteProvider> streamingRoutes;
private final Map<String, RouteProvider> blockingRoutes;
Expand Down Expand Up @@ -264,9 +269,14 @@ public Single<HttpResponse> handle(final HttpServiceContext ctx, final HttpReque
.payloadBody(rawResp,
serializationProvider.serializerFor(responseEncoding,
responseClass)))
.onErrorReturn(cause -> newErrorResponse(responseFactory, finalServiceContext,
null, cause, ctx.executionContext().bufferAllocator()));
.onErrorReturn(cause -> {
LOGGER.debug("Unexpected exception from aggregated response for path : {}",
path, cause);
return newErrorResponse(responseFactory, finalServiceContext,
null, cause, ctx.executionContext().bufferAllocator());
});
} catch (Throwable t) {
LOGGER.debug("Unexpected exception from aggregated endpoint for path: {}", path, t);
return succeeded(newErrorResponse(responseFactory, serviceContext, null, t,
ctx.executionContext().bufferAllocator()));
}
Expand Down Expand Up @@ -320,6 +330,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
serializationProvider.serializerFor(responseEncoding, responseClass),
ctx.executionContext().bufferAllocator()));
} catch (Throwable t) {
LOGGER.debug("Unexpected exception from streaming endpoint for path: {}", path, t);
return succeeded(newErrorResponse(responseFactory, serviceContext, null, t,
ctx.executionContext().bufferAllocator()));
}
Expand Down Expand Up @@ -465,6 +476,8 @@ public HttpResponse handle(final HttpServiceContext ctx, final HttpRequest reque
ctx.executionContext().bufferAllocator()).payloadBody(response,
serializationProvider.serializerFor(responseEncoding, responseClass));
} catch (Throwable t) {
LOGGER.debug("Unexpected exception from blocking aggregated endpoint for path: {}",
path, t);
return newErrorResponse(responseFactory, serviceContext, null, t,
ctx.executionContext().bufferAllocator());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Base64;
Expand Down Expand Up @@ -487,7 +489,9 @@ private static final class StatusHolder {
}
}

static final class GrpcStatusUpdater extends StatelessTrailersTransformer<Buffer> {
private static final class GrpcStatusUpdater extends StatelessTrailersTransformer<Buffer> {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcStatusUpdater.class);

private final BufferAllocator allocator;
private final GrpcStatus successStatus;

Expand All @@ -506,6 +510,10 @@ protected HttpHeaders payloadComplete(final HttpHeaders trailers) {
protected HttpHeaders payloadFailed(final Throwable cause, final HttpHeaders trailers) {
setStatus(trailers, cause, allocator);
// Swallow exception as we are converting it to the trailers.
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Converted an exception into grpc-status: {}",
trailers.get(GRPC_STATUS_CODE_TRAILER), cause);
}
return trailers;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Queue;
Expand All @@ -37,6 +39,8 @@
import static java.util.Objects.requireNonNull;

final class NettyChannelPublisher<T> extends SubscribablePublisher<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelPublisher.class);

// All state is only touched from eventloop.
private long requestCount;
private boolean requested;
Expand Down Expand Up @@ -252,6 +256,7 @@ private void cancel(SubscriptionImpl forSubscription) {
// Subscriptions shares common state hence a requestN after termination/cancellation must be ignored
return;
}
LOGGER.debug("{} Cancelling subscription", channel);
resetSubscription();

// If a cancel occurs with a valid subscription we need to clear any pending data and set a fatalError so that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ private boolean nettySharedPromiseTryStatus() {

private void terminateSubscriber(@Nullable Throwable cause) {
if (cause == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} Terminate subscriber, state: {}", channel, Integer.toString(state, 2));
}
try {
observer.writeComplete();
subscriber.onComplete();
Expand All @@ -506,6 +509,10 @@ private void terminateSubscriber(@Nullable Throwable cause) {
Throwable enrichedCause = enrichProtocolError.apply(cause);
assignConnectionError(channel, enrichedCause);
enrichedCause = !written ? new AbortedFirstWriteException(enrichedCause) : enrichedCause;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} Terminate subscriber with an error, state: {}",
channel, Integer.toString(state, 2), cause);
}
try {
observer.writeFailed(enrichedCause);
subscriber.onError(enrichedCause);
Expand Down

0 comments on commit 22ae4cb

Please sign in to comment.