Skip to content

Commit

Permalink
Add tests for the P2CSelector
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Nov 7, 2023
1 parent 652ec78 commit 905e488
Show file tree
Hide file tree
Showing 7 changed files with 903 additions and 594 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private static <ResolvedAddress, C extends LoadBalancedConnection> boolean notAv
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean available = false;
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
if (host.address.equals(event.address())) {
if (host.address().equals(event.address())) {
available = true;
break;
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
}
} else if (UNAVAILABLE.equals(eventStatus)) {
return listWithHostRemoved(oldHostsTyped, host -> {
boolean match = host.address.equals(addr);
boolean match = host.address().equals(addr);
if (match) {
host.markClosed();
}
Expand Down Expand Up @@ -331,7 +331,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
private List<Host<ResolvedAddress, C>> markHostAsExpired(
final List<Host<ResolvedAddress, C>> oldHostsTyped, final ResolvedAddress addr) {
for (Host<ResolvedAddress, C> host : oldHostsTyped) {
if (host.address.equals(addr)) {
if (host.address().equals(addr)) {
// Host removal will be handled by the Host's onClose::afterFinally callback
host.markExpired();
break; // because duplicates are not allowed, we can stop iteration
Expand All @@ -342,7 +342,7 @@ private List<Host<ResolvedAddress, C>> markHostAsExpired(

private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
// All hosts will share the healthcheck config of the parent RR loadbalancer.
Host<ResolvedAddress, C> host = new Host<>(NewRoundRobinLoadBalancer.this.toString(), addr,
Host<ResolvedAddress, C> host = new DefaultHost<>(NewRoundRobinLoadBalancer.this.toString(), addr,
connectionFactory, linearSearchSpace, healthCheckConfig);
host.onClose().afterFinally(() ->
usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, previousHosts -> {
Expand All @@ -363,7 +363,7 @@ private List<Host<ResolvedAddress, C>> addHostToList(

// duplicates are not allowed
for (Host<ResolvedAddress, C> host : oldHostsTyped) {
if (host.address.equals(addr)) {
if (host.address().equals(addr)) {
if (!host.markActiveIfNotClosed()) {
// If the host is already in CLOSED state, we should create a new entry.
// For duplicate ACTIVE events or for repeated activation due to failed CAS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Single<C> selectConnection(

@Nullable
private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random random, Predicate<C> selector,
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
for (int j = maxEffort; j > 0; j--) {
// Pick two random indexes that don't collide. Limit the range on the second index to 1 less than
// the max value so that if there is a collision we can safety increment. We also increment if
Expand All @@ -103,25 +103,22 @@ private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random ran
}

if (!forceNewConnectionAndReserve) {
// First we're going to see if we can get an already active connection regardless of health status.
// Since t1 is 'better' we'll try it first but if it won't yield a connection, we'll try t2.
// First we're going to see if we can get an existing connection regardless of health status. Since t1
// is 'better' we'll try it first. If it doesn't have any existing connections we don't fall back to t2
// or else we would cause a bias toward hosts with existing connections which could ultimately drive all
// traffic to the first host to make a connection in the case of a multiplexed session.
C c = t1.pickConnection(selector, contextMap);
if (c != null) {
return succeeded(c);
}
// try t2
c = t2.pickConnection(selector, contextMap);
if (c != null) {
return succeeded(c);
}
// Neither candidate yielded an existing connection. We now need to consider the health status
// and make a new connection if either host is considered healthy.
// We now need to consider the health status and make a new connection if either
// host is considered healthy.
}

// We either couldn't find a live connection or are being forced to make a new one. Either way we're
// going to make a new connection which means we need to consider health.
final boolean t1Healthy = t1.isActiveAndHealthy();
final boolean t2Healthy = t1.isActiveAndHealthy();
final boolean t2Healthy = t2.isActiveAndHealthy();
if (t1Healthy) {
return t1.newConnection(selector, forceNewConnectionAndReserve, contextMap);
} else if (t2Healthy) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.concurrent.api.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class P2CSelectorTest {

private static final Predicate<TestLoadBalancedConnection> PREDICATE = (ignored) -> true;
private Random random;
private P2CSelector<String, TestLoadBalancedConnection> selector;

@BeforeEach
void init() {
init(5, random);
}

void init(int maxEffort, @Nullable Random random) {
selector = new P2CSelector<>("testResource", maxEffort, random);
}

private Host mockHost(String addr, TestLoadBalancedConnection connection) {
Host<String, TestLoadBalancedConnection> host = mock(Host.class);
when(host.address()).thenReturn(addr);
when(host.isUnhealthy()).thenReturn(true);
when(host.isActiveAndHealthy()).thenReturn(true);
when(host.pickConnection(any(), any())).thenReturn(connection);
when(host.newConnection(any(), anyBoolean(), any())).thenReturn(Single.succeeded(connection));
return host;
}

private List<Host<String, TestLoadBalancedConnection>> connections(String... addresses) {
final List<Host<String, TestLoadBalancedConnection>> results = new ArrayList<>(addresses.length);
for (String addr : addresses) {
results.add(mockHost(addr, TestLoadBalancedConnection.mockConnection(addr)));
}
return results;
}

@ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}")
@ValueSource(booleans = {false, true})
void singleHost(boolean forceNewConnection) throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1");
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get();
assertThat(connection.address(), equalTo("addr-1"));
}

@ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}")
@ValueSource(booleans = {false, true})
void singleUnhealthyHostWithConnection(boolean forceNewConnection) throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1");
when(hosts.get(0).isActiveAndHealthy()).thenReturn(false);
if (forceNewConnection) {
Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get());
assertThat(e.getCause(), isA(NoActiveHostException.class));
} else {
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get();
assertThat(connection.address(), equalTo("addr-1"));
}
}

@ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}")
@ValueSource(booleans = {false, true})
void singleUnhealthyHostWithoutConnection(boolean forceNewConnection) {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1");
when(hosts.get(0).isActiveAndHealthy()).thenReturn(false);
when(hosts.get(0).pickConnection(any(), any())).thenReturn(null);
Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get());
assertThat(e.getCause(), isA(NoActiveHostException.class));
}

@ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}")
@ValueSource(booleans = {false, true})
void twoHealthyHosts(boolean forceNewConnection) throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get();
assertThat(connection.address(), either(equalTo("addr-1")).or(equalTo("addr-2")));
}

@Test
void twoUnHealthyHostsWithConnections() throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
for (Host<String, TestLoadBalancedConnection> host : hosts) {
when(host.isActiveAndHealthy()).thenReturn(false);
}
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, false).toFuture().get();
assertThat(connection.address(), either(equalTo("addr-1")).or(equalTo("addr-2")));
}

@Test
void twoUnHealthyHostsWithoutConnections() {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
for (Host<String, TestLoadBalancedConnection> host : hosts) {
when(host.isActiveAndHealthy()).thenReturn(false);
when(host.pickConnection(any(), any())).thenReturn(null);
}
Exception e = assertThrows(ExecutionException.class, () -> selector.selectConnection(
hosts, PREDICATE, null, false).toFuture().get());
assertThat(e.getCause(), isA(NoActiveHostException.class));
}

@RepeatedTest(100)
void doesntBiasTowardHostsWithConnections() throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
// we setup the first host to always be preferred by score, but it also doesn't have any connections.
when(hosts.get(0).pickConnection(any(), any())).thenReturn(null);
when(hosts.get(0).score()).thenReturn(10);
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, false).toFuture().get();
assertThat(connection.address(), equalTo("addr-1"));
}

@RepeatedTest(100)
void biasesTowardsActiveAndHealthyHostWhenNoConnections() throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
when(hosts.get(0).isActiveAndHealthy()).thenReturn(false);
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, true).toFuture().get();
assertThat(connection.address(), equalTo("addr-2"));
}

@RepeatedTest(100)
void biasesTowardTheHighestWeightHostForNewConnections() throws Exception {
biasesTowardTheHighestWeightHost(true);
}

@RepeatedTest(100)
void biasesTowardTheHighestWeightHostForExistingConnections() throws Exception {
biasesTowardTheHighestWeightHost(false);
}

void biasesTowardTheHighestWeightHost(boolean forceNewConnection) throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
// Host 0 has the highest score so it should always get the new connection.
when(hosts.get(0).score()).thenReturn(10);
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get();
assertThat(connection.address(), equalTo("addr-1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.ConnectionRejectedException;
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
Expand Down Expand Up @@ -127,7 +126,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

abstract class RoundRobinLoadBalancerTest {

Expand Down Expand Up @@ -967,15 +965,7 @@ private TestLoadBalancedConnection newConnection(final String address) {
}

private TestLoadBalancedConnection newConnection(final String address, final ListenableAsyncCloseable closeable) {
final TestLoadBalancedConnection cnx = mock(TestLoadBalancedConnection.class);
when(cnx.closeAsync()).thenReturn(closeable.closeAsync());
when(cnx.closeAsyncGracefully()).thenReturn(closeable.closeAsyncGracefully());
when(cnx.onClose()).thenReturn(closeable.onClose());
when(cnx.onClosing()).thenReturn(closeable.onClosing());
when(cnx.address()).thenReturn(address);
when(cnx.toString()).thenReturn(address + '@' + cnx.hashCode());
when(cnx.tryReserve()).thenReturn(true);

final TestLoadBalancedConnection cnx = TestLoadBalancedConnection.mockConnection(address, closeable);
connectionsCreated.add(cnx);
return cnx;
}
Expand All @@ -994,10 +984,6 @@ private static Predicate<TestLoadBalancedConnection> newSaturableConnectionFilte
};
}

interface TestLoadBalancedConnection extends LoadBalancedConnection {
String address();
}

static class DelegatingConnectionFactory implements
ConnectionFactory<String, TestLoadBalancedConnection> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

interface TestLoadBalancedConnection extends LoadBalancedConnection {
String address();

static TestLoadBalancedConnection mockConnection(final String address) {
return mockConnection(address, emptyAsyncCloseable());
}

static TestLoadBalancedConnection mockConnection(final String address, final ListenableAsyncCloseable closeable) {
final TestLoadBalancedConnection cnx = mock(TestLoadBalancedConnection.class);
when(cnx.closeAsync()).thenReturn(closeable.closeAsync());
when(cnx.closeAsyncGracefully()).thenReturn(closeable.closeAsyncGracefully());
when(cnx.onClose()).thenReturn(closeable.onClose());
when(cnx.onClosing()).thenReturn(closeable.onClosing());
when(cnx.address()).thenReturn(address);
when(cnx.toString()).thenReturn(address + '@' + cnx.hashCode());
when(cnx.tryReserve()).thenReturn(true);
return cnx;
}
}

0 comments on commit 905e488

Please sign in to comment.