Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer: add builder for the DefaultLoadBalancer #2749

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
95991ad
WIP of using a sub-structure for configuring the LB policy
bryce-anderson Nov 9, 2023
feaba42
This may be closer to what Scott wanted, and still lets us do providers
bryce-anderson Nov 16, 2023
7274e13
See what adding a selector specific call could look like
bryce-anderson Nov 16, 2023
57f47f1
Merge branch 'main' into bl_anderson/lb_config_option2
bryce-anderson Nov 17, 2023
d97123f
Back to an interface for Provider purposes
bryce-anderson Nov 17, 2023
3776296
Rename NewRoundRobinLoadBalancer to DefaultLoadBalancer and get build…
bryce-anderson Nov 17, 2023
e904972
Finish some builds and reshape some tests
bryce-anderson Nov 17, 2023
2030c86
Add P2C tests
bryce-anderson Nov 17, 2023
35fbf21
Fix style problems
bryce-anderson Nov 17, 2023
4fda3c2
Merge branch 'main' into bl_anderson/lb_config_option2
bryce-anderson Nov 27, 2023
3c3a9e4
Some tidy ups
bryce-anderson Nov 27, 2023
3b0a608
Add some javadocs
bryce-anderson Nov 28, 2023
cd4a636
Fix up some tests
bryce-anderson Nov 29, 2023
f4b5887
Make tests work with p2c
bryce-anderson Nov 29, 2023
e818152
More docs
bryce-anderson Nov 29, 2023
2200404
Add maxEffort to the P2C name
bryce-anderson Nov 29, 2023
5393866
Thomas feedback
bryce-anderson Dec 4, 2023
cca43e6
Merge remote-tracking branch 'origin/main' into bl_anderson/lb_config…
bryce-anderson Dec 5, 2023
b3b0f42
Fixes for new pmd rules
bryce-anderson Dec 5, 2023
1beec8c
Idel feedback
bryce-anderson Dec 5, 2023
3e393f6
Make static instances for the both p2c and rr default lb policies
bryce-anderson Dec 5, 2023
b27acb7
Another javadoc
bryce-anderson Dec 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource.Processor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
Expand Down Expand Up @@ -72,7 +71,7 @@
import static java.util.stream.Collectors.toList;

/**
* Consult {@link RoundRobinLoadBalancerFactory} for a description of this {@link LoadBalancer} type.
* The (new) default load balancer implementation.
*
* @param <ResolvedAddress> The resolved address type.
* @param <C> The type of connection.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Publisher;

import java.time.Duration;
import java.util.Collection;
import javax.annotation.Nullable;

import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_INTERVAL;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_JITTER;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
import static io.servicetalk.loadbalancer.HealthCheckConfig.validateHealthCheckIntervals;
import static java.util.Objects.requireNonNull;

final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancerBuilder<ResolvedAddress, C> {

private static final int DEFAULT_LINEAR_SEARCH_SPACE = Integer.MAX_VALUE;
private static final LoadBalancingPolicy DEFAULT_LOAD_BALANCING_POLICY = LoadBalancingPolicies.roundRobin();

private final String id;
private LoadBalancingPolicy loadBalancingPolicy = DEFAULT_LOAD_BALANCING_POLICY;
private int linearSearchSpace = DEFAULT_LINEAR_SEARCH_SPACE;

@Nullable
private Executor backgroundExecutor;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private Duration healthCheckResubscribeInterval = DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
private Duration healthCheckResubscribeJitter = DEFAULT_HEALTH_CHECK_JITTER;

// package private constructor so users must funnel through providers in `LoadBalancers`
DefaultLoadBalancerBuilder(final String id) {
this.id = requireNonNull(id, "id");
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> linearSearchSpace(int linearSearchSpace) {
if (linearSearchSpace <= 0) {
throw new IllegalArgumentException("Illegal linear search space: " + linearSearchSpace +
".Search space must be a positive number.");
}
this.linearSearchSpace = linearSearchSpace;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
return this;
}

public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backgroundExecutor) {
this.backgroundExecutor = new NormalizedTimeSourceExecutor(backgroundExecutor);
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter) {
validateHealthCheckIntervals(interval, jitter);
this.healthCheckInterval = interval;
this.healthCheckJitter = jitter;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(
Duration interval, Duration jitter) {
validateHealthCheckIntervals(interval, jitter);
this.healthCheckResubscribeInterval = interval;
this.healthCheckResubscribeJitter = jitter;
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(
int threshold) {
if (threshold == 0) {
throw new IllegalArgumentException("Health check failed connections threshold should not be 0");
}
this.healthCheckFailedConnectionsThreshold = threshold;
return this;
}

public LoadBalancerFactory<ResolvedAddress, C> build() {
final HealthCheckConfig healthCheckConfig;
if (this.healthCheckFailedConnectionsThreshold < 0) {
healthCheckConfig = null;
} else {
healthCheckConfig = new HealthCheckConfig(this.backgroundExecutor == null ?
RoundRobinLoadBalancerFactory.SharedExecutor.getInstance() : this.backgroundExecutor,
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeInterval, healthCheckResubscribeJitter);
}
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, linearSearchSpace, healthCheckConfig);
}

private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancerFactory<ResolvedAddress, C> {

private final String id;
private final LoadBalancingPolicy loadBalancingPolicy;
private final int linearSearchSpace;
@Nullable
private final HealthCheckConfig healthCheckConfig;

DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy loadBalancingPolicy,
final int linearSearchSpace, final HealthCheckConfig healthCheckConfig) {
this.id = requireNonNull(id, "id");
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
}

@Override
public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(targetResource), connectionFactory, linearSearchSpace,
healthCheckConfig);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.concurrent.api.Executor;

import java.time.Duration;

interface LoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedConnection> {
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
/**
* Set the {@code loadBalancingPolicy} to use with this load balancer.
* @param loadBalancingPolicy the policy to use
* @return {@code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy);

/**
* This {@link LoadBalancer} may monitor hosts to which connection establishment has failed
* using health checks that run in the background. The health check tries to establish a new connection
* and if it succeeds, the host is returned to the load balancing pool. As long as the connection
* establishment fails, the host is not considered for opening new connections for processed requests.
* If an {@link Executor} is not provided using this method, a default shared instance is used
* for all {@link LoadBalancer LoadBalancers} created by this factory.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always
* consider all hosts for establishing new connections.
*
* @param backgroundExecutor {@link Executor} on which to schedule health checking.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backgroundExecutor);

/**
* Sets the linear search space to find an available connection for the next host.
* <p>
* When the next host has already opened connections, this {@link LoadBalancer} will perform a linear search for
* a connection that can serve the next request up to a specified number of attempts. If there are more open
* connections, selection of remaining connections will be attempted randomly.
* <p>
* Higher linear search space may help to better identify excess connections in highly concurrent environments,
* but may result in slightly increased selection time.
*
* @param linearSearchSpace the number of attempts for a linear search space, {@code 0} enforces random
* selection all the time.
* @return {@code this}.
*/
LoadBalancerBuilder<ResolvedAddress, C> linearSearchSpace(int linearSearchSpace);
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved

// TODO: these healthCheck* methods should be moved into their own OutlierDetection configuration instance
// and much like the LoadBalancingPolicy, we should be able to add `OutlierDetectionPolicy`s
/**
* Configure an interval for health checking a host that failed to open connections. If no interval is provided
* using this method, a default value will be used.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which a background health check will be scheduled.
* @param jitter the amount of jitter to apply to each retry {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckInterval(Duration interval, Duration jitter);

/**
* Configure an interval for re-subscribing to the original events stream in case all existing hosts become
* unhealthy.
* <p>
* In situations when there is a latency between {@link ServiceDiscoverer} propagating the updated state and all
* known hosts become unhealthy, which could happen due to intermediate caching layers, re-subscribe to the
* events stream can help to exit from a dead state.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which re-subscribes will be scheduled.
* @param jitter the amount of jitter to apply to each re-subscribe {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckResubscribeInterval(Duration interval, Duration jitter);

/**
* Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer}
* consecutively fails to open connections in the amount greater or equal to the specified value,
* the host will be marked as unhealthy and connection establishment will take place in the background
* repeatedly until a connection is established. During that time, the host will not take part in
* load balancing selection.
* <p>
* Use a negative value of the argument to disable health checking.
*
* @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for
* background health checking. Use negative value to disable the health checking mechanism.
* @return {@code this}.
* @see #backgroundExecutor(Executor)
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckFailedConnectionsThreshold(int threshold);
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved

/**
* Builds the {@link LoadBalancerFactory} configured by this builder.
*
* @return a new instance of {@link LoadBalancerFactory} with settings from this builder.
*/
LoadBalancerFactory<ResolvedAddress, C> build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;

// TODO: this has to be public for the service loading to work.
interface LoadBalancerBuilderProvider {
<ResolvedAddress, C extends LoadBalancedConnection> LoadBalancerBuilder<ResolvedAddress, C>
newBuilder(String id, LoadBalancerBuilder<ResolvedAddress, C> builder);
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.ArrayList;
import java.util.List;

// TODO: docs
final class LoadBalancers {

private static final List<LoadBalancerBuilderProvider> PROVIDERS;

static {
// TODO: we can't service load the providers until we make the interface public.
PROVIDERS = new ArrayList<>();
}

private LoadBalancers() {
// no instances.
}

private static <ResolvedAddress, C extends LoadBalancedConnection>
LoadBalancerBuilder<ResolvedAddress, C> applyProviders(String id, LoadBalancerBuilder<ResolvedAddress, C> builder) {
for (LoadBalancerBuilderProvider provider : PROVIDERS) {
builder = provider.newBuilder(id, builder);
}
return builder;
}

/**
* A new {@link LoadBalancerBuilder} instance.
* <p>
* The returned builder can be customized using {@link LoadBalancerBuilderProvider}.
*
* @param id a (unique) ID to identify the created {@link LoadBalancerBuilder}.
* @param <ResolvedAddress> The resolved address type.
* @param <C> The type of connection.
* @return a new {@link LoadBalancerBuilder}.
*/
public static <ResolvedAddress, C extends LoadBalancedConnection>
LoadBalancerBuilder<ResolvedAddress, C> builder(final String id) {
return applyProviders(id, new DefaultLoadBalancerBuilder<>(id));
}
}
Loading
Loading