Skip to content

Commit

Permalink
loadbalancer-experimental: reduce dogpiling on hosts after healthy ho…
Browse files Browse the repository at this point in the history
…sts (#3021)

Motivation:

Round-robin algorithms very often suffer from bugs where the host
after an unhealthy host will tend to get more traffic. This is because,
when a selection lands on an unhealthy host, the algorithm will then
try to advance to the 'next' index.

Modifications:

If we find a bad host, attempt to advance the cursor so the next
selection doesn't also pick it.

Result:

Less dogpiling behavior.
  • Loading branch information
bryce-anderson authored Jul 30, 2024
1 parent 8ae9e7b commit 6b3a09f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection
this(new AtomicInteger(), hosts, lbDescription, failOpen, ignoreWeights);
}

private RoundRobinSelector(final AtomicInteger index, final List<? extends Host<ResolvedAddress, C>> hosts,
// visible for testing purposes.
RoundRobinSelector(final AtomicInteger index, final List<? extends Host<ResolvedAddress, C>> hosts,
final String targetResource, final boolean failOpen, final boolean ignoreWeights) {
super(hosts, targetResource);
this.index = index;
Expand All @@ -83,6 +84,10 @@ protected Single<C> selectConnection0(
if (host.isHealthy()) {
Single<C> result = selectFromHost(host, selector, forceNewConnectionAndReserve, context);
if (result != null) {
if (i != 0) {
// let the scheduler attempt to skip past the bad hosts for fairness reasons.
scheduler.foundUnhealthy(cursor, i);
}
return result;
}
}
Expand Down Expand Up @@ -134,22 +139,45 @@ private static Scheduler buildScheduler(AtomicInteger index, List<? extends Host
}

/**
 * Base class for the strategies that decide which host index a selection should start from.
 * <p>
 * It wraps the {@link AtomicInteger} selection counter handed to the constructor and the number of hosts, and
 * offers subclasses {@link #nextIndex()} to consume the counter. Implementations provide {@link #nextHost()}.
 */
private abstract static class Scheduler {

    private final AtomicInteger index;
    protected final int hostsSize;

    /**
     * @param index the selection counter that this scheduler reads and advances.
     * @param hostsSize the number of hosts the produced indexes are reduced against.
     */
    Scheduler(final AtomicInteger index, final int hostsSize) {
        this.index = index;
        this.hostsSize = hostsSize;
    }

    /**
     * Get the index of the next host.
     *
     * @return the host index at which the next selection should start.
     */
    abstract int nextHost();

    /**
     * Consume one value of the selection counter.
     *
     * @return the counter value before the increment, interpreted as an unsigned 32-bit number so that it is
     * never negative even after the underlying {@code int} has overflowed.
     */
    protected final long nextIndex() {
        return Integer.toUnsignedLong(index.getAndIncrement());
    }

    /**
     * Let the scheduler know that hosts were found to be unhealthy, in an attempt to avoid causing the node
     * after an unhealthy node to effectively receive double traffic.
     * <p>
     * The counter is only advanced if it still points just past {@code cursor}; otherwise nothing happens.
     *
     * @param cursor the host index the caller's search started from (presumably the value obtained from
     * {@link #nextHost()} - confirm against the caller).
     * @param unhealthyCount how far to advance the counter; the visible call site passes the number of
     * positions it had to step past before finding a usable host.
     */
    final void foundUnhealthy(int cursor, int unhealthyCount) {
        final int current = index.get();
        // `nextIndex()` performs a getAndIncrement, so the value it handed out is one behind where the counter
        // is now. The subtraction must happen in `int` arithmetic *before* widening to unsigned: when the counter
        // has just wrapped around to 0 the value handed out was -1 (0xFFFFFFFF unsigned). Widening first and then
        // subtracting would yield -1L, whose remainder is negative and can never match a cursor.
        final long previous = Integer.toUnsignedLong(current - 1);
        if (cursor == previous % hostsSize) {
            // We do CAS to conditionally advance the counter only if someone else hasn't already.
            index.compareAndSet(current, current + unhealthyCount);
        }
    }
}

// Scheduler that reduces the next counter value onto a host index with a plain modulo by the number of hosts.
// NOTE(review): this hunk comes from a diff view without +/- markers, so lines from before and after the change
// appear to be interleaved (duplicate fields, assignments ahead of `super(...)`, two consecutive returns).
// Presumably the field declarations, the two `this.x = x` assignments and the first `return` are the removed
// lines - confirm against the repository before treating this as compilable code.
private static final class ConstantScheduler extends Scheduler {

// NOTE(review): these two fields look like the pre-change state; `Scheduler` declares both as well.
private final AtomicInteger index;
private final int hostsSize;

ConstantScheduler(AtomicInteger index, int hostsSize) {
this.index = index;
this.hostsSize = hostsSize;
// Hands the counter and the host count to the base class.
super(index, hostsSize);
}

// Returns the next counter value modulo `hostsSize`; the counter is widened as unsigned first, so a negative
// `int` counter cannot produce a negative result.
@Override
int nextHost() {
return (int) (Integer.toUnsignedLong(index.getAndIncrement()) % hostsSize);
return (int) (nextIndex() % hostsSize);
}
}

Expand All @@ -159,21 +187,19 @@ int nextHost() {
// See the java-grpc library for more details:
// https://github.com/grpc/grpc-java/blob/da619e2b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
private static final class StrideScheduler extends Scheduler {

private final AtomicInteger index;
private final int[] weights;

StrideScheduler(AtomicInteger index, int[] weights) {
this.index = index;
super(index, weights.length);
this.weights = weights;
}

@Override
int nextHost() {
while (true) {
long counter = Integer.toUnsignedLong(index.getAndIncrement());
long pass = counter / weights.length;
int i = (int) counter % weights.length;
long counter = nextIndex();
long pass = counter / hostsSize;
int i = (int) counter % hostsSize;
// We add a unique offset for each offset which could be anything so long as it's constant throughout
// iteration. This is helpful in the case where weights are [1, .. 1, 5] since the scheduling could
// otherwise look something like this:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

import static io.servicetalk.loadbalancer.SelectorTestHelpers.PREDICATE;
Expand All @@ -39,6 +40,7 @@
class RoundRobinSelectorTest {

private boolean failOpen;
private final AtomicInteger index = new AtomicInteger();
@Nullable
private HostSelector<String, TestLoadBalancedConnection> selector;

Expand All @@ -50,12 +52,14 @@ void setup() {
}

// Builds the selector under test for the given hosts, using the current `failOpen` setting and the fixed
// resource name "testResource".
// NOTE(review): the two assignments below appear to be the old and the new line of a diff shown without +/-
// markers; presumably only the second one, which also passes the test's `index` counter, is current - confirm.
void init(List<Host<String, TestLoadBalancedConnection>> hosts) {
selector = new RoundRobinSelector<>(hosts, "testResource", failOpen, false);
selector = new RoundRobinSelector<>(index, hosts, "testResource", failOpen, false);
}

@Test
void roundRobining() throws Exception {
@ParameterizedTest(name = "{displayName} [{index}]: negativeIndex={0}")
@ValueSource(booleans = {true, false})
void roundRobining(boolean negativeIndex) throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = SelectorTestHelpers.generateHosts("addr-1", "addr-2");
index.set(negativeIndex ? -1000 : 0);
init(hosts);
List<String> addresses = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Expand Down Expand Up @@ -191,4 +195,48 @@ void singleInactiveHostWithoutConnections(boolean unhealthy, boolean failOpen) {
PREDICATE, null, false).toFuture().get());
assertThat(e.getCause(), isA(NoActiveHostException.class));
}

@ParameterizedTest(name = "{displayName} [{index}]: negativeIndex={0}")
@ValueSource(booleans = {true, false})
void equalWeightsDoesNotOverPrioritizeTheNodeAfterAFailingNode(boolean negativeIndex) throws Exception {
    // Four hosts, of which the first two report themselves as unhealthy.
    final List<Host<String, TestLoadBalancedConnection>> allHosts =
            SelectorTestHelpers.generateHosts("addr-1", "addr-2", "addr-3", "addr-4");
    for (int unhealthy = 0; unhealthy < 2; unhealthy++) {
        when(allHosts.get(unhealthy).isHealthy()).thenReturn(false);
    }

    // Seed the counter handed to the selector either with zero or with a negative value.
    final int startingIndex;
    if (negativeIndex) {
        startingIndex = -1000;
    } else {
        startingIndex = 0;
    }
    index.set(startingIndex);
    init(allHosts);

    // Four picks must alternate between the two healthy hosts rather than favouring the one that directly
    // follows the unhealthy ones.
    final List<String> picked = new ArrayList<>();
    while (picked.size() < 4) {
        picked.add(selector.selectConnection(PREDICATE, null, true).toFuture().get().address());
    }
    assertThat(picked, contains("addr-3", "addr-4", "addr-3", "addr-4"));
}

@Test
void unequalWeightsDoesNotOverPrioritizeTheNodeAfterAFailingNode() throws Exception {
    final List<Host<String, TestLoadBalancedConnection>> allHosts = SelectorTestHelpers.generateHosts(
            "addr-1", "addr-2", "addr-3", "addr-4");

    // Give every host a slightly different weight: 1.0, 1.1, 1.2 and 1.3 in host order.
    final double[] weights = {1.0, 1.1, 1.2, 1.3};
    for (int host = 0; host < weights.length; host++) {
        when(allHosts.get(host).weight()).thenReturn(weights[host]);
    }
    // The first two hosts report themselves as unhealthy.
    for (int unhealthy = 0; unhealthy < 2; unhealthy++) {
        when(allHosts.get(unhealthy).isHealthy()).thenReturn(false);
    }
    init(allHosts);

    // If every host were healthy, the first 7 selections would be
    // [addr-2, addr-3, addr-4, addr-1, addr-2, addr-3, addr-4]
    // With addr-1 and addr-2 unhealthy, the first 4 actual picks are expected to be
    // [  X   , addr-3, addr-4,   X   ,   X   , addr-3, addr-4]
    final List<String> picked = new ArrayList<>();
    while (picked.size() < 4) {
        picked.add(selector.selectConnection(PREDICATE, null, true).toFuture().get().address());
    }
    assertThat(picked, contains("addr-3", "addr-4", "addr-3", "addr-4"));
}
}

0 comments on commit 6b3a09f

Please sign in to comment.