diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java index dbf90e8386..76d1745efd 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java @@ -140,6 +140,8 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress strategyComputation = new ClientStrategyInfluencerChainBuilder(); this.loadBalancerFactory = DefaultHttpLoadBalancerFactory.Builder.fromDefaults().build(); this.serviceDiscoverer = requireNonNull(serviceDiscoverer); + + clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER); } private DefaultSingleAddressHttpClientBuilder(@Nullable final U address, @@ -254,10 +256,14 @@ public HttpExecutionStrategy executionStrategy() { final StreamingHttpRequestResponseFactory reqRespFactory = defaultReqRespFactory(roConfig, executionContext.bufferAllocator()); - final StreamingHttpConnectionFilterFactory connectionFilterFactory = + StreamingHttpConnectionFilterFactory connectionFilterFactory = ctx.builder.addIdleTimeoutConnectionFilter ? appendConnectionFilter(ctx.builder.connectionFilterFactory, DEFAULT_IDLE_TIMEOUT_FILTER) : ctx.builder.connectionFilterFactory; + + connectionFilterFactory = appendConnectionFilter(connectionFilterFactory, + HttpMessageDiscardWatchdogClientFilter.INSTANCE); + if (roConfig.isH2PriorKnowledge() && // Direct connection or HTTP proxy (!roConfig.hasProxy() || sslContext == null)) { @@ -296,6 +302,7 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc( targetAddress(ctx))); ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory = ctx.builder.clientFilterFactory; + if (roConfig.hasProxy() && sslContext == null) { // If we're talking to a proxy over http (not https), rewrite the request-target to absolute-form, as // specified by the RFC: https://tools.ietf.org/html/rfc7230#section-5.3.2 @@ -314,7 +321,8 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc( currClientFilterFactory = appendFilter(currClientFilterFactory, ctx.builder.retryingHttpRequesterFilter); } - // Internal retries must be the last filter in the chain, right before LoadBalancedStreamingHttpClient. + + // Internal retries must be one of the last filters in the chain. currClientFilterFactory = appendFilter(currClientFilterFactory, InternalRetryingHttpClientFilter.INSTANCE); FilterableStreamingHttpClient wrappedClient = currClientFilterFactory.create(lbClient, lb.eventStream(), ctx.sdStatus); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java new file mode 100644 index 0000000000..3a38b74c7f --- /dev/null +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -0,0 +1,131 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.context.api.ContextMap; +import io.servicetalk.http.api.FilterableStreamingHttpClient; +import io.servicetalk.http.api.FilterableStreamingHttpConnection; +import io.servicetalk.http.api.HttpExecutionStrategies; +import io.servicetalk.http.api.HttpExecutionStrategy; +import io.servicetalk.http.api.StreamingHttpClientFilter; +import io.servicetalk.http.api.StreamingHttpClientFilterFactory; +import io.servicetalk.http.api.StreamingHttpConnectionFilter; +import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpRequester; +import io.servicetalk.http.api.StreamingHttpResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +import static io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference; + +/** + * Filter which tracks message bodies and warns if they are not discarded properly. + */ +final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { + + private static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key + .newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher", + generifyAtomicReference()); + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogClientFilter.class); + + /** + * Instance of {@link HttpMessageDiscardWatchdogClientFilter}. + */ + static final HttpMessageDiscardWatchdogClientFilter INSTANCE = new HttpMessageDiscardWatchdogClientFilter(); + + /** + * Instance of {@link StreamingHttpClientFilterFactory} with the cleaner implementation. + */ + static final StreamingHttpClientFilterFactory CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory(); + + private HttpMessageDiscardWatchdogClientFilter() { + // Singleton + } + + @Override + public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) { + return new StreamingHttpConnectionFilter(connection) { + @Override + public Single request(final StreamingHttpRequest request) { + return delegate().request(request).map(response -> { + // always write the buffer publisher into the request context. When a downstream subscriber + // arrives, mark the message as subscribed explicitly (having a message present and no + // subscription is an indicator that it must be freed later on). + final AtomicReference> reference = request.context() + .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); + assert reference != null; + if (reference.getAndSet(response.messageBody()) != null) { + // If a previous message exists, the Single got resubscribed to + // (i.e. during a retry) and so previous message body needs to be cleaned up by the + // user. + LOGGER.warn("Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Response payload (message) body must " + + "be fully consumed before retrying."); + } + + return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { + reference.set(null); + return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE; + })); + }); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + + private static final class CleanerStreamingHttpClientFilterFactory implements StreamingHttpClientFilterFactory { + @Override + public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) { + return new StreamingHttpClientFilter(client) { + @Override + protected Single request(final StreamingHttpRequester delegate, + final StreamingHttpRequest request) { + return delegate + .request(request) + .onErrorResume(cause -> { + final AtomicReference maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY); + if (maybePublisher != null && maybePublisher.getAndSet(null) != null) { + // No-one subscribed to the message (or there is none), so if there is a message + // tell the user to clean it up. + LOGGER.warn("Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Response payload (message) body must " + + "be fully consumed before discarding."); + } + return Single.failed(cause).shareContextOnSubscribe(); + }); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + } +} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index bf46ad0ff7..875acf0291 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -40,7 +40,7 @@ import javax.annotation.Nullable; /** - * Filter which tracks HTTP messages sent by the service, so it can be freed if discarded in the pipeline. + * Filter which tracks message bodies and warns if they are not discarded properly. */ final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServiceFilterFactory { @@ -57,9 +57,9 @@ final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServ static final StreamingHttpServiceFilterFactory CLEANER = new HttpLifecycleObserverServiceFilter(new CleanerHttpLifecycleObserver()); - static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key + private static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key .newKey(HttpMessageDiscardWatchdogServiceFilter.class.getName() + ".messagePublisher", - generify(AtomicReference.class)); + generifyAtomicReference()); private HttpMessageDiscardWatchdogServiceFilter() { // Singleton @@ -93,10 +93,7 @@ public Single handle(final HttpServiceContext ctx, } return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { - final AtomicReference maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY); - if (maybePublisher != null) { - maybePublisher.set(null); - } + reference.set(null); return NoopSubscriber.INSTANCE; })); }); @@ -110,11 +107,11 @@ public HttpExecutionStrategy requiredOffloads() { } @SuppressWarnings("unchecked") - private static Class generify(final Class clazz) { - return (Class) clazz; + static Class generifyAtomicReference() { + return (Class) AtomicReference.class; } - private static final class NoopSubscriber implements PublisherSource.Subscriber { + static final class NoopSubscriber implements PublisherSource.Subscriber { static final NoopSubscriber INSTANCE = new NoopSubscriber(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java new file mode 100644 index 0000000000..a46963eae9 --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilterTest.java @@ -0,0 +1,197 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.DeliberateException; +import io.servicetalk.http.api.HttpResponseStatus; +import io.servicetalk.http.api.HttpServerContext; +import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.StreamingHttpClientFilter; +import io.servicetalk.http.api.StreamingHttpConnectionFilter; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpRequester; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.log4j2.mdc.utils.LoggerStringWriter; +import io.servicetalk.transport.netty.internal.ExecutionContextExtension; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +import static io.servicetalk.http.netty.BuilderUtils.newClientBuilder; +import static io.servicetalk.http.netty.BuilderUtils.newServerBuilder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +final class HttpMessageDiscardWatchdogClientFilterTest { + + @RegisterExtension + static final ExecutionContextExtension SERVER_CTX = + ExecutionContextExtension.cached("server-io", "server-executor") + .setClassLevel(true); + @RegisterExtension + static final ExecutionContextExtension CLIENT_CTX = + ExecutionContextExtension.cached("client-io", "client-executor") + .setClassLevel(true); + + @BeforeEach + public void setup() { + LoggerStringWriter.reset(); + } + + @AfterEach + public void tearDown() { + LoggerStringWriter.remove(); + } + + /** + * Asserts that the response message payload is cleaned up properly if discarded in a filter and not + * properly cleaned up by the filter body. + */ + @ParameterizedTest(name = "{displayName} [{index}] filterType={0} expectedException={1} transformer={2}") + @MethodSource("responseTransformers") + void warnsIfDiscarded(final FilterType filterType, + final @Nullable Class expectedException, + ResponseTransformer transformer) + throws Exception { + // TODO: CONNECTION type filters currently time out instead of propagating the expectedException. + // TODO: Once the root cause has been identified, those tests should be re-enabled again. + assumeTrue(filterType == FilterType.CLIENT || expectedException == null); + + try (HttpServerContext serverContext = newServerBuilder(SERVER_CTX) + .listenStreamingAndAwait((ctx, request, responseFactory) -> + Single.fromSupplier(() -> responseFactory.ok().payloadBody(Publisher.from(ctx.executionContext() + .bufferAllocator().fromUtf8("Hello, World!")))))) { + try (StreamingHttpClient client = newClientBuilder(serverContext, CLIENT_CTX) + .appendConnectionFilter(c -> new StreamingHttpConnectionFilter(c) { + @Override + public Single request(final StreamingHttpRequest request) { + if (filterType.equals(FilterType.CONNECTION)) { + return transformer.apply(delegate(), request); + } else { + return delegate().request(request); + } + } + }) + .appendClientFilter(c -> new StreamingHttpClientFilter(c) { + @Override + protected Single request(final StreamingHttpRequester delegate, + final StreamingHttpRequest request) { + if (filterType.equals(FilterType.CLIENT)) { + return transformer.apply(delegate, request); + } else { + return delegate.request(request); + } + } + }) + .buildStreaming()) { + + int numRequests = 5; + for (int i = 0; i < numRequests; i++) { + if (expectedException == null) { + StreamingHttpResponse response = client.request(client.get("/")).toFuture().get(); + assertEquals(HttpResponseStatus.OK, response.status()); + // Consume the body to release the connection back to the pool + response.messageBody().ignoreElements().toFuture().get(); + } else { + ExecutionException ex = assertThrows(ExecutionException.class, + () -> client.request(client.get("/")).toFuture().get()); + assertTrue(ex.getCause().getClass().isAssignableFrom(expectedException)); + } + } + + String output = LoggerStringWriter.stableAccumulated(1000); + System.err.println(output); + } + } + } + + private enum FilterType { + CLIENT, + CONNECTION + } + + private static Stream responseTransformers() { + final List arguments = new ArrayList<>(); + + for (FilterType filterType : FilterType.values()) { + arguments.addAll(Arrays.asList( + Arguments.of(filterType, null, new ResponseTransformer() { + @Override + public Single apply(final StreamingHttpRequester requester, + final StreamingHttpRequest request) { + return requester.request(request); + } + + @Override + public String toString() { + return "Just delegation, no failure"; + } + }), + Arguments.of(filterType, DeliberateException.class, new ResponseTransformer() { + @Override + public Single apply(final StreamingHttpRequester requester, + final StreamingHttpRequest request) { + return requester + .request(request) + .map(dropped -> { + throw new DeliberateException(); + }); + } + + @Override + public String toString() { + return "Throws exception in filter which drops message"; + } + }), + Arguments.of(filterType, DeliberateException.class, new ResponseTransformer() { + @Override + public Single apply(final StreamingHttpRequester requester, + final StreamingHttpRequest request) { + return requester + .request(request) + .flatMap(dropped -> Single.failed(new DeliberateException())); + } + + @Override + public String toString() { + return "Returns a failed Single which drops message"; + } + }) + )); + } + + return arguments.stream(); + } + + interface ResponseTransformer + extends BiFunction> { } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java index 1ed74db563..6a1b97415a 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java @@ -69,7 +69,7 @@ public void tearDown() { @ParameterizedTest(name = "{displayName} [{index}] transformer={0}") @MethodSource("responseTransformers") - void cleansPayloadBodyIfDiscardedInFilter(final ResponseTransformer transformer) throws Exception { + void warnsIfDiscarded(final ResponseTransformer transformer) throws Exception { try (HttpServerContext serverContext = newServerBuilder(SERVER_CTX) .appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { @Override