Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract round-robin host selection from the NewRoundRobinLoadBalancer #2741

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Interface abstracting away the method of host selection.
*
* @param <ResolvedAddress> the address type carried by each {@link Host} in the candidate list.
* @param <C> the type of {@link LoadBalancedConnection} that a selection yields.
*/
interface HostSelector<ResolvedAddress, C extends LoadBalancedConnection> {

/**
* Select or establish a new connection from an existing Host.
* <p>
* This method will be called concurrently with other selectConnection calls and
* hostSetChanged calls and must be thread safe under those conditions.
* <p>
* NOTE(review): no {@code hostSetChanged} method is declared on this interface; confirm what the
* sentence above refers to. It is also not stated here whether {@code hosts} may be empty - confirm
* whether implementations are expected to handle an empty list.
*
* @param hosts the candidate hosts to choose from.
* @param selector predicate over connections; presumably used to decide whether a connection is acceptable.
* @param context optional {@link ContextMap} supplied by the caller, may be {@code null}.
* @param forceNewConnectionAndReserve presumably requests that a new connection be created (and reserved)
* rather than re-using an existing one - confirm against the implementations.
* @return a {@link Single} for the selected connection.
*/
Single<C> selectConnection(@Nonnull List<Host<ResolvedAddress, C>> hosts, @Nonnull Predicate<C> selector,
@Nullable ContextMap context, boolean forceNewConnectionAndReserve);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Map.Entry;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
Expand All @@ -61,7 +60,6 @@
import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static java.lang.Integer.toHexString;
Expand All @@ -86,22 +84,19 @@ final class NewRoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedCon
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NewRoundRobinLoadBalancer, List> usedHostsUpdater =
AtomicReferenceFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, List.class, "usedHosts");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NewRoundRobinLoadBalancer> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "index");

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<NewRoundRobinLoadBalancer> nextResubscribeTimeUpdater =
AtomicLongFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "nextResubscribeTime");

private static final long RESUBSCRIBING = -1L;

private volatile long nextResubscribeTime = RESUBSCRIBING;
@SuppressWarnings("unused")
private volatile int index;
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();

private final String targetResource;
private final String lbDescription;
private final HostSelector<ResolvedAddress, C> hostSelector;
private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
private final Processor<Object, Object> eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32);
private final Publisher<Object> eventStream;
Expand Down Expand Up @@ -134,6 +129,7 @@ final class NewRoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedCon
@Nullable final HealthCheckConfig healthCheckConfig) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = new RoundRobinSelector<>(targetResource);
this.eventPublisher = requireNonNull(eventPublisher);
this.eventStream = fromSource(eventStreamProcessor)
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
Expand Down Expand Up @@ -306,6 +302,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
}
}

final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
if (firstEventsAfterResubscribe) {
// We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY.
if (events.isEmpty()) {
Expand All @@ -323,7 +320,6 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
// starts from an empty state propagating only AVAILABLE events. To be in sync with the
// ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the
// initial collection of events, regardless of their current state.
final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
for (Host<ResolvedAddress, C> host : currentHosts) {
if (notAvailable(host, events)) {
host.closeAsyncGracefully().subscribe();
Expand Down Expand Up @@ -444,6 +440,35 @@ public Single<C> newConnection(@Nullable final ContextMap context) {
return defer(() -> selectConnection0(c -> true, context, true).shareContextOnSubscribe());
}

private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
final List<Host<ResolvedAddress, C>> currentHosts = this.usedHosts;
// It's possible that we're racing with updates from the `onNext` method but since it's intrinsically
// racy it's fine to do these 'are there any hosts at all' checks here using the total host set.
if (currentHosts.isEmpty()) {
return isClosedList(currentHosts) ? failedLBClosed(targetResource) :
// This is the case when SD has emitted some items but none of the hosts are available.
failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
NewRoundRobinLoadBalancer.class, "selectConnection0(...)"));
}

Single<C> result = hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve);
if (healthCheckConfig != null) {
result = result.beforeOnError(exn -> {
if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) {
subscribeToEvents(true);
}
}
});
}
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

@Override
public Publisher<Object> eventStream() {
return eventStream;
Expand All @@ -454,58 +479,6 @@ public String toString() {
return lbDescription;
}

/**
 * Selects a connection by walking the current host snapshot once in round-robin order.
 * <p>
 * Unless {@code forceNewConnectionAndReserve} is set, each visited host is first asked for an existing
 * connection matching {@code selector}. The first host that reports itself active and healthy is asked
 * to create a new connection. If no host qualifies, a failed {@link Single} is returned, and when health
 * checking is configured and all hosts are unhealthy a re-subscribe to the events may be triggered.
 *
 * @param selector predicate used to pick an existing connection and passed on when creating a new one.
 * @param context optional context passed to the host, may be {@code null}.
 * @param forceNewConnectionAndReserve when {@code true}, existing connections are not considered.
 * @return a {@link Single} with the selected or newly created connection, or a failed {@link Single}.
 */
private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
        final boolean forceNewConnectionAndReserve) {
    // Read the volatile field once so the whole selection works on a single list instance.
    final List<Host<ResolvedAddress, C>> hosts = this.usedHosts;
    if (hosts.isEmpty()) {
        if (isClosedList(hosts)) {
            return failedLBClosed(targetResource);
        }
        // SD has emitted some items but none of the hosts are available.
        return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
                "No hosts are available to connect for " + targetResource + ".",
                NewRoundRobinLoadBalancer.class, "selectConnection0(...)"));
    }

    // Make a single pass over the hosts; the sign bit is masked so the start stays non-negative
    // after the shared counter overflows.
    final int start = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % hosts.size();
    for (int offset = 0; offset < hosts.size(); ++offset) {
        // The offset is local to this call, so iterating does not contend with other requests.
        final Host<ResolvedAddress, C> candidate = hosts.get((start + offset) % hosts.size());
        assert candidate != null : "Host can't be null.";

        if (!forceNewConnectionAndReserve) {
            // Prefer re-using a connection the host already has.
            final C existing = candidate.pickConnection(selector, context);
            if (existing != null) {
                return succeeded(existing);
            }
        }

        // Never open new connections on expired or unhealthy hosts; move on to the next one instead.
        // Unhealthy hosts have no open connections, which is why we do not fail before this point.
        if (candidate.isActiveAndHealthy()) {
            // No connection was selected: create a new one.
            return candidate.newConnection(selector, forceNewConnectionAndReserve, context);
        }
    }

    // The full pass found neither a usable connection nor an active and healthy host.
    if (healthCheckConfig != null && allUnhealthy(hosts)) {
        final long resubscribeAt = nextResubscribeTime;
        final boolean mayResubscribe = resubscribeAt >= 0 &&
                healthCheckConfig.executor.currentTime(NANOSECONDS) >= resubscribeAt &&
                nextResubscribeTimeUpdater.compareAndSet(this, resubscribeAt, RESUBSCRIBING);
        if (mayResubscribe) {
            subscribeToEvents(true);
        }
    }
    return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
            targetResource + ". Either all are busy, expired, or unhealthy: " + hosts,
            NewRoundRobinLoadBalancer.class, "selectConnection0(...)"));
}

@Override
public Completable onClose() {
return asyncCloseable.onClose();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static java.util.Objects.requireNonNull;

final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection>
implements HostSelector<ResolvedAddress, C> {

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<RoundRobinSelector> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index");

private final String targetResource;
@SuppressWarnings("unused")
private volatile int index;

RoundRobinSelector(final String targetResource) {
this.targetResource = requireNonNull(targetResource);
}

@Override
public Single<C> selectConnection(
final List<Host<ResolvedAddress, C>> usedHosts,
final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
// try one loop over hosts and if all are expired, give up
final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size();
Host<ResolvedAddress, C> pickedHost = null;
for (int i = 0; i < usedHosts.size(); ++i) {
// for a particular iteration we maintain a local cursor without contention with other requests
final int localCursor = (cursor + i) % usedHosts.size();
final Host<ResolvedAddress, C> host = usedHosts.get(localCursor);
assert host != null : "Host can't be null.";

if (!forceNewConnectionAndReserve) {
// First see if an existing connection can be used
C connection = host.pickConnection(selector, context);
if (connection != null) {
return succeeded(connection);
}
}

// Don't open new connections for expired or unhealthy hosts, try a different one.
// Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress.
if (host.isActiveAndHealthy()) {
pickedHost = host;
break;
}
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
}
if (pickedHost == null) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
}
// We have a host but no connection was selected: create a new one.
return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context);
}
}
Loading