Adaptive rate limiting for OpenSearch bulk requests #1011

Open · seankao-az wants to merge 14 commits into main from adaptive-rate-control
Conversation

@seankao-az seankao-az commented Jan 8, 2025

Description

Add client-side adaptive rate limiting for OpenSearch bulk requests. For each successful bulk request, the rate limit increases linearly (additively); for each failed bulk request, it decreases exponentially (multiplicatively).

Screenshot 2025-01-02 at 5 20 57 PM
Each worker node adaptively adjusts its own rate limit based on the responses it receives from the OpenSearch cluster, updating the limit each time a response arrives.

Retry requests are not rate limited; they go through regardless of the current limit. Only new requests are blocked by the rate limiter. An alternative is to rate limit retries as well, but that would slow down retries and increase overall response time, since we do not ensure fairness or priority: retries could be delayed by new requests consuming the available rate.
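As a rough sketch of the adjustment logic described above (the class and field names below are illustrative, not the actual implementation in this PR):

```java
// Illustrative sketch of additive-increase/multiplicative-decrease rate adjustment.
// Names and structure are hypothetical; see BulkRequestRateLimiter in this PR for the real code.
public class AdaptiveRateLimitSketch {
  private final double minRate;       // lower bound (documents/sec), also the initial rate
  private final double maxRate;       // upper bound (documents/sec), or -1 for unbounded
  private final double increaseStep;  // additive increase per successful bulk request
  private final double decreaseRatio; // multiplicative decrease per failed bulk request
  private double rate;

  public AdaptiveRateLimitSketch(double minRate, double maxRate,
                                 double increaseStep, double decreaseRatio) {
    this.minRate = minRate;
    this.maxRate = maxRate;
    this.increaseStep = increaseStep;
    this.decreaseRatio = decreaseRatio;
    this.rate = minRate;
  }

  /** Bulk response had no retryable failures: increase linearly. */
  public void onSuccess() {
    double next = rate + increaseStep;
    rate = maxRate > 0 ? Math.min(maxRate, next) : next;
  }

  /** Bulk response had retryable failures: decrease multiplicatively. */
  public void onFailure() {
    rate = Math.max(minRate, rate * decreaseRatio);
  }

  public double getRate() {
    return rate;
  }
}
```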

Several new Spark properties are added (an example configuration follows the list):

  • spark.datasource.flint.write.bulk.rate_limit_per_node.enabled: to enable the feature
  • spark.datasource.flint.write.bulk.rate_limit_per_node.min: min rate limit (documents/sec)
  • spark.datasource.flint.write.bulk.rate_limit_per_node.max: max rate limit (documents/sec)
  • spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step: rate limit increase step for successful bulk request
  • spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio: rate limit decrease ratio for failed bulk request
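For illustration only, enabling the feature with the documented defaults might look like the following spark-submit fragment (the property names come from this PR; the values simply restate the defaults):

```
spark-submit \
  --conf spark.datasource.flint.write.bulk.rate_limit_per_node.enabled=true \
  --conf spark.datasource.flint.write.bulk.rate_limit_per_node.min=5000 \
  --conf spark.datasource.flint.write.bulk.rate_limit_per_node.max=50000 \
  --conf spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step=500 \
  --conf spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio=0.8 \
  ...
```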

This PR also introduces a new (shaded) dependency:

"com.google.guava" % "guava" % "33.3.1-jre"

Test

Screenshot 2025-01-09 at 5 28 45 PM

25/01/10 00:34:35 INFO BulkRequestRateLimiter: Setting rate limit for bulk request to 17396 documents/sec
25/01/10 00:34:35 INFO BulkRequestRateLimiter: Acquired 5068 permits
25/01/10 00:34:35 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 200
25/01/10 00:34:35 INFO HttpStatusCodeResultPredicate: Status code 200 check result: false
25/01/10 00:34:35 INFO HttpAOSSResultPredicate: Checking if response is retryable
25/01/10 00:34:35 INFO HttpAOSSResultPredicate: Status code 200 is not 400. Check result: false
25/01/10 00:34:35 INFO OpenSearchBulkWrapper: Found retryable failure in the bulk response
25/01/10 00:34:35 INFO BulkRequestRateLimiter: Setting rate limit for bulk request to 13916 documents/sec
25/01/10 00:34:35 INFO OpenSearchBulkWrapper: Added 5068 requests to nextRequest
25/01/10 00:34:35 INFO OpenSearchBulkWrapper: Found retryable failure in the bulk response
25/01/10 00:34:35 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=org.opensearch.action.bulk.BulkResponse@58f820d3, exception=null]
25/01/10 00:34:37 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 200
25/01/10 00:34:37 INFO HttpStatusCodeResultPredicate: Status code 200 check result: false
25/01/10 00:34:37 INFO HttpAOSSResultPredicate: Checking if response is retryable
25/01/10 00:34:37 INFO HttpAOSSResultPredicate: Status code 200 is not 400. Check result: false
25/01/10 00:34:37 INFO BulkRequestRateLimiter: Setting rate limit for bulk request to 14415 documents/sec
25/01/10 00:34:37 INFO BulkRequestRateLimiter: Acquired 5261 permits
25/01/10 00:34:38 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 200
25/01/10 00:34:38 INFO HttpStatusCodeResultPredicate: Status code 200 check result: false
25/01/10 00:34:38 INFO HttpAOSSResultPredicate: Checking if response is retryable
25/01/10 00:34:38 INFO HttpAOSSResultPredicate: Status code 200 is not 400. Check result: false
25/01/10 00:34:38 INFO OpenSearchBulkWrapper: Found retryable failure in the bulk response
25/01/10 00:34:38 INFO BulkRequestRateLimiter: Setting rate limit for bulk request to 11532 documents/sec
25/01/10 00:34:38 INFO OpenSearchBulkWrapper: Added 2688 requests to nextRequest
25/01/10 00:34:38 INFO OpenSearchBulkWrapper: Found retryable failure in the bulk response
25/01/10 00:34:38 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=org.opensearch.action.bulk.BulkResponse@4d3d78e3, exception=null]
25/01/10 00:34:39 WARN FlintRetryOptions: Retrying failed request at #1
25/01/10 00:34:39 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 200
25/01/10 00:34:39 INFO HttpStatusCodeResultPredicate: Status code 200 check result: false
25/01/10 00:34:39 INFO HttpAOSSResultPredicate: Checking if response is retryable
25/01/10 00:34:39 INFO HttpAOSSResultPredicate: Status code 200 is not 400. Check result: false
25/01/10 00:34:39 INFO BulkRequestRateLimiter: Setting rate limit for bulk request to 12032 documents/sec
25/01/10 00:34:40 INFO BulkRequestRateLimiter: Acquired 5459 permits

Next Steps

The rate limit metric would be more useful if it had more granular dimensions, so we could see the rate limit per node, or per OpenSearch domain/collection.

Related Issues

- replace Failsafe rate limiter with Google Guava's
- move rate limiter from RestHighLevelClientWrapper to OpenSearchBulkRetryWrapper
- add metrics for rate limit (now converts rate from double to int)
- add spark conf for rate limit parameters
- adjust rate limit based on retryable result percentage

Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
@seankao-az seankao-az force-pushed the adaptive-rate-control branch 4 times, most recently from 55f6c83 to beca67f on January 9, 2025 00:57
@seankao-az seankao-az force-pushed the adaptive-rate-control branch from beca67f to c0865ce on January 9, 2025 01:01
@seankao-az seankao-az force-pushed the adaptive-rate-control branch from 07b5173 to 43424d7 on January 9, 2025 17:36
@seankao-az seankao-az marked this pull request as ready for review January 9, 2025 19:01
@seankao-az seankao-az force-pushed the adaptive-rate-control branch from c20dbf8 to b6e9b04 on January 9, 2025 19:06
@seankao-az seankao-az force-pushed the adaptive-rate-control branch from b6e9b04 to d55bb13 on January 9, 2025 19:11
Signed-off-by: Sean Kao <[email protected]>
@seankao-az seankao-az force-pushed the adaptive-rate-control branch from 72e8996 to 7c2c734 on January 10, 2025 00:41
Signed-off-by: Sean Kao <[email protected]>
@seankao-az seankao-az force-pushed the adaptive-rate-control branch from 7c2c734 to fa2a4c3 on January 10, 2025 01:41
@seankao-az seankao-az requested a review from ykmr1224 on January 10, 2025 01:42
@@ -535,7 +535,11 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual data written to OpenSearch may be more than this. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: Default value is false. Valid values: [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: [Experimental] Rate limit (request/sec) for bulk requests per worker node. Only accepts integer values. To reduce the traffic to less than 1 req/sec, batch_bytes or batch_size should be reduced. Default value is 0, which disables the rate limit.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limit for bulk request per worker node. Default is false.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
Member:
I am not sure what the main purpose of having a min value is. Are there any concerns if the rate limiter goes down to zero in case of continuous failures?

Collaborator (author):

For one, the min value is used as the initial rate limit.
Second, the rate can only increase when there are successful requests, so the rate limit can't be 0, or it would never bounce back at all.
Having too small a rate is also troublesome, because it makes the rate limiter react more slowly. For example, say the rate limit drops to 10 docs/sec but a single request consists of 1000 docs. Once that 1000-doc request goes through, there will be 100 seconds during which no other request can go through (to satisfy the 10 docs/sec rate). That's 100 seconds of not updating the rate.

- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 500.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8.
Member:
Is there a reason why we are using a step to increase and a ratio to decrease? Can this be consistent?

Collaborator (author) @seankao-az commented Jan 10, 2025:

This uses the idea of the AIMD (additive-increase/multiplicative-decrease) algorithm from TCP congestion control.

"Multiple flows using AIMD congestion control will eventually converge to an equal usage of a shared link."

I'm not well versed in the mathematics behind this, but AIAD (additive-increase/additive-decrease) and MIMD (multiplicative-increase/multiplicative-decrease) are both said to be unable to reach stability in dynamic systems.
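In update-rule form, with r the current rate, a the increase step, and β the decrease ratio (corresponding to increase_step and decrease_ratio above, clamped to the configured min/max):

```latex
r \leftarrow \min(r_{\max},\ r + a) \quad \text{on a successful bulk request}
r \leftarrow \max(r_{\min},\ \beta \, r), \quad 0 < \beta < 1 \quad \text{on a failed bulk request}
```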

- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 500.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8.
Member:

0.8 ratio looks too aggressive. Do you have some testing to explain why you chose 0.8?

Collaborator (author) @seankao-az commented Jan 10, 2025:

Mainly because 0.8^3 ≈ 0.5, and TCP congestion control typically uses a decrease factor of 0.5.

So if a bulk request attempts 3 times and all attempts fail, we cut the rate limit roughly in half.
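Spelling out that arithmetic (k consecutive failed responses scale the rate by the decrease ratio k times):

```latex
r_k = 0.8^{k} \, r_0, \qquad 0.8^{3} = 0.512 \approx 0.5
```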

Comment on lines 71 to 73
/**
* Rate getter and setter are public for testing purpose
*/
Collaborator:

Should we replace it with @VisibleForTesting? (same for setRate)

Collaborator (author):

Making this an interface and leaving the getter and setter as ordinary interface methods; restricting them to tests doesn't seem necessary.
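A rough sketch of that shape, including the no-op implementation mentioned in the follow-up commit (method names are approximations, not copied from the PR):

```java
// Illustrative interface: getRate/setRate are ordinary interface methods rather
// than test-only accessors, and a no-op implementation replaces null checks
// when rate limiting is disabled.
public interface BulkRequestRateLimiter {
  void acquirePermit(int permits);
  void increaseRate();
  void decreaseRate();
  double getRate();
  void setRate(double permitsPerSecond);
}

class NoopBulkRequestRateLimiter implements BulkRequestRateLimiter {
  @Override public void acquirePermit(int permits) {}
  @Override public void increaseRate() {}
  @Override public void decreaseRate() {}
  @Override public double getRate() { return 0; }
  @Override public void setRate(double permitsPerSecond) {}
}
```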

- swap parameter for test case asserts
- remove excessive null check (create noop impl for rate limiter)

Signed-off-by: Sean Kao <[email protected]>