diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java new file mode 100644 index 0000000000..4c76a69017 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.context.api.ContextMap; + +import java.util.List; +import java.util.function.Predicate; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Interface abstracting away the method of host selection. + */ +interface HostSelector { + + /** + * Select or establish a new connection from an existing Host. + * <p> + * This method will be called concurrently with other selectConnection calls and + * hostSetChanged calls and must be thread safe under those conditions. + * NOTE(review): no hostSetChanged method is declared on this interface; confirm which operation this refers to. + * + * @param hosts the hosts to choose from; must not be {@code null}. + * @param selector predicate used to decide which connection is acceptable; must not be {@code null}. + * @param context optional context associated with the selection; may be {@code null}. + * @param forceNewConnectionAndReserve controls whether an existing connection may be reused; the round-robin + * implementation does not consider existing connections when this is {@code true}. + * @return a {@link Single} that yields the selected connection, or fails when no connection could be selected. 
+ */ + Single selectConnection(@Nonnull List> hosts, @Nonnull Predicate selector, + @Nullable ContextMap context, boolean forceNewConnectionAndReserve); +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java index e0744d1c91..d228692d0e 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java @@ -42,7 +42,6 @@ import java.util.Map.Entry; import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; @@ -61,7 +60,6 @@ import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; -import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static java.lang.Integer.toHexString; @@ -86,9 +84,7 @@ final class NewRoundRobinLoadBalancer usedHostsUpdater = AtomicReferenceFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, List.class, "usedHosts"); - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater indexUpdater = - AtomicIntegerFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "index"); + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater nextResubscribeTimeUpdater = AtomicLongFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "nextResubscribeTime"); @@ -96,12 +92,11 
@@ final class NewRoundRobinLoadBalancer> usedHosts = emptyList(); private final String targetResource; private final String lbDescription; + private final HostSelector hostSelector; private final Publisher>> eventPublisher; private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); private final Publisher eventStream; @@ -134,6 +129,7 @@ final class NewRoundRobinLoadBalancer(targetResource); this.eventPublisher = requireNonNull(eventPublisher); this.eventStream = fromSource(eventStreamProcessor) .replay(1); // Allow for multiple subscribers and provide new subscribers with last signal. @@ -306,6 +302,7 @@ public void onNext(@Nullable final Collection> currentHosts = usedHosts; if (firstEventsAfterResubscribe) { // We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY. if (events.isEmpty()) { @@ -323,7 +320,6 @@ public void onNext(@Nullable final Collection> currentHosts = usedHosts; for (Host host : currentHosts) { if (notAvailable(host, events)) { host.closeAsyncGracefully().subscribe(); @@ -444,6 +440,35 @@ public Single newConnection(@Nullable final ContextMap context) { return defer(() -> selectConnection0(c -> true, context, true).shareContextOnSubscribe()); } + /** + * Selects a connection using a single snapshot of {@code usedHosts}. + * <p> + * An empty snapshot fails immediately: with {@code failedLBClosed(targetResource)} when {@code isClosedList} + * reports the list as closed, otherwise with a {@code StacklessNoAvailableHostException}. A non-empty snapshot is + * handed to {@code hostSelector}. + * <p> + * When {@code healthCheckConfig} is non-null, an error hook is attached to the selector's result. If the error is a + * {@code StacklessNoActiveHostException} and all hosts of the snapshot are unhealthy, {@code subscribeToEvents(true)} + * is invoked, but only once the stored next-resubscribe time is non-negative and has been reached, and only by the + * caller that wins the compare-and-set of that time to {@code RESUBSCRIBING}. + * + * @param selector connection predicate, forwarded to the host selector. + * @param context optional context, forwarded to the host selector; may be {@code null}. + * @param forceNewConnectionAndReserve forwarded unchanged to the host selector. + * @return the host selector's {@code Single}, or a failed {@code Single} when the snapshot has no hosts. + */ + private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, + final boolean forceNewConnectionAndReserve) { + final List> currentHosts = this.usedHosts; + // It's possible that we're racing with updates from the `onNext` method but since it's intrinsically + // racy it's fine to do these 'are there any hosts at all' checks here using the total host set. + if (currentHosts.isEmpty()) { + return isClosedList(currentHosts) ? failedLBClosed(targetResource) : + // This is the case when SD has emitted some items but none of the hosts are available. 
+ failed(Exceptions.StacklessNoAvailableHostException.newInstance( + "No hosts are available to connect for " + targetResource + ".", + NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); + } + + Single result = hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve); + if (healthCheckConfig != null) { + result = result.beforeOnError(exn -> { + if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) { + final long currNextResubscribeTime = nextResubscribeTime; + if (currNextResubscribeTime >= 0 && + healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && + nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { + subscribeToEvents(true); + } + } + }); + } + return result; + } + @Override public Publisher eventStream() { return eventStream; @@ -454,58 +479,6 @@ public String toString() { return lbDescription; } - private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, - final boolean forceNewConnectionAndReserve) { - final List> usedHosts = this.usedHosts; - if (usedHosts.isEmpty()) { - return isClosedList(usedHosts) ? failedLBClosed(targetResource) : - // This is the case when SD has emitted some items but none of the hosts are available. 
- failed(Exceptions.StacklessNoAvailableHostException.newInstance( - "No hosts are available to connect for " + targetResource + ".", - NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); - } - - // try one loop over hosts and if all are expired, give up - final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); - Host pickedHost = null; - for (int i = 0; i < usedHosts.size(); ++i) { - // for a particular iteration we maintain a local cursor without contention with other requests - final int localCursor = (cursor + i) % usedHosts.size(); - final Host host = usedHosts.get(localCursor); - assert host != null : "Host can't be null."; - - if (!forceNewConnectionAndReserve) { - // First see if an existing connection can be used - C connection = host.pickConnection(selector, context); - if (connection != null) { - return succeeded(connection); - } - } - - // Don't open new connections for expired or unhealthy hosts, try a different one. - // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. - if (host.isActiveAndHealthy()) { - pickedHost = host; - break; - } - } - if (pickedHost == null) { - if (healthCheckConfig != null && allUnhealthy(usedHosts)) { - final long currNextResubscribeTime = nextResubscribeTime; - if (currNextResubscribeTime >= 0 && - healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && - nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { - subscribeToEvents(true); - } - } - return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + - targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, - NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); - } - // No connection was selected: create a new one. 
- return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); - } - @Override public Completable onClose() { return asyncCloseable.onClose(); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java new file mode 100644 index 0000000000..fd43f022f8 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -0,0 +1,83 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.context.api.ContextMap; + +import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Predicate; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static java.util.Objects.requireNonNull; + +final class RoundRobinSelector + implements HostSelector { + + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater indexUpdater = + AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index"); + + private final String targetResource; + @SuppressWarnings("unused") + private volatile int index; + + RoundRobinSelector(final String targetResource) { + this.targetResource = requireNonNull(targetResource); + } + + @Override + public Single selectConnection( + final List> usedHosts, + final Predicate selector, @Nullable final ContextMap context, + final boolean forceNewConnectionAndReserve) { + // try one loop over hosts and if all are expired, give up + final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); + Host pickedHost = null; + for (int i = 0; i < usedHosts.size(); ++i) { + // for a particular iteration we maintain a local cursor without contention with other requests + final int localCursor = (cursor + i) % usedHosts.size(); + final Host host = usedHosts.get(localCursor); + assert host != null : "Host can't be null."; + + if (!forceNewConnectionAndReserve) { + // First see if an existing connection can be used + C connection = host.pickConnection(selector, context); + if (connection != null) { + return succeeded(connection); + } + } + + // Don't open new connections for expired or unhealthy hosts, try a different one. 
+ // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. + if (host.isActiveAndHealthy()) { + pickedHost = host; + break; + } + } + if (pickedHost == null) { + return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + + targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, + this.getClass(), "selectConnection(...)")); + } + // We have a host but no connection was selected: create a new one. + return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); + } +}