Skip to content

Commit

Permalink
Mark HTTP/2 connection as closing on exception caught
Browse files Browse the repository at this point in the history
Motivation:

Similar to #2675, `H2ParentConnectionContext.AbstractH2ParentConnection`
should mark the `Channel` as closing on `exceptionCaught`. If there is
no active `Subscriber`, it should enforce channel closure using
`ChannelCloseUtils`.
If users don't use offloading, there is a risk of retrying on the
same IO thread. We should notify `LoadBalancer` that this connection is
closing to avoid retrying on the same connection.

Modifications:

- Invoke `parentContext.notifyOnClosing()` inside `exceptionCaught`
before notifying `transportError` and failing the subscriber;
- Close the h2 channel if there is no active `Subscriber` to consume the
error;
- Remove `H2ParentConnectionContext.notifyOnClosingImpl()`, which no
longer seems to be required;
- Enhance `ConnectionClosedAfterIoExceptionTest` to make sure we fail
only the first connect attempt, capture `RetryableException`s, and
capture the caller's stack trace if `client.request(...)` fails;

Result:

Retries always select a different connection if the existing connection
observes an exception.
Fixes #2685.
  • Loading branch information
idelpivnitskiy committed Oct 11, 2023
1 parent 8b8d1c5 commit 66c080f
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,15 @@ void tryCompleteSubscriber() {
}

@Override
void tryFailSubscriber(Throwable cause) {
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ClientParentConnection> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.FlushStrategyHolder;
import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;

import io.netty.channel.Channel;
Expand All @@ -44,6 +44,8 @@
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;
import java.net.SocketOption;
Expand All @@ -56,12 +58,14 @@
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HttpExecutionContextUtils.channelExecutionContext;
import static io.servicetalk.http.netty.NettyHttp2ExceptionUtils.wrapIfNecessary;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;
import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession;
import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption;

class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable implements NettyConnectionContext,
HttpConnectionContext {

private static final Logger LOGGER = LoggerFactory.getLogger(H2ParentConnectionContext.class);

final FlushStrategyHolder flushStrategyHolder;
private final HttpExecutionContext executionContext;
private final SingleSource.Processor<Throwable, Throwable> transportError = newSingleProcessor();
Expand Down Expand Up @@ -162,10 +166,6 @@ protected final void doCloseAsyncGracefully() {
}, true);
}

private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection
notifyOnClosing();
}

final void trackActiveStream(Http2StreamChannel streamChannel) {
keepAliveManager.trackActiveStream(streamChannel);
}
Expand Down Expand Up @@ -195,7 +195,7 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd

abstract void tryCompleteSubscriber();

abstract void tryFailSubscriber(Throwable cause);
abstract boolean tryFailSubscriber(Throwable cause);

/**
* Receive a settings frame and optionally handle the acknowledgement of the frame.
Expand Down Expand Up @@ -236,7 +236,7 @@ public final void handlerRemoved(ChannelHandlerContext ctx) {
}

private void doChannelClosed(final String method) {
parentContext.notifyOnClosingImpl();
parentContext.notifyOnClosing();

if (hasSubscriber()) {
tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method));
Expand All @@ -246,11 +246,16 @@ private void doChannelClosed(final String method) {

@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// The parent channel will be closed in case of exception after the cause is propagated to subscriber.
// In case users don't have offloading, there is a risk to retry on the same IO thread.
// We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection.
parentContext.notifyOnClosing();
cause = wrapIfNecessary(cause);
if (observer != NoopConnectionObserver.INSTANCE) {
assignConnectionError(ctx.channel(), cause);
}
parentContext.transportError.onSuccess(cause);
if (!tryFailSubscriber(cause)) {
LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause);
ChannelCloseUtils.close(ctx, cause);
}
}

@Override
Expand Down Expand Up @@ -283,7 +288,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) {
// We trigger the graceful close process here (with no timeout) to make sure the socket is closed once
// the existing streams are closed. The MultiplexCodec may simulate a GOAWAY when the stream IDs are
// exhausted so we shouldn't rely upon our peer to close the transport.
parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl, false);
parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosing, false);
} else if (msg instanceof Http2PingFrame) {
parentContext.keepAliveManager.pingReceived((Http2PingFrame) msg);
} else if (!(msg instanceof Http2SettingsAckFrame)) { // we ignore SETTINGS(ACK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,15 @@ void tryCompleteSubscriber() {
}

@Override
void tryFailSubscriber(Throwable cause) {
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
ChannelCloseUtils.close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ServerParentConnectionContext> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -37,6 +38,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -71,9 +73,11 @@ class ConnectionClosedAfterIoExceptionTest {
void test(HttpProtocol protocol) throws Exception {
AtomicReference<FilterableStreamingHttpConnection> firstConnection = new AtomicReference<>();
BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
AtomicBoolean firstAttempt = new AtomicBoolean(true);
try (ServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX, protocol)
// Fail only the first connect attempt
.appendEarlyConnectionAcceptor(conn -> errors.isEmpty() ? failed(DELIBERATE_EXCEPTION) : completed())
.appendEarlyConnectionAcceptor(conn -> firstAttempt.compareAndSet(true, false) ?
failed(DELIBERATE_EXCEPTION) : completed())
.listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol)
.appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory<InetSocketAddress,
Expand All @@ -86,15 +90,21 @@ public Single<FilterableStreamingHttpConnection> newConnection(InetSocketAddress
}
})
.appendClientFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((metaData, t) -> {
errors.add((Throwable) t);
return BackOffPolicy.ofImmediateBounded();
})
.retryOther((metaData, t) -> {
errors.add(t);
return BackOffPolicy.ofImmediateBounded();
})
.build())
.buildBlocking()) {

HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));
Assertions.assertDoesNotThrow(() -> {
HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));
});

assertThat("Unexpected number of errors, likely retried more than expected", errors, hasSize(1));
assertThat("Did not propagate original IoException", errors.poll(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
}

// We expect this to timeout, because we have not completed the outstanding request.
assertFalse(onServerCloseLatch.await(30, MILLISECONDS));
assertFalse(onServerCloseLatch.await(300, MILLISECONDS));

requestBody.onComplete();

Expand Down

0 comments on commit 66c080f

Please sign in to comment.