From 66c080f117f8e1228de6526869c4dbe2ac4c7eb8 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 1 Sep 2023 09:43:57 -0700 Subject: [PATCH 1/5] Mark HTTP/2 connection as closing on exception caught Motivation: Similar to #2675, `H2ParentConnectionContext.AbstractH2ParentConnection` should mark the `Channel` as closing on `exceptionCaught`. If there is no active `Subscriber`, it should enforce channel closure using `ChannelCloseUtils`. In case users don't have offloading, there is a risk to retry on the same IO thread. We should notify `LoadBalancer` that this connection is closing to avoid retrying on the same connection. Modifications: - Invoke `parentContext.notifyOnClosing()` inside `exceptionCaught` before notifying `transportError` and failing subscriber; - Close the h2 channel if there is no active `Subscriber` to consume the error; - Remove `H2ParentConnectionContext.notifyOnClosingImpl()` that does not seem required anymore; - Enhance `ConnectionClosedAfterIoExceptionTest` to make sure we fail only the first connect attempt, we capture `RetryableException`s, and capture the callers stack trace if `client.request(...)` fails; Result: Retries always select a different connection if existing connection observes an exception. Fixes #2685. --- .../H2ClientParentConnectionContext.java | 5 +++- .../http/netty/H2ParentConnectionContext.java | 29 +++++++++++-------- .../H2ServerParentConnectionContext.java | 5 +++- .../ConnectionClosedAfterIoExceptionTest.java | 16 ++++++++-- .../H2PriorKnowledgeFeatureParityTest.java | 2 +- 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java index dc3e69c731..8ef33c83ba 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java @@ -214,12 +214,15 @@ void tryCompleteSubscriber() { } @Override - void tryFailSubscriber(Throwable cause) { + boolean tryFailSubscriber(Throwable cause) { if (subscriber != null) { close(parentContext.nettyChannel(), cause); Subscriber subscriberCopy = subscriber; subscriber = null; subscriberCopy.onError(cause); + return true; + } else { + return false; } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index d041778814..31fa757ccf 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -25,11 +25,11 @@ import io.servicetalk.transport.api.ConnectionContext; import io.servicetalk.transport.api.ConnectionObserver; import io.servicetalk.transport.api.SslConfig; +import io.servicetalk.transport.netty.internal.ChannelCloseUtils; import io.servicetalk.transport.netty.internal.FlushStrategy; import io.servicetalk.transport.netty.internal.FlushStrategyHolder; import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable; import io.servicetalk.transport.netty.internal.NettyConnectionContext; -import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver; import io.servicetalk.transport.netty.internal.StacklessClosedChannelException; import io.netty.channel.Channel; @@ -44,6 +44,8 @@ import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.ssl.SslCloseCompletionEvent; import io.netty.handler.ssl.SslHandshakeCompletionEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.net.SocketOption; @@ -56,12 +58,14 @@ import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0; import static io.servicetalk.http.netty.HttpExecutionContextUtils.channelExecutionContext; import static io.servicetalk.http.netty.NettyHttp2ExceptionUtils.wrapIfNecessary; -import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError; import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession; import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption; class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable implements NettyConnectionContext, HttpConnectionContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(H2ParentConnectionContext.class); + final FlushStrategyHolder flushStrategyHolder; private final HttpExecutionContext executionContext; private final SingleSource.Processor transportError = newSingleProcessor(); @@ -162,10 +166,6 @@ protected final void doCloseAsyncGracefully() { }, true); } - private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection - notifyOnClosing(); - } - final void trackActiveStream(Http2StreamChannel streamChannel) { keepAliveManager.trackActiveStream(streamChannel); } @@ -195,7 +195,7 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd abstract void tryCompleteSubscriber(); - abstract void tryFailSubscriber(Throwable cause); + abstract boolean tryFailSubscriber(Throwable cause); /** * Receive a settings frame and optionally handle the acknowledgement of the frame. @@ -236,7 +236,7 @@ public final void handlerRemoved(ChannelHandlerContext ctx) { } private void doChannelClosed(final String method) { - parentContext.notifyOnClosingImpl(); + parentContext.notifyOnClosing(); if (hasSubscriber()) { tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method)); @@ -246,11 +246,16 @@ private void doChannelClosed(final String method) { @Override public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // The parent channel will be closed in case of exception after the cause is propagated to subscriber. + // In case users don't have offloading, there is a risk to retry on the same IO thread. + // We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection. + parentContext.notifyOnClosing(); cause = wrapIfNecessary(cause); - if (observer != NoopConnectionObserver.INSTANCE) { - assignConnectionError(ctx.channel(), cause); - } parentContext.transportError.onSuccess(cause); + if (!tryFailSubscriber(cause)) { + LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause); + ChannelCloseUtils.close(ctx, cause); + } } @Override @@ -283,7 +288,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) { // We trigger the graceful close process here (with no timeout) to make sure the socket is closed once // the existing streams are closed. The MultiplexCodec may simulate a GOAWAY when the stream IDs are // exhausted so we shouldn't rely upon our peer to close the transport. - parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl, false); + parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosing, false); } else if (msg instanceof Http2PingFrame) { parentContext.keepAliveManager.pingReceived((Http2PingFrame) msg); } else if (!(msg instanceof Http2SettingsAckFrame)) { // we ignore SETTINGS(ACK) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index 2e6551500a..98e88d8e32 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -236,12 +236,15 @@ void tryCompleteSubscriber() { } @Override - void tryFailSubscriber(Throwable cause) { + boolean tryFailSubscriber(Throwable cause) { if (subscriber != null) { ChannelCloseUtils.close(parentContext.nettyChannel(), cause); Subscriber subscriberCopy = subscriber; subscriber = null; subscriberCopy.onError(cause); + return true; + } else { + return false; } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java index 1a5fe76b0f..a8aa15064d 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionClosedAfterIoExceptionTest.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -37,6 +38,7 @@ import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -71,9 +73,11 @@ class ConnectionClosedAfterIoExceptionTest { void test(HttpProtocol protocol) throws Exception { AtomicReference firstConnection = new AtomicReference<>(); BlockingQueue errors = new LinkedBlockingQueue<>(); + AtomicBoolean firstAttempt = new AtomicBoolean(true); try (ServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX, protocol) // Fail only the first connect attempt - .appendEarlyConnectionAcceptor(conn -> errors.isEmpty() ? failed(DELIBERATE_EXCEPTION) : completed()) + .appendEarlyConnectionAcceptor(conn -> firstAttempt.compareAndSet(true, false) ? + failed(DELIBERATE_EXCEPTION) : completed()) .listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok()); BlockingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol) .appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory newConnection(InetSocketAddress } }) .appendClientFilter(new RetryingHttpRequesterFilter.Builder() + .retryRetryableExceptions((metaData, t) -> { + errors.add((Throwable) t); + return BackOffPolicy.ofImmediateBounded(); + }) .retryOther((metaData, t) -> { errors.add(t); return BackOffPolicy.ofImmediateBounded(); @@ -93,8 +101,10 @@ public Single newConnection(InetSocketAddress .build()) .buildBlocking()) { - HttpResponse response = client.request(client.get("/")); - assertThat(response.status(), is(OK)); + Assertions.assertDoesNotThrow(() -> { + HttpResponse response = client.request(client.get("/")); + assertThat(response.status(), is(OK)); + }); assertThat("Unexpected number of errors, likely retried more than expected", errors, hasSize(1)); assertThat("Did not propagate original IoException", errors.poll(), diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index a41602bd61..1d80cb5949 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -1350,7 +1350,7 @@ public Single handle(final HttpServiceContext ctx, } // We expect this to timeout, because we have not completed the outstanding request. - assertFalse(onServerCloseLatch.await(30, MILLISECONDS)); + assertFalse(onServerCloseLatch.await(300, MILLISECONDS)); requestBody.onComplete(); From 6713504040714d4f87688cf96573330ceb7d256f Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 13 Oct 2023 08:49:34 -0700 Subject: [PATCH 2/5] Add logging for tests --- servicetalk-http-netty/build.gradle | 6 ++++++ .../servicetalk/http/netty/H2ParentConnectionContext.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/servicetalk-http-netty/build.gradle b/servicetalk-http-netty/build.gradle index a234e21cc7..9a66e18029 100644 --- a/servicetalk-http-netty/build.gradle +++ b/servicetalk-http-netty/build.gradle @@ -108,3 +108,9 @@ task testProps(type: Test) { test.dependsOn testProps // ServiceTalkLibraryPlugin adds a spotbugs task for each sourceSet, we don't need it. spotbugsTestProps.enabled = false + +tasks.withType(Test).all { + testLogging { + showStandardStreams = true + } +} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index 31fa757ccf..0bd6ee7548 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -253,7 +253,7 @@ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause = wrapIfNecessary(cause); parentContext.transportError.onSuccess(cause); if (!tryFailSubscriber(cause)) { - LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause); + LOGGER.error("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause); ChannelCloseUtils.close(ctx, cause); } } From 827e5ea3d673a8eea6077a301c069190d02c42d4 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 13 Oct 2023 09:08:56 -0700 Subject: [PATCH 3/5] Revert notifyOnClosingImpl --- .../http/netty/H2ParentConnectionContext.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index 0bd6ee7548..e58ca884e4 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -166,6 +166,14 @@ protected final void doCloseAsyncGracefully() { }, true); } + /** + * This method is required to access notifyOnClosing() from AbstractH2ParentConnection, because usage of + * {@code parentContext.notifyOnClosing()} directly triggers {@link java.lang.IllegalAccessError}. + */ + private void notifyOnClosingImpl() { + notifyOnClosing(); + } + final void trackActiveStream(Http2StreamChannel streamChannel) { keepAliveManager.trackActiveStream(streamChannel); } @@ -236,7 +244,7 @@ public final void handlerRemoved(ChannelHandlerContext ctx) { } private void doChannelClosed(final String method) { - parentContext.notifyOnClosing(); + parentContext.notifyOnClosingImpl(); if (hasSubscriber()) { tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method)); @@ -249,7 +257,7 @@ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // The parent channel will be closed in case of exception after the cause is propagated to subscriber. // In case users don't have offloading, there is a risk to retry on the same IO thread. // We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection. - parentContext.notifyOnClosing(); + parentContext.notifyOnClosingImpl(); cause = wrapIfNecessary(cause); parentContext.transportError.onSuccess(cause); if (!tryFailSubscriber(cause)) { @@ -288,7 +296,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) { // We trigger the graceful close process here (with no timeout) to make sure the socket is closed once // the existing streams are closed. The MultiplexCodec may simulate a GOAWAY when the stream IDs are // exhausted so we shouldn't rely upon our peer to close the transport. - parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosing, false); + parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl, false); } else if (msg instanceof Http2PingFrame) { parentContext.keepAliveManager.pingReceived((Http2PingFrame) msg); } else if (!(msg instanceof Http2SettingsAckFrame)) { // we ignore SETTINGS(ACK) From 1114a180d49a37795ae056f6423db571608a01b5 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 13 Oct 2023 09:37:40 -0700 Subject: [PATCH 4/5] Revert back intermediate logging --- servicetalk-http-netty/build.gradle | 6 ------ .../servicetalk/http/netty/H2ParentConnectionContext.java | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/servicetalk-http-netty/build.gradle b/servicetalk-http-netty/build.gradle index 9a66e18029..a234e21cc7 100644 --- a/servicetalk-http-netty/build.gradle +++ b/servicetalk-http-netty/build.gradle @@ -108,9 +108,3 @@ task testProps(type: Test) { test.dependsOn testProps // ServiceTalkLibraryPlugin adds a spotbugs task for each sourceSet, we don't need it. spotbugsTestProps.enabled = false - -tasks.withType(Test).all { - testLogging { - showStandardStreams = true - } -} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index e58ca884e4..2fbf4c590a 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -261,7 +261,7 @@ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause = wrapIfNecessary(cause); parentContext.transportError.onSuccess(cause); if (!tryFailSubscriber(cause)) { - LOGGER.error("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause); + LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause); ChannelCloseUtils.close(ctx, cause); } } From 6408c2a034595d501d1c40f1afd2ff38bc9cff2f Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 13 Oct 2023 09:37:52 -0700 Subject: [PATCH 5/5] Remove `hasSubscriber()` --- .../http/netty/H2ClientParentConnectionContext.java | 5 ----- .../servicetalk/http/netty/H2ParentConnectionContext.java | 6 +----- .../http/netty/H2ServerParentConnectionContext.java | 5 ----- 3 files changed, 1 insertion(+), 15 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java index 8ef33c83ba..3502d5ed97 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java @@ -198,11 +198,6 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par maxConcurrencyPublisher.ignoreElements().subscribe(); } - @Override - boolean hasSubscriber() { - return subscriber != null; - } - @Override void tryCompleteSubscriber() { if (subscriber != null) { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index 2fbf4c590a..b45303ef31 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -199,8 +199,6 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd this.observer = observer; } - abstract boolean hasSubscriber(); - abstract void tryCompleteSubscriber(); abstract boolean tryFailSubscriber(Throwable cause); @@ -246,9 +244,7 @@ public final void handlerRemoved(ChannelHandlerContext ctx) { private void doChannelClosed(final String method) { parentContext.notifyOnClosingImpl(); - if (hasSubscriber()) { - tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method)); - } + tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method)); parentContext.keepAliveManager.channelClosed(); } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index 98e88d8e32..b33c291027 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -220,11 +220,6 @@ private static final class DefaultH2ServerParentConnection extends AbstractH2Par this.subscriber = requireNonNull(subscriber); } - @Override - boolean hasSubscriber() { - return subscriber != null; - } - @Override void tryCompleteSubscriber() { if (subscriber != null) {