Adaptive rate limiting for OpenSearch bulk requests #1011

Merged: 14 commits, Jan 13, 2025
2 changes: 2 additions & 0 deletions build.sbt
@@ -58,6 +58,7 @@ val packagesToShade = Seq(
"com.fasterxml.jackson.core.**",
"com.fasterxml.jackson.dataformat.**",
"com.fasterxml.jackson.databind.**",
"com.google.**",
"com.sun.jna.**",
"com.thoughtworks.paranamer.**",
"javax.annotation.**",
@@ -121,6 +122,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("org.apache.httpcomponents.client5", "httpclient5"),
"org.opensearch" % "opensearch-job-scheduler-spi" % opensearchMavenVersion,
"dev.failsafe" % "failsafe" % "3.3.2",
"com.google.guava" % "guava" % "33.3.1-jre",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
6 changes: 5 additions & 1 deletion docs/index.md
@@ -535,7 +535,11 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema i
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual data written to OpenSearch may exceed this. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes; if either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: Default value is false. Valid values: [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: [Experimental] Rate limit (requests/sec) for bulk requests per worker node. Only accepts integer values. To reduce traffic below 1 req/sec, reduce batch_bytes or batch_size. Default value is 0, which disables the rate limit.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limiting for bulk requests per worker node. Default is false.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting of bulk requests per worker node, if the rate limit is enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
Member:
I am not sure what the main purpose of having a min value is. Are there any concerns if the rate limiter goes down to zero in case of continuous failures?

Collaborator Author:
For one, the min value is used as the initial rate limit.
Second, the rate can only increase after a successful request, so we can't have a rate limit of 0, or it would never bounce back at all.
Too small a rate is also troublesome, because it makes the rate limiter react more slowly. For example, say the rate limit drops to 10 docs/sec but a single request consists of 1000 docs. Once that 1000-doc request goes through, there will be 100 seconds during which no other request can go through (to satisfy the 10 docs/sec rate). That's 100 seconds of not updating the rate.
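To make the author's example concrete, here is a minimal sketch using Guava's RateLimiter (the limiter this PR adopts). Guava charges the cost of a large acquire to the following caller, so a 1000-document bulk at a 10 docs/sec limit returns immediately but stalls the next request for roughly 100 seconds; the class and method names here are illustrative only:

```java
import com.google.common.util.concurrent.RateLimiter;

public class LowRateStallDemo {
  public static void main(String[] args) {
    RateLimiter limiter = RateLimiter.create(10); // limit: 10 docs/sec
    // A single 1000-doc bulk request: Guava grants it almost immediately
    // and books its cost against future acquires.
    double first = limiter.acquire(1000);
    // The next request pays that debt, blocking for ~100 seconds,
    // during which the adaptive rate cannot be re-evaluated.
    double second = limiter.acquire(1);
    System.out.printf("first waited %.1fs, second waited %.1fs%n", first, second);
  }
}
```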

- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting of bulk requests per worker node, if the rate limit is enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk requests per worker node, if the rate limit is enabled. Must be greater than 0. Default is 500.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk requests per worker node, if the rate limit is enabled. Must be between 0 and 1. Default is 0.8. (A combined configuration sketch follows this options list.)
Member:
Is there a reason why we use a step to increase but a ratio to decrease? Can this be consistent?

Collaborator Author (@seankao-az), Jan 10, 2025:
This uses the idea of the AIMD (additive-increase/multiplicative-decrease) algorithm from TCP congestion control:

> Multiple flows using AIMD congestion control will eventually converge to equal usage of a shared link.

I'm not well versed in the mathematics behind this, but AIAD (additive-increase/additive-decrease) and MIMD (multiplicative-increase/multiplicative-decrease) are both said to be unable to reach stability in dynamic systems.
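For reference, the AIMD rule described in this thread reduces to a few lines. The sketch below is illustrative, with made-up names (AimdRateSketch, onBulkResponse); it is not the PR's implementation, which lives in BulkRequestRateLimiterImpl further down:

```java
/** Illustrative AIMD controller: additive increase on success, multiplicative decrease on failure. */
public class AimdRateSketch {
  private double rate;

  private final double minRate;       // floor; also the initial rate
  private final double maxRate;       // ceiling; <= 0 means unbounded
  private final double increaseStep;  // added after a clean bulk response
  private final double decreaseRatio; // multiplied in after a retryable failure

  public AimdRateSketch(double minRate, double maxRate,
                        double increaseStep, double decreaseRatio) {
    this.minRate = minRate;
    this.maxRate = maxRate;
    this.increaseStep = increaseStep;
    this.decreaseRatio = decreaseRatio;
    this.rate = minRate;
  }

  /** Apply AIMD once per bulk response. */
  public void onBulkResponse(boolean hadRetryableFailure) {
    double next = hadRetryableFailure ? rate * decreaseRatio : rate + increaseStep;
    if (maxRate > 0) {
      next = Math.min(next, maxRate);
    }
    rate = Math.max(minRate, next);
  }

  public double rate() {
    return rate;
  }
}
```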

Member:
The 0.8 ratio looks too aggressive. Do you have some testing to explain why you chose 0.8?

Collaborator Author (@seankao-az), Jan 10, 2025:
Mainly because 0.8^3 = 0.512 ≈ 0.5, and TCP congestion control typically uses a decrease factor of 0.5.

So if a bulk request attempts 3 times and all attempts fail, we cut the rate limit roughly in half.

- `spark.datasource.flint.read.scroll_size`: Default value is 100.
- `spark.datasource.flint.read.scroll_duration`: Scroll context keep-alive duration. Default value is 5 minutes.
- `spark.datasource.flint.retry.max_retries`: Maximum retries on a failed HTTP request. Default value is 3. Use 0 to disable retry.
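As an illustration of the options above, the rate limiter could be wired into a Spark session as sketched below. This is a hypothetical setup; only the option keys and default values come from the documentation in this diff:

```java
import org.apache.spark.sql.SparkSession;

public class FlintRateLimitConfig {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("flint-adaptive-rate-limit")
        // Turn on adaptive rate limiting for bulk writes on each worker node.
        .config("spark.datasource.flint.write.bulk.rate_limit_per_node.enabled", "true")
        // Floor and ceiling for the adaptive rate, in documents/sec.
        .config("spark.datasource.flint.write.bulk.rate_limit_per_node.min", "5000")
        .config("spark.datasource.flint.write.bulk.rate_limit_per_node.max", "50000")
        // AIMD parameters: additive increase step, multiplicative decrease ratio.
        .config("spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step", "500")
        .config("spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio", "0.8")
        .getOrCreate();

    // ... run Flint index writes here ...

    spark.stop();
  }
}
```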
RestHighLevelClientWrapper.java
@@ -39,8 +39,7 @@
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;
import org.opensearch.flint.core.storage.OpenSearchBulkWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX;
@@ -54,8 +53,7 @@
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;
private final BulkRequestRateLimiter rateLimiter;
private final OpenSearchBulkRetryWrapper bulkRetryWrapper;
private final OpenSearchBulkWrapper bulkRetryWrapper;

private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper();

@@ -64,22 +62,15 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient {
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLimiter rateLimiter, OpenSearchBulkRetryWrapper bulkRetryWrapper) {
public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkWrapper bulkRetryWrapper) {
this.client = client;
this.rateLimiter = rateLimiter;
this.bulkRetryWrapper = bulkRetryWrapper;
}

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(() -> {
try {
rateLimiter.acquirePermit();
return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options);
} catch (InterruptedException e) {
throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e);
}
}, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
return execute(() -> bulkRetryWrapper.bulk(client, bulkRequest, options),
OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
}

@Override
MetricConstants.java
@@ -40,6 +40,11 @@ public final class MetricConstants {
public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count";
public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count";

/**
* Metric name for opensearch bulk request rate limit
*/
public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.documentsPerSecond.count";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
FlintOptions.java
@@ -111,8 +111,17 @@ public class FlintOptions implements Serializable {

private static final String UNKNOWN = "UNKNOWN";

public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "write.bulk.rate_limit_per_node.enabled";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "false";
public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "write.bulk.rate_limit_per_node.min";
public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "5000";
public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "write.bulk.rate_limit_per_node.max";
public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "50000";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "write.bulk.rate_limit_per_node.increase_step";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "500";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "write.bulk.rate_limit_per_node.decrease_ratio";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8";

public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

public FlintOptions(Map<String, String> options) {
@@ -234,8 +243,24 @@ public boolean supportShard() {
DEFAULT_SUPPORT_SHARD);
}

public long getBulkRequestRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE));
public boolean getBulkRequestRateLimitPerNodeEnabled() {
return Boolean.parseBoolean(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED));
}

public long getBulkRequestMinRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE));
}

public long getBulkRequestMaxRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE));
}

public long getBulkRequestRateLimitPerNodeIncreaseStep() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP));
}

public double getBulkRequestRateLimitPerNodeDecreaseRatio() {
return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO));
}

public String getCustomAsyncQuerySchedulerClass() {
BulkRequestRateLimiter.java
@@ -5,31 +5,16 @@

package org.opensearch.flint.core.storage;

import dev.failsafe.RateLimiter;
import java.time.Duration;
import java.util.logging.Logger;
import org.opensearch.flint.core.FlintOptions;
public interface BulkRequestRateLimiter {
void acquirePermit();

public class BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName());
private RateLimiter<Void> rateLimiter;
void acquirePermit(int permits);

public BulkRequestRateLimiter(FlintOptions flintOptions) {
long bulkRequestRateLimitPerNode = flintOptions.getBulkRequestRateLimitPerNode();
if (bulkRequestRateLimitPerNode > 0) {
LOG.info("Setting rate limit for bulk request to " + bulkRequestRateLimitPerNode + "/sec");
this.rateLimiter = RateLimiter.<Void>smoothBuilder(
flintOptions.getBulkRequestRateLimitPerNode(),
Duration.ofSeconds(1)).build();
} else {
LOG.info("Rate limit for bulk request was not set.");
}
}
void increaseRate();

// Wait so it won't exceed rate limit. Does nothing if rate limit is not set.
public void acquirePermit() throws InterruptedException {
if (rateLimiter != null) {
this.rateLimiter.acquirePermit();
}
}
void decreaseRate();

long getRate();

void setRate(long permitsPerSecond);
}
BulkRequestRateLimiterHolder.java
@@ -20,7 +20,11 @@ private BulkRequestRateLimiterHolder() {}
public synchronized static BulkRequestRateLimiter getBulkRequestRateLimiter(
FlintOptions flintOptions) {
if (instance == null) {
instance = new BulkRequestRateLimiter(flintOptions);
if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
instance = new BulkRequestRateLimiterImpl(flintOptions);
} else {
instance = new BulkRequestRateLimiterNoop();
}
}
return instance;
}
BulkRequestRateLimiterImpl.java (new file)
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import com.google.common.util.concurrent.RateLimiter;
import java.util.logging.Logger;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;

public class BulkRequestRateLimiterImpl implements BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterImpl.class.getName());
private RateLimiter rateLimiter;

private final long minRate;
private final long maxRate;
private final long increaseStep;
private final double decreaseRatio;

public BulkRequestRateLimiterImpl(FlintOptions flintOptions) {
minRate = flintOptions.getBulkRequestMinRateLimitPerNode();
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode();
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep();
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio();

LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
this.rateLimiter = RateLimiter.create(minRate);
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate);
}

// Block until enough permits are available so the rate limit is not exceeded.
@Override
public void acquirePermit() {
this.rateLimiter.acquire();
LOG.info("Acquired 1 permit");
}

@Override
public void acquirePermit(int permits) {
this.rateLimiter.acquire(permits);
LOG.info("Acquired " + permits + " permits");
}

/**
* Increase rate limit additively.
*/
@Override
public void increaseRate() {
setRate(getRate() + increaseStep);
}

/**
* Decrease rate limit multiplicatively.
*/
@Override
public void decreaseRate() {
setRate((long) (getRate() * decreaseRatio));
}

@Override
public long getRate() {
return (long) this.rateLimiter.getRate();
}

/**
* Set rate limit to the given value, clamped by minRate and maxRate. Non-positive maxRate means
* there's no maximum rate restriction, and the rate can be set to any value greater than
* minRate.
*/
@Override
public void setRate(long permitsPerSecond) {
if (maxRate > 0) {
permitsPerSecond = Math.min(permitsPerSecond, maxRate);
}
permitsPerSecond = Math.max(minRate, permitsPerSecond);
LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec");
this.rateLimiter.setRate(permitsPerSecond);
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond);
}
}
BulkRequestRateLimiterNoop.java (new file)
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import java.util.logging.Logger;

public class BulkRequestRateLimiterNoop implements BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterNoop.class.getName());

public BulkRequestRateLimiterNoop() {
LOG.info("Rate limit for bulk request was not set.");
}

@Override
public void acquirePermit() {}

@Override
public void acquirePermit(int permits) {}

@Override
public void increaseRate() {}

@Override
public void decreaseRate() {}

@Override
public long getRate() {
return 0;
}

@Override
public void setRate(long permitsPerSecond) {}
}

OpenSearchBulkWrapper.java (renamed from OpenSearchBulkRetryWrapper.java)
@@ -25,25 +25,36 @@
import org.opensearch.flint.core.metrics.MetricsUtil;
import org.opensearch.rest.RestStatus;

public class OpenSearchBulkRetryWrapper {
/**
* Wrapper class for OpenSearch bulk API with retry and rate limiting capability.
*/
public class OpenSearchBulkWrapper {

private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName());
private static final Logger LOG = Logger.getLogger(OpenSearchBulkWrapper.class.getName());

private final RetryPolicy<BulkResponse> retryPolicy;
private final BulkRequestRateLimiter rateLimiter;

public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) {
public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) {
this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate);
this.rateLimiter = rateLimiter;
}

/**
* Delegate bulk request to the client, and retry the request if the response contains retryable
* failure. It won't retry when bulk call thrown exception.
* Bulk request with retry and rate limiting. Delegate bulk request to the client, and retry the
* request if the response contains retryable failure. It won't retry when bulk call thrown
* exception. In addition, adjust rate limit based on the responses.
* @param client used to call bulk API
* @param bulkRequest requests passed to bulk method
* @param options options passed to bulk method
* @return Last result
*/
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) {
rateLimiter.acquirePermit(bulkRequest.requests().size());
return bulkWithPartialRetry(client, bulkRequest, options);
}

private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger requestCount = new AtomicInteger(0);
try {
@@ -59,9 +70,14 @@ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest
.get(() -> {
requestCount.incrementAndGet();
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test(
response)) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));

if (!bulkItemRetryableResultPredicate.test(response)) {
rateLimiter.increaseRate();
} else {
rateLimiter.decreaseRate();
if (retryPolicy.getConfig().allowsRetries()) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));
}
}
return response;
});
@@ -69,8 +69,8 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options

public static IRestHighLevelClient createClient(FlintOptions options) {
return new RestHighLevelClientWrapper(createRestHighLevelClient(options),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options),
new OpenSearchBulkRetryWrapper(options.getRetryOptions()));
new OpenSearchBulkWrapper(options.getRetryOptions(),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options)));
}

/**