From e5d5e955549a5633637642dc8d349792baf617ea Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 30 Oct 2023 18:00:23 -0600 Subject: [PATCH 1/3] Extract round-robin host selection from the NewRoundRobinLoadBalancer Motivation: We want to be able to swap out load balancer algorithms but right now that is in-lined in the same file as the host set management. Modifications: - Extract the host selection algorithm into its own class RoundRobinSelector. --- .../NewRoundRobinLoadBalancer.java | 89 +++++++------------ .../loadbalancer/RoundRobinSelector.java | 80 +++++++++++++++++ 2 files changed, 110 insertions(+), 59 deletions(-) create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java index e0744d1c91..44f46f0a51 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java @@ -42,7 +42,6 @@ import java.util.Map.Entry; import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; @@ -61,7 +60,6 @@ import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; -import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static 
java.lang.Integer.toHexString; @@ -86,9 +84,7 @@ final class NewRoundRobinLoadBalancer usedHostsUpdater = AtomicReferenceFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, List.class, "usedHosts"); - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater indexUpdater = - AtomicIntegerFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "index"); + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater nextResubscribeTimeUpdater = AtomicLongFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "nextResubscribeTime"); @@ -96,12 +92,11 @@ final class NewRoundRobinLoadBalancer> usedHosts = emptyList(); private final String targetResource; private final String lbDescription; + private final RoundRobinSelector algorithm; private final Publisher>> eventPublisher; private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); private final Publisher eventStream; @@ -134,6 +129,7 @@ final class NewRoundRobinLoadBalancer(targetResource); this.eventPublisher = requireNonNull(eventPublisher); this.eventStream = fromSource(eventStreamProcessor) .replay(1); // Allow for multiple subscribers and provide new subscribers with last signal. @@ -444,6 +440,33 @@ public Single newConnection(@Nullable final ContextMap context) { return defer(() -> selectConnection0(c -> true, context, true).shareContextOnSubscribe()); } + private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, + final boolean forceNewConnectionAndReserve) { + final List> currentHosts = this.usedHosts; + if (currentHosts.isEmpty()) { + return isClosedList(currentHosts) ? failedLBClosed(targetResource) : + // This is the case when SD has emitted some items but none of the hosts are available. 
+ failed(Exceptions.StacklessNoAvailableHostException.newInstance( + "No hosts are available to connect for " + targetResource + ".", + NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); + } + + Single result = algorithm.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve); + if (healthCheckConfig != null) { + result = result.beforeOnError(exn -> { + if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) { + final long currNextResubscribeTime = nextResubscribeTime; + if (currNextResubscribeTime >= 0 && + healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && + nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { + subscribeToEvents(true); + } + } + }); + } + return result; + } + @Override public Publisher eventStream() { return eventStream; @@ -454,58 +477,6 @@ public String toString() { return lbDescription; } - private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, - final boolean forceNewConnectionAndReserve) { - final List> usedHosts = this.usedHosts; - if (usedHosts.isEmpty()) { - return isClosedList(usedHosts) ? failedLBClosed(targetResource) : - // This is the case when SD has emitted some items but none of the hosts are available. 
- failed(Exceptions.StacklessNoAvailableHostException.newInstance( - "No hosts are available to connect for " + targetResource + ".", - NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); - } - - // try one loop over hosts and if all are expired, give up - final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); - Host pickedHost = null; - for (int i = 0; i < usedHosts.size(); ++i) { - // for a particular iteration we maintain a local cursor without contention with other requests - final int localCursor = (cursor + i) % usedHosts.size(); - final Host host = usedHosts.get(localCursor); - assert host != null : "Host can't be null."; - - if (!forceNewConnectionAndReserve) { - // First see if an existing connection can be used - C connection = host.pickConnection(selector, context); - if (connection != null) { - return succeeded(connection); - } - } - - // Don't open new connections for expired or unhealthy hosts, try a different one. - // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. - if (host.isActiveAndHealthy()) { - pickedHost = host; - break; - } - } - if (pickedHost == null) { - if (healthCheckConfig != null && allUnhealthy(usedHosts)) { - final long currNextResubscribeTime = nextResubscribeTime; - if (currNextResubscribeTime >= 0 && - healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && - nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { - subscribeToEvents(true); - } - } - return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + - targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, - NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); - } - // No connection was selected: create a new one. 
- return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); - } - @Override public Completable onClose() { return asyncCloseable.onClose(); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java new file mode 100644 index 0000000000..106f23dc2f --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.context.api.ContextMap; + +import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Predicate; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.api.Single.succeeded; + +final class RoundRobinSelector { + + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater indexUpdater = + AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index"); + + private final String targetResource; + @SuppressWarnings("unused") + private volatile int index; + + RoundRobinSelector(String targetResource) { + this.targetResource = targetResource; + } + + Single selectConnection( + final List> usedHosts, + final Predicate selector, @Nullable final ContextMap context, + final boolean forceNewConnectionAndReserve) { + // try one loop over hosts and if all are expired, give up + final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); + Host pickedHost = null; + for (int i = 0; i < usedHosts.size(); ++i) { + // for a particular iteration we maintain a local cursor without contention with other requests + final int localCursor = (cursor + i) % usedHosts.size(); + final Host host = usedHosts.get(localCursor); + assert host != null : "Host can't be null."; + + if (!forceNewConnectionAndReserve) { + // First see if an existing connection can be used + C connection = host.pickConnection(selector, context); + if (connection != null) { + return succeeded(connection); + } + } + + // Don't open new connections for expired or unhealthy hosts, try a different one. + // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. 
+ if (host.isActiveAndHealthy()) { + pickedHost = host; + break; + } + } + if (pickedHost == null) { + return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + + targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, + this.getClass(), "selectConnection(...)")); + } + // We have a host but no connection was selected: create a new one. + return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); + } +} From 1ce9de3b65f94e8136d117aae4d3d2caa32a6114 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 31 Oct 2023 10:29:56 -0600 Subject: [PATCH 2/3] Abstract the host selection interface --- .../loadbalancer/HostSelector.java | 55 +++++++++++++++++++ .../NewRoundRobinLoadBalancer.java | 11 ++-- .../loadbalancer/RoundRobinSelector.java | 16 +++++- 3 files changed, 75 insertions(+), 7 deletions(-) create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java new file mode 100644 index 0000000000..fdd204cc99 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.context.api.ContextMap; + +import java.util.List; +import java.util.function.Predicate; +import javax.annotation.Nullable; + +/** + * Interface abstracting away the method of host selection. + * + * This is useful for supporting multiple load balancing algorithms such as round-robin, + * weighted round-robin, random, and least-loaded where each algorithm only needing to concern + * itself with the data structures important to the algorithm itself. + */ +interface HostSelector { + + /** + * Notify the selector that the set of available hosts has changed. + * + * This method gives the host selector the opportunity to modify or rebuild any + * internal data structures that it uses in it's job of distributing traffic amongst + * the hosts. + * This method will not be called concurrently. + * + * @param hosts the current set of available hosts. + */ + void hostSetChanged(List> hosts); + + /** + * Select or establish a new connection from an exist Host. + * + * This method will be called concurrently with other selectConnection calls and + * hostSetChanged calls and must be thread safe under those conditions. 
+ */ + Single selectConnection(Predicate selector, @Nullable ContextMap context, + boolean forceNewConnectionAndReserve); +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java index 44f46f0a51..fd6e13d0a8 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java @@ -96,7 +96,7 @@ final class NewRoundRobinLoadBalancer algorithm; + private final HostSelector hostSelector; private final Publisher>> eventPublisher; private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); private final Publisher eventStream; @@ -129,7 +129,7 @@ final class NewRoundRobinLoadBalancer(targetResource); + this.hostSelector = new RoundRobinSelector<>(targetResource); this.eventPublisher = requireNonNull(eventPublisher); this.eventStream = fromSource(eventStreamProcessor) .replay(1); // Allow for multiple subscribers and provide new subscribers with last signal. @@ -302,6 +302,8 @@ public void onNext(@Nullable final Collection> currentHosts = usedHosts; + hostSelector.hostSetChanged(currentHosts); if (firstEventsAfterResubscribe) { // We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY. 
if (events.isEmpty()) { @@ -319,7 +321,6 @@ public void onNext(@Nullable final Collection> currentHosts = usedHosts; for (Host host : currentHosts) { if (notAvailable(host, events)) { host.closeAsyncGracefully().subscribe(); @@ -443,6 +444,8 @@ public Single newConnection(@Nullable final ContextMap context) { private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, final boolean forceNewConnectionAndReserve) { final List> currentHosts = this.usedHosts; + // It's possible that we're racing with updates from the `onNext` method but since it's intrinsically + // racy it's fine to do these 'are there any hosts at all' checks here using the total host set. if (currentHosts.isEmpty()) { return isClosedList(currentHosts) ? failedLBClosed(targetResource) : // This is the case when SD has emitted some items but none of the hosts are available. @@ -451,7 +454,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); } - Single result = algorithm.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve); + Single result = hostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); if (healthCheckConfig != null) { result = result.beforeOnError(exn -> { if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) { diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java index 106f23dc2f..14c53944fe 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; +import java.util.Collections; import 
java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Predicate; @@ -27,24 +28,33 @@ import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; -final class RoundRobinSelector { +final class RoundRobinSelector + implements HostSelector { @SuppressWarnings("rawtypes") private static final AtomicIntegerFieldUpdater indexUpdater = AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index"); private final String targetResource; + private volatile List> hosts; @SuppressWarnings("unused") private volatile int index; RoundRobinSelector(String targetResource) { this.targetResource = targetResource; + hosts = Collections.emptyList(); } - Single selectConnection( - final List> usedHosts, + @Override + public void hostSetChanged(List> hosts) { + this.hosts = hosts; + } + + @Override + public Single selectConnection( final Predicate selector, @Nullable final ContextMap context, final boolean forceNewConnectionAndReserve) { + final List> usedHosts = hosts; // try one loop over hosts and if all are expired, give up final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); Host pickedHost = null; From eba782b25f57891bc4efbfdc28349213fed387a8 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 1 Nov 2023 14:20:15 -0600 Subject: [PATCH 3/3] Feedback --- .../loadbalancer/HostSelector.java | 25 ++++--------------- .../NewRoundRobinLoadBalancer.java | 3 +-- .../loadbalancer/RoundRobinSelector.java | 17 ++++--------- 3 files changed, 11 insertions(+), 34 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java index fdd204cc99..4c76a69017 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java +++ 
b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,35 +21,20 @@ import java.util.List; import java.util.function.Predicate; +import javax.annotation.Nonnull; import javax.annotation.Nullable; /** * Interface abstracting away the method of host selection. - * - * This is useful for supporting multiple load balancing algorithms such as round-robin, - * weighted round-robin, random, and least-loaded where each algorithm only needing to concern - * itself with the data structures important to the algorithm itself. */ interface HostSelector { /** - * Notify the selector that the set of available hosts has changed. - * - * This method gives the host selector the opportunity to modify or rebuild any - * internal data structures that it uses in it's job of distributing traffic amongst - * the hosts. - * This method will not be called concurrently. - * - * @param hosts the current set of available hosts. - */ - void hostSetChanged(List> hosts); - - /** - * Select or establish a new connection from an exist Host. + * Select or establish a new connection from an existing Host. * * This method will be called concurrently with other selectConnection calls and * hostSetChanged calls and must be thread safe under those conditions. 
*/ - Single selectConnection(Predicate selector, @Nullable ContextMap context, - boolean forceNewConnectionAndReserve); + Single selectConnection(@Nonnull List> hosts, @Nonnull Predicate selector, + @Nullable ContextMap context, boolean forceNewConnectionAndReserve); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java index fd6e13d0a8..d228692d0e 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java @@ -303,7 +303,6 @@ public void onNext(@Nullable final Collection> currentHosts = usedHosts; - hostSelector.hostSetChanged(currentHosts); if (firstEventsAfterResubscribe) { // We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY. if (events.isEmpty()) { @@ -454,7 +453,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); } - Single result = hostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); + Single result = hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve); if (healthCheckConfig != null) { result = result.beforeOnError(exn -> { if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) { diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java index 14c53944fe..fd43f022f8 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -1,5 +1,5 @@ /* - * 
Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; -import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Predicate; @@ -27,6 +26,7 @@ import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; +import static java.util.Objects.requireNonNull; final class RoundRobinSelector implements HostSelector { @@ -36,25 +36,18 @@ final class RoundRobinSelector> hosts; @SuppressWarnings("unused") private volatile int index; - RoundRobinSelector(String targetResource) { - this.targetResource = targetResource; - hosts = Collections.emptyList(); - } - - @Override - public void hostSetChanged(List> hosts) { - this.hosts = hosts; + RoundRobinSelector(final String targetResource) { + this.targetResource = requireNonNull(targetResource); } @Override public Single selectConnection( + final List> usedHosts, final Predicate selector, @Nullable final ContextMap context, final boolean forceNewConnectionAndReserve) { - final List> usedHosts = hosts; // try one loop over hosts and if all are expired, give up final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); Host pickedHost = null;