Skip to content

Commit

Permalink
GH-249 Addressing consistency related comments in the javadocs.
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthjain210 committed Dec 21, 2024
1 parent 99e4d2d commit 512fa18
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implement

private long maxInFlightRecords = 0;

private int maxInFlightRecordsBackoffDuration = 100;
private int maxInFlightRecordsInitBackoffDuration = 100;

private int maxInFlightRecordsBackoffRate = 2;

Expand All @@ -126,42 +126,42 @@ public void setConverter(Converter<Object, byte[]> converter) {

/**
* Configure maximum records in flight on the KPL Native Process for handling backpressure. Used in conjuction
* with {@link KplMessageHandler#maxInFlightRecordsBackoffDuration}
* @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled.
* with {@link KplMessageHandler#maxInFlightRecordsInitBackoffDuration}
* @param maxInFlightRecords Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled.
* @since 3.0.9
* @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsBackoffRate
* @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts
*/
public void setMaxRecordsInFlight(long maxRecordsInFlight) {
Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0.");
this.maxInFlightRecords = maxRecordsInFlight;
public void setMaxInFlightRecords(long maxInFlightRecords) {
Assert.isTrue(maxInFlightRecords > 0, "'maxInFlightRecords must be greater than 0.");
this.maxInFlightRecords = maxInFlightRecords;
}

/**
* Configure a backoff duration period in milliseconds when the number of records in flight in KPL Native Process
* is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. The configuration helps in handling
* backpressure by sleeping the KPL Thread using exponential backoff. Enabled when
* {@link KplMessageHandler#maxInFlightRecords} is greater than 0.
* @param maxInFlightRecordsBackoffDuration Initial backoff duration in milliseconds. Default is 100ms.
* Configure a initial backoff duration period in milliseconds when the number of records in flight in KPL Native
* Process is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}.
* The configuration helps in handling backpressure by sleeping the KPL Thread using exponential backoff. Enabled
* when {@link KplMessageHandler#maxInFlightRecords} is greater than 0.
* @param maxInFlightRecordsInitBackoffDuration Initial backoff duration in milliseconds. Default is 100ms.
* @since 3.0.9
* @see KplMessageHandler#setMaxRecordsInFlight
* @see KplMessageHandler#setMaxInFlightRecords
* @see KplMessageHandler#setMaxInFlightRecordsBackoffRate
* @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts
*/
public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) {
Assert.isTrue(maxInFlightRecordsBackoffDuration > 0,
public void setMaxInFlightRecordsInitBackoffDuration(int maxInFlightRecordsInitBackoffDuration) {
Assert.isTrue(maxInFlightRecordsInitBackoffDuration > 0,
"'maxInFlightRecordsBackoffDuration must be greater than 0.");
this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration;
this.maxInFlightRecordsInitBackoffDuration = maxInFlightRecordsInitBackoffDuration;
}

/**
* Configure exponential back off rate when handling backpressure on the KPL Native process using
* {@link KplMessageHandler#maxInFlightRecords}.
* @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2
* @since 3.0.9
* @see KplMessageHandler#setMaxRecordsInFlight
* @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecords
* @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts
*/
public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) {
Expand All @@ -174,8 +174,8 @@ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate)
* @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity in KPL
* buffer.
* @since 3.0.9
* @see KplMessageHandler#setMaxRecordsInFlight
* @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecords
* @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsBackoffRate
*/
public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) {
Expand Down Expand Up @@ -448,15 +448,15 @@ private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRe
private void waitForCapacityInUserRecordsBuffer() {
var kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount();
var attempts = 1;
var sleepDuration = this.maxInFlightRecordsBackoffDuration;
var sleepDuration = this.maxInFlightRecordsInitBackoffDuration;
while (kplOutstandingRecordsCount >= this.maxInFlightRecords &&
attempts <= this.maxInFlightRecordsBackoffMaxAttempts) {
try {
logger.info("Buffer size: {} has reached the max records limit of {}. Attempt# {}".formatted(
kplOutstandingRecordsCount, this.maxInFlightRecords));
logger.info("Buffer sleeping for {} ms".formatted(
this.maxInFlightRecordsBackoffDuration));
Thread.sleep(this.maxInFlightRecordsBackoffDuration);
this.maxInFlightRecordsInitBackoffDuration));
Thread.sleep(this.maxInFlightRecordsInitBackoffDuration);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ void testKPLMessageHandler_raw_payload_success() {
void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxRecordsInFlight(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecords(2);
this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
Expand Down Expand Up @@ -122,8 +122,8 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() {
void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxRecordsInFlight(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecords(2);
this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
Expand Down Expand Up @@ -156,8 +156,8 @@ void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() {
void testKPLMessageHandler_raw_payload_backpressure_failure_test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxRecordsInFlight(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecords(2);
this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
Expand Down

0 comments on commit 512fa18

Please sign in to comment.