Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clone the RR load balancer in preparation for refactoring #2737

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -83,7 +84,7 @@
* @param <C> The type of connection.
*/
final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancer<C> {
implements TestableLoadBalancer<ResolvedAddress, C> {

private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);

Expand Down Expand Up @@ -616,8 +617,8 @@ public Completable closeAsyncGracefully() {
return asyncCloseable.closeAsyncGracefully();
}

// Visible for testing
List<Entry<ResolvedAddress, List<C>>> usedAddresses() {
@Override
public List<Map.Entry<ResolvedAddress, List<C>>> usedAddresses() {
return usedHosts.stream().map(Host::asEntry).collect(toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,17 @@ public final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends Load

private final String id;
private final int linearSearchSpace;
private final boolean useNewRoundRobin;
@Nullable
private final HealthCheckConfig healthCheckConfig;

private RoundRobinLoadBalancerFactory(final String id,
final int linearSearchSpace,
final boolean useNewRoundRobin,
@Nullable final HealthCheckConfig healthCheckConfig) {
this.id = id;
this.linearSearchSpace = linearSearchSpace;
this.useNewRoundRobin = useNewRoundRobin;
this.healthCheckConfig = healthCheckConfig;
}

Expand All @@ -77,7 +80,10 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
return useNewRoundRobin ?
new NewRoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig)
: new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -86,7 +92,9 @@ public LoadBalancer<C> newLoadBalancer(
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, C> connectionFactory,
final String targetResource) {
return new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
return useNewRoundRobin ? new NewRoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig)
: new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -109,6 +117,7 @@ public static final class Builder<ResolvedAddress, C extends LoadBalancedConnect
implements RoundRobinLoadBalancerBuilder<ResolvedAddress, C> {
private final String id;
private int linearSearchSpace = 16;
private boolean useNewRoundRobin;
@Nullable
private Executor backgroundExecutor;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
Expand Down Expand Up @@ -145,6 +154,13 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> linearSearchSpa
return this;
}

// In the future we may elevate this to the RoundRobinLoadBalancerBuilder interface or pick another
// route to transition to the new load balancer structure.
RoundRobinLoadBalancerBuilder<ResolvedAddress, C> useNewRoundRobin(boolean useNewRoundRobin) {
this.useNewRoundRobin = useNewRoundRobin;
return this;
}

@Override
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecutor(
Executor backgroundExecutor) {
Expand Down Expand Up @@ -217,15 +233,14 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckFail
@Override
public RoundRobinLoadBalancerFactory<ResolvedAddress, C> build() {
if (this.healthCheckFailedConnectionsThreshold < 0) {
return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, null);
return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, useNewRoundRobin, null);
}

HealthCheckConfig healthCheckConfig = new HealthCheckConfig(
this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor,
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeLowerBound, healthCheckResubscribeUpperBound);

return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, healthCheckConfig);
return new RoundRobinLoadBalancerFactory<>(id, linearSearchSpace, useNewRoundRobin, healthCheckConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;

import java.util.List;
import java.util.Map;

// An intermediate interface so we can universally expose the current list of addresses. This should become
// unnecessary once we extract the logic of managing the host list from the load balancer itself into it's
// own abstraction.
interface TestableLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection> extends LoadBalancer<C> {

List<Map.Entry<ResolvedAddress, List<C>>> usedAddresses();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

class EagerNewRoundRobinLoadBalancerTest extends EagerRoundRobinLoadBalancerTest {
@Override
protected RoundRobinLoadBalancerBuilder<String, TestLoadBalancedConnection> baseLoadBalancerBuilder() {
return ((RoundRobinLoadBalancerFactory.Builder<String, TestLoadBalancedConnection>)
super.baseLoadBalancerBuilder()).useNewRoundRobin(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

class LingeringNewRoundRobinLoadBalancerTest extends LingeringRoundRobinLoadBalancerTest {
@Override
protected RoundRobinLoadBalancerBuilder<String, TestLoadBalancedConnection> baseLoadBalancerBuilder() {
return ((RoundRobinLoadBalancerFactory.Builder<String, TestLoadBalancedConnection>)
super.baseLoadBalancerBuilder()).useNewRoundRobin(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ abstract class RoundRobinLoadBalancerTest {
private DelegatingConnectionFactory connectionFactory =
new DelegatingConnectionFactory(this::newRealizedConnectionSingle);

RoundRobinLoadBalancer<String, TestLoadBalancedConnection> lb;
TestableLoadBalancer<String, TestLoadBalancedConnection> lb;

private TestExecutor testExecutor;
protected TestExecutor testExecutor;

static <T> Predicate<T> any() {
return __ -> true;
Expand All @@ -161,17 +161,21 @@ Predicate<TestLoadBalancedConnection> alwaysNewConnectionFilter() {
return cnx -> lb.usedAddresses().stream().noneMatch(addr -> addr.getValue().stream().anyMatch(cnx::equals));
}

RoundRobinLoadBalancer<String, TestLoadBalancedConnection> defaultLb() {
TestableLoadBalancer<String, TestLoadBalancedConnection> defaultLb() {
return newTestLoadBalancer();
}

RoundRobinLoadBalancer<String, TestLoadBalancedConnection> defaultLb(
TestableLoadBalancer<String, TestLoadBalancedConnection> defaultLb(
DelegatingConnectionFactory connectionFactory) {
return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory);
}

protected abstract boolean eagerConnectionShutdown();

protected RoundRobinLoadBalancerBuilder<String, TestLoadBalancedConnection> baseLoadBalancerBuilder() {
return RoundRobinLoadBalancers.builder(getClass().getSimpleName());
}

@BeforeEach
void initialize() {
testExecutor = executor.executor();
Expand Down Expand Up @@ -507,8 +511,8 @@ void disabledHealthCheckDoesntRun() throws Exception {
"address-1", timeAdvancementsTillHealthy, properConnection);
final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory();

lb = (RoundRobinLoadBalancer<String, TestLoadBalancedConnection>)
RoundRobinLoadBalancers.<String, TestLoadBalancedConnection>builder(getClass().getSimpleName())
lb = (TestableLoadBalancer<String, TestLoadBalancedConnection>)
baseLoadBalancerBuilder()
.healthCheckFailedConnectionsThreshold(-1)
.build()
.newLoadBalancer(serviceDiscoveryPublisher, connectionFactory, "test-service");
Expand Down Expand Up @@ -604,8 +608,8 @@ void healthCheckRecoversFromUnexpectedError() throws Exception {
final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory();

final AtomicInteger scheduleCnt = new AtomicInteger();
lb = (RoundRobinLoadBalancer<String, TestLoadBalancedConnection>)
RoundRobinLoadBalancers.<String, TestLoadBalancedConnection>builder(getClass().getSimpleName())
lb = (TestableLoadBalancer<String, TestLoadBalancedConnection>)
baseLoadBalancerBuilder()
.healthCheckInterval(ofMillis(50), ofMillis(10))
.healthCheckFailedConnectionsThreshold(1)
.backgroundExecutor(new DelegatingExecutor(testExecutor) {
Expand Down Expand Up @@ -773,8 +777,8 @@ void resubscribeToEventsNotTriggeredWhenDisabled() throws Exception {

DelegatingConnectionFactory alwaysFailConnectionFactory =
new DelegatingConnectionFactory(address -> Single.failed(UNHEALTHY_HOST_EXCEPTION));
lb = (RoundRobinLoadBalancer<String, TestLoadBalancedConnection>)
RoundRobinLoadBalancers.<String, TestLoadBalancedConnection>builder(getClass().getSimpleName())
lb = (TestableLoadBalancer<String, TestLoadBalancedConnection>)
baseLoadBalancerBuilder()
.healthCheckInterval(ofMillis(50), ofMillis(10))
// Set resubscribe interval to very large number
.healthCheckResubscribeInterval(ofNanos(MAX_VALUE), ZERO)
Expand Down Expand Up @@ -805,8 +809,8 @@ void resubscribeToEventsNotTriggeredWhenDisabled() throws Exception {
void handleAllDiscoveryEvents() throws Exception {
serviceDiscoveryPublisher.onComplete();

lb = (RoundRobinLoadBalancer<String, TestLoadBalancedConnection>)
RoundRobinLoadBalancers.<String, TestLoadBalancedConnection>builder(getClass().getSimpleName())
lb = (TestableLoadBalancer<String, TestLoadBalancedConnection>)
baseLoadBalancerBuilder()
.healthCheckInterval(ofMillis(50), ofMillis(10))
.backgroundExecutor(testExecutor)
.build()
Expand Down Expand Up @@ -894,15 +898,15 @@ ServiceDiscovererEvent downEvent(final String address, ServiceDiscovererEvent.St
return new DefaultServiceDiscovererEvent<>(address, status);
}

RoundRobinLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBalancer() {
TestableLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBalancer() {
return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory);
}

RoundRobinLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBalancer(
TestableLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBalancer(
final TestPublisher<Collection<ServiceDiscovererEvent<String>>> serviceDiscoveryPublisher,
final DelegatingConnectionFactory connectionFactory) {
return (RoundRobinLoadBalancer<String, TestLoadBalancedConnection>)
RoundRobinLoadBalancers.<String, TestLoadBalancedConnection>builder(getClass().getSimpleName())
return (TestableLoadBalancer<String, TestLoadBalancedConnection>)
baseLoadBalancerBuilder()
.healthCheckInterval(ofMillis(50), ofMillis(10))
.backgroundExecutor(testExecutor)
.build()
Expand Down
Loading