Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark HTTP/2 connection as closing on exception caught #2686

Merged
merged 5 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par
maxConcurrencyPublisher.ignoreElements().subscribe();
}

@Override
boolean hasSubscriber() {
return subscriber != null;
}

@Override
void tryCompleteSubscriber() {
if (subscriber != null) {
Expand All @@ -214,12 +209,15 @@ void tryCompleteSubscriber() {
}

@Override
void tryFailSubscriber(Throwable cause) {
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ClientParentConnection> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.FlushStrategyHolder;
import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;

import io.netty.channel.Channel;
Expand All @@ -44,6 +44,8 @@
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;
import java.net.SocketOption;
Expand All @@ -56,12 +58,14 @@
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HttpExecutionContextUtils.channelExecutionContext;
import static io.servicetalk.http.netty.NettyHttp2ExceptionUtils.wrapIfNecessary;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;
import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession;
import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption;

class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable implements NettyConnectionContext,
HttpConnectionContext {

private static final Logger LOGGER = LoggerFactory.getLogger(H2ParentConnectionContext.class);

final FlushStrategyHolder flushStrategyHolder;
private final HttpExecutionContext executionContext;
private final SingleSource.Processor<Throwable, Throwable> transportError = newSingleProcessor();
Expand Down Expand Up @@ -162,7 +166,11 @@ protected final void doCloseAsyncGracefully() {
}, true);
}

private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection
/**
* This method is required to access notifyOnClosing() from AbstractH2ParentConnection, because usage of
* {@code parentContext.notifyOnClosing()} directly triggers {@link java.lang.IllegalAccessError}.
*/
private void notifyOnClosingImpl() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removal of this method was the cause for H2PriorKnowledgeFeatureParityTest > serverGracefulClose failures 🤦
I'm not sure why Intellij IDEA or javac could not catch this as a problem.

    java.lang.BootstrapMethodError: java.lang.IllegalAccessError: tried to access method io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable.notifyOnClosing()V from class io.servicetalk.http.netty.H2ParentConnectionContext$AbstractH2ParentConnection
    	at io.servicetalk.http.netty.H2ParentConnectionContext$AbstractH2ParentConnection.channelRead(H2ParentConnectionContext.java:291) ~[servicetalk-http-netty-0.42.39-SNAPSHOT.jar:?]

notifyOnClosing();
}

Expand Down Expand Up @@ -191,11 +199,9 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd
this.observer = observer;
}

abstract boolean hasSubscriber();

abstract void tryCompleteSubscriber();

abstract void tryFailSubscriber(Throwable cause);
abstract boolean tryFailSubscriber(Throwable cause);

/**
* Receive a settings frame and optionally handle the acknowledgement of the frame.
Expand Down Expand Up @@ -238,19 +244,22 @@ public final void handlerRemoved(ChannelHandlerContext ctx) {
private void doChannelClosed(final String method) {
parentContext.notifyOnClosingImpl();

if (hasSubscriber()) {
tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method));
}
tryFailSubscriber(StacklessClosedChannelException.newInstance(H2ParentConnectionContext.class, method));
daschl marked this conversation as resolved.
Show resolved Hide resolved
parentContext.keepAliveManager.channelClosed();
}

@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// The parent channel will be closed in case of exception after the cause is propagated to subscriber.
// In case users don't have offloading, there is a risk to retry on the same IO thread.
// We should notify LoadBalancer that this connection is closing to avoid retrying on the same connection.
parentContext.notifyOnClosingImpl();
cause = wrapIfNecessary(cause);
if (observer != NoopConnectionObserver.INSTANCE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is done at a higher level now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be done a couple lines below. Either tryFailSubscriber or ChannelCloseUtils will assign a connection error before closing the channel.

assignConnectionError(ctx.channel(), cause);
}
parentContext.transportError.onSuccess(cause);
if (!tryFailSubscriber(cause)) {
LOGGER.debug("{} closing h2 parent channel on exception caught", parentContext.nettyChannel(), cause);
ChannelCloseUtils.close(ctx, cause);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,6 @@ private static final class DefaultH2ServerParentConnection extends AbstractH2Par
this.subscriber = requireNonNull(subscriber);
}

@Override
boolean hasSubscriber() {
return subscriber != null;
}

@Override
void tryCompleteSubscriber() {
if (subscriber != null) {
Expand All @@ -236,12 +231,15 @@ void tryCompleteSubscriber() {
}

@Override
void tryFailSubscriber(Throwable cause) {
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
ChannelCloseUtils.close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ServerParentConnectionContext> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -37,6 +38,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -71,9 +73,11 @@ class ConnectionClosedAfterIoExceptionTest {
void test(HttpProtocol protocol) throws Exception {
AtomicReference<FilterableStreamingHttpConnection> firstConnection = new AtomicReference<>();
BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
AtomicBoolean firstAttempt = new AtomicBoolean(true);
try (ServerContext serverContext = BuilderUtils.newServerBuilder(SERVER_CTX, protocol)
// Fail only the first connect attempt
.appendEarlyConnectionAcceptor(conn -> errors.isEmpty() ? failed(DELIBERATE_EXCEPTION) : completed())
.appendEarlyConnectionAcceptor(conn -> firstAttempt.compareAndSet(true, false) ?
failed(DELIBERATE_EXCEPTION) : completed())
.listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = BuilderUtils.newClientBuilder(serverContext, CLIENT_CTX, protocol)
.appendConnectionFactoryFilter(original -> new DelegatingConnectionFactory<InetSocketAddress,
Expand All @@ -86,15 +90,21 @@ public Single<FilterableStreamingHttpConnection> newConnection(InetSocketAddress
}
})
.appendClientFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((metaData, t) -> {
errors.add((Throwable) t);
return BackOffPolicy.ofImmediateBounded();
})
.retryOther((metaData, t) -> {
errors.add(t);
return BackOffPolicy.ofImmediateBounded();
})
.build())
.buildBlocking()) {

HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));
Assertions.assertDoesNotThrow(() -> {
HttpResponse response = client.request(client.get("/"));
assertThat(response.status(), is(OK));
});

assertThat("Unexpected number of errors, likely retried more than expected", errors, hasSize(1));
assertThat("Did not propagate original IoException", errors.poll(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
}

// We expect this to timeout, because we have not completed the outstanding request.
assertFalse(onServerCloseLatch.await(30, MILLISECONDS));
assertFalse(onServerCloseLatch.await(300, MILLISECONDS));

requestBody.onComplete();

Expand Down
Loading