From 377dbe6e6c2df30bb7da8c5a71fd3bd36d477534 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Mon, 9 Dec 2024 00:23:26 +0530 Subject: [PATCH 01/20] GH-249 Added properties for handling backpressure in KplMessageHandler. Added Unit Tests. --- .../aws/outbound/KplMessageHandler.java | 36 +++++ .../aws/outbound/KplMessageHandlerTest.java | 142 ++++++++++++++++++ 2 files changed, 178 insertions(+) create mode 100644 src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 1b2fe6b8..04b11027 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -68,6 +68,7 @@ * * @author Arnaud Lecollaire * @author Artem Bilan + * @author Siddharth Jain * * @since 2.2 * @@ -99,6 +100,10 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private volatile ScheduledFuture flushFuture; + private long maxRecordsInFlight = 0; + + private int maxRecordInFlightsSleepDurationInMillis = 100; + public KplMessageHandler(KinesisProducer kinesisProducer) { Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null."); this.kinesisProducer = kinesisProducer; @@ -115,8 +120,29 @@ public void setConverter(Converter converter) { setMessageConverter(new ConvertingFromMessageConverter(converter)); } + /** + * When in KPL mode, the setting allows handling backpressure on the KPL native process. Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in maxRecordInFlightsSleepDurationInMillis. + * + * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. Specify a positive value to enable back pressure. + */ + public void setMaxOutstandingRecordsInFlight(long maxRecordsInFlight) { + Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0."); + this.maxRecordsInFlight = maxRecordsInFlight; + } + + /** + * The setting allows handling backpressure on the KPL native process. Enabled when maxOutstandingRecordsCount is greater than 0. The configurations puts the KPL Thread to sleep for the specified number of milliseconds. + * + * @param maxRecordInFlightsSleepDurationInMillis Default is 100ms. + */ + public void setMaxRecordInFlightsSleepDurationInMillis(int maxRecordInFlightsSleepDurationInMillis) { + Assert.isTrue(maxRecordInFlightsSleepDurationInMillis > 0, "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); + this.maxRecordInFlightsSleepDurationInMillis = maxRecordInFlightsSleepDurationInMillis; + } + /** * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record. + * * @param messageConverter the {@link MessageConverter} to use. * @since 2.3 */ @@ -368,6 +394,16 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } private CompletableFuture handleUserRecord(UserRecord userRecord) { + if (this.maxRecordsInFlight != -1 && this.kinesisProducer.getOutstandingRecordsCount() > this.maxRecordsInFlight) { + try { + Thread.sleep(this.maxRecordInFlightsSleepDurationInMillis); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + ListenableFuture recordResult = this.kinesisProducer.addUserRecord(userRecord); return listenableFutureToCompletableFuture(recordResult) .thenApply(UserRecordResponse::new); diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java new file mode 100644 index 00000000..93fe2c7e --- /dev/null +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2019-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.aws.outbound; + +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.UserRecord; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * @author Siddharth Jain + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class KplMessageHandlerTest { + + @Autowired + protected KinesisProducer kinesisProducer; + + @Autowired + protected MessageChannel kinesisSendChannel; + + @Autowired + protected KplMessageHandler kplMessageHandler; + + @Test + @SuppressWarnings("unchecked") + void testKPLMessageHandler_raw_payload_success() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) + .willReturn(mock(ListenableFuture.class)); + final Message message = MessageBuilder + .withPayload("message1") + .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") + .setHeader("foo", "bar") + .build(); + + + ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor + .forClass(UserRecord.class); + + this.kinesisSendChannel.send(message); + verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); + + UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); + assertThat(userRecord.getStreamName()).isEqualTo("foo"); + assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); + assertThat(userRecord.getExplicitHashKey()).isNull(); + } + + @Test + @SuppressWarnings("unchecked") + void testKPLMessageHandler_raw_payload_success_backpressure_test() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) + .willReturn(mock(ListenableFuture.class)); + this.kplMessageHandler.setMaxOutstandingRecordsInFlight(1); + this.kplMessageHandler.setMaxRecordInFlightsSleepDurationInMillis(100); + given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(2); + final Message message = MessageBuilder + .withPayload("message1") + .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") + .setHeader("foo", "bar") + .build(); + + + ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor + .forClass(UserRecord.class); + + this.kinesisSendChannel.send(message); + verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); + + UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); + assertThat(userRecord.getStreamName()).isEqualTo("foo"); + assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); + assertThat(userRecord.getExplicitHashKey()).isNull(); + } + + @AfterEach + public void tearDown() { + clearInvocations(this.kinesisProducer); + } + + @Configuration + @EnableIntegration + public static class ContextConfiguration { + + @Bean + public KinesisProducer kinesisProducer() { + return mock(KinesisProducer.class); + } + + @Bean + @ServiceActivator(inputChannel = "kinesisSendChannel") + public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { + KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); + kplMessageHandler.setAsync(true); + kplMessageHandler.setStream("foo"); + kplMessageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper("foo")); + return kplMessageHandler; + } + } +} From 3d7de0fced524204784f90d04c3fdec5ce657853 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sat, 14 Dec 2024 12:44:54 +0530 Subject: [PATCH 02/20] GH-249 Addressing code review comments related to code style. --- src/checkstyle/checkstyle.xml | 6 ++- .../aws/outbound/KplMessageHandler.java | 38 ++++++++++++------- ...rTest.java => KplMessageHandlerTests.java} | 9 +++-- 3 files changed, 34 insertions(+), 19 deletions(-) rename src/test/java/org/springframework/integration/aws/outbound/{KplMessageHandlerTest.java => KplMessageHandlerTests.java} (96%) diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index 4c4e8403..194131df 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -177,6 +177,10 @@ - + + + + + diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 04b11027..5f31ecdd 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -100,9 +100,9 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private volatile ScheduledFuture flushFuture; - private long maxRecordsInFlight = 0; + private long maxInFlightRecords = 0; - private int maxRecordInFlightsSleepDurationInMillis = 100; + private int maxInFlightRecordsDuration = 100; public KplMessageHandler(KinesisProducer kinesisProducer) { Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null."); @@ -121,23 +121,31 @@ public void setConverter(Converter converter) { } /** - * When in KPL mode, the setting allows handling backpressure on the KPL native process. Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in maxRecordInFlightsSleepDurationInMillis. + * When in KPL mode, the setting allows handling backpressure on the KPL native process. + * Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in + * maxRecordInFlightsSleepDurationInMillis. * - * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. Specify a positive value to enable back pressure. + * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. + * Specify a positive value to enable back pressure. + * @since 3.0.9 */ - public void setMaxOutstandingRecordsInFlight(long maxRecordsInFlight) { + public void setMaxRecordsInFlight(long maxRecordsInFlight) { Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0."); - this.maxRecordsInFlight = maxRecordsInFlight; + this.maxInFlightRecords = maxRecordsInFlight; } /** - * The setting allows handling backpressure on the KPL native process. Enabled when maxOutstandingRecordsCount is greater than 0. The configurations puts the KPL Thread to sleep for the specified number of milliseconds. + * The setting allows handling backpressure on the KPL native process. + * Enabled when maxOutstandingRecordsCount is greater than 0. + * The configurations puts the KPL Thread to sleep for the specified number of milliseconds. * - * @param maxRecordInFlightsSleepDurationInMillis Default is 100ms. + * @param maxInFlightRecordsDuration Default is 100ms. + * @since 3.0.9 */ - public void setMaxRecordInFlightsSleepDurationInMillis(int maxRecordInFlightsSleepDurationInMillis) { - Assert.isTrue(maxRecordInFlightsSleepDurationInMillis > 0, "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); - this.maxRecordInFlightsSleepDurationInMillis = maxRecordInFlightsSleepDurationInMillis; + public void setMaxInFlightRecordsDuration(int maxInFlightRecordsDuration) { + Assert.isTrue(maxInFlightRecordsDuration > 0, + "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); + this.maxInFlightRecordsDuration = maxInFlightRecordsDuration; } /** @@ -394,9 +402,10 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } private CompletableFuture handleUserRecord(UserRecord userRecord) { - if (this.maxRecordsInFlight != -1 && this.kinesisProducer.getOutstandingRecordsCount() > this.maxRecordsInFlight) { + if (this.maxInFlightRecords != -1 && + this.kinesisProducer.getOutstandingRecordsCount() > this.maxInFlightRecords) { try { - Thread.sleep(this.maxRecordInFlightsSleepDurationInMillis); + Thread.sleep(this.maxInFlightRecordsDuration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -439,7 +448,8 @@ private PutRecordRequest buildPutRecordRequest(Message message) { if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) { partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class); } - Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. " + Assert.state(partitionKey != null, + "'partitionKey' must not be null for sending a Kinesis record. " + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " + "or supply an 'aws_partitionKey' message header."); diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java similarity index 96% rename from src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java rename to src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 93fe2c7e..4a0b1ebe 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -47,11 +47,11 @@ /** * @author Siddharth Jain * - * @since 4.0 + * @since 3.0.9 */ @SpringJUnitConfig @DirtiesContext -public class KplMessageHandlerTest { +public class KplMessageHandlerTests { @Autowired protected KinesisProducer kinesisProducer; @@ -92,8 +92,8 @@ void testKPLMessageHandler_raw_payload_success() { void testKPLMessageHandler_raw_payload_success_backpressure_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); - this.kplMessageHandler.setMaxOutstandingRecordsInFlight(1); - this.kplMessageHandler.setMaxRecordInFlightsSleepDurationInMillis(100); + this.kplMessageHandler.setMaxRecordsInFlight(1); + this.kplMessageHandler.setMaxInFlightRecordsDuration(100); given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(2); final Message message = MessageBuilder .withPayload("message1") @@ -139,4 +139,5 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { return kplMessageHandler; } } + } From 4e3b5713d3687ecdad60f77c696069858601e077 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sat, 14 Dec 2024 19:48:59 +0530 Subject: [PATCH 03/20] GH-249 Addressed smart handling when KPL Buffer is at max capacity. Implemented exponential retry pattern. --- .../aws/outbound/KplMessageHandler.java | 83 +++++++++++++++--- .../aws/outbound/KplMessageHandlerTests.java | 86 +++++++++++++++++-- 2 files changed, 150 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 5f31ecdd..27a319fe 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -102,7 +102,11 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private long maxInFlightRecords = 0; - private int maxInFlightRecordsDuration = 100; + private int maxInFlightRecordsBackoffDuration = 100; + + private int maxInFlightRecordsBackoffRate = 2; + + private int maxInFlightRecordsBackoffMaxAttempts = 3; public KplMessageHandler(KinesisProducer kinesisProducer) { Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null."); @@ -126,7 +130,7 @@ public void setConverter(Converter converter) { * maxRecordInFlightsSleepDurationInMillis. * * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. - * Specify a positive value to enable back pressure. + * Specify a positive value to enable back pressure. * @since 3.0.9 */ public void setMaxRecordsInFlight(long maxRecordsInFlight) { @@ -139,13 +143,36 @@ public void setMaxRecordsInFlight(long maxRecordsInFlight) { * Enabled when maxOutstandingRecordsCount is greater than 0. * The configurations puts the KPL Thread to sleep for the specified number of milliseconds. * - * @param maxInFlightRecordsDuration Default is 100ms. + * @param maxInFlightRecordsBackoffDuration Default is 100ms. * @since 3.0.9 */ - public void setMaxInFlightRecordsDuration(int maxInFlightRecordsDuration) { - Assert.isTrue(maxInFlightRecordsDuration > 0, + public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) { + Assert.isTrue(maxInFlightRecordsBackoffDuration > 0, "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); - this.maxInFlightRecordsDuration = maxInFlightRecordsDuration; + this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration; + } + + /** + * The setting allows handling backpressure on the KPL native process using exponential retry. + * + * @param maxInFlightRecordsBackoffRate The property enables a back off rate to + * * define the exponential retry duration defined in setMaxInFlightRecordsBackoffDuration. Default is 2 + * @since 3.0.9 + */ + public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) { + this.maxInFlightRecordsBackoffRate = maxInFlightRecordsBackoffRate; + } + + /** + * The setting allows handling backpressure on the KPL native process using exponential retry. On attempts + * exhausted, RunTimeException is thrown. + * + * @param maxInFlightRecordsBackoffMaxAttempts When specified, maxInFlightRecordsBackoffMaxAttempts defines the + * maximum of exponential retry attempts to wait until the KPL Buffer clears out. + * @since 3.0.9 + */ + public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) { + this.maxInFlightRecordsBackoffMaxAttempts = maxInFlightRecordsBackoffMaxAttempts; } /** @@ -402,20 +429,48 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } private CompletableFuture handleUserRecord(UserRecord userRecord) { - if (this.maxInFlightRecords != -1 && - this.kinesisProducer.getOutstandingRecordsCount() > this.maxInFlightRecords) { + if (this.maxInFlightRecords != -1) { + waitForCapacityInUserRecordsBuffer(); + } + + ListenableFuture recordResult = this.kinesisProducer.addUserRecord(userRecord); + return listenableFutureToCompletableFuture(recordResult) + .thenApply(UserRecordResponse::new); + } + + private void waitForCapacityInUserRecordsBuffer() { + var kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount(); + var attempts = 1; + var sleepDuration = this.maxInFlightRecordsBackoffDuration; + while (kplOutstandingRecordsCount >= this.maxInFlightRecords && + attempts <= this.maxInFlightRecordsBackoffMaxAttempts) { try { - Thread.sleep(this.maxInFlightRecordsDuration); + logger.info("Buffer size: {} has reached the max records limit of {}. Attempt# {}".formatted( + kplOutstandingRecordsCount, this.maxInFlightRecords)); + logger.info("Buffer sleeping for {} ms".formatted( + this.maxInFlightRecordsBackoffDuration)); + Thread.sleep(this.maxInFlightRecordsBackoffDuration); } - catch (InterruptedException e) { + catch (InterruptedException ex) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + } + finally { + sleepDuration = sleepDuration * this.maxInFlightRecordsBackoffRate; + attempts++; + kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount(); } } - ListenableFuture recordResult = this.kinesisProducer.addUserRecord(userRecord); - return listenableFutureToCompletableFuture(recordResult) - .thenApply(UserRecordResponse::new); + if (kplOutstandingRecordsCount < this.maxInFlightRecords) { + logger.info("Buffer cleared on number of attempts: {}".formatted(attempts)); + return; + } + + if (attempts > this.maxInFlightRecordsBackoffMaxAttempts) { + logger.error("Buffer not cleared after maximum {} number of attempts & {} sleepDuration".formatted(attempts, + sleepDuration)); + throw new RuntimeException("KPL Buffer already at max capacity."); + } } private PutRecordRequest buildPutRecordRequest(Message message) { diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 4a0b1ebe..ffc6287b 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -44,9 +45,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; + /** * @author Siddharth Jain - * * @since 3.0.9 */ @SpringJUnitConfig @@ -89,12 +90,15 @@ void testKPLMessageHandler_raw_payload_success() { @Test @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_success_backpressure_test() { + void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); - this.kplMessageHandler.setMaxRecordsInFlight(1); - this.kplMessageHandler.setMaxInFlightRecordsDuration(100); - given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(2); + this.kplMessageHandler.setMaxRecordsInFlight(2); + this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); + this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); + given(this.kinesisProducer.getOutstandingRecordsCount()) + .willReturn(1); final Message message = MessageBuilder .withPayload("message1") .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") @@ -108,13 +112,85 @@ void testKPLMessageHandler_raw_payload_success_backpressure_test() { this.kinesisSendChannel.send(message); verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); + verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount(); + UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); + assertThat(userRecord.getStreamName()).isEqualTo("foo"); + assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); + assertThat(userRecord.getExplicitHashKey()).isNull(); + } + + @Test + @SuppressWarnings("unchecked") + void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) + .willReturn(mock(ListenableFuture.class)); + this.kplMessageHandler.setMaxRecordsInFlight(2); + this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); + this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); + given(this.kinesisProducer.getOutstandingRecordsCount()) + .willReturn(3) + .willReturn(2) + .willReturn(1) + .willReturn(0); + final Message message = MessageBuilder + .withPayload("message1") + .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") + .setHeader("foo", "bar") + .build(); + + + ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor + .forClass(UserRecord.class); + this.kinesisSendChannel.send(message); + verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); + verify(this.kinesisProducer, Mockito.times(3)).getOutstandingRecordsCount(); UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); assertThat(userRecord.getStreamName()).isEqualTo("foo"); assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); assertThat(userRecord.getExplicitHashKey()).isNull(); } + @Test + @SuppressWarnings("unchecked") + void testKPLMessageHandler_raw_payload_backpressure_failure_test() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) + .willReturn(mock(ListenableFuture.class)); + this.kplMessageHandler.setMaxRecordsInFlight(2); + this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); + this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); + given(this.kinesisProducer.getOutstandingRecordsCount()) + .willReturn(5) + .willReturn(4) + .willReturn(3); + final Message message = MessageBuilder + .withPayload("message1") + .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") + .setHeader("foo", "bar") + .build(); + + ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor + .forClass(UserRecord.class); + + try { + this.kinesisSendChannel.send(message); + } + catch (Exception ex) { + assertThat(ex).isNotNull(); + assertThat(ex.getCause()).isNotNull(); + assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); + assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); + assertThat(ex.getCause().getMessage()).isEqualTo("KPL Buffer already at max capacity."); + } + + verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class)); + verify(this.kinesisProducer, Mockito.times(3)).getOutstandingRecordsCount(); + } + @AfterEach public void tearDown() { clearInvocations(this.kinesisProducer); From 99e4d2d2b2340f60f1c5dda80a751de5b149b957 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sat, 21 Dec 2024 13:30:20 +0530 Subject: [PATCH 04/20] GH-249 Code Review comments address. Updated Tests, javadocs. Added links and references to the properties. Removed commented code from checkstyle. --- src/checkstyle/checkstyle.xml | 5 -- .../aws/outbound/KplMessageHandler.java | 47 +++++++++++-------- .../aws/outbound/KplMessageHandlerTests.java | 4 -- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index 194131df..afabea26 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -178,9 +178,4 @@ - - - - - diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 27a319fe..fb95cd80 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -125,13 +125,13 @@ public void setConverter(Converter converter) { } /** - * When in KPL mode, the setting allows handling backpressure on the KPL native process. - * Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in - * maxRecordInFlightsSleepDurationInMillis. - * + * Configure maximum records in flight on the KPL Native Process for handling backpressure. Used in conjuction + * with {@link KplMessageHandler#maxInFlightRecordsBackoffDuration} * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. - * Specify a positive value to enable back pressure. * @since 3.0.9 + * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate + * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxRecordsInFlight(long maxRecordsInFlight) { Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0."); @@ -139,37 +139,44 @@ public void setMaxRecordsInFlight(long maxRecordsInFlight) { } /** - * The setting allows handling backpressure on the KPL native process. - * Enabled when maxOutstandingRecordsCount is greater than 0. - * The configurations puts the KPL Thread to sleep for the specified number of milliseconds. - * - * @param maxInFlightRecordsBackoffDuration Default is 100ms. + * Configure a backoff duration period in milliseconds when the number of records in flight in KPL Native Process + * is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. The configuration helps in handling + * backpressure by sleeping the KPL Thread using exponential backoff. Enabled when + * {@link KplMessageHandler#maxInFlightRecords} is greater than 0. + * @param maxInFlightRecordsBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. * @since 3.0.9 + * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate + * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) { Assert.isTrue(maxInFlightRecordsBackoffDuration > 0, - "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); + "'maxInFlightRecordsBackoffDuration must be greater than 0."); this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration; } /** - * The setting allows handling backpressure on the KPL native process using exponential retry. - * - * @param maxInFlightRecordsBackoffRate The property enables a back off rate to - * * define the exponential retry duration defined in setMaxInFlightRecordsBackoffDuration. Default is 2 + * Configure exponential back off rate when handling backpressure on the KPL Native process using + * {@link KplMessageHandler#maxInFlightRecords}. + * @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2 * @since 3.0.9 + * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) { this.maxInFlightRecordsBackoffRate = maxInFlightRecordsBackoffRate; } /** - * The setting allows handling backpressure on the KPL native process using exponential retry. On attempts - * exhausted, RunTimeException is thrown. - * - * @param maxInFlightRecordsBackoffMaxAttempts When specified, maxInFlightRecordsBackoffMaxAttempts defines the - * maximum of exponential retry attempts to wait until the KPL Buffer clears out. + * Configure maximum number of retry attempts with exponential backoff until there is a capacity in the KPL + * native process using. On maximum attempts exhausted, RunTimeException is thrown. + * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity in KPL + * buffer. * @since 3.0.9 + * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate */ public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) { this.maxInFlightRecordsBackoffMaxAttempts = maxInFlightRecordsBackoffMaxAttempts; diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index ffc6287b..f450c2f1 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -30,7 +30,6 @@ import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -45,7 +44,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; - /** * @author Siddharth Jain * @since 3.0.9 @@ -183,7 +181,6 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() { assertThat(ex).isNotNull(); assertThat(ex.getCause()).isNotNull(); assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); - assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); assertThat(ex.getCause().getMessage()).isEqualTo("KPL Buffer already at max capacity."); } @@ -211,7 +208,6 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); kplMessageHandler.setAsync(true); kplMessageHandler.setStream("foo"); - kplMessageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper("foo")); return kplMessageHandler; } } From 512fa18b2d365db462c4d4844e81a976ad6396ce Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sat, 21 Dec 2024 13:43:07 +0530 Subject: [PATCH 05/20] GH-249 Addressing consistency related comments in the javadocs. --- .../aws/outbound/KplMessageHandler.java | 46 +++++++++---------- .../aws/outbound/KplMessageHandlerTests.java | 12 ++--- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index fb95cd80..be5c4987 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -102,7 +102,7 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private long maxInFlightRecords = 0; - private int maxInFlightRecordsBackoffDuration = 100; + private int maxInFlightRecordsInitBackoffDuration = 100; private int maxInFlightRecordsBackoffRate = 2; @@ -126,33 +126,33 @@ public void setConverter(Converter converter) { /** * Configure maximum records in flight on the KPL Native Process for handling backpressure. Used in conjuction - * with {@link KplMessageHandler#maxInFlightRecordsBackoffDuration} - * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. + * with {@link KplMessageHandler#maxInFlightRecordsInitBackoffDuration} + * @param maxInFlightRecords Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. * @since 3.0.9 - * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ - public void setMaxRecordsInFlight(long maxRecordsInFlight) { - Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0."); - this.maxInFlightRecords = maxRecordsInFlight; + public void setMaxInFlightRecords(long maxInFlightRecords) { + Assert.isTrue(maxInFlightRecords > 0, "'maxInFlightRecords must be greater than 0."); + this.maxInFlightRecords = maxInFlightRecords; } /** - * Configure a backoff duration period in milliseconds when the number of records in flight in KPL Native Process - * is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. The configuration helps in handling - * backpressure by sleeping the KPL Thread using exponential backoff. Enabled when - * {@link KplMessageHandler#maxInFlightRecords} is greater than 0. - * @param maxInFlightRecordsBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. + * Configure a initial backoff duration period in milliseconds when the number of records in flight in KPL Native + * Process is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. + * The configuration helps in handling backpressure by sleeping the KPL Thread using exponential backoff. Enabled + * when {@link KplMessageHandler#maxInFlightRecords} is greater than 0. + * @param maxInFlightRecordsInitBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. * @since 3.0.9 - * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecords * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ - public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) { - Assert.isTrue(maxInFlightRecordsBackoffDuration > 0, + public void setMaxInFlightRecordsInitBackoffDuration(int maxInFlightRecordsInitBackoffDuration) { + Assert.isTrue(maxInFlightRecordsInitBackoffDuration > 0, "'maxInFlightRecordsBackoffDuration must be greater than 0."); - this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration; + this.maxInFlightRecordsInitBackoffDuration = maxInFlightRecordsInitBackoffDuration; } /** @@ -160,8 +160,8 @@ public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDu * {@link KplMessageHandler#maxInFlightRecords}. * @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2 * @since 3.0.9 - * @see KplMessageHandler#setMaxRecordsInFlight - * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecords + * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) { @@ -174,8 +174,8 @@ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity in KPL * buffer. * @since 3.0.9 - * @see KplMessageHandler#setMaxRecordsInFlight - * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecords + * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate */ public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) { @@ -448,15 +448,15 @@ private CompletableFuture handleUserRecord(UserRecord userRe private void waitForCapacityInUserRecordsBuffer() { var kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount(); var attempts = 1; - var sleepDuration = this.maxInFlightRecordsBackoffDuration; + var sleepDuration = this.maxInFlightRecordsInitBackoffDuration; while (kplOutstandingRecordsCount >= this.maxInFlightRecords && attempts <= this.maxInFlightRecordsBackoffMaxAttempts) { try { logger.info("Buffer size: {} has reached the max records limit of {}. Attempt# {}".formatted( kplOutstandingRecordsCount, this.maxInFlightRecords)); logger.info("Buffer sleeping for {} ms".formatted( - this.maxInFlightRecordsBackoffDuration)); - Thread.sleep(this.maxInFlightRecordsBackoffDuration); + this.maxInFlightRecordsInitBackoffDuration)); + Thread.sleep(this.maxInFlightRecordsInitBackoffDuration); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index f450c2f1..263d455b 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -91,8 +91,8 @@ void testKPLMessageHandler_raw_payload_success() { void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); - this.kplMessageHandler.setMaxRecordsInFlight(2); - this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecords(2); + this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); given(this.kinesisProducer.getOutstandingRecordsCount()) @@ -122,8 +122,8 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); - this.kplMessageHandler.setMaxRecordsInFlight(2); - this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecords(2); + this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); given(this.kinesisProducer.getOutstandingRecordsCount()) @@ -156,8 +156,8 @@ void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() { void testKPLMessageHandler_raw_payload_backpressure_failure_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); - this.kplMessageHandler.setMaxRecordsInFlight(2); - this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecords(2); + this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); given(this.kinesisProducer.getOutstandingRecordsCount()) From af06a15a1c4da54f01a48e86848ca4603977f438 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 22 Dec 2024 11:27:28 +0530 Subject: [PATCH 06/20] GH-249 Removing KPL Native Mentions in the java docs for KPL Message Handler. --- .../aws/outbound/KplMessageHandler.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index be5c4987..cd7864c1 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -125,7 +125,7 @@ public void setConverter(Converter converter) { } /** - * Configure maximum records in flight on the KPL Native Process for handling backpressure. Used in conjuction + * Configure maximum records in flight for handling backpressure. Used in conjuction * with {@link KplMessageHandler#maxInFlightRecordsInitBackoffDuration} * @param maxInFlightRecords Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. * @since 3.0.9 @@ -139,9 +139,8 @@ public void setMaxInFlightRecords(long maxInFlightRecords) { } /** - * Configure a initial backoff duration period in milliseconds when the number of records in flight in KPL Native - * Process is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. - * The configuration helps in handling backpressure by sleeping the KPL Thread using exponential backoff. Enabled + * Configure a initial backoff duration period in milliseconds when the number of records in flight is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. + * The configuration helps in handling backpressure by sleeping the Thread using exponential backoff. Enabled * when {@link KplMessageHandler#maxInFlightRecords} is greater than 0. * @param maxInFlightRecordsInitBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. * @since 3.0.9 @@ -156,8 +155,7 @@ public void setMaxInFlightRecordsInitBackoffDuration(int maxInFlightRecordsInitB } /** - * Configure exponential back off rate when handling backpressure on the KPL Native process using - * {@link KplMessageHandler#maxInFlightRecords}. + * Configure exponential backoff rate when handling backpressure using {@link KplMessageHandler#maxInFlightRecords}. * @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2 * @since 3.0.9 * @see KplMessageHandler#setMaxInFlightRecords @@ -169,10 +167,9 @@ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) } /** - * Configure maximum number of retry attempts with exponential backoff until there is a capacity in the KPL - * native process using. On maximum attempts exhausted, RunTimeException is thrown. - * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity in KPL - * buffer. + * Configure maximum number of retry attempts with exponential backoff until there is available capacity. On + * maximum attempts exhausted, RunTimeException is thrown. + * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity. * @since 3.0.9 * @see KplMessageHandler#setMaxInFlightRecords * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration @@ -476,7 +473,7 @@ private void waitForCapacityInUserRecordsBuffer() { if (attempts > this.maxInFlightRecordsBackoffMaxAttempts) { logger.error("Buffer not cleared after maximum {} number of attempts & {} sleepDuration".formatted(attempts, sleepDuration)); - throw new RuntimeException("KPL Buffer already at max capacity."); + throw new RuntimeException("Buffer already at max capacity."); } } From 7daa438c2f6bdceb5be555c002684a175b48de61 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 22 Dec 2024 11:51:20 +0530 Subject: [PATCH 07/20] GH-249 Handling the sleep duration inside the while loop. --- .../integration/aws/outbound/KplMessageHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index cd7864c1..3a30ee2f 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -452,8 +452,8 @@ private void waitForCapacityInUserRecordsBuffer() { logger.info("Buffer size: {} has reached the max records limit of {}. Attempt# {}".formatted( kplOutstandingRecordsCount, this.maxInFlightRecords)); logger.info("Buffer sleeping for {} ms".formatted( - this.maxInFlightRecordsInitBackoffDuration)); - Thread.sleep(this.maxInFlightRecordsInitBackoffDuration); + sleepDuration)); + Thread.sleep(sleepDuration); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); From 35f0d9c760614f57681346cff5c9173d446d7dde Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 22 Dec 2024 12:15:03 +0530 Subject: [PATCH 08/20] GH-249 Addressing code review comments, handling checkstyle failures. --- .../integration/aws/outbound/KplMessageHandlerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 263d455b..f707369d 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -181,7 +181,7 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() { assertThat(ex).isNotNull(); assertThat(ex.getCause()).isNotNull(); assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); - assertThat(ex.getCause().getMessage()).isEqualTo("KPL Buffer already at max capacity."); + assertThat(ex.getCause().getMessage()).isEqualTo("Buffer already at max capacity."); } verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class)); From 9fb17f39f44df5c9833b0f6be416272a605a6c1e Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 22 Dec 2024 12:50:32 +0530 Subject: [PATCH 09/20] GH-249 Renamed maxInFlightRecordsInitBackoffDuration to maxInFlightRecordsInitialBackoffDuration. Corrected capacity loop entry condition to use greater than zero. --- src/checkstyle/checkstyle.xml | 1 + .../aws/outbound/KplMessageHandler.java | 31 ++++++++++--------- .../aws/outbound/KplMessageHandlerTests.java | 13 +++----- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index afabea26..4c4e8403 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -177,5 +177,6 @@ + diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 3a30ee2f..be8a4acd 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -102,7 +102,7 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private long maxInFlightRecords = 0; - private int maxInFlightRecordsInitBackoffDuration = 100; + private int maxInFlightRecordsInitialBackoffDuration = 100; private int maxInFlightRecordsBackoffRate = 2; @@ -125,11 +125,11 @@ public void setConverter(Converter converter) { } /** - * Configure maximum records in flight for handling backpressure. Used in conjuction - * with {@link KplMessageHandler#maxInFlightRecordsInitBackoffDuration} + * Configure maximum records in flight for handling backpressure. Used together with + * {@link KplMessageHandler#maxInFlightRecordsInitialBackoffDuration} * @param maxInFlightRecords Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. * @since 3.0.9 - * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsInitialBackoffDuration * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ @@ -139,19 +139,20 @@ public void setMaxInFlightRecords(long maxInFlightRecords) { } /** - * Configure a initial backoff duration period in milliseconds when the number of records in flight is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. - * The configuration helps in handling backpressure by sleeping the Thread using exponential backoff. Enabled - * when {@link KplMessageHandler#maxInFlightRecords} is greater than 0. - * @param maxInFlightRecordsInitBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. + * Configure initial backoff duration period in milliseconds when the number of records in flight + * is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. + * The configuration helps in handling backpressure by sleeping the Thread using exponential backoff. + * Enabled when {@link KplMessageHandler#maxInFlightRecords} is greater than 0. + * @param maxInFlightRecordsInitialBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. * @since 3.0.9 * @see KplMessageHandler#setMaxInFlightRecords * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ - public void setMaxInFlightRecordsInitBackoffDuration(int maxInFlightRecordsInitBackoffDuration) { - Assert.isTrue(maxInFlightRecordsInitBackoffDuration > 0, + public void setMaxInFlightRecordsInitialBackoffDuration(int maxInFlightRecordsInitialBackoffDuration) { + Assert.isTrue(maxInFlightRecordsInitialBackoffDuration > 0, "'maxInFlightRecordsBackoffDuration must be greater than 0."); - this.maxInFlightRecordsInitBackoffDuration = maxInFlightRecordsInitBackoffDuration; + this.maxInFlightRecordsInitialBackoffDuration = maxInFlightRecordsInitialBackoffDuration; } /** @@ -159,7 +160,7 @@ public void setMaxInFlightRecordsInitBackoffDuration(int maxInFlightRecordsInitB * @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2 * @since 3.0.9 * @see KplMessageHandler#setMaxInFlightRecords - * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsInitialBackoffDuration * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) { @@ -172,7 +173,7 @@ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity. * @since 3.0.9 * @see KplMessageHandler#setMaxInFlightRecords - * @see KplMessageHandler#setMaxInFlightRecordsInitBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsInitialBackoffDuration * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate */ public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) { @@ -433,7 +434,7 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } private CompletableFuture handleUserRecord(UserRecord userRecord) { - if (this.maxInFlightRecords != -1) { + if (this.maxInFlightRecords > 0) { waitForCapacityInUserRecordsBuffer(); } @@ -445,7 +446,7 @@ private CompletableFuture handleUserRecord(UserRecord userRe private void waitForCapacityInUserRecordsBuffer() { var kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount(); var attempts = 1; - var sleepDuration = this.maxInFlightRecordsInitBackoffDuration; + var sleepDuration = this.maxInFlightRecordsInitialBackoffDuration; while (kplOutstandingRecordsCount >= this.maxInFlightRecords && attempts <= this.maxInFlightRecordsBackoffMaxAttempts) { try { diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index f707369d..5f99f2c2 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -79,7 +79,7 @@ void testKPLMessageHandler_raw_payload_success() { this.kinesisSendChannel.send(message); verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); - + verify(this.kinesisProducer, Mockito.times(0)).getOutstandingRecordsCount(); UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); assertThat(userRecord.getStreamName()).isEqualTo("foo"); assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); @@ -92,7 +92,7 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); this.kplMessageHandler.setMaxInFlightRecords(2); - this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); given(this.kinesisProducer.getOutstandingRecordsCount()) @@ -119,11 +119,11 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { @Test @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() { + void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); this.kplMessageHandler.setMaxInFlightRecords(2); - this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); given(this.kinesisProducer.getOutstandingRecordsCount()) @@ -157,7 +157,7 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); this.kplMessageHandler.setMaxInFlightRecords(2); - this.kplMessageHandler.setMaxInFlightRecordsInitBackoffDuration(100); + this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); given(this.kinesisProducer.getOutstandingRecordsCount()) @@ -171,9 +171,6 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() { .setHeader("foo", "bar") .build(); - ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor - .forClass(UserRecord.class); - try { this.kinesisSendChannel.send(message); } From fc96e7e8835b82a79e06e081f716fab88ab828f2 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Tue, 24 Dec 2024 09:00:21 +0530 Subject: [PATCH 10/20] GH-249 Removing Blank lines from Java doc in KPL Message Handler. --- .../integration/aws/outbound/KplMessageHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index be8a4acd..12ab5c5f 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -182,7 +182,6 @@ public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackof /** * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record. - * * @param messageConverter the {@link MessageConverter} to use. * @since 2.3 */ From 3e6d1782503aa23d06b6fe66ae06af7c8faba482 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Fri, 27 Dec 2024 18:50:04 +0530 Subject: [PATCH 11/20] GH-249 Using mockito mock method that doesnt require args, --- .../aws/outbound/KplMessageHandlerTests.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 5f99f2c2..0d52a5c2 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -18,7 +18,6 @@ import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.UserRecord; -import com.google.common.util.concurrent.ListenableFuture; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -65,7 +64,7 @@ public class KplMessageHandlerTests { @SuppressWarnings("unchecked") void testKPLMessageHandler_raw_payload_success() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) - .willReturn(mock(ListenableFuture.class)); + .willReturn(mock()); final Message message = MessageBuilder .withPayload("message1") .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") @@ -90,7 +89,7 @@ void testKPLMessageHandler_raw_payload_success() { @SuppressWarnings("unchecked") void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) - .willReturn(mock(ListenableFuture.class)); + .willReturn(mock()); this.kplMessageHandler.setMaxInFlightRecords(2); this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); @@ -121,7 +120,7 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { @SuppressWarnings("unchecked") void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) - .willReturn(mock(ListenableFuture.class)); + .willReturn(mock()); this.kplMessageHandler.setMaxInFlightRecords(2); this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); @@ -155,7 +154,7 @@ void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() @SuppressWarnings("unchecked") void testKPLMessageHandler_raw_payload_backpressure_failure_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) - .willReturn(mock(ListenableFuture.class)); + .willReturn(mock()); this.kplMessageHandler.setMaxInFlightRecords(2); this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); @@ -196,7 +195,7 @@ public static class ContextConfiguration { @Bean public KinesisProducer kinesisProducer() { - return mock(KinesisProducer.class); + return mock(); } @Bean From 33657f7b35b74e5a66585a9a6b03baee77e3ccf0 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 29 Dec 2024 19:43:06 +0530 Subject: [PATCH 12/20] GH-249 Simplication of backpressure handling, introduced KPLBackpressureException. --- .../aws/outbound/KplMessageHandler.java | 116 +++--------------- .../aws/support/KPLBackpressureException.java | 33 +++++ .../aws/outbound/KplMessageHandlerTests.java | 55 ++------- 3 files changed, 60 insertions(+), 144 deletions(-) create mode 100644 src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 12ab5c5f..91787434 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -49,6 +49,7 @@ import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.integration.aws.support.KPLBackpressureException; import org.springframework.integration.aws.support.UserRecordResponse; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractMessageHandler; @@ -100,13 +101,7 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private volatile ScheduledFuture flushFuture; - private long maxInFlightRecords = 0; - - private int maxInFlightRecordsInitialBackoffDuration = 100; - - private int maxInFlightRecordsBackoffRate = 2; - - private int maxInFlightRecordsBackoffMaxAttempts = 3; + private long backPressureThreshold = 0; public KplMessageHandler(KinesisProducer kinesisProducer) { Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null."); @@ -125,59 +120,15 @@ public void setConverter(Converter converter) { } /** - * Configure maximum records in flight for handling backpressure. Used together with - * {@link KplMessageHandler#maxInFlightRecordsInitialBackoffDuration} - * @param maxInFlightRecords Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. - * @since 3.0.9 - * @see KplMessageHandler#setMaxInFlightRecordsInitialBackoffDuration - * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate - * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts - */ - public void setMaxInFlightRecords(long maxInFlightRecords) { - Assert.isTrue(maxInFlightRecords > 0, "'maxInFlightRecords must be greater than 0."); - this.maxInFlightRecords = maxInFlightRecords; - } - - /** - * Configure initial backoff duration period in milliseconds when the number of records in flight - * is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. - * The configuration helps in handling backpressure by sleeping the Thread using exponential backoff. - * Enabled when {@link KplMessageHandler#maxInFlightRecords} is greater than 0. - * @param maxInFlightRecordsInitialBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. - * @since 3.0.9 - * @see KplMessageHandler#setMaxInFlightRecords - * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate - * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts - */ - public void setMaxInFlightRecordsInitialBackoffDuration(int maxInFlightRecordsInitialBackoffDuration) { - Assert.isTrue(maxInFlightRecordsInitialBackoffDuration > 0, - "'maxInFlightRecordsBackoffDuration must be greater than 0."); - this.maxInFlightRecordsInitialBackoffDuration = maxInFlightRecordsInitialBackoffDuration; - } - - /** - * Configure exponential backoff rate when handling backpressure using {@link KplMessageHandler#maxInFlightRecords}. - * @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2 - * @since 3.0.9 - * @see KplMessageHandler#setMaxInFlightRecords - * @see KplMessageHandler#setMaxInFlightRecordsInitialBackoffDuration - * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts - */ - public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) { - this.maxInFlightRecordsBackoffRate = maxInFlightRecordsBackoffRate; - } - - /** - * Configure maximum number of retry attempts with exponential backoff until there is available capacity. On - * maximum attempts exhausted, RunTimeException is thrown. - * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity. + * Configure maximum records in flight for handling backpressure. By Default, backpressure handling is not enabled. + * On number of records flights exceeding the threshold, {@link KPLBackpressureException} would be thrown. + * If Backpressure handling is enabled, {@link KPLBackpressureException} must be handled. + * @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling. * @since 3.0.9 - * @see KplMessageHandler#setMaxInFlightRecords - * @see KplMessageHandler#setMaxInFlightRecordsInitialBackoffDuration - * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate */ - public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) { - this.maxInFlightRecordsBackoffMaxAttempts = maxInFlightRecordsBackoffMaxAttempts; + public void setBackPressureThreshold(long backPressureThreshold) { + Assert.isTrue(backPressureThreshold > 0, "'maxInFlightRecords must be greater than 0."); + this.backPressureThreshold = backPressureThreshold; } /** @@ -432,9 +383,17 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } } - private CompletableFuture handleUserRecord(UserRecord userRecord) { - if (this.maxInFlightRecords > 0) { - waitForCapacityInUserRecordsBuffer(); + private CompletableFuture handleUserRecord(UserRecord userRecord) + throws KPLBackpressureException { + + if (this.backPressureThreshold > 0) { + var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount(); + if (numberOfRecordsInFlight > this.backPressureThreshold) { + logger.error(String.format("Backpressure handling is enabled, Number of records in flight: %s is " + + "greater than backpressure threshold: %s" + + ".", numberOfRecordsInFlight, this.backPressureThreshold)); + throw new KPLBackpressureException("Buffer already at max capacity."); + } } ListenableFuture recordResult = this.kinesisProducer.addUserRecord(userRecord); @@ -442,41 +401,6 @@ private CompletableFuture handleUserRecord(UserRecord userRe .thenApply(UserRecordResponse::new); } - private void waitForCapacityInUserRecordsBuffer() { - var kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount(); - var attempts = 1; - var sleepDuration = this.maxInFlightRecordsInitialBackoffDuration; - while (kplOutstandingRecordsCount >= this.maxInFlightRecords && - attempts <= this.maxInFlightRecordsBackoffMaxAttempts) { - try { - logger.info("Buffer size: {} has reached the max records limit of {}. Attempt# {}".formatted( - kplOutstandingRecordsCount, this.maxInFlightRecords)); - logger.info("Buffer sleeping for {} ms".formatted( - sleepDuration)); - Thread.sleep(sleepDuration); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - finally { - sleepDuration = sleepDuration * this.maxInFlightRecordsBackoffRate; - attempts++; - kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount(); - } - } - - if (kplOutstandingRecordsCount < this.maxInFlightRecords) { - logger.info("Buffer cleared on number of attempts: {}".formatted(attempts)); - return; - } - - if (attempts > this.maxInFlightRecordsBackoffMaxAttempts) { - logger.error("Buffer not cleared after maximum {} number of attempts & {} sleepDuration".formatted(attempts, - sleepDuration)); - throw new RuntimeException("Buffer already at max capacity."); - } - } - private PutRecordRequest buildPutRecordRequest(Message message) { Object payload = message.getPayload(); diff --git a/src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java b/src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java new file mode 100644 index 00000000..d59653e2 --- /dev/null +++ b/src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.aws.support; + +import org.springframework.messaging.MessagingException; + +/** + * An exception triggered from {@link org.springframework.integration.aws.outbound.KplMessageHandler} while sending + * records to kinesis when maximum number of records in flight exceeds the backpressure threshold. + * @author Siddharth Jain + * @since 3.0.9 + */ +public class KPLBackpressureException extends MessagingException { + private static final long serialVersionUID = 1L; + + public KPLBackpressureException(String message) { + super(message); + } +} diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 0d52a5c2..4d2a87a1 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -28,6 +28,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.integration.aws.support.KPLBackpressureException; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -87,13 +88,10 @@ void testKPLMessageHandler_raw_payload_success() { @Test @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { + void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock()); - this.kplMessageHandler.setMaxInFlightRecords(2); - this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); - this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); - this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); + this.kplMessageHandler.setBackPressureThreshold(2); given(this.kinesisProducer.getOutstandingRecordsCount()) .willReturn(1); final Message message = MessageBuilder @@ -121,48 +119,9 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() { void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock()); - this.kplMessageHandler.setMaxInFlightRecords(2); - this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); - this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); - this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); + this.kplMessageHandler.setBackPressureThreshold(2); given(this.kinesisProducer.getOutstandingRecordsCount()) - .willReturn(3) - .willReturn(2) - .willReturn(1) - .willReturn(0); - final Message message = MessageBuilder - .withPayload("message1") - .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") - .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") - .setHeader("foo", "bar") - .build(); - - - ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor - .forClass(UserRecord.class); - - this.kinesisSendChannel.send(message); - verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); - verify(this.kinesisProducer, Mockito.times(3)).getOutstandingRecordsCount(); - UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); - assertThat(userRecord.getStreamName()).isEqualTo("foo"); - assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); - assertThat(userRecord.getExplicitHashKey()).isNull(); - } - - @Test - @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_backpressure_failure_test() { - given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) - .willReturn(mock()); - this.kplMessageHandler.setMaxInFlightRecords(2); - this.kplMessageHandler.setMaxInFlightRecordsInitialBackoffDuration(100); - this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2); - this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2); - given(this.kinesisProducer.getOutstandingRecordsCount()) - .willReturn(5) - .willReturn(4) - .willReturn(3); + .willReturn(5); final Message message = MessageBuilder .withPayload("message1") .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") @@ -176,12 +135,12 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() { catch (Exception ex) { assertThat(ex).isNotNull(); assertThat(ex.getCause()).isNotNull(); - assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); + assertThat(ex.getCause().getClass()).isEqualTo(KPLBackpressureException.class); assertThat(ex.getCause().getMessage()).isEqualTo("Buffer already at max capacity."); } verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class)); - verify(this.kinesisProducer, Mockito.times(3)).getOutstandingRecordsCount(); + verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount(); } @AfterEach From 4d4fcf826972dbc93ffd15f8d1cd77a4057ab3af Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 29 Dec 2024 19:45:07 +0530 Subject: [PATCH 13/20] GH-249 Javadoc corrections. --- .../integration/aws/outbound/KplMessageHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 91787434..3fed27ec 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -121,13 +121,13 @@ public void setConverter(Converter converter) { /** * Configure maximum records in flight for handling backpressure. By Default, backpressure handling is not enabled. - * On number of records flights exceeding the threshold, {@link KPLBackpressureException} would be thrown. + * On number of records in flight exceeding the threshold, {@link KPLBackpressureException} would be thrown. * If Backpressure handling is enabled, {@link KPLBackpressureException} must be handled. * @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling. * @since 3.0.9 */ public void setBackPressureThreshold(long backPressureThreshold) { - Assert.isTrue(backPressureThreshold > 0, "'maxInFlightRecords must be greater than 0."); + Assert.isTrue(backPressureThreshold > 0, "'backPressureThreshold must be greater than 0."); this.backPressureThreshold = backPressureThreshold; } From 186314e81635b9af11892fc6a45fb14bf7dbaf67 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 29 Dec 2024 22:57:40 +0530 Subject: [PATCH 14/20] GH-249 Code Review comments addressed. Added Test cases with RetryAdvice. --- .../aws/outbound/KplMessageHandler.java | 23 +++--- ...ion.java => KplBackpressureException.java} | 20 ++++- .../aws/outbound/KplMessageHandlerTests.java | 82 +++++++++++-------- 3 files changed, 77 insertions(+), 48 deletions(-) rename src/main/java/org/springframework/integration/aws/support/{KPLBackpressureException.java => KplBackpressureException.java} (68%) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 3fed27ec..9c28788b 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -49,7 +49,7 @@ import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.aws.support.AwsHeaders; -import org.springframework.integration.aws.support.KPLBackpressureException; +import org.springframework.integration.aws.support.KplBackpressureException; import org.springframework.integration.aws.support.UserRecordResponse; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractMessageHandler; @@ -67,6 +67,8 @@ * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer * Library {@code putRecord(s)}. * + * @exception KplBackpressureException When backpressure handling is enabled and buffer is at max capacity. + * * @author Arnaud Lecollaire * @author Artem Bilan * @author Siddharth Jain @@ -121,13 +123,16 @@ public void setConverter(Converter converter) { /** * Configure maximum records in flight for handling backpressure. By Default, backpressure handling is not enabled. - * On number of records in flight exceeding the threshold, {@link KPLBackpressureException} would be thrown. - * If Backpressure handling is enabled, {@link KPLBackpressureException} must be handled. + * On number of records in flight exceeding the threshold, {@link KplBackpressureException} would be thrown. + * If Backpressure handling is enabled, {@link KplBackpressureException} must be handled. + * * @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling. + * * @since 3.0.9 */ public void setBackPressureThreshold(long backPressureThreshold) { - Assert.isTrue(backPressureThreshold > 0, "'backPressureThreshold must be greater than 0."); + Assert.isTrue(backPressureThreshold >= 0, + "'backPressureThreshold must be greater than equal to 0."); this.backPressureThreshold = backPressureThreshold; } @@ -383,16 +388,12 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } } - private CompletableFuture handleUserRecord(UserRecord userRecord) - throws KPLBackpressureException { - + private CompletableFuture handleUserRecord(UserRecord userRecord) { if (this.backPressureThreshold > 0) { var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount(); if (numberOfRecordsInFlight > this.backPressureThreshold) { - logger.error(String.format("Backpressure handling is enabled, Number of records in flight: %s is " + - "greater than backpressure threshold: %s" + - ".", numberOfRecordsInFlight, this.backPressureThreshold)); - throw new KPLBackpressureException("Buffer already at max capacity."); + throw new KplBackpressureException("Cannot send record to kinesis since buffer is at max capacity.", + userRecord); } } diff --git a/src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java b/src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java similarity index 68% rename from src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java rename to src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java index d59653e2..fee33ab8 100644 --- a/src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java +++ b/src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java @@ -16,18 +16,32 @@ package org.springframework.integration.aws.support; -import org.springframework.messaging.MessagingException; +import com.amazonaws.services.kinesis.producer.UserRecord; /** * An exception triggered from {@link org.springframework.integration.aws.outbound.KplMessageHandler} while sending * records to kinesis when maximum number of records in flight exceeds the backpressure threshold. + * * @author Siddharth Jain + * * @since 3.0.9 */ -public class KPLBackpressureException extends MessagingException { +public class KplBackpressureException extends RuntimeException { + private static final long serialVersionUID = 1L; - public KPLBackpressureException(String message) { + private final UserRecord userRecord; + + public KplBackpressureException(String message, UserRecord userRecord) { super(message); + this.userRecord = userRecord; + } + + /** + * Get the {@link UserRecord} related. + * @return {@link UserRecord} linked while sending the record to kinesis. + */ + public UserRecord getUserRecord() { + return this.userRecord; } } diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 4d2a87a1..b9150fea 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -28,24 +28,30 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.aws.support.AwsHeaders; -import org.springframework.integration.aws.support.KPLBackpressureException; +import org.springframework.integration.aws.support.KplBackpressureException; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.retry.support.RetryTemplate; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -/** +/** The class contains test cases for KplMessageHandler. + * * @author Siddharth Jain + * * @since 3.0.9 */ @SpringJUnitConfig @@ -63,42 +69,42 @@ public class KplMessageHandlerTests { @Test @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_success() { + void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock()); final Message message = MessageBuilder - .withPayload("message1") - .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .withPayload("someMessage") + .setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey") .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") - .setHeader("foo", "bar") + .setHeader("someHeaderKey", "someHeaderValue") .build(); ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor .forClass(UserRecord.class); - + this.kplMessageHandler.setBackPressureThreshold(0); this.kinesisSendChannel.send(message); verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); - verify(this.kinesisProducer, Mockito.times(0)).getOutstandingRecordsCount(); + verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount(); UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); - assertThat(userRecord.getStreamName()).isEqualTo("foo"); - assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); + assertThat(userRecord.getStreamName()).isEqualTo("someStream"); + assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey"); assertThat(userRecord.getExplicitHashKey()).isNull(); } @Test @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() { + void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock()); this.kplMessageHandler.setBackPressureThreshold(2); given(this.kinesisProducer.getOutstandingRecordsCount()) .willReturn(1); final Message message = MessageBuilder - .withPayload("message1") - .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .withPayload("someMessage") + .setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey") .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") - .setHeader("foo", "bar") + .setHeader("someHeaderKey", "someHeaderValue") .build(); @@ -107,40 +113,36 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() { this.kinesisSendChannel.send(message); verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); - verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount(); + verify(this.kinesisProducer).getOutstandingRecordsCount(); UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); - assertThat(userRecord.getStreamName()).isEqualTo("foo"); - assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); + assertThat(userRecord.getStreamName()).isEqualTo("someStream"); + assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey"); assertThat(userRecord.getExplicitHashKey()).isNull(); } @Test @SuppressWarnings("unchecked") - void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() { + void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock()); this.kplMessageHandler.setBackPressureThreshold(2); given(this.kinesisProducer.getOutstandingRecordsCount()) .willReturn(5); final Message message = MessageBuilder - .withPayload("message1") - .setHeader(AwsHeaders.PARTITION_KEY, "fooKey") + .withPayload("someMessage") + .setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey") .setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") - .setHeader("foo", "bar") + .setHeader("someHeaderKey", "someHeaderValue") .build(); - try { - this.kinesisSendChannel.send(message); - } - catch (Exception ex) { - assertThat(ex).isNotNull(); - assertThat(ex.getCause()).isNotNull(); - assertThat(ex.getCause().getClass()).isEqualTo(KPLBackpressureException.class); - assertThat(ex.getCause().getMessage()).isEqualTo("Buffer already at max capacity."); - } + assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> this.kinesisSendChannel.send(message)) + .withCauseInstanceOf(MessageHandlingException.class) + .withRootCauseExactlyInstanceOf(KplBackpressureException.class) + .withStackTraceContaining("Cannot send record to kinesis since buffer is at max capacity."); - verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class)); - verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount(); + verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class)); + verify(this.kinesisProducer).getOutstandingRecordsCount(); } @AfterEach @@ -158,13 +160,25 @@ public KinesisProducer kinesisProducer() { } @Bean - @ServiceActivator(inputChannel = "kinesisSendChannel") + public RequestHandlerRetryAdvice retryAdvice() { + RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); + requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder() + .retryOn(KplBackpressureException.class) + .exponentialBackoff(100, 2.0, 1000) + .maxAttempts(3) + .build()); + return requestHandlerRetryAdvice; + } + + @Bean + @ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = {"retryAdvice"}) public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); kplMessageHandler.setAsync(true); - kplMessageHandler.setStream("foo"); + kplMessageHandler.setStream("someStream"); return kplMessageHandler; } + } } From 3c442c7c1e70f9ff70955449c55adf5f95f0758b Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 29 Dec 2024 23:06:33 +0530 Subject: [PATCH 15/20] GH-249 Added Javadoc for KplBackpressureException in the KplMessageHandler class level. --- .../integration/aws/outbound/KplMessageHandler.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 9c28788b..643bd133 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -65,9 +65,9 @@ /** * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer - * Library {@code putRecord(s)}. - * - * @exception KplBackpressureException When backpressure handling is enabled and buffer is at max capacity. + * Library {@code putRecord(s)}. {@link KplBackpressureException} is thrown When backpressure handling is enabled and + * buffer is at max capacity. The exceptions can be used with + * {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}. * * @author Arnaud Lecollaire * @author Artem Bilan @@ -78,6 +78,7 @@ * @see KinesisAsyncClient#putRecord(PutRecordRequest) * @see KinesisAsyncClient#putRecords(PutRecordsRequest) * @see com.amazonaws.handlers.AsyncHandler + * @see org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice */ public class KplMessageHandler extends AbstractAwsMessageHandler implements Lifecycle { From b7ae2967e24873e2fcffb20a74731872d48ffb97 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sun, 29 Dec 2024 23:10:33 +0530 Subject: [PATCH 16/20] GH-249 Corrected all java docs in KplMessageHandler --- .../aws/outbound/KplMessageHandler.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 643bd133..e9378372 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -139,7 +139,9 @@ public void setBackPressureThreshold(long backPressureThreshold) { /** * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record. + * * @param messageConverter the {@link MessageConverter} to use. + * * @since 2.3 */ public void setMessageConverter(MessageConverter messageConverter) { @@ -194,9 +196,12 @@ public void setSequenceNumberExpression(Expression sequenceNumberExpression) { /** * Specify a {@link OutboundMessageMapper} for embedding message headers into the * record data together with payload. + * * @param embeddedHeadersMapper the {@link OutboundMessageMapper} to embed headers * into the record data. + * * @since 2.0 + * * @see org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper */ public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeadersMapper) { @@ -205,7 +210,9 @@ public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeade /** * Configure a {@link Duration} how often to call a {@link KinesisProducer#flush()}. + * * @param flushDuration the {@link Duration} to periodic call of a {@link KinesisProducer#flush()}. + * * @since 2.3.6 */ public void setFlushDuration(Duration flushDuration) { @@ -215,7 +222,9 @@ public void setFlushDuration(Duration flushDuration) { /** * Unsupported operation. Use {@link #setEmbeddedHeadersMapper} instead. + * * @param headerMapper is not used. + * * @see #setEmbeddedHeadersMapper */ @Override @@ -226,8 +235,11 @@ public void setHeaderMapper(HeaderMapper headerMapper) { /** * Set a {@link Schema} to add into a {@link UserRecord} built from the request message. + * * @param glueSchema the {@link Schema} to add into a {@link UserRecord}. + * * @since 2.5.2 + * * @see UserRecord#setSchema(Schema) */ public void setGlueSchema(Schema glueSchema) { @@ -237,8 +249,11 @@ public void setGlueSchema(Schema glueSchema) { /** * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} * built from the request message. + * * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}. + * * @since 2.5.2 + * * @see UserRecord#setSchema(Schema) */ public void setGlueSchemaExpressionString(String glueSchemaExpression) { @@ -248,8 +263,11 @@ public void setGlueSchemaExpressionString(String glueSchemaExpression) { /** * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} * built from the request message. + * * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}. + * * @since 2.5.2 + * * @see UserRecord#setSchema(Schema) */ public void setGlueSchemaExpression(Expression glueSchemaExpression) { From 25f98e612f686892281c81c7e7e76ce852a8b4d9 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Tue, 31 Dec 2024 12:41:54 +0530 Subject: [PATCH 17/20] Revert "GH-249 Corrected all java docs in KplMessageHandler" This reverts commit b7ae2967e24873e2fcffb20a74731872d48ffb97. --- .../aws/outbound/KplMessageHandler.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index e9378372..643bd133 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -139,9 +139,7 @@ public void setBackPressureThreshold(long backPressureThreshold) { /** * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record. - * * @param messageConverter the {@link MessageConverter} to use. - * * @since 2.3 */ public void setMessageConverter(MessageConverter messageConverter) { @@ -196,12 +194,9 @@ public void setSequenceNumberExpression(Expression sequenceNumberExpression) { /** * Specify a {@link OutboundMessageMapper} for embedding message headers into the * record data together with payload. - * * @param embeddedHeadersMapper the {@link OutboundMessageMapper} to embed headers * into the record data. - * * @since 2.0 - * * @see org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper */ public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeadersMapper) { @@ -210,9 +205,7 @@ public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeade /** * Configure a {@link Duration} how often to call a {@link KinesisProducer#flush()}. - * * @param flushDuration the {@link Duration} to periodic call of a {@link KinesisProducer#flush()}. - * * @since 2.3.6 */ public void setFlushDuration(Duration flushDuration) { @@ -222,9 +215,7 @@ public void setFlushDuration(Duration flushDuration) { /** * Unsupported operation. Use {@link #setEmbeddedHeadersMapper} instead. - * * @param headerMapper is not used. - * * @see #setEmbeddedHeadersMapper */ @Override @@ -235,11 +226,8 @@ public void setHeaderMapper(HeaderMapper headerMapper) { /** * Set a {@link Schema} to add into a {@link UserRecord} built from the request message. - * * @param glueSchema the {@link Schema} to add into a {@link UserRecord}. - * * @since 2.5.2 - * * @see UserRecord#setSchema(Schema) */ public void setGlueSchema(Schema glueSchema) { @@ -249,11 +237,8 @@ public void setGlueSchema(Schema glueSchema) { /** * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} * built from the request message. - * * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}. - * * @since 2.5.2 - * * @see UserRecord#setSchema(Schema) */ public void setGlueSchemaExpressionString(String glueSchemaExpression) { @@ -263,11 +248,8 @@ public void setGlueSchemaExpressionString(String glueSchemaExpression) { /** * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} * built from the request message. - * * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}. - * * @since 2.5.2 - * * @see UserRecord#setSchema(Schema) */ public void setGlueSchemaExpression(Expression glueSchemaExpression) { From 974d0fc15c060e3e5d4287d2623ba1cfb04986df Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Tue, 31 Dec 2024 13:17:51 +0530 Subject: [PATCH 18/20] GH-249 Javadoc related code review actions fixed. --- .../aws/outbound/KplMessageHandler.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 643bd133..82c4870f 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -64,10 +64,12 @@ import org.springframework.util.StringUtils; /** - * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer - * Library {@code putRecord(s)}. {@link KplBackpressureException} is thrown When backpressure handling is enabled and - * buffer is at max capacity. The exceptions can be used with + * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer Library {@code putRecord(s)}. + *

+ * {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity. + * The exception can be handled with * {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}. + *

* * @author Arnaud Lecollaire * @author Artem Bilan @@ -78,7 +80,6 @@ * @see KinesisAsyncClient#putRecord(PutRecordRequest) * @see KinesisAsyncClient#putRecords(PutRecordsRequest) * @see com.amazonaws.handlers.AsyncHandler - * @see org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice */ public class KplMessageHandler extends AbstractAwsMessageHandler implements Lifecycle { @@ -123,12 +124,10 @@ public void setConverter(Converter converter) { } /** - * Configure maximum records in flight for handling backpressure. By Default, backpressure handling is not enabled. - * On number of records in flight exceeding the threshold, {@link KplBackpressureException} would be thrown. - * If Backpressure handling is enabled, {@link KplBackpressureException} must be handled. - * + * Configure maximum records in flight for handling backpressure. By default, backpressure handling is not enabled. + * When backpressure handling is enabled and number of records in flight exceeds the threshold, a + * {@link KplBackpressureException} would be thrown. * @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling. - * * @since 3.0.9 */ public void setBackPressureThreshold(long backPressureThreshold) { @@ -434,7 +433,7 @@ private PutRecordRequest buildPutRecordRequest(Message message) { partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class); } Assert.state(partitionKey != null, - "'partitionKey' must not be null for sending a Kinesis record. " + "'partitionKey' must not be null for sending a Kinesis record." + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " + "or supply an 'aws_partitionKey' message header."); From 51027aa346b358220e28e3fa38488734aba0e672 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Thu, 2 Jan 2025 19:06:07 +0530 Subject: [PATCH 19/20] GH-249 Updated Copyright and Javadoc related comments. --- .../aws/outbound/AbstractAwsMessageHandler.java | 2 +- .../integration/aws/outbound/KplMessageHandler.java | 12 ++++++------ .../aws/support/KplBackpressureException.java | 6 +++--- .../aws/outbound/KplMessageHandlerTests.java | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java index f71c80e1..20df041e 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2024 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 82c4870f..ea9e19ab 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,8 +66,8 @@ /** * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer Library {@code putRecord(s)}. *

- * {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity. - * The exception can be handled with + * The {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity. + * This exception can be handled with * {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}. *

* @@ -127,12 +127,12 @@ public void setConverter(Converter converter) { * Configure maximum records in flight for handling backpressure. By default, backpressure handling is not enabled. * When backpressure handling is enabled and number of records in flight exceeds the threshold, a * {@link KplBackpressureException} would be thrown. - * @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling. + * @param backPressureThreshold Set a value greater than 0 to enable backpressure handling. * @since 3.0.9 */ public void setBackPressureThreshold(long backPressureThreshold) { Assert.isTrue(backPressureThreshold >= 0, - "'backPressureThreshold must be greater than equal to 0."); + "'backPressureThreshold must be greater than or equal to 0."); this.backPressureThreshold = backPressureThreshold; } @@ -392,7 +392,7 @@ private CompletableFuture handleUserRecord(UserRecord userRe if (this.backPressureThreshold > 0) { var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount(); if (numberOfRecordsInFlight > this.backPressureThreshold) { - throw new KplBackpressureException("Cannot send record to kinesis since buffer is at max capacity.", + throw new KplBackpressureException("Cannot send record to Kinesis since buffer is at max capacity.", userRecord); } } diff --git a/src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java b/src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java index fee33ab8..f13c5641 100644 --- a/src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java +++ b/src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2024 the original author or authors. + * Copyright 2025-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ /** * An exception triggered from {@link org.springframework.integration.aws.outbound.KplMessageHandler} while sending - * records to kinesis when maximum number of records in flight exceeds the backpressure threshold. + * records to Kinesis when maximum number of records in flight exceeds the backpressure threshold. * * @author Siddharth Jain * @@ -39,7 +39,7 @@ public KplBackpressureException(String message, UserRecord userRecord) { /** * Get the {@link UserRecord} related. - * @return {@link UserRecord} linked while sending the record to kinesis. + * @return {@link UserRecord} linked while sending the record to Kinesis. */ public UserRecord getUserRecord() { return this.userRecord; diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index b9150fea..b9449ae5 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -139,7 +139,7 @@ void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() { .isThrownBy(() -> this.kinesisSendChannel.send(message)) .withCauseInstanceOf(MessageHandlingException.class) .withRootCauseExactlyInstanceOf(KplBackpressureException.class) - .withStackTraceContaining("Cannot send record to kinesis since buffer is at max capacity."); + .withStackTraceContaining("Cannot send record to Kinesis since buffer is at max capacity."); verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class)); verify(this.kinesisProducer).getOutstandingRecordsCount(); From 12f757283f393143d00337dc8305fb4e45fecd7e Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Thu, 2 Jan 2025 19:07:24 +0530 Subject: [PATCH 20/20] GH-249 Updated Copyright and Javadoc related comments. Revert in AbstractAwsMessageHandler. --- .../integration/aws/outbound/AbstractAwsMessageHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java index 20df041e..f71c80e1 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2025 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.