Skip to content

Commit

Permalink
More modifications
Browse files Browse the repository at this point in the history
  • Loading branch information
daschl committed Oct 23, 2023
1 parent eed7439 commit 6a77c7e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
this.loadBalancerFactory = DefaultHttpLoadBalancerFactory.Builder.<R>fromDefaults().build();
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);

connectionFilterFactory = HttpMessageDiscardWatchdogClientFilter.CONNECTION_CLEANER;
clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
}

Expand Down Expand Up @@ -323,9 +322,6 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
ctx.builder.retryingHttpRequesterFilter);
}

currClientFilterFactory = appendFilter(currClientFilterFactory,
HttpMessageDiscardWatchdogClientFilter.INSTANCE);

// Internal retries must be one of the last filters in the chain.
currClientFilterFactory = appendFilter(currClientFilterFactory, InternalRetryingHttpClientFilter.INSTANCE);
FilterableStreamingHttpClient wrappedClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
* Filter which tracks HTTP responses and makes sure that if an exception is raised during filter pipeline
* processing message payload bodies are cleaned up.
*/
final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpClientFilterFactory,
StreamingHttpConnectionFilterFactory {
final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory {

private static final ContextMap.Key<AtomicReference<Publisher<?>>> MESSAGE_PUBLISHER_KEY = ContextMap.Key
.newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher",
Expand All @@ -60,12 +59,6 @@ final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpClien
*/
static final StreamingHttpClientFilterFactory CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory();

/**
* Instance of {@link StreamingHttpConnectionFilterFactory} with cleaner implementation.
*/
static final StreamingHttpConnectionFilterFactory CONNECTION_CLEANER =
new CleanerStreamingHttpConnectionFilterFactory();

private HttpMessageDiscardWatchdogClientFilter() {
// Singleton
}
Expand All @@ -75,18 +68,28 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request).map(response -> trackResponsePayload(request, response));
}
};
}

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(final StreamingHttpRequester delegate,
final StreamingHttpRequest request) {
return delegate.request(request).map(response -> trackResponsePayload(request, response));
return delegate().request(request).map(response -> {
// always write the buffer publisher into the request context. When a downstream subscriber
// arrives, mark the message as subscribed explicitly (having a message present and no
// subscription is an indicator that it must be freed later on).
final AtomicReference<Publisher<?>> reference = request.context()
.computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>());
assert reference != null;
if (reference.getAndSet(response.messageBody()) != null) {
// If a previous message exists, the Single<StreamingHttpResponse> got resubscribed to
// (i.e. during a retry) and so previous message body needs to be cleaned up by the
// user.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Responses (or their message body) must " +
"be fully consumed before retrying.");
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
reference.set(null);
return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE;
}));
});
}
};
}
Expand All @@ -96,62 +99,6 @@ public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}

/**
* Tracks the response message payload for potential clean-up.
*
* @param request the original request.
* @param response the response on which the payload is tracked.
* @return the transformed response passed to upstream filters.
*/
private StreamingHttpResponse trackResponsePayload(final StreamingHttpRequest request,
final StreamingHttpResponse response) {
// always write the buffer publisher into the request context. When a downstream subscriber
// arrives, mark the message as subscribed explicitly (having a message present and no
// subscription is an indicator that it must be freed later on).
final AtomicReference<Publisher<?>> reference = request.context()
.computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>());
assert reference != null;
final Publisher<?> previous = reference.getAndSet(response.messageBody());
if (previous != null) {
// If a previous message exists, the Single<StreamingHttpResponse> got resubscribed to
// (i.e. during a retry) and so previous message body needs to be cleaned up.
LOGGER.warn("Automatically draining previous HTTP response message body that was " +
"not consumed. Users-defined retry logic must drain response payload before " +
"retrying.");

previous.ignoreElements().subscribe();
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
reference.set(null);
return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE;
}));
}

/**
* Cleans the message publisher for both client and connection cleaners.
*
* @param request the original request.
* @param cause the cause of the error.
* @return the (failed) response.
*/
private static Single<StreamingHttpResponse> cleanMessagePublisher(final StreamingHttpRequest request,
final Throwable cause) {
final AtomicReference<?> maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null) {
Publisher<?> message = (Publisher<?>) maybePublisher.getAndSet(null);
if (message != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// proactively clean it up.
return message
.ignoreElements()
.concat(Single.<StreamingHttpResponse>failed(cause))
.shareContextOnSubscribe();
}
}
return Single.<StreamingHttpResponse>failed(cause).shareContextOnSubscribe();
}

private static final class CleanerStreamingHttpClientFilterFactory implements StreamingHttpClientFilterFactory {
@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
Expand All @@ -161,34 +108,18 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
final StreamingHttpRequest request) {
return delegate
.request(request)
.onErrorResume(cause -> cleanMessagePublisher(request, cause));
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}
}

private static final class CleanerStreamingHttpConnectionFilterFactory
implements StreamingHttpConnectionFilterFactory {
@Override
public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) {
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate()
.request(request)
.map(response -> {
.onErrorResume(cause -> {
final AtomicReference<?> maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null) {
maybePublisher.set(null);
if (maybePublisher != null && maybePublisher.getAndSet(null) != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// tell the user to clean it up.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Responses (or their message body) must " +
"be fully consumed before discarding.");
}
return response;
})
.onErrorResume(cause -> cleanMessagePublisher(request, cause));
return Single.<StreamingHttpResponse>failed(cause).shareContextOnSubscribe();
});
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServerContext;
import io.servicetalk.http.api.StreamingHttpClient;
Expand All @@ -29,20 +26,21 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.log4j2.mdc.utils.LoggerStringWriter;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand All @@ -64,6 +62,16 @@ final class HttpMessageDiscardWatchdogClientFilterTest {
ExecutionContextExtension.cached("client-io", "client-executor")
.setClassLevel(true);

@BeforeEach
public void setup() {
LoggerStringWriter.reset();
}

@AfterEach
public void tearDown() {
LoggerStringWriter.remove();
}

/**
* Asserts that the response message payload is cleaned up properly if discarded in a filter and not
* properly cleaned up by the filter body.
Expand All @@ -74,25 +82,12 @@ void cleansClientResponseMessageBodyIfDiscarded(final FilterType filterType,
final @Nullable Class<?> expectedException,
ResponseTransformer transformer)
throws Exception {
final AtomicLong numConnectionsOpened = new AtomicLong(0);

try (HttpServerContext serverContext = newServerBuilder(SERVER_CTX)
.listenStreamingAndAwait((ctx, request, responseFactory) ->
Single.fromSupplier(() -> responseFactory.ok().payloadBody(Publisher.from(ctx.executionContext()
.bufferAllocator().fromUtf8("Hello, World!")))))) {
try (StreamingHttpClient client = newClientBuilder(serverContext, CLIENT_CTX)
.appendConnectionFactoryFilter(original ->
new DelegatingConnectionFactory<InetSocketAddress,
FilterableStreamingHttpConnection>(original) {
@Override
public Single<FilterableStreamingHttpConnection> newConnection(
final InetSocketAddress inetSocketAddress,
@Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
numConnectionsOpened.incrementAndGet();
return delegate().newConnection(inetSocketAddress, context, observer);
}
})
.appendConnectionFilter(c -> new StreamingHttpConnectionFilter(c) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
Expand Down Expand Up @@ -125,11 +120,16 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
response.messageBody().ignoreElements().toFuture().get();
} else {
ExecutionException ex = assertThrows(ExecutionException.class,
() -> client.request(client.get("/")).toFuture().get());
() -> client.request(client.get("/")).timeout(Duration.ofMillis(100)).toFuture().get());
System.err.println(ex);
// TODO: Connection-level stuck (or times out if applied above)
// TODO: client will raise the expected exception.
assertTrue(ex.getCause().getClass().isAssignableFrom(expectedException));
}
}
assertEquals(1, numConnectionsOpened.get());

String output = LoggerStringWriter.stableAccumulated(1000);
System.err.println(output);
}
}
}
Expand Down

0 comments on commit 6a77c7e

Please sign in to comment.