traffic-resilience-http: Fix flaky testStopAcceptingConnections() test #3125

Merged · 13 commits · Dec 5, 2024
@@ -56,7 +56,6 @@
 import static io.servicetalk.capacity.limiter.api.CapacityLimiters.fixedCapacity;
 import static io.servicetalk.concurrent.api.Single.succeeded;
 import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
-import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI;
 import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
 import static io.servicetalk.http.netty.AsyncContextHttpFilterVerifier.verifyServerFilterAsyncContextVisibility;
 import static io.servicetalk.http.netty.HttpProtocolConfigs.h1Default;
@@ -223,49 +222,53 @@ void testStopAcceptingConnections(final boolean dryRun, final String protocol) t
                 .dryRun(dryRun)
                 .build();
 
-        final HttpServerContext serverContext = HttpServers.forAddress(localAddress(0))
+        try (HttpServerContext serverContext = HttpServers.forAddress(localAddress(0))
                 .protocols(protocolConfig)
                 .listenSocketOption(SO_BACKLOG, TCP_BACKLOG)
                 .appendNonOffloadingServiceFilter(filter)
                 .listenStreamingAndAwait((ctx, request, responseFactory) ->
-                        succeeded(responseFactory.ok().payloadBody(Publisher.never())));
+                        succeeded(responseFactory.ok().payloadBody(Publisher.never())))) {
 
-        final StreamingHttpClient client = HttpClients.forSingleAddress(serverHostAndPort(serverContext))
-                .protocols(protocolConfig)
-                .socketOption(CONNECT_TIMEOUT, (int) SECONDS.toMillis(CI ? 4 : 2))
-                .buildStreaming();
-
-        // First request -> Pending 1
-        final StreamingHttpRequest meta1 = client.newRequest(HttpRequestMethod.GET, "/");
-        client.reserveConnection(meta1)
-                .flatMap(it -> it.request(meta1))
-                .concat(Completable.defer(() -> {
-                    // First request, has a "never" pub as a body, we don't attempt to consume it.
-                    // Concat second request -> out of capacity -> server yielded
-                    final StreamingHttpRequest meta2 = client.newRequest(HttpRequestMethod.GET, "/");
-                    return client.reserveConnection(meta2).flatMap(it -> it.request(meta2)).ignoreElement();
-                }))
-                .toFuture()
-                .get();
+            try (StreamingHttpClient client = HttpClients.forSingleAddress(serverHostAndPort(serverContext))
+                    .protocols(protocolConfig)
+                    .socketOption(CONNECT_TIMEOUT, (int) SECONDS.toMillis(2))
+                    .buildStreaming()) {
 
-        // Netty will evaluate the "yielding" (i.e., auto-read) on this attempt, so this connection will go through.
-        assertThat(client.reserveConnection(client.newRequest(HttpRequestMethod.GET, "/"))
-                .toFuture().get().asConnection(), instanceOf(HttpConnection.class));
+                // First request -> Pending 1
+                final StreamingHttpRequest meta1 = client.newRequest(HttpRequestMethod.GET, "/");
+                client.reserveConnection(meta1)
+                        .flatMap(it -> it.request(meta1))
+                        .concat(Completable.defer(() -> {
+                            // The response has a "never" pub as a body and we don't attempt to consume it.
+                            // Concat second request -> out of capacity -> server yielded
+                            final StreamingHttpRequest meta2 = client.newRequest(HttpRequestMethod.GET, "/");
+                            return client.reserveConnection(meta2).flatMap(it -> it.request(meta2)).ignoreElement();
+                        }))
+                        .toFuture()
+                        .get();
 
-        // This connection shall full-fil the BACKLOG=1 setting
-        assertThat(client.reserveConnection(client.newRequest(HttpRequestMethod.GET, "/"))
-                .toFuture().get().asConnection(), instanceOf(HttpConnection.class));
+                // We expect up to a couple connections to succeed due to the intrinsic race between disabling accepts
+                // and new connect requests, as well as to account for kernel connect backlog (effectively 1 for all
+                // OS's). That means we can have up to two connects succeed, but expect it to fail by the 3rd attempt.
+                for (int i = 0; i < 3; i++) {
+                    if (dryRun) {
+                        client.reserveConnection(client.newRequest(HttpRequestMethod.GET, "/")).toFuture().get()
+                                .releaseAsync().toFuture().get();
+                    } else {
+                        try {
+                            assertThat(client.reserveConnection(client.newRequest(HttpRequestMethod.GET, "/"))
+                                    .toFuture().get().asConnection(), instanceOf(HttpConnection.class));
+                        } catch (ExecutionException e) {
+                            assertThat(e.getCause(), instanceOf(ConnectTimeoutException.class));
+                            // We saw the connection rejection so we succeeded.
+                            return;
+                        }
+                    }
+                }
 
-        // Any attempt to create a connection now, should time out if we're not in dry mode.
-        if (dryRun) {
-            client.reserveConnection(client.newRequest(HttpRequestMethod.GET, "/")).toFuture().get()
-                    .releaseAsync().toFuture().get();
-        } else {
-            try {
-                client.reserveConnection(client.newRequest(HttpRequestMethod.GET, "/")).toFuture().get();
-                fail("Expected a connection timeout");
-            } catch (ExecutionException e) {
-                assertThat(e.getCause(), instanceOf(ConnectTimeoutException.class));
+                if (!dryRun) {
+                    fail("Connection was never rejected.");
+                }
             }
         }
     }
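For readers skimming the diff, the shape of the fix is a bounded-retry assertion: up to a couple of connect attempts are allowed to succeed (absorbing the race with disabling accepts plus the kernel's connect backlog), and the test only fails if the expected rejection never arrives. Below is a minimal, self-contained sketch of that pattern; tryConnect() and isExpectedRejection() are hypothetical stand-ins for ServiceTalk's client.reserveConnection(...).toFuture().get() call and the ConnectTimeoutException check, not part of the PR.

import java.util.concurrent.ExecutionException;

final class EventualRejectionSketch {

    // Illustrative only: tolerate up to maxAttempts successful connects, pass as soon as the
    // expected rejection is observed, and fail if it never shows up.
    static void assertEventuallyRejected(int maxAttempts) throws Exception {
        for (int i = 0; i < maxAttempts; i++) {
            try {
                tryConnect(); // stand-in for client.reserveConnection(...).toFuture().get()
            } catch (ExecutionException e) {
                if (isExpectedRejection(e.getCause())) {
                    return; // the rejection we were waiting for -> test passes
                }
                throw e; // any other failure is a real error
            }
        }
        throw new AssertionError("Connection was never rejected after " + maxAttempts + " attempts");
    }

    // Hypothetical connect attempt; throws ExecutionException when the connect is rejected.
    static void tryConnect() throws ExecutionException {
        // no-op placeholder
    }

    // The real test asserts instanceOf(ConnectTimeoutException.class) on the cause.
    static boolean isExpectedRejection(Throwable cause) {
        return cause instanceof java.net.ConnectException;
    }
}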