Skip to content

Commit

Permalink
Properly drain failed responses in http-service-composition example (#3026)
Browse files Browse the repository at this point in the history

Motivation:

Existing `ResponseCheckingClientFilter` demonstrated in
`servicetalk-http-service-composition` maps the response to an
exception, but does not drain the response payload body, leaking the
connection.

Modifications:

- Replace `ResponseCheckingClientFilter` with
`RetryingHttpRequesterFilter.Builder.requestMapper` feature;
- Rename `BadResponseHandlingServiceFilter` to
`HttpResponseExceptionHandlingServiceFilter` and update it to target
`HttpResponseException` instead of `BadResponseStatusException`;

Result:

We don't show users how to leak a connection :)
  • Loading branch information
idelpivnitskiy authored Jul 31, 2024
1 parent b3f8d56 commit 21c0a74
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.servicetalk.http.netty.HttpClients;
import io.servicetalk.http.netty.HttpServers;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.HttpResponseException;
import io.servicetalk.http.router.predicate.HttpPredicateRouterBuilder;
import io.servicetalk.http.utils.TimeoutHttpRequesterFilter;
import io.servicetalk.transport.api.HostAndPort;
Expand All @@ -36,7 +37,7 @@
import static io.servicetalk.examples.http.service.composition.backends.PortRegistry.RATINGS_BACKEND_ADDRESS;
import static io.servicetalk.examples.http.service.composition.backends.PortRegistry.RECOMMENDATIONS_BACKEND_ADDRESS;
import static io.servicetalk.examples.http.service.composition.backends.PortRegistry.USER_BACKEND_ADDRESS;
import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy.ofConstantBackoffDeltaJitter;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy.ofExponentialBackoffDeltaJitter;
import static io.servicetalk.transport.netty.NettyIoExecutors.createIoExecutor;
import static java.time.Duration.ofMillis;
Expand Down Expand Up @@ -95,7 +96,7 @@ public static void main(String[] args) throws Exception {
// Starting the server will start listening for incoming client requests.
ServerContext serverContext = HttpServers.forPort(8080)
.ioExecutor(ioExecutor)
.appendServiceFilter(new BadResponseHandlingServiceFilter())
.appendServiceFilter(new HttpResponseExceptionHandlingServiceFilter())
.listenStreamingAndAwait(gatewayService);

LOGGER.info("Listening on {}", serverContext.listenAddress());
Expand All @@ -118,8 +119,15 @@ private static StreamingHttpClient newClient(final IoExecutor ioExecutor,
.build())
// Apply a timeout filter for the client to guard against latent clients.
.appendClientFilter(new TimeoutHttpRequesterFilter(ofMillis(500), false))
// Apply a filter that returns an error if any response status code is not 200 OK
.appendClientFilter(new ResponseCheckingClientFilter(backendName))
// Apply a filter that returns an error if any response status code is not 200 OK.
// For this purpose we use a responseMapper feature of RetryingHttpRequesterFilter that can be
// additionally configured to retry those status codes, if necessary. But more importantly,
// it helps to properly drain the failed response without leaking unused data and resources.
.appendClientFilter(new RetryingHttpRequesterFilter.Builder()
.responseMapper(responseMetaData -> OK.equals(responseMetaData.status()) ? null :
new HttpResponseException("Bad response status from " + backendName + ": " +
responseMetaData.status(), responseMetaData))
.build())
.ioExecutor(ioExecutor)
.buildStreaming());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,37 @@
package io.servicetalk.examples.http.service.composition;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.examples.http.service.composition.ResponseCheckingClientFilter.BadResponseStatusException;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.HttpResponseException;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.http.api.HttpSerializers.textSerializerUtf8;

/**
* Example service filter that returns a response with the exception message if the wrapped service completes with a
* {@link BadResponseStatusException}.
* {@link HttpResponseException}.
*/
final class BadResponseHandlingServiceFilter implements StreamingHttpServiceFilterFactory {
final class HttpResponseExceptionHandlingServiceFilter implements StreamingHttpServiceFilterFactory {
@Override
public StreamingHttpServiceFilter create(final StreamingHttpService service) {
return new StreamingHttpServiceFilter(service) {
@Override
public Single<StreamingHttpResponse> handle(HttpServiceContext ctx, StreamingHttpRequest request,
StreamingHttpResponseFactory responseFactory) {
return super.handle(ctx, request, responseFactory).onErrorResume(cause -> {
if (cause instanceof BadResponseStatusException) {
if (cause instanceof HttpResponseException) {
// It's useful to include the exception message in the payload for demonstration purposes, but
// this is not recommended in production as it may leak internal information.
return responseFactory.internalServerError().toResponse().map(
resp -> resp.payloadBody(cause.getMessage(), textSerializerUtf8()).toStreamingResponse());
}
// Pass all other exceptions as-is, they will be handled by default HttpExceptionMapperServiceFilter
return failed(cause);
});
}
Expand Down

This file was deleted.

0 comments on commit 21c0a74

Please sign in to comment.