diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java index a761fbfa70..76d1745efd 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java @@ -141,7 +141,6 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress this.loadBalancerFactory = DefaultHttpLoadBalancerFactory.Builder.fromDefaults().build(); this.serviceDiscoverer = requireNonNull(serviceDiscoverer); - connectionFilterFactory = HttpMessageDiscardWatchdogClientFilter.CONNECTION_CLEANER; clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER); } @@ -323,9 +322,6 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc( ctx.builder.retryingHttpRequesterFilter); } - currClientFilterFactory = appendFilter(currClientFilterFactory, - HttpMessageDiscardWatchdogClientFilter.INSTANCE); - // Internal retries must be one of the last filters in the chain. currClientFilterFactory = appendFilter(currClientFilterFactory, InternalRetryingHttpClientFilter.INSTANCE); FilterableStreamingHttpClient wrappedClient = diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index c53128a595..56a67acdf8 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -41,8 +41,7 @@ * Filter which tracks HTTP responses and makes sure that if an exception is raised during filter pipeline * processing message payload bodies are cleaned up. */ -final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpClientFilterFactory, - StreamingHttpConnectionFilterFactory { +final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { private static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key .newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher", @@ -60,12 +59,6 @@ final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpClien */ static final StreamingHttpClientFilterFactory CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory(); - /** - * Instance of {@link StreamingHttpConnectionFilterFactory} with cleaner implementation. - */ - static final StreamingHttpConnectionFilterFactory CONNECTION_CLEANER = - new CleanerStreamingHttpConnectionFilterFactory(); - private HttpMessageDiscardWatchdogClientFilter() { // Singleton } @@ -75,18 +68,28 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect return new StreamingHttpConnectionFilter(connection) { @Override public Single request(final StreamingHttpRequest request) { - return delegate().request(request).map(response -> trackResponsePayload(request, response)); - } - }; - } - - @Override - public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) { - return new StreamingHttpClientFilter(client) { - @Override - protected Single request(final StreamingHttpRequester delegate, - final StreamingHttpRequest request) { - return delegate.request(request).map(response -> trackResponsePayload(request, response)); + return delegate().request(request).map(response -> { + // always write the buffer publisher into the request context. When a downstream subscriber + // arrives, mark the message as subscribed explicitly (having a message present and no + // subscription is an indicator that it must be freed later on). + final AtomicReference> reference = request.context() + .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); + assert reference != null; + if (reference.getAndSet(response.messageBody()) != null) { + // If a previous message exists, the Single got resubscribed to + // (i.e. during a retry) and so previous message body needs to be cleaned up by the + // user. + LOGGER.warn("Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Responses (or their message body) must " + + "be fully consumed before retrying."); + } + + return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { + reference.set(null); + return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE; + })); + }); } }; } @@ -96,62 +99,6 @@ public HttpExecutionStrategy requiredOffloads() { return HttpExecutionStrategies.offloadNone(); } - /** - * Tracks the response message payload for potential clean-up. - * - * @param request the original request. - * @param response the response on which the payload is tracked. - * @return the transformed response passed to upstream filters. - */ - private StreamingHttpResponse trackResponsePayload(final StreamingHttpRequest request, - final StreamingHttpResponse response) { - // always write the buffer publisher into the request context. When a downstream subscriber - // arrives, mark the message as subscribed explicitly (having a message present and no - // subscription is an indicator that it must be freed later on). - final AtomicReference> reference = request.context() - .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); - assert reference != null; - final Publisher previous = reference.getAndSet(response.messageBody()); - if (previous != null) { - // If a previous message exists, the Single got resubscribed to - // (i.e. during a retry) and so previous message body needs to be cleaned up. - LOGGER.warn("Automatically draining previous HTTP response message body that was " + - "not consumed. Users-defined retry logic must drain response payload before " + - "retrying."); - - previous.ignoreElements().subscribe(); - } - - return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { - reference.set(null); - return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE; - })); - } - - /** - * Cleans the message publisher for both client and connection cleaners. - * - * @param request the original request. - * @param cause the cause of the error. - * @return the (failed) response. - */ - private static Single cleanMessagePublisher(final StreamingHttpRequest request, - final Throwable cause) { - final AtomicReference maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY); - if (maybePublisher != null) { - Publisher message = (Publisher) maybePublisher.getAndSet(null); - if (message != null) { - // No-one subscribed to the message (or there is none), so if there is a message - // proactively clean it up. - return message - .ignoreElements() - .concat(Single.failed(cause)) - .shareContextOnSubscribe(); - } - } - return Single.failed(cause).shareContextOnSubscribe(); - } - private static final class CleanerStreamingHttpClientFilterFactory implements StreamingHttpClientFilterFactory { @Override public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) { @@ -161,34 +108,18 @@ protected Single request(final StreamingHttpRequester del final StreamingHttpRequest request) { return delegate .request(request) - .onErrorResume(cause -> cleanMessagePublisher(request, cause)); - } - }; - } - - @Override - public HttpExecutionStrategy requiredOffloads() { - return HttpExecutionStrategies.offloadNone(); - } - } - - private static final class CleanerStreamingHttpConnectionFilterFactory - implements StreamingHttpConnectionFilterFactory { - @Override - public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) { - return new StreamingHttpConnectionFilter(connection) { - @Override - public Single request(final StreamingHttpRequest request) { - return delegate() - .request(request) - .map(response -> { + .onErrorResume(cause -> { final AtomicReference maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY); - if (maybePublisher != null) { - maybePublisher.set(null); + if (maybePublisher != null && maybePublisher.getAndSet(null) != null) { + // No-one subscribed to the message (or there is none), so if there is a message + // tell the user to clean it up. + LOGGER.warn("Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Responses (or their message body) must " + + "be fully consumed before discarding."); } - return response; - }) - .onErrorResume(cause -> cleanMessagePublisher(request, cause)); + return Single.failed(cause).shareContextOnSubscribe(); + }); } }; } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java index 40182c2d15..48a95042a8 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java @@ -15,12 +15,9 @@ */ package io.servicetalk.http.netty; -import io.servicetalk.client.api.DelegatingConnectionFactory; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.DeliberateException; -import io.servicetalk.context.api.ContextMap; -import io.servicetalk.http.api.FilterableStreamingHttpConnection; import io.servicetalk.http.api.HttpResponseStatus; import io.servicetalk.http.api.HttpServerContext; import io.servicetalk.http.api.StreamingHttpClient; @@ -29,20 +26,21 @@ import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; -import io.servicetalk.transport.api.TransportObserver; +import io.servicetalk.log4j2.mdc.utils.LoggerStringWriter; import io.servicetalk.transport.netty.internal.ExecutionContextExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -64,6 +62,16 @@ final class HttpMessageDiscardWatchdogClientFilterTest { ExecutionContextExtension.cached("client-io", "client-executor") .setClassLevel(true); + @BeforeEach + public void setup() { + LoggerStringWriter.reset(); + } + + @AfterEach + public void tearDown() { + LoggerStringWriter.remove(); + } + /** * Asserts that the response message payload is cleaned up properly if discarded in a filter and not * properly cleaned up by the filter body. @@ -74,25 +82,12 @@ void cleansClientResponseMessageBodyIfDiscarded(final FilterType filterType, final @Nullable Class expectedException, ResponseTransformer transformer) throws Exception { - final AtomicLong numConnectionsOpened = new AtomicLong(0); try (HttpServerContext serverContext = newServerBuilder(SERVER_CTX) .listenStreamingAndAwait((ctx, request, responseFactory) -> Single.fromSupplier(() -> responseFactory.ok().payloadBody(Publisher.from(ctx.executionContext() .bufferAllocator().fromUtf8("Hello, World!")))))) { try (StreamingHttpClient client = newClientBuilder(serverContext, CLIENT_CTX) - .appendConnectionFactoryFilter(original -> - new DelegatingConnectionFactory(original) { - @Override - public Single newConnection( - final InetSocketAddress inetSocketAddress, - @Nullable final ContextMap context, - @Nullable final TransportObserver observer) { - numConnectionsOpened.incrementAndGet(); - return delegate().newConnection(inetSocketAddress, context, observer); - } - }) .appendConnectionFilter(c -> new StreamingHttpConnectionFilter(c) { @Override public Single request(final StreamingHttpRequest request) { @@ -125,11 +120,16 @@ protected Single request(final StreamingHttpRequester del response.messageBody().ignoreElements().toFuture().get(); } else { ExecutionException ex = assertThrows(ExecutionException.class, - () -> client.request(client.get("/")).toFuture().get()); + () -> client.request(client.get("/")).timeout(Duration.ofMillis(100)).toFuture().get()); + System.err.println(ex); + // TODO: Connection-level stuck (or times out if applied above) + // TODO: client will raise the expected exception. assertTrue(ex.getCause().getClass().isAssignableFrom(expectedException)); } } - assertEquals(1, numConnectionsOpened.get()); + + String output = LoggerStringWriter.stableAccumulated(1000); + System.err.println(output); } } }