Skip to content

Commit

Permalink
tcp-netty-internal: fix race in TcpConnector (#3069)
Browse files Browse the repository at this point in the history
Motivation:

We rely on some ordering of the TransportObserver to register
listeners on the netty connect promise before the bootstrap
has an opportunity to register it's own listener that will
close the channel. Without that ordering we can sometimes miss
that the connect operation failed.

Modifications:

Ensure ordering by listening from within the pipelines
`.connect(..)` pathway. This will always be called from the
channels event loop and before the bootstrap registers its
listener.

Result:

One less flaky test.

Fixes #3060
  • Loading branch information
bryce-anderson authored Sep 27, 2024
1 parent a5b42a0 commit 19d0907
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.resolver.AbstractAddressResolver;
Expand Down Expand Up @@ -112,24 +115,6 @@ protected void handleSubscribe(final Subscriber<? super C> subscriber) {
Future<?> connectFuture = connect0(localAddress, resolvedRemoteAddress, config, autoRead,
executionContext, connectHandler);
connectHandler.connectFuture(connectFuture);
connectFuture.addListener(f -> {
Throwable cause = f.cause();
if (cause != null) {
if (cause instanceof ConnectTimeoutException) {
String msg = resolvedRemoteAddress instanceof FileDescriptorSocketAddress ?
"Failed to register: " + resolvedRemoteAddress :
"Failed to connect: " + resolvedRemoteAddress + " (localAddress: " +
localAddress + ")";
cause = new io.servicetalk.client.api.ConnectTimeoutException(msg, cause);
} else if (cause instanceof ConnectException) {
cause = new RetryableConnectException((ConnectException) cause);
}
if (f instanceof ChannelFuture) {
assignConnectionError(((ChannelFuture) f).channel(), cause);
}
connectHandler.connectFailed(cause);
}
});
} catch (Throwable t) {
connectHandler.unexpectedFailure(t);
}
Expand All @@ -140,13 +125,44 @@ protected void handleSubscribe(final Subscriber<? super C> subscriber) {
private static Future<?> connect0(@Nullable SocketAddress localAddress, Object resolvedRemoteAddress,
ReadOnlyTcpClientConfig config, boolean autoRead,
ExecutionContext<?> executionContext,
Consumer<? super Channel> subscriber) {
ConnectHandler<?> connectHandler) {
// Create the handler here and ensure in connectWithBootstrap / initFileDescriptorBasedChannel it is added
// to the ChannelPipeline after registration is complete as otherwise we may miss channelActive events.
ChannelHandler handler = new io.netty.channel.ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
subscriber.accept(channel);
// We need to intercept the `connect` call in the pipeline and add our listener because right after the
// pipeline finishes its `connect(..)` sequence the netty Bootstrap will add the `CLOSE_ON_FAILURE`
// listener to the connect future. That will close the channel and complete the close future before we
// can call the `ChannelCloseUtils.assignConnectionError` helpers and can cause us to miss the reason
// for the channel closing.
channel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.pipeline().remove(this);
promise.addListener(f -> {
Throwable cause = f.cause();
if (cause != null) {
if (cause instanceof ConnectTimeoutException) {
String msg = resolvedRemoteAddress instanceof FileDescriptorSocketAddress ?
"Failed to register: " + resolvedRemoteAddress :
"Failed to connect: " + resolvedRemoteAddress + " (localAddress: " +
localAddress + ")";
cause = new io.servicetalk.client.api.ConnectTimeoutException(msg, cause);
} else if (cause instanceof ConnectException) {
cause = new RetryableConnectException((ConnectException) cause);
}
if (f instanceof ChannelFuture) {
assignConnectionError(((ChannelFuture) f).channel(), cause);
}
connectHandler.connectFailed(cause);
}
});
super.connect(ctx, remoteAddress, localAddress, promise);
}
});
connectHandler.accept(channel);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ void setUp() throws Exception {
client = createClient();
}

// Visible for overriding.
private TcpClient createClient() {
return new TcpClient(getTcpClientConfig(), getClientTransportObserver());
}
Expand Down

0 comments on commit 19d0907

Please sign in to comment.