From 941be68560cd27701179aa944dd77095851aefae Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 24 Oct 2023 14:38:32 -0600 Subject: [PATCH 1/4] Clone the RR load balancer in preparation for refactoring Motivation: We are going to start refactoring our load balancing components and in order to preserve the current stable form we're cloning it into another implementation (currently identical) which we can then start to pull apart. Modifications: - Add a private interface TestableLoadBalancer to expose a generic way of surfacing the current host set for tests. - Clone RoundRobinLoadBalancer into NewRoundRobinLoadBalancer which is currently identical to the original other than it's name. - Thread through the option to switch round robin load balancer implementation in the tests so we can detect regressions in the new implementation without text-copying the tests. When we get to the point we want to try it out we can start to surface the switch in the public interfaces. Result: What is the result of this change? --- .../NewRoundRobinLoadBalancer.java | 787 ++++++++++++++++++ .../loadbalancer/RoundRobinLoadBalancer.java | 6 +- .../RoundRobinLoadBalancerFactory.java | 24 +- .../loadbalancer/TestableLoadBalancer.java | 30 + .../EagerNewRoundRobinLoadBalancerTest.java | 24 + ...ingeringNewRoundRobinLoadBalancerTest.java | 24 + .../RoundRobinLoadBalancerTest.java | 36 +- 7 files changed, 908 insertions(+), 23 deletions(-) create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/TestableLoadBalancer.java create mode 100644 servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java create mode 100644 servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java new file mode 100644 index 0000000000..54bbce2343 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java @@ -0,0 +1,787 @@ +/* + * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.ConnectionFactory; +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.client.api.LoadBalancer; +import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.PublisherSource.Processor; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.CompositeCloseable; +import io.servicetalk.concurrent.api.ListenableAsyncCloseable; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.SequentialCancellable; +import io.servicetalk.context.api.ContextMap; +import io.servicetalk.loadbalancer.Exceptions.StacklessConnectionRejectedException; +import io.servicetalk.loadbalancer.Exceptions.StacklessNoActiveHostException; +import io.servicetalk.loadbalancer.Exceptions.StacklessNoAvailableHostException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map.Entry; +import java.util.Spliterator; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT; +import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT; +import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE; +import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.EXPIRED; +import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE; +import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable; +import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable; +import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; +import static io.servicetalk.concurrent.api.Single.defer; +import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static java.lang.Integer.toHexString; +import static java.lang.Math.min; +import static java.lang.System.identityHashCode; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.stream.Collectors.toList; + +/** + * Consult {@link RoundRobinLoadBalancerFactory} for a description of this {@link LoadBalancer} type. + * + * @param The resolved address type. + * @param The type of connection. + */ +final class NewRoundRobinLoadBalancer + implements TestableLoadBalancer { + + private static final Logger LOGGER = LoggerFactory.getLogger(NewRoundRobinLoadBalancer.class); + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater usedHostsUpdater = + AtomicReferenceFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, List.class, "usedHosts"); + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater indexUpdater = + AtomicIntegerFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "index"); + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater nextResubscribeTimeUpdater = + AtomicLongFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "nextResubscribeTime"); + + private static final long RESUBSCRIBING = -1L; + + /** + * With a relatively small number of connections we can minimize connection creation under moderate concurrency by + * exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per + * selection attempt. + */ + private static final int MIN_RANDOM_SEARCH_SPACE = 64; + + /** + * For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for + * trying to locate an available connection when most connections are in use. This increases tail latencies, thus + * after some number of failed attempts it appears to be more beneficial to open a new connection instead. + *

+ * The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection + * counts, larger connection counts, low connection churn, high connection churn. + */ + private static final float RANDOM_SEARCH_FACTOR = 0.75f; + + private volatile long nextResubscribeTime = RESUBSCRIBING; + @SuppressWarnings("unused") + private volatile int index; + private volatile List> usedHosts = emptyList(); + + private final String id; + private final String targetResource; + private final Publisher>> eventPublisher; + private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); + private final Publisher eventStream; + private final SequentialCancellable discoveryCancellable = new SequentialCancellable(); + private final ConnectionFactory connectionFactory; + private final int linearSearchSpace; + @Nullable + private final HealthCheckConfig healthCheckConfig; + private final ListenableAsyncCloseable asyncCloseable; + + /** + * Creates a new instance. + * + * @param id a (unique) ID to identify the created {@link NewRoundRobinLoadBalancer}. + * @param targetResourceName {@link String} representation of the target resource for which this instance + * is performing load balancing. + * @param eventPublisher provides a stream of addresses to connect to. + * @param connectionFactory a function which creates new connections. + * @param healthCheckConfig configuration for the health checking mechanism, which monitors hosts that + * are unable to have a connection established. Providing {@code null} disables this mechanism (meaning the host + * continues being eligible for connecting on the request path). + * @see RoundRobinLoadBalancerFactory + */ + NewRoundRobinLoadBalancer( + final String id, + final String targetResourceName, + final Publisher>> eventPublisher, + final ConnectionFactory connectionFactory, + final int linearSearchSpace, + @Nullable final HealthCheckConfig healthCheckConfig) { + this.id = id + '@' + toHexString(identityHashCode(this)); + this.targetResource = requireNonNull(targetResourceName); + this.eventPublisher = requireNonNull(eventPublisher); + this.eventStream = fromSource(eventStreamProcessor) + .replay(1); // Allow for multiple subscribers and provide new subscribers with last signal. + this.connectionFactory = requireNonNull(connectionFactory); + this.linearSearchSpace = linearSearchSpace; + this.healthCheckConfig = healthCheckConfig; + this.asyncCloseable = toAsyncCloseable(graceful -> { + discoveryCancellable.cancel(); + eventStreamProcessor.onComplete(); + final CompositeCloseable compositeCloseable; + for (;;) { + List> currentList = usedHosts; + if (isClosedList(currentList) || + usedHostsUpdater.compareAndSet(this, currentList, new ClosedList<>(currentList))) { + compositeCloseable = newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory); + LOGGER.debug("{} is closing {}gracefully. Last seen addresses (size={}): {}.", + this, graceful ? "" : "non", currentList.size(), currentList); + break; + } + } + return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync()) + .beforeOnError(t -> { + if (!graceful) { + usedHosts = new ClosedList<>(emptyList()); + } + }) + .beforeOnComplete(() -> usedHosts = new ClosedList<>(emptyList())); + }); + // Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal. + eventStream.ignoreElements().subscribe(); + subscribeToEvents(false); + } + + private void subscribeToEvents(boolean resubscribe) { + // This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state. + assert nextResubscribeTime == RESUBSCRIBING; + if (resubscribe) { + LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this); + discoveryCancellable.cancelCurrent(); + } + toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe)); + if (healthCheckConfig != null) { + assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor; + nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this); + } + } + + private static long nextResubscribeTime( + final HealthCheckConfig config, final NewRoundRobinLoadBalancer lb) { + final long lower = config.healthCheckResubscribeLowerBound; + final long upper = config.healthCheckResubscribeUpperBound; + final long currentTime = config.executor.currentTime(NANOSECONDS); + final long result = currentTime + (lower == upper ? lower : ThreadLocalRandom.current().nextLong(lower, upper)); + LOGGER.debug("{}: current time {}, next resubscribe attempt can be performed at {}.", + lb, currentTime, result); + return result; + } + + private static boolean allUnhealthy( + final List> usedHosts) { + boolean allUnhealthy = !usedHosts.isEmpty(); + for (Host host : usedHosts) { + if (!host.isUnhealthy()) { + allUnhealthy = false; + break; + } + } + return allUnhealthy; + } + + private static boolean onlyAvailable( + final Collection> events) { + boolean onlyAvailable = !events.isEmpty(); + for (ServiceDiscovererEvent event : events) { + if (!AVAILABLE.equals(event.status())) { + onlyAvailable = false; + break; + } + } + return onlyAvailable; + } + + private static boolean notAvailable( + final Host host, + final Collection> events) { + boolean available = false; + for (ServiceDiscovererEvent event : events) { + if (host.address.equals(event.address())) { + available = true; + break; + } + } + return !available; + } + + private final class EventSubscriber + implements Subscriber>> { + + private boolean firstEventsAfterResubscribe; + + EventSubscriber(boolean resubscribe) { + this.firstEventsAfterResubscribe = resubscribe; + } + + @Override + public void onSubscribe(final Subscription s) { + // We request max value here to make sure we do not access Subscription concurrently + // (requestN here and cancel from discoveryCancellable). If we request-1 in onNext we would have to wrap + // the Subscription in a ConcurrentSubscription which is costly. + // Since, we synchronously process onNexts we do not really care about flow control. + s.request(Long.MAX_VALUE); + discoveryCancellable.nextCancellable(s); + } + + @Override + public void onNext(@Nullable final Collection> events) { + if (events == null) { + LOGGER.debug("{}: unexpectedly received null instead of events.", NewRoundRobinLoadBalancer.this); + return; + } + for (ServiceDiscovererEvent event : events) { + final ServiceDiscovererEvent.Status eventStatus = event.status(); + LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.", + NewRoundRobinLoadBalancer.this, event, eventStatus); + + @SuppressWarnings("unchecked") + final List> usedAddresses = + usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, oldHosts -> { + if (isClosedList(oldHosts)) { + return oldHosts; + } + final ResolvedAddress addr = requireNonNull(event.address()); + @SuppressWarnings("unchecked") + final List> oldHostsTyped = + (List>) oldHosts; + + if (AVAILABLE.equals(eventStatus)) { + return addHostToList(oldHostsTyped, addr); + } else if (EXPIRED.equals(eventStatus)) { + if (oldHostsTyped.isEmpty()) { + return emptyList(); + } else { + return markHostAsExpired(oldHostsTyped, addr); + } + } else if (UNAVAILABLE.equals(eventStatus)) { + return listWithHostRemoved(oldHostsTyped, host -> { + boolean match = host.address.equals(addr); + if (match) { + host.markClosed(); + } + return match; + }); + } else { + LOGGER.error("{}: Unexpected Status in event:" + + " {} (mapped to {}). Leaving usedHosts unchanged: {}", + NewRoundRobinLoadBalancer.this, event, eventStatus, oldHosts); + return oldHosts; + } + }); + + LOGGER.debug("{}: now using addresses (size={}): {}.", + NewRoundRobinLoadBalancer.this, usedAddresses.size(), usedAddresses); + + if (AVAILABLE.equals(eventStatus)) { + if (usedAddresses.size() == 1) { + eventStreamProcessor.onNext(LOAD_BALANCER_READY_EVENT); + } + } else if (usedAddresses.isEmpty()) { + eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT); + } + } + + if (firstEventsAfterResubscribe) { + // We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY. + if (events.isEmpty()) { + return; // Wait for the next collection of events. + } + firstEventsAfterResubscribe = false; + + if (!onlyAvailable(events)) { + // Looks like the current ServiceDiscoverer maintains a state between re-subscribes. It already + // assigned correct states to all hosts. Even if some of them were left UNHEALTHY, we should keep + // running health-checks. + return; + } + // Looks like the current ServiceDiscoverer doesn't maintain a state between re-subscribes and always + // starts from an empty state propagating only AVAILABLE events. To be in sync with the + // ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the + // initial collection of events, regardless of their current state. + final List> currentHosts = usedHosts; + for (Host host : currentHosts) { + if (notAvailable(host, events)) { + host.closeAsyncGracefully().subscribe(); + } + } + } + } + + private List> markHostAsExpired( + final List> oldHostsTyped, final ResolvedAddress addr) { + for (Host host : oldHostsTyped) { + if (host.address.equals(addr)) { + // Host removal will be handled by the Host's onClose::afterFinally callback + host.markExpired(); + break; // because duplicates are not allowed, we can stop iteration + } + } + return oldHostsTyped; + } + + private Host createHost(ResolvedAddress addr) { + // All hosts will share the healthcheck config of the parent RR loadbalancer. + Host host = new Host<>(NewRoundRobinLoadBalancer.this.toString(), addr, + healthCheckConfig); + host.onClose().afterFinally(() -> + usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, previousHosts -> { + @SuppressWarnings("unchecked") + List> previousHostsTyped = + (List>) previousHosts; + return listWithHostRemoved(previousHostsTyped, current -> current == host); + } + )).subscribe(); + return host; + } + + private List> addHostToList( + List> oldHostsTyped, ResolvedAddress addr) { + if (oldHostsTyped.isEmpty()) { + return singletonList(createHost(addr)); + } + + // duplicates are not allowed + for (Host host : oldHostsTyped) { + if (host.address.equals(addr)) { + if (!host.markActiveIfNotClosed()) { + // If the host is already in CLOSED state, we should create a new entry. + // For duplicate ACTIVE events or for repeated activation due to failed CAS + // of replacing the usedHosts array the marking succeeds so we will not add a new entry. + break; + } + return oldHostsTyped; + } + } + + final List> newHosts = new ArrayList<>(oldHostsTyped.size() + 1); + newHosts.addAll(oldHostsTyped); + newHosts.add(createHost(addr)); + return newHosts; + } + + private List> listWithHostRemoved( + List> oldHostsTyped, Predicate> hostPredicate) { + if (oldHostsTyped.isEmpty()) { + // this can happen when an expired host is removed during closing of the RoundRobinLoadBalancer, + // but all of its connections have already been closed + return oldHostsTyped; + } + final List> newHosts = new ArrayList<>(oldHostsTyped.size() - 1); + for (int i = 0; i < oldHostsTyped.size(); ++i) { + final Host current = oldHostsTyped.get(i); + if (hostPredicate.test(current)) { + for (int x = i + 1; x < oldHostsTyped.size(); ++x) { + newHosts.add(oldHostsTyped.get(x)); + } + return newHosts.isEmpty() ? emptyList() : newHosts; + } else { + newHosts.add(current); + } + } + return newHosts; + } + + @Override + public void onError(final Throwable t) { + List> hosts = usedHosts; + if (healthCheckConfig == null) { + // Terminate processor only if we will never re-subscribe + eventStreamProcessor.onError(t); + } + LOGGER.error( + "{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.", + NewRoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts, t); + } + + @Override + public void onComplete() { + List> hosts = usedHosts; + if (healthCheckConfig == null) { + // Terminate processor only if we will never re-subscribe + eventStreamProcessor.onComplete(); + } + LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.", + NewRoundRobinLoadBalancer.this, hosts.size(), hosts); + } + } + + private static Single failedLBClosed(String targetResource) { + return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed")); + } + + @Override + public Single selectConnection(final Predicate selector, @Nullable final ContextMap context) { + return defer(() -> selectConnection0(selector, context, false).shareContextOnSubscribe()); + } + + @Override + public Single newConnection(@Nullable final ContextMap context) { + return defer(() -> selectConnection0(c -> true, context, true).shareContextOnSubscribe()); + } + + @Override + public Publisher eventStream() { + return eventStream; + } + + @Override + public String toString() { + return "RoundRobinLoadBalancer{" + + "id=" + id + + ", targetResource=" + targetResource + + '}'; + } + + private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, + final boolean forceNewConnectionAndReserve) { + final List> usedHosts = this.usedHosts; + if (usedHosts.isEmpty()) { + return isClosedList(usedHosts) ? failedLBClosed(targetResource) : + // This is the case when SD has emitted some items but none of the hosts are available. + failed(StacklessNoAvailableHostException.newInstance( + "No hosts are available to connect for " + targetResource + ".", + NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); + } + + // try one loop over hosts and if all are expired, give up + final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + Host pickedHost = null; + for (int i = 0; i < usedHosts.size(); ++i) { + // for a particular iteration we maintain a local cursor without contention with other requests + final int localCursor = (cursor + i) % usedHosts.size(); + final Host host = usedHosts.get(localCursor); + assert host != null : "Host can't be null."; + + if (!forceNewConnectionAndReserve) { + // Try first to see if an existing connection can be used + final Object[] connections = host.connections(); + // Exhaust the linear search space first: + final int linearAttempts = min(connections.length, linearSearchSpace); + for (int j = 0; j < linearAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[j]; + if (selector.test(connection)) { + return succeeded(connection); + } + } + // Try other connections randomly: + if (connections.length > linearAttempts) { + final int diff = connections.length - linearAttempts; + // With small enough search space, attempt number of times equal to number of remaining connections. + // Back off after exploring most of the search space, it gives diminishing returns. + final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff : + (int) (diff * RANDOM_SEARCH_FACTOR); + for (int j = 0; j < randomAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)]; + if (selector.test(connection)) { + return succeeded(connection); + } + } + } + } + + // Don't open new connections for expired or unhealthy hosts, try a different one. + // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. + if (host.isActiveAndHealthy()) { + pickedHost = host; + break; + } + } + if (pickedHost == null) { + if (healthCheckConfig != null && allUnhealthy(usedHosts)) { + final long currNextResubscribeTime = nextResubscribeTime; + if (currNextResubscribeTime >= 0 && + healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && + nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { + subscribeToEvents(true); + } + } + return failed(StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + + targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, + NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); + } + // No connection was selected: create a new one. + final Host host = pickedHost; + + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. + // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. + Single establishConnection = connectionFactory.newConnection(host.address, context, null); + if (healthCheckConfig != null) { + // Schedule health check before returning + establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(t, connectionFactory)); + } + return establishConnection + .flatMap(newCnx -> { + if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { + return newCnx.closeAsync().concat(failed(StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + targetResource + + " could not be reserved.", + NewRoundRobinLoadBalancer.class, "selectConnection0(...)"))) + .shareContextOnSubscribe(); + } + + // Invoke the selector before adding the connection to the pool, otherwise, connection can be + // used concurrently and hence a new connection can be rejected by the selector. + if (!selector.test(newCnx)) { + // Failure in selection could be the result of connection factory returning cached connection, + // and not having visibility into max-concurrent-requests, or other threads already selected the + // connection which uses all the max concurrent request count. + + // If there is caching Propagate the exception and rely upon retry strategy. + Single failedSingle = failed(StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + targetResource + + " was rejected by the selection filter.", + NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); + + // Just in case the connection is not closed add it to the host so we don't lose track, + // duplicates will be filtered out. + return (host.addConnection(newCnx, null) ? + failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); + } + if (host.addConnection(newCnx, null)) { + return succeeded(newCnx).shareContextOnSubscribe(); + } + return newCnx.closeAsync().concat(isClosedList(this.usedHosts) ? failedLBClosed(targetResource) : + failed(StacklessConnectionRejectedException.newInstance( + "Failed to add newly created connection " + newCnx + " for " + targetResource + + " for " + host, NewRoundRobinLoadBalancer.class, "selectConnection0(...)"))) + .shareContextOnSubscribe(); + }); + } + + @Override + public Completable onClose() { + return asyncCloseable.onClose(); + } + + @Override + public Completable onClosing() { + return asyncCloseable.onClosing(); + } + + @Override + public Completable closeAsync() { + return asyncCloseable.closeAsync(); + } + + @Override + public Completable closeAsyncGracefully() { + return asyncCloseable.closeAsyncGracefully(); + } + + // Visible for testing + @Override + public List>> usedAddresses() { + return usedHosts.stream().map(Host::asEntry).collect(toList()); + } + + private static boolean isClosedList(List list) { + return list.getClass().equals(ClosedList.class); + } + + private static final class ClosedList implements List { + private final List delegate; + + private ClosedList(final List delegate) { + this.delegate = requireNonNull(delegate); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean contains(final Object o) { + return delegate.contains(o); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public void forEach(final Consumer action) { + delegate.forEach(action); + } + + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + @Override + public T1[] toArray(final T1[] a) { + return delegate.toArray(a); + } + + @Override + public boolean add(final T t) { + return delegate.add(t); + } + + @Override + public boolean remove(final Object o) { + return delegate.remove(o); + } + + @Override + public boolean containsAll(final Collection c) { + return delegate.containsAll(c); + } + + @Override + public boolean addAll(final Collection c) { + return delegate.addAll(c); + } + + @Override + public boolean addAll(final int index, final Collection c) { + return delegate.addAll(c); + } + + @Override + public boolean removeAll(final Collection c) { + return delegate.removeAll(c); + } + + @Override + public boolean removeIf(final Predicate filter) { + return delegate.removeIf(filter); + } + + @Override + public boolean retainAll(final Collection c) { + return delegate.retainAll(c); + } + + @Override + public void replaceAll(final UnaryOperator operator) { + delegate.replaceAll(operator); + } + + @Override + public void sort(final Comparator c) { + delegate.sort(c); + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public T get(final int index) { + return delegate.get(index); + } + + @Override + public T set(final int index, final T element) { + return delegate.set(index, element); + } + + @Override + public void add(final int index, final T element) { + delegate.add(index, element); + } + + @Override + public T remove(final int index) { + return delegate.remove(index); + } + + @Override + public int indexOf(final Object o) { + return delegate.indexOf(o); + } + + @Override + public int lastIndexOf(final Object o) { + return delegate.lastIndexOf(o); + } + + @Override + public ListIterator listIterator() { + return delegate.listIterator(); + } + + @Override + public ListIterator listIterator(final int index) { + return delegate.listIterator(index); + } + + @Override + public List subList(final int fromIndex, final int toIndex) { + return new ClosedList<>(delegate.subList(fromIndex, toIndex)); + } + + @Override + public Spliterator spliterator() { + return delegate.spliterator(); + } + + @Override + public Stream stream() { + return delegate.stream(); + } + + @Override + public Stream parallelStream() { + return delegate.parallelStream(); + } + } +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 35c3a1a52f..4cebb04d80 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -42,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import java.util.Map; import java.util.Map.Entry; import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; @@ -83,7 +84,7 @@ * @param The type of connection. */ final class RoundRobinLoadBalancer - implements LoadBalancer { + implements TestableLoadBalancer { private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class); @@ -617,7 +618,8 @@ public Completable closeAsyncGracefully() { } // Visible for testing - List>> usedAddresses() { + @Override + public List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index e47b9a3464..29a41719ab 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -60,14 +60,17 @@ public final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final String targetResource, final Publisher>> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory, + return useNewRoundRobin ? + new NewRoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory, + linearSearchSpace, healthCheckConfig) + : new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory, linearSearchSpace, healthCheckConfig); } @@ -86,7 +92,9 @@ public LoadBalancer newLoadBalancer( final Publisher>> eventPublisher, final ConnectionFactory connectionFactory, final String targetResource) { - return new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory, + return useNewRoundRobin ? new NewRoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory, + linearSearchSpace, healthCheckConfig) + : new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory, linearSearchSpace, healthCheckConfig); } @@ -109,6 +117,7 @@ public static final class Builder { private final String id; private int linearSearchSpace = 16; + private boolean useNewRoundRobin; @Nullable private Executor backgroundExecutor; private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL; @@ -145,6 +154,12 @@ public RoundRobinLoadBalancerFactory.Builder linearSearchSpa return this; } + // In the future we'll elevate this to the RoundRobinLoadBalancerBuilder interface. + public RoundRobinLoadBalancerBuilder useNewRoundRobin(boolean useNewRoundRobin) { + this.useNewRoundRobin = useNewRoundRobin; + return this; + } + @Override public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( Executor backgroundExecutor) { @@ -217,15 +232,14 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckFail @Override public RoundRobinLoadBalancerFactory build() { if (this.healthCheckFailedConnectionsThreshold < 0) { - return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, null); + return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, useNewRoundRobin, null); } HealthCheckConfig healthCheckConfig = new HealthCheckConfig( this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor, healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold, healthCheckResubscribeLowerBound, healthCheckResubscribeUpperBound); - - return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, healthCheckConfig); + return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, useNewRoundRobin, healthCheckConfig); } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/TestableLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/TestableLoadBalancer.java new file mode 100644 index 0000000000..4b4a717b45 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/TestableLoadBalancer.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.client.api.LoadBalancer; + +import java.util.List; +import java.util.Map; + +// An intermediate interface so we can universally expose the current list of addresses. This should become +// unnecessary once we extract the logic of managing the host list from the load balancer itself into it's +// own abstraction. +interface TestableLoadBalancer extends LoadBalancer { + + List>> usedAddresses(); +} diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java new file mode 100644 index 0000000000..e72da88385 --- /dev/null +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java @@ -0,0 +1,24 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +public class EagerNewRoundRobinLoadBalancerTest extends EagerRoundRobinLoadBalancerTest { + @Override + protected RoundRobinLoadBalancerBuilder baseLoadBalancerBuilder() { + return ((RoundRobinLoadBalancerFactory.Builder) + super.baseLoadBalancerBuilder()).useNewRoundRobin(true); + } +} diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java new file mode 100644 index 0000000000..5c48df374f --- /dev/null +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java @@ -0,0 +1,24 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +class LingeringNewRoundRobinLoadBalancerTest extends LingeringRoundRobinLoadBalancerTest { + @Override + protected RoundRobinLoadBalancerBuilder baseLoadBalancerBuilder() { + return ((RoundRobinLoadBalancerFactory.Builder) + super.baseLoadBalancerBuilder()).useNewRoundRobin(true); + } +} diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 9f6ba2aff1..f89c72f752 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -149,9 +149,9 @@ abstract class RoundRobinLoadBalancerTest { private DelegatingConnectionFactory connectionFactory = new DelegatingConnectionFactory(this::newRealizedConnectionSingle); - RoundRobinLoadBalancer lb; + TestableLoadBalancer lb; - private TestExecutor testExecutor; + protected TestExecutor testExecutor; static Predicate any() { return __ -> true; @@ -161,17 +161,21 @@ Predicate alwaysNewConnectionFilter() { return cnx -> lb.usedAddresses().stream().noneMatch(addr -> addr.getValue().stream().anyMatch(cnx::equals)); } - RoundRobinLoadBalancer defaultLb() { + TestableLoadBalancer defaultLb() { return newTestLoadBalancer(); } - RoundRobinLoadBalancer defaultLb( + TestableLoadBalancer defaultLb( DelegatingConnectionFactory connectionFactory) { return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory); } protected abstract boolean eagerConnectionShutdown(); + protected RoundRobinLoadBalancerBuilder baseLoadBalancerBuilder() { + return RoundRobinLoadBalancers.builder(getClass().getSimpleName()); + } + @BeforeEach void initialize() { testExecutor = executor.executor(); @@ -507,8 +511,8 @@ void disabledHealthCheckDoesntRun() throws Exception { "address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); - lb = (RoundRobinLoadBalancer) - RoundRobinLoadBalancers.builder(getClass().getSimpleName()) + lb = (TestableLoadBalancer) + baseLoadBalancerBuilder() .healthCheckFailedConnectionsThreshold(-1) .build() .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory, "test-service"); @@ -604,8 +608,8 @@ void healthCheckRecoversFromUnexpectedError() throws Exception { final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); final AtomicInteger scheduleCnt = new AtomicInteger(); - lb = (RoundRobinLoadBalancer) - RoundRobinLoadBalancers.builder(getClass().getSimpleName()) + lb = (TestableLoadBalancer) + baseLoadBalancerBuilder() .healthCheckInterval(ofMillis(50), ofMillis(10)) .healthCheckFailedConnectionsThreshold(1) .backgroundExecutor(new DelegatingExecutor(testExecutor) { @@ -773,8 +777,8 @@ void resubscribeToEventsNotTriggeredWhenDisabled() throws Exception { DelegatingConnectionFactory alwaysFailConnectionFactory = new DelegatingConnectionFactory(address -> Single.failed(UNHEALTHY_HOST_EXCEPTION)); - lb = (RoundRobinLoadBalancer) - RoundRobinLoadBalancers.builder(getClass().getSimpleName()) + lb = (TestableLoadBalancer) + baseLoadBalancerBuilder() .healthCheckInterval(ofMillis(50), ofMillis(10)) // Set resubscribe interval to very large number .healthCheckResubscribeInterval(ofNanos(MAX_VALUE), ZERO) @@ -805,8 +809,8 @@ void resubscribeToEventsNotTriggeredWhenDisabled() throws Exception { void handleAllDiscoveryEvents() throws Exception { serviceDiscoveryPublisher.onComplete(); - lb = (RoundRobinLoadBalancer) - RoundRobinLoadBalancers.builder(getClass().getSimpleName()) + lb = (TestableLoadBalancer) + baseLoadBalancerBuilder() .healthCheckInterval(ofMillis(50), ofMillis(10)) .backgroundExecutor(testExecutor) .build() @@ -894,15 +898,15 @@ ServiceDiscovererEvent downEvent(final String address, ServiceDiscovererEvent.St return new DefaultServiceDiscovererEvent<>(address, status); } - RoundRobinLoadBalancer newTestLoadBalancer() { + TestableLoadBalancer newTestLoadBalancer() { return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory); } - RoundRobinLoadBalancer newTestLoadBalancer( + TestableLoadBalancer newTestLoadBalancer( final TestPublisher>> serviceDiscoveryPublisher, final DelegatingConnectionFactory connectionFactory) { - return (RoundRobinLoadBalancer) - RoundRobinLoadBalancers.builder(getClass().getSimpleName()) + return (TestableLoadBalancer) + baseLoadBalancerBuilder() .healthCheckInterval(ofMillis(50), ofMillis(10)) .backgroundExecutor(testExecutor) .build() From 0c036cdaf3006c15fd70aa2b922908a3f0353cc3 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 24 Oct 2023 15:10:27 -0600 Subject: [PATCH 2/4] Small cleanup --- .../loadbalancer/EagerNewRoundRobinLoadBalancerTest.java | 4 ++-- .../loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java index e72da88385..b8896813d0 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerNewRoundRobinLoadBalancerTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ */ package io.servicetalk.loadbalancer; -public class EagerNewRoundRobinLoadBalancerTest extends EagerRoundRobinLoadBalancerTest { +class EagerNewRoundRobinLoadBalancerTest extends EagerRoundRobinLoadBalancerTest { @Override protected RoundRobinLoadBalancerBuilder baseLoadBalancerBuilder() { return ((RoundRobinLoadBalancerFactory.Builder) diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java index 5c48df374f..baef7a8536 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringNewRoundRobinLoadBalancerTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 75b1c1a5d00181fdc96cbc2292ae4259381e3e66 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 25 Oct 2023 12:00:19 -0600 Subject: [PATCH 3/4] Remove dead comment --- .../java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 4cebb04d80..3c17c9cf23 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -617,7 +617,6 @@ public Completable closeAsyncGracefully() { return asyncCloseable.closeAsyncGracefully(); } - // Visible for testing @Override public List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); From 8954f47bf91cfa6ac32ddc19faf39cb129210bd8 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 25 Oct 2023 15:29:54 -0600 Subject: [PATCH 4/4] Make useNewRoundRobin package private, like it should have been. h/t Idel. --- .../loadbalancer/RoundRobinLoadBalancerFactory.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 29a41719ab..f88d499689 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -154,8 +154,9 @@ public RoundRobinLoadBalancerFactory.Builder linearSearchSpa return this; } - // In the future we'll elevate this to the RoundRobinLoadBalancerBuilder interface. - public RoundRobinLoadBalancerBuilder useNewRoundRobin(boolean useNewRoundRobin) { + // In the future we may elevate this to the RoundRobinLoadBalancerBuilder interface or pick another + // route to transition to the new load balancer structure. + RoundRobinLoadBalancerBuilder useNewRoundRobin(boolean useNewRoundRobin) { this.useNewRoundRobin = useNewRoundRobin; return this; }